# Project teaser

The following notebook is an excerpt and re-written example from a _real_ production model.

The overall purpose of the ML algorithm is to identify users on the website that are new possible customers. This is done by collecting behaviour data from the users as input, and the target is whether they converted/turned into customers -- essentially a classification problem. 

This notebook only focuses on the data processing part. As you know, there are multiple steps in an ML pipeline, and it's not always they are neatly separated like this. For the exam project, they will not be, and that is part of the challenge for you. For production code, it should also not be Python notebooks since, as you may well see, it is difficult to work with and collaborate on them in an automated way.

There is a lot of "fluff" in such a notebook. This ranges from comments and markdown cells to commented out code and random print statements. That is not necessary in a properly managed project where you can use git to check the version history and such. 

What is important for you is the identify the entry points into the code and segment them out into easily understandable chunks. Additionally, you might want to follow some basic code standards, such as:

- Import only libraries in the beginning of the files
- Define functions in the top of the scripts, or if used multiple places, move into a util.py script or such
- Remove unused/commented out code
- Follow the [PEP 8](https://peps.python.org/pep-0008/) style guide (and others)
  
Another thing to note is that comments can be misleading. Even if the markdown cell or inline comments says it does _X_, don't be surprised if it actually does _Y_. Sometimes additional text can be a blessing, but it can also be a curse sometimes. Remember, though, that your task is to make sure the code runs as before after refactoring the notebook into other files, not update/improve the model or flow to reflect what the comments might say.

***

# DATA PROCESSING

In this section, we will perform Exploratory Data Analysis (EDA) to better understand the dataset before proceeding with more advanced analysis. EDA helps us get a sense of the data’s structure, identify patterns, and spot any potential issues like missing values or outliers. By doing so, we can gain a clearer understanding of the data's key characteristics.

We will start with summary statistics to review basic measures like mean, median, and variance, providing an initial overview of the data distribution. Then, we’ll create visualizations such as histograms, box plots, and scatter plots to explore relationships between variables, check for any skewness, and highlight outliers.

The purpose of this EDA is to ensure that the dataset is clean and well-structured for further analysis. This step also helps us identify any necessary data transformations and informs decisions on which features might be most relevant for modeling in later stages.

# Create artifact directory
We want to create a directory for storing all the artifacts in the current directory. Users can load all the artifacts later for data cleaning pipelines and inferencing.

In [1]:
# dbutils.widgets.text("Training data max date", "2024-01-31")
# dbutils.widgets.text("Training data min date", "2024-01-01")
# max_date = dbutils.widgets.get("Training data max date")
# min_date = dbutils.widgets.get("Training data min date")

# testnng
max_date = "2024-01-31"
min_date = "2024-01-01"

In [None]:
import os
import shutil
from pprint import pprint

shutil.rmtree("./artifacts",ignore_errors=True)
os.makedirs("artifacts",exist_ok=True)
print("Created artifacts directory")

# Pandas dataframe print options

In [3]:
import pandas as pd
import warnings

warnings.filterwarnings('ignore')
pd.set_option('display.float_format',lambda x: "%.3f" % x)

# Helper functions

* **describe_numeric_col**: Calculates various descriptive stats for a numeric column in a dataframe.
* **impute_missing_values**: Imputes the mean/median for numeric columns or the mode for other types.

In [4]:
def describe_numeric_col(x):
    """
    Parameters:
        x (pd.Series): Pandas col to describe.
    Output:
        y (pd.Series): Pandas series with descriptive stats. 
    """
    return pd.Series(
        [x.count(), x.isnull().count(), x.mean(), x.min(), x.max()],
        index=["Count", "Missing", "Mean", "Min", "Max"]
    )

def impute_missing_values(x, method="mean"):
    """
    Parameters:
        x (pd.Series): Pandas col to describe.
        method (str): Values: "mean", "median"
    """
    if (x.dtype == "float64") | (x.dtype == "int64"):
        x = x.fillna(x.mean()) if method=="mean" else x.fillna(x.median())
    else:
        x = x.fillna(x.mode()[0])
    return x

# Read data

We read the latest data from our data lake source. Here we load it locally after having pulled it from DVC.

In [None]:
print("Loading training data")

# data_path = "./data/raw"
# most_recent_date = [x.name[:-1] for x in dbutils.fs.ls(f"{data_path}")][-1]
# most_recent_file = [x.name for x in dbutils.fs.ls(f"{data_path}/{most_recent_date}")][-1]
# data = pd.read_csv(f"{data_path}/{most_recent_date}/{most_recent_file}")

# Testing
from data_generating_process import generate_data
data = generate_data(n_rows=1_000)
most_recent_date = "2024-01-31"
# Done testing 

print("Most recent date:", most_recent_date)
print("Total rows:", data.count())
display(data)

In [6]:
import pandas as pd
import datetime
import json

if not max_date:
    max_date = pd.to_datetime(datetime.datetime.now().date()).date()
else:
    max_date = pd.to_datetime(max_date).date()

min_date = pd.to_datetime(min_date).date()

# Time limit data
data["date_part"] = pd.to_datetime(data["date_part"]).dt.date
data = data[(data["date_part"] >= min_date) & (data["date_part"] <= max_date)]

min_date = data["date_part"].min()
max_date = data["date_part"].max()
date_limits = {"min_date": str(min_date), "max_date": str(max_date)}
with open("./artifacts/date_limits.json", "w") as f:
    json.dump(date_limits, f)

# DATA CLEANING
Data cleaning in MLOps refers to the process of preparing and refining raw data to ensure its quality and usability for machine learning models. This involves identifying and correcting errors, handling missing values, removing duplicates, and standardizing data formats. Effective data cleaning is crucial as it directly impacts model performance and the reliability of insights drawn from the data. In MLOps, it also includes automating these processes to maintain data integrity across different stages of the machine learning lifecycle.

# Feature selection

Not all columns are relevant for modelling

In [7]:
data = data.drop(
    [
        "is_active", "marketing_consent", "first_booking", "existing_customer", "last_seen"
    ],
    axis=1
)

In [8]:
#Removing columns that will be added back after the EDA
data = data.drop(
    ["domain", "country", "visited_learn_more_before_booking", "visited_faq"],
    axis=1
)

# Data cleaning
* Remove rows with empty target variable
* Remove rows with other invalid column data

In [None]:
import numpy as np

data["lead_indicator"].replace("", np.nan, inplace=True)
data["lead_id"].replace("", np.nan, inplace=True)
data["customer_code"].replace("", np.nan, inplace=True)

data = data.dropna(axis=0, subset=["lead_indicator"])
data = data.dropna(axis=0, subset=["lead_id"])

data = data[data.source == "signup"]
result=data.lead_indicator.value_counts(normalize = True)

print("Target value counter")
for val, n in zip(result.index, result):
    print(val, ": ", n)
data

# Create categorical data columns

In [None]:
vars = [
    "lead_id", "lead_indicator", "customer_group", "onboarding", "source", "customer_code"
]

for col in vars:
    data[col] = data[col].astype("object")
    print(f"Changed {col} to object type")

# Separate categorical and continuous columns

In [None]:
cont_vars = data.loc[:, ((data.dtypes=="float64")|(data.dtypes=="int64"))]
cat_vars = data.loc[:, (data.dtypes=="object")]

print("\nContinuous columns: \n")
pprint(list(cont_vars.columns), indent=4)
print("\n Categorical columns: \n")
pprint(list(cat_vars.columns), indent=4)

# Outliers

Outliers are data points that significantly differ from the majority of observations in a dataset and can distort statistical analysis or model performance. To identify and remove outliers, one common method is to use the Z-score, which measures how many standard deviations a data point is from the mean. Data points with a Z-score greater than 2 (or sometimes 3) standard deviations away from the mean are typically considered outliers. By applying this threshold, we can filter out values that fall outside the normal range of the data, ensuring that the remaining dataset is more representative and less influenced by extreme values.

In [None]:
cont_vars = cont_vars.apply(lambda x: x.clip(lower = (x.mean()-2*x.std()),
                                             upper = (x.mean()+2*x.std())))
outlier_summary = cont_vars.apply(describe_numeric_col).T
outlier_summary.to_csv('./artifacts/outlier_summary.csv')
outlier_summary

# Impute data

In real-world datasets, missing data is a common occurrence due to various factors such as human error, incomplete data collection processes, or system failures. These gaps in the data can hinder analysis and lead to biased results if not properly addressed. Since many analytical and machine learning algorithms require complete data, handling missing values is an essential step in the data preprocessing phase.

In the next code block, we will handle missing data by performing imputation. For numerical columns, we will replace missing values with the mean or median of the entire column, which provides a reasonable estimate based on the existing data. For categorical columns (object type), we will use the mode, or most frequent value, to fill in missing entries. This approach helps us maintain a complete dataset while ensuring that the imputed values align with the general distribution of each column.

In [None]:
cat_missing_impute = cat_vars.mode(numeric_only=False, dropna=True)
cat_missing_impute.to_csv("./artifacts/cat_missing_impute.csv")
cat_missing_impute

In [None]:
# Continuous variables missing values
cont_vars = cont_vars.apply(impute_missing_values)
cont_vars.apply(describe_numeric_col).T

In [None]:
cat_vars.loc[cat_vars['customer_code'].isna(),'customer_code'] = 'None'
cat_vars = cat_vars.apply(impute_missing_values)
cat_vars.apply(lambda x: pd.Series([x.count(), x.isnull().sum()], index = ['Count', 'Missing'])).T
cat_vars

# Data standardisation

Standardization, or scaling, becomes necessary when continuous independent variables are measured on different scales, as this can lead to unequal contributions to the analysis. The objective is to rescale these variables so they have comparable ranges and/or variances, ensuring a more balanced influence in the model.

In [None]:
from sklearn.preprocessing import MinMaxScaler
import joblib

scaler_path = "./artifacts/scaler.pkl"

scaler = MinMaxScaler()
scaler.fit(cont_vars)

joblib.dump(value=scaler, filename=scaler_path)
print("Saved scaler in artifacts")

cont_vars = pd.DataFrame(scaler.transform(cont_vars), columns=cont_vars.columns)
cont_vars

# Combine data

In [None]:
cont_vars = cont_vars.reset_index(drop=True)
cat_vars = cat_vars.reset_index(drop=True)
data = pd.concat([cat_vars, cont_vars], axis=1)
print(f"Data cleansed and combined.\nRows: {len(data)}")
data

# Data drift artifact

In [None]:
import json

data_columns = list(data.columns)
with open('./artifacts/columns_drift.json','w+') as f:           
    json.dump(data_columns,f)
    
data.to_csv('./artifacts/training_data.csv', index=False)

# Binning object columns

In [None]:
data.columns

In [None]:
data['bin_source'] = data['source']
values_list = ['li', 'organic','signup','fb']
data.loc[~data['source'].isin(values_list),'bin_source'] = 'Others'
mapping = {'li' : 'socials', 
           'fb' : 'socials', 
           'organic': 'group1', 
           'signup': 'group1'
           }

data['bin_source'] = data['source'].map(mapping)

# Save gold medallion dataset

In [None]:
#spark.sql(f"drop table if exists train_gold")


In [None]:
data_gold = spark.createDataFrame(data)
data_gold.write.saveAsTable('train_gold')
dbutils.notebook.exit(('training_golden_data',most_recent_date))

data.to_csv('./artifacts/train_data_gold.csv', index=False)