In [2]:
import os
import glob
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import pprint
import pickle

import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType, BooleanType

from sklearn.model_selection import train_test_split
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification

import argparse
import os, pickle, pprint
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

import xgboost as xgb

import utils.model_inference as model_inference 

In [29]:
import pyspark
from pyspark.sql import SparkSession

# Settings applied to the session builder, in this order.
spark_conf = {
    # Fault-handler switches for Python UDFs / Python workers.
    "spark.sql.execution.pyspark.udf.faulthandler.enabled": "true",
    "spark.python.worker.faulthandler.enabled": "true",
    "spark.driver.memory": "4g",  # raise if you can (6g/8g)
    "spark.sql.execution.arrow.pyspark.enabled": "false",  # avoid Arrow edge cases
}

# Build the session step by step instead of one long fluent chain.
spark_builder = SparkSession.builder.appName("LocalMedallionPipeline")
for conf_key, conf_value in spark_conf.items():
    spark_builder = spark_builder.config(conf_key, conf_value)
spark = spark_builder.getOrCreate()

# Keep the notebook output readable: only ERROR-level Spark logs.
spark.sparkContext.setLogLevel("ERROR")

In [30]:
# Snapshot date to run inference for, formatted YYYY-MM-DD (parsed with "%Y-%m-%d" when config is built).
snapshot_date_str = "2024-01-01"
# File name of the pickled model artefact; it is appended to the model bank directory to form the load path.
model_name = "credit_model_2024_09_01.pkl"

In [31]:
# Gather the run parameters for this inference run into a single dict.
config = {
    "snapshot_date_str": snapshot_date_str,
    # Same date as above, parsed into a datetime.
    "snapshot_date": datetime.strptime(snapshot_date_str, "%Y-%m-%d"),
    "model_name": model_name,
    "model_bank_directory": "model_bank/",
}
# The directory string already ends with "/", so directory + name is the full artefact path.
config["model_artefact_filepath"] = f'{config["model_bank_directory"]}{config["model_name"]}'

pprint.pprint(config)

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2024, 1, 1, 0, 0),
 'snapshot_date_str': '2024-01-01'}


In [32]:
import os, pathlib

# Sanity-check the artefact on disk before loading it.
p = pathlib.Path(config["model_artefact_filepath"])
artefact_exists = p.exists()
print("存在？", artefact_exists, "| 是檔案？", p.is_file())
size_report = p.stat().st_size if artefact_exists else "N/A"
print("大小(bytes)：", size_report)

# See whether other model versions sit in the same directory.
model_dir = p.parent
print("所在目錄：", model_dir)
for candidate in model_dir.glob("credit_model_*.pkl"):
    print(" -", candidate.name, "size:", candidate.stat().st_size)


存在？ True | 是檔案？ True
大小(bytes)： 64997
所在目錄： model_bank
 - credit_model_2024_09_01.pkl size: 64997


In [33]:
from utils.model_train_processor import DataProcessor
from utils.model_train_processor import SafeOrdinalEncoder
import sys, pickle

# NOTE(review): the two classes above are not referenced by name in this cell;
# presumably they are imported so pickle can resolve them while rebuilding the
# artefact -- confirm before removing.
# SECURITY: pickle.load can execute code embedded in the file, so only load
# artefacts that come from a trusted model bank.
artefact_path = config["model_artefact_filepath"]
with open(artefact_path, 'rb') as artefact_file:
    model_artefact = pickle.load(artefact_file)

print(f"Model loaded successfully! {artefact_path}")


Model loaded successfully! model_bank/credit_model_2024_09_01.pkl


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


In [34]:
# Silver-layer customer attributes table.
folder_path = "datamart/silver/attr/"

# The parquet file name carries the snapshot date with "_" in place of "-".
snapshot_suffix = config["snapshot_date_str"].replace("-", "_")
attributes_sdf = spark.read.parquet(f"{folder_path}silver_attr_mthly_{snapshot_suffix}.parquet")

# Keep only the columns needed downstream.
attributes_cols = ['Customer_ID', 'Age', 'Occupation', 'snapshot_date']
attributes_sdf_subset = attributes_sdf.select(*attributes_cols)
print("attributes row_count:",attributes_sdf_subset.count())
attributes_sdf_subset.show(5)
print("selected columns:", attributes_sdf_subset.columns)

attributes row_count: 485
+-----------+---+------------+-------------+
|Customer_ID|Age|  Occupation|snapshot_date|
+-----------+---+------------+-------------+
| CUS_0x102d| 31|Entrepreneur|   2024-01-01|
| CUS_0x1051| 42|    Engineer|   2024-01-01|
| CUS_0x1269| 22|     Manager|   2024-01-01|
| CUS_0x1290| 31|   Architect|   2024-01-01|
| CUS_0x12d1| 41|  Accountant|   2024-01-01|
+-----------+---+------------+-------------+
only showing top 5 rows
selected columns: ['Customer_ID', 'Age', 'Occupation', 'snapshot_date']


In [35]:
# connect to silver financials table
folder_path = "datamart/silver/fin/"

# read specific parquet file for that snapshot_date_str
# (file name carries the snapshot date with "_" in place of "-")
financials_sdf = spark.read.parquet(folder_path + 'silver_fin_mthly_' + config["snapshot_date_str"].replace("-", "_") + '.parquet')

# take only important features (Customer_ID is kept as the join key)
financials_cols = ["Customer_ID", "Annual_Income", "Monthly_Inhand_Salary",
    "Num_Bank_Accounts", "Num_Credit_Card", "Interest_Rate", "Num_of_Loan",
    "Delay_from_due_date", "Num_of_Delayed_Payment", "Changed_Credit_Limit",
    "Num_Credit_Inquiries", "Credit_Mix", "Outstanding_Debt",
    "Credit_Utilization_Ratio", "Credit_History_Age",
    "Total_EMI_per_month", "Amount_invested_monthly",
    "Payment_Behaviour", "Monthly_Balance",
    "Num_Fin_Pdts", "Loans_per_Credit_Item",
    "Debt_to_Salary", "EMI_to_Salary", "Repayment_Ability", "Loan_Extent"]
financials_sdf_subset = financials_sdf[financials_cols]
# Label fixed: this count is for the financials table, not the attributes table.
print("financials row_count:",financials_sdf_subset.count())
financials_sdf_subset.show(5)
print("selected columns:", financials_sdf_subset.columns)

attributes row_count: 485
+-----------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+-------------------+-----------------------+-----------------+---------------+------------+---------------------+--------------------+--------------------+-----------------+-----------+
|Customer_ID|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour|Monthly_Balance|Num_Fin_Pdts|Loans_per_Credit_Item|      Debt_to_Salary|       EMI_to_Salary|Repayment_Ability|Loan_Extent|
+-----------+-------------+---------------------+-----------------+---

In [36]:
# Attach the financial columns to each attributes row (left join on Customer_ID
# keeps every attributes row). Spark DataFrames are immutable, so the join
# already returns a new DataFrame and no explicit copy is needed.
merged_df = attributes_sdf_subset.join(financials_sdf_subset, on="Customer_ID", how="left")

# Check size of the joined table: it should match the attributes row count;
# a larger number would point to duplicate Customer_IDs in the financials table.
merged_row_count = merged_df.count()
print(f"Num of rows: {merged_row_count}")
merged_df.show(5, truncate=False)
merged_df.columns

Num of rows: 485
+-----------+---+------------+-------------+-------------+---------------------+-----------------+---------------+-------------+-----------+-------------------+----------------------+--------------------+--------------------+----------+----------------+------------------------+------------------+-------------------+-----------------------+-----------------+---------------+------------+---------------------+--------------------+--------------------+-----------------+-----------+
|Customer_ID|Age|Occupation  |snapshot_date|Annual_Income|Monthly_Inhand_Salary|Num_Bank_Accounts|Num_Credit_Card|Interest_Rate|Num_of_Loan|Delay_from_due_date|Num_of_Delayed_Payment|Changed_Credit_Limit|Num_Credit_Inquiries|Credit_Mix|Outstanding_Debt|Credit_Utilization_Ratio|Credit_History_Age|Total_EMI_per_month|Amount_invested_monthly|Payment_Behaviour|Monthly_Balance|Num_Fin_Pdts|Loans_per_Credit_Item|Debt_to_Salary      |EMI_to_Salary       |Repayment_Ability|Loan_Extent|
+-----------+---+

['Customer_ID',
 'Age',
 'Occupation',
 'snapshot_date',
 'Annual_Income',
 'Monthly_Inhand_Salary',
 'Num_Bank_Accounts',
 'Num_Credit_Card',
 'Interest_Rate',
 'Num_of_Loan',
 'Delay_from_due_date',
 'Num_of_Delayed_Payment',
 'Changed_Credit_Limit',
 'Num_Credit_Inquiries',
 'Credit_Mix',
 'Outstanding_Debt',
 'Credit_Utilization_Ratio',
 'Credit_History_Age',
 'Total_EMI_per_month',
 'Amount_invested_monthly',
 'Payment_Behaviour',
 'Monthly_Balance',
 'Num_Fin_Pdts',
 'Loans_per_Credit_Item',
 'Debt_to_Salary',
 'EMI_to_Salary',
 'Repayment_Ability',
 'Loan_Extent']

In [37]:
# Convert to pandas only when merged_df is still a Spark DataFrame.
from pyspark.sql import DataFrame as SparkDF

merged_pd = merged_df.toPandas() if isinstance(merged_df, SparkDF) else merged_df  # else: already pandas

# Drop the identifier and snapshot-date columns before preprocessing
# (columns that are absent are silently skipped).
non_feature_cols = ['Customer_ID', "snapshot_date"]
merged_df_clean = merged_pd.drop(columns=non_feature_cols, errors='ignore')
merged_df_clean

Unnamed: 0,Age,Occupation,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Total_EMI_per_month,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,Num_Fin_Pdts,Loans_per_Credit_Item,Debt_to_Salary,EMI_to_Salary,Repayment_Ability,Loan_Extent
0,31.0,Entrepreneur,89064.520,7256.043,5,3,1,1,6,5,...,37.573,296.094,4.0,641.937,9,0.111111,0.089342,0.005177,7218.470,6
1,42.0,Engineer,35022.220,2859.518,3,5,4,1,6,11,...,21.215,259.345,0.0,295.392,9,0.111111,0.349741,0.007416,2838.303,6
2,22.0,Manager,42031.090,3762.591,2,1,8,3,1,5,...,66.506,108.856,4.0,450.897,6,0.750000,0.022200,0.017671,3696.085,3
3,31.0,Architect,10455.875,729.323,6,5,12,3,11,19,...,25.748,24.491,1.0,302.693,14,0.250000,0.113867,0.035256,703.575,33
4,41.0,Accountant,21384.940,1646.078,4,4,15,2,24,15,...,22.096,168.245,1.0,254.267,10,0.222222,0.419853,0.013415,1623.982,48
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
480,41.0,Musician,15497.020,1193.418,8,4,19,2,19,14,...,24.167,107.719,1.0,267.456,14,0.153846,0.988398,0.020233,1169.251,38
481,34.0,Developer,120955.920,10021.660,2,7,4,1,28,8,...,57.505,241.974,5.0,942.686,10,0.100000,0.036852,0.005737,9964.155,28
482,26.0,Journalist,14441.510,1453.459,9,8,26,2,15,17,...,21.032,48.390,4.0,325.924,19,0.111111,0.918768,0.014460,1432.427,30
483,43.0,Teacher,142475.320,12013.943,8,5,11,2,23,18,...,124.400,81.779,5.0,1235.216,15,0.142857,0.052316,0.010354,11889.543,46


In [38]:
# Pull the preprocessing object out of the loaded artefact and run the
# inference features through it.
preprocessing_transformers = model_artefact['preprocessing_transformers']
transformer_processor = preprocessing_transformers['data_processor']
X_inference = transformer_processor.transform(merged_df_clean)

print('X_inference', X_inference.shape[0])
X_inference

X_inference 485


Unnamed: 0,Age,Occupation,Annual_Income,Monthly_Inhand_Salary,Num_Bank_Accounts,Num_Credit_Card,Interest_Rate,Num_of_Loan,Delay_from_due_date,Num_of_Delayed_Payment,...,Amount_invested_monthly,Payment_Behaviour,Monthly_Balance,Num_Fin_Pdts,Loans_per_Credit_Item,Debt_to_Salary,EMI_to_Salary,Repayment_Ability,Loan_Extent,Debt_to_Income
0,-0.255766,-0.288488,0.884735,1.082652,-0.156713,-1.238814,-1.522806,-1.052975,-1.107708,-1.230472,...,0.516399,0.0,1.140587,-1.027195,-0.955549,-0.805891,-0.220051,0.722304,-0.983181,-0.648415
1,0.779029,-0.506495,-0.115080,-0.087631,-0.920651,-0.316165,-1.193203,-1.052975,-1.107708,-0.368863,...,0.410109,0.0,-0.344819,-1.027195,-0.955549,-0.319225,-0.215157,0.120652,-0.983181,-0.369015
2,-1.102417,0.365534,0.080331,0.257269,-1.302620,-2.161463,-0.753732,-0.296503,-2.674784,-1.230472,...,-0.284456,0.0,0.464234,-1.551316,1.785422,-0.950337,-0.192878,0.290844,-1.307846,-0.718792
3,-0.255766,-1.160517,-1.409892,-1.803855,0.225255,-0.316165,-0.314261,-0.296503,-0.433480,0.779948,...,-1.460403,0.0,-0.298140,-0.153660,-0.244847,-0.755339,-0.155192,-0.777883,-0.066274,-0.639482
4,0.684956,-1.378524,-0.643478,-0.781526,-0.538682,-0.777489,0.015342,-0.674739,0.484639,0.205542,...,0.063437,0.0,-0.631350,-0.852488,-0.380448,-0.204244,-0.202097,-0.239132,0.145750,-0.320116
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
480,0.684956,1.019556,-0.988426,-1.185474,0.989193,-0.777489,0.454812,-0.674739,0.205510,0.061941,...,-0.292830,0.0,-0.534716,-0.153660,-0.727824,0.560422,-0.187346,-0.450775,0.013324,0.235583
481,0.026450,-0.942510,1.212588,1.488526,-1.302620,0.606484,-1.193203,-1.052975,0.670297,-0.799668,...,0.354523,0.0,1.876700,-0.852488,-1.016193,-0.918020,-0.218826,0.930100,-0.158556,-0.704587
482,-0.726128,-0.070480,-1.063985,-0.937865,1.371162,1.067809,1.223886,-0.674739,-0.073619,0.492745,...,-0.927972,0.0,-0.156747,0.719876,-0.955549,0.479486,-0.199829,-0.319997,-0.119865,0.437073
483,0.873101,1.455571,1.387987,1.716433,0.989193,-0.316165,-0.424129,-0.674739,0.433575,0.636347,...,-0.512261,0.0,2.394654,0.021047,-0.785566,-0.884408,-0.208752,1.043986,0.121573,-0.686510


In [39]:
# Estimator stored in the loaded artefact.
model = model_artefact["model"]

# Keep column 1 of predict_proba as the prediction value for each row.
y_inference = model.predict_proba(X_inference)[:, 1]

# Output table: customer key and snapshot date, plus which model produced the
# prediction and the prediction itself. assign() returns a new DataFrame, so
# merged_pd is left untouched.
y_inference_pdf = merged_pd[["Customer_ID","snapshot_date"]].assign(
    model_name=config["model_name"],
    model_predictions=y_inference,
)
y_inference_pdf

Unnamed: 0,Customer_ID,snapshot_date,model_name,model_predictions
0,CUS_0x102d,2024-01-01,credit_model_2024_09_01.pkl,0.053647
1,CUS_0x1051,2024-01-01,credit_model_2024_09_01.pkl,0.100153
2,CUS_0x1269,2024-01-01,credit_model_2024_09_01.pkl,0.030457
3,CUS_0x1290,2024-01-01,credit_model_2024_09_01.pkl,0.051263
4,CUS_0x12d1,2024-01-01,credit_model_2024_09_01.pkl,0.104633
...,...,...,...,...
480,CUS_0xc68d,2024-01-01,credit_model_2024_09_01.pkl,0.084153
481,CUS_0xc6d8,2024-01-01,credit_model_2024_09_01.pkl,0.300250
482,CUS_0xe8d,2024-01-01,credit_model_2024_09_01.pkl,0.222321
483,CUS_0xf9e,2024-01-01,credit_model_2024_09_01.pkl,0.123477


In [40]:
# Create the gold directory that holds this model's predictions.
# splitext removes the artefact's extension regardless of its length
# (the previous [:-4] slice only worked for 3-letter extensions).
model_stem = os.path.splitext(config["model_name"])[0]
gold_directory = f"datamart/gold/model_predictions/{model_stem}/"
print(gold_directory)

# exist_ok=True makes re-running the cell safe without a separate existence check.
os.makedirs(gold_directory, exist_ok=True)

# save gold table - IRL connect to database to write
# The snapshot date is read from config rather than the notebook-level
# snapshot_date_str: that variable is rebound further down the notebook, which
# would mislabel the output file if this cell were re-run afterwards.
snapshot_suffix = config["snapshot_date_str"].replace('-', '_')
partition_name = f"{model_stem}_preds_{snapshot_suffix}.parquet"
filepath = gold_directory + partition_name
spark.createDataFrame(y_inference_pdf).write.mode("overwrite").parquet(filepath)
print('saved to:', filepath)

datamart/gold/model_predictions/credit_model_2024_09_01/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_01/credit_model_2024_09_01_preds_2024_01_01.parquet


Backfill: run inference for every monthly snapshot in the date range

In [41]:
# set up config
# NOTE(review): this rebinds the notebook-level snapshot_date_str that was set
# in an earlier cell. The backfill loop further down iterates dates_str_lst and
# does not read this variable -- confirm whether the assignment is still needed.
snapshot_date_str = "2023-01-01"

# Date range for the backfill (YYYY-MM-DD); it is expanded into a list of
# first-of-month dates by generate_first_of_month_dates.
start_date_str = "2023-01-01"
end_date_str = "2024-12-01"

In [42]:
# generate list of dates to process
def generate_first_of_month_dates(start_date_str, end_date_str):
    """Return the first-of-month dates covered by a date range.

    Args:
        start_date_str: Range start, formatted ``YYYY-MM-DD``.
        end_date_str: Range end, formatted ``YYYY-MM-DD``.

    Returns:
        List of ``YYYY-MM-DD`` strings, one per month, beginning with the first
        day of the start date's month and continuing while that first-of-month
        date is not after the end date. Empty when the range covers no month.

    Raises:
        ValueError: If either argument does not match ``YYYY-MM-DD``.
    """
    date_format = "%Y-%m-%d"
    range_start = datetime.strptime(start_date_str, date_format)
    range_end = datetime.strptime(end_date_str, date_format)

    month_starts = []
    # Snap the cursor back to day 1 of the starting month.
    cursor = range_start.replace(day=1)
    while cursor <= range_end:
        month_starts.append(cursor.strftime(date_format))
        # divmod(month, 12) is (1, 0) for December and (0, month) otherwise,
        # i.e. the year carry and the zero-based index of the next month.
        year_carry, next_month_index = divmod(cursor.month, 12)
        cursor = datetime(cursor.year + year_carry, next_month_index + 1, 1)

    return month_starts

# Expand the configured backfill range into one snapshot date string per month.
dates_str_lst = generate_first_of_month_dates(
    start_date_str=start_date_str,
    end_date_str=end_date_str,
)


In [43]:
import utils.model_inference as model_inference
from utils.model_train_processor import DataProcessor
from utils.model_train_processor import SafeOrdinalEncoder

In [None]:
# Run the packaged inference job once per monthly snapshot, always with the
# same model_name.
# NOTE(review): the recorded output prints "Stopping Spark session..." at the
# end of every job, so main() appears to stop Spark each iteration; any Spark
# work after this loop presumably needs a new session -- confirm.
for snapshot_date in dates_str_lst:
    print('snapshot_date: ', snapshot_date)
    model_inference.main(snapshot_date, model_name)

snapshot_date:  2023-01-01

---starting job---

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2023, 1, 1, 0, 0),
 'snapshot_date_str': '2023-01-01'}
Model loaded successfully! model_bank/credit_model_2024_09_01.pkl


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


attributes row_count: 530
financials row_count: 530
merged_df row_count: 530
X_inference rows:  530
datamart/gold/model_predictions/credit_model_2024_09_01/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_01/credit_model_2024_09_01_preds_2023_01_01.parquet
Stopping Spark session...

---completed job---


snapshot_date:  2023-02-01

---starting job---

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2023, 2, 1, 0, 0),
 'snapshot_date_str': '2023-02-01'}


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Model loaded successfully! model_bank/credit_model_2024_09_01.pkl
attributes row_count: 501
financials row_count: 501
merged_df row_count: 501
X_inference rows:  501
datamart/gold/model_predictions/credit_model_2024_09_01/


                                                                                

saved to: datamart/gold/model_predictions/credit_model_2024_09_01/credit_model_2024_09_01_preds_2023_02_01.parquet
Stopping Spark session...

---completed job---


snapshot_date:  2023-03-01

---starting job---

{'model_artefact_filepath': 'model_bank/credit_model_2024_09_01.pkl',
 'model_bank_directory': 'model_bank/',
 'model_name': 'credit_model_2024_09_01.pkl',
 'snapshot_date': datetime.datetime(2023, 3, 1, 0, 0),
 'snapshot_date_str': '2023-03-01'}


https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


Model loaded successfully! model_bank/credit_model_2024_09_01.pkl
attributes row_count: 506
financials row_count: 506
merged_df row_count: 506
X_inference rows:  506
datamart/gold/model_predictions/credit_model_2024_09_01/


                                                                                

In [3]:
# Get a local Spark session (master "local[*]") for reading the saved predictions.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Only ERROR-level Spark logs, so warnings do not clutter the notebook output.
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# Read back every prediction partition written for this model.
folder_path = "datamart/gold/model_predictions/credit_model_2024_09_01/"

# Match only "*.parquet" entries so stray files in the directory are not handed
# to the Parquet reader; sort for a deterministic file order.
files_list = [
    folder_path + os.path.basename(f)
    for f in sorted(glob.glob(os.path.join(folder_path, '*.parquet')))
]
if not files_list:
    # Fail with a clear message instead of an opaque Spark error on an empty path list.
    raise FileNotFoundError(f"no prediction parquet outputs found under {folder_path}")

# The former .option("header", "true") was dropped: it is a CSV reader option
# and has no effect on a Parquet read.
df = spark.read.parquet(*files_list)
print("row_count:",df.count())
df.show()

row_count: 11974
+-----------+-------------+--------------------+--------------------+
|Customer_ID|snapshot_date|          model_name|   model_predictions|
+-----------+-------------+--------------------+--------------------+
| CUS_0xbb30|   2024-07-01|credit_model_2024...| 0.10487028956413269|
| CUS_0xbb70|   2024-07-01|credit_model_2024...|0.050871994346380234|
| CUS_0xbb79|   2024-07-01|credit_model_2024...| 0.07232044637203217|
| CUS_0xbbfc|   2024-07-01|credit_model_2024...| 0.08364594727754593|
| CUS_0xbc2b|   2024-07-01|credit_model_2024...|  0.1306462585926056|
| CUS_0xbc3a|   2024-07-01|credit_model_2024...| 0.35166487097740173|
| CUS_0xbc50|   2024-07-01|credit_model_2024...|0.029445325955748558|
| CUS_0xbc5f|   2024-07-01|credit_model_2024...| 0.13459664583206177|
| CUS_0xbc74|   2024-07-01|credit_model_2024...|  0.3551354706287384|
| CUS_0xbc99|   2024-07-01|credit_model_2024...| 0.18393340706825256|
| CUS_0xbcde|   2024-07-01|credit_model_2024...| 0.03703097999095917|
|  