# Monitoring ML Training Pipeline: Preprocessing
- Load extracted raw data
- Enforce datatypes
- Engineer new features
- Split train-test
- Train transformation models for
    - Imputing missing values
    - Converting categorical to numerical values
    - Rescaling
- Apply the transformation models on both training and test datasets

---

In [17]:
import datetime
import sys
import os
import json
import re
import pickle
import traceback
import pandas as pd
import numpy as np

from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from importlib import reload

sys.path.append(os.path.join(os.path.dirname(os.getcwd()), 'dags', 'src'))

import helpers
import config

reload(helpers)
reload(config)

engineered_vars = {
    "categorical": ["application_year", "application_month", "application_week", "application_day", "application_season"],
    "numerical": ["current_credit_balance_ratio"],
    "date": ["application_date"]
}

## Methods


In [18]:
#### helpers.py methods ####

def save_dataset(df: pd.DataFrame, path: str):
    """
    Save data set.
    :param df: DataFrame - The DataFrame to be saved.
    :param path: str - The file path to save the data.
    :return: None
    """
    df.to_csv(path, index=False)  # Save the DataFrame to a CSV file without including the index.
    print(f"[INFO] Dataset saved to {path}")  # Print a message confirming the dataset has been saved.

def load_dataset(path: str) -> pd.DataFrame:
    """
    Load data set.
    :param path: str - The file path to load the data from.
    :return: DataFrame - The loaded DataFrame.
    """
    return pd.read_csv(path)  # Load a DataFrame from the specified CSV file.

def save_model_as_pickle(model, model_name: str, directory=None):
    """
    Save a model as a pickle file.
    :param model: AnyType - The model to be saved.
    :param model_name: str - The name of the model.
    :param directory: str - The directory to save the model (optional).
    :return: None
    """
    if directory:
        filename = os.path.join(directory, model_name + ".pkl")
    else:
        filename = os.path.join(config.PATH_DIR_MODELS, model_name + ".pkl")
    with open(filename, "wb") as f:
        pickle.dump(model, f)  # Serialize and save the model as a pickle file.
    print("[INFO] Model saved as pickle file:", filename)  # Print a message confirming the model has been saved.

def load_model_from_pickle(model_name: str):
    """
    Load a pickle model.
    :param model_name: str - The name of the model to load.
    :return: AnyType - The loaded model.
    """
    with open(os.path.join(config.PATH_DIR_MODELS, model_name + ".pkl"), "rb") as f:
        return pickle.load(f)  # Deserialize and load a model from a pickle file.

def save_model_as_json(model: dict, model_name: str, directory: str = None):
    """
    Save a model as a JSON file.
    :param model: dict - The model to be saved as a JSON file.
    :param model_name: str - The name of the model.
    :param directory: str - The directory to save the model (optional).
    :return: None
    """
    if directory:
        filename = os.path.join(directory, model_name + ".json")
    else:
        filename = os.path.join(config.PATH_DIR_MODELS, model_name + ".json")
    with open(filename, "w") as f:
        json.dump(model, f)  # Serialize and save the model as a JSON file.
    print("[INFO] Model saved as JSON file:", filename)  # Print a message confirming the model has been saved.

def load_model_from_json(model_name: str) -> dict:
    """
    Load a JSON model.
    :param model_name: str - The name of the model to load.
    :return: dict - The loaded model as a dictionary.
    """
    with open(os.path.join(config.PATH_DIR_MODELS, model_name + ".json"), "r") as f:
        return json.load(f)  # Deserialize and load a model from a JSON file as a dictionary.


In [19]:
###### preprocess.py methods #####

###### missing values ######

def get_variables_with_missing_values(df: pd.DataFrame) -> pd.DataFrame:
    """
    Get variables with missing values.
    :param df: DataFrame
    :return: DataFrame
    """
    missing_counts = df.isnull().sum()
    return missing_counts[missing_counts > 0].index.tolist()

def impute_missing_values(df: pd.DataFrame, method: str = "basic", mode: str = None,
                          cat_vars: list = config.CAT_VARS, num_vars: list = config.NUM_VARS, job_id: str = "") -> pd.DataFrame:
    """
    Treat missing values.
    
    :param df: DataFrame - The DataFrame with missing values.
    :param method: str, "basic" or "advanced" - The imputation method to use.
    :param mode: str, "training" or "inference" - The mode of operation (training or inference).
    :return: DataFrame - The DataFrame with missing values imputed.
    """
    assert mode in ("training", "inference"), f"mode must be either 'training' or 'inference, but got {mode}"
    assert method in ["basic", "advanced"], f"{method} is not a valid method (basic, advanced)"
    
    if mode == "training":
        model = {
            "method": method,
            "imputes": dict()
        }
        for col in df.columns:
            print("[INFO] Treating missing values in column:", col)
            model["imputes"][col] = dict()
            if method == "basic":
                if col in set(cat_vars + engineered_vars["categorical"]):
                    model["imputes"][col]['mode'] = df[df[col].notnull()][col].mode()[0]
                elif col in set(num_vars + engineered_vars["numerical"]):
                    model["imputes"][col]['mean'] = df[df[col].notnull()][col].mean()
                elif col in set(config.DATETIME_VARS + engineered_vars["date"]):
                    model["imputes"][col]['mode'] = df[df[col].notnull()][col].mode()[0]
                elif col in ["loan_id", "customer_id", "loan_status"] + config.EXC_VARIABLES:
                    pass
                else:
                    raise ValueError(f"[ERROR]{col} is not a valid variable")
        helpers.save_model_as_pickle(model, f"{job_id}_missing_values_model")
        return impute_missing_values(df, method=method, mode="inference", cat_vars=cat_vars, num_vars=num_vars, job_id=job_id)
    else:
        model = helpers.load_model_from_pickle(model_name=f"{job_id}_missing_values_model")
        cols = get_variables_with_missing_values(df)
        method = model["method"]
        if method == "basic":
            for col in cols:
                if col in set(cat_vars + engineered_vars["categorical"]):
                    df[col].fillna(model["imputes"][col]['mode'], inplace=True)
                elif col in set(num_vars + engineered_vars["numerical"]):
                    df[col].fillna(model["imputes"][col]['mean'], inplace=True)
                elif col in set(config.DATETIME_VARS + engineered_vars["date"]):
                    df[col].fillna(model["imputes"][col]['mode'], inplace=True)
                elif col in ["loan_id", "customer_id", "loan_status"] + config.EXC_VARIABLES:
                    pass
                else:
                    raise ValueError(f"[ERROR]{col} is not a valid variable. Pre-trained variables: {list(model['imputes'].keys())}")
        if method == "advanced":
            raise NotImplementedError
    return df


In [20]:
###### enforcing datatypes ######

def enforce_datatypes_on_variables(df: pd.DataFrame, cat_vars: list = [], num_vars: list = []) -> pd.DataFrame:
    """
    Transform variables.
    :param df: DataFrame - The DataFrame to transform.
    :param cat_vars: list - List of categorical variables.
    :param num_vars: list - List of numerical variables.
    :return: DataFrame - The transformed DataFrame.
    """
    
    df["application_time"] = pd.to_datetime(df["application_time"])  # Convert the "application_time" column to datetime.
    for var in num_vars:
        df[var] = df[var].apply(lambda x: enforce_numeric_to_float(x))  # Apply the helper function to enforce numeric to float for numerical variables.
    for var in cat_vars:
        df[var] = df[var].astype(str)  # Convert categorical variables to strings.
    return df

def enforce_numeric_to_float(x: str) -> float:
    """
    Convert numeric to float. To ensure that all stringified numbers are converted to float.
    :param x: str - The string to convert to float.
    :return: float - The converted float value.
    """
    try:
        return float(re.sub("[^0-9.]", "", str(x)))  # Convert the string to a float, removing non-numeric characters.
    except ValueError:
        return np.nan  # Return NaN if conversion is not possible.


In [21]:
###### encoding categorical variables ######

# Function to categorize years in current job
def categorize_years_in_current_job(x: str) -> int:
    """
    Categorize years in current job.
    :param x: str
    :return: int
    """
    
# Function to convert term to integer
def term_to_int(x: str) -> int:
    """
    Convert term to int.
    :param x: str, lower cased term
    :return: int
    """
    
# Function to convert home ownership to integer
def home_ownership_to_int(x: str) -> int:
    """
    Convert home ownership to int.
    :param x: str, lower cased home ownership
    :return: int
    """
    
# Function to train a model for converting purpose to int
def train_purpose_to_int_model(x: pd.Series, method: str, job_id: str = "") -> dict:
    """
    Build a model file to be used to convert string variable `purpose` into integer datatype.
    :param x: pd.Series
    :param method: str, "ranking", "weighted ranking", "relative ranking"
    :param job_id: str, job id
    :return: dict
    """
    
# Function to convert purpose to int
def purpose_to_int(x: pd.Series, mode: str, method: str = None, model: str = None, job_id: str = "") -> pd.Series:
    """
    Convert purpose to int.
    :param x: pd.Series
    :param mode: str, choose from "training", "inference"
    :param method: str, "ranking", "weighted ranking", "relative ranking"
    :param model: method, model to predict the purpose
    :param job_id: str, job id
    :return: pd.Series
    """
    
# Function to convert loan status to int
def loan_status_to_int(x: str) -> int:
    """
    Convert loan status to int.
    :param x: str, lower cased loan status
    :return: int
    """
    
# Function to encode categorical variables in a DataFrame
def encode_categorical_variables(df: pd.DataFrame, mode="training", purpose_encode_method="ranking", job_id: str = "") -> pd.DataFrame:
    """
    Encode categorical variables.
    :param df: DataFrame
    :param mode: str, "training" or "inference"
    :param purpose_encode_method: str, choose from "ranking", "weighted ranking", "relative ranking"
    :param job_id: str, job id
    :return: DataFrame
    """


In [22]:
###### engineer new variables ######

def engineer_variables(df: pd.DataFrame) -> pd.DataFrame:
    """
    Engineer variables.
    :param df: DataFrame
    :return: DataFrame
    """
    for col in ["application_time"]:
        assert col in df.columns, f"{col} not in {df.columns}"

    df["application_date"] = df["application_time"].dt.date
    df["application_year"] = df["application_time"].dt.year
    df["application_month"] = df["application_time"].dt.month
    df["application_week"] = df["application_time"].dt.week
    df["application_day"] = df["application_time"].dt.day
    df["application_season"] = df["application_month"].apply(lambda x: month_to_season(x))
    df["current_credit_balance_ratio"] = (df["current_credit_balance"] / df["current_loan_amount"]).fillna(0.0)
    return df

def month_to_season(month: int) -> int:
    """
    Convert date to season.
    :param month: int, month between 1 and 12
    :return: int
    """
    if month in [1, 2, 3]:
        return 1
    elif month in [4, 5, 6]:
        return 2
    elif month in [7, 8, 9]:
        return 3
    elif month in [10, 11, 12]:
        return 4
    else:
        return np.nan



In [23]:
#### data transformation ####
def rescale_data(df: pd.DataFrame, method: str = 'standardize', mode: str = 'training', columns: list = [], job_id: str = "") -> pd.DataFrame:
    """
    Rescale data.
    :param df: DataFrame
    :param method: str, 'standardize' or 'minmax'
    :param mode: str, 'training' or 'inference'
    :param columns: list of columns to rescale
    :param job_id: a job identifier
    :return: DataFrame
    """
    # Check if the rescaling method is valid (either 'standardize' or 'minmax')
    assert method in ('standardize', 'minmax'), f"{method} is not a valid method (standardize, minmax)"
    
    # Check if the mode is valid (either 'training' or 'inference')
    assert mode in ('training', 'inference'), f"{mode} is not a valid mode (training, inference)"
    
    # Check if specified columns exist in the DataFrame
    for col in columns:
        assert col in df.columns

    if mode == 'training':
        if method == 'standardize':
            # Create a StandardScaler object and fit it to the training data
            scaler = StandardScaler()
            scaler.fit(df[columns])
        if method == 'minmax':
            # Create a MinMaxScaler object and fit it to the training data
            scaler = MinMaxScaler()
            scaler.fit(df[columns])
        
        # Create a model dictionary with the scaler and method
        model = {
            'scaler': scaler,
            'method': method,
        }
        
        # Save the model using a custom function (not shown in the provided code)
        helpers.save_model_as_pickle(model, f"{config.PATH_DIR_MODELS}/{job_id}_numerical_scaler.pkl")
        
        # Transform the specified columns in the DataFrame and add them with new names
        df[list(map(lambda x: f"{method}_{x}", columns))] = scaler.transform(df[columns])
        return df

    if mode == 'inference':
        # Load the model using a custom function (not shown in the provided code)
        model = helpers.load_model_from_pickle(model_name=f"{job_id}_numerical_scaler.pkl")
        scaler = model['scaler']
        method = model['method']
        
        # Attempt to convert the specified columns to float (debugging purpose)
        for col in columns:
            try:
                df[col].astype(float)
            except:
                print("[DEBUG] Column skipped:", col)
        
        # Transform the specified columns in the DataFrame and add them with new names
        df[list(map(lambda x: f"{method}_{x}", columns))] = scaler.transform(df[columns])
        return df


In [24]:
###### Preprocess ######

# Function to split data into train and test based on the method provided
def split_train_test(df: pd.DataFrame, test_size: float, method: str = 'time based'):
    """
    Split data into train and test.
    :param df: DataFrame
    :param test_size: float, between 0 and 0.99
    :param method: str, 'time based' or 'random'
    :return: (DataFrame, DataFrame)
    """
    if method == 'random':
        # Randomly shuffle and split the DataFrame into train and test
        return df.sample(frac=1, random_state=config.RANDOM_STATE).iloc[:int(len(df) * test_size)], df.sample(frac=1, random_state=config.RANDOM_STATE).iloc[int(len(df) * test_size):]
    if method == 'time based':
        # Split based on time order of dates in the 'application_date' column
        unique_dates = sorted(df["application_date"].unique())
        train_dates = unique_dates[:int(len(unique_dates) * (1 - test_size))]
        test_dates = unique_dates[unique_dates.index(train_dates[-1]) + 1:]
        train_df = df[df["application_date"].isin(train_dates)]
        test_df = df[df["application_date"].isin(test_dates)]
        return train_df, test_df
    raise ValueError(f"{method} is not a valid method (time based, random)")

# Function to preprocess data
def preprocess_data(df: pd.DataFrame, mode: str, job_id: str = None, rescale: bool = False, ref_job_id: str = None) -> pd.DataFrame:
    """
    Pre-process data and save preprocessed datasets for later use.
    :param df: DataFrame
    :param mode: str, 'training' or 'inference'
    :param job_id: str, job_id for the preprocessed dataset
    :param rescale: bool, whether to rescale data.
    :param ref_job_id: str, job_id of the last deployed model. Useful when doing inference.
    :return: DataFrame
    """
    assert mode in ('training', 'inference')
    
    if mode == 'training':
        assert config.TARGET in df.columns, f"{config.TARGET} not in {df.columns}"
    
    # Convert column names to lowercase
    df.columns = list(map(str.lower, df.columns))
    initial_size = df.shape[0]
    
    # Remove rows with null values in specific columns
    df = df[df["customer_id"].notnull() & df["loan_id"].notnull() & df["loan_status"].notnull()]
    
    if mode == 'training':
        df["loan_status"] = df["loan_status"].str.lower()
    
    if df.shape[0] != initial_size:
        print(f"[WARNING] Dropped {initial_size - df.shape[0]} rows with null values in (customer_id, loan_id, loan_status)")
    
    # Enforce data types on variables (not shown in the code)
    df = enforce_datatypes_on_variables(df, cat_vars=config.CAT_VARS, num_vars=config.NUM_VARS)
    
    # Engineer variables (not shown in the code)
    df = engineer_variables(df)
    
    if mode == 'training':
        # Split train and test data before encoding categorical variables and imputing missing values
        train_df, test_df = split_train_test(df, config.TEST_SPLIT_SIZE, method=config.SPLIT_METHOD)
        train_df = encode_categorical_variables(train_df, mode="training", purpose_encode_method=config.PURPOSE_ENCODING_METHOD, job_id=job_id)
        train_df = impute_missing_values(train_df, method="basic", mode="training", job_id=job_id)
        
        if rescale:
            # Rescale data if necessary
            train_df = rescale_data(train_df, method=config.RESCALE_METHOD, mode="training", columns=num_vars + engineered_vars["numerical"])
        
        # Save the preprocessed training dataset
        helpers.save_dataset(train_df, os.path.join(config.PATH_DIR_DATA, "preprocessed", f"{job_id}_training.csv"))
        
        # Recursively preprocess the test data for inference
        preprocess_data(test_df, mode="inference", job_id=job_id, ref_job_id=job_id)
    else:
        # If the mode is inference, no need to split train and test data
        test_df = encode_categorical_variables(df, mode="inference", purpose_encode_method=config.PURPOSE_ENCODING_METHOD, job_id=ref_job_id)
        test_df = impute_missing_values(test_df, method="basic", mode="inference", job_id=ref_job_id)
        
        if rescale:
            # Rescale data if necessary
            test_df = rescale_data(test_df, method=config.RESCALE_METHOD, mode="inference", columns=num_vars + engineered_vars["numerical"])
        
        # Save the preprocessed inference dataset
        helpers.save_dataset(test_df, os.path.join(config.PATH_DIR_DATA, "preprocessed", f"{job_id}_inference.csv"))


In [34]:
# Change the filename and job ID accordingly
filename = "../dags/data/raw/12196ecaa65e4831987aee4bfced5f60_2015-01-01_2015-05-31.csv"
job_id = "12196ecaa65e4831987aee4bfced5f60"

# Load the dataset from the new filename
df = helpers.load_dataset(os.path.join(filename))

# Preprocess the data using the new job ID and any other desired settings
_ = preprocess_data(df=df, mode="training", job_id=job_id, rescale=False, ref_job_id=None)


  df["application_week"] = df["application_time"].dt.week
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df[col] = df[col].str.lower()
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["term"] = df["term"].apply(lambda x: term_to_int(x))
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["home_ownership"] = df["home_ownership"].apply(lambda x: h

[INFO] Converting purpose to int using method: weighted ranking
[INFO] No model for purpose-to-int conversion provided. Training a new model first...
[INFO] Model saved as json file: ../dags/models\aa4c3eaadb02409281b589829e3c9370_purpose_to_int_model.json
[INFO] Treating missing values in column: loan_id
[INFO] Treating missing values in column: customer_id
[INFO] Treating missing values in column: loan_status
[INFO] Treating missing values in column: application_time
[INFO] Treating missing values in column: current_loan_amount
[INFO] Treating missing values in column: term
[INFO] Treating missing values in column: tax_liens
[INFO] Treating missing values in column: purpose
[INFO] Treating missing values in column: no_of_properties
[INFO] Treating missing values in column: home_ownership
[INFO] Treating missing values in column: annual_income
[INFO] Treating missing values in column: years_in_current_job
[INFO] Treating missing values in column: months_since_last_delinquent
[INFO] Tr

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return self._update_inplace(result)


[INFO] Dataset saved to ../dags/data\preprocessed\aa4c3eaadb02409281b589829e3c9370_training.csv


  df["application_week"] = df["application_time"].dt.week


[INFO] Converting purpose to int using method: weighted ranking
[INFO] No model for purpose-to-int conversion provided. Training a new model first...
[INFO] Model saved as json file: ../dags/models\aa4c3eaadb02409281b589829e3c9370_purpose_to_int_model.json
[INFO] Dataset saved to ../dags/data\preprocessed\aa4c3eaadb02409281b589829e3c9370_inference.csv


In [31]:
# Load the preprocessed training dataset using the specified job ID
tdf = pd.read_csv("../dags/data/preprocessed/12196ecaa65e4831987aee4bfced5f60_training.csv")

# Load the preprocessed inference dataset using the same job ID
vdf = pd.read_csv("../dags/data/preprocessed/12196ecaa65e4831987aee4bfced5f60_inference.csv")


In [32]:
# Select specified predictor columns and the target column from tdf, 
# display the first five rows, and transpose the resulting DataFrame
tdf[config.PREDICTORS + [config.TARGET]].head().T


Unnamed: 0,0,1,2,3,4
current_loan_amount,33231.0,15612.0,7959.0,29346.0,6011.0
term,0.0,1.0,0.0,1.0,0.0
credit_score,1350.696132,1350.696132,1350.696132,1350.696132,1350.696132
years_in_current_job,-1.0,-1.0,2.0,-1.0,-1.0
home_ownership,1.0,1.0,1.0,1.0,1.0
annual_income,71612.920399,71612.920399,71612.920399,71612.920399,71612.920399
purpose,0.786098,0.060076,0.060076,0.786098,0.003407
monthly_debt,941.224573,941.224573,949.3,941.224573,941.224573
years_of_credit_history,18.560949,18.560949,21.0,18.560949,18.560949
months_since_last_delinquent,34.752726,34.752726,34.752726,34.752726,34.752726


---