In [4]:
import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split

import model_inference


In [5]:
# Build a .py script that takes a snapshot date, loads a model artefact and make an inference and save to datamart

## set up pyspark session

In [6]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

## set up config

In [7]:
snapshot_date_str = "2024-01-01"
model_name = "credit_model_2024_09_02.pkl"


In [8]:
config = {}
config["snapshot_date_str"] = snapshot_date_str
config["snapshot_date"] = datetime.strptime(config["snapshot_date_str"], "%Y-%m-%d")
config["model_name"] = model_name
config["model_bank_directory"] = "model_bank/"
config["model_artefact_filepath"] = config["model_bank_directory"] + config["model_name"]

pprint.pprint(config)

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 1, 1, 0, 0),
 'snapshot_date_str': '2024-01-01'}


## load model artefact from model bank

In [9]:
# Load the model from the pickle file
with open(config["model_artefact_filepath"], 'rb') as file:
    model_artefact = pickle.load(file)

print("Model loaded successfully! " + config["model_artefact_filepath"])

Model loaded successfully! model_bank/credit_model_2024_09_02.pkl


## load feature store

In [10]:
# Load gold feature store for the inference date
gold_feature_store_path = "datamart/gold/feature_store/"
files_list = [os.path.join(gold_feature_store_path, os.path.basename(f)) for f in glob.glob(os.path.join(gold_feature_store_path, '*'))]
features_store_sdf = spark.read.option("header", "true").parquet(*files_list)
features_sdf = features_store_sdf.filter(col("snapshot_date") == config["snapshot_date"])
features_pdf = features_sdf.toPandas()

features_pdf.head(5)


                                                                                

Unnamed: 0,Customer_ID,fe_1,fe_2,fe_3,fe_4,fe_5,fe_6,fe_7,fe_8,fe_9,...,Credit_Mix_Standard,Loan_Home_Equity_Loan,Loan_Payday_Loan,Loan_Personal_Loan,Loan_Debt_Consolidation_Loan,Loan_Mortgage_Loan,Loan_Student_Loan,Loan_Credit-Builder_Loan,Loan_Auto_Loan,Loan_Not_Specified
0,CUS_0x2ff7,-0.231084,-1.154507,1.226085,-0.360448,0.025799,0.563097,-1.5884,1.262261,-1.17734,...,1,1,1,0,0,1,0,1,0,0
1,CUS_0x303a,-1.036914,1.796077,-0.126767,0.758165,0.620311,-0.791628,-0.92111,0.762157,-0.058155,...,0,1,1,0,0,0,0,1,0,0
2,CUS_0x305c,-0.221136,-1.144437,1.453207,-0.570188,0.303238,-1.048729,-0.702,-0.178038,-1.098106,...,1,0,0,1,0,0,1,0,0,0
3,CUS_0x3082,0.853304,0.094204,-0.610634,-2.06833,0.600494,-1.740924,0.094763,-1.208251,-0.177007,...,1,0,1,0,0,0,0,1,0,1
4,CUS_0x308d,-0.519591,-0.238114,2.588813,-0.280547,-0.846151,-1.721147,0.274035,-0.998208,-0.186911,...,0,0,0,0,0,0,0,0,0,1


In [11]:
# Drop non-feature columns to get only feature columns
non_feature_cols = [
    'Customer_ID',
    'clickstream_snapshot_date',
    'attributes_snapshot_date',
    'financial_snapshot_date',
    'snapshot_date'
]

X_inference = features_pdf.drop(columns=non_feature_cols).values  # Convert to numpy array

## preprocess data for modeling

In [12]:
# Apply the scaler
transformer_stdscaler = model_artefact["preprocessing_transformers"]["stdscaler"]
X_inference = transformer_stdscaler.transform(X_inference)

print('X_inference', X_inference.shape[0])

X_inference 485




## model prediction inference

In [13]:
# load model
model = model_artefact["model"]

# predict model
y_inference = model.predict_proba(X_inference)[:, 1]

# prepare output
y_inference_pdf = features_pdf[["Customer_ID","snapshot_date"]].copy()
y_inference_pdf["model_name"] = config["model_name"]
y_inference_pdf["model_predictions"] = y_inference
y_inference_pdf

Unnamed: 0,Customer_ID,snapshot_date,model_name,model_predictions
0,CUS_0x2ff7,2024-01-01,credit_model_2024_09_02.pkl,0.414623
1,CUS_0x303a,2024-01-01,credit_model_2024_09_02.pkl,0.401065
2,CUS_0x305c,2024-01-01,credit_model_2024_09_02.pkl,0.462319
3,CUS_0x3082,2024-01-01,credit_model_2024_09_02.pkl,0.563414
4,CUS_0x308d,2024-01-01,credit_model_2024_09_02.pkl,0.416237
...,...,...,...,...
480,CUS_0x2e1a,2024-01-01,credit_model_2024_09_02.pkl,0.465596
481,CUS_0x2e8d,2024-01-01,credit_model_2024_09_02.pkl,0.398042
482,CUS_0x2ee4,2024-01-01,credit_model_2024_09_02.pkl,0.462323
483,CUS_0x2fa7,2024-01-01,credit_model_2024_09_02.pkl,0.590299


## save model inference to datamart gold table

In [14]:
# create bronze datalake
gold_directory = f"datamart/gold/model_predictions/{config['model_name'][:-4]}/"
print(gold_directory)

if not os.path.exists(gold_directory):
    os.makedirs(gold_directory)

# save gold table - IRL connect to database to write
partition_name = config["model_name"][:-4] + "_predictions_" + snapshot_date_str.replace('-','_') + '.parquet'
filepath = gold_directory + partition_name
spark.createDataFrame(y_inference_pdf).write.mode("overwrite").parquet(filepath)

print('saved to:', filepath)

datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_01_01.parquet


## backfill

In [15]:
# set up config
snapshot_date_str = "2023-02-01"

start_date_str = "2023-02-01"
end_date_str = "2024-12-01"

In [16]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    # Convert the date strings to datetime objects
    start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
    end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
    # List to store the first of month dates
    first_of_month_dates = []

    # Start from the first of the month of the start_date
    current_date = datetime(start_date.year, start_date.month, 1)

    while current_date <= end_date:
        # Append the date in yyyy-mm-dd format
        first_of_month_dates.append(current_date.strftime("%Y-%m-%d"))
        
        # Move to the first of the next month
        if current_date.month == 12:
            current_date = datetime(current_date.year + 1, 1, 1)
        else:
            current_date = datetime(current_date.year, current_date.month + 1, 1)

    return first_of_month_dates

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)


In [17]:
import glob
import os

dates_str_lst = generate_first_of_month_dates(start_date_str, end_date_str)

for snapshot_date in dates_str_lst:
    print(snapshot_date)
    # Build the directory path for this date
    label_dir = f"datamart/gold/label_store/gold_label_store_{snapshot_date.replace('-', '_')}.parquet"
    # Find all parquet files inside the directory
    label_files = glob.glob(os.path.join(label_dir, "*.parquet"))
    if label_files:
        model_inference.main(snapshot_date, model_name)
    else:
        print(f"Label file missing for {snapshot_date}, skipping.")

2023-02-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 2, 1, 0, 0),
 'snapshot_date_str': '2023-02-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 501 2023-02-01 00:00:00




X_inference 501
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_02_01.parquet


---completed job---


2023-03-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 3, 1, 0, 0),
 'snapshot_date_str': '2023-03-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 506 2023-03-01 00:00:00




X_inference 506
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_03_01.parquet


---completed job---


2023-04-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 4, 1, 0, 0),
 'snapshot_date_str': '2023-04-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 510 2023-04-01 00:00:00




X_inference 510
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_04_01.parquet


---completed job---


2023-05-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 5, 1, 0, 0),
 'snapshot_date_str': '2023-05-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 521 2023-05-01 00:00:00




X_inference 521
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_05_01.parquet


---completed job---


2023-06-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 6, 1, 0, 0),
 'snapshot_date_str': '2023-06-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 517 2023-06-01 00:00:00




X_inference 517
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_06_01.parquet


---completed job---


2023-07-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 7, 1, 0, 0),
 'snapshot_date_str': '2023-07-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 471 2023-07-01 00:00:00




X_inference 471
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_07_01.parquet


---completed job---


2023-08-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 8, 1, 0, 0),
 'snapshot_date_str': '2023-08-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 481 2023-08-01 00:00:00




X_inference 481
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_08_01.parquet


---completed job---


2023-09-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 9, 1, 0, 0),
 'snapshot_date_str': '2023-09-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 454 2023-09-01 00:00:00




X_inference 454
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_09_01.parquet


---completed job---


2023-10-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 10, 1, 0, 0),
 'snapshot_date_str': '2023-10-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 487 2023-10-01 00:00:00




X_inference 487
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_10_01.parquet


---completed job---


2023-11-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 11, 1, 0, 0),
 'snapshot_date_str': '2023-11-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 491 2023-11-01 00:00:00




X_inference 491
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_11_01.parquet


---completed job---


2023-12-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2023, 12, 1, 0, 0),
 'snapshot_date_str': '2023-12-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 489 2023-12-01 00:00:00




X_inference 489
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2023_12_01.parquet


---completed job---


2024-01-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 1, 1, 0, 0),
 'snapshot_date_str': '2024-01-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 485 2024-01-01 00:00:00




X_inference 485
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_01_01.parquet


---completed job---


2024-02-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 2, 1, 0, 0),
 'snapshot_date_str': '2024-02-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 518 2024-02-01 00:00:00




X_inference 518
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_02_01.parquet


---completed job---


2024-03-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 3, 1, 0, 0),
 'snapshot_date_str': '2024-03-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 511 2024-03-01 00:00:00




X_inference 511
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_03_01.parquet


---completed job---


2024-04-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 4, 1, 0, 0),
 'snapshot_date_str': '2024-04-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 513 2024-04-01 00:00:00




X_inference 513
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_04_01.parquet


---completed job---


2024-05-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 5, 1, 0, 0),
 'snapshot_date_str': '2024-05-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 491 2024-05-01 00:00:00




X_inference 491
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_05_01.parquet


---completed job---


2024-06-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 6, 1, 0, 0),
 'snapshot_date_str': '2024-06-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 498 2024-06-01 00:00:00




X_inference 498
datamart/gold/model_predictions/credit_model_2024_09_02/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_02/credit_model_2024_09_02_predictions_2024_06_01.parquet


---completed job---


2024-07-01


---starting job---


{'model_artefact_filepath': 'model_bank/credit_model_2024_09_02.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_02.pkl',
 'snapshot_date': datetime.datetime(2024, 7, 1, 0, 0),
 'snapshot_date_str': '2024-07-01'}
Model loaded successfully! model_bank/credit_model_2024_09_02.pkl
extracted features_sdf 0 2024-07-01 00:00:00




ValueError: Found array with 0 sample(s) (shape=(0, 34)) while a minimum of 1 is required by StandardScaler.

## Check datamart

In [None]:
# Initialize SparkSession
spark = pyspark.sql.SparkSession.builder \
    .appName("dev") \
    .master("local[*]") \
    .getOrCreate()

# Set log level to ERROR to hide warnings
spark.sparkContext.setLogLevel("ERROR")

In [None]:
folder_path = "datamart/gold/model_predictions/credit_model_2024_09_02/"
files_list = [folder_path+os.path.basename(f) for f in glob.glob(os.path.join(folder_path, '*'))]
df = spark.read.option("header", "true").parquet(*files_list)
print("row_count:",df.count())

df.show()