In [15]:
from utils.data_processing_bronze_table import process_bronze_table
from utils.data_processing_silver_table import process_silver_table
from utils.data_processing_gold_table import process_gold_table

In [16]:
import os

import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

In [17]:
# Modelling-window configuration (still inline -- a candidate for a shared config module).
# The four scalars below are the only knobs; every date stored in `config` is derived from them.
model_train_date_str = "2024-09-01"
train_test_period_months = 12
oot_period_months = 2
train_test_ratio = 0.8

# Start with the raw knobs and the parsed training date.
config = {
    "model_train_date_str": model_train_date_str,
    "train_test_period_months": train_test_period_months,
    "oot_period_months": oot_period_months,
    "model_train_date": datetime.strptime(model_train_date_str, "%Y-%m-%d"),
}

# Out-of-time (OOT) window: the `oot_period_months` months that end the day
# before the training date.
config.update(
    oot_end_date=config["model_train_date"] - timedelta(days=1),
    oot_start_date=config["model_train_date"] - relativedelta(months=oot_period_months),
)

# Train/test window: the `train_test_period_months` months that end the day
# before the OOT window starts.
config.update(
    train_test_end_date=config["oot_start_date"] - timedelta(days=1),
    train_test_start_date=config["oot_start_date"] - relativedelta(months=train_test_period_months),
    train_test_ratio=train_test_ratio,
)

pprint.pprint(config)

{'model_train_date': datetime.datetime(2024, 9, 1, 0, 0),
 'model_train_date_str': '2024-09-01',
 'oot_end_date': datetime.datetime(2024, 8, 31, 0, 0),
 'oot_period_months': 2,
 'oot_start_date': datetime.datetime(2024, 7, 1, 0, 0),
 'train_test_end_date': datetime.datetime(2024, 6, 30, 0, 0),
 'train_test_period_months': 12,
 'train_test_ratio': 0.8,
 'train_test_start_date': datetime.datetime(2023, 7, 1, 0, 0)}


In [18]:
# Create (or reuse, via getOrCreate) a Spark session named "dev" on the local[*] master.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Restrict Spark logging to ERROR so warnings do not clutter the notebook output.
spark.sparkContext.setLogLevel("ERROR")

# gold_path = "datamart/gold/feature_store/2023_01_01.parquet"
# # Read the folder — Spark finds all part files & ignores .crc automatically
# df = spark.read.parquet(gold_path)

# # Now you have a Spark DataFrame for that year
# df.show()
# df.printSchema()

### Get Labels

In [19]:
# Connect to the label store: read every entry found directly under folder_path.
folder_path = "datamart/gold/label_store/"

# Build the paths with os.path.join (no reliance on the trailing slash) and sort
# them so the load order does not depend on glob's arbitrary ordering.
files_list = sorted(
    os.path.join(folder_path, os.path.basename(f))
    for f in glob.glob(os.path.join(folder_path, '*'))
)
if not files_list:
    # Fail early with a clear message rather than an opaque Spark error on an empty path list.
    raise FileNotFoundError(f"No label store files found under {folder_path!r}")

# The files are Parquet, which stores its own schema -- the CSV-only "header"
# option has no effect here and is omitted.
label_store_sdf = spark.read.parquet(*files_list)
print("row_count:", label_store_sdf.count())

label_store_sdf.show()

row_count: 8974
+--------------------+-----------+-----+----------+-------------+
|             loan_id|customer_id|label| label_def|snapshot_date|
+--------------------+-----------+-----+----------+-------------+
|CUS_0x1037_2023_0...| CUS_0x1037|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1069_2023_0...| CUS_0x1069|    0|30dpd_6mob|   2023-07-01|
|CUS_0x114a_2023_0...| CUS_0x114a|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1184_2023_0...| CUS_0x1184|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1297_2023_0...| CUS_0x1297|    1|30dpd_6mob|   2023-07-01|
|CUS_0x12fb_2023_0...| CUS_0x12fb|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1325_2023_0...| CUS_0x1325|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1341_2023_0...| CUS_0x1341|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1375_2023_0...| CUS_0x1375|    1|30dpd_6mob|   2023-07-01|
|CUS_0x13a8_2023_0...| CUS_0x13a8|    0|30dpd_6mob|   2023-07-01|
|CUS_0x13ef_2023_0...| CUS_0x13ef|    0|30dpd_6mob|   2023-07-01|
|CUS_0x1440_2023_0...| CUS_0x1440|    0|30dpd_6mob|   2023-0

In [20]:
# Keep only the labels whose snapshot_date falls inside the modelling window,
# i.e. from train_test_start_date through oot_end_date (both bounds inclusive).
labels_sdf = label_store_sdf.where(
    col("snapshot_date").between(config["train_test_start_date"], config["oot_end_date"])
)

print("extracted labels_sdf", labels_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

extracted labels_sdf 6961 2023-07-01 00:00:00 2024-08-31 00:00:00


### Get Features

In [21]:
# Connect to the feature store: read every entry found directly under feature_location.
feature_location = "datamart/gold/feature_store"

# Sorted so the load order does not depend on glob's arbitrary ordering.
features_files_list = sorted(
    os.path.join(feature_location, os.path.basename(f))
    for f in glob.glob(os.path.join(feature_location, '*'))
)
if not features_files_list:
    # Fail early with a clear message rather than an opaque Spark error on an empty path list.
    raise FileNotFoundError(f"No feature store files found under {feature_location!r}")

# The store is Parquet (not CSV): the schema travels with the files, so the
# CSV-only "header" option has no effect here and is omitted.
features_store_sdf = spark.read.parquet(*features_files_list)
print("row_count:", features_store_sdf.count())
features_store_sdf.show()


row_count: 8974
+-----------+-------------+---+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------------+------------------------+-------------------+-----------------------+---------------+------------------------+-------------+---------+-------------------+-------------+-------------+------------+----------------+-----------+-----------------------+-----------------+--------------------+-----------------+-------------------+------------------------+-------------------+--------------------+--------------------+------------------+---------------------+-----------------------+---------------------+-------------------+-----------------+------------------+-------------------------+------------------------+------------------------+-------------------+---------------+--------------+---------------------------+----------------------------+---------------

In [22]:
# extract feature store
features_sdf = features_store_sdf.filter((col("snapshot_date") >= config["train_test_start_date"]) & (col("snapshot_date") <= config["oot_end_date"]))
print("extracted features_sdf", features_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

extracted features_sdf 5889 2023-07-01 00:00:00 2024-08-31 00:00:00


In [23]:
# Collect both Spark frames into pandas, order them by customer_id and make
# sure snapshot_date is a pandas datetime column on each side.
y_df = (
    labels_sdf.toPandas()
    .sort_values(by='customer_id')
    .assign(snapshot_date=lambda frame: pd.to_datetime(frame['snapshot_date']))
)
X_df = (
    features_sdf.toPandas()
    .sort_values(by='customer_id')
    .assign(snapshot_date=lambda frame: pd.to_datetime(frame['snapshot_date']))
)

# Row counts: features first, then labels.
print(len(X_df))
print(len(y_df))

# Label rows whose customer_id never appears in the extracted features.
missing_customers = y_df.loc[~y_df['customer_id'].isin(X_df['customer_id'])]
print(len(missing_customers))




CodeCache: size=131072Kb used=38665Kb max_used=38674Kb free=92406Kb
 bounds [0x000000010b1f8000, 0x000000010d808000, 0x00000001131f8000]
 total_blobs=14095 nmethods=13101 adapters=905
 compilation: disabled (not enough contiguous free space left)
5889
6961
3085


In [24]:
# Build the modelling table in one pass:
#   1. left-join the labels onto the features by customer_id,
#   2. drop every row that contains any missing value (this also removes
#      feature rows that found no label),
#   3. remove the identifier / metadata columns so only features and `label` remain.
# NOTE(review): the join key is customer_id only (snapshot dates are not matched);
# confirm each customer has at most one row on each side, otherwise rows multiply.
merged_df = (
    X_df.merge(y_df, how='left', on='customer_id')
    .dropna()
    .drop(columns=["snapshot_date_x", "snapshot_date_y", "customer_id", "loan_id", "label_def"])
)
print(len(merged_df))

3876


In [28]:
# Correlation screen on the merged modelling table.
# Uses `pd` and `np` from the notebook's import cell.

# 1. Identify the label column
label_col = 'label'  # change if the target column has a different name

# 2. Separate features and target
y = merged_df[label_col]
X = merged_df.drop(columns=[label_col])

# 3. Feature -> target correlations: rank by absolute value, report the signed value
feat_target_corr = X.corrwith(y)
top10_feat_target = feat_target_corr.abs().sort_values(ascending=False).head(10)

# The feature name is the index of the printed table (no duplicated 'feature' column).
print("Top 10 features most correlated with target:\n",
      feat_target_corr.loc[top10_feat_target.index]
                      .rename_axis("feature")
                      .to_frame("corr_with_target"))

# 4. Inter-feature correlations (absolute values)
corr_matrix = X.corr().abs()

# Boolean mask selecting the strict upper triangle, so each unordered pair is
# counted once and the diagonal (self-correlation) is excluded.
mask = np.triu(np.ones(corr_matrix.shape, dtype=bool), k=1)

# Apply the mask, flatten to one row per (feat1, feat2) pair, and sort.
# The explicit dropna() removes the masked-out cells; older pandas dropped them
# inside stack(), but newer stack implementations keep NaN entries.
sorted_pairs = (
    corr_matrix.where(mask)
               .stack()
               .dropna()
               .sort_values(ascending=False)
               .rename_axis(['feat1', 'feat2'])
               .reset_index(name='abs_corr')
)

top10_pairs = sorted_pairs.head(10)
print("\nTop 10 most correlated feature‐pairs:\n", top10_pairs)

Top 10 features most correlated with target:
                                            feature  corr_with_target
delay_from_due_date            delay_from_due_date          0.328343
avg_fe_10                                avg_fe_10         -0.322844
avg_fe_5                                  avg_fe_5          0.318774
interest_rate                        interest_rate          0.318563
credit_mix_bad                      credit_mix_bad          0.300935
outstanding_debt                  outstanding_debt          0.299115
credit_history_age_month  credit_history_age_month         -0.282833
avg_fe_9                                  avg_fe_9         -0.273494
num_of_loan                            num_of_loan          0.269500
avg_fe_4                                  avg_fe_4          0.268070

Top 10 most correlated feature‐pairs:
                          feat1                         feat2  abs_corr
0  payment_behaviour_spent_low  payment_behaviour_spent_high  0.853717
1    payment_