1. Import Libraries

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, current_timestamp
import pyspark.sql.functions as F


2. Read Bronze Files (with inferSchema)

In [0]:
# Read all 5 files from the Bronze (raw) layer.
# Every table uses the same CSV options, so one helper replaces five
# copy-pasted reader chains. NOTE(review): inferSchema makes Spark scan each
# file an extra time to guess column types, and the guessed types can change
# as the raw data changes; an explicit schema would be faster and more stable.
BRONZE_DIR = "/mnt/project2/bronze"


def read_bronze_csv(table_name, bronze_dir=BRONZE_DIR):
    """Read ``<bronze_dir>/<table_name>.csv`` into a Spark DataFrame.

    The first line of the file is used as the header and column types are
    inferred from the data (``inferSchema``).

    Args:
        table_name: File stem of the CSV, e.g. ``"accounts"``.
        bronze_dir: Directory holding the raw CSV files.

    Returns:
        The loaded DataFrame.
    """
    return (
        spark.read.format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .load(f"{bronze_dir}/{table_name}.csv")
    )


accounts_df = read_bronze_csv("accounts")
customers_df = read_bronze_csv("customers")
loans_df = read_bronze_csv("loans")
loan_payments_df = read_bronze_csv("loan_payments")
transactions_df = read_bronze_csv("transactions")


3. Clean Files (Drop Null Keys and Duplicates)

In [0]:
# Clean each table: first drop rows missing a required key column, then keep
# one row per primary key.
# Why this order: dropDuplicates does not guarantee which duplicate survives.
# De-duplicating first could keep a copy whose foreign key is null; dropna
# would then remove it, losing that entity even though a valid copy existed.

# Accounts - Drop Nulls, then Duplicates
accounts_df = accounts_df.dropna(subset=["account_id", "customer_id"]).dropDuplicates(["account_id"])

# Customers - Drop Nulls, then Duplicates
customers_df = customers_df.dropna(subset=["customer_id"]).dropDuplicates(["customer_id"])

# Loans - Drop Nulls, then Duplicates
loans_df = loans_df.dropna(subset=["loan_id", "customer_id"]).dropDuplicates(["loan_id"])

# Loan Payments - Drop Nulls, then Duplicates
loan_payments_df = loan_payments_df.dropna(subset=["payment_id", "loan_id"]).dropDuplicates(["payment_id"])

# Transactions - Drop Nulls, then Duplicates
transactions_df = transactions_df.dropna(subset=["transaction_id", "account_id"]).dropDuplicates(["transaction_id"])


4. Parse Date Columns (yyyy-MM-dd)

In [0]:

# Convert the yyyy-MM-dd date columns into Spark date values.

# Transactions: transaction_date
transactions_df = transactions_df.withColumn(
    "transaction_date",
    to_date(col("transaction_date"), "yyyy-MM-dd"),
)

# Loan payments: payment_date
loan_payments_df = loan_payments_df.withColumn(
    "payment_date",
    to_date(col("payment_date"), "yyyy-MM-dd"),
)


5. Save Cleaned Files (Delta in Silver Layer)

In [0]:
# Persist each cleaned table as a Delta table in the Silver layer.
# NOTE(review): an overwrite of an existing Delta table is rejected if the
# schema changed (e.g. inferSchema guessed a different type) unless the
# `overwriteSchema` option is set; decide whether that should be allowed.
SILVER_DIR = "/mnt/project2/silver"


def write_silver_delta(df, table_name, silver_dir=SILVER_DIR):
    """Overwrite ``<silver_dir>/<table_name>`` with ``df`` in Delta format.

    Args:
        df: DataFrame to persist.
        table_name: Target folder name under ``silver_dir``.
        silver_dir: Root path of the Silver layer.
    """
    df.write.format("delta").mode("overwrite").save(f"{silver_dir}/{table_name}")


write_silver_delta(accounts_df, "accounts_cleaned")
write_silver_delta(customers_df, "customers_cleaned")
write_silver_delta(loans_df, "loans_cleaned")
write_silver_delta(loan_payments_df, "loan_payments_cleaned")
write_silver_delta(transactions_df, "transactions_cleaned")

6. Merge Tables (Create the Final Cleaned Dataset)

In [0]:
# Build one wide table from the cleaned tables (`col` comes from the import
# cell at the top of the notebook).
#
# Join path:
#   accounts --(inner, customer_id)--> customers   (accounts without a
#                                                   matching customer are dropped)
#            --(left,  customer_id)--> loans
#            --(left,  loan_id)------> loan_payments
#            --(left,  account_id)---> transactions
#
# NOTE(review): loans are attached by customer and transactions by account,
# not to each other. Each account row is therefore repeated once for every
# loan payment of its customer times every transaction of that account.
# transaction_amount / payment_amount / loan_amount repeat across rows, so
# summing them on this table over-counts. Confirm this grain is intended.
final_df = (
    accounts_df.join(customers_df, "customer_id", "inner")
    .join(loans_df, "customer_id", "left")
    .join(loan_payments_df, "loan_id", "left")
    .join(transactions_df, "account_id", "left")
    .select(
        col("account_id"),
        col("transaction_id"),
        col("customer_id"),
        col("loan_id"),
        col("payment_id"),
        col("transaction_amount"),
        col("transaction_date"),
        col("payment_amount"),
        col("payment_date"),
        col("loan_amount"),
    )
)

display(final_df)

account_id,transaction_id,customer_id,loan_id,payment_id,transaction_amount,transaction_date,payment_amount,payment_date,loan_amount
50,100.0,31,50,59.0,375.25,2024-04-09,3000.0,2024-02-28,37500.5
33,70.0,85,33,100.0,375.25,2024-03-10,1000.0,2024-04-10,15000.25
85,33.0,65,85,44.0,150.0,2024-02-02,2250.0,2024-02-13,25000.25
21,20.0,53,21,20.0,375.25,2024-01-20,1050.0,2024-01-20,10000.25
3,11.0,78,3,82.0,100.5,2024-01-11,4150.0,2024-03-22,15000.0
4,78.0,34,4,73.0,275.75,2024-03-18,3700.0,2024-03-13,30000.25
12,2.0,81,12,5.0,200.75,2024-01-02,300.0,2024-01-05,20000.0
56,5.0,28,56,7.0,250.0,2024-01-05,400.0,2024-01-07,17500.0
19,40.0,76,19,38.0,375.25,2024-02-09,1950.0,2024-02-07,32500.75
66,25.0,26,66,15.0,250.0,2024-01-25,800.0,2024-01-15,17500.5


In [0]:
# Remove duplicate rows from the merged table. A row is identified by the
# combination of its five IDs; when comparing, dropDuplicates groups nulls
# (produced by the left joins) together as equal values.
final_df = final_df.dropDuplicates(["account_id", "transaction_id", "customer_id", "loan_id", "payment_id"])

# Save the final merged table in Delta format, replacing any previous run.
final_df.write.format("delta").mode("overwrite").save("/mnt/project2/silver/merged_data_delta")