In [0]:
from pyspark import SparkConf, SparkContext
from pyspark.sql.functions import col, count, when, avg, sum
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

In [0]:
from pyspark.sql import SparkSession

# Reuse the session that is already active (e.g. the one the notebook runtime
# provides); only build a new one when none is active.
active_session = SparkSession.getActiveSession()
spark = (
    active_session
    if active_session is not None
    else SparkSession.builder.appName("ANZ_Banking_Analysis").getOrCreate()
)


**BRONZE LAYER**

Read the source table `data.default.anz` into a DataFrame

In [0]:
# Bronze: load every column and row of the source table, with no filtering or casting.
source_query = "select * from data.default.anz"
df = spark.sql(source_query)

In [0]:
%sql
-- List the columns of the source table and their data types
desc data.default.anz

In [0]:
# Row count of the raw source DataFrame; shown as the cell's output.
df.count()

**SILVER LAYER**

Ensure the dataset has the expected column names and data types.



In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

# Column names, types and nullability this notebook expects from the source table.
# NOTE(review): later cells also read "account", "transaction_id", "balance",
# "movement", "date", "card_present_flag" and "merchant_suburb", none of which
# are listed here — confirm whether this schema should include them.
expected_schema = StructType([
    StructField("customer_id", StringType(), False),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True),
    StructField("amount", DoubleType(), True),
    StructField("transaction_date", StringType(), True),
    StructField("merchant_name", StringType(), True),
    StructField("transaction_type", StringType(), True)
])

# Validate schema
# `!=` compares the two StructType objects as a whole, so any difference
# between them is reported as a mismatch.
actual_schema = df.schema
if actual_schema != expected_schema:
    # Print BOTH sides: previously the "Expected Schema" label was followed by
    # the actual schema, which made the message misleading.
    print("⚠️ Schema Mismatch! Expected Schema:")
    for field in expected_schema.fields:
        print(
            f" |-- {field.name}: {field.dataType.simpleString()} "
            f"(nullable = {str(field.nullable).lower()})"
        )
    print("Actual Schema:")
    df.printSchema()
else:
    print("✅ Schema is valid!")

Identify and handle null values

In [0]:
# Count missing values per column
from pyspark.sql.functions import col, sum

df.select([sum(col(c).isNull().cast("int")).alias(c) for c in df.columns]).show()

# Drop rows with missing customer_id (critical field)
df_cleaned = df.dropna(subset=["customer_id"])

# Fill missing age with the approximate median age (relative error 0.01)
from pyspark.sql.functions import expr
age_quantiles = df_cleaned.approxQuantile("age", [0.5], 0.01)
if age_quantiles:
    median_age = age_quantiles[0]
    df_cleaned = df_cleaned.fillna({"age": median_age})
else:
    # approxQuantile yields no values when there is nothing to summarise
    # (no rows, or no non-null ages); indexing [0] would raise IndexError.
    median_age = None
    print("No non-null 'age' values found; missing ages were left unfilled.")

Checking Data Integrity


In [0]:
# List every customer_id that occurs on more than one row
customer_row_counts = df_cleaned.groupBy("customer_id").count()
customer_row_counts.filter("count > 1").show()

In [0]:
# De-duplicate on the transaction key, not the customer key. Later cells count
# transaction_id and sum amount per customer_id, so a customer is expected to
# own many rows; dropping duplicates on customer_id kept only one arbitrary
# row per customer and discarded the rest of their transactions.
# Assumes transaction_id uniquely identifies a transaction — TODO confirm.
df_cleaned = df_cleaned.dropDuplicates(["transaction_id"])


Detecting Duplicates

In [0]:
# Show rows that are identical across every column, then keep one copy of each.
# Group on df_cleaned's own column list (not the raw frame's) so the check
# always matches the frame it is applied to.
df_cleaned.groupBy(df_cleaned.columns).count().filter("count > 1").show()
df_cleaned = df_cleaned.dropDuplicates()


Outlier Detection

In [0]:
#Detect transactions with extreme amounts.
# Calculate transaction amount quantiles
# The relative error must be well below the tail probability being estimated:
# with an error of 0.01 the "1st percentile" may legally be the minimum and the
# "99th percentile" the maximum, which would turn the filter into a no-op.
percentiles = df_cleaned.approxQuantile("amount", [0.01, 0.99], 0.001)

if len(percentiles) == 2:
    lower, upper = percentiles[0], percentiles[1]

    # Filter out extreme transactions
    # NOTE: rows whose amount is null do not satisfy the comparison and are
    # removed as well.
    df_cleaned = df_cleaned.filter((col("amount") >= lower) & (col("amount") <= upper))
else:
    # No quantiles came back (no rows / no non-null amounts): nothing to trim.
    lower = upper = None
    print("No non-null 'amount' values found; outlier filter skipped.")

display(df_cleaned)



Save the Cleansed Data into the Table

In [0]:
# Persist the cleansed DataFrame, replacing any previous contents of the table.
cleansed_writer = df_cleaned.write.mode("overwrite")
cleansed_writer.saveAsTable("data.default.anz_cleansed")


In [0]:
# Re-read the saved table so the following cells work from the persisted data.
cleansed_query = "SELECT * FROM data.default.anz_cleansed"
df_cleaned = spark.sql(cleansed_query)
display(df_cleaned)

In [0]:
# Row counts before and after cleansing.
# Despite the df_ prefix, both names hold plain integers, not DataFrames.
df_original = df.count()
df_cleansed = df_cleaned.count()

for row_total in (df_original, df_cleansed):
    display(row_total)

In [0]:
# Number of distinct values in the 'account' column of the raw DataFrame
distinct_accounts = df.select(col('account')).distinct()
unique_count = distinct_accounts.count()
display(unique_count)

**GOLD LAYER**

In [0]:
# Exploratory Data Analysis (EDA)

# One output row per customer_id: non-null transaction_id count, mean balance
# and summed amount.
per_customer_metrics = [
    count("transaction_id").alias("total_transactions"),
    avg("balance").alias("avg_balance"),
    sum("amount").alias("total_spent"),
]
df_grouped = df_cleaned.groupBy("customer_id").agg(*per_customer_metrics)
display(df_grouped)

DEBIT CARD AND CREDIT CARD

In [0]:
# Treat a missing 'card_present_flag' as 0
card_flag_default = {"card_present_flag": 0}
df_cleaned = df_cleaned.fillna(card_flag_default)

display(df_cleaned)

In [0]:
#Total card Transactions

# Both summaries share the same grouping on the 'movement' column
by_movement = df_cleaned.groupBy("movement")

# Number of rows per movement value
debit_credit_count = by_movement.agg(count("*").alias("transaction_count"))

# Summed 'amount' per movement value
debit_credit_amount = by_movement.agg(sum("amount").alias("total_amount"))

# Print the count summary first, then the amount summary
for movement_summary in (debit_credit_count, debit_credit_amount):
    movement_summary.show()


In [0]:
#Average Transaction Amounts by Card Type
from pyspark.sql.functions import mean

# Mean 'amount' for each value of 'movement'
avg_amount_col = mean("amount").alias("avg_amount")
avg_transaction = df_cleaned.groupBy("movement").agg(avg_amount_col)

# Print the per-movement averages
avg_transaction.show()



In [0]:
#Spending Trends Over Time

from pyspark.sql.functions import to_date

# Re-parse the 'date' column in place as a Date using the yyyy-MM-dd pattern
parsed_date = to_date(col("date"), "yyyy-MM-dd")
df_cleaned = df_cleaned.withColumn("date", parsed_date)

# Summed 'amount' for each (date, movement) pair
daily_total = sum("amount").alias("total_amount")
daily_trend = df_cleaned.groupBy("date", "movement").agg(daily_total)

# Render the daily totals
display(daily_trend)




In [0]:
# Summed 'amount' for each (movement, age, gender) combination
demographic_keys = ["movement", "age", "gender"]
age_gender_spending = df_cleaned.groupBy(*demographic_keys).agg(
    sum("amount").alias("total_spent")
)

# Print the demographic breakdown
age_gender_spending.show()

# Summed 'amount' for each (movement, merchant_suburb) combination
suburb_keys = ["movement", "merchant_suburb"]
merchant_spending = df_cleaned.groupBy(*suburb_keys).agg(
    sum("amount").alias("merchant_total")
)

# Render the suburb breakdown
display(merchant_spending)


In [0]:
#High-Value Transactions Detection
# Keep the ten rows with the largest 'amount'
amount_descending = col("amount").desc()
high_value_transactions = df_cleaned.orderBy(amount_descending).limit(10)

# Render the ten rows
display(high_value_transactions)


In [0]:
#Card Usage (Online vs. In-Store Transactions)

# Row count for each (movement, card_present_flag) combination
usage_keys = ["movement", "card_present_flag"]
card_usage = df_cleaned.groupBy(*usage_keys).agg(
    count("*").alias("transaction_count")
)

# Print the usage counts
card_usage.show()

In [0]:
# debit_credit_count and card_usage both expose a column named
# "transaction_count". Joining them as-is leaves two identically named columns
# in the result, which makes "transaction_count" ambiguous and cannot be
# written to a table. Give the card_usage count its own name before joining.
card_usage_for_join = card_usage.withColumnRenamed(
    "transaction_count", "card_transaction_count"
)

# Left-join every per-movement summary onto the movement-level counts.
# NOTE(review): age_gender_spending and card_usage are each finer-grained than
# "movement", so joining both on "movement" alone pairs every (age, gender)
# row with every card_present_flag row of the same movement — confirm this
# fan-out is intended.
final_data = debit_credit_count \
    .join(debit_credit_amount, "movement", "left") \
    .join(avg_transaction, "movement", "left") \
    .join(age_gender_spending, "movement", "left") \
    .join(card_usage_for_join, "movement", "left")

# Show the final merged data
display(final_data)

In [0]:
%python
# Remove a 'transaction_count_new' column when one is present
stale_column = "transaction_count_new"
if stale_column in final_data.columns:
    final_data = final_data.drop(stale_column)

# Publish 'transaction_count' under the name 'transaction_count_new1'
final_data = final_data.withColumnRenamed("transaction_count", "transaction_count_new1")

# Overwrite the target table with the merged result, stored in Delta format
final_writer = final_data.write.format("delta").mode("overwrite")
final_writer.saveAsTable("data.default.anz_final")


In [0]:
# Read the saved table back and render it to confirm the write
final_query = "select * from data.default.anz_final"
final = spark.sql(final_query)
display(final)