In [1]:
import mlflow
from mlflow.models import infer_signature
from config import run_name
import polars as pl
import numpy as np
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import roc_auc_score 
import warnings
import os
import re

warnings.filterwarnings('ignore')

dataPath = "/Users/chrisjackson/Downloads/home-credit-credit-risk-model-stability/"

## Define an MLflow Experiment

In order to group any distinct runs of a particular project or idea together, we can define an Experiment that will group each iteration (runs) together. 
Defining a unique name that is relevant to what we're working on helps with organization and reduces the amount of work (searching) to find our runs later on. 

In [2]:
# NOTE: review the links mentioned above for guidance on connecting to a managed tracking server, such as the free Databricks Community Edition

mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")

### NOTE:RUN THIS IN BASH TO START MLFLOW TRACKING SERVER ###
# mlflow server --host 127.0.0.1 --port 8080


In [3]:
def get_or_create_experiment(experiment_name):
    """
    Retrieve the ID of an existing MLflow experiment or create a new one if it doesn't exist.

    This function checks if an experiment with the given name exists within MLflow.
    If it does, the function returns its ID. If not, it creates a new experiment
    with the provided name and returns its ID.

    Parameters:
    - experiment_name (str): Name of the MLflow experiment.

    Returns:
    - str: ID of the existing or newly created MLflow experiment.
    """

    if experiment := mlflow.get_experiment_by_name(experiment_name):
        return experiment.experiment_id
    else:
        return mlflow.create_experiment(experiment_name)


In [4]:
experiment_id = get_or_create_experiment("MLflow Home Credit Test")

## Data Cleaning and Aggregation

In [5]:
class Pipeline:
    @staticmethod
    def set_table_dtypes(df): #Standardize the dtype.
        for col in df.columns:
            if col in ["case_id", "WEEK_NUM", "num_group1", "num_group2"]:
                df = df.with_columns(pl.col(col).cast(pl.Int64))
            elif col in ["date_decision"]:
                df = df.with_columns(pl.col(col).cast(pl.Date))
            elif col[-1] in ("P", "A"):
                df = df.with_columns(pl.col(col).cast(pl.Float64))
            elif col[-1] in ("M",):
                df = df.with_columns(pl.col(col).cast(pl.String))
            elif col[-1] in ("D",):
                df = df.with_columns(pl.col(col).cast(pl.Date))            

        return df
    
    @staticmethod
    def convert_strings(df: pd.DataFrame) -> pd.DataFrame:
        for col in df.columns:  
            if df[col].dtype.name in ['object', 'string']:
                df[col] = df[col].astype("string").astype('category')
                current_categories = df[col].cat.categories
                new_categories = current_categories.to_list() + ["Unknown"]
                new_dtype = pd.CategoricalDtype(categories=new_categories, ordered=True)
                df[col] = df[col].astype(new_dtype)
        return df
    
    @staticmethod
    def handle_dates(df): #Change the feature for D to the difference in days from date_decision.
        for col in df.columns:
            if col[-1] in ("D",):
                df = df.with_columns(pl.col(col) - pl.col("date_decision"))
                df = df.with_columns(pl.col(col).dt.total_days())
                
        df = df.drop("date_decision", "MONTH")

        return df
    
    @staticmethod
    def filter_cols(df): #Remove those with an average is_null exceeding 0.95 and those that do not fall within the range 1 < nunique < 200.
        for col in df.columns:
            if col not in ["target", "case_id", "WEEK_NUM"]:
                isnull = df[col].is_null().mean()

                if isnull > 0.95:
                    df = df.drop(col)

        for col in df.columns:
            if (col not in ["target", "case_id", "WEEK_NUM"]) & (df[col].dtype == pl.String):
                freq = df[col].n_unique()

                if (freq == 1) | (freq > 200):
                    df = df.drop(col)

        return df

In [6]:
feature_def = pl.read_csv(dataPath + "feature_definitions.csv")

basetable = pl.read_parquet(dataPath + "parquet_files/train/train_base.parquet")


In [7]:
def data_agg(depth=0, test_train='train') -> pl.DataFrame:
    # List all files in the folder
    all_files = os.listdir(dataPath + f"parquet_files/{test_train}/")

    # Filter files where the first number in the filename is depth
    filtered_files = []
    for file in all_files:
        digit = re.search(r'\d+', file)
        if digit and digit.group() == str(depth):
            filtered_files.append(file)
    
    
    # Read the base table
    aggregated_df = pl.read_parquet(dataPath + f"parquet_files/{test_train}/train_base.parquet")
    # Add the static tables to the aggregated_df, meaning the first number in the file name is 0 and they should be listed in filetred_files
    for file in filtered_files:
        df = pl.read_parquet(dataPath + f"parquet_files/{test_train}/{file}")
        print(file)

        # Join the aggregated results to the overall aggregated dataframe using the case_id column as the index
        aggregated_df = aggregated_df.join(df, on="case_id", how="left")
    
    # The columns with right in the name might contain info not in the non-right columns.  We need to combine them into one column
    # First we need to find the columns with right in the name
    right_columns = [col for col in aggregated_df.columns if "right" in col]
    # Then we need to find the columns without right in the name
    non_right_columns = [col for col in aggregated_df.columns if "right" not in col]
    # We need to iterate over the right columns and add them to the non-right columns and pick the non-null value
    for col in right_columns:
        # Print out how many null values there are in the right column
        print(f"Number of null values in {col}: {aggregated_df[col].is_null().sum()}")
        # Find the non-right column that corresponds to the right column
        non_right_col = col.replace("_right", "")
        # Print out how many null values there are in the non-right column
        print(f"Number of null values in {non_right_col}: {aggregated_df[non_right_col].is_null().sum()}")
        # Add the non-right column to the aggregated_df if the non-right column is null
        aggregated_df = aggregated_df.with_columns(
        pl.when(pl.col(non_right_col).is_null())
        .then(pl.col(col))
        .otherwise(pl.col(non_right_col))
        .alias(non_right_col)
        )
        # Print out how many null values there are in the non-right column after the change
        print(f"Number of null values in the combined {non_right_col}: {aggregated_df[non_right_col].is_null().sum()}")
    # We need to drop the right columns
    aggregated_df = aggregated_df.drop(right_columns)
    
    # Process the data with the Pipeline class
    aggregated_df = Pipeline.set_table_dtypes(aggregated_df)
    aggregated_df = Pipeline.handle_dates(aggregated_df)
    aggregated_df = Pipeline.filter_cols(aggregated_df)
    
    # If the depth is 1, then we are interested in the base tables and the tables with a depth of 1
    # if depth == 1:
    #     # As a test lets try to count the number of applications for each case_id
    #     # Gather the files with a depth of 1
    #     depth_1_files = [file for file in filtered_files if file[0] == "1"]
    #     return aggregated_df

    
    # # Remove basetable from the list of parquet files
    # parquet_files = [file for file in parquet_files if file != "train_base.parquet"]
    
    # # Iterate over each parquet file
    # for file in parquet_files:
    #     # Read the parquet file
    #     df = pl.read_parquet(os.path.join(dataPath, file))
        
    #     # Check if the file contains the column num_group1 and is a file with a depth of 1
    #     if "num_group1" in df.columns and "num_group2" not in df.columns:

    #         # Remove the num_group1 column
    #         df = df.drop("num_group1")
            # Join the aggregated results to the overall aggregated dataframe using the case_id column as the index
    #         aggregated_df = aggregated_df.join(df, on="case_id", how="outer")
        
    #     # Check if the file contains the column num_group1 and num_group2 and is a file with a depth of 2
    #     elif "num_group1" in df.columns and "num_group2" in df.columns and num_group2 is not None:
            # # Check if the aggregated df has the num_group1 and num_group2 columns
            # if "num_group2" not in aggregated_df.columns:
            #     # Add the num_group2 column to the aggregated_df
            #     aggregated_df = aggregated_df.join(df, on="case_id", how="outer")
            # # Filter for the desired num_group1 and num_group2 values
            # df = df.filter(pl.col("num_group1") == num_goup1).filter(pl.col("num_group2") == num_group2)
            # # Remove the num_group1 and num_group2 columns
            # df = df.drop("num_group1").drop("num_group2")
            # # Join the aggregated results to the overall aggregated dataframe using the case_id column as the index
            # aggregated_df = aggregated_df.join(df, on="case_id", how="outer")
        
    return aggregated_df
    

In [8]:
data = data_agg(0, 'train')


train_static_0_0.parquet
train_static_0_1.parquet
train_static_cb_0.parquet
Number of null values in actualdpdtolerance_344P_right: 1125249
Number of null values in actualdpdtolerance_344P: 819588
Number of null values in the combined actualdpdtolerance_344P: 418178
Number of null values in amtinstpaidbefduel24m_4187115A_right: 1136027
Number of null values in amtinstpaidbefduel24m_4187115A: 951756
Number of null values in the combined amtinstpaidbefduel24m_4187115A: 561124
Number of null values in annuity_780A_right: 1003757
Number of null values in annuity_780A: 522902
Number of null values in the combined annuity_780A: 0
Number of null values in annuitynextmonth_57A_right: 1003759
Number of null values in annuitynextmonth_57A: 522904
Number of null values in the combined annuitynextmonth_57A: 4
Number of null values in applicationcnt_361L_right: 1003757
Number of null values in applicationcnt_361L: 522902
Number of null values in the combined applicationcnt_361L: 0
Number of null va

In [9]:
# Send the depth 0 dataframe to a parquet file
data.write_parquet(dataPath + "depth0_df.parquet")

In [10]:
# Find the columns ending in P as they are the days past due columns
dpd_columns = [col for col in data.columns if col.endswith("P")]
dpd_columns = pl.DataFrame({'feature_name':dpd_columns})
# Add the column name feature_name to the feature_def dataframe
feature_def.columns = ["feature_name", "description"]
# Join the feature definitions to the dpd columns
dpd_columns_names = dpd_columns.join(feature_def, on="feature_name", how="left")
dpd_columns_names.head()


feature_name,description
str,str
"""actualdpdtoler…","""DPD of client …"
"""avgdbddpdlast2…","""Average days p…"
"""avgdbddpdlast3…","""Average days p…"
"""avgdbdtollast2…","""Average days o…"
"""avgdpdtolclosu…","""Average DPD (d…"


In [11]:
# Lets try making a dataframe of the payment history only and see if we can predict the target, might be a path to ensembling
# Create a dataframe of the payment history
payment_history = data.select(dpd_columns_names['feature_name'].to_list() + ["target"] + ["case_id"] + ["WEEK_NUM"])

In [12]:
# Make a test run with data as the payment history
data = payment_history

### Modelling

In [13]:
case_ids = data["case_id"].unique().shuffle(seed=1)
case_ids_train, case_ids_valid = train_test_split(case_ids, train_size=0.8, random_state=1)


cols_pred = []
for col in data.columns:
    if col[-1].isupper() and col[:-1].islower():
        cols_pred.append(col)

print(cols_pred)

def from_polars_to_pandas(case_ids: pl.DataFrame) -> pl.DataFrame:
    return (
        data.filter(pl.col("case_id").is_in(case_ids))[["case_id", "WEEK_NUM", "target"]].to_pandas(),
        data.filter(pl.col("case_id").is_in(case_ids))[cols_pred].to_pandas(),
        data.filter(pl.col("case_id").is_in(case_ids))["target"].to_pandas().ravel()
    )

base_train, X_train, y_train = from_polars_to_pandas(case_ids_train)
base_valid, X_valid, y_valid = from_polars_to_pandas(case_ids_valid)


# for df in [X_train, X_valid, X_test]:
#     df = Pipeline.convert_strings(df)

['actualdpdtolerance_344P', 'avgdbddpdlast24m_3658932P', 'avgdbddpdlast3m_4187120P', 'avgdbdtollast24m_4525197P', 'avgdpdtolclosure24_3658938P', 'avgmaxdpdlast9m_3716943P', 'maxdbddpdlast1m_3658939P', 'maxdbddpdtollast12m_3658940P', 'maxdbddpdtollast6m_4187119P', 'maxdpdfrom6mto36m_3546853P', 'maxdpdinstlnum_3546846P', 'maxdpdlast12m_727P', 'maxdpdlast24m_143P', 'maxdpdlast3m_392P', 'maxdpdlast6m_474P', 'maxdpdlast9m_1059P', 'maxdpdtolerance_374P', 'mindbddpdlast24m_3658935P', 'mindbdtollast24m_4525191P', 'posfpd10lastmonth_333P', 'posfpd30lastmonth_3976960P', 'posfstqpd30lastmonth_3976962P']


In [14]:
print(f"Train: {X_train.shape}")
print(f"Valid: {X_valid.shape}")
print(f"Test: {X_test.shape}")

Train: (915995, 22)
Valid: (305332, 22)
Test: (305332, 22)


In [15]:
import optuna

# override Optuna's default logging to ERROR only
optuna.logging.set_verbosity(optuna.logging.ERROR)

# define a logging callback that will report on only new challenger parameter configurations if a
# trial has usurped the state of 'best conditions'


def champion_callback(study, frozen_trial):
    """
    Logging callback that will report when a new trial iteration improves upon existing
    best trial values.
    """

    winner = study.user_attrs.get("winner", None)

    if study.best_value and winner != study.best_value:
        study.set_user_attr("winner", study.best_value)
        if winner:
            improvement_percent = (abs(winner - study.best_value) / study.best_value) * 100
            print(
                f"Trial {frozen_trial.number} achieved value: {frozen_trial.value} with "
                f"{improvement_percent: .4f}% improvement"
            )
        else:
            print(f"Initial trial {frozen_trial.number} achieved value: {frozen_trial.value}")


In [16]:

# Create a dataset for lightgbm
lgb_train = lgb.Dataset(X_train, label=y_train)
lgb_valid = lgb.Dataset(X_valid, label=y_valid, reference=lgb_train)

# Define the objective function for Optuna
def objective(trial):
    with mlflow.start_run(nested=True):
        param = {
            "boosting_type": trial.suggest_categorical("boosting_type", ["gbdt", "rf"]),
            "objective": "binary",
            "metric": "auc",
            "max_depth": trial.suggest_int("max_depth", 1, 10),
            "num_leaves": trial.suggest_int("num_leaves", 20, 60),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2),
            "feature_fraction": trial.suggest_float("feature_fraction", 0.6, 1.0),
            "bagging_fraction": trial.suggest_float("bagging_fraction", 0.6, 1.0),
            "bagging_freq": trial.suggest_int("bagging_freq", 1, 10),
            "min_child_samples": trial.suggest_int("min_child_samples", 5, 100),
            "lambda_l1": trial.suggest_loguniform("lambda_l1", 1e-8, 10.0),
            "lambda_l2": trial.suggest_loguniform("lambda_l2", 1e-8, 10.0),
            "n_estimators": 1000,
            "verbose": -1,
            "feature_pre_filter": False,  # Explicitly disabling feature pre-filtering
        }
        
        gbm = lgb.train(
            param,
            lgb_train,
            valid_sets=lgb_valid,
            callbacks=[lgb.log_evaluation(50), lgb.early_stopping(10)]
        )
        
        preds = gbm.predict(X_valid)
        auc = roc_auc_score(y_valid, preds)
        
        # Log to MLflow
        mlflow.log_params(param)
        mlflow.log_metric("auc", auc)
    return auc


In [17]:
def gini_stability(base, w_fallingrate=88.0, w_resstd=-0.5):
    gini_in_time = base.loc[:, ["WEEK_NUM", "target", "score"]]\
        .sort_values("WEEK_NUM")\
        .groupby("WEEK_NUM")[["target", "score"]]\
        .apply(lambda x: 2*roc_auc_score(x["target"], x["score"])-1).tolist()
    
    x = np.arange(len(gini_in_time))
    y = gini_in_time
    a, b = np.polyfit(x, y, 1)
    y_hat = a*x + b
    residuals = y - y_hat
    res_std = np.std(residuals)
    avg_gini = np.mean(gini_in_time)
    return avg_gini + w_fallingrate * min(0, a) + w_resstd * res_std

## MLFLOW TRAIN AND LOG

In [21]:

# Initiate the parent run and call the hyperparameter tuning child run logic
with mlflow.start_run(experiment_id=experiment_id, run_name=run_name, nested=True):
    # Initialize the Optuna study
    study = optuna.create_study(direction="maximize")
    study.optimize(objective, n_trials=10, callbacks=[champion_callback])  
        
    # Log the best hyperparameters to MLflow
    mlflow.log_params(study.best_params)
    mlflow.log_metric("AUC", study.best_value)
        
    # Log tags
    mlflow.set_tags(
        tags={
            "project": "Kaggle Home Credit Default Risk",
            "optimizer_engine": "optuna",
            "model_family": "lgbm",
            "feature_set_version": 2,
        }
    )

    # Fit model instance
    model = lgb.train(study.best_params, lgb_train)

    # Add the preditions to the base table
    base_train["score"] = model.predict(X_train)
    
    # Calc the gini stability score
    stability_score_train = gini_stability(base_train)
    
    # Log the gini stability score to MLflow
    mlflow.log_metric("gini_stability", stability_score_train)
    
    # Log the model to MLflow    
    artifact_path = "model"

    mlflow.lightgbm.log_model(model, artifact_path)

    # Get the logged model uri so that we can load it from the artifact store
    model_uri = mlflow.get_artifact_uri(artifact_path)



Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[14]	valid_0's auc: 0.64163
Initial trial 0 achieved value: 0.6416304757297406
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[14]	valid_0's auc: 0.651476
Trial 1 achieved value: 0.6514760155598909 with  1.5113% improvement
Training until validation scores don't improve for 10 rounds
Early stopping, best iteration is:
[25]	valid_0's auc: 0.660212
Trial 2 achieved value: 0.6602118632695736 with  1.3232% improvement
Training until validation scores don't improve for 10 rounds
[50]	valid_0's auc: 0.693843
Early stopping, best iteration is:
[59]	valid_0's auc: 0.694219
Trial 3 achieved value: 0.6942190563856229 with  4.8986% improvement
Training until validation scores don't improve for 10 rounds
[50]	valid_0's auc: 0.692583
Early stopping, best iteration is:
[47]	valid_0's auc: 0.692672
Training until validation scores don't improve for 10 rounds

In [20]:
def gini_stability(base, w_fallingrate=88.0, w_resstd=-0.5):
    gini_in_time = base.loc[:, ["WEEK_NUM", "target", "score"]]\
        .sort_values("WEEK_NUM")\
        .groupby("WEEK_NUM")[["target", "score"]]\
        .apply(lambda x: 2*roc_auc_score(x["target"], x["score"])-1).tolist()
    
    x = np.arange(len(gini_in_time))
    y = gini_in_time
    a, b = np.polyfit(x, y, 1)
    y_hat = a*x + b
    residuals = y - y_hat
    res_std = np.std(residuals)
    avg_gini = np.mean(gini_in_time)
    return avg_gini + w_fallingrate * min(0, a) + w_resstd * res_std

stability_score_train = gini_stability(base_train)
stability_score_valid = gini_stability(base_valid)


print(f'The stability score on the train set is: {stability_score_train}') 
print(f'The stability score on the valid set is: {stability_score_valid}') 


KeyError: "['score'] not in index"

## Submission


In [None]:
# Load the sample submission file
sub_df = pd.read_csv(dataPath + "sample_submission.csv")
# Generate the predictions from the test files

sub_df = sub_df.set_index("case_id")
sub_df['score'] = 