In [0]:
# ============================================================
# ONE-CELL DATA QUALITY CORRECTION PIPELINE (SILVER-READY)
# ============================================================

from pyspark.sql import functions as F
from pyspark.sql.window import Window
from collections import Counter

# ------------------------------------------------------------
# 1. LOAD RAW DATA
# ------------------------------------------------------------
customers_raw= spark.read.option("header",True).csv("/Volumes/hdfc_data_mentor/etl/shrutivolume/customer.csv")
branches_raw= spark.read.option("header",True).csv("/Volumes/hdfc_data_mentor/etl/shrutivolume/branches.csv")
loans_raw=spark.read.json("/Volumes/hdfc_data_mentor/etl/shrutivolume/loans.json")
disb_raw=spark.read.parquet("/Volumes/hdfc_data_mentor/etl/shrutivolume/disbursements.parquet")

# ------------------------------------------------------------
# 2. NULL HANDLING (MANDATORY FIELDS)
# ------------------------------------------------------------
customers_valid = customers_raw.filter("customer_id IS NOT NULL")
loans_valid = loans_raw.filter("loan_id IS NOT NULL AND customer_id IS NOT NULL")
disb_valid = disb_raw.filter("loan_id IS NOT NULL")
branches_valid = branches_raw.filter("branch_id IS NOT NULL")

# ------------------------------------------------------------
# 3. DATA TYPE & ENCODING FIXES
# ------------------------------------------------------------

# Fix comma-separated numeric values
loans_valid = loans_valid.withColumn(
    "loan_amount",
    F.regexp_replace("loan_amount", ",", "").cast("double")
)

# ------------------------------------------------------------
# 4. TIMESTAMP STANDARDIZATION (FINAL SAFE FIX)
# ------------------------------------------------------------
# Handles:
# yyyy-MM-dd HH:mm:ss
# dd-MM-yyyy HH:mm

loans_valid = loans_valid.withColumn(
    "update_ts",
    F.coalesce(
        F.expr("try_to_timestamp(update_ts, 'yyyy-MM-dd HH:mm:ss')"),
        F.expr("try_to_timestamp(update_ts, 'dd-MM-yyyy HH:mm')")
    )
)

customers_valid = customers_valid.withColumn(
    "update_ts",
    F.coalesce(
        F.expr("try_to_timestamp(update_ts, 'yyyy-MM-dd HH:mm:ss')"),
        F.expr("try_to_timestamp(update_ts, 'dd-MM-yyyy HH:mm')")
    )
)

disb_valid = disb_valid.withColumn(
    "update_ts",
    F.coalesce(
        F.expr("try_to_timestamp(update_ts, 'yyyy-MM-dd HH:mm:ss')"),
        F.expr("try_to_timestamp(update_ts, 'dd-MM-yyyy HH:mm')")
    )
)

# ------------------------------------------------------------
# 5. DEDUPLICATION (DETERMINISTIC)
# ------------------------------------------------------------

# Customers: latest record per customer
w_cust = Window.partitionBy("customer_id").orderBy(F.col("update_ts").desc_nulls_last())
customers_clean = (
    customers_valid
    .withColumn("rn", F.row_number().over(w_cust))
    .filter("rn = 1")
    .drop("rn")
)

# Loans: latest record per loan
w_loan = Window.partitionBy("loan_id").orderBy(F.col("update_ts").desc_nulls_last())
loans_clean = (
    loans_valid
    .withColumn("rn", F.row_number().over(w_loan))
    .filter("rn = 1")
    .drop("rn")
)

# Disbursements: latest per loan
w_disb = Window.partitionBy("loan_id").orderBy(F.col("update_ts").desc_nulls_last())
disb_clean = (
    disb_valid
    .withColumn("rn", F.row_number().over(w_disb))
    .filter("rn = 1")
    .drop("rn")
)

# ------------------------------------------------------------
# 6. CATEGORICAL STANDARDIZATION
# ------------------------------------------------------------
loans_clean = loans_clean.withColumn(
    "status",
    F.upper(F.trim("status"))
)

valid_status = ["APPROVED", "PENDING", "DISBURSED", "CLOSED"]
loans_clean = loans_clean.filter(F.col("status").isin(valid_status))

# ------------------------------------------------------------
# 7. REFERENTIAL INTEGRITY
# ------------------------------------------------------------
loans_clean = loans_clean.join(
    branches_valid.select("branch_id"),
    "branch_id",
    "inner"
)

# ------------------------------------------------------------
# 8. ADD STANDARDIZED METADATA (OPTIONAL)
# ------------------------------------------------------------
customers_clean = customers_clean.withColumn("record_source", F.lit("CUSTOMER_SYS"))
loans_clean = loans_clean.withColumn("record_source", F.lit("LOAN_SYS"))
disb_clean = disb_clean.withColumn("record_source", F.lit("DISBURSEMENT_SYS"))

# ------------------------------------------------------------
# 9. FINAL VALIDATION CHECKS
# ------------------------------------------------------------

def find_duplicate_columns(df):
    return [c for c, n in Counter(df.columns).items() if n > 1]

print("Customer duplicate columns:", find_duplicate_columns(customers_clean))
print("Loan duplicate columns:", find_duplicate_columns(loans_clean))
print("Disbursement duplicate columns:", find_duplicate_columns(disb_clean))

print("Final record counts")
print("Customers:", customers_clean.count())
print("Loans:", loans_clean.count())
print("Disbursements:", disb_clean.count())
print("Branches:", branches_valid.count())

# ============================================================
# OUTPUT DATAFRAMES (SILVER-READY)
# customers_clean
# loans_clean
# disb_clean
# branches_valid
# ============================================================


Customer duplicate columns: []
Loan duplicate columns: []
Disbursement duplicate columns: []
Final record counts
Customers: 30
Loans: 41
Disbursements: 29
Branches: 8


In [0]:
assert customers_clean.filter("customer_id IS NULL").count() == 0, "NULL customer_id in Silver"
assert loans_clean.filter("loan_id IS NULL OR customer_id IS NULL").count() == 0, "NULL keys in Silver loans"
assert disb_clean.filter("loan_id IS NULL").count() == 0, "NULL loan_id in Silver disbursements"
assert branches_valid.filter("branch_id IS NULL").count() == 0, "NULL branch_id in Silver branches"


In [0]:
assert customers_clean.groupBy("customer_id").count().filter("count > 1").count() == 0, "Duplicate customers in Silver"
assert loans_clean.groupBy("loan_id").count().filter("count > 1").count() == 0, "Duplicate loans in Silver"


In [0]:
invalid_branch_refs = loans_clean.join(
    branches_valid.select("branch_id"),
    "branch_id",
    "left_anti"
).count()

assert invalid_branch_refs == 0, "Broken branch reference in Silver loans"


In [0]:
assert loans_clean.filter("loan_amount < 0").count() == 0, "Negative loan amount found"


In [0]:
customers_clean.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_customers")

loans_clean.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_loans")

disb_clean.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_disbursements")

branches_valid.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_branches")


In [0]:
spark.table("silver_customers").count()
spark.table("silver_loans").count()
spark.table("silver_disbursements").count()
spark.table("silver_branches").count()


8

In [0]:
silver_fact = spark.table("silver_loans").alias("l") \
    .join(spark.table("silver_customers").alias("c"), "customer_id", "left") \
    .join(spark.table("silver_disbursements").alias("d"), "loan_id", "left") \
    .join(spark.table("silver_branches").alias("b"), "branch_id", "left")


In [0]:
silver_fact.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_customer_loan_fact")


[0;31m---------------------------------------------------------------------------[0m
[0;31mAnalysisException[0m                         Traceback (most recent call last)
File [0;32m<command-8338144220922009>, line 4[0m
[1;32m      1[0m silver_fact[38;5;241m.[39mwrite \
[1;32m      2[0m     [38;5;241m.[39mmode([38;5;124m"[39m[38;5;124moverwrite[39m[38;5;124m"[39m) \
[1;32m      3[0m     [38;5;241m.[39mformat([38;5;124m"[39m[38;5;124mdelta[39m[38;5;124m"[39m) \
[0;32m----> 4[0m     [38;5;241m.[39msaveAsTable([38;5;124m"[39m[38;5;124msilver_customer_loan_fact[39m[38;5;124m"[39m)

File [0;32m/databricks/python/lib/python3.12/site-packages/pyspark/sql/connect/readwriter.py:737[0m, in [0;36mDataFrameWriter.saveAsTable[0;34m(self, name, format, mode, partitionBy, **options)[0m
[1;32m    735[0m [38;5;28mself[39m[38;5;241m.[39m_write[38;5;241m.[39mtable_name [38;5;241m=[39m name
[1;32m    736[0m [38;5;28mself[39m[38;5;241m.[39m_write

In [0]:
from pyspark.sql import functions as F

l = spark.table("silver_loans").alias("l")
c = spark.table("silver_customers").alias("c")
d = spark.table("silver_disbursements").alias("d")
b = spark.table("silver_branches").alias("b")

silver_fact = (
    l.join(c, "customer_id", "left")
     .join(d, "loan_id", "left")
     .join(b, "branch_id", "left")
     .select(
        # ---- Keys ----
        l.loan_id,
        l.customer_id,
        l.branch_id,

        # ---- Loan attributes ----
        l.loan_amount,
        l.interest_rate,
        l.loan_type,
        l.tenure_months,
        l.status,
        l.origination_date.alias("loan_origination_date"),
        l.update_ts.alias("loan_update_ts"),

        # ---- Customer attributes ----
        c.full_name,
        c.dob,
        c.pan,
        c.mobile,
        c.email,
        c.city.alias("customer_city"),
        c.annual_income,
        c.update_ts.alias("customer_update_ts"),

        # ---- Disbursement attributes ----
        d.disbursement_id,
        d.disbursement_date,
        d.disbursement_amount,
        d.payment_mode,
        d.update_ts.alias("disbursement_update_ts"),

        # ---- Branch attributes ----
        b.branch_name,
        b.city.alias("branch_city"),
        b.region,

        # ---- Metadata ----
        F.current_timestamp().alias("silver_ingest_ts")
     )
)


In [0]:
silver_fact.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("silver_customer_loan_fact")


In [0]:
 #Gold needs to support:
 1. customer 360 analytics
 2.branch level performance
 3.loan portfolio monitoring
 4.disbursement performance
 
 #Load Silver Fact (Single Source of Truth)
from pyspark.sql import functions as F

silver_fact = spark.table("silver_customer_loan_fact")


In [0]:
gold_base = spark.table("silver_customer_loan_fact")


In [0]:
gold_base.display()

loan_id,customer_id,branch_id,loan_amount,interest_rate,loan_type,tenure_months,status,loan_origination_date,loan_update_ts,full_name,dob,pan,mobile,email,customer_city,annual_income,customer_update_ts,disbursement_id,disbursement_date,disbursement_amount,payment_mode,disbursement_update_ts,branch_name,branch_city,region,silver_ingest_ts
L50001,C1000,BR105,2057234.0,10.5,Personal,12,APPROVED,2025-11-26,2025-12-01T00:00:00.000Z,,05-04-2007,JLBIB2876M,,c1000@example.com,Chennai,443377,2025-11-15T00:00:00.000Z,D84436419,2025-12-01,2057234.0,RTGS,2025-12-04T00:00:00.000Z,HitechCity,Hyderabad,South,2025-12-24T07:44:48.741Z
L50002,C1001,BR106,1960486.0,8.5,Auto,48,PENDING,2025-11-23,2025-11-23T00:00:00.000Z,Kabir Singh,23-08-1979,NAWIQ1462F,918106000000.0,c1001@example.com,Kolkata,2445150,2025-11-24T00:00:00.000Z,,,,,,Gachibowli,Hyderabad,South,2025-12-24T07:44:48.741Z
L50003,C1002,BR106,727030.0,9.2,Auto,48,PENDING,2025-11-18,2025-11-19T00:00:00.000Z,Meera Nair,17-04-2000,GBGRS7166F,916512000000.0,c1002@example.com,Mumbai,1375791,2025-12-11T00:00:00.000Z,D37499621,2025-12-01,654327.0,IMPS,2025-11-20T00:00:00.000Z,Gachibowli,Hyderabad,South,2025-12-24T07:44:48.741Z
L50004,C1003,BR103,2421753.0,8.5,Home,60,PENDING,2025-11-21,2025-11-26T00:00:00.000Z,Vihaan Khan,02-09-2020,WTMHM5219V,919126000000.0,c1003@example.com,Mumbai,1698696,2025-11-19T00:00:00.000Z,D64403047,2025-11-30,2421753.0,IMPS,2025-11-28T00:00:00.000Z,Indiranagar,Bengaluru,South,2025-12-24T07:44:48.741Z
L50005,C1004,BR104,1914865.0,10.5,Auto,12,PENDING,2025-11-13,2025-11-22T00:00:00.000Z,Ishaan Singh,14-03-1997,HFXEI9849G,916909000000.0,c1004@example.com,Chennai,2247752,2025-11-23T00:00:00.000Z,D96808887,2025-11-13,1914865.0,NEFT,2025-11-23T00:00:00.000Z,Koramangala,Bengaluru,South,2025-12-24T07:44:48.741Z
L50006,C1005,BR105,1507901.0,11.0,Personal,24,PENDING,2025-11-17,2025-11-21T00:00:00.000Z,Aarav Singh,15-10-2015,EBTUK4244M,918527000000.0,c1005@example.com,Hyderabad,1826176,2025-11-14T00:00:00.000Z,D29721737,2025-11-18,1357110.9,RTGS,2025-11-23T00:00:00.000Z,HitechCity,Hyderabad,South,2025-12-24T07:44:48.741Z
L50007,C1006,BR101,1435270.0,12.5,Home,36,APPROVED,2025-11-18,2025-11-22T00:00:00.000Z,Ananya Gupta,02-11-2022,OSSRD4960D,917783000000.0,c1006@example.com,Kolkata,2259522,2025-11-12T00:00:00.000Z,D10346384,2025-12-01,1435270.0,RTGS,2025-11-26T00:00:00.000Z,Bandra,Mumbai,West,2025-12-24T07:44:48.741Z
L50008,C1006,BR100,947531.0,10.5,Personal,36,CLOSED,2025-11-29,2025-12-07T00:00:00.000Z,Ananya Gupta,02-11-2022,OSSRD4960D,917783000000.0,c1006@example.com,Kolkata,2259522,2025-11-12T00:00:00.000Z,D47948592,2025-12-12,947531.0,IMPS,2025-12-08T00:00:00.000Z,Andheri,Mumbai,West,2025-12-24T07:44:48.741Z
L50009,C1007,BR102,260513.0,8.5,Auto,60,APPROVED,2025-11-10,2025-11-15T00:00:00.000Z,Riya Singh,07-12-1992,UKVEH0439T,917867000000.0,,Hyderabad,1479781,2025-11-25T00:00:00.000Z,,,,,,Powai,Mumbai,West,2025-12-24T07:44:48.741Z
L50010,C1008,BR105,1751301.0,11.0,Education,48,PENDING,2025-11-11,2025-11-12T00:00:00.000Z,Ananya Das,29-08-2008,MMBHD7525N,919690000000.0,c1008@example.com,Pune,1925714,2025-11-16T00:00:00.000Z,,,,,,HitechCity,Hyderabad,South,2025-12-24T07:44:48.741Z


In [0]:
#customer exposure mart
from pyspark.sql import functions as F

gold_customer_360 = (
    gold_base
    .groupBy(
        "customer_id",
        "full_name",
        "customer_city"
    )
    .agg(
        F.countDistinct("loan_id").alias("total_loans"),
        F.sum("loan_amount").alias("total_loan_exposure"),
        F.avg("loan_amount").alias("avg_loan_amount"),
        F.max("loan_amount").alias("max_loan_amount")
    )
)


In [0]:
gold_customer_360.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("gold_customer_360")


In [0]:
%sql
select * from gold_customer_360;

customer_id,full_name,customer_city,total_loans,total_loan_exposure,avg_loan_amount,max_loan_amount
C1022,Vihaan Patel,Hyderabad,1,708111.0,708111.0,708111.0
C1005,Aarav Singh,Hyderabad,1,1507901.0,1507901.0,1507901.0
C1021,Arjun Das,Hyderabad,2,2494909.0,1247454.5,2225303.0
C1015,Arjun Khan,Hyderabad,1,701951.0,701951.0,701951.0
C1019,Aditya Patel,Chennai,1,2797175.0,2797175.0,2797175.0
C1023,Meera Patel,Mumbai,2,5002629.0,2501314.5,2571542.0
C1006,Ananya Gupta,Kolkata,2,2382801.0,1191400.5,1435270.0
C1020,Kabir Nair,Kolkata,1,2056019.0,2056019.0,2056019.0
C1011,Aarav Iyer,Mumbai,2,2006249.0,1003124.5,1238582.0
C1001,Kabir Singh,Kolkata,1,1960486.0,1960486.0,1960486.0


In [0]:
#branch Performance
gold_branch_performance = (
    gold_base
    .groupBy(
        "branch_id",
        "branch_name",
        "branch_city",
        "region"
    )
    .agg(
        F.countDistinct("loan_id").alias("loan_count"),
        F.countDistinct("customer_id").alias("customer_count"),
        F.sum("loan_amount").alias("total_branch_exposure"),
        F.avg("loan_amount").alias("avg_loan_amount")
    )
)


In [0]:
gold_branch_performance.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("gold_branch_performance")


In [0]:
%sql
select * from gold_branch_performance;

branch_id,branch_name,branch_city,region,loan_count,customer_count,total_branch_exposure,avg_loan_amount
BR105,HitechCity,Hyderabad,South,10,9,18632688.0,1863268.8
BR101,Bandra,Mumbai,West,2,2,1958642.0,979321.0
BR107,SaltLake,Kolkata,East,3,3,2098158.0,699386.0
BR102,Powai,Mumbai,West,6,6,10398408.0,1733068.0
BR100,Andheri,Mumbai,West,4,4,4793073.0,1198268.25
BR104,Koramangala,Bengaluru,South,7,7,14207790.0,2029684.2857142857
BR103,Indiranagar,Bengaluru,South,5,5,6485778.0,1297155.6
BR106,Gachibowli,Hyderabad,South,4,4,8202786.0,2050696.5


In [0]:
gold_loan_lifecycle = (
    gold_base
    .groupBy("status")
    .agg(
        F.countDistinct("loan_id").alias("loan_count"),
        F.sum("loan_amount").alias("total_amount")
    )
)


In [0]:
gold_loan_lifecycle.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("gold_loan_lifecycle")


In [0]:
%sql
select * from gold_loan_lifecycle;

status,loan_count,total_amount
CLOSED,11,16676922.0
DISBURSED,6,10617654.0
APPROVED,11,15502528.0
PENDING,13,23980219.0


In [0]:
gold_disbursement_efficiency = (
    gold_base
    .withColumn(
        "disbursement_delay_days",
        F.datediff("disbursement_date", "loan_origination_date")
    )
    .groupBy(
        "branch_id",
        "branch_name"
    )
    .agg(
        F.avg("disbursement_delay_days").alias("avg_disbursement_delay_days"),
        F.count("loan_id").alias("loan_count")
    )
)


In [0]:
gold_disbursement_efficiency.write \
    .mode("overwrite") \
    .format("delta") \
    .saveAsTable("gold_disbursement_efficiency")


In [0]:
%sql
select * from gold_disbursement_efficiency;

branch_id,branch_name,avg_disbursement_delay_days,loan_count
BR103,Indiranagar,9.0,5
BR100,Andheri,12.333333333333334,4
BR104,Koramangala,1.75,7
BR102,Powai,6.4,6
BR106,Gachibowli,8.666666666666666,4
BR107,SaltLake,,3
BR101,Bandra,13.0,2
BR105,HitechCity,5.625,10


In [0]:
assert spark.table("gold_customer_360").count() > 0
assert spark.table("gold_branch_performance").count() > 0
assert spark.table("gold_loan_lifecycle").count() > 0


In [0]:
%sql
CREATE OR REPLACE VIEW vw_customer_360 AS
SELECT * FROM gold_customer_360;

CREATE OR REPLACE VIEW vw_branch_performance AS
SELECT * FROM gold_branch_performance;
