# Project notebook

The following notebook is an excerpt and re-written example from a _real_ production model.

The overall purpose of the ML algorithm is to identify website users who are likely to become new customers. The input is behaviour data collected from the users, and the target is whether they converted into customers, which makes this a binary classification problem.

This notebook focuses only on the data processing part. As you know, an ML pipeline has multiple steps, and they are not always as neatly separated as they are here. In the exam project they will not be, and untangling them is part of the challenge for you. Production code should also not live in Python notebooks since, as you will likely see, notebooks are difficult to work with and collaborate on in an automated way.

There is a lot of "fluff" in a notebook like this, ranging from comments and markdown cells to commented-out code and random print statements. None of that is necessary in a properly managed project, where you can use git to check the version history.

What is important for you is to identify the entry points into the code and segment them into easily understandable chunks. Additionally, you might want to follow some basic coding standards, such as:

- Import libraries only at the beginning of files
- Define functions at the top of scripts, or, if they are used in multiple places, move them into a `util.py` script or similar
- Remove unused or commented-out code
- Follow the [PEP 8](https://peps.python.org/pep-0008/) style guide (and others)
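As a rough illustration of these standards, a processing step refactored out of the notebook might look like the sketch below: imports at the top, functions next, `logging` instead of `print`, and a single entry point. The module layout and the function names `filter_by_date` and `main` are hypothetical, not part of the original project.

```python
"""Hypothetical refactored module: imports first, then functions, then an entry point."""
import logging
from datetime import date

logger = logging.getLogger(__name__)


def filter_by_date(rows, min_date, max_date):
    """Keep rows whose 'date_part' lies in [min_date, max_date] (inclusive)."""
    kept = [r for r in rows if min_date <= r["date_part"] <= max_date]
    logger.info("Kept %d of %d rows", len(kept), len(rows))  # logging instead of print
    return kept


def main():
    rows = [{"date_part": date(2024, 1, d)} for d in (1, 15, 31)]
    return filter_by_date(rows, date(2024, 1, 1), date(2024, 1, 15))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(main())
```

Keeping the work in functions means the same code can be imported by a training script and by tests, instead of being re-run cell by cell.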
  
Another thing to note is that comments can be misleading. Even if a markdown cell or an inline comment says the code does _X_, don't be surprised if it actually does _Y_. Additional text can be a blessing, but it can also be a curse. Remember, though, that your task is to make sure the code runs as before after refactoring the notebook into other files, not to update or improve the model or flow to match what the comments might say.

***

# DATA PROCESSING

In this section, we will perform Exploratory Data Analysis (EDA) to better understand the dataset before proceeding with more advanced analysis. EDA helps us get a sense of the data’s structure, identify patterns, and spot any potential issues like missing values or outliers. By doing so, we can gain a clearer understanding of the data's key characteristics.

We will start with summary statistics to review basic measures like mean, median, and variance, providing an initial overview of the data distribution. Then, we’ll create visualizations such as histograms, box plots, and scatter plots to explore relationships between variables, check for any skewness, and highlight outliers.

The purpose of this EDA is to ensure that the dataset is clean and well-structured for further analysis. This step also helps us identify any necessary data transformations and informs decisions on which features might be most relevant for modeling in later stages.
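The summary statistics mentioned above map directly onto pandas methods. A small sketch on an invented column, where one extreme value pulls the mean far above the median and produces a positive skew:

```python
import pandas as pd

# Illustrative data only; not taken from the project's dataset
df = pd.DataFrame({"visits": [1, 2, 2, 3, 50]})

summary = pd.DataFrame({
    "mean": df.mean(),
    "median": df.median(),
    "variance": df.var(),   # sample variance (ddof=1)
    "skew": df.skew(),      # > 0 indicates a long right tail
})
print(summary)
```

A large gap between mean and median, as here, is often the first hint that outlier handling or a robust imputation value is worth considering.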

# Create artifact directory
We want to create a directory for storing all the artifacts in the current directory. Users can load all the artifacts later for data cleaning pipelines and inferencing.

In [None]:
# dbutils.widgets.text("Training data max date", "2024-01-31")
# dbutils.widgets.text("Training data min date", "2024-01-01")
# max_date = dbutils.widgets.get("Training data max date")
# min_date = dbutils.widgets.get("Training data min date")

# testing
max_date = "2024-01-31"
min_date = "2024-01-01"

#C: hard-coded dates should eventually be moved into env. variables, config or otherwise.
#C: commented-out code is safe to remove.
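The annotation above suggests moving the hard-coded dates into environment variables or a config. A minimal sketch, assuming hypothetical variable names `TRAIN_MIN_DATE` and `TRAIN_MAX_DATE` and falling back to the notebook's current values (unlike the notebook, it does not fall back to today's date for a missing max date):

```python
import os
from datetime import date


def read_date_window(env=os.environ):
    """Read the training window from (hypothetical) environment variables.

    Falls back to the dates currently hard-coded in the notebook.
    """
    min_date = date.fromisoformat(env.get("TRAIN_MIN_DATE", "2024-01-01"))
    max_date = date.fromisoformat(env.get("TRAIN_MAX_DATE", "2024-01-31"))
    if min_date > max_date:
        raise ValueError(f"min date {min_date} is after max date {max_date}")
    return min_date, max_date
```

Passing `env` as a parameter keeps the function easy to test with a plain dict instead of the real process environment.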

In [None]:
import os #C: file-system utilities (path handling, directory creation etc.)
import shutil #C: advanced file operations - check whether it is actually used, since its only call below is commented out.
from pprint import pprint #C: pretty-printing objects for debugging

# shutil.rmtree("./artifacts",ignore_errors=True) #C: deletes the "artifacts" folder.
os.makedirs("artifacts",exist_ok=True) #C: creates directory named "artifacts". exist_ok=True prevents errors if dir already exists.
print("Created artifacts directory") #C: print statement can be removed or replaced with logging.

# Pandas dataframe print options

In [None]:
import pandas as pd
import warnings

warnings.filterwarnings('ignore') #C: silences all warnings across every library. Not recommended in production, used to keep notebook-output clean.
pd.set_option('display.float_format',lambda x: "%.3f" % x) #C: sets global pd display format so floats are 3 decimals.
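`pd.set_option` changes the display format globally for the rest of the session. If the 3-decimal format is only wanted for a single output, `pd.option_context` scopes it to a `with` block; a short sketch:

```python
import pandas as pd

df = pd.DataFrame({"score": [0.123456, 1.5]})

before = pd.get_option("display.float_format")
with pd.option_context("display.float_format", "{:.3f}".format):
    formatted = df.to_string()  # floats rendered with 3 decimals only inside this block
after = pd.get_option("display.float_format")  # previous setting restored on exit

print(formatted)
```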

# Helper functions

* **describe_numeric_col**: Calculates various descriptive stats for a numeric column in a dataframe.
* **impute_missing_values**: Imputes the mean/median for numeric columns or the mode for other types.
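One detail worth checking in `describe_numeric_col` is how missing values are counted: `Series.count()` counts non-null values, `isnull().count()` returns the length of the boolean mask (every row, null or not), and `isnull().sum()` counts the nulls. A quick check on invented values:

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, 3.0, np.nan])

non_null = s.count()          # non-missing values only
length = s.isnull().count()   # counts every row of the boolean mask
missing = s.isnull().sum()    # True values summed, i.e. the nulls
print(non_null, length, missing)
```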

In [None]:
def describe_numeric_col(x):
    """
    Parameters:
        x (pd.Series): Pandas col to describe.
    Output:
        y (pd.Series): Pandas series with descriptive stats. 
    """
    return pd.Series(
        [x.count(), x.isnull().sum(), x.mean(), x.min(), x.max()],
        index=["Count", "Missing", "Mean", "Min", "Max"]
    )

#C: ^function creates a new pandas series containing
# the number of non-null values
# number of null values
# mean, min & max of the column
# and labels those values with index names
# assumes x is numeric
# possibly only used for exploratory data analysis and not needed in the pipeline.

def impute_missing_values(x, method="mean"):
    """
    Parameters:
        x (pd.Series): Pandas col to impute.
        method (str): Values: "mean", "median". Only used for numeric columns.
    Output:
        x (pd.Series): Column with missing values filled.
    """
    if (x.dtype == "float64") | (x.dtype == "int64"): #C: checks if column is numeric
        x = x.fillna(x.mean()) if method=="mean" else x.fillna(x.median()) #C: fills missing values with column mean if method input is set to mean, else fills with median.
    else:
        x = x.fillna(x.mode()[0]) #C: if datatype is not float or int, fills the column with mode (most frequent value)
    return x #C: returns the filled-out column.

#C: ^function takes a pandas Series and fills missing values: mean (default) or median for numeric columns, mode for all other types

# Read data

We read the latest data from our data lake source. Here we load it locally after having pulled it from DVC.

In [None]:
!dvc pull #C: ! tells notebook to run a shell command, not Python code.
#C: looks at dvc-tracked data files, downloads the actual data from remote storage configured in repo,
# and ensures local workspace has the exact dataset version the pipeline expects

In [None]:
print("Loading training data") #C: remove print statement

data = pd.read_csv("./artifacts/raw_data.csv") #C: reads data into pd dataframe

print("Total rows:", len(data)) #C: print statement, remove in future. len(data) is the row count; data.count() would give per-column non-null counts
display(data.head(5)) #C: display first five rows, remove in future


In [None]:
import pandas as pd #C: imports pandas again, not needed unless part of a separate .py file
import datetime
import json

if not max_date: #C: checks if max date is empty/None/False
    max_date = pd.to_datetime(datetime.datetime.now().date()).date() #C: sets max date to today's date and converts to datetime.date object
else:
    max_date = pd.to_datetime(max_date).date() #C: if max_date is already set, converts it to a datetime.date object

min_date = pd.to_datetime(min_date).date() #C: converts hard-coded min date from earlier to datetime.date object

# Time limit data
data["date_part"] = pd.to_datetime(data["date_part"]).dt.date #C: converts the column "date_part" in the data to datetime and then extracts the date
data = data[(data["date_part"] >= min_date) & (data["date_part"] <= max_date)] #C: Filters rows so that date_part is between min_date and max_date, inclusive.

min_date = data["date_part"].min() #C: finds actual min date in filtered data
max_date = data["date_part"].max() #C: same as above but max date
date_limits = {"min_date": str(min_date), "max_date": str(max_date)} #C: creates dict with min_date and max_date as strings
with open("./artifacts/date_limits.json", "w") as f:
    json.dump(date_limits, f) #C: saves the dictionary as a .json file so downstream stages know the data boundaries

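The date filter above keeps rows with `min_date <= date_part <= max_date`. The same inclusive window can be written with `Series.between`; the sketch below uses invented dates and a hypothetical file name `date_limits_demo.json` written to a temporary directory, mirroring how `date_limits.json` is produced.

```python
import json
import os
import tempfile
from datetime import date

import pandas as pd

df = pd.DataFrame({"date_part": pd.to_datetime(
    ["2023-12-31", "2024-01-01", "2024-01-31", "2024-02-01"]).date})

min_date, max_date = date(2024, 1, 1), date(2024, 1, 31)
window = df[df["date_part"].between(min_date, max_date, inclusive="both")]

# Record the actual boundaries of the filtered data, as the notebook does
limits = {"min_date": str(window["date_part"].min()),
          "max_date": str(window["date_part"].max())}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "date_limits_demo.json")
    with open(path, "w") as f:
        json.dump(limits, f)
    with open(path) as f:
        reloaded = json.load(f)
```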
# Feature selection

Not all columns are relevant for modelling.

In [None]:
data = data.drop(
    [
        "is_active", "marketing_consent", "first_booking", "existing_customer", "last_seen"
    ],
    axis=1 #C: removes columns, not rows
)
#C: removes columns from dataframe that are irrelevant to the model or to preprocessing.

In [None]:
#Removing columns that will be added back after the EDA - C: correct.
data = data.drop(
    ["domain", "country", "visited_learn_more_before_booking", "visited_faq"],
    axis=1
)

# Data cleaning
* Remove rows with empty target variable
* Remove rows with other invalid column data
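The two cleaning rules can also be expressed without `inplace=True`: convert empty strings to NaN for the listed columns, then drop rows where any required column is missing. A sketch on toy data (column names reused from the notebook, values invented):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "lead_indicator": ["1", "", "0", None],
    "lead_id": ["a", "b", "", "d"],
    "customer_code": ["", "x", "y", "z"],
})

cols = ["lead_indicator", "lead_id", "customer_code"]
df[cols] = df[cols].replace("", np.nan)                  # empty string -> missing
clean = df.dropna(subset=["lead_indicator", "lead_id"])  # drop rows missing either key
print(clean)
```

A single `dropna` with both columns in `subset` is equivalent to two successive calls, because the default `how="any"` drops a row if any listed column is missing.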

In [None]:
import numpy as np

data["lead_indicator"] = data["lead_indicator"].replace("", np.nan)
data["lead_id"] = data["lead_id"].replace("", np.nan)
data["customer_code"] = data["customer_code"].replace("", np.nan)
#C: ^ replaces empty strings in three columns with NaN. Assigning the result back (instead of inplace=True on a single column) avoids chained-assignment problems in newer pandas versions.
# this ensures missing values are correctly marked as NaN, which is required for .dropna etc.

data = data.dropna(axis=0, subset=["lead_indicator"])
data = data.dropna(axis=0, subset=["lead_id"])
#C: ^ removes rows where lead_indicator or lead_id is NaN.

data = data[data.source == "signup"] #C: filters dataset to only include rows where the source column equals "signup"
result=data.lead_indicator.value_counts(normalize = True) #C: counts unique values in the lead_indicator column. normalize=True gives fractions instead of counts.

print("Target value counter")
for val, n in zip(result.index, result):
    print(val, ": ", n)
data
#C: ^ print statement and display of data, remove in future or replace with logging.

# Create categorical data columns

In [None]:
vars = [
    "lead_id", "lead_indicator", "customer_group", "onboarding", "source", "customer_code"
] #C: define a list of column names that will be converted. Categorical/identifier columns

for col in vars:
    data[col] = data[col].astype("object") #C: converts datatype of each column to object.
    print(f"Changed {col} to object type") #C: print statement, remove later or replace with logging.

# Separate categorical and continuous columns

In [None]:
cont_vars = data.loc[:, ((data.dtypes=="float64")|(data.dtypes=="int64"))] #C: select continuous (numeric) columns and save to variable cont_vars
cat_vars = data.loc[:, (data.dtypes=="object")] #C: same as above but for categorical columns, saved to cat_vars

print("\nContinuous columns: \n")
pprint(list(cont_vars.columns), indent=4) #C: converts cont. column names to a list, uses pprint for nice formatting
print("\n Categorical columns: \n")
pprint(list(cat_vars.columns), indent=4) #C: converts cat. column names to a list, uses pprint for nice formatting
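The dtype masks above only match `float64` and `int64` exactly. `DataFrame.select_dtypes` is a shorter alternative, but `include="number"` also picks up other numeric widths such as `int32` or `float32`, so on some data it selects more columns than the original masks. A comparison on toy data:

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "a": np.array([1, 2], dtype="int64"),
    "b": np.array([1.5, 2.5], dtype="float32"),
    "c": pd.Series(["x", "y"], dtype="object"),
})

exact = df.loc[:, (df.dtypes == "float64") | (df.dtypes == "int64")]  # notebook-style mask
numeric = df.select_dtypes(include="number")
categorical = df.select_dtypes(include="object")
print(list(exact.columns), list(numeric.columns), list(categorical.columns))
```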

# Outliers

Outliers are data points that differ significantly from the majority of observations in a dataset and can distort statistical analysis or model performance. One common way to identify them is the Z-score, z = (x - mean) / std, which measures how many standard deviations a data point lies from the mean. Data points whose absolute Z-score exceeds 2 (or sometimes 3) are typically considered outliers. Note that the code below does not remove these points: it clips each continuous column to the range mean ± 2 standard deviations, so extreme values are capped at the threshold and the dataset is less influenced by them.
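The notebook's code caps extreme values rather than dropping rows. To make the difference concrete, the sketch below computes Z-scores for an invented column and applies both strategies with the same 2-standard-deviation threshold:

```python
import pandas as pd

x = pd.Series([10.0, 11.0, 9.0, 10.0, 12.0, 8.0, 10.0, 40.0])

mean, std = x.mean(), x.std()   # pandas uses the sample std (ddof=1)
z = (x - mean) / std

removed = x[z.abs() <= 2]       # drop outliers: fewer rows
clipped = x.clip(lower=mean - 2 * std, upper=mean + 2 * std)  # cap outliers: same rows
```

Clipping keeps every row, which matters here because the continuous columns are later concatenated column-wise with the categorical ones and must stay the same length.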

In [None]:
cont_vars = cont_vars.apply(lambda x: x.clip(lower = (x.mean()-2*x.std()),
                                             upper = (x.mean()+2*x.std()))) #C: restricts values to a given range (± 2 std. deviations from the mean); values are clipped, not removed
outlier_summary = cont_vars.apply(describe_numeric_col).T #C: summarizes each continuous column after clipping outliers
outlier_summary.to_csv('./artifacts/outlier_summary.csv') #C: writes the outlier summary to a csv in artifacts folder
outlier_summary #C: displays summary, remove later

# Impute data

In real-world datasets, missing data is a common occurrence due to various factors such as human error, incomplete data collection processes, or system failures. These gaps in the data can hinder analysis and lead to biased results if not properly addressed. Since many analytical and machine learning algorithms require complete data, handling missing values is an essential step in the data preprocessing phase.

In the next code blocks, we handle missing data by imputation. For numerical columns, we replace missing values with the column mean (the helper function also supports the median), which provides a reasonable estimate based on the existing data. For categorical columns (object type), we use the mode, or most frequent value, to fill in missing entries; the exception is `customer_code`, whose missing values are first set to the string "None". This approach keeps the dataset complete while ensuring that the imputed values align with the general distribution of each column.
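A compact sketch of this imputation strategy on invented data: numeric columns get the column mean (or median), other columns get the mode. Passing a dict to `fillna` applies a different fill value per column. Note that it checks numeric types with `pd.api.types.is_numeric_dtype`, whereas the notebook's helper compares against `float64`/`int64` only.

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "visits": [1.0, np.nan, 3.0, 100.0],
    "group": pd.Series(["a", "b", None, "a"], dtype="object"),
})

fill_values = {}
for col in df.columns:
    if pd.api.types.is_numeric_dtype(df[col]):
        fill_values[col] = df[col].mean()     # or df[col].median()
    else:
        fill_values[col] = df[col].mode()[0]  # most frequent value
imputed = df.fillna(fill_values)
```

Here the mean (about 34.67) is pulled up by the single value 100, while the median would be 3; the median is the more robust choice when a column is skewed.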

In [None]:
cat_missing_impute = cat_vars.mode(numeric_only=False, dropna=True) #C: computes the mode (most frequent value) of each categorical column, ignoring missing values; stored as a DataFrame for imputing missing categorical data later.
cat_missing_impute.to_csv("./artifacts/cat_missing_impute.csv") #C: saves modes to a csv file in artifacts folder
cat_missing_impute #C: displays modes, remove later

In [None]:
# Continuous variables missing values
cont_vars = cont_vars.apply(impute_missing_values) #C: fills missing values in numeric columns with the column mean (the default method of helper function impute_missing_values())
cont_vars.apply(describe_numeric_col).T #C: produces descriptive stats after imputation.

In [None]:
cat_vars.loc[cat_vars['customer_code'].isna(),'customer_code'] = 'None' #C: selects rows in cat_vars where customer_code is NaN and replaces them with string "None". Ensures no missing values before categorical imputation (so not imputed)
cat_vars = cat_vars.apply(impute_missing_values) #C: applies imputation function to cat columns. Replaces NaN with the mode
cat_vars.apply(lambda x: pd.Series([x.count(), x.isnull().sum()], index = ['Count', 'Missing'])).T #C: creates table showing # of non-missing and missing values in each column. Verifies that missing values have been imputed.
cat_vars # C: display, remove later.

# Data standardisation

Standardization, or scaling, becomes necessary when continuous independent variables are measured on different scales, as this can lead to unequal contributions to the analysis. The objective is to rescale these variables so they have comparable ranges and/or variances, ensuring a more balanced influence in the model. Here, min-max scaling is used: each continuous column is mapped to the range [0, 1] using its minimum and maximum in the training data.
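Min-max scaling maps each value to (x - min) / (max - min), using the minimum and maximum observed in the training data. The sketch below re-implements that formula with pandas to show why the fitted scaler is saved as an artifact: new data must be scaled with the training min/max, so values outside the training range land outside [0, 1]. It is an illustration, not a replacement for `MinMaxScaler`.

```python
import pandas as pd

train = pd.DataFrame({"visits": [2.0, 4.0, 6.0, 10.0]})
new = pd.DataFrame({"visits": [6.0, 12.0]})

col_min, col_max = train.min(), train.max()  # "fit": remember training statistics


def scale(df):
    # "transform": same formula as MinMaxScaler with the default feature_range=(0, 1)
    return (df - col_min) / (col_max - col_min)


train_scaled = scale(train)
new_scaled = scale(new)
```

Unlike `MinMaxScaler`, this sketch does not guard against constant columns, where max - min is zero.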

In [None]:
from sklearn.preprocessing import MinMaxScaler #C: scales numeric features to a 0-1 range
import joblib #C: used to serialize Python objects

scaler_path = "./artifacts/scaler.pkl" #C: setting filepath for storage of scaler

scaler = MinMaxScaler() #C: initializes a new MinMax scaler
scaler.fit(cont_vars) #C: fits the scaler on continuous/numeric columns

joblib.dump(value=scaler, filename=scaler_path) #C: saves the scaler to artifacts
print("Saved scaler in artifacts") #C: print statement, remove or replace with logging.

cont_vars = pd.DataFrame(scaler.transform(cont_vars), columns=cont_vars.columns) #C: applies MinMax scaling to the numeric columns and converts the result into a df with original col. names
cont_vars #C: display, remove.

# Combine data

In [None]:
cont_vars = cont_vars.reset_index(drop=True) #C: resets continuous variables' row index after cleaning. drop=True ensures old index is not added as column.
cat_vars = cat_vars.reset_index(drop=True) #C: same as above for categorical variables.
data = pd.concat([cat_vars, cont_vars], axis=1) #C: concatenates the categorical and continuous dataframes column-wise.
print(f"Data cleansed and combined.\nRows: {len(data)}") #C: print-statement, remove or replace with logging.
data #C: display, remove.

# Data drift artifact

In [None]:
import json #C: imports json again

data_columns = list(data.columns) #C: creates a list from all columns in the dataframe.
with open('./artifacts/columns_drift.json','w+') as f:           
    json.dump(data_columns,f) #C: writes the list of column names to .json - to monitor column drift in production, to detect changes or missing features.
    
data.to_csv('./artifacts/training_data.csv', index=False) #C: writes the fully processed dataframe to the artifacts folder. index=False ensures no column containing indices. This is the final, training-ready dataset.
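The saved `columns_drift.json` can later be compared with the columns of incoming data. A minimal sketch of such a check; the function name `check_column_drift` and the example column lists are hypothetical:

```python
import json


def check_column_drift(expected_columns, incoming_columns):
    """Return the columns that are missing from, or unexpected in, the incoming data."""
    expected, incoming = set(expected_columns), set(incoming_columns)
    return {
        "missing": sorted(expected - incoming),
        "unexpected": sorted(incoming - expected),
    }


# Round-trip through JSON, as the notebook does with columns_drift.json
saved = json.dumps(["lead_id", "lead_indicator", "source"])
report = check_column_drift(json.loads(saved), ["lead_id", "source", "new_feature"])
print(report)
```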

# Binning object columns

In [None]:
data.columns #C: display, remove.

In [None]:
data['bin_source'] = data['source'] #C: creates a new column, "bin_source" - initially a copy of "source" column.
values_list = ['li', 'organic','signup','fb'] #C: specifies which original "source" values are explicitly handled
data.loc[~data['source'].isin(values_list),'bin_source'] = 'Others' #C: for rows where "source" is not in values_list, sets "bin_source" to "Others"
mapping = {'li' : 'socials', 
           'fb' : 'socials', 
           'organic': 'group1', 
           'signup': 'group1'
           } #C: maps specific source values to broader categories for modeling (either socials or group1)

data['bin_source'] = data['source'].map(mapping).fillna('Others') #C: maps source values to the broader categories; .map returns NaN for values not in mapping, so fillna keeps them as "Others".
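A pitfall with this kind of binning: `Series.map` with a dict returns NaN for every value that is not a key, whereas `Series.replace` leaves such values unchanged. A toy comparison (values invented; the mapping mirrors the one above):

```python
import pandas as pd

mapping = {"li": "socials", "fb": "socials", "organic": "group1", "signup": "group1"}
source = pd.Series(["li", "signup", "tiktok"])

mapped = source.map(mapping)                   # "tiktok" -> NaN
replaced = source.replace(mapping)             # "tiktok" kept as-is
binned = source.map(mapping).fillna("Others")  # unmapped values -> "Others"
```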

# Save gold medallion dataset

In [None]:
#spark.sql(f"drop table if exists train_gold") - C: safe to remove.


In [None]:
# data_gold = spark.createDataFrame(data)
# data_gold.write.saveAsTable('train_gold')
# dbutils.notebook.exit(('training_golden_data',most_recent_date))

#C: all commented-out lines safe to remove.

data.to_csv('./artifacts/train_data_gold.csv', index=False) #C: saves the fully processed dataframe to a csv file in artifacts folder

# MODEL TRAINING

Model training fits an ML algorithm to a training dataset, which contains input data together with the matching known outputs (the target labels) that the algorithm learns to predict.

In [None]:
import datetime

# Constants used:
current_date = datetime.datetime.now().strftime("%Y_%B_%d") #C: gets current date and time and formats it as yyyy_monthname_dd
data_gold_path = "./artifacts/train_data_gold.csv" #C: points to the csv file saved earlier, path for training data
data_version = "00000" #C: placeholder for data version ID
experiment_name = current_date #C: uses the current date as the experiment name

# Create paths

The artifacts directory may not have been created during data cleaning, so it is created here if needed, together with the local `mlruns` directory used by MLflow.

In [None]:
import os
import shutil

os.makedirs("artifacts", exist_ok=True) #C: creates artifacts folder if one does not already exist
os.makedirs("mlruns", exist_ok=True) #C: creates a folder named "mlruns"
os.makedirs("mlruns/.trash", exist_ok=True) #C: creates a hidden folder, ".trash" inside "mlruns"

In [None]:
import mlflow #C: a library for experiment tracking, model versioning and logging metrics/artifacts

mlflow.set_experiment(experiment_name) #C: tells mlflow to use the current experiment. Is created if it does not already exist.

# Helper functions

* **create_dummy_cols**: Creates one-hot encoded columns for a categorical column and drops the original column.
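A quick look at what this kind of encoding produces: with `drop_first=True`, a column with k categories becomes k - 1 indicator columns, and the dropped first category is represented by all zeros. The example data is invented:

```python
import pandas as pd

df = pd.DataFrame({"onboarding": ["no", "yes", "no"], "visits": [1, 2, 3]})
dummies = pd.get_dummies(df["onboarding"], prefix="onboarding", drop_first=True)
encoded = pd.concat([df.drop("onboarding", axis=1), dummies], axis=1).astype("float64")
print(encoded)
```

Dropping one category avoids a perfectly collinear set of indicators, which matters mostly for the logistic regression candidate.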

In [None]:
def create_dummy_cols(df, col): #C: defines a function that takes a dataframe and the name of one column to be one-hot encoded
    df_dummies = pd.get_dummies(df[col], prefix=col, drop_first=True) #C: converts categorical values into separate 0/1 columns (one-hot encoding)
    new_df = pd.concat([df, df_dummies], axis=1) #C: concatenates new dummy columns with original dataframe column-wise
    new_df = new_df.drop(col, axis=1) #C: drops the original column since its information is now captured by dummy columns.
    return new_df #C: returns new dataframe with dummy variables included.

# Load training data
We use the training data we cleaned earlier.

In [None]:
data = pd.read_csv(data_gold_path) #C: reads the ready training-data into a pandas dataframe "data"
print(f"Training data length: {len(data)}") #C: prints the number of observations in dataframe. Can be removed or replaced with logging.
data.head(5) #C: display, remove.

# Data type split

In [None]:
data = data.drop(["lead_id", "customer_code", "date_part"], axis=1) #C: removes columns not used for modeling

cat_cols = ["customer_group", "onboarding", "bin_source", "source"] #C: lists columns that are categorical features for modeling
cat_vars = data[cat_cols].copy() #C: extracts categorical features from the dataframe as a separate copy, so the column assignments below do not trigger SettingWithCopyWarning.

other_vars = data.drop(cat_cols, axis=1) #C: saves non-categorical features in variable "other_vars"

# Dummy variable for categorical vars

1. Create one-hot encoded cols for cat vars
2. Change to floats

In [None]:
import pandas as pd

for col in cat_vars:
    cat_vars[col] = cat_vars[col].astype("category")
    cat_vars = create_dummy_cols(cat_vars, col)
#C: ^ loops over each categorical column and converts it to category datatype. One-hot encodes the column.

data = pd.concat([other_vars, cat_vars], axis=1) #C: combines other_vars and one-hot encoded categorical features.

for col in data:
    data[col] = data[col].astype("float64")
    print(f"Changed column {col} to float")
#C: ^ converts every column in data to datatype float. Categorical values are one-hot encoded, so all features are now numerical.

# Splitting data

In [None]:
y = data["lead_indicator"] #C: saves the label column to variable y
X = data.drop(["lead_indicator"], axis=1) #C: saves the feature columns with no label column to variable X

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.15, stratify=y
) #C: splits the training data into a training and a validation/evaluation test set.
y_train #C: display, remove.

# Model training

This stage trains the candidate ML algorithms on the training data, which is where the learning takes place. Two candidates are trained below, an XGBoost random-forest classifier and a scikit-learn logistic regression, each tuned with a randomized hyperparameter search and then evaluated on both the training set and the held-out test set.

# XGBoost

In [None]:
from xgboost import XGBRFClassifier #C: the XGBoost random forest classifier
from sklearn.model_selection import RandomizedSearchCV #C: tool for random hyperparameter search
from scipy.stats import uniform #C: uniform distribution for sampling float params
from scipy.stats import randint #C: integer distribution for sampling int params

model = XGBRFClassifier(random_state=42) #C: creates the XGBoost RF model/classifier with fixed randomness
params = {
    "learning_rate": uniform(1e-2, 3e-1), #C: samples learning rate between 0.01 and 0.31 (uniform(loc, scale) covers [loc, loc + scale])
    "min_split_loss": uniform(0, 10), #C: samples min loss reduction needed for a split
    "max_depth": randint(3, 10), #C: samples max tree depth between 3 and 9
    "subsample": uniform(0, 1), #C: samples row subsampling rate between 0 and 1
    "objective": ["reg:squarederror", "binary:logistic", "reg:logistic"], #C: tries different learning objectives
    "eval_metric": ["aucpr", "error"] #C: tries different eval metrics for validation
}

model_grid = RandomizedSearchCV(model, param_distributions=params, n_jobs=-1, verbose=3, n_iter=10, cv=10) #C: sets up random hyperparameter search for the model

model_grid.fit(X_train, y_train) #C: fit the model grid to training data and find best parameters
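`scipy.stats.uniform(loc, scale)` samples from [loc, loc + scale], not from [low, high], and `randint(low, high)` excludes `high`. That is why the ranges above read as 0.01 to 0.31 and 3 to 9. A quick empirical check:

```python
from scipy.stats import randint, uniform

lr = uniform(1e-2, 3e-1).rvs(size=10_000, random_state=0)
depth = randint(3, 10).rvs(size=10_000, random_state=0)

print(lr.min(), lr.max())        # stays within [0.01, 0.31]
print(depth.min(), depth.max())  # integers from 3 to 9
```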

# Model test accuracy

In [None]:
from sklearn.metrics import accuracy_score #C: function to calculate accuracy

best_model_xgboost_params = model_grid.best_params_ #C: gets the best hyperparameters found
print("Best xgboost params") 
pprint(best_model_xgboost_params) #C: prints the best hyperparameters, remove or use logging

y_pred_train = model_grid.predict(X_train) #C: predicts labels for training data
y_pred_test = model_grid.predict(X_test) #C: predicts labels for test data
print("Accuracy train", accuracy_score(y_train, y_pred_train)) #C: prints training accuracy (y_true first, then y_pred), remove or use logging
print("Accuracy test", accuracy_score(y_test, y_pred_test)) #C: prints test accuracy, remove or use logging


# XGBoost performance overview
* Confusion matrix
* Classification report
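The confusion matrix and the per-class scores in the classification report are simple counts and ratios. A small hand-computed sketch on invented labels, using `pd.crosstab` as the notebook does:

```python
import pandas as pd

y_true = pd.Series([1, 0, 1, 1, 0, 0, 1, 0], name="Actual")
y_pred = pd.Series([1, 0, 0, 1, 0, 1, 1, 0], name="Predicted")

table = pd.crosstab(y_true, y_pred)  # rows: actual class, columns: predicted class
tp = table.loc[1, 1]
fp = table.loc[0, 1]
fn = table.loc[1, 0]

precision = tp / (tp + fp)           # of predicted 1s, how many were actually 1
recall = tp / (tp + fn)              # of actual 1s, how many were found
f1 = 2 * precision * recall / (precision + recall)
print(table, precision, recall, f1)
```

The "weighted avg" row in `classification_report`, which the notebook later uses for model selection, averages these per-class scores weighted by each class's number of true samples.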

In [None]:
from sklearn.metrics import confusion_matrix
from sklearn.metrics import classification_report

#C: Test set evaluation 
conf_matrix = confusion_matrix(y_test, y_pred_test) #C: compute confusion matrix for test data
print("Test actual/predicted\n")
print(pd.crosstab(y_test, y_pred_test, rownames=['Actual'], colnames=['Predicted'], margins=True),'\n') #C: prints confusion matrix as table
print("Classification report\n")
print(classification_report(y_test, y_pred_test),'\n') #C: prints classification report for test data
#C: ^ above print statements should be removed or logged

#C: Train set evaluation
conf_matrix = confusion_matrix(y_train, y_pred_train)
print("Train actual/predicted\n")
print(pd.crosstab(y_train, y_pred_train, rownames=['Actual'], colnames=['Predicted'], margins=True),'\n')
print("Classification report\n")
print(classification_report(y_train, y_pred_train),'\n')
#C: same as for the test set evaluation

# Save best XGBoost model

In [None]:
xgboost_model = model_grid.best_estimator_ #C: get the best trained XGBoost model
xgboost_model_path = "./artifacts/lead_model_xgboost.json" #C: file path to save the model in artifacts folder
xgboost_model.save_model(xgboost_model_path) #C: save the model to a JSON file in artifacts folder
#C: not saved to MLflow (?)

model_results = {
    xgboost_model_path: classification_report(y_train, y_pred_train, output_dict=True) 
}
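#C: store train classification metrics for the saved model. Note: the LR section stores test-set metrics, so the later comparison in MODEL SELECTION mixes train and test scores.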

# SKLearn logistic regression

In [None]:
import mlflow.pyfunc #C: MLflow PythonModel interface
from sklearn.linear_model import LogisticRegression
import os #C: file-system utilities (path handling, directory creation etc.) - not used in this cell
from sklearn.metrics import cohen_kappa_score, f1_score #C: evaluation metrics - cohen_kappa_score is not used in this cell
import matplotlib.pyplot as plt #C: plotting library - not used in this cell
import joblib #C: used to serialize Python objects

class lr_wrapper(mlflow.pyfunc.PythonModel): #C: custom wrapper for MLflow to log predict_proba outputs
    def __init__(self, model):
        self.model = model #C: wrapper stores model internally for use in prediction.
    
    def predict(self, context, model_input):
        return self.model.predict_proba(model_input)[:, 1] #C: probability for each class, extracting only probability of "class 1"


mlflow.sklearn.autolog(log_input_examples=True, log_models=False) #C: automatically log parameters, metrics, and input examples (log_input_examples=True)
experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id #C: get experiment ID

with mlflow.start_run(experiment_id=experiment_id) as run: #C: start MLflow run
    model = LogisticRegression() #C: create logistic regression (LR) model
    lr_model_path = "./artifacts/lead_model_lr.pkl" #C: path to save LR model in artifacts folder

    #C: define hyperparameter search space
    params = {
              'solver': ["newton-cg", "lbfgs", "liblinear", "sag", "saga"], #C: optimization algorithm
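              'penalty':  ["none", "l1", "l2", "elasticnet"], #C: regularization type; "none" is deprecated since scikit-learn 1.2 in favour of None. Invalid solver/penalty combinations fail and are scored as NaN by the search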
              'C' : [100, 10, 1.0, 0.1, 0.01] #C: regularization strength (inverse)
    }

    #C: random search for best hyperparameters
    model_grid = RandomizedSearchCV(model, param_distributions= params, verbose=3, n_iter=10, cv=3)
    model_grid.fit(X_train, y_train) #C: fit model grid on training data

    best_model = model_grid.best_estimator_ #C: get best LR model

    y_pred_train = model_grid.predict(X_train) #C: predict training labels
    y_pred_test = model_grid.predict(X_test) #C: predict test labels


    # log artifacts - C: to MLflow
    mlflow.log_metric('f1_score', f1_score(y_test, y_pred_test))
    mlflow.log_artifacts("artifacts", artifact_path="model")
    mlflow.log_param("data_version", "00000")
    
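    # store model for model interpretability - C: dump the fitted best estimator; `model` itself stays unfitted because RandomizedSearchCV fits clones of it
    joblib.dump(value=best_model, filename=lr_model_path)
        
    # Custom python model for predicting probability - C: logging to MLflow
    mlflow.pyfunc.log_model('model', python_model=lr_wrapper(best_model))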

#C: classification report for test data
model_classification_report = classification_report(y_test, y_pred_test, output_dict=True)

best_model_lr_params = model_grid.best_params_ #C: get best LR hyperparameters

print("Best lr params")
pprint(best_model_lr_params)
#C: ^ remove or log

print("Accuracy train:", accuracy_score(y_pred_train, y_train ))
print("Accuracy test:", accuracy_score(y_pred_test, y_test))
#C: ^ remove print or log

#C: confusion matrix and report for test data
conf_matrix = confusion_matrix(y_test, y_pred_test)
print("Test actual/predicted\n")
print(pd.crosstab(y_test, y_pred_test, rownames=['Actual'], colnames=['Predicted'], margins=True),'\n')
print("Classification report\n")
print(classification_report(y_test, y_pred_test),'\n')
#C: ^ remove print or log

#C: confusion matrix and report for train data
conf_matrix = confusion_matrix(y_train, y_pred_train)
print("Train actual/predicted\n")
print(pd.crosstab(y_train, y_pred_train, rownames=['Actual'], colnames=['Predicted'], margins=True),'\n')
print("Classification report\n")
print(classification_report(y_train, y_pred_train),'\n')
#C: ^ remove print or log

model_results[lr_model_path] = model_classification_report #C: store test classification report in results dictionary
print(model_classification_report["weighted avg"]["f1-score"])
#C: ^ remove print or log
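`RandomizedSearchCV` fits clones of the estimator it is given, so the object passed in stays unfitted; the fitted model lives in `best_estimator_` (with the default `refit=True`). The sketch below shows this on synthetic data from `make_classification`; the small search space is only for illustration.

```python
from sklearn.datasets import make_classification
from sklearn.exceptions import NotFittedError
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import RandomizedSearchCV

X, y = make_classification(n_samples=200, n_features=5, random_state=0)

model = LogisticRegression(max_iter=1000)
search = RandomizedSearchCV(model, param_distributions={"C": [0.1, 1.0, 10.0]},
                            n_iter=3, cv=3, random_state=0)
search.fit(X, y)

try:
    model.predict_proba(X)  # the estimator passed in was never fitted
    original_is_fitted = True
except NotFittedError:
    original_is_fitted = False

# Probability of class 1, the same slice lr_wrapper.predict returns
proba = search.best_estimator_.predict_proba(X)[:, 1]
```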


# Save columns and model results

In [None]:
column_list_path = './artifacts/columns_list.json' #C: path to save list of feature columns in artifacts folder
with open(column_list_path, 'w+') as columns_file:
    columns = {'column_names': list(X_train.columns)} #C: create dictionary of column names from training data
    pprint(columns) #C: remove
    json.dump(columns, columns_file) #C: save column names to JSON file

print('Saved column list to ', column_list_path) #C: remove or log

model_results_path = "./artifacts/model_results.json" #C: path to save model results in artifacts folder
with open(model_results_path, 'w+') as results_file:
    json.dump(model_results, results_file) #C: save model evaluation metrics to JSON file
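The saved `columns_list.json` matters at inference time: one-hot encoding new data can produce a different set of dummy columns than training did. A common approach, not part of this notebook, is to reindex the encoded data to the saved column list, filling absent dummies with 0 and dropping unseen ones. A sketch with invented columns:

```python
import json

import pandas as pd

saved = json.dumps({"column_names": ["visits", "source_li", "source_organic"]})
train_columns = json.loads(saved)["column_names"]

new = pd.DataFrame({"visits": [3.0], "source": ["tiktok"]})
encoded = pd.get_dummies(new, columns=["source"])  # -> visits, source_tiktok
aligned = encoded.reindex(columns=train_columns, fill_value=0).astype("float64")
print(aligned)
```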

# MODEL SELECTION

Model selection involves choosing the most suitable statistical model from a set of candidates. In straightforward cases, this process uses an existing dataset. When candidate models offer comparable predictive or explanatory power, the simplest model is generally the preferred choice.
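One way to encode the "prefer the simpler model when scores are comparable" rule is to find the best weighted F1 score and then, among candidates within a small tolerance of it, choose the simplest. The sketch assumes a hypothetical `complexity` ranking, a hypothetical tolerance, and made-up scores; the notebook itself simply takes the model with the highest weighted F1 score.

```python
def select_model(f1_scores, complexity, tolerance=0.01):
    """Pick the simplest model whose F1 is within `tolerance` of the best.

    f1_scores: {model_name: weighted F1}
    complexity: {model_name: rank}, lower means simpler (hypothetical ranking)
    """
    best = max(f1_scores.values())
    candidates = [name for name, f1 in f1_scores.items() if best - f1 <= tolerance]
    return min(candidates, key=lambda name: complexity[name])


scores = {"xgboost": 0.81, "logistic_regression": 0.805}  # made-up scores
ranks = {"xgboost": 2, "logistic_regression": 1}
print(select_model(scores, ranks))  # logistic regression is within 0.01 and simpler
```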

In [None]:
# Constants used:
current_date = datetime.datetime.now().strftime("%Y_%B_%d") #C: gets current date and time and formats it as yyyy_monthname_dd
artifact_path = "model" #C: folder path to save MLflow artifacts
model_name = "lead_model" #C: name of the model
experiment_name = current_date #C: uses the current date as the experiment name

# Helper functions

In [None]:
import time
from mlflow.tracking.client import MlflowClient #C: MLflow client to interact with tracking server
from mlflow.entities.model_registry.model_version_status import ModelVersionStatus #C: enumeration for model version status

def wait_until_ready(model_name, model_version):
    client = MlflowClient() #C: create MLflow client instance
    for _ in range(10):
        model_version_details = client.get_model_version( #C: fetch details of specific model version
          name=model_name, 
          version=model_version,
        )
        status = ModelVersionStatus.from_string(model_version_details.status) #C: convert status string to enumeration
        print(f"Model status: {ModelVersionStatus.to_string(status)}") #C: print current model status, remove or log(?)
        if status == ModelVersionStatus.READY: #C: stop waiting if model is ready
            break
        time.sleep(1) #C: wait 1 second before checking again

#C: ^ repeatedly checks the status of a single MLflow model version until it is READY (or up to 10 attempts)
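`wait_until_ready` gives up silently after 10 attempts, so the caller cannot tell whether the model actually became ready. A hypothetical, library-free variant of the same polling pattern could take the status check as a function and return a boolean. In the usage lines, the status strings stand in for what `MlflowClient.get_model_version` would report. The `sleep` argument is injectable only so the example runs instantly.

```python
import time


def wait_until(check, attempts=10, delay=1.0, sleep=time.sleep):
    """Poll `check()` until it returns True or `attempts` calls have been made.

    Sleeps `delay` seconds between calls (not after the last one).
    Returns True if the condition was met, False if all attempts failed.
    """
    for attempt in range(attempts):
        if check():
            return True
        if attempt < attempts - 1:
            sleep(delay)
    return False


# Stand-in status sequence, as a registry might report it while a version registers.
statuses = iter(["PENDING_REGISTRATION", "PENDING_REGISTRATION", "READY"])
ready = wait_until(lambda: next(statuses) == "READY", delay=0, sleep=lambda seconds: None)
print(ready)  # True
```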

# Getting experiment model results

In [None]:
experiment_ids = [mlflow.get_experiment_by_name(experiment_name).experiment_id] #C: get the ID of the experiment
experiment_ids #C: display, remove

In [None]:
experiment_best = mlflow.search_runs(
    experiment_ids=experiment_ids, #C: search runs within this experiment
    order_by=["metrics.f1_score DESC"], #C: sort runs by F1-score descending
    max_results=1 #C: return only the top run
).iloc[0] #C: select the first row (best run)
experiment_best #C: display - remove

In [None]:
import json

with open("./artifacts/model_results.json", "r") as f: #C: open previously saved model results JSON file
    model_results = json.load(f)
results_df = pd.DataFrame({model: val["weighted avg"] for model, val in model_results.items()}).T #C: extract "weighted avg" metrics for each model
results_df #C: display - remove

In [None]:
best_model = results_df.sort_values("f1-score", ascending=False).iloc[0].name  #C: find the model with highest F1-score
print(f"Best model: {best_model}") #C: remove or log (?)
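The DataFrame step amounts to "take each model's weighted-average F1 and pick the maximum". The sketch below does the same ranking without pandas. The JSON contents are invented for illustration and assume `model_results` follows the shape of scikit-learn's `classification_report(..., output_dict=True)`. Only the `"weighted avg"` and `"f1-score"` keys come from the notebook. For distinct scores, it picks the same model as the `sort_values` call.

```python
import json

# Invented example contents of model_results.json (assumed structure).
raw = """
{
  "lr": {"weighted avg": {"precision": 0.80, "recall": 0.79, "f1-score": 0.79, "support": 100}},
  "xgboost": {"weighted avg": {"precision": 0.83, "recall": 0.82, "f1-score": 0.82, "support": 100}}
}
"""
model_results = json.loads(raw)

# Weighted-average F1 per model, like the "f1-score" column of results_df.
weighted_f1 = {name: report["weighted avg"]["f1-score"] for name, report in model_results.items()}

# Highest F1 wins, like sort_values("f1-score", ascending=False).iloc[0].name
best_model = max(weighted_f1, key=weighted_f1.get)
print(best_model)  # xgboost
```

Note that `best_model` is computed from every model in the JSON file. `experiment_best`, by contrast, only sees runs logged to MLflow, which according to the comment in the comparison cell are only the LR runs. The two can therefore disagree.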

#C: is this even used elsewhere?

# Get production model

In [None]:
from mlflow.tracking import MlflowClient

client = MlflowClient() #C: create MLflow client instance
prod_model = [model for model in client.search_model_versions(f"name='{model_name}'") if dict(model)['current_stage']=='Production'] #C: search for all registered versions of the model that are in 'Production' stage
prod_model_exists = len(prod_model)>0  #C: check if any production model exists

if prod_model_exists:
    prod_model_version = dict(prod_model[0])['version'] #C: get version number of first production model
    prod_model_run_id = dict(prod_model[0])['run_id'] #C: get run ID of first production model
    
    print('Production model name: ', model_name)
    print('Production model version:', prod_model_version)
    print('Production model run id:', prod_model_run_id)
    #C: ^ remove or log(?)
    
else:
    print('No model in production')
    #C: ^ remove or log(?)

#C: ^ perhaps make this a helper function?
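Following the hint above, the production lookup could become a helper function. This is a minimal sketch that assumes the versions have already been converted with `dict(...)`, as in the cell above. The function name and the sample version data are hypothetical. Like the original cell, it returns the first match if several versions are in Production.

```python
def find_production_version(versions):
    """Return (version, run_id) of the first version in 'Production', or None.

    `versions` is a list of dicts shaped like dict(ModelVersion) in the notebook;
    only the 'current_stage', 'version' and 'run_id' keys are used.
    """
    for v in versions:
        if v["current_stage"] == "Production":
            return v["version"], v["run_id"]
    return None


# Hypothetical registry contents.
versions = [
    {"version": "1", "run_id": "abc", "current_stage": "Archived"},
    {"version": "2", "run_id": "def", "current_stage": "Production"},
]
print(find_production_version(versions))  # ('2', 'def')
```

In the notebook, it would be called as `find_production_version([dict(m) for m in client.search_model_versions(f"name='{model_name}'")])`. A `None` result replaces the `prod_model_exists` flag.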

# Compare prod and best trained model

In [None]:
train_model_score = experiment_best["metrics.f1_score"] #C: F1-score of the best run from current experiment - this only covers LR runs (XGBoost was not wrapped in an MLflow run)
model_details = {}
model_status = {} #C: placeholder to track current vs production scores
run_id = None

if prod_model_exists:
    prod_run = mlflow.get_run(prod_model_run_id) #C: fetch metrics and details of current production model
    prod_model_score = prod_run.data.metrics["f1_score"] #C: extract F1-score of production model

    model_status["current"] = train_model_score #C: store F1-score of candidate model
    model_status["prod"] = prod_model_score #C: store F1-score of production model

    if train_model_score>prod_model_score: #C: compare scores - candidate model is better; mark for registration
        print("Registering new model") #C: remove or log
        run_id = experiment_best["run_id"] #C: store run ID of best candidate model
else:
    print("No model in production") #C: remove or log
    run_id = experiment_best["run_id"] #C: register the best candidate model by default

print(f"Registered model: {run_id}") #C: remove or log
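The comparison cell boils down to a small decision rule. Register the candidate if nothing is in production, or if it strictly beats the production F1-score; otherwise register nothing. Written as a pure function (a hypothetical refactor, not code from the notebook), the rule can be tested without an MLflow server. It also makes visible that the final `print` above reports a "Registered model" even when `run_id` is `None`.

```python
def choose_run_to_register(candidate_score, candidate_run_id, prod_score=None):
    """Mirror the notebook's rule: return the run ID to register, or None.

    - No production model (prod_score is None): register the candidate.
    - Production model exists: register only if the candidate's F1 is strictly higher.
    """
    if prod_score is None:
        return candidate_run_id
    if candidate_score > prod_score:
        return candidate_run_id
    return None


# Hypothetical scores and run IDs.
print(choose_run_to_register(0.82, "run_new", prod_score=0.80))  # run_new
print(choose_run_to_register(0.80, "run_new", prod_score=0.80))  # None
print(choose_run_to_register(0.50, "run_new"))  # run_new
```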

# Register best model

In [None]:
if run_id is not None:
    print(f'Best model found: {run_id}') #C: remove or log (?)

    model_uri = "runs:/{run_id}/{artifact_path}".format( #C: create MLflow URI to locate model artifacts
        run_id=run_id,
        artifact_path=artifact_path
    )
    model_details = mlflow.register_model(model_uri=model_uri, name=model_name) #C: register the model in MLflow Model Registry
    wait_until_ready(model_details.name, model_details.version) #C: wait until the model version is fully available in the registry
    model_details = dict(model_details) #C: convert model details to dictionary for easier access/logging
    print(model_details) #C: remove print
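`runs:/<run_id>/<artifact_path>` is MLflow's URI scheme for an artifact logged under a specific run. The `str.format` call can be written more compactly as an f-string. The helper name is hypothetical. The sketch assumes the model was logged under the same `artifact_path` (`"model"`) during training, which this excerpt does not show.

```python
def runs_uri(run_id, artifact_path):
    """Build a 'runs:/' URI pointing at an artifact logged under `run_id`."""
    return f"runs:/{run_id}/{artifact_path}"


print(runs_uri("0123abcd", "model"))  # runs:/0123abcd/model
```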

# DEPLOY

Each model version is in exactly one stage at any given time. MLflow provides predefined stages for common use cases: None, Staging, Production, and Archived. With the necessary permissions, you can transition a model version between stages or request a transition to a different stage.
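To make the stage semantics concrete, here is a toy in-memory registry. It is not MLflow's implementation, just an illustration built on two assumed rules. First, each version holds exactly one stage at a time. Second, `archive_existing_versions=True` moves any *other* version already in the target stage to Archived, which is the purpose of that argument in the staging cell below.

```python
from dataclasses import dataclass, field

STAGES = ("None", "Staging", "Production", "Archived")


@dataclass
class ToyRegistry:
    """In-memory stand-in for a model registry (illustration only, not MLflow)."""

    # Maps version number -> its single current stage.
    stages: dict = field(default_factory=dict)

    def register(self, version):
        # New versions start in the "None" stage.
        self.stages[version] = "None"

    def transition(self, version, stage, archive_existing_versions=False):
        if stage not in STAGES:
            raise ValueError(f"unknown stage: {stage}")
        if archive_existing_versions:
            # Archive every *other* version that currently sits in the target stage.
            for other, current in list(self.stages.items()):
                if other != version and current == stage:
                    self.stages[other] = "Archived"
        # Overwrites the old stage: a version never holds two stages at once.
        self.stages[version] = stage


registry = ToyRegistry()
registry.register(1)
registry.register(2)
registry.transition(1, "Staging")
registry.transition(2, "Staging", archive_existing_versions=True)
print(registry.stages)  # {1: 'Archived', 2: 'Staging'}
```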

In [None]:
model_version = 1 #C: manually specify the version number of the model

# Transition to staging

In [None]:
from mlflow.tracking import MlflowClient

client = MlflowClient()


def wait_for_deployment(model_name, model_version, stage='Staging'): #C: waits until the specified model version reaches the desired stage
    status = False #C: flag to track deployment status
    while not status:
        model_version_details = dict( 
            client.get_model_version(name=model_name,version=model_version) #C: get details of the model version
            )
        if model_version_details['current_stage'] == stage: #C: check if model reached target stage
            print(f'Transition completed to {stage}') #C: remove or log(?)
            status = True
            break
        else:
            time.sleep(2) #C: wait 2 seconds before checking again
    return status #C: return True when deployment is complete

model_version_details = dict(client.get_model_version(name=model_name,version=model_version)) #C: fetch current model version details
model_status = True #C: flag to track whether model transition happened successfully
if model_version_details['current_stage'] != 'Staging': #C: check if model is already in Staging
    client.transition_model_version_stage( #C: move a registered model version to a different stage in the MLflow Model Registry
        name=model_name,
        version=model_version,
        stage="Staging",
        archive_existing_versions=True #C: archive any other versions currently in Staging
    )
    model_status = wait_for_deployment(model_name, model_version, 'Staging') #C: wait until model reaches Staging
else:
    print('Model already in staging') #C: remove or log(?)