### Bronze Pipeline   

In [14]:

import os
import pandas as pd



if os.path.exists('datamart/bronze') == False:
    os.makedirs('datamart/bronze', exist_ok=True)
    
    bronze_featureCS = pd.read_csv('data/feature_clickstream.csv')
    bronze_featureAttr = pd.read_csv('data/features_attributes.csv')
    bronze_featureFin = pd.read_csv('data/features_financials.csv')
    bronze_LMSloans = pd.read_csv('data/lms_loan_daily.csv')
    print("CSV files read")

    for col in bronze_featureCS.columns:
        bronze_featureCS[col] = bronze_featureCS[col].astype(str)

    bronze_featureCS.info()

    for col in bronze_featureAttr.columns:
        bronze_featureAttr[col] = bronze_featureAttr[col].astype(str)

    bronze_featureAttr.info()

    for col in bronze_featureFin.columns:
        bronze_featureFin[col] = bronze_featureFin[col].astype(str)

    bronze_featureFin.info()

    for col in bronze_LMSloans.columns:
        bronze_LMSloans[col] = bronze_LMSloans[col].astype(str)

    bronze_LMSloans.info()

    print("columns converted to string")

    bronze_featureCS.to_parquet('datamart/bronze/feature_clickstream.parquet', index=False, compression='gzip', engine='pyarrow')
    bronze_featureAttr.to_parquet('datamart/bronze/features_attributes.parquet', index=False, compression='gzip', engine='pyarrow')
    bronze_featureFin.to_parquet('datamart/bronze/features_financials.parquet', index=False, compression='gzip', engine='pyarrow')
    bronze_LMSloans.to_parquet('datamart/bronze/LMS_loans.parquet', index=False, compression='gzip', engine='pyarrow')

    print("parquest files created")
    print("\nData Ingestion Completed")
    
else:
    print("Directory already exists")
     


print("\nBronze Layer Completed")


Directory already exists

Bronze Layer Completed


### Spark Utils

In [18]:
from pyspark.sql import SparkSession

def create_spark_session(app_name):
    spark = SparkSession.builder \
        .appName(app_name) \
        .master("local[*]") \
        .config("spark.driver.memory", "16g") \
        .getOrCreate()
    
    print(f"Spark session '{app_name}' created successfully.")
    
    return spark

def stop_spark_session(spark):
    spark.stop()
    print("Spark session stopped.")

def read_parquet(spark, file_path):
    df = spark.read.parquet(file_path)
    print(f"Data read from {file_path} successfully.")
    return df

def write_parquet(df, file_path):
    """Write Spark DataFrame to single parquet file (like bronze layer)"""
    # Convert Spark DataFrame to Pandas and save as single compressed parquet file
    pandas_df = df.toPandas()
    pandas_df.to_parquet(file_path, index=False, compression='gzip', engine='pyarrow')
    print(f"Data written to {file_path} successfully.")
    

In [None]:
import os
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, FloatType

spark = create_spark_session("Data Cleaning")

if os.path.exists('datamart\silver') == False:
    os.makedirs('datamart\silver', exist_ok=True)
else:
    print("Directory already exists")
    
    silver_featureCS = read_parquet(spark, 'datamart/bronze/feature_clickstream.parquet')
    silver_featureAttr = read_parquet(spark, 'datamart/bronze/features_attributes.parquet')
    silver_featureFin = read_parquet(spark, 'datamart/bronze/features_financials.parquet')
    silver_LMSloans = read_parquet(spark, 'datamart/bronze/LMS_loans.parquet')
    print("parquet files read")
    



def remove_flagged_customers(silver_dfs, flagged_customers):
    """Remove flagged customers from all datasets"""
    print("Remove Flagged Customers\n")
    filtered_dfs = {}
    for name, df in silver_dfs.items():
        before = df.count()
        filtered_df = df.join(flagged_customers, on='Customer_ID', how='left_anti')
        after = filtered_df.count()
        removed = before - after
        print(f"  {name:15} - Removed {removed:5} rows ({before} → {after})")
        filtered_dfs[name] = filtered_df
    print("\nRemoved\n")
    return filtered_dfs
     

Spark session 'Data Cleaning' created successfully.
Directory already exists
Data read from datamart/bronze/feature_clickstream.parquet successfully.
Data read from datamart/bronze/features_attributes.parquet successfully.
Data read from datamart/bronze/features_financials.parquet successfully.
Data read from datamart/bronze/LMS_loans.parquet successfully.
parquet files read
Data read from datamart/bronze/LMS_loans.parquet successfully.
parquet files read


In [20]:
# Parse dates (d/M/yyyy format)
print("Parsing dates with format d/M/yyyy")
silver_featureAttr = silver_featureAttr.withColumn('snapshot_date', F.to_date('snapshot_date', 'd/M/yyyy'))
silver_featureFin = silver_featureFin.withColumn('snapshot_date', F.to_date('snapshot_date', 'd/M/yyyy'))
silver_LMSloans = silver_LMSloans.withColumn('snapshot_date', F.to_date('snapshot_date', 'd/M/yyyy')) \
                             .withColumn('loan_start_date', F.to_date('loan_start_date', 'd/M/yyyy'))
silver_featureCS = silver_featureCS.withColumn('snapshot_date', F.to_date('snapshot_date', 'd/M/yyyy'))
print("Parsed 5 date columns\n")

# --- ATTRIBUTES CLEANING ---
print("Cleaning attributes dataset")
# Replace placeholders with NULL
silver_featureAttr = silver_featureAttr.withColumn('Occupation', 
    F.when(F.trim(F.col('Occupation')).isin('_______', '_'), None).otherwise(F.col('Occupation')))
print("Replaced placeholders ('_______', '_') with NULL in Occupation")

# Clean Age: remove non-digits, cast to integer (handle empty strings)
silver_featureAttr = silver_featureAttr.withColumn('Age', 
    F.when(F.trim(F.regexp_replace('Age', r'[^0-9]', '')) == '', None)
     .otherwise(F.regexp_replace('Age', r'[^0-9]', '').cast(IntegerType())))
print("Cleaned Age with r\"[^0-9]\" → IntegerType")

# Flag quality issues
silver_featureAttr = silver_featureAttr \
    .withColumn('age_flag', F.when((F.col('Age') < 18) | (F.col('Age') > 100), 1).otherwise(0)) \
    .withColumn('ssn_flag', F.when(F.trim(F.col('SSN')).rlike(r'^\d{3}-\d{2}-\d{4}$'), 0).otherwise(1)) \
    .withColumn('data_quality_issue', F.when((F.col('age_flag') == 1) | (F.col('ssn_flag') == 1), 1).otherwise(0))
print("Flagged invalid Age/SSN\n")

# --- FINANCIALS CLEANING ---
print("Cleaning financials dataset")
# Replace categorical placeholders with NULL
for col_name in ['Credit_Mix', 'Payment_of_Min_Amount', 'Payment_Behaviour']:
    silver_featureFin = silver_featureFin.withColumn(col_name,
        F.when(F.trim(F.col(col_name)).isin('_______', '_', 'NM', '!@9#%8'), None).otherwise(F.col(col_name)))
print("Replaced placeholders ('_', 'NM', '!@9#%8') with NULL in 3 categorical columns")

# Clean float columns: keep digits and decimal point only (handle empty strings)
float_cols = ['Annual_Income', 'Monthly_Inhand_Salary', 'Outstanding_Debt', 
              'Total_EMI_per_month', 'Amount_invested_monthly', 'Monthly_Balance',
              'Changed_Credit_Limit', 'Interest_Rate', 'Credit_Utilization_Ratio']
for col_name in float_cols:
    silver_featureFin = silver_featureFin.withColumn(col_name,
        F.when(F.trim(F.regexp_replace(col_name, r'[^0-9.]', '')) == '', None)
         .otherwise(F.regexp_replace(col_name, r'[^0-9.]', '').cast(FloatType())))
print(f"Cleaned {len(float_cols)} columns with r\"[^0-9.]\" → FloatType")

# Clean integer columns: keep digits only (handle empty strings)
int_cols = ['Num_of_Loan', 'Num_Bank_Accounts', 'Num_Credit_Card', 
            'Delay_from_due_date', 'Num_of_Delayed_Payment', 'Num_Credit_Inquiries']
for col_name in int_cols:
    silver_featureFin = silver_featureFin.withColumn(col_name,
        F.when(F.trim(F.regexp_replace(col_name, r'[^0-9]', '')) == '', None)
         .otherwise(F.regexp_replace(col_name, r'[^0-9]', '').cast(IntegerType())))
print(f"Cleaned {len(int_cols)} columns with r\"[^0-9]\" → IntegerType")

# Flag quality issues
silver_featureFin = silver_featureFin \
    .withColumn('negative_financials_flag', F.when(
        (F.col('Annual_Income') < 0) | (F.col('Monthly_Inhand_Salary') < 0) | (F.col('Outstanding_Debt') < 0), 
        1).otherwise(0)) \
    .withColumn('data_quality_issue', F.col('negative_financials_flag'))
print("Flagged negative financial values\n")

# --- LOAN_DAILY CLEANING ---
print("Cleaning loan_daily dataset...")
# Clean integer columns: keep digits only (handle empty strings)
loan_int_cols = ['tenure', 'installment_num', 'loan_amt', 'due_amt', 'paid_amt', 'overdue_amt', 'balance']
for col_name in loan_int_cols:
    silver_LMSloans = silver_LMSloans.withColumn(col_name,
        F.when(F.trim(F.regexp_replace(col_name, r'[^0-9]', '')) == '', None)
         .otherwise(F.regexp_replace(col_name, r'[^0-9]', '').cast(IntegerType())))
print(f"Cleaned {len(loan_int_cols)} columns with r\"[^0-9]\" → IntegerType")

# Flag quality issues
silver_LMSloans = silver_LMSloans \
    .withColumn('negative_loan_vals_flag', F.when(
        (F.col('loan_amt') < 0) | (F.col('due_amt') < 0) | (F.col('paid_amt') < 0) | (F.col('overdue_amt') < 0),
        1).otherwise(0)) \
    .withColumn('data_quality_issue', F.col('negative_loan_vals_flag'))
print("Flagged negative loan values\n")

# --- CLICKSTREAM CLEANING ---
print("Cleaning clickstream dataset")
# Clean signed integer columns (keep digits and minus sign, handle empty strings)
for i in range(1, 21):
    silver_featureCS = silver_featureCS.withColumn(f'fe_{i}',
        F.when(F.trim(F.regexp_replace(f'fe_{i}', r'[^0-9-]', '')) == '', None)
         .otherwise(F.regexp_replace(f'fe_{i}', r'[^0-9-]', '').cast(IntegerType())))
print("Cleaned 20 features (fe_1 to fe_20) with r\"[^0-9-]\" → IntegerType\n")

# --- FLAG CUSTOMERS ---
print("Identifying flagged customers")
flagged_attr = silver_featureAttr.filter(F.col('data_quality_issue') == 1).select('Customer_ID')
flagged_fin = silver_featureFin.filter(F.col('data_quality_issue') == 1).select('Customer_ID')
flagged_loan = silver_LMSloans.filter(F.col('data_quality_issue') == 1).select('Customer_ID')
all_flagged = flagged_attr.union(flagged_fin).union(flagged_loan).distinct()

# Calculate flagging statistics
total_customers = silver_featureAttr.select('Customer_ID').union(
    silver_featureFin.select('Customer_ID')).union(
    silver_LMSloans.select('Customer_ID')).union(
    silver_featureCS.select('Customer_ID')).distinct().count()
flagged_count = all_flagged.count()
pct = (flagged_count / total_customers * 100.0) if total_customers else 0.0
print(f"Flagged {flagged_count}/{total_customers} customers ({pct:.2f}%) for removal\n")

# Prepare clean datasets (drop quality flag columns)
silver_dfs = {
    'attributes': silver_featureAttr.drop('age_flag', 'ssn_flag', 'data_quality_issue'),
    'financials': silver_featureFin.drop('negative_financials_flag', 'data_quality_issue'),
    'loan_daily': silver_LMSloans.drop('negative_loan_vals_flag', 'data_quality_issue'),
    'clickstream': silver_featureCS
}

# Remove flagged customers
filtered_dfs = remove_flagged_customers(silver_dfs, all_flagged)

for name, df in filtered_dfs.items():
    print(f"\n{name.upper()} DataFrame Info:")
    print(f"Rows: {df.count()}")
    df.printSchema()

print("Silver Cleaning Complete\n")


# Save data
output_path = 'datamart/silver'

for name, df in filtered_dfs.items():
    output_file = os.path.join(output_path, f'{name}.parquet')
    write_parquet(df, output_file)
    print(f"Saved {name}")

print("Save Complete\n")


Parsing dates with format d/M/yyyy
Parsed 5 date columns

Cleaning attributes dataset
Replaced placeholders ('_______', '_') with NULL in Occupation
Cleaned Age with r"[^0-9]" → IntegerType
Flagged invalid Age/SSN

Cleaning financials dataset
Replaced placeholders ('_', 'NM', '!@9#%8') with NULL in 3 categorical columns
Cleaned 9 columns with r"[^0-9.]" → FloatType
Cleaned 6 columns with r"[^0-9]" → IntegerType
Flagged negative financial values

Cleaning loan_daily dataset...
Cleaned 7 columns with r"[^0-9]" → IntegerType
Flagged negative loan values

Cleaning clickstream dataset
Cleaned 20 features (fe_1 to fe_20) with r"[^0-9-]" → IntegerType

Identifying flagged customers
Cleaned 20 features (fe_1 to fe_20) with r"[^0-9-]" → IntegerType

Identifying flagged customers
Flagged 1633/12500 customers (13.06%) for removal

Remove Flagged Customers

Flagged 1633/12500 customers (13.06%) for removal

Remove Flagged Customers

  attributes      - Removed  1633 rows (12500 → 10867)
  attribut

### Gold Pipeline

In [21]:
import os
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Configuration
PREDICTION_MOB = 3
OBSERVATION_MOB = 6
OVERDUE_THRESHOLD = 0

if os.path.exists('datamart/gold') == False:
    os.makedirs('datamart/gold', exist_ok=True)
    
    # Load silver data
    silver_path = 'datamart/silver'
    attributes_df = read_parquet(spark, os.path.join(silver_path, 'attributes.parquet'))
    financials_df = read_parquet(spark, os.path.join(silver_path, 'financials.parquet'))
    loan_daily_df = read_parquet(spark, os.path.join(silver_path, 'loan_daily.parquet'))
    clickstream_df = read_parquet(spark, os.path.join(silver_path, 'clickstream.parquet'))
    print("Silver files loaded")
    
    # Create label store
    loan_info = loan_daily_df.select("loan_id", "Customer_ID", "loan_start_date").distinct()
    labels_df = loan_info \
        .withColumn("prediction_date", F.add_months(F.col("loan_start_date"), PREDICTION_MOB)) \
        .withColumn("observation_date", F.add_months(F.col("loan_start_date"), OBSERVATION_MOB))
    
    default_events = loan_daily_df.filter(
        (F.col("installment_num") >= PREDICTION_MOB) & 
        (F.col("installment_num") <= OBSERVATION_MOB) &
        (F.col("overdue_amt") > OVERDUE_THRESHOLD)
    ).select("loan_id").distinct().withColumn("defaulted_flag", F.lit(1))
    
    label_store = labels_df.join(default_events, "loan_id", "left") \
        .withColumn("label", F.when(F.col("defaulted_flag").isNotNull(), 1).otherwise(0)) \
        .select("Customer_ID", "loan_id", "prediction_date", "observation_date", "label")
    print("Label store created")
    
    # Time-aware filtering
    loan_application = loan_daily_df.filter(F.col("installment_num") == 0) \
        .select("loan_id", "Customer_ID", "loan_amt", "tenure", "loan_start_date")
    
    clickstream_history = clickstream_df.join(
        loan_application.select("Customer_ID", "loan_start_date"), "Customer_ID"
    ).filter(F.col("snapshot_date") < F.col("loan_start_date"))
    
    loan_history = loan_daily_df.join(
        label_store.select("loan_id", "prediction_date"), "loan_id"
    ).filter((F.col("installment_num") >= 0) & (F.col("installment_num") < PREDICTION_MOB))
    print("Time-aware filtering applied")
    
    # Impute NULLs with median
    numeric_cols = ['Annual_Income', 'Monthly_Inhand_Salary', 'Num_Bank_Accounts', 'Num_Credit_Card',
                    'Interest_Rate', 'Num_of_Loan', 'Delay_from_due_date', 'Num_of_Delayed_Payment',
                    'Changed_Credit_Limit', 'Num_Credit_Inquiries', 'Outstanding_Debt',
                    'Credit_Utilization_Ratio', 'Total_EMI_per_month', 'Amount_invested_monthly', 'Monthly_Balance']
    
    for col_name in numeric_cols:
        median_val = financials_df.approxQuantile(col_name, [0.5], 0.01)[0]
        financials_df = financials_df.withColumn(col_name, F.coalesce(F.col(col_name), F.lit(median_val)))
    
    age_median = attributes_df.approxQuantile('Age', [0.5], 0.01)[0]
    attributes_df = attributes_df.withColumn('Age', F.coalesce(F.col('Age'), F.lit(age_median)))
    print("NULL imputation completed")
    
    # Aggregate loan history
    loan_agg = loan_history.groupBy("Customer_ID").agg(
        F.sum("paid_amt").alias("hist_total_paid"),
        F.sum("due_amt").alias("hist_total_due"),
        F.sum("overdue_amt").alias("hist_total_overdue_amount")
    ).withColumn("hist_Loan_Payment_Ratio", 
        F.when(F.col("hist_total_due") > 0, F.col("hist_total_paid") / F.col("hist_total_due")).otherwise(1.0))
    
    # Aggregate clickstream (fe_10 only - strongest predictor)
    clickstream_agg = clickstream_history.groupBy("Customer_ID").agg(
        F.mean("fe_10").alias("fe_10_mean"),
        F.stddev("fe_10").alias("fe_10_std")
    )
    print("Aggregations completed")
    
    # Get latest snapshots
    attributes_as_of = attributes_df.join(
        label_store.select("Customer_ID", "prediction_date"), "Customer_ID"
    ).filter(F.col("snapshot_date") <= F.col("prediction_date")) \
     .groupBy("Customer_ID").agg(F.max('snapshot_date').alias('latest_snapshot'))
    
    attributes_latest = attributes_df.join(
        attributes_as_of, 
        on=[attributes_df.Customer_ID == attributes_as_of.Customer_ID, 
            attributes_df.snapshot_date == attributes_as_of.latest_snapshot]
    ).select(attributes_df["*"])
    
    financials_as_of = financials_df.join(
        label_store.select("Customer_ID", "prediction_date"), "Customer_ID"
    ).filter(F.col("snapshot_date") <= F.col("prediction_date")) \
     .groupBy("Customer_ID").agg(F.max('snapshot_date').alias('latest_snapshot'))
    
    financials_latest = financials_df.join(
        financials_as_of, 
        on=[financials_df.Customer_ID == financials_as_of.Customer_ID, 
            financials_df.snapshot_date == financials_as_of.latest_snapshot]
    ).select(financials_df["*"])
    
    # Engineer features
    years_col = F.regexp_extract(F.col("Credit_History_Age"), r"(\d+)\s+Years", 1).cast(IntegerType())
    months_col = F.regexp_extract(F.col("Credit_History_Age"), r"(\d+)\s+Months", 1).cast(IntegerType())
    
    financials_features = financials_latest.withColumn(
        "Credit_History_Months", F.coalesce(years_col, F.lit(0)) * 12 + F.coalesce(months_col, F.lit(0))
    ).withColumn("DTI", F.col("Total_EMI_per_month") / F.col("Monthly_Inhand_Salary")
    ).withColumn("Savings_Ratio", F.col("Amount_invested_monthly") / F.col("Monthly_Inhand_Salary")
    ).withColumn("Monthly_Surplus", F.col("Monthly_Inhand_Salary") - F.col("Total_EMI_per_month") - F.col("Amount_invested_monthly")
    ).withColumn("Debt_to_Annual_Income", F.col("Outstanding_Debt") / F.col("Annual_Income"))
    print("Feature engineering completed")
    
    # Join all features
    gold_features = label_store \
        .join(loan_application.select("loan_id", "loan_amt", "tenure"), "loan_id", "inner") \
        .join(attributes_latest.select("Customer_ID", "Age", "Occupation"), "Customer_ID", "inner") \
        .join(financials_features.drop("snapshot_date"), "Customer_ID", "left") \
        .join(clickstream_agg, "Customer_ID", "left") \
        .join(loan_agg, "Customer_ID", "left")
    
    # Filter to 15 safe features
    safe_features = ['Credit_History_Months', 'Credit_Mix', 'Age', 'Monthly_Inhand_Salary', 'Occupation',
                     'loan_amt', 'tenure', 'Interest_Rate', 'fe_10_mean', 'fe_10_std',
                     'Savings_Ratio', 'DTI', 'Num_Bank_Accounts', 'Num_Credit_Card', 'Amount_invested_monthly']
    
    id_cols = ['Customer_ID', 'loan_id', 'prediction_date', 'observation_date', 'label']
    selected_cols = id_cols + [c for c in safe_features if c in gold_features.columns]
    gold_features_filtered = gold_features.select(*selected_cols)
    
    # Save gold features
    write_parquet(gold_features_filtered, 'datamart/gold/gold_features.parquet')
    print("Gold features saved")
    
    # Save label store
    write_parquet(label_store, 'datamart/gold/label_store.parquet')
    print("Label store saved")
    
else:
    print("Directory already exists")

print("\nGold Layer Completed")

Data read from datamart/silver\attributes.parquet successfully.
Data read from datamart/silver\financials.parquet successfully.
Data read from datamart/silver\loan_daily.parquet successfully.
Data read from datamart/silver\clickstream.parquet successfully.
Silver files loaded
Label store created
Time-aware filtering applied
NULL imputation completed
Aggregations completed
Feature engineering completed
Data written to datamart/gold/gold_features.parquet successfully.
Gold features saved
Data written to datamart/gold/label_store.parquet successfully.
Label store saved

Gold Layer Completed


### ML Training Pipeline

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, precision_recall_curve, roc_curve
from sklearn.preprocessing import LabelEncoder
import joblib
from datetime import datetime
import json

# Configuration
MODEL_STORE_PATH = 'model_store'
RANDOM_STATE = 42

if os.path.exists(MODEL_STORE_PATH) == False:
    os.makedirs(MODEL_STORE_PATH, exist_ok=True)
    
    # Load gold features
    gold_df = pd.read_parquet('datamart/gold/gold_features.parquet')
    print(f"Loaded {len(gold_df)} records from gold features")
    
    # Prepare features and target
    feature_cols = ['Credit_History_Months', 'Age', 'Monthly_Inhand_Salary', 'loan_amt', 'tenure',
                    'Interest_Rate', 'fe_10_mean', 'fe_10_std', 'Savings_Ratio', 'DTI',
                    'Num_Bank_Accounts', 'Num_Credit_Card', 'Amount_invested_monthly']
    
    categorical_cols = ['Credit_Mix', 'Occupation']
    target_col = 'label'
    
    # Handle categorical features
    label_encoders = {}
    for col in categorical_cols:
        le = LabelEncoder()
        gold_df[col] = le.fit_transform(gold_df[col].fillna('Unknown'))
        label_encoders[col] = le
        feature_cols.append(col)
    
    # Handle missing values
    gold_df[feature_cols] = gold_df[feature_cols].fillna(gold_df[feature_cols].median())
    
    X = gold_df[feature_cols]
    y = gold_df[target_col]
    
    # Train-test split (80-20)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=RANDOM_STATE, stratify=y)
    print(f"Train: {len(X_train)} samples, Test: {len(X_test)} samples")
    print(f"Train default rate: {y_train.mean():.2%}, Test default rate: {y_test.mean():.2%}")
    
    # Define models to train
    models = {
        'logistic_regression': LogisticRegression(max_iter=1000, random_state=RANDOM_STATE, class_weight='balanced'),
        'random_forest': RandomForestClassifier(n_estimators=100, max_depth=10, random_state=RANDOM_STATE, class_weight='balanced'),
        'gradient_boosting': GradientBoostingClassifier(n_estimators=100, max_depth=5, random_state=RANDOM_STATE)
    }
    
    # Train and evaluate models
    model_results = {}
    for name, model in models.items():
        print(f"\nTraining {name}...")
        model.fit(X_train, y_train)
        
        # Predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)
        y_proba_train = model.predict_proba(X_train)[:, 1]
        y_proba_test = model.predict_proba(X_test)[:, 1]
        
        # Metrics
        train_auc = roc_auc_score(y_train, y_proba_train)
        test_auc = roc_auc_score(y_test, y_proba_test)
        
        model_results[name] = {
            'model': model,
            'train_auc': train_auc,
            'test_auc': test_auc,
            'y_pred_test': y_pred_test,
            'y_proba_test': y_proba_test
        }
        
        print(f"  Train AUC: {train_auc:.4f}, Test AUC: {test_auc:.4f}")
    
    # Select best model by test AUC
    best_model_name = max(model_results.keys(), key=lambda k: model_results[k]['test_auc'])
    best_model = model_results[best_model_name]['model']
    best_test_auc = model_results[best_model_name]['test_auc']
    
    print(f"\nBest model: {best_model_name} (Test AUC: {best_test_auc:.4f})")
    
    # Save best model
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    model_metadata = {
        'model_name': best_model_name,
        'timestamp': timestamp,
        'train_samples': len(X_train),
        'test_samples': len(X_test),
        'train_auc': float(model_results[best_model_name]['train_auc']),
        'test_auc': float(best_test_auc),
        'feature_columns': feature_cols,
        'label_encoders': {k: v.classes_.tolist() for k, v in label_encoders.items()}
    }
    
    joblib.dump(best_model, f'{MODEL_STORE_PATH}/best_model_{timestamp}.pkl')
    joblib.dump(label_encoders, f'{MODEL_STORE_PATH}/label_encoders_{timestamp}.pkl')
    with open(f'{MODEL_STORE_PATH}/model_metadata_{timestamp}.json', 'w') as f:
        json.dump(model_metadata, f, indent=2)
    
    # Save test data for monitoring
    test_data = X_test.copy()
    test_data['true_label'] = y_test.values
    test_data['predicted_label'] = model_results[best_model_name]['y_pred_test']
    test_data['predicted_proba'] = model_results[best_model_name]['y_proba_test']
    test_data.to_parquet(f'{MODEL_STORE_PATH}/test_predictions_{timestamp}.parquet', index=False)
    
    print(f"\nModel artifacts saved to {MODEL_STORE_PATH}")
    
else:
    print("Model store already exists")

print("\nML Training Pipeline Completed")

### ML Inference Pipeline

In [None]:
import os
import pandas as pd
import joblib
import json
from datetime import datetime

# Configuration
INFERENCE_OUTPUT_PATH = 'datamart/gold'

if not os.path.exists(f'{INFERENCE_OUTPUT_PATH}/model_predictions.parquet'):
    
    # Find latest model
    model_files = [f for f in os.listdir(MODEL_STORE_PATH) if f.startswith('best_model_')]
    if not model_files:
        raise FileNotFoundError("No trained model found. Please run training pipeline first.")
    
    latest_model_file = sorted(model_files)[-1]
    timestamp = latest_model_file.replace('best_model_', '').replace('.pkl', '')
    
    # Load model artifacts
    model = joblib.load(f'{MODEL_STORE_PATH}/{latest_model_file}')
    label_encoders = joblib.load(f'{MODEL_STORE_PATH}/label_encoders_{timestamp}.pkl')
    with open(f'{MODEL_STORE_PATH}/model_metadata_{timestamp}.json', 'r') as f:
        metadata = json.load(f)
    
    print(f"Loaded model: {metadata['model_name']} (Test AUC: {metadata['test_auc']:.4f})")
    
    # Load gold features for inference
    gold_df = pd.read_parquet('datamart/gold/gold_features.parquet')
    inference_df = gold_df.copy()
    
    # Prepare features (same as training)
    feature_cols = metadata['feature_columns']
    categorical_cols = ['Credit_Mix', 'Occupation']
    
    for col in categorical_cols:
        le = label_encoders[col]
        inference_df[col] = inference_df[col].fillna('Unknown')
        inference_df[col] = inference_df[col].apply(lambda x: x if x in le.classes_ else 'Unknown')
        inference_df[col] = le.transform(inference_df[col])
    
    inference_df[feature_cols] = inference_df[feature_cols].fillna(inference_df[feature_cols].median())
    
    # Make predictions
    X_inference = inference_df[feature_cols]
    predictions = model.predict(X_inference)
    probabilities = model.predict_proba(X_inference)[:, 1]
    
    # Create predictions table
    predictions_df = pd.DataFrame({
        'Customer_ID': inference_df['Customer_ID'],
        'loan_id': inference_df['loan_id'],
        'prediction_date': inference_df['prediction_date'],
        'observation_date': inference_df['observation_date'],
        'true_label': inference_df['label'],
        'predicted_label': predictions,
        'predicted_proba': probabilities,
        'model_name': metadata['model_name'],
        'model_version': timestamp,
        'inference_timestamp': datetime.now()
    })
    
    # Save predictions
    predictions_df.to_parquet(f'{INFERENCE_OUTPUT_PATH}/model_predictions.parquet', 
                              index=False, compression='gzip', engine='pyarrow')
    print(f"Predictions saved: {len(predictions_df)} records")
    print(f"Predicted default rate: {predictions.mean():.2%}")
    
else:
    print("Predictions already exist")
    predictions_df = pd.read_parquet(f'{INFERENCE_OUTPUT_PATH}/model_predictions.parquet')
    print(f"Loaded existing predictions: {len(predictions_df)} records")

print("\nML Inference Pipeline Completed")

### Model Monitoring Pipeline

In [None]:
import os
import pandas as pd
import numpy as np
from sklearn.metrics import (accuracy_score, precision_score, recall_score, f1_score, 
                            roc_auc_score, confusion_matrix, classification_report)
from datetime import datetime

# Configuration
MONITORING_OUTPUT_PATH = 'datamart/gold'

if not os.path.exists(f'{MONITORING_OUTPUT_PATH}/model_monitoring.parquet'):
    
    # Load predictions
    predictions_df = pd.read_parquet(f'{MONITORING_OUTPUT_PATH}/model_predictions.parquet')
    print(f"Monitoring {len(predictions_df)} predictions")
    
    # Calculate overall metrics
    y_true = predictions_df['true_label']
    y_pred = predictions_df['predicted_label']
    y_proba = predictions_df['predicted_proba']
    
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    auc = roc_auc_score(y_true, y_proba)
    
    print(f"\nOverall Performance:")
    print(f"  Accuracy:  {accuracy:.4f}")
    print(f"  Precision: {precision:.4f}")
    print(f"  Recall:    {recall:.4f}")
    print(f"  F1-Score:  {f1:.4f}")
    print(f"  AUC-ROC:   {auc:.4f}")
    
    # Confusion matrix
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred).ravel()
    print(f"\nConfusion Matrix:")
    print(f"  True Negatives:  {tn}")
    print(f"  False Positives: {fp}")
    print(f"  False Negatives: {fn}")
    print(f"  True Positives:  {tp}")
    
    # Time-based monitoring (by prediction_date)
    predictions_df['prediction_month'] = pd.to_datetime(predictions_df['prediction_date']).dt.to_period('M')
    
    monthly_metrics = []
    for month in predictions_df['prediction_month'].unique():
        month_data = predictions_df[predictions_df['prediction_month'] == month]
        
        if len(month_data) > 0:
            y_true_month = month_data['true_label']
            y_pred_month = month_data['predicted_label']
            y_proba_month = month_data['predicted_proba']
            
            monthly_metrics.append({
                'period': str(month),
                'num_predictions': len(month_data),
                'actual_default_rate': y_true_month.mean(),
                'predicted_default_rate': y_pred_month.mean(),
                'accuracy': accuracy_score(y_true_month, y_pred_month),
                'precision': precision_score(y_true_month, y_pred_month, zero_division=0),
                'recall': recall_score(y_true_month, y_pred_month, zero_division=0),
                'f1_score': f1_score(y_true_month, y_pred_month, zero_division=0),
                'auc_roc': roc_auc_score(y_true_month, y_proba_month) if len(np.unique(y_true_month)) > 1 else np.nan
            })
    
    monthly_metrics_df = pd.DataFrame(monthly_metrics)
    
    # Data drift detection (distribution shift)
    predictions_df['proba_bucket'] = pd.cut(predictions_df['predicted_proba'], 
                                            bins=[0, 0.2, 0.4, 0.6, 0.8, 1.0],
                                            labels=['0.0-0.2', '0.2-0.4', '0.4-0.6', '0.6-0.8', '0.8-1.0'])
    
    drift_metrics = []
    for month in predictions_df['prediction_month'].unique():
        month_data = predictions_df[predictions_df['prediction_month'] == month]
        distribution = month_data['proba_bucket'].value_counts(normalize=True).to_dict()
        
        drift_metrics.append({
            'period': str(month),
            'proba_distribution': distribution,
            'mean_proba': month_data['predicted_proba'].mean(),
            'std_proba': month_data['predicted_proba'].std()
        })
    
    drift_metrics_df = pd.DataFrame(drift_metrics)
    
    # Create monitoring summary
    monitoring_summary = {
        'monitoring_timestamp': datetime.now(),
        'model_version': predictions_df['model_version'].iloc[0],
        'total_predictions': len(predictions_df),
        'overall_metrics': {
            'accuracy': float(accuracy),
            'precision': float(precision),
            'recall': float(recall),
            'f1_score': float(f1),
            'auc_roc': float(auc)
        },
        'confusion_matrix': {
            'true_negatives': int(tn),
            'false_positives': int(fp),
            'false_negatives': int(fn),
            'true_positives': int(tp)
        },
        'monthly_performance': monthly_metrics_df.to_dict('records'),
        'drift_analysis': drift_metrics_df.to_dict('records')
    }
    
    # Save monitoring results
    monitoring_df = pd.DataFrame([monitoring_summary])
    monitoring_df.to_parquet(f'{MONITORING_OUTPUT_PATH}/model_monitoring.parquet', 
                            index=False, compression='gzip', engine='pyarrow')
    
    # Save monthly metrics separately for easier querying
    monthly_metrics_df.to_parquet(f'{MONITORING_OUTPUT_PATH}/monthly_performance.parquet',
                                  index=False, compression='gzip', engine='pyarrow')
    
    print(f"\nMonitoring results saved to {MONITORING_OUTPUT_PATH}")
    print(f"Tracked {len(monthly_metrics_df)} time periods")
    
else:
    print("Monitoring results already exist")

print("\nModel Monitoring Pipeline Completed")

### Model Visualization & Analysis

In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix
import numpy as np

# Set plot style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (15, 10)

# Load data
predictions_df = pd.read_parquet('datamart/gold/model_predictions.parquet')
monthly_df = pd.read_parquet('datamart/gold/monthly_performance.parquet')

# Create comprehensive visualization
fig, axes = plt.subplots(2, 3, figsize=(18, 12))
fig.suptitle('Model Performance & Stability Dashboard', fontsize=16, fontweight='bold')

# 1. ROC Curve
y_true = predictions_df['true_label']
y_proba = predictions_df['predicted_proba']
fpr, tpr, _ = roc_curve(y_true, y_proba)
from sklearn.metrics import roc_auc_score
auc = roc_auc_score(y_true, y_proba)

axes[0, 0].plot(fpr, tpr, label=f'AUC = {auc:.4f}', linewidth=2)
axes[0, 0].plot([0, 1], [0, 1], 'k--', label='Random')
axes[0, 0].set_xlabel('False Positive Rate')
axes[0, 0].set_ylabel('True Positive Rate')
axes[0, 0].set_title('ROC Curve')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)

# 2. Precision-Recall Curve
precision, recall, _ = precision_recall_curve(y_true, y_proba)
axes[0, 1].plot(recall, precision, linewidth=2)
axes[0, 1].set_xlabel('Recall')
axes[0, 1].set_ylabel('Precision')
axes[0, 1].set_title('Precision-Recall Curve')
axes[0, 1].grid(True, alpha=0.3)

# 3. Confusion Matrix
y_pred = predictions_df['predicted_label']
cm = confusion_matrix(y_true, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=axes[0, 2], cbar=False)
axes[0, 2].set_xlabel('Predicted')
axes[0, 2].set_ylabel('Actual')
axes[0, 2].set_title('Confusion Matrix')

# 4. Performance Over Time (AUC)
monthly_df_sorted = monthly_df.sort_values('period')
axes[1, 0].plot(range(len(monthly_df_sorted)), monthly_df_sorted['auc_roc'], 
                marker='o', linewidth=2, markersize=8)
axes[1, 0].axhline(y=monthly_df_sorted['auc_roc'].mean(), color='r', 
                   linestyle='--', label=f"Mean: {monthly_df_sorted['auc_roc'].mean():.4f}")
axes[1, 0].set_xlabel('Time Period')
axes[1, 0].set_ylabel('AUC-ROC')
axes[1, 0].set_title('Model Stability - AUC Over Time')
axes[1, 0].set_xticks(range(len(monthly_df_sorted)))
axes[1, 0].set_xticklabels(monthly_df_sorted['period'], rotation=45, ha='right')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)

# 5. Actual vs Predicted Default Rates
x_pos = np.arange(len(monthly_df_sorted))
width = 0.35
axes[1, 1].bar(x_pos - width/2, monthly_df_sorted['actual_default_rate'], 
               width, label='Actual', alpha=0.8)
axes[1, 1].bar(x_pos + width/2, monthly_df_sorted['predicted_default_rate'], 
               width, label='Predicted', alpha=0.8)
axes[1, 1].set_xlabel('Time Period')
axes[1, 1].set_ylabel('Default Rate')
axes[1, 1].set_title('Actual vs Predicted Default Rates')
axes[1, 1].set_xticks(x_pos)
axes[1, 1].set_xticklabels(monthly_df_sorted['period'], rotation=45, ha='right')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3, axis='y')

# 6. Prediction Distribution
axes[1, 2].hist(predictions_df['predicted_proba'], bins=50, edgecolor='black', alpha=0.7)
axes[1, 2].axvline(predictions_df['predicted_proba'].mean(), color='r', 
                   linestyle='--', label=f"Mean: {predictions_df['predicted_proba'].mean():.3f}")
axes[1, 2].set_xlabel('Predicted Probability')
axes[1, 2].set_ylabel('Frequency')
axes[1, 2].set_title('Prediction Distribution')
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('model_performance_dashboard.png', dpi=300, bbox_inches='tight')
print("Dashboard saved as 'model_performance_dashboard.png'")
plt.show()

# Print summary statistics
print("\n" + "="*60)
print("MODEL PERFORMANCE SUMMARY")
print("="*60)
print(f"Total Predictions: {len(predictions_df):,}")
print(f"Overall AUC-ROC: {auc:.4f}")
print(f"Mean Monthly AUC: {monthly_df_sorted['auc_roc'].mean():.4f} ± {monthly_df_sorted['auc_roc'].std():.4f}")
print(f"AUC Stability: {'STABLE' if monthly_df_sorted['auc_roc'].std() < 0.05 else 'UNSTABLE'}")
print(f"Actual Default Rate: {y_true.mean():.2%}")
print(f"Predicted Default Rate: {y_pred.mean():.2%}")
print("="*60)

### Model Governance & SOP

## Model Refresh Strategy

### When to Refresh the Model:

**1. Performance Degradation Triggers:**
- AUC drops below **0.70** for 2 consecutive months
- Precision drops below **0.60** (too many false positives)
- Recall drops below **0.50** (missing too many defaults)
- F1-Score drops below **0.55**

**2. Data Drift Triggers:**
- Predicted default rate differs from actual by >**5%** for 2+ months
- Distribution shift: >**20%** change in prediction probability distribution
- Feature distribution shifts significantly (monitored via monthly stats)

**3. Business Triggers:**
- Quarterly mandatory review (every 3 months)
- Major policy changes (new loan products, risk appetite changes)
- Regulatory requirements

**4. Time-Based Trigger:**
- Maximum model age: **6 months** without refresh

---

## Model Refresh Process (SOP):

### Phase 1: Alert & Assessment (Week 1)
1. Automated monitoring detects trigger condition
2. Data science team reviews monitoring dashboard
3. Investigate root cause (data drift, concept drift, data quality)
4. Create refresh decision report

### Phase 2: Data Preparation (Week 2)
1. Collect latest data (last 6 months minimum)
2. Run bronze → silver → gold pipeline
3. Validate data quality and feature distributions
4. Create new training/validation/test splits (70/15/15)

### Phase 3: Model Training & Validation (Week 3)
1. Retrain all candidate models (Logistic, RF, GBM, XGBoost)
2. Perform hyperparameter tuning
3. Validate on holdout test set
4. Compare new model vs. production model:
   - AUC improvement >**2%** → proceed
   - AUC similar ±2% → review business case
   - AUC worse → investigate before proceeding

### Phase 4: Shadow Deployment (Week 4)
1. Deploy new model alongside production model
2. Run both models on live data (no customer impact)
3. Compare predictions for 1 week
4. Check for prediction consistency and stability

### Phase 5: Production Deployment (Week 5)
1. **Blue-Green Deployment:**
   - Deploy new model to production environment (green)
   - Keep old model running (blue)
   - Route 10% traffic to new model
   - Monitor for 2 days
   - Increase to 50% for 2 days
   - Full cutover to new model
   - Keep old model on standby for 1 week

2. **Rollback Plan:**
   - If AUC drops >5% → immediate rollback
   - If error rate spikes → immediate rollback
   - Keep previous model for 2 weeks before archival

### Phase 6: Post-Deployment Monitoring (Ongoing)
1. Daily monitoring for first week
2. Weekly monitoring for first month
3. Monthly monitoring thereafter
4. Document performance in model registry

---

## Deployment Architecture:

### Option 1: Batch Prediction (Recommended for Current Scale)
- **Schedule:** Daily at 2 AM
- **Process:** Load new loan applications → score → store results
- **Latency:** <5 minutes for 10,000 loans
- **Infrastructure:** Single VM with 8 cores, 16GB RAM
- **Cost:** Low, suitable for current volume

### Option 2: Real-Time API (Future Scale)
- **Architecture:** REST API with model server (FastAPI/Flask)
- **Latency:** <100ms per prediction
- **Scaling:** Auto-scaling based on load
- **Infrastructure:** Kubernetes cluster
- **Cost:** Higher, use when volume >100k loans/day

---

## Model Registry & Versioning:

### Versioning Schema:
```
v{YEAR}.{MONTH}.{INCREMENT}
Example: v2025.10.01
```

### Stored Artifacts:
1. **Model binary** (.pkl file)
2. **Metadata** (performance metrics, training date, features)
3. **Label encoders** (for categorical features)
4. **Training data statistics** (for drift detection)
5. **Model card** (documentation, known limitations)

---

## Monitoring Dashboard KPIs:

### Red (Critical):
- AUC < 0.65
- Precision < 0.50
- Recall < 0.40

### Yellow (Warning):
- AUC 0.65-0.70
- Precision 0.50-0.60  
- Recall 0.40-0.50
- Prediction drift >5%

### Green (Healthy):
- AUC > 0.70
- Precision > 0.60
- Recall > 0.50
- Prediction drift <3%

---

## Roles & Responsibilities:

| Role | Responsibility |
|------|----------------|
| **Data Scientist** | Model development, training, validation, documentation |
| **ML Engineer** | Deployment, monitoring, infrastructure, CI/CD |
| **Data Engineer** | Pipeline maintenance, data quality, feature engineering |
| **Product Manager** | Business requirements, approval for model refresh |
| **Risk Manager** | Model risk assessment, compliance validation |

---

## Compliance & Audit:

1. **Model Risk Management:**
   - Quarterly model validation by independent team
   - Annual regulatory review
   - Document all model changes

2. **Explainability:**
   - Maintain feature importance rankings
   - Provide SHAP values for key decisions
   - Document model limitations

3. **Bias Testing:**
   - Monthly fairness metrics by demographic groups
   - Disparate impact testing
   - Adverse action reasons for declined loans

## 📊 End-to-End ML Pipeline Summary

### Pipeline Architecture:

```
Bronze → Silver → Gold → ML Training → Inference → Monitoring → Visualization
```

### Completed Components:

✅ **1. Model Training Pipeline**
- Trains 3 models: Logistic Regression, Random Forest, Gradient Boosting
- Performs 80-20 train-test split with stratification
- Selects best model based on Test AUC
- Saves model artifacts to `model_store/`

✅ **2. Model Inference Pipeline**  
- Loads best model from model store
- Generates predictions for all loans
- Saves predictions to `datamart/gold/model_predictions.parquet`

✅ **3. Model Monitoring Pipeline**
- Calculates performance metrics (Accuracy, Precision, Recall, F1, AUC)
- Tracks performance over time (monthly)
- Detects data drift in prediction distributions
- Saves monitoring results to `datamart/gold/model_monitoring.parquet`

✅ **4. Visualization Dashboard**
- ROC Curve & AUC visualization
- Precision-Recall Curve
- Confusion Matrix heatmap
- Performance stability over time
- Actual vs Predicted default rates
- Prediction distribution analysis

✅ **5. Model Governance & SOP**
- Clear trigger conditions for model refresh
- 5-phase refresh process (Assessment → Deployment)
- Blue-green deployment strategy with rollback plan
- Model versioning and registry structure
- Monitoring KPI thresholds (Red/Yellow/Green)
- Roles & responsibilities matrix

### Data Lineage:

```
data/*.csv 
  → datamart/bronze/*.parquet (raw ingestion)
  → datamart/silver/*.parquet (cleaned & validated)  
  → datamart/gold/gold_features.parquet (engineered features)
  → model_store/best_model_*.pkl (trained model)
  → datamart/gold/model_predictions.parquet (inference results)
  → datamart/gold/model_monitoring.parquet (monitoring metrics)
  → model_performance_dashboard.png (visualization)
```

### Key Metrics:

| Metric | Threshold | Status |
|--------|-----------|--------|
| AUC-ROC | > 0.70 | ✅ Monitor |
| Precision | > 0.60 | ✅ Monitor |
| Recall | > 0.50 | ✅ Monitor |
| F1-Score | > 0.55 | ✅ Monitor |
| Prediction Drift | < 5% | ✅ Monitor |

### Next Steps:

1. Run all pipeline cells in sequence
2. Review model performance dashboard
3. Schedule monthly monitoring jobs
4. Implement automated alerting for threshold breaches
5. Set up quarterly model review calendar