In [1]:
import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

# import model_inference


In [2]:
# Build a .py script that takes a snapshot date, loads a model artefact and make an inference and save to datamart

## set up pyspark session

In [2]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/28 01:54:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/06/28 01:54:32 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## set up config

In [3]:
snapshot_date_str = "2024-01-01"
model_name = "credit_model_2024_09_01.pkl"


In [4]:
config = {}
config["snapshot_date_str"] = snapshot_date_str
config["snapshot_date"] = datetime.strptime(config["snapshot_date_str"], "%Y-%m-%d")
config["model_name"] = model_name
config["model_bank_directory"] = "model_bank/"
config["model_artefact_filepath"] = config["model_bank_directory"] + config["model_name"]

pprint.pprint(config)

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2024, 1, 1, 0, 0),
 'snapshot_date_str': '2024-01-01'}


## load model artefact from model bank

In [5]:
# Load the model from the pickle file
with open(config["model_artefact_filepath"], 'rb') as file:
    model_artefact = pickle.load(file)

print("Model loaded successfully! " + config["model_artefact_filepath"])

Model loaded successfully! model_bank/credit_model_2024_09_01.pkl


## load feature store

In [6]:
from pyspark.sql import Window
from pyspark.sql.functions import col, last, first, coalesce

# connect to feature store 1
cust_risk_folder_path = "datamart/gold/feature_store/cust_fin_risk/"
cust_risk_files_list = [cust_risk_folder_path+os.path.basename(f) for f in glob.glob(os.path.join(cust_risk_folder_path, '*'))]
cust_risk_feature_store_sdf = spark.read.option("header", "true").parquet(*cust_risk_files_list)
print("row_count:",cust_risk_feature_store_sdf.count())

# extract cust risk feature store
cust_risk_features_sdf = cust_risk_feature_store_sdf.filter((col("snapshot_date") == config["snapshot_date"]))

# connect to feature store 2
eng_folder_path = "datamart/gold/feature_store/eng/"
eng_files_list = [eng_folder_path+os.path.basename(f) for f in glob.glob(os.path.join(eng_folder_path, '*'))]
eng_feature_store_sdf = spark.read.option("header", "true").parquet(*eng_files_list)
print("row_count:",eng_feature_store_sdf.count())

# extract eng feature store
eng_features_sdf = eng_feature_store_sdf.filter((col("snapshot_date") == config["snapshot_date"]))

# Join features
features_sdf = eng_features_sdf.join(cust_risk_features_sdf, on=["Customer_ID", "snapshot_date"], how="outer")

fill_cols = [
    "click_1m", "click_2m", "click_3m", "click_4m", "click_5m", "click_6m",
    "Credit_History_Age", "Num_Fin_Pdts", "EMI_to_Salary", "Debt_to_Salary",
    "Repayment_Ability", "Loans_per_Credit_Item", "Loan_Extent", "Outstanding_Debt",
    "Interest_Rate", "Delay_from_due_date", "Changed_Credit_Limit"
]

fwd_window = Window.partitionBy("Customer_ID").orderBy("snapshot_date").rowsBetween(Window.unboundedPreceding, 0)
bwd_window = Window.partitionBy("Customer_ID").orderBy("snapshot_date").rowsBetween(0, Window.unboundedFollowing)

for col_name in fill_cols:
    fwd_fill = last(col(col_name), ignorenulls=True).over(fwd_window)
    bwd_fill = first(col(col_name), ignorenulls=True).over(bwd_window)
    features_sdf = features_sdf.withColumn(col_name, coalesce(fwd_fill, bwd_fill))

features_pdf = features_sdf.toPandas()
features_pdf

                                                                                

row_count: 11974
row_count: 206402


                                                                                

Unnamed: 0,Customer_ID,snapshot_date,click_1m,click_2m,click_3m,click_4m,click_5m,click_6m,Credit_History_Age,Num_Fin_Pdts,EMI_to_Salary,Debt_to_Salary,Repayment_Ability,Loans_per_Credit_Item,Loan_Extent,Outstanding_Debt,Interest_Rate,Delay_from_due_date,Changed_Credit_Limit
0,CUS_0x1000,2024-01-01,172,0,150,169,135,68,,,,,,,,,,,
1,CUS_0x100b,2024-01-01,92,350,196,178,48,155,,,,,,,,,,,
2,CUS_0x1011,2024-01-01,94,142,143,133,206,154,,,,,,,,,,,
3,CUS_0x1013,2024-01-01,61,60,83,157,85,183,,,,,,,,,,,
4,CUS_0x1015,2024-01-01,22,0,40,93,0,116,,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
8969,CUS_0xfe5,2024-01-01,117,140,198,0,4,24,,,,,,,,,,,
8970,CUS_0xfea,2024-01-01,69,208,187,93,180,178,,,,,,,,,,,
8971,CUS_0xff3,2024-01-01,171,260,72,248,54,93,,,,,,,,,,,
8972,CUS_0xffc,2024-01-01,228,97,104,162,186,209,157.0,22.0,0.052271,0.24911,4945.289,0.533333,368.0,1300.13,27.0,46.0,8.82


## preprocess data for modeling

In [7]:
# prepare X_inference
feature_cols = fill_cols
X_inference = features_pdf[feature_cols]

# apply transformer - standard scaler
transformer_stdscaler = model_artefact["preprocessing_transformers"]["stdscaler"]
X_inference = transformer_stdscaler.transform(X_inference)

print('X_inference', X_inference.shape[0])
X_inference

X_inference 8974


array([[ 0.75795314, -1.22273733,  0.47431178, ...,         nan,
                nan,         nan],
       [-0.17417326,  2.69932734,  1.0068935 , ...,         nan,
                nan,         nan],
       [-0.1508701 ,  0.36850034,  0.39326674, ...,         nan,
                nan,         nan],
       ...,
       [ 0.74630156,  1.69079643, -0.42876156, ...,         nan,
                nan,         nan],
       [ 1.41044162, -0.13576512, -0.05826993, ...,  1.31097215,
         1.6752295 , -0.23272158],
       [-1.16455757,  2.18385599,  0.70586905, ...,         nan,
                nan,         nan]], shape=(8974, 17))

## model prediction inference

In [8]:
# load model
model = model_artefact["model"]

# predict model
y_inference = model.predict_proba(X_inference)[:, 1]

# prepare output
y_inference_pdf = features_pdf[["Customer_ID","snapshot_date"]].copy()
y_inference_pdf["model_name"] = config["model_name"]
y_inference_pdf["model_predictions"] = y_inference
y_inference_pdf

Unnamed: 0,Customer_ID,snapshot_date,model_name,model_predictions
0,CUS_0x1000,2024-01-01,credit_model_2024_09_01.pkl,0.188017
1,CUS_0x100b,2024-01-01,credit_model_2024_09_01.pkl,0.260677
2,CUS_0x1011,2024-01-01,credit_model_2024_09_01.pkl,0.171789
3,CUS_0x1013,2024-01-01,credit_model_2024_09_01.pkl,0.211780
4,CUS_0x1015,2024-01-01,credit_model_2024_09_01.pkl,0.215008
...,...,...,...,...
8969,CUS_0xfe5,2024-01-01,credit_model_2024_09_01.pkl,0.156240
8970,CUS_0xfea,2024-01-01,credit_model_2024_09_01.pkl,0.221796
8971,CUS_0xff3,2024-01-01,credit_model_2024_09_01.pkl,0.281820
8972,CUS_0xffc,2024-01-01,credit_model_2024_09_01.pkl,0.715934


## save model inference to datamart gold table

In [9]:
# create bronze datalake
gold_directory = f"datamart/gold/model_predictions/{config["model_name"][:-4]}/"
print(gold_directory)

if not os.path.exists(gold_directory):
    os.makedirs(gold_directory)

# save gold table - IRL connect to database to write
partition_name = config["model_name"][:-4] + "_predictions_" + snapshot_date_str.replace('-','_') + '.parquet'
filepath = gold_directory + partition_name
spark.createDataFrame(y_inference_pdf).write.mode("overwrite").parquet(filepath)
# df.toPandas().to_parquet(filepath,
#           compression='gzip')
print('saved to:', filepath)

datamart/gold/model_predictions/credit_model_2024_09_01/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_01/credit_model_2024_09_01_predictions_2024_01_01.parquet


## Check datamart

In [10]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

In [11]:
folder_path = "datamart/gold/model_predictions/credit_model_2024_09_01/"
files_list = [folder_path+os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
df = spark.read.option("header", "true").parquet(*files_list)
print("row_count:",df.count())

df.show()

row_count: 8974
+-----------+-------------+--------------------+-------------------+
|Customer_ID|snapshot_date|          model_name|  model_predictions|
+-----------+-------------+--------------------+-------------------+
| CUS_0xa584|   2024-01-01|credit_model_2024...| 0.2620254158973694|
| CUS_0xa585|   2024-01-01|credit_model_2024...|0.18663430213928223|
| CUS_0xa587|   2024-01-01|credit_model_2024...|0.15890556573867798|
|  CUS_0xa59|   2024-01-01|credit_model_2024...|0.20641601085662842|
| CUS_0xa59d|   2024-01-01|credit_model_2024...| 0.1637917011976242|
| CUS_0xa5a0|   2024-01-01|credit_model_2024...|0.17526331543922424|
| CUS_0xa5a1|   2024-01-01|credit_model_2024...| 0.1879502236843109|
| CUS_0xa5ac|   2024-01-01|credit_model_2024...| 0.3046533167362213|
| CUS_0xa5b6|   2024-01-01|credit_model_2024...|0.19666583836078644|
| CUS_0xa5be|   2024-01-01|credit_model_2024...|  0.222067192196846|
|  CUS_0xa5c|   2024-01-01|credit_model_2024...| 0.2429320216178894|
| CUS_0xa5c2|   20