In [1]:
import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
print('done importing')

done importing


In [2]:
spark = SparkSession \
    .builder \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/27 09:57:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
def setup_config():
    snapshot_date_str = "2024-01-01"
    model_name = "credit_model_2024_09_01.pkl"

    config = {}
    config["snapshot_date_str"] = snapshot_date_str
    config["snapshot_date"] = datetime.strptime(config["snapshot_date_str"], "%Y-%m-%d")

    config["model_name"] = model_name
    config["model_bank_directory"] = "model_bank/"
    config["model_artefact_filepath"] = config["model_bank_directory"] + config["model_name"]

    print("Config setup complete")
    pprint.pprint(config)

    return config

def read_gold_table(table, gold_db, spark):
    folder_path = os.path.join(gold_db, table)
    files_list = [os.path.join(folder_path, os.path.basename(f)) for f in glob.glob(os.path.join(folder_path, '*'))]
    df = spark.read.option("header", "true").parquet(*files_list)
    return df

In [4]:
# == Setup configuration ===
config = setup_config()

Config setup complete
{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2024, 1, 1, 0, 0),
 'snapshot_date_str': '2024-01-01'}


In [28]:
# ==== Load Feature Data ====
y_spark = read_gold_table('label_store', 'datamart/gold', spark)
y_df = y_spark.toPandas().sort_values(by='snapshot_date')
y_df['snapshot_date'] = pd.to_datetime(y_df['snapshot_date'])
y_df.info()



<class 'pandas.core.frame.DataFrame'>
Int64Index: 8974 entries, 0 to 5002
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   loan_id        8974 non-null   object        
 1   customer_id    8974 non-null   object        
 2   label          8974 non-null   int32         
 3   label_def      8974 non-null   object        
 4   snapshot_date  8974 non-null   datetime64[ns]
dtypes: datetime64[ns](1), int32(1), object(3)
memory usage: 385.6+ KB


                                                                                

In [29]:
# ==== Load Inference ========
inference_path = f"datamart/gold/predictions/{config['model_name']}/{config['model_name'][:-4]}_predictions_{config['snapshot_date_str'].replace('-', '_')}.parquet"
y_inference = spark.read.option("header", "true").parquet(inference_path)
y_inference_pdf = y_inference.toPandas().sort_values(by='snapshot_date')
y_inference_pdf['snapshot_date'] = pd.to_datetime(y_inference_pdf['snapshot_date'])
y_inference_pdf.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 485 entries, 0 to 484
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype         
---  ------         --------------  -----         
 0   customer_id    485 non-null    object        
 1   snapshot_date  485 non-null    datetime64[ns]
 2   model_name     485 non-null    object        
 3   prediction     485 non-null    float64       
dtypes: datetime64[ns](1), float64(1), object(2)
memory usage: 18.9+ KB


In [36]:
y_df[y_df['snapshot_date'] == pd.to_datetime('2024-01-01')].sort_values(by='customer_id')['customer_id'].value_counts()

CUS_0x1130    1
CUS_0x90cf    1
CUS_0x960e    1
CUS_0x94f6    1
CUS_0x94e8    1
             ..
CUS_0x4f16    1
CUS_0x4e0e    1
CUS_0x4dd5    1
CUS_0x4dd1    1
CUS_0xfb8     1
Name: customer_id, Length: 471, dtype: int64

In [38]:
y_inference_pdf.sort_values(by='customer_id')['customer_id'].value_counts()

CUS_0x102d    1
CUS_0x853d    1
CUS_0x8e5f    1
CUS_0x8e38    1
CUS_0x8da2    1
             ..
CUS_0x4953    1
CUS_0x48f     1
CUS_0x48e     1
CUS_0x4874    1
CUS_0xffc     1
Name: customer_id, Length: 485, dtype: int64

In [41]:
df_monitor = pd.merge(
    y_inference_pdf,
    y_df[['customer_id', 'snapshot_date', 'label']],
    on=['customer_id'],
    how='inner'
)
df_monitor

Unnamed: 0,customer_id,snapshot_date_x,model_name,prediction,snapshot_date_y,label
0,CUS_0x9778,2024-01-01,credit_model_2024_09_01.pkl,0.247142,2024-07-01,0
1,CUS_0x305c,2024-01-01,credit_model_2024_09_01.pkl,0.198256,2024-07-01,0
2,CUS_0x303a,2024-01-01,credit_model_2024_09_01.pkl,0.371733,2024-07-01,1
3,CUS_0x2ff7,2024-01-01,credit_model_2024_09_01.pkl,0.077434,2024-07-01,0
4,CUS_0x2fc5,2024-01-01,credit_model_2024_09_01.pkl,0.690279,2024-07-01,1
...,...,...,...,...,...,...
480,CUS_0x7622,2024-01-01,credit_model_2024_09_01.pkl,0.147215,2024-07-01,0
481,CUS_0x7500,2024-01-01,credit_model_2024_09_01.pkl,0.078024,2024-07-01,0
482,CUS_0x7480,2024-01-01,credit_model_2024_09_01.pkl,0.155120,2024-07-01,0
483,CUS_0x8449,2024-01-01,credit_model_2024_09_01.pkl,0.134830,2024-07-01,0
