# Loan Prediction - Medallion Architecture Pipeline

## Project Overview
This notebook implements a Bronze → Silver → Gold data pipeline for loan prediction analysis.

### Medallion Architecture Layers:
- **Bronze Layer**: Raw data ingestion with error handling
- **Silver Layer**: Cleaned and typed data
- **Gold Layer**: Business metrics and analytics

## Setup and Configuration

In [1]:
import time
import json
from collections import defaultdict
from functools import reduce
from typing import List, Tuple, Any
import builtins
import findspark

findspark.init()

# Import PySpark; if it is missing, print install instructions and skip the Spark cells
try:
    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    from pyspark.sql.functions import *
    import pyspark.sql.functions as F
    from pyspark.sql.types import *
    pyspark_available = True
except ImportError:
    print("PySpark not available. Install with: pip install pyspark")
    pyspark_available = False

print("Setup complete!")


Setup complete!


In [2]:
if pyspark_available:
    spark = SparkSession.builder \
        .appName("LoanPrediction") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .getOrCreate()
    #spark.sparkContext.setLogLevel("WARN")
    spark.sparkContext.setLogLevel("ERROR")  # Only show errors, not warnings
    print("Spark session initialized successfully!")
    print(f"Spark version: {spark.version}")
else:
    print("Skipping Spark tasks - PySpark not available")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/25 14:39:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark session initialized successfully!
Spark version: 3.5.0



## Bronze Layer - Raw Data Ingestion

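The Bronze layer ingests the raw accepted and rejected loan CSV files line by line, tags every record with metadata (`loan_type`, `ingestion_time`, `source`, `status`), marks rows that fail to parse as `parse_error` instead of dropping them, and writes the combined result to Parquet partitioned by `loan_type`.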
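
The cell below splits each line on every comma. If the source files contain quoted fields with embedded commas (plausible for free-text columns, though not verified here), that split shifts later values into the wrong columns. The following is a minimal sketch of a quote-aware alternative using Python's standard `csv` module. The helper name `parse_line`, the three-column header, and the sample row are hypothetical and serve only as illustration.

```python
import csv

def parse_line(line, header_cols):
    """Parse one CSV line into a dict, honouring double-quoted fields."""
    # csv.reader keeps commas inside quoted fields together,
    # whereas str.split(",") treats every comma as a column break.
    parts = next(csv.reader([line]))
    return {
        name: (parts[i] if i < len(parts) else None)
        for i, name in enumerate(header_cols)
    }

# Hypothetical header and row, for illustration only
header = ["id", "emp_title", "loan_amnt"]
line = '1001,"Manager, Retail",5000'

naive = dict(zip(header, line.split(",")))   # naive split misaligns the columns
robust = parse_line(line, header)            # quote-aware parse keeps them aligned

print("naive :", naive)
print("robust:", robust)
```

A function like this could stand in for the `row.split(",")` call inside `parse_safe`. It still works one line at a time, so records whose quoted fields contain line breaks would remain a problem with `textFile`.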

In [3]:
if pyspark_available:
    print("=== Bronze Layer: Ingesting Accepted and Rejected Loans ===\n")

    # Define file paths
    accepted_path = "data/lendingclub/accepted_2007_to_2018Q4.csv"
    rejected_path = "data/lendingclub/rejected_2007_to_2018Q4.csv"
    
    def process_loan_file(file_path, loan_type):
        """Process a loan CSV file and return parsed RDD with metadata"""
        print(f"Processing {loan_type} loans from: {file_path}")
        
        # Read the file
        raw_rdd = spark.sparkContext.textFile(file_path)
        header = raw_rdd.first()
        header_cols = header.split(",")
        
        # Filter out header and parse rows
        data_rdd = raw_rdd.filter(lambda row: row != header)
        
        def parse_safe(row):
            try:
                parts = row.split(",")
                # Create a dictionary with all columns
                record = {}
                for i, col_name in enumerate(header_cols):
                    record[col_name] = parts[i] if i < len(parts) else None
                
                # Add metadata
                record["loan_type"] = loan_type
                record["ingestion_time"] = time.time()
                record["source"] = file_path.split("/")[-1]
                record["status"] = "valid"
                
                return record
            except Exception as e:
                return {
                    "raw_data": row,
                    "loan_type": loan_type,
                    "ingestion_time": time.time(),
                    "source": file_path.split("/")[-1],
                    "status": "parse_error",
                    "error_message": str(e)
                }
        
        parsed_rdd = data_rdd.map(parse_safe)
        return parsed_rdd
    
    # Process both files using RDDs
    accepted_rdd = process_loan_file(accepted_path, "accepted")
    rejected_rdd = process_loan_file(rejected_path, "rejected")
    
    # Convert each RDD to DataFrame SEPARATELY to preserve their schemas
    print("\nConverting RDDs to DataFrames...")
    accepted_df = spark.createDataFrame(accepted_rdd)
    rejected_df = spark.createDataFrame(rejected_rdd)
    
    # Union DataFrames (this properly merges schemas with all columns)
    print("Combining accepted and rejected loan DataFrames...")
    bronze_df = accepted_df.unionByName(rejected_df, allowMissingColumns=True)
    
    print("\n=== Bronze Layer Data Sample ===")
    # show() only materializes the first rows, so this does not process the entire dataset
    bronze_df.show(10, truncate=True)
    
    # Data quality metrics - use sampling for efficiency
    print("\n=== Data Quality Metrics (Approximate) ===")
    
    # Sample 10% of data for quick metrics
    sample_df = bronze_df.sample(fraction=0.1, seed=42)
    sample_count = sample_df.count()
    
    accepted_sample = sample_df.filter(col("loan_type") == "accepted").count()
    rejected_sample = sample_df.filter(col("loan_type") == "rejected").count()
    
    print(f"Sample size (10%): {sample_count:,}")
    print(f"  - Accepted loans (sample): {accepted_sample:,} ({accepted_sample/sample_count*100:.1f}%)")
    print(f"  - Rejected loans (sample): {rejected_sample:,} ({rejected_sample/sample_count*100:.1f}%)")
    print(f"\nEstimated total records: ~{sample_count * 10:,}")
    
    # Save to Bronze layer with partitioning by loan_type
    # This is the main operation - write directly without intermediate steps
    print("\nSaving to Bronze layer (this may take several minutes)...")
    bronze_df.write.mode("overwrite").partitionBy("loan_type").parquet("data/medallion/bronze")
    print("\n✅ Bronze data saved to data/medallion/bronze (partitioned by loan_type)")
    
    # Now read back from parquet for verification (more efficient)
    print("\n=== Verification: Reading from Bronze Layer ===")
    bronze_saved = spark.read.parquet("data/medallion/bronze")
    
    total_records = bronze_saved.count()
    accepted_total = bronze_saved.filter(col("loan_type") == "accepted").count()
    rejected_total = bronze_saved.filter(col("loan_type") == "rejected").count()
    
    print(f"Total records saved: {total_records:,}")
    print(f"  - Accepted loans: {accepted_total:,} ({accepted_total/total_records*100:.1f}%)")
    print(f"  - Rejected loans: {rejected_total:,} ({rejected_total/total_records*100:.1f}%)")
    
    # Show available columns
    print("\n=== Available Columns ===")
    all_columns = bronze_saved.columns
    print(f"Total columns: {len(all_columns)}")
    print("First 20 columns:", all_columns[:20])
    
    # Update bronze_df to point to saved data for downstream use
    bronze_df = bronze_saved

=== Bronze Layer: Ingesting Accepted and Rejected Loans ===

Processing accepted loans from: data/lendingclub/accepted_2007_to_2018Q4.csv
Processing rejected loans from: data/lendingclub/rejected_2007_to_2018Q4.csv

Converting RDDs to DataFrames...
Combining accepted and rejected loan DataFrames...

=== Bronze Layer Data Sample ===
[10-row sample table omitted: the captured output is truncated in the source]

Sample size (10%): 2,991,157
  - Accepted loans (sample): 226,766 (7.6%)
  - Rejected loans (sample): 2,764,391 (92.4%)

Estimated total records: ~29,911,570

Saving to Bronze layer (this may take several minutes)...

✅ Bronze data saved to data/medallion/bronze (partitioned by loan_type)

=== Verification: Reading from Bronze Layer ===
Total records saved: 29,909,442
  - Accepted loans: 2,260,701 (7.6%)
  - Rejected loans: 27,648,741 (92.4%)

=== Available Columns ===
Total columns: 164
First 20 columns: ['acc_now_delinq', 'acc_open_past_24mths', 'addr_state', 'all_util', 'annual_inc', 'annual_inc_joint', 'application_type', 'avg_cur_bal', 'bc_open_to_buy', 'bc_util', 'chargeoff_within_12_mths', 'collection_recovery_fee', 'collections_12_mths_ex_med', 'debt_settlement_flag', 'debt_settlement_flag_date', 'deferral_term', 'delinq_2yrs', 'delinq_amnt', 'desc', 'disbursement_method']
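
The estimate printed from the 10% sample and the exact count read back from Parquet differ slightly, because `DataFrame.sample` draws rows randomly and does not guarantee an exact fraction. As a small worked check, using only the figures from the Bronze cell output, the gap can be quantified like this (variable names are illustrative):

```python
# Figures copied from the Bronze cell output
sample_count = 2_991_157     # rows in the 10% sample
actual_total = 29_909_442    # rows read back from the Bronze Parquet files
accepted_total = 2_260_701   # accepted loans in the saved data

estimated_total = sample_count * 10              # same scaling as in the notebook
abs_error = estimated_total - actual_total
rel_error_pct = 100 * abs_error / actual_total
accepted_share_pct = 100 * accepted_total / actual_total

print(f"Estimated total: {estimated_total:,}")
print(f"Absolute error:  {abs_error:,}")
print(f"Relative error:  {rel_error_pct:.4f}%")
print(f"Accepted share:  {accepted_share_pct:.1f}%")
```

The relative error is well under a hundredth of a percent, so the sampled class split (7.6% accepted, 92.4% rejected) matches the exact figures at the printed precision.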


## Silver Layer - Data Cleaning and Standardization

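The Silver layer keeps only the records marked `valid` in Bronze, casts columns to explicit types, renames them to descriptive names, and applies range, date, and duplicate checks. Accepted and rejected loans are processed separately because their schemas differ.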
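
To make the accepted-loan rules easier to inspect without a Spark session, here is a plain-Python sketch of the same range and date checks the cell applies with `filter`. The helper names (`to_float`, `issue_year`, `is_valid_accepted`) and the sample record are hypothetical. The thresholds are the ones used in the cell, and the conversion helpers only loosely mirror how a failed Spark cast yields null.

```python
from datetime import datetime
from typing import Optional

def to_float(value: Optional[str]) -> Optional[float]:
    """Strip a '%' sign and convert to float; return None if conversion fails."""
    if value is None:
        return None
    try:
        return float(value.replace("%", "").strip())
    except ValueError:
        return None

def issue_year(value: Optional[str]) -> Optional[int]:
    """Parse dates such as 'Dec-2015'; return None if unparseable.
    Assumes English month abbreviations (the default C locale)."""
    if value is None:
        return None
    try:
        return datetime.strptime(value.strip(), "%b-%Y").year
    except ValueError:
        return None

def is_valid_accepted(rec: dict) -> bool:
    """Apply the Silver-layer range and date rules to one raw record."""
    amount = to_float(rec.get("loan_amnt"))
    income = to_float(rec.get("annual_inc"))
    dti = to_float(rec.get("dti"))
    rate = to_float(rec.get("int_rate"))
    year = issue_year(rec.get("issue_d"))
    return (
        amount is not None and 0 < amount <= 100_000
        and income is not None and 0 < income <= 10_000_000
        and dti is not None and 0 <= dti <= 100
        and (rate is None or 0 <= rate <= 35)        # null rate is allowed
        and (year is None or 2007 <= year <= 2018)   # null date is allowed
    )

# Hypothetical record, for illustration only
good = {"loan_amnt": "5000", "annual_inc": "55000", "dti": "18.2",
        "int_rate": "13.5%", "issue_d": "Dec-2015"}

print(is_valid_accepted(good))
print(is_valid_accepted({**good, "dti": "999"}))
print(is_valid_accepted({**good, "int_rate": None}))
```

Loan amount, income, and DTI are required, while a missing interest rate or issue date passes. That asymmetry matches the `isNotNull()` and `isNull() |` conditions in the cell.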

In [4]:
if pyspark_available:
    print("=== Silver Layer: Cleaned and Standardized Data ===\n")
    
    # Read from Bronze layer
    bronze_df = spark.read.parquet("data/medallion/bronze")
    
    # Filter only valid records
    valid_bronze_df = bronze_df.filter(col("status") == "valid")
    
    print("Processing accepted and rejected loans separately due to different schemas...\n")
    
    # ===== ACCEPTED LOANS PROCESSING =====
    print("=== Processing Accepted Loans ===")
    accepted_df = valid_bronze_df.filter(col("loan_type") == "accepted")
    
    print(f"Total accepted loans before cleaning: {accepted_df.count():,}")
    
    # Clean and standardize accepted loans with expanded columns
    accepted_silver = accepted_df.select(
        # Core loan identifiers
        col("id").cast(StringType()).alias("loan_id"),
        col("member_id").cast(StringType()),
        col("loan_type"),
        
        # Loan amounts and rates
        col("loan_amnt").cast(DoubleType()).alias("loan_amount"),
        col("funded_amnt").cast(DoubleType()).alias("funded_amount"),
        col("funded_amnt_inv").cast(DoubleType()).alias("funded_amount_investors"),
        regexp_replace(col("int_rate"), "%", "").cast(DoubleType()).alias("interest_rate"),
        col("installment").cast(DoubleType()),
        
        # Loan classification
        col("grade").cast(StringType()),
        col("sub_grade").cast(StringType()),
        col("term").cast(StringType()),
        col("loan_status").cast(StringType()),
        col("purpose").cast(StringType()),
        col("title").cast(StringType()).alias("loan_title"),
        col("application_type").cast(StringType()),
        
        # Borrower information
        col("annual_inc").cast(DoubleType()).alias("annual_income"),
        col("annual_inc_joint").cast(DoubleType()).alias("annual_income_joint"),
        col("emp_length").cast(StringType()).alias("employment_length"),
        col("emp_title").cast(StringType()).alias("employment_title"),
        col("home_ownership").cast(StringType()),
        col("verification_status").cast(StringType()),
        col("verification_status_joint").cast(StringType()),
        col("addr_state").cast(StringType()).alias("state"),
        col("zip_code").cast(StringType()),
        
        # Credit information - Basic
        col("dti").cast(DoubleType()).alias("debt_to_income_ratio"),
        col("dti_joint").cast(DoubleType()).alias("debt_to_income_ratio_joint"),
        col("fico_range_low").cast(IntegerType()),
        col("fico_range_high").cast(IntegerType()),
        col("last_fico_range_low").cast(IntegerType()),
        col("last_fico_range_high").cast(IntegerType()),
        col("earliest_cr_line").cast(StringType()).alias("earliest_credit_line"),
        
        # Credit information - Delinquencies
        col("delinq_2yrs").cast(IntegerType()).alias("delinquencies_2yrs"),
        col("delinq_amnt").cast(DoubleType()).alias("delinquency_amount"),
        col("acc_now_delinq").cast(IntegerType()).alias("accounts_now_delinquent"),
        col("mths_since_last_delinq").cast(IntegerType()).alias("months_since_last_delinquency"),
        col("mths_since_last_major_derog").cast(IntegerType()).alias("months_since_last_derogatory"),
        col("mths_since_last_record").cast(IntegerType()).alias("months_since_last_record"),
        
        # Credit information - Inquiries and Accounts
        col("inq_last_6mths").cast(IntegerType()).alias("inquiries_last_6mths"),
        col("inq_last_12m").cast(IntegerType()).alias("inquiries_last_12m"),
        col("inq_fi").cast(IntegerType()).alias("inquiries_financial_institutions"),
        col("open_acc").cast(IntegerType()).alias("open_accounts"),
        col("total_acc").cast(IntegerType()).alias("total_accounts"),
        col("acc_open_past_24mths").cast(IntegerType()).alias("accounts_opened_past_24mths"),
        
        # Credit information - Revolving Credit
        col("revol_bal").cast(DoubleType()).alias("revolving_balance"),
        col("revol_bal_joint").cast(DoubleType()).alias("revolving_balance_joint"),
        regexp_replace(col("revol_util"), "%", "").cast(DoubleType()).alias("revolving_utilization"),
        col("num_rev_accts").cast(IntegerType()).alias("num_revolving_accounts"),
        col("num_actv_rev_tl").cast(IntegerType()).alias("num_active_revolving_trades"),
        col("num_rev_tl_bal_gt_0").cast(IntegerType()).alias("num_revolving_trades_balance_gt_0"),
        
        # Credit information - Bankcard and Installment
        col("num_bc_tl").cast(IntegerType()).alias("num_bankcard_accounts"),
        col("num_actv_bc_tl").cast(IntegerType()).alias("num_active_bankcard_accounts"),
        col("num_bc_sats").cast(IntegerType()).alias("num_satisfactory_bankcard_accounts"),
        col("bc_open_to_buy").cast(DoubleType()).alias("bankcard_open_to_buy"),
        regexp_replace(col("bc_util"), "%", "").cast(DoubleType()).alias("bankcard_utilization"),
        col("num_il_tl").cast(IntegerType()).alias("num_installment_accounts"),
        col("total_bal_il").cast(DoubleType()).alias("total_balance_installment"),
        col("il_util").cast(DoubleType()).alias("installment_utilization"),
        
        # Credit information - Public Records
        col("pub_rec").cast(IntegerType()).alias("public_records"),
        col("pub_rec_bankruptcies").cast(IntegerType()).alias("public_record_bankruptcies"),
        col("tax_liens").cast(IntegerType()),
        col("collections_12_mths_ex_med").cast(IntegerType()).alias("collections_12mths_ex_medical"),
        
        # Credit information - Mortgage
        col("mort_acc").cast(IntegerType()).alias("mortgage_accounts"),
        col("total_bal_ex_mort").cast(DoubleType()).alias("total_balance_ex_mortgage"),
        
        # Credit information - Advanced Metrics
        col("tot_cur_bal").cast(DoubleType()).alias("total_current_balance"),
        col("tot_hi_cred_lim").cast(DoubleType()).alias("total_high_credit_limit"),
        col("total_bc_limit").cast(DoubleType()).alias("total_bankcard_limit"),
        col("total_il_high_credit_limit").cast(DoubleType()),
        col("total_rev_hi_lim").cast(DoubleType()).alias("total_revolving_high_credit_limit"),
        col("avg_cur_bal").cast(DoubleType()).alias("average_current_balance"),
        col("all_util").cast(DoubleType()).alias("all_utilization"),
        col("max_bal_bc").cast(DoubleType()).alias("max_balance_bankcard"),
        col("percent_bc_gt_75").cast(DoubleType()).alias("percent_bankcard_gt_75"),
        col("pct_tl_nvr_dlq").cast(DoubleType()).alias("percent_trades_never_delinquent"),
        
        # Payment information - Dates
        col("issue_d").cast(StringType()).alias("issue_date"),
        col("last_pymnt_d").cast(StringType()).alias("last_payment_date"),
        col("next_pymnt_d").cast(StringType()).alias("next_payment_date"),
        col("last_credit_pull_d").cast(StringType()).alias("last_credit_pull_date"),
        
        # Payment information - Amounts
        col("total_pymnt").cast(DoubleType()).alias("total_payment"),
        col("total_pymnt_inv").cast(DoubleType()).alias("total_payment_investors"),
        col("total_rec_prncp").cast(DoubleType()).alias("total_principal_received"),
        col("total_rec_int").cast(DoubleType()).alias("total_interest_received"),
        col("total_rec_late_fee").cast(DoubleType()).alias("total_late_fee_received"),
        col("last_pymnt_amnt").cast(DoubleType()).alias("last_payment_amount"),
        col("out_prncp").cast(DoubleType()).alias("outstanding_principal"),
        col("out_prncp_inv").cast(DoubleType()).alias("outstanding_principal_investors"),
        
        # Collections and Recovery
        col("recoveries").cast(DoubleType()),
        col("collection_recovery_fee").cast(DoubleType()),
        
        # Hardship and Settlement
        col("hardship_flag").cast(StringType()),
        col("hardship_type").cast(StringType()),
        col("hardship_status").cast(StringType()),
        col("hardship_loan_status").cast(StringType()),
        col("debt_settlement_flag").cast(StringType()),
        col("settlement_status").cast(StringType()),
        col("settlement_amount").cast(DoubleType()),
        col("settlement_percentage").cast(DoubleType()),
        
        # Secondary Applicant (for joint loans)
        col("sec_app_fico_range_low").cast(IntegerType()).alias("secondary_app_fico_range_low"),
        col("sec_app_fico_range_high").cast(IntegerType()).alias("secondary_app_fico_range_high"),
        col("sec_app_earliest_cr_line").cast(StringType()).alias("secondary_app_earliest_credit_line"),
        col("sec_app_inq_last_6mths").cast(IntegerType()).alias("secondary_app_inquiries_last_6mths"),
        col("sec_app_mort_acc").cast(IntegerType()).alias("secondary_app_mortgage_accounts"),
        col("sec_app_open_acc").cast(IntegerType()).alias("secondary_app_open_accounts"),
        col("sec_app_revol_util").cast(DoubleType()).alias("secondary_app_revolving_utilization"),
        col("sec_app_num_rev_accts").cast(IntegerType()).alias("secondary_app_num_revolving_accounts"),
        
        # Add processing metadata
        current_timestamp().alias("silver_processed_time")
    )
    
    print("\n=== DATA QUALITY CHECKS ===")
    
    # 1. Check for duplicates
    print("\n1. Checking for duplicate loan IDs...")
    total_loans = accepted_silver.count()
    unique_loans = accepted_silver.select("loan_id").distinct().count()
    duplicates = total_loans - unique_loans
    print(f"   Total records: {total_loans:,}")
    print(f"   Unique loan IDs: {unique_loans:,}")
    print(f"   Duplicates: {duplicates:,}")
    
    # 2. Filter unrealistic values
    print("\n2. Filtering unrealistic values...")
    accepted_clean = accepted_silver \
        .filter(col("loan_amount").isNotNull()) \
        .filter(col("loan_amount") > 0) \
        .filter(col("loan_amount") <= 100000) \
        .filter(col("annual_income").isNotNull()) \
        .filter(col("annual_income") > 0) \
        .filter(col("annual_income") <= 10000000) \
        .filter(col("debt_to_income_ratio").isNotNull()) \
        .filter(col("debt_to_income_ratio") >= 0) \
        .filter(col("debt_to_income_ratio") <= 100) \
        .filter((col("interest_rate").isNull()) | ((col("interest_rate") >= 0) & (col("interest_rate") <= 35)))
    
    unrealistic_count = total_loans - accepted_clean.count()
    print(f"   Removed {unrealistic_count:,} records with unrealistic values")
    print(f"   Criteria: loan_amount (0-100K), annual_income (0-10M), DTI (0-100%), interest_rate (0-35%)")
    
    # 3. Date validation (issue_date should be 2007-2018)
    print("\n3. Validating dates (issue_date 2007-2018)...")
    from pyspark.sql.functions import to_date, year
    accepted_with_dates = accepted_clean.withColumn("issue_year", year(to_date(col("issue_date"), "MMM-yyyy")))
    
    accepted_final = accepted_with_dates \
        .filter((col("issue_year").isNull()) | ((col("issue_year") >= 2007) & (col("issue_year") <= 2018))) \
        .drop("issue_year")
    
    date_invalid_count = accepted_clean.count() - accepted_final.count()
    print(f"   Removed {date_invalid_count:,} records with out-of-range dates")
    
    # 4. Remove duplicates (keep one row per loan_id; silver_processed_time is identical
    #    across rows, so which duplicate survives is arbitrary)
    print("\n4. Removing duplicate loan IDs...")
    window_spec = Window.partitionBy("loan_id").orderBy(col("silver_processed_time"))
    accepted_deduped = accepted_final \
        .withColumn("row_num", F.row_number().over(window_spec)) \
        .filter(col("row_num") == 1) \
        .drop("row_num")
    
    dedup_removed = accepted_final.count() - accepted_deduped.count()
    print(f"   Removed {dedup_removed:,} duplicate records")
    
    # Final dataset
    accepted_silver = accepted_deduped
    
    print(f"\n✅ Accepted loans after all cleaning: {accepted_silver.count():,}")
    print(f"   Total removed: {total_loans - accepted_silver.count():,} ({100*(total_loans - accepted_silver.count())/total_loans:.2f}%)")
    
    accepted_silver.show(5, truncate=True)
    
    # ===== REJECTED LOANS PROCESSING =====
    print("\n=== Processing Rejected Loans ===")
    rejected_df = valid_bronze_df.filter(col("loan_type") == "rejected")
    
    # DEBUGGING: Find which columns actually contain data
    print("Analyzing rejected loan data structure...")
    sample_row = rejected_df.first()
    
    if sample_row is None:
        print("ERROR: No rejected loans found in Bronze layer!")
        raise ValueError("No rejected loan records found")
    
    # Check all columns for non-null/non-empty values
    non_null_cols = []
    for col_name in rejected_df.columns:
        val = sample_row[col_name]
        if val is not None and str(val).strip() != '' and col_name not in ['ingestion_time', 'status', 'source', 'loan_type']:
            non_null_cols.append(col_name)
            print(f"  {col_name}: {val}")
    
    print(f"\nTotal columns with data: {len(non_null_cols)}")
    
    # The rejected CSV has these columns:
    # "Amount Requested,Application Date,Loan Title,Risk_Score,Debt-To-Income Ratio,Zip Code,State,Employment Length,Policy Code"
    # Let's try to map them by searching for similar column names
    
    # Expected rejected columns from CSV - map to actual parquet column names
    from typing import Dict, Optional
    rejected_col_mapping: Dict[str, Optional[str]] = {
        "Amount Requested": None,
        "Application Date": None,
        "Loan Title": None,
        "Risk_Score": None,
        "Debt-To-Income Ratio": None,
        "Zip Code": None,
        "State": None,
        "Employment Length": None,
        "Policy Code": None
    }
    
    # Search for matches: exact name first, then spaces replaced by underscores, then lowercased
    for csv_col in rejected_col_mapping.keys():
        # Try exact match first
        if csv_col in rejected_df.columns:
            rejected_col_mapping[csv_col] = csv_col
            continue
        
        # Try with underscores instead of spaces
        col_with_underscores = csv_col.replace(" ", "_")
        if col_with_underscores in rejected_df.columns:
            rejected_col_mapping[csv_col] = col_with_underscores
            continue
        
        # Try lowercase with underscores
        col_lower = csv_col.lower().replace(" ", "_")
        if col_lower in rejected_df.columns:
            rejected_col_mapping[csv_col] = col_lower
            continue
    
    print("\n=== Column Mapping Results ===")
    for csv_col, parquet_col in rejected_col_mapping.items():
        print(f"{csv_col} -> {parquet_col}")
    
    # Clean and standardize rejected loans using the mapped columns
    rejected_silver = rejected_df.select(
        # Generate a unique ID for rejected loans (unique but not consecutive, and not stable across runs)
        monotonically_increasing_id().cast(StringType()).alias("loan_id"),
        col("loan_type"),
        
        # Map rejected loan columns
        col(rejected_col_mapping["Amount Requested"]).cast(DoubleType()).alias("loan_amount") if rejected_col_mapping["Amount Requested"] else lit(None).cast(DoubleType()).alias("loan_amount"),
        col(rejected_col_mapping["Loan Title"]).cast(StringType()).alias("loan_title") if rejected_col_mapping["Loan Title"] else lit(None).cast(StringType()).alias("loan_title"),
        col(rejected_col_mapping["Risk_Score"]).cast(DoubleType()).alias("risk_score") if rejected_col_mapping["Risk_Score"] else lit(None).cast(DoubleType()).alias("risk_score"),
        regexp_replace(col(rejected_col_mapping["Debt-To-Income Ratio"]), "%", "").cast(DoubleType()).alias("debt_to_income_ratio") if rejected_col_mapping["Debt-To-Income Ratio"] else lit(None).cast(DoubleType()).alias("debt_to_income_ratio"),
        col(rejected_col_mapping["Zip Code"]).cast(StringType()).alias("zip_code") if rejected_col_mapping["Zip Code"] else lit(None).cast(StringType()).alias("zip_code"),
        col(rejected_col_mapping["State"]).cast(StringType()).alias("state") if rejected_col_mapping["State"] else lit(None).cast(StringType()).alias("state"),
        col(rejected_col_mapping["Employment Length"]).cast(StringType()).alias("employment_length") if rejected_col_mapping["Employment Length"] else lit(None).cast(StringType()).alias("employment_length"),
        col(rejected_col_mapping["Application Date"]).cast(StringType()).alias("application_date") if rejected_col_mapping["Application Date"] else lit(None).cast(StringType()).alias("application_date"),
        col(rejected_col_mapping["Policy Code"]).cast(StringType()).alias("policy_code") if rejected_col_mapping["Policy Code"] else lit(None).cast(StringType()).alias("policy_code"),
        
        # Rejected loans don't have these fields - set to null
        lit(None).cast(DoubleType()).alias("funded_amount"),
        lit(None).cast(DoubleType()).alias("interest_rate"),
        lit(None).cast(DoubleType()).alias("installment"),
        lit(None).cast(StringType()).alias("grade"),
        lit(None).cast(StringType()).alias("sub_grade"),
        lit(None).cast(StringType()).alias("term"),
        lit("Rejected").cast(StringType()).alias("loan_status"),
        lit(None).cast(DoubleType()).alias("annual_income"),
        lit(None).cast(StringType()).alias("home_ownership"),
        lit(None).cast(StringType()).alias("verification_status"),
        
        # Add processing metadata
        current_timestamp().alias("silver_processed_time")
    )
    
    print("\n=== DATA QUALITY CHECKS FOR REJECTED LOANS ===")
    
    # 1. Check for duplicates (rejected loans have no unique ID, so compare on amount, title, date, and state)
    print("\n1. Checking for duplicate applications...")
    total_rejected = rejected_silver.count()
    unique_rejected = rejected_silver.select("loan_amount", "loan_title", "application_date", "state").distinct().count()
    duplicates_rejected = total_rejected - unique_rejected
    print(f"   Total records: {total_rejected:,}")
    print(f"   Unique applications: {unique_rejected:,}")
    print(f"   Potential duplicates: {duplicates_rejected:,}")
    
    # 2. Filter unrealistic values and nulls
    print("\n2. Filtering unrealistic values and null DTI...")
    rejected_clean = rejected_silver \
        .filter(col("loan_amount").isNotNull()) \
        .filter(col("loan_amount") > 0) \
        .filter(col("loan_amount") <= 100000) \
        .filter(col("debt_to_income_ratio").isNotNull()) \
        .filter(col("debt_to_income_ratio") >= 0) \
        .filter(col("debt_to_income_ratio") <= 100) \
        .filter((col("risk_score").isNull()) | ((col("risk_score") >= 300) & (col("risk_score") <= 850)))
    
    unrealistic_rejected = total_rejected - rejected_clean.count()
    print(f"   Removed {unrealistic_rejected:,} records with unrealistic values or null DTI")
    print(f"   Criteria: loan_amount (0-100K), DTI not null and (0-100%), risk_score (300-850 or null)")
    
    # 3. Date validation
    print("\n3. Validating application dates (2007-2018)...")
    from pyspark.sql.functions import to_date, year
    rejected_with_dates = rejected_clean.withColumn("app_year", year(to_date(col("application_date"), "yyyy-MM-dd")))
    
    rejected_final = rejected_with_dates \
        .filter((col("app_year").isNull()) | ((col("app_year") >= 2007) & (col("app_year") <= 2018))) \
        .drop("app_year")
    
    date_invalid_rejected = rejected_clean.count() - rejected_final.count()
    print(f"   Removed {date_invalid_rejected:,} records with out-of-range dates")
    
    # 4. Remove applications that match on amount, title, date, state, and zip code
    #    (dropDuplicates keeps an arbitrary row per key)
    print("\n4. Removing exact duplicate applications...")
    rejected_deduped = rejected_final.dropDuplicates(["loan_amount", "loan_title", "application_date", "state", "zip_code"])
    
    dedup_removed_rejected = rejected_final.count() - rejected_deduped.count()
    print(f"   Removed {dedup_removed_rejected:,} duplicate records")
    
    # Final dataset
    rejected_silver = rejected_deduped
    
    print(f"\n✅ Rejected loans after all cleaning: {rejected_silver.count():,}")
    print(f"   Total removed: {total_rejected - rejected_silver.count():,} ({100*(total_rejected - rejected_silver.count())/total_rejected:.2f}%)")
    
    rejected_silver.show(5, truncate=True)
    
    # ===== SAVE BOTH TO SILVER LAYER =====
    print("\n=== Saving to Silver Layer ===")
    
    # Save accepted loans
    print("Saving accepted loans to data/medallion/silver/accepted...")
    accepted_silver.write.mode("overwrite").parquet("data/medallion/silver/accepted")
    
    # Save rejected loans  
    print("Saving rejected loans to data/medallion/silver/rejected...")
    rejected_silver.write.mode("overwrite").parquet("data/medallion/silver/rejected")
    
    print("\n✅ Silver layer saved successfully!")
    print("  - Accepted loans: data/medallion/silver/accepted")
    print("  - Rejected loans: data/medallion/silver/rejected")
    
    # ===== DATA QUALITY SUMMARY =====
    print("\n=== Silver Layer Data Quality Summary ===")
    
    # Read back for verification
    accepted_silver_saved = spark.read.parquet("data/medallion/silver/accepted")
    rejected_silver_saved = spark.read.parquet("data/medallion/silver/rejected")
    
    print(f"\nAccepted Loans: {accepted_silver_saved.count():,} records")
    print("Key columns null counts:")
    accepted_silver_saved.select([
        F.count(when(col(c).isNull(), c)).alias(f"{c}_nulls") 
        for c in ["loan_amount", "interest_rate", "annual_income", "debt_to_income_ratio"]
    ]).show()
    
    print(f"\nRejected Loans: {rejected_silver_saved.count():,} records")
    print("Key columns null counts:")
    rejected_silver_saved.select([
        F.count(when(col(c).isNull(), c)).alias(f"{c}_nulls") 
        for c in ["loan_amount", "risk_score", "debt_to_income_ratio"]
    ]).show()
    
    # Store for Gold layer (use accepted loans for analysis)
    silver_df = accepted_silver_saved

=== Silver Layer: Cleaned and Standardized Data ===

Processing accepted and rejected loans separately due to different schemas...

=== Processing Accepted Loans ===
Total accepted loans before cleaning: 2,260,701

=== DATA QUALITY CHECKS ===

1. Checking for duplicate loan IDs...
   Total records: 2,260,701
   Unique loan IDs: 2,260,701
   Duplicates: 0

2. Filtering unrealistic values...
   Removed 69,023 records with unrealistic values
   Criteria: loan_amount (0-100K), annual_income (0-10M), DTI (0-100%), interest_rate (0-35%)

3. Validating dates (issue_date 2007-2018)...
   Removed 0 records with out-of-range dates

4. Removing duplicate loan IDs...
   Removed 0 duplicate records

✅ Accepted loans after all cleaning: 2,191,678
   Total removed: 69,023 (3.05%)

[... output truncated ...]

   Total records: 27,648,741
   Unique applications: 10,723,228
   Potential duplicates: 16,925,513

2. Filtering unrealistic values and null DTI...
   Removed 2,120,481 records with unrealistic values or null DTI
   Criteria: loan_amount (0-100K), DTI not null and (0-100%), risk_score (300-850 or null)

3. Validating application dates (2007-2018)...
   Removed 0 records with out-of-range dates

4. Removing exact duplicate applications...
   Removed 5,412,025 duplicate records

✅ Rejected loans after all cleaning: 20,116,235
   Total removed: 7,532,506 (27.24%)

+-------+---------+-----------+--------------------+----------+--------------------+--------+-----+-----------------+----------------+-----------+-------------+-------------+-----------+-----+---------+----+-----------+-------------+--------------+-------------------+---------------------+
|loan_id|loan_type|loan_amount|          loan_title|risk_score|debt_to_income_ratio|zip_code|state|employment_length|application_date|policy_code|funded_amount|interest_rate|installment|grade|sub_grade|term|loan_status|annual_income|home_ownership|verification_status|silver_processed_time|
+-------+---------+-----------+--------------------+----------+--------------------+--------+-----+-----------------+----------------+-----------+-------------+-------------+-----------+-----+---------+----+-----------+-------------+--------------+-------------------+---------------------+
|  74390| rejected|    15000.0|  "Paying Back Bills|      NULL|                 0.0|      0%|483xx|               MI|      2009

Saving rejected loans to data/medallion/silver/rejected...

✅ Silver layer saved successfully!
  - Accepted loans: data/medallion/silver/accepted
  - Rejected loans: data/medallion/silver/rejected

=== Silver Layer Data Quality Summary ===

Accepted Loans: 2,191,678 records
Key columns null counts:
+-----------------+-------------------+-------------------+--------------------------+
|loan_amount_nulls|interest_rate_nulls|annual_income_nulls|debt_to_income_ratio_nulls|
+-----------------+-------------------+-------------------+--------------------------+
|                0|                  0|                  0|                         0|
+-----------------+-------------------+-------------------+--------------------------+


Rejected Loans: 20,116,235 records
Key columns null counts:
+-----------------+----------------+--------------------------+
|loan_amount_nulls|risk_score_nulls|debt_to_income_ratio_nulls|
+-----------------+----------------+--------------------------+
|                0|        12837284|                         0|
+-----------------+----------------+--------------------------+

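The row counts in the Silver layer log can be cross-checked with a few lines of arithmetic. The sketch below is not part of the pipeline: it copies the counts from the log and confirms that the per-step removals add up to the reported totals. The helper name `reconcile` is hypothetical.

```python
def reconcile(total_before, removed_per_step, total_after):
    """Return (removed, pct_removed) after checking that the counts add up."""
    removed = sum(removed_per_step)
    if total_before - removed != total_after:
        raise ValueError("step removals do not account for the final count")
    return removed, round(100 * removed / total_before, 2)

# Counts copied from the Silver layer log (value filter, date filter, dedup)
accepted = reconcile(2_260_701, [69_023, 0, 0], 2_191_678)
rejected = reconcile(27_648_741, [2_120_481, 0, 5_412_025], 20_116_235)

# Share of rejected rows with a null risk_score, from the null-count table
risk_score_null_pct = round(100 * 12_837_284 / 20_116_235, 2)

print(accepted)   # (69023, 3.05)
print(rejected)   # (7532506, 27.24)
print(risk_score_null_pct)
```

Both totals reconcile with the log. The last figure shows that roughly 64% of the rejected rows have no `risk_score`, which matters for any analysis that uses that column.
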
## Gold Layer - Business Metrics and Analytics

The Gold layer creates aggregated business metrics for analysis and reporting.
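
As a minimal illustration of the aggregation pattern the Gold queries use (count the rows that match a condition, then divide by the group size), the sketch below recomputes a grade-level default rate in plain Python. The figures are the A1 to A5 rows of the grade analysis output in this notebook. The variable names are illustrative, and the calculation assumes every grade A loan has a sub-grade.

```python
# (sub_grade, total_loans, defaults) copied from the grade analysis output
sub_grades = [
    ("A1", 84193, 1358),
    ("A2", 67304, 1657),
    ("A3", 70650, 1984),
    ("A4", 92160, 3415),
    ("A5", 103720, 5155),
]

total_loans = sum(n for _, n, _ in sub_grades)
defaults = sum(d for _, _, d in sub_grades)

# Same formula as the SQL: 100.0 * SUM(defaulted) / COUNT(*)
grade_rate = round(100.0 * defaults / total_loans, 2)

# Averaging the sub-grade percentages ignores group sizes and gives a different number
mean_of_rates = round(sum(100.0 * d / n for _, n, d in sub_grades) / len(sub_grades), 2)

print(total_loans, defaults, grade_rate, mean_of_rates)
```

The loan-weighted rate is higher than the simple mean of the sub-grade rates because the larger sub-grades (A4, A5) also have the higher default rates.
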

In [4]:
#%% Cell 1: Load Silver Data and Register SQL Tables
print("=== Gold Layer: Business Analytics and ML ===\n")
print("Loading Silver layer data...")

# Load cleaned data from Silver layer
accepted_df = spark.read.parquet("data/medallion/silver/accepted")
rejected_df = spark.read.parquet("data/medallion/silver/rejected")

print(f"✅ Accepted loans: {accepted_df.count():,} records")
print(f"✅ Rejected loans: {rejected_df.count():,} records")

# Register as SQL temporary views for analysis
accepted_df.createOrReplaceTempView("accepted_loans")
rejected_df.createOrReplaceTempView("rejected_loans")

print("\n✅ SQL tables registered for querying\n")

#%% Cell 2: Business Analytics Query 1 - Loan Status Distribution
print("=== 1. Loan Status Distribution Analysis ===\n")

query = """
SELECT 
    loan_status,
    COUNT(*) as total_loans,
    ROUND(SUM(loan_amount), 2) as total_loan_amount,
    ROUND(AVG(loan_amount), 2) as avg_loan_amount,
    ROUND(AVG(interest_rate), 2) as avg_interest_rate,
    ROUND(AVG(annual_income), 2) as avg_annual_income,
    ROUND(AVG(debt_to_income_ratio), 2) as avg_dti
FROM accepted_loans
WHERE loan_status IS NOT NULL
GROUP BY loan_status
ORDER BY total_loans DESC
"""

status_summary = spark.sql(query)
print("Loan Status Summary:")
status_summary.show(20, truncate=False)

# Save to Gold layer
status_summary.write.mode("overwrite").parquet("data/medallion/gold/status_summary")
print("✅ Saved to data/medallion/gold/status_summary\n")

#%% Cell 3: Business Analytics Query 2 - Default Risk by Grade
print("=== 2. Default Risk Analysis by Loan Grade ===\n")

query = """
SELECT 
    grade,
    sub_grade,
    COUNT(*) as total_loans,
    SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults,
    ROUND(100.0 * SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) / COUNT(*), 2) as default_rate_pct,
    ROUND(AVG(loan_amount), 2) as avg_loan_amount,
    ROUND(AVG(interest_rate), 2) as avg_interest_rate,
    ROUND(AVG(annual_income), 2) as avg_income
FROM accepted_loans
WHERE grade IS NOT NULL
GROUP BY grade, sub_grade
ORDER BY grade, sub_grade
"""

grade_analysis = spark.sql(query)
print("Default Analysis by Loan Grade:")
grade_analysis.show(35, truncate=False)

# Summary by main grade only
grade_summary = spark.sql("""
    SELECT 
        grade,
        COUNT(*) as total_loans,
        SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults,
        ROUND(100.0 * SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) / COUNT(*), 2) as default_rate_pct,
        ROUND(AVG(interest_rate), 2) as avg_rate
    FROM accepted_loans
    WHERE grade IS NOT NULL
    GROUP BY grade
    ORDER BY grade
""")
print("\nSummary by Main Grade:")
grade_summary.show()

grade_analysis.write.mode("overwrite").parquet("data/medallion/gold/grade_analysis")
print("✅ Saved to data/medallion/gold/grade_analysis\n")

#%% Cell 4: Business Analytics Query 3 - Income vs Default Risk
print("=== 3. Income vs Default Risk Analysis ===\n")

query = """
SELECT 
    CASE 
        WHEN annual_income < 30000 THEN 'Low (<30K)'
        WHEN annual_income < 60000 THEN 'Medium (30-60K)'
        WHEN annual_income < 100000 THEN 'High (60-100K)'
        ELSE 'Very High (>100K)'
    END as income_bracket,
    COUNT(*) as total_loans,
    ROUND(AVG(loan_amount), 2) as avg_loan_amount,
    SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults,
    ROUND(100.0 * SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) / COUNT(*), 2) as default_rate_pct,
    ROUND(AVG(debt_to_income_ratio), 2) as avg_dti,
    ROUND(AVG(interest_rate), 2) as avg_rate
FROM accepted_loans
WHERE annual_income IS NOT NULL
GROUP BY 
    CASE 
        WHEN annual_income < 30000 THEN 'Low (<30K)'
        WHEN annual_income < 60000 THEN 'Medium (30-60K)'
        WHEN annual_income < 100000 THEN 'High (60-100K)'
        ELSE 'Very High (>100K)'
    END
ORDER BY 
    CASE income_bracket
        WHEN 'Low (<30K)' THEN 1
        WHEN 'Medium (30-60K)' THEN 2
        WHEN 'High (60-100K)' THEN 3
        ELSE 4
    END
"""

income_analysis = spark.sql(query)
print("Income vs Default Risk:")
income_analysis.show(truncate=False)

income_analysis.write.mode("overwrite").parquet("data/medallion/gold/income_analysis")
print("✅ Saved to data/medallion/gold/income_analysis\n")

#%% Cell 5: Business Analytics Query 4 - Purpose Analysis
print("=== 4. Loan Purpose Analysis ===\n")

query = """
SELECT 
    purpose,
    COUNT(*) as total_loans,
    ROUND(AVG(loan_amount), 2) as avg_loan_amount,
    ROUND(AVG(interest_rate), 2) as avg_rate,
    SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults,
    ROUND(100.0 * SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) / COUNT(*), 2) as default_rate_pct
FROM accepted_loans
WHERE purpose IS NOT NULL
GROUP BY purpose
ORDER BY total_loans DESC
"""

purpose_analysis = spark.sql(query)
print("Top Loan Purposes by Volume:")
purpose_analysis.show(20, truncate=False)

purpose_analysis.write.mode("overwrite").parquet("data/medallion/gold/purpose_analysis")
print("✅ Saved to data/medallion/gold/purpose_analysis\n")

#%% Cell 6: Business Analytics Query 5 - Geographic Analysis
print("=== 5. Geographic Loan Distribution ===\n")

query = """
SELECT 
    state,
    COUNT(*) as total_loans,
    ROUND(SUM(loan_amount), 2) as total_loan_volume,
    ROUND(AVG(loan_amount), 2) as avg_loan_amount,
    ROUND(AVG(annual_income), 2) as avg_income,
    SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults,
    ROUND(100.0 * SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) / COUNT(*), 2) as default_rate_pct
FROM accepted_loans
WHERE state IS NOT NULL
GROUP BY state
ORDER BY total_loans DESC
LIMIT 20
"""

geo_analysis = spark.sql(query)
print("Top 20 States by Loan Volume:")
geo_analysis.show(20, truncate=False)

geo_analysis.write.mode("overwrite").parquet("data/medallion/gold/geo_analysis")
print("✅ Saved to data/medallion/gold/geo_analysis\n")

#%% Cell 7: Business Analytics Query 6 - Time Series Analysis
print("=== 6. Loan Trends Over Time ===\n")

query = """
SELECT 
    YEAR(TO_DATE(issue_date, 'MMM-yyyy')) as year,
    QUARTER(TO_DATE(issue_date, 'MMM-yyyy')) as quarter,
    COUNT(*) as total_loans,
    ROUND(SUM(loan_amount), 2) as total_volume,
    ROUND(AVG(loan_amount), 2) as avg_loan,
    ROUND(AVG(interest_rate), 2) as avg_rate,
    SUM(CASE WHEN loan_status IN ('Charged Off', 'Default') THEN 1 ELSE 0 END) as defaults
FROM accepted_loans
WHERE issue_date IS NOT NULL
GROUP BY YEAR(TO_DATE(issue_date, 'MMM-yyyy')), QUARTER(TO_DATE(issue_date, 'MMM-yyyy'))
ORDER BY year, quarter
"""

time_analysis = spark.sql(query)
print("Loan Volume Trends by Year and Quarter:")
time_analysis.show(50, truncate=False)

time_analysis.write.mode("overwrite").parquet("data/medallion/gold/time_analysis")
print("✅ Saved to data/medallion/gold/time_analysis\n")

#%% Cell 8: Prepare ML Training Data
print("=== Machine Learning: Default Prediction Model ===\n")
print("Preparing training data...\n")

# Create binary target variable (1 = default, 0 = paid)
ml_data = accepted_df.withColumn(
    "default",
    when(col("loan_status").isin(["Charged Off", "Default"]), 1.0).otherwise(0.0)
).filter(
    # Only include loans with final status
    col("loan_status").isin(["Fully Paid", "Charged Off", "Default"])
).select(
    "default",
    "loan_amount",
    "interest_rate",
    "annual_income",
    "debt_to_income_ratio",
    "fico_range_low",
    "fico_range_high",
    "open_accounts",
    "total_accounts",
    "revolving_balance",
    "revolving_utilization",
    "delinquencies_2yrs",
    "inquiries_last_6mths",
    "public_records",
    "grade",
    "home_ownership",
    "purpose",
    "term"
).na.drop()

ml_count = ml_data.count()
print(f"ML dataset size: {ml_count:,} records")

# Check class distribution
class_dist = ml_data.groupBy("default").count().orderBy("default")
print("\nClass Distribution:")
class_dist.show()

# Calculate class imbalance ratio
defaults = ml_data.filter(col("default") == 1.0).count()
non_defaults = ml_data.filter(col("default") == 0.0).count()
imbalance_ratio = non_defaults / defaults if defaults > 0 else 0
print(f"Class imbalance ratio (non-default/default): {imbalance_ratio:.2f}")
print(f"Default rate: {100*defaults/ml_count:.2f}%\n")

#%% Cell 9: Feature Engineering Pipeline
print("Building feature engineering pipeline...\n")

from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline

# Index categorical variables
grade_indexer = StringIndexer(inputCol="grade", outputCol="grade_idx", handleInvalid="keep")
home_indexer = StringIndexer(inputCol="home_ownership", outputCol="home_idx", handleInvalid="keep")
purpose_indexer = StringIndexer(inputCol="purpose", outputCol="purpose_idx", handleInvalid="keep")
term_indexer = StringIndexer(inputCol="term", outputCol="term_idx", handleInvalid="keep")

# Define feature columns
numeric_features = [
    "loan_amount", "interest_rate", "annual_income", "debt_to_income_ratio",
    "fico_range_low", "fico_range_high", "open_accounts", "total_accounts",
    "revolving_balance", "revolving_utilization", "delinquencies_2yrs",
    "inquiries_last_6mths", "public_records"
]

categorical_features = ["grade_idx", "home_idx", "purpose_idx", "term_idx"]

all_features = numeric_features + categorical_features

# Assemble features
assembler = VectorAssembler(inputCols=all_features, outputCol="features_raw", handleInvalid="skip")

# Scale features
scaler = StandardScaler(inputCol="features_raw", outputCol="features", withMean=True, withStd=True)

print("Feature engineering pipeline components:")
print(f"  - Categorical indexers: {len([grade_indexer, home_indexer, purpose_indexer, term_indexer])}")
print(f"  - Numeric features: {len(numeric_features)}")
print(f"  - Categorical features: {len(categorical_features)}")
print(f"  - Total features: {len(all_features)}")
print(f"  - Feature scaling: StandardScaler (mean=0, std=1)\n")

#%% Cell 10: Train-Test Split
print("Splitting data into train/test sets...\n")

# Random 80/20 split. randomSplit is not stratified; the per-split class
# distributions are printed below to confirm they stay close to each other.
train_data, test_data = ml_data.randomSplit([0.8, 0.2], seed=42)

# Cache before the first action so later passes reuse the materialized splits
train_data.cache()
test_data.cache()

train_count = train_data.count()
test_count = test_data.count()

print(f"Training set: {train_count:,} records ({100*train_count/ml_count:.1f}%)")
print(f"Test set: {test_count:,} records ({100*test_count/ml_count:.1f}%)")

# Check class balance in splits
print("\nTraining set class distribution:")
train_data.groupBy("default").count().show()

print("Test set class distribution:")
test_data.groupBy("default").count().show()

#%% Cell 11: Model 1 - Logistic Regression
print("=== Training Model 1: Logistic Regression ===\n")

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
import time

start_time = time.time()

# Create logistic regression classifier
lr = LogisticRegression(
    featuresCol="features",
    labelCol="default",
    maxIter=20,
    regParam=0.01,
    elasticNetParam=0.0,  # L2 regularization
    family="binomial"
)

# Build pipeline
lr_pipeline = Pipeline(stages=[
    grade_indexer, home_indexer, purpose_indexer, term_indexer,
    assembler, scaler, lr
])

# Train model
print("Training Logistic Regression model...")
lr_model = lr_pipeline.fit(train_data)

training_time = time.time() - start_time
print(f"✅ Training completed in {training_time:.2f} seconds")

# Make predictions
print("\nGenerating predictions on test set...")
lr_predictions = lr_model.transform(test_data)

# Evaluate model
auc_evaluator = BinaryClassificationEvaluator(labelCol="default", metricName="areaUnderROC")
pr_evaluator = BinaryClassificationEvaluator(labelCol="default", metricName="areaUnderPR")
acc_evaluator = MulticlassClassificationEvaluator(labelCol="default", predictionCol="prediction", metricName="accuracy")

lr_auc = auc_evaluator.evaluate(lr_predictions)
lr_pr = pr_evaluator.evaluate(lr_predictions)
lr_accuracy = acc_evaluator.evaluate(lr_predictions)

print(f"\n=== Logistic Regression Results ===")
print(f"AUC-ROC: {lr_auc:.4f}")
print(f"AUC-PR: {lr_pr:.4f}")
print(f"Accuracy: {lr_accuracy:.4f}")

# Detailed metrics
from pyspark.sql.functions import sum as _sum

tp = lr_predictions.filter((col("default") == 1) & (col("prediction") == 1)).count()
fp = lr_predictions.filter((col("default") == 0) & (col("prediction") == 1)).count()
tn = lr_predictions.filter((col("default") == 0) & (col("prediction") == 0)).count()
fn = lr_predictions.filter((col("default") == 1) & (col("prediction") == 0)).count()

precision = tp / (tp + fp) if (tp + fp) > 0 else 0
recall = tp / (tp + fn) if (tp + fn) > 0 else 0
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"\nConfusion Matrix:")
print(f"  True Positives (TP):  {tp:,}")
print(f"  False Positives (FP): {fp:,}")
print(f"  True Negatives (TN):  {tn:,}")
print(f"  False Negatives (FN): {fn:,}")
print(f"\nDetailed Metrics:")
print(f"  Precision: {precision:.4f}")
print(f"  Recall: {recall:.4f}")
print(f"  F1-Score: {f1:.4f}")

# Show sample predictions
print("\nSample Predictions:")
lr_predictions.select("default", "prediction", "probability").show(10, truncate=False)

#%% Cell 12: Model 2 - Random Forest
print("\n=== Training Model 2: Random Forest Classifier ===\n")

from pyspark.ml.classification import RandomForestClassifier

start_time = time.time()

# Create random forest classifier
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="default",
    numTrees=100,
    maxDepth=10,
    minInstancesPerNode=20,
    seed=42
)

# Build pipeline
rf_pipeline = Pipeline(stages=[
    grade_indexer, home_indexer, purpose_indexer, term_indexer,
    assembler, scaler, rf
])

# Train model
print("Training Random Forest model (100 trees, max depth 10)...")
rf_model = rf_pipeline.fit(train_data)

training_time = time.time() - start_time
print(f"✅ Training completed in {training_time:.2f} seconds")

# Make predictions
print("\nGenerating predictions on test set...")
rf_predictions = rf_model.transform(test_data)

# Evaluate model
rf_auc = auc_evaluator.evaluate(rf_predictions)
rf_pr = pr_evaluator.evaluate(rf_predictions)
rf_accuracy = acc_evaluator.evaluate(rf_predictions)

print(f"\n=== Random Forest Results ===")
print(f"AUC-ROC: {rf_auc:.4f}")
print(f"AUC-PR: {rf_pr:.4f}")
print(f"Accuracy: {rf_accuracy:.4f}")

# Detailed metrics
tp_rf = rf_predictions.filter((col("default") == 1) & (col("prediction") == 1)).count()
fp_rf = rf_predictions.filter((col("default") == 0) & (col("prediction") == 1)).count()
tn_rf = rf_predictions.filter((col("default") == 0) & (col("prediction") == 0)).count()
fn_rf = rf_predictions.filter((col("default") == 1) & (col("prediction") == 0)).count()

precision_rf = tp_rf / (tp_rf + fp_rf) if (tp_rf + fp_rf) > 0 else 0
recall_rf = tp_rf / (tp_rf + fn_rf) if (tp_rf + fn_rf) > 0 else 0
f1_rf = 2 * (precision_rf * recall_rf) / (precision_rf + recall_rf) if (precision_rf + recall_rf) > 0 else 0

print(f"\nConfusion Matrix:")
print(f"  True Positives (TP):  {tp_rf:,}")
print(f"  False Positives (FP): {fp_rf:,}")
print(f"  True Negatives (TN):  {tn_rf:,}")
print(f"  False Negatives (FN): {fn_rf:,}")
print(f"\nDetailed Metrics:")
print(f"  Precision: {precision_rf:.4f}")
print(f"  Recall: {recall_rf:.4f}")
print(f"  F1-Score: {f1_rf:.4f}")

#%% Cell 13: Feature Importance Analysis
print("\n=== Feature Importance Analysis ===\n")

# Extract feature importances from Random Forest
rf_classifier = rf_model.stages[-1]
feature_importance = rf_classifier.featureImportances.toArray()

# Create DataFrame with feature names and importances
importance_data = [(all_features[i], float(feature_importance[i])) 
                   for i in range(len(all_features))]

importance_df = spark.createDataFrame(importance_data, ["feature", "importance"]) \
    .orderBy(desc("importance"))

print("Top 15 Most Important Features:")
importance_df.show(15, truncate=False)

# Save feature importance
importance_df.write.mode("overwrite").parquet("data/medallion/gold/feature_importance")
print("✅ Saved to data/medallion/gold/feature_importance\n")

#%% Cell 14: Model Comparison
print("=== Model Performance Comparison ===\n")

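# NOTE: training_time is reassigned in the Random Forest cell, so both rows
# below report the Random Forest training time. Store each model's time in
# its own variable to compare them.
comparison_data = [
    ("Logistic Regression", lr_auc, lr_pr, lr_accuracy, precision, recall, f1, training_time),
    ("Random Forest", rf_auc, rf_pr, rf_accuracy, precision_rf, recall_rf, f1_rf, training_time)
]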

comparison_df = spark.createDataFrame(comparison_data, 
    ["Model", "AUC-ROC", "AUC-PR", "Accuracy", "Precision", "Recall", "F1-Score", "Training_Time_Sec"])

print("Model Performance Comparison:")
comparison_df.show(truncate=False)

# Save comparison
comparison_df.write.mode("overwrite").parquet("data/medallion/gold/model_comparison")
print("✅ Saved to data/medallion/gold/model_comparison\n")

# Determine best model
best_model_name = "Random Forest" if rf_auc > lr_auc else "Logistic Regression"
best_model = rf_model if rf_auc > lr_auc else lr_model
best_predictions = rf_predictions if rf_auc > lr_auc else lr_predictions

print(f"🏆 Best Model: {best_model_name} (AUC-ROC: {max(rf_auc, lr_auc):.4f})\n")

#%% Cell 15: Save Models and Predictions
print("=== Saving Models and Predictions ===\n")

# Save best model
best_model.write().overwrite().save("data/gold/best_model")
print(f"✅ Best model ({best_model_name}) saved to data/gold/best_model")

# Save Logistic Regression model
lr_model.write().overwrite().save("data/gold/lr_model")
print("✅ Logistic Regression model saved to data/gold/lr_model")

# Save Random Forest model
rf_model.write().overwrite().save("data/gold/rf_model")
print("✅ Random Forest model saved to data/gold/rf_model")

# Save predictions with loan details
predictions_output = best_predictions.select(
    "default",
    "prediction",
    "probability",
    "loan_amount",
    "interest_rate",
    "annual_income",
    "debt_to_income_ratio",
    "grade",
    "purpose",
    "home_ownership"
)

predictions_output.write.mode("overwrite").parquet("data/medallion/gold/predictions")
print("✅ Predictions saved to data/medallion/gold/predictions\n")

#%% Cell 16: Gold Layer Summary
print("=== Gold Layer Summary ===\n")

print("📊 Business Analytics Outputs:")
# Paths must match the locations the analytics cells wrote to (data/medallion/gold/...)
outputs = [
    ("Loan Status Summary", "data/medallion/gold/status_summary"),
    ("Grade Analysis", "data/medallion/gold/grade_analysis"),
    ("Income Analysis", "data/medallion/gold/income_analysis"),
    ("Purpose Analysis", "data/medallion/gold/purpose_analysis"),
    ("Geographic Analysis", "data/medallion/gold/geo_analysis"),
    ("Time Series Analysis", "data/medallion/gold/time_analysis")
]

for name, path in outputs:
    try:
        count = spark.read.parquet(path).count()
        print(f"  ✅ {name}: {count:,} records")
    except Exception:
        print(f"  ❌ {name}: Not found")

print("\n🤖 Machine Learning Outputs:")
import os

# Parquet outputs live under data/medallion/gold; the models were saved under data/gold
ml_outputs = [
    ("Model Comparison", "data/medallion/gold/model_comparison"),
    ("Feature Importance", "data/medallion/gold/feature_importance"),
    ("Predictions", "data/medallion/gold/predictions"),
    ("Best Model", "data/gold/best_model"),
    ("Logistic Regression Model", "data/gold/lr_model"),
    ("Random Forest Model", "data/gold/rf_model")
]

for name, path in ml_outputs:
    try:
        # Saved pipeline models are directories, not Parquet tables: only check that they exist
        if path.endswith("_model"):
            exists = os.path.exists(path)
            print(f"  ✅ {name}: Saved" if exists else f"  ❌ {name}: Not found")
        else:
            count = spark.read.parquet(path).count()
            print(f"  ✅ {name}: {count:,} records")
    except Exception:
        print(f"  ❌ {name}: Not found")

print("\n" + "="*60)
print("✅ GOLD LAYER COMPLETE!")
print("="*60)
print("\nData is ready for:")
print("  • Business dashboards (Tableau, Power BI)")
print("  • Real-time prediction API endpoints")
print("  • Risk management applications")
print("  • Regulatory reporting")
print("="*60)

=== Gold Layer: Business Analytics and ML ===

Loading Silver layer data...
✅ Accepted loans: 2,191,678 records
✅ Rejected loans: 20,116,235 records

✅ SQL tables registered for querying

=== 1. Loan Status Distribution Analysis ===

Loan Status Summary:
+---------------------------------------------------+-----------+-----------------+---------------+-----------------+-----------------+-------+
|loan_status                                        |total_loans|total_loan_amount|avg_loan_amount|avg_interest_rate|avg_annual_income|avg_dti|
+---------------------------------------------------+-----------+-----------------+---------------+-----------------+-----------------+-------+
|Fully Paid                                         |1028139    |1.45026608E10    |14105.74       |12.62            |77488.41         |17.82  |
|Current                                            |867772     |1.3787017825E10  |15887.83       |12.77            |80357.57         |19.15  |
|Charged Off

+-----+---------+-----------+--------+----------------+---------------+-----------------+----------+
|grade|sub_grade|total_loans|defaults|default_rate_pct|avg_loan_amount|avg_interest_rate|avg_income|
+-----+---------+-----------+--------+----------------+---------------+-----------------+----------+
|A    |A1       |84193      |1358    |1.61            |15284.76       |5.6              |101394.41 |
|A    |A2       |67304      |1657    |2.46            |14223.68       |6.55             |90365.9   |
|A    |A3       |70650      |1984    |2.81            |14117.75       |7.09             |87777.83  |
|A    |A4       |92160      |3415    |3.71            |14891.61       |7.55             |86758.69  |
|A    |A5       |103720     |5155    |4.97            |14373.13       |8.18             |84257.46  |
|B    |B1       |121568     |7159    |5.89            |14188.17       |9.06             |82759.93  |
|B    |B2       |122397     |8087    |6.61            |14415.35       |9.95             |80

+-----------------+-----------+---------------+--------+----------------+-------+--------+
|income_bracket   |total_loans|avg_loan_amount|defaults|default_rate_pct|avg_dti|avg_rate|
+-----------------+-----------+---------------+--------+----------------+-------+--------+
|Low (<30K)       |128507     |7440.65        |18418   |14.33           |22.13  |14.22   |
|Medium (30-60K)  |774103     |11309.18       |104856  |13.55           |20.22  |13.57   |
|High (60-100K)   |813653     |16030.71       |94584   |11.62           |18.4   |12.97   |
|Very High (>100K)|475415     |21405.7        |42516   |8.94            |15.57  |12.23   |
+-----------------+-----------+---------------+--------+----------------+-------+--------+

✅ Saved to data/medallion/gold/income_analysis
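
The income brackets in the query are half-open intervals, so a boundary value falls into the higher bracket: an income of exactly 30,000 is "Medium", and exactly 100,000 is "Very High" despite the ">100K" label. The function below mirrors the SQL `CASE` expression in plain Python as a sketch; `income_bracket` is a hypothetical helper, not part of the notebook.

```python
def income_bracket(annual_income: float) -> str:
    """Mirror of the SQL CASE expression; branches are evaluated top to bottom."""
    if annual_income < 30000:
        return "Low (<30K)"
    if annual_income < 60000:
        return "Medium (30-60K)"
    if annual_income < 100000:
        return "High (60-100K)"
    return "Very High (>100K)"

# Boundary values show which side of each cut-off is inclusive
for income in (29999.99, 30000, 60000, 100000):
    print(income, "->", income_bracket(income))
```
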

=== 4. Loan Purpose Analysis ===

Top Loan Purposes by Volume:
[... output truncated ...]

+----+-------+-----------+-------------+--------+--------+--------+
|year|quarter|total_loans|total_volume |avg_loan|avg_rate|defaults|
+----+-------+-----------+-------------+--------+--------+--------+
|2007|2      |15         |46475.0      |3098.33 |9.58    |0       |
|2007|3      |101        |619175.0     |6130.45 |11.14   |6       |
|2007|4      |203        |1799050.0    |8862.32 |11.79   |20      |
|2008|1      |526        |4950775.0    |9412.12 |12.08   |59      |
|2008|2      |251        |1704400.0    |6790.44 |11.72   |20      |
|2008|3      |152        |900350.0     |5923.36 |11.99   |14      |
|2008|4      |268        |2278650.0    |8502.43 |11.87   |40      |
|2009|1      |464        |4191875.0    |9034.21 |12.71   |46      |
|2009|2      |440        |3925400.0    |8921.36 |12.18   |52      |
|2009|3      |597        |5741225.0    |9616.79 |12.13   |63      |
|2009|4      |915        |9155525.0    |10006.04|12.36   |128     |
|2010|1      |1173       |1.17608E7    |10026.26

✅ Saved to data/medallion/gold/time_analysis
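
The `year` and `quarter` columns come from parsing `issue_date` with the pattern `MMM-yyyy`. A rough plain-Python equivalent is sketched below. It assumes English month abbreviations and values shaped like `Dec-2015`; the helper name `year_quarter` and the sample values are illustrative and not taken from the dataset.

```python
from datetime import datetime
from typing import Tuple

def year_quarter(issue_date: str) -> Tuple[int, int]:
    """Parse a 'Mon-YYYY' string and return (year, quarter)."""
    parsed = datetime.strptime(issue_date, "%b-%Y")  # %b is locale-dependent; English assumed
    quarter = (parsed.month - 1) // 3 + 1            # Jan-Mar -> 1, ..., Oct-Dec -> 4
    return parsed.year, quarter

print(year_quarter("Dec-2015"))  # (2015, 4)
print(year_quarter("Apr-2007"))  # (2007, 2)
```
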

=== Machine Learning: Default Prediction Model ===

Preparing training data...

ML dataset size: 1,287,680 records

Class Distribution:
+-------+-------+
|default|  count|
+-------+-------+
|    0.0|1027482|
|    1.0| 260198|
+-------+-------+

Class imbalance ratio (non-default/default): 3.95
Default rate: 20.21%
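
The ratio and default rate follow directly from the class counts. The sketch below reproduces them and then goes one step beyond what the notebook does: it derives inverse-frequency class weights of the kind that could be supplied through a weight column if the positive class turns out to be under-predicted. The weighting scheme is an assumption for illustration only.

```python
# Class counts copied from the distribution table
non_defaults = 1_027_482
defaults = 260_198
total = non_defaults + defaults

imbalance_ratio = non_defaults / defaults
default_rate_pct = 100 * defaults / total

# Inverse-frequency weights (illustration, not used in the notebook):
# each class ends up contributing the same total weight, total / 2.
weights = {
    0.0: total / (2 * non_defaults),
    1.0: total / (2 * defaults),
}

print(f"{imbalance_ratio:.2f}")    # 3.95
print(f"{default_rate_pct:.2f}")   # 20.21
print({label: round(w, 3) for label, w in weights.items()})
```

With these weights, a defaulted loan counts about as much as four non-defaulted loans, which is the imbalance ratio itself.
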

Building feature engineering pipeline...

Feature engineering pipeline components:
  - Categorical indexers: 4
  - Numeric features: 13
  - Categorical features: 4
  - Total features: 17
  - Feature scaling: StandardScaler (mean=0, std=1)

Splitting data into train/test sets...



                                                                                

Training set: 1,029,772 records (80.0%)
Test set: 257,908 records (20.0%)

Training set class distribution:

+-------+------+
|default| count|
+-------+------+
|    0.0|821653|
|    1.0|208119|
+-------+------+

Test set class distribution:

+-------+------+
|default| count|
+-------+------+
|    0.0|205829|
|    1.0| 52079|
+-------+------+
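
A quick sanity check on the split, using only the counts printed above: the train and test sets should have the reported sizes and similar default rates. The dictionary and function names are illustrative.

```python
# Counts copied from the two class-distribution tables above.
train = {"non_default": 821_653, "default": 208_119}
test = {"non_default": 205_829, "default": 52_079}


def default_rate(counts: dict) -> float:
    """Share of defaulted loans in a split."""
    return counts["default"] / (counts["default"] + counts["non_default"])


train_n = sum(train.values())
test_n = sum(test.values())
train_share = train_n / (train_n + test_n)

print(f"Training set: {train_n:,} records ({train_share:.1%})")
print(f"Test set: {test_n:,} records ({1 - train_share:.1%})")
print(f"Train default rate: {default_rate(train):.2%}")
print(f"Test default rate: {default_rate(test):.2%}")
```

The two default rates agree to within a few hundredths of a percentage point, so the split did not noticeably shift the class balance.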

=== Training Model 1: Logistic Regression ===

Training Logistic Regression model...

✅ Training completed in 11.03 seconds

Generating predictions on test set...

=== Logistic Regression Results ===
AUC-ROC: 0.7040
AUC-PR: 0.3672
Accuracy: 0.7991

Confusion Matrix:
  True Positives (TP):  3,415
  False Positives (FP): 3,150
  True Negatives (TN):  202,679
  False Negatives (FN): 48,664

Detailed Metrics:
  Precision: 0.5202
  Recall: 0.0656
  F1-Score: 0.1165
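
The detailed metrics can be reproduced from the four confusion-matrix counts alone. The sketch below recomputes them in plain Python and adds a majority-class baseline (always predicting non-default) for comparison; the baseline is not part of the notebook's output and is included here only as a reference point.

```python
# Confusion-matrix counts copied from the output above.
tp, fp, tn, fn = 3_415, 3_150, 202_679, 48_664
total = tp + fp + tn + fn  # size of the test set

precision = tp / (tp + fp)          # of predicted defaults, how many were real
recall = tp / (tp + fn)             # of real defaults, how many were caught
f1 = 2 * precision * recall / (precision + recall)
accuracy = (tp + tn) / total

# Accuracy of a model that always predicts "non-default".
majority_baseline = (tn + fp) / total

print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1-Score:  {f1:.4f}")
print(f"Accuracy:  {accuracy:.4f}")
print(f"Majority-class baseline accuracy: {majority_baseline:.4f}")
```

The accuracy of 0.7991 sits only about a tenth of a percentage point above the majority-class baseline, which is why recall, F1, and AUC-PR say more about this model than accuracy does.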

Sample Predictions:
+-------+----------+-----------------------------------------+
|default|prediction|probability                              |
+-------+----------+-----------------------------------------+
|0.0    |0.0       |[0.9356171043872634,0.06438289561273658] |
|0.0    |0.0       |[0.9291166755014926,0.07088332449850743] |
|0.0    |0.0       |[0.9217085984185907,0.07829140158140935] |
|0.0    |0.0       |[0.9596892542055858,0.04031074579441418] |
|0.0    |0.0       |[0.9672687456409638,0.03273125435903623] |
|0.0    |0.0       |[0.9421571653686195,0.05784283463138051] |
|0.0    |0.0       |[0.9498740635927281,0.05012593640727192] |
|0.0    |0.0       |[0.9437995009716834,0.056200499028316586]|
|0.0    |0.0       |[0.916901948569573,0.08309805143042703]  |
|0.0  

25/11/25 15:01:07 ERROR Executor: Exception in task 2.0 in stage 361.0 (TID 1244)
java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.ml.tree.impl.DTStatsAggregator.<init>(DTStatsAggregator.scala:77)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22(RandomForest.scala:651)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$22$adapted(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$5723/1539952513.apply(Unknown Source)
	at scala.Array$.tabulate(Array.scala:418)
	at org.apache.spark.ml.tree.impl.RandomForest$.$anonfun$findBestSplits$21(RandomForest.scala:647)
	at org.apache.spark.ml.tree.impl.RandomForest$$$Lambda$5698/1506505074.apply(Unknown Source)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:855)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:855)
	at org.apache.spark.rdd.RDD$$Lambda$2450/819020905.apply(Unknown Source)
	at org.apache.spark

ConnectionRefusedError: [Errno 111] Connection refused

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving
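
The `OutOfMemoryError: GC overhead limit exceeded` above is raised inside `DTStatsAggregator` while Spark's tree code searches for splits, and the Py4J errors that follow most likely come from the Python side losing its connection to a JVM that has stopped responding. Two common mitigations are giving the JVM more heap before the session starts and reducing the statistics each tree node has to hold. The sketch below illustrates both. Every value is an illustrative assumption rather than a tuned setting, `featuresCol="features"` is a hypothetical column name, and the model is assumed to be a `RandomForestClassifier` (the trace only shows Spark's shared `RandomForest` implementation).

```python
# Heap settings (assumed values). spark.driver.memory only takes effect if it is
# set before the JVM starts, so restart the kernel before applying it.
SPARK_MEMORY_CONF = {
    "spark.driver.memory": "6g",  # in local mode the tasks run inside the driver JVM
    "spark.sql.adaptive.enabled": "true",
}

# Conservative tree parameters (assumed values). Shallower trees, fewer bins,
# and row subsampling all shrink the per-node split statistics.
RF_PARAMS = {
    "labelCol": "default",      # label column name as shown in the output
    "featuresCol": "features",  # hypothetical name for the assembled vector
    "numTrees": 20,
    "maxDepth": 5,
    "maxBins": 32,
    "subsamplingRate": 0.5,
    "maxMemoryInMB": 256,       # cap on memory used for split statistics per pass
    "seed": 42,
}


def build_session(app_name: str = "LoanPrediction"):
    """Create a SparkSession with the memory settings above (requires PySpark)."""
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in SPARK_MEMORY_CONF.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


def build_random_forest():
    """Create the classifier with the conservative parameters above."""
    from pyspark.ml.classification import RandomForestClassifier

    return RandomForestClassifier(**RF_PARAMS)
```

If the error persists with these settings, training on a sampled subset of the rows first is a cheap way to confirm that memory, rather than the data itself, is the limiting factor.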


## Summary

### Medallion Architecture Complete! ✅

**Data Pipeline Flow:**
1. **Bronze Layer** → Raw CSV ingestion with error handling → `data/medallion/bronze/loan_bronze.parquet`
2. **Silver Layer** → Data cleaning, typing, validation → `data/medallion/silver/loan_silver.parquet`
3. **Gold Layer** → Business metrics and analytics → `data/medallion/gold/*.parquet`

**Key Metrics Calculated:**
- Loan status distribution
- Income analysis by loan status
- Loan amount distributions
- Interest rate analysis

**Note about Parquet Files:**
The multiple part files in the bronze folder (`part-00000`, `part-00001`, etc.) are **normal and expected**. Spark writes one file per partition so that large datasets can be written and read in parallel. The directory `loan_bronze.parquet/` as a whole is the complete dataset, so pass the directory path, not an individual part file, when reading it back.
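
As a sketch of what this means in practice: point `spark.read.parquet` at the directory and Spark loads all part files as one DataFrame. The helper names below are hypothetical, and `coalesce(1)` is shown only as an option for small outputs, since it funnels the whole write through a single task.

```python
# Directory path taken from the pipeline flow above.
BRONZE_PATH = "data/medallion/bronze/loan_bronze.parquet"


def load_bronze(spark, path: str = BRONZE_PATH):
    """Read every part file under the directory as a single DataFrame."""
    return spark.read.parquet(path)


def write_single_part(df, path: str) -> None:
    """Write a DataFrame as one part file.

    The result is still a directory, containing one part file plus Spark's
    marker files. Only suitable for data that fits comfortably in one task.
    """
    df.coalesce(1).write.mode("overwrite").parquet(path)
```

Usage, assuming an active session named `spark`: `bronze_df = load_bronze(spark)`.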