# D - Next steps   

The next step is to prepare a deployment pipeline that moves the Random Forest Regression model from development to production.

The deployment process begins with a draft API exposing endpoints for training, prediction, and log retrieval, so that the model can be retrained and its usage monitored. We will containerize the API, the model, and the unit tests with Docker to improve reproducibility and make it easier to scale across environments.

Following test-driven development, we will refine the API iteratively to account for scale, load, and potential data drift over time.

## I - Loading the Data

### 1 - Importing the necessary libraries

In [23]:
## Import necessary libraries 

import os
import joblib
import re
import time
from datetime import date
from datetime import datetime
from collections import defaultdict
import csv
import uuid

import pandas as pd
import numpy as np

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_percentage_error


from data_module import load_json_data
from cleaning_module import data_cleaning_pipeline
from data_module import time_series_df

import warnings
warnings.filterwarnings('ignore')

print('\nNecessary libraries imported\n')


Necessary libraries imported



### 2 - Loading the dataframe

In [2]:
# start timer for runtime
loading_df_time_start = time.time()

## loading the dataframe (stored as loaded_df_original until it is cleaned)
from data_module import load_json_data
loaded_df_original = load_json_data('cs-train')
print("\n... Dataframe loaded as 'loaded_df'\n")

## Rows count
print(f'\nloaded_df contains initially {len(loaded_df_original):,.0f} rows\n')

# calculate runtime duration
m, s = divmod( time.time() - loading_df_time_start , 60)
h, m = divmod(m, 60)
loading_df_runtime = "%03d:%02d:%02d"%(h, m, s)
print(f'\nLoading df runtime : {loading_df_runtime}\n')


... Dataframe loaded as 'loaded_df'


loaded_df contains initially 815,011 rows


Loading df runtime : 000:00:14
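
The same `divmod` runtime formatting is repeated in every cell and function of this notebook. As a minimal sketch, it could be factored into one helper; `format_runtime` is a hypothetical name that does not exist in the original modules.

```python
import time

def format_runtime(start, end=None):
    """Format the elapsed time between two time.time() values as HHH:MM:SS.

    Hypothetical helper: it reproduces the divmod pattern used in this notebook.
    When 'end' is omitted, the current time is used.
    """
    elapsed = (time.time() if end is None else end) - start
    m, s = divmod(elapsed, 60)
    h, m = divmod(m, 60)
    return "%03d:%02d:%02d" % (h, m, s)

# Usage: time a block of work
start = time.time()
# ... work to be timed ...
print(f"runtime : {format_runtime(start)}")
```

Passing `end` explicitly makes the helper easy to unit test, which fits the test-driven approach described in the introduction.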



### 3 - Cleaning the dataframe

In [3]:
# start timer for runtime
cleaning_time_start = time.time()

# clean the dataframe
from cleaning_module import data_cleaning_pipeline
loaded_df = data_cleaning_pipeline(loaded_df_original)

# calculate runtime duration
m, s = divmod( time.time() - cleaning_time_start , 60)
h, m = divmod(m, 60)
cleaning_runtime = "%03d:%02d:%02d"%(h, m, s)
print(f'\nCleaning runtime : {cleaning_runtime}\n')


Starting data cleaning pipeline...


Duplicate rows Summary
----------------------

The total number of rows before dropping duplicates is 815,011

... Removed 28,844 duplicate rows.

The total number of rows after dropping duplicates is 786,167


The numerical columns are : times_viewed and price and year and month and day

The total number of rows before dropping any invalid data is 786,167.


Data Quality Summary
--------------------------

The total number of invalid data points in times_viewed is 7,714

The total number of invalid data points in price is 5,252

The total number of invalid data points in year is 0

The total number of invalid data points in month is 0

The total number of invalid data points in day is 0

... Removed 12,328 rows with invalid data.

The total number of rows after dropping all invalid data is 773,839.


The numerical columns are : times_viewed and price and year and month and day


There are 20,967 rows identified with outliers in times_viewed data



### 4 - Features Engineering

In [19]:
def engineer_features(df, country, training):
    """
    Engineer features for each day to predict the sum of revenue for the next 30 days.

    Parameters:
    df (pd.DataFrame): The input DataFrame from which time_series_df builds the daily 'date' and 'revenue' series.
    country (str): Country passed to time_series_df to select the time series (e.g. 'all_countries').
    training (bool): If True, trims the last 30 days of data; else, returns all data.

    Returns:
    X (pd.DataFrame): DataFrame with engineered features.
    y (pd.Series): Target values (sum of next 30 days' revenue).
    dates (pd.Series): Dates corresponding to each feature row.
    """

    # start timer for runtime
    engineer_features_time_start = time.time()

    ts_df = time_series_df(df, country=country)
    # Work on a copy so the date conversion below does not write into a slice of another frame
    ts_df = ts_df[['date','revenue', 'purchases', 'total_views']].copy()
    # Ensure the date column has a datetime dtype
    ts_df['date'] = pd.to_datetime(ts_df['date'])

    # Initialize dictionaries to store features and target values
    eng_features = defaultdict(list)
    y = []

    # Define the look-back periods (in days) for feature engineering
    previous_days = [7, 14, 28, 70]

    # Iterate over each row in the DataFrame
    for idx , row in ts_df.iterrows():
        current_date = row['date']

        # Sum revenue for each specified look-back period
        for num_days in previous_days:
            start_date = current_date - pd.Timedelta(days=num_days)
            revenue_sum = ts_df[(ts_df['date'] >= start_date) & (ts_df['date'] < current_date)]['revenue'].sum()
            eng_features[f"previous_{num_days}"].append(revenue_sum)

        # Target: Sum revenue for the next 30 days
        target_sum = ts_df[(ts_df['date'] >= current_date) & (ts_df['date'] < current_date + pd.Timedelta(days=30))]['revenue'].sum()
        y.append(target_sum)
        
        # Previous year revenue for trend analysis
        prev_year_start = current_date - pd.DateOffset(years=1)
        prev_year_end = prev_year_start + pd.DateOffset(days=30)
        prev_year_revenue = ts_df[(ts_df['date'] >= prev_year_start) & (ts_df['date'] < prev_year_end)]['revenue'].sum()
        eng_features['previous_year'].append(prev_year_revenue)
        
        # Non-revenue feature: average daily views over the last 30 days
        recent_period_start = current_date - pd.Timedelta(days=30)
        recent_data = ts_df[(ts_df['date'] >= recent_period_start) & (ts_df['date'] < current_date)]
        eng_features['recent_views'].append(recent_data['total_views'].mean())
    
    # Convert the features dictionary to a DataFrame
    X = pd.DataFrame(eng_features)
    y = pd.Series(y, name='target')
    dates = ts_df['date']

    ## Remove rows with all zeros (in cases where no data exists for look-back periods)
    X = X[(X != 0).any(axis=1)]
    y = y[X.index]
    dates = dates[X.index]

    # If training, exclude the last 30 days to ensure target reliability
    if training:
        X = X.iloc[:-30]
        y = y.iloc[:-30]
        dates = dates.iloc[:-30]

    # Reset index for neatness
    X.reset_index(drop=True, inplace=True)
    y.reset_index(drop=True, inplace=True)
    dates.reset_index(drop=True, inplace=True)

    # Calculate runtime duration
    m, s = divmod( time.time() - engineer_features_time_start , 60)
    h, m = divmod(m, 60)
    engineer_features_runtime = "%03d:%02d:%02d"%(h, m, s)
    print(f'\nengineer_features runtime : {engineer_features_runtime}\n')
    return X, y, dates
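
The windowing used by `engineer_features` can be checked by hand on a toy series. The sketch below is illustrative only: `window_sum`, the ten-day series and the 3-day forward horizon are assumptions made for the example, whereas the function above uses look-backs of 7, 14, 28 and 70 days and a 30-day target. It shows that look-back windows exclude the current day while the target window includes it.

```python
from datetime import date, timedelta

def window_sum(revenue_by_day, start, end):
    """Sum the revenue of every day d with start <= d < end (half-open window)."""
    return sum(value for day, value in revenue_by_day.items() if start <= day < end)

# Toy series (assumption): 10 consecutive days with revenue 1.0, 2.0, ..., 10.0
first_day = date(2019, 1, 1)
revenue_by_day = {first_day + timedelta(days=i): float(i + 1) for i in range(10)}

# Current day is the 8th day of the series (revenue 8.0)
current_date = first_day + timedelta(days=7)

# Look-back feature: the 7 days strictly before the current day (1 + 2 + ... + 7)
previous_7 = window_sum(revenue_by_day, current_date - timedelta(days=7), current_date)

# Target: the current day plus the following days, here a 3-day horizon (8 + 9 + 10)
next_3 = window_sum(revenue_by_day, current_date, current_date + timedelta(days=3))

print(previous_7, next_3)
```

Because the target window starts on the current day, the last rows of the series have incomplete targets, which is why the training mode drops the final 30 days.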


### 5 - Updating Log Training File

In [32]:
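def update_train_log(country, date_range, metric, runtime, version, prefix, note, mode='dev', test=False):
    """
    Append one row describing a training run to the monthly training log (CSV).

    The log file is created with a header row on first use.
    'mode' labels the environment of the run (for example 'dev' or 'prod').
    'test' is accepted for compatibility with the caller but is not written to the log.
    """

    # Create Log Directory if it doesn't exist
    os.makedirs(LOG_DIR, exist_ok=True)
    
    # Name the logfile and define its path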
    today = date.today()
    train_logfile = os.path.join(LOG_DIR, f"{prefix}-train-{today.month}-{today.year}.log")
    
    # Define the header
    header = ['unique_id', 'timestamp', 'date_range', 'country', 'metric', 'model_version', 'runtime', 'mode', 'note']
    write_header = False
    
    # Write the header if needed
    if not os.path.exists(train_logfile):
        write_header = True
        
    # Get the current timestamp
    current_timestamp = datetime.fromtimestamp(time.time()).strftime("%H:%M:%S")

    # Generate a random UUID
    unique_id = uuid.uuid4()
    unique_id = str(unique_id)[:13]
    
    # Write to CSV
    with open(train_logfile, mode='a+', newline='') as csvfile: 
        writer = csv.writer(csvfile, delimiter=',')
        
        # Write the header if needed
        if write_header:
            writer.writerow(header)
        
        # Prepare the row for writing
        to_write = map(str, [unique_id, current_timestamp, date_range, country, metric, version, runtime, mode, note])
        
        # Write the row to the log file
        writer.writerow(to_write)
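
Since the planned API includes a log retrieval endpoint, the log written above also has to be read back. The following is a minimal sketch of the "write the header only once, then append" pattern together with a reader based on `csv.DictReader`. The helper names `append_log_row` and `read_log`, the shortened header and the row values are all hypothetical and for illustration only.

```python
import csv
import os
import tempfile

def append_log_row(logfile, header, row):
    """Append one row to a CSV log, writing the header only when the file is new."""
    write_header = not os.path.exists(logfile)
    with open(logfile, mode="a", newline="") as csvfile:
        writer = csv.writer(csvfile, delimiter=",")
        if write_header:
            writer.writerow(header)
        writer.writerow([str(value) for value in row])

def read_log(logfile):
    """Return the log rows as a list of dicts keyed by the header names."""
    with open(logfile, newline="") as csvfile:
        return list(csv.DictReader(csvfile))

# Illustrative values only (not real training results)
header = ["unique_id", "country", "metric", "model_version"]
with tempfile.TemporaryDirectory() as tmp_dir:
    logfile = os.path.join(tmp_dir, "example-train.log")
    append_log_row(logfile, header, ["id-1", "all_countries", "[RMSE=1]", "v1"])
    append_log_row(logfile, header, ["id-2", "all_countries", "[RMSE=2]", "v2"])
    rows = read_log(logfile)

print(len(rows), rows[-1]["model_version"])
```

Reading rows as dicts keeps the retrieval code independent of the column order, so columns can be added to the header later without breaking readers.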



### 6 - Training the Model

In [34]:
def perform_training(df, country, prefix, version, model, model_param_grid, model_scaler, training, test) :       
    
    # start timer for runtime
    perform_training_time_start = time.time()

    # prepare the data
    X, y, dates = engineer_features(df, country, training)

    # Execute this block only if model is RandomForestRegressor
    if isinstance(model, RandomForestRegressor):
        X = X.dropna()  
        y = y[X.index]
        dates = dates[X.index]

    # Create Test Subset of Data (if in Test Mode)
    if test:
        # Sample 30% of the rows without replacement
        subset = X.sample(frac=0.30, replace=False, random_state=42).index
        dates = dates.loc[subset]
        X = X.loc[subset]
        y = y.loc[subset]

    # Define the date range for logging (min/max, because sampling shuffles the row order)
    max_date = dates.max().strftime('%Y-%m-%d')
    min_date = dates.min().strftime('%Y-%m-%d')
    date_range = f"{min_date}:{max_date}"
    
    # Perform a train-test split
    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.20, shuffle=True)

    # Create a pipeline with scaling and a random forest model
    pipe = Pipeline([('model_scaler' , model_scaler) ,
                     ('model'        , model ) ])
    
    print("\n... Tuning the model hyperparameters ")
    # Tune the hyperparameter
    grid = GridSearchCV(pipe, param_grid=model_param_grid, cv=5, n_jobs=2)

    # Fit the model on train set
    grid.fit(X_train, y_train)

    # Make predictions on test set 
    y_pred = grid.predict(X_test)

    ## Evaluate the model on test set
    # Calculate RMSE
    eval_rmse = round(mean_squared_error(y_test, y_pred)**0.5)
    print(f"\nEvaluated RMSE : {eval_rmse}")
    # Calculate MAPE
    eval_mape = mean_absolute_percentage_error(y_test, y_pred) * 100
    print(f"\nEvaluated MAPE : {eval_mape:.2f}%")
    # Format the evaluation metrics for logging
    eval_metrics = f"[RMSE={eval_rmse},MAPE={eval_mape:.1f}%]"
    
    # Retrain on all data (this reruns the grid search on the full dataset)
    print("\n... Retraining model using all data")
    grid.fit(X, y)

    # Best model
    fitted_model = grid.best_estimator_
    print("\nThe best fitted model :", fitted_model, '\n')

    # make the model name more system compatible and file-friendly when saving the model
    final_model_step = fitted_model.named_steps['model']  
    model_name = final_model_step.__class__.__name__

    # Define the file path when testing the model
    if test:
        saved_model = os.path.join(MODEL_DIR, f"test-{country}-{model_name}-{version}.joblib")
        print(f"\n... saving test version of model: {saved_model}\n")
        
    # Define the path for any other usage with the model (e.g: evaluation, pre-production, production)
    else:
        saved_model = os.path.join(MODEL_DIR, f"{prefix}-{country}-{model_name}-{version}.joblib")
        print(f"\n... saving model version for {prefix} : {saved_model}\n")

    # Save the fitted model
    joblib.dump(fitted_model , saved_model)
    
    # Calculate runtime duration
    m, s = divmod(time.time()-perform_training_time_start, 60)
    h, m = divmod(m, 60)
    perform_training_runtime = "%03d:%02d:%02d"%(h, m, s)
    print(f'perform_training runtime : {perform_training_runtime}\n')

    # update train log
    update_train_log(country, date_range, eval_metrics, perform_training_runtime, version, prefix, note=NOTE, test=test)
    
    return model_name
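
The two metrics logged by `perform_training` can be reproduced by hand. This is a plain-Python sketch of the formulas, not the scikit-learn implementation: the `rmse` and `mape` helpers and the sample values are assumptions for illustration, and this `mape` assumes no true value is zero.

```python
import math

def rmse(y_true, y_pred):
    """Root mean squared error: square root of the mean squared difference."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))

def mape(y_true, y_pred):
    """Mean absolute percentage error, in percent (assumes no zero in y_true)."""
    ratios = [abs((t - p) / t) for t, p in zip(y_true, y_pred)]
    return 100 * sum(ratios) / len(ratios)

# Illustrative values only
y_true = [100.0, 200.0]
y_pred = [110.0, 190.0]

# Both errors are 10 in absolute value, so the RMSE is 10;
# the relative errors are 10% and 5%, so the MAPE is 7.5%
print(f"RMSE={rmse(y_true, y_pred):.0f}, MAPE={mape(y_true, y_pred):.1f}%")
```

RMSE is expressed in revenue units and penalizes large misses, while MAPE is scale-free, which makes it easier to compare countries with very different revenue levels.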

In [7]:
def model_train(df, country, prefix, version, model, model_param_grid, model_scaler, training=True, test=False) :
    """
    Train a model for the given country from the DataFrame df and save it to MODEL_DIR.
    
    'test' - when True, trains on a 30% subset of the data to simulate a training run
    """
    
    # start timer for runtime
    model_train_time_start = time.time()

    # Create Model Directory if it doesn't exist
    if not os.path.isdir(MODEL_DIR):
        os.mkdir(MODEL_DIR)

    if test:
        print("...... testing")
        print("...... subsetting data")

    # train the model for the requested country
    # (disabled guard: in test mode, only train for 'all_countries' and 'United Kingdom')
    #if country not in [ 'all_countries' , 'United Kingdom' ] and test :    
    model_name = perform_training(df, country, prefix, version, model, model_param_grid, model_scaler, training, test)    
    print(f'... model {prefix}-{country}-{model_name}-{version} trained')

    # Calculate runtime duration
    m, s = divmod(time.time()-model_train_time_start, 60)
    h, m = divmod(m, 60)
    model_train_runtime = "%03d:%02d:%02d"%(h, m, s)
    print(f'\nmodel_train runtime : {model_train_runtime}\n')


#### 6-1 - Execution example

In [71]:
## model specific variables (iterate the version and note with each change)
NOTE      = "random forest model for time-series"
MODEL_DIR = "models"
version   = 'v3'
mode      = 'dev'

## Random Forest Regressor model specific variables 
model_prefix     = 'sl'
country          = 'all_countries'
model_scaler     = StandardScaler()
model            = RandomForestRegressor( random_state = 42)
model_param_grid = { 'model__n_estimators' : [90, 100, 110, 120, 130] ,
                     'model__max_depth'    : [None, 5, 10, 15]        ,
                     'model__criterion'    : ['squared_error']        }


## log specific variables
#LOG_DIR_PATH = os.path.join(os.path.dirname(__file__),'..','log')
LOG_DIR = "logs"


In [30]:
# Execution example

model_train(loaded_df, country, model_prefix, version, model, model_param_grid, model_scaler, training=True, test=False)


engineer_features runtime : 000:00:04


... Tuning the model hyperparameters 

Evaluated RMSE : 8838

Evaluated MAPE : 5.54%

... Retraining model using all data

The best fitted model : Pipeline(steps=[('model_scaler', StandardScaler()),
                ('model',
                 RandomForestRegressor(max_depth=5, n_estimators=130,
                                       random_state=42))]) 


... saving model version for sl : models/sl-all_countries-RandomForestRegressor-v4.joblib

perform_training runtime : 000:00:46

... model f"sl-all_countries-RandomForestRegressor-v4 trained

model_train runtime : 000:00:46



### 6 - Loading a Random Forest Regression Model Version 

In [36]:
def model_load(country, training, df=loaded_df, prefix='sl' ):
    """
    Load every saved model matching 'prefix' and 'country', and engineer the features from df.

    Returns a tuple (all_models, all_data):
    all_models (dict): fitted models keyed by file name without the .joblib extension.
    all_data (dict): the engineered "X", "y" and "dates".
    Both values are None when no matching model file is found.
    """

    # start timer for runtime
    model_load_time_start = time.time()

    # Initialize return values
    all_data = None
    all_models = None

    # Retrieve all model filenames matching the specified prefix and country
    models = [ filename for filename in os.listdir(MODEL_DIR) if prefix in filename and country in filename ]

    # Check if any models were found
    if not models:
        print(f"No model with the prefix '{prefix}' was found for the country '{country}'. Has it been trained?")
        return all_models, all_data  # Return None for both if no models are found

    # Load models into a dictionary, keyed by file name without the extension
    all_models = { os.path.splitext(model)[0] : joblib.load(os.path.join(MODEL_DIR, model)) for model in models }

    # Engineer features and target variable from the provided DataFrame
    X, y, dates = engineer_features(df, country, training)
    # Convert dates to string format for consistency
    dates = pd.to_datetime(dates).dt.strftime('%Y-%m-%d')

    # Compile the data into a dictionary for easy access
    all_data = { "X" : X , "y" : y , "dates" : dates }

    # Calculate runtime duration
    m, s = divmod(time.time()-model_load_time_start, 60)
    h, m = divmod(m, 60)
    model_load_runtime = "%03d:%02d:%02d"%(h, m, s)
    print(f'\nmodel_load runtime : {model_load_runtime}\n')

    return all_models , all_data
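
The keys of `all_models` follow the file naming scheme used when saving, `{prefix}-{country}-{model_name}-{version}`. As a sketch under that assumption, the helpers below split such a name into its parts and pick the highest version numerically. `parse_model_filename` and `version_number` are hypothetical names, and the `v10` key is invented to show why a plain string comparison would be wrong.

```python
import os

def parse_model_filename(filename):
    """Split '{prefix}-{country}-{model_name}-{version}.joblib' into its parts.

    Assumes the prefix, model name and version contain no hyphen;
    the country may contain hyphens.
    """
    stem = os.path.splitext(filename)[0]
    prefix, rest = stem.split("-", 1)
    country, model_name, version = rest.rsplit("-", 2)
    return {"prefix": prefix, "country": country,
            "model_name": model_name, "version": version}

def version_number(key):
    """Return the integer part of a trailing 'v<N>' version tag."""
    return int(key.rsplit("-", 1)[-1].lstrip("v"))

# Hypothetical keys; 'v10' is included to show numeric ordering
keys = [
    "sl-all_countries-RandomForestRegressor-v3",
    "sl-all_countries-RandomForestRegressor-v10",
    "sl-all_countries-RandomForestRegressor-v4",
]
latest = max(keys, key=version_number)
print(latest)
print(parse_model_filename(latest + ".joblib"))
```

Comparing the versions as integers matters once there are ten or more versions, because as strings `'v4'` sorts after `'v10'`.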



### 7 - Making Predictions

In [37]:
def nearest_date(items, pivot):
    return min(items, key=lambda x: abs(date.fromisoformat(x) - pivot))
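
A short usage example of `nearest_date`, with the function repeated so that the snippet runs on its own. The list of available dates is made up for illustration. The items are ISO-formatted strings and the pivot is a `datetime.date`, which is how the prediction function below calls it.

```python
from datetime import date

def nearest_date(items, pivot):
    # Return the ISO date string in 'items' closest to the 'pivot' date
    return min(items, key=lambda x: abs(date.fromisoformat(x) - pivot))

# Illustrative dates only
available = ["2019-07-01", "2019-07-05", "2019-07-10"]

# 2019-07-06 is one day after 2019-07-05 and four days before 2019-07-10
closest = nearest_date(available, date(2019, 7, 6))

# On a tie (two days from both 07-01 and 07-05), min() keeps the first item
tie = nearest_date(available, date(2019, 7, 3))

print(closest, tie)
```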

In [83]:
def update_predict_log(country, target_date, y_pred, y_proba, runtime, version, mode):
    """
    example function to update predict log file
    """
    
    # Create Log Directory if it doesn't exist
    os.makedirs(LOG_DIR, exist_ok=True)

    # Name the logfile and define its path
    today = date.today()
    predict_logfile = os.path.join(LOG_DIR, f"{mode}-predicted_on_{today.month}_{today.year}.log")
    
    # Get the current timestamp
    current_timestamp = datetime.fromtimestamp(time.time()).strftime("%H:%M:%S")

    # Generate a random UUID
    unique_id = uuid.uuid4()
    unique_id = str(unique_id)[:13]
    
    # Define the header
    header = ['unique_id', 'timestamp', 'mode', 'country', 'target_date', 'y_pred', 'y_proba', 'model_version', 'runtime']
    write_header = False
    
    # Write the header if needed
    if not os.path.exists(predict_logfile):
        write_header = True
    
    # Write to CSV
    with open(predict_logfile, 'a+', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',')
        
        # Write the header if needed
        if write_header:
            writer.writerow(header)
            
        to_write = map(str, [unique_id, current_timestamp, mode, country, target_date, y_pred, y_proba, version, runtime])
        writer.writerow(to_write)


In [84]:
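def model_predict(df, country, year, month, day, mode='prod', all_models=None, training=False, all_data=None):
    """
    Predict the 30-day revenue starting at the given date for a country.

    Loads the saved models and the engineered data with model_load unless both
    'all_models' and 'all_data' are provided, predicts with the latest model
    version, logs the prediction and returns a dict with 'y_pred' and 'y_proba'.
    """

    ## start timer for runtime
    model_predict_time_start = time.time()

    ## load models and data if needed
    if not all_models or all_data is None:
        all_models , all_data = model_load(country, training, df, prefix='sl')

    # input checks   
    if not all_models or not any(country in key for key in all_models.keys()):
        raise Exception(f"\nERROR (model_predict) - no model for country '{country}' could be found")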

    # Finding the model with the latest version
    latest_model_key = max(all_models.keys(), key=lambda k: int(k.split('-')[-1][1:]))
    latest_model = all_models[latest_model_key]
    print(f"\nLatest Model used for {country} : {latest_model_key}")    
    
    # Validate Date Components
    for d in [year,month,day]:
        if re.search(r"\D", str(d)):  
            raise Exception("ERROR (model_predict) - invalid year, month or day")
        
    ## load data
    data = all_data

    # Convert dates to Datetime Format
    dates = pd.to_datetime(data['dates'])

    # Check the target date
    target_date = f"{year}-{str(month).zfill(2)}-{str(day).zfill(2)}"
    print(f"... Check if {target_date} is in the range.")

    # Validate Target Date and Find Nearest Date if Out of Range
    if target_date not in dates.dt.strftime('%Y-%m-%d').values:
        print(f"WARNING (model_predict) - date {target_date} not in range [ {data['dates'].iloc[0]} - {data['dates'].iloc[-1]} ]")
        target_date = nearest_date(data['dates'], date.fromisoformat(target_date))
        print(f"Nearest target date is {target_date}")
    else:
        print("Target date is in the range.")

    # Get the index of the target_date
    target_date_indx = dates[dates == target_date].index[0]

    # Query the corresponding row
    query = data['X'].iloc[[target_date_indx]]  

    ## sanity check
    if data['dates'].shape[0] != data['X'].shape[0]:
        raise Exception("ERROR (model_predict) - dimensions mismatch")

    ## make prediction
    y_pred = latest_model.predict(query)

    ## add a probability to the prediction 
    y_proba = None
    if 'predict_proba' in dir(latest_model) and 'probability' in dir(latest_model):
        if latest_model.probability == True:
            y_proba = latest_model.predict_proba(query)

    # Calculate runtime duration
    m, s = divmod(time.time()-model_predict_time_start, 60)
    h, m = divmod(m, 60)
    model_predict_runtime = "%03d:%02d:%02d"%(h, m, s)
    print(f'\nmodel_predict runtime : {model_predict_runtime}\n')

    # update predict log with the version of the model actually used
    model_version = latest_model_key.split('-')[-1]
    update_predict_log(country, target_date, f'{y_pred[0]:,.0f}', y_proba, model_predict_runtime, model_version, mode)

    return {'y_pred': f'{y_pred[0]:,.0f}', 'y_proba': y_proba}


#### 7-1 - Execution example

In [86]:
model_predict(loaded_df, 'all_countries', 2019, 7, 30, mode='prod', all_models=None, training=False)


engineer_features runtime : 000:00:11


model_load runtime : 000:00:12


Latest Model used for all_countries : sl-all_countries-RandomForestRegressor-v4
... Check if 2019-07-30 is in the range.
Target date is in the range.

model_predict runtime : 000:00:12



{'y_pred': '83,442', 'y_proba': None}
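
The date check in `model_predict` only rejects non-digit characters, so a value such as month 13 would pass it. As a hedged sketch of a stricter check, the standard library can validate the calendar date while building the target string. `build_target_date` is a hypothetical helper, not part of the original code.

```python
from datetime import date

def build_target_date(year, month, day):
    """Return 'YYYY-MM-DD' for a valid calendar date, or raise ValueError.

    Hypothetical helper: accepts ints or digit strings and relies on
    datetime.date to reject impossible dates such as February 30.
    """
    try:
        return date(int(year), int(month), int(day)).isoformat()
    except (TypeError, ValueError) as err:
        raise ValueError(f"invalid year, month or day: {year}-{month}-{day}") from err

# Usage
print(build_target_date(2019, 7, 30))
print(build_target_date("2019", "07", "30"))

try:
    build_target_date(2019, 2, 30)
except ValueError as err:
    print(f"rejected: {err}")
```

The returned string has the same zero-padded format as the `target_date` built in `model_predict`, so it could be compared against the engineered dates in the same way.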

### 8 - Updating Log Files