# Hackathon III - MLOps

In [1]:
# Avoid unnecessary warnings

import warnings
warnings.filterwarnings('ignore')

## Load the data

The first step is always loading the data. The CSV files are provided and can be reproduced from the previous lesson. They contain data for PRODUCT_ID 3960 only.

- 👉 Load train and test datasets.
- 👉 Set "fecha_venta" as index column.
- 👉 Sort the data by "fecha_venta" column.
- 👉 Show the first 5 rows of the train dataset.

In [None]:
import pandas as pd

TRAIN_DATA_PATH = "data/itemSalesTrain.csv"                          #This file only includes data of the article 3960
df_train = pd.read_csv(TRAIN_DATA_PATH)             
df_train = df_train.set_index("fecha_venta").sort_index()            #Set fecha_venta as index and sort by fecha_venta column
df_train


In [None]:
TEST_DATA_PATH = "data/itemSalesTest.csv"
df_test = pd.read_csv(TEST_DATA_PATH)
df_test = df_test.set_index("fecha_venta").sort_index()
df_test
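`sort_index()` on a text column sorts alphabetically, which matches chronological order only when the dates are stored as ISO strings (`YYYY-MM-DD`). The sketch below uses a hypothetical CSV with day-first dates to show the difference; parsing `fecha_venta` with `parse_dates` makes the ordering independent of the text format. If you adopt this in the notebook, the datetime index has to be converted back to strings before it is serialized to JSON for the model request.

```python
import io

import pandas as pd

# Hypothetical CSV with day/month/year dates (not ISO), in file order
raw = "fecha_venta,var1(t)\n02/01/2023,5\n10/01/2023,7\n01/02/2023,3\n"

# Dates kept as text: sorted alphabetically -> 01/02, 02/01, 10/01 (wrong order)
as_text = pd.read_csv(io.StringIO(raw)).set_index("fecha_venta").sort_index()

# Dates parsed as day-first: sorted chronologically -> 2 Jan, 10 Jan, 1 Feb
as_dates = (
    pd.read_csv(io.StringIO(raw), parse_dates=["fecha_venta"], dayfirst=True)
    .set_index("fecha_venta")
    .sort_index()
)

print(as_text["var1(t)"].tolist())   # [3, 5, 7]
print(as_dates["var1(t)"].tolist())  # [5, 7, 3]
```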

## Split data into train and validation sets

We will split the training data into train and validation sets. We will use the train set to fit the model and the validation set to evaluate it; the separate test set is kept for the final predictions.


- 👉 Select the feature columns and the label columns
- 👉 Split the data into train and validation sets using an 80/20 or 90/10 ratio.
- 💡 Remember that in time series problems the split must be chronological (no shuffling): the validation set is the most recent data.
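A chronological split keeps the most recent rows for validation, so the model is never evaluated on days that come before the ones it was trained on. A minimal sketch of that rule as a helper, `chronological_split` (a hypothetical name), applied to ten made-up days:

```python
import pandas as pd


def chronological_split(features, labels, val_split=0.1):
    """Hold out the last `val_split` fraction of rows as validation (no shuffling)."""
    if not features.index.is_monotonic_increasing:
        raise ValueError("Sort the data by date before splitting")
    train_size = int(len(features) * (1 - val_split))
    return (
        features.iloc[:train_size],
        labels.iloc[:train_size],
        features.iloc[train_size:],
        labels.iloc[train_size:],
    )


# Hypothetical data: ten consecutive days with a single lag feature
idx = pd.date_range("2023-01-01", periods=10, freq="D")
X = pd.DataFrame({"var1(t-1 day)": range(10)}, index=idx)
y = pd.Series(range(1, 11), index=idx, name="var1(t)")

X_tr, y_tr, X_va, y_va = chronological_split(X, y, val_split=0.2)
print(X_tr.index.max().date(), X_va.index.min().date())
```

For cross-validation on time series, scikit-learn's `TimeSeriesSplit` applies the same idea with expanding training windows.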

In [4]:
# Set the product id and family (only for logging purposes, do not use it for filtering)
PRODUCT_ID = 3960
PRODUCT_FAMILY = "BOLLERIA"

# df_test.columns shows the dependent variable we want to predict: var1(t).
# It is the label, so it is NOT included in the features (that would leak the target into the model).
LABEL_COLUMN = "var1(t)"

FEATURES_COLUMN = [
    'var1(t-7 day)',
    'var1(t-6 day)',
    'var1(t-5 day)',
    'var1(t-4 day)',
    'var1(t-3 day)',
    'var1(t-2 day)',
    'var1(t-1 day)',
    'var1(t-5 week)',
    'var1(t-4 week)',
    'var1(t-3 week)',
    'var1(t-2 week)',
    'var1(t-1 week)',
    'tavg_w',
    'tmin_w',
    'tmax_w',
    'isfestivo',
    'day',
    'dayofweek',
    'month',
    'year',
    'weekday'
]

features = df_train[FEATURES_COLUMN]
labels = df_train[LABEL_COLUMN]

VAL_SPLIT = 0.1

train_size = int(len(df_train) * (1 - VAL_SPLIT))       # 90% of the data

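# Chronological split: the oldest rows for training, the most recent ones for validation
X_train = features.iloc[:train_size]
y_train = labels.iloc[:train_size]
X_val = features.iloc[train_size:]
y_val = labels.iloc[train_size:]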

## Start MLflow Server

- 👉 Launch a local MLflow server
- 👉 Connect to local MLflow server
- 👉 Set the desired experiment
- 👉 Enable MLflow autologging for sklearn
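Before running the cells below it can help to check that the server is actually reachable. A minimal sketch using only the standard library; it assumes the server exposes a `/health` endpoint, which both the MLflow tracking server (`mlflow ui` / `mlflow server`) and the model server started later with `mlflow models serve` provide in recent MLflow versions. `server_is_up` is a hypothetical helper name.

```python
import urllib.error
import urllib.request


def server_is_up(base_url: str, timeout: float = 2.0) -> bool:
    """Return True if GET <base_url>/health answers with HTTP 200."""
    try:
        with urllib.request.urlopen(f"{base_url.rstrip('/')}/health", timeout=timeout) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure or an HTTP error status
        return False


print(server_is_up("http://127.0.0.1:5000"))  # tracking server
print(server_is_up("http://127.0.0.1:5001"))  # model server, once deployed
```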

In [None]:
%pip install mlflow==2.10.2

In [None]:
%pip install packaging

In [7]:
import mlflow 

In [8]:
# Connect to the MLflow tracking server running locally.
# Start it first from a terminal with `mlflow ui` (it listens on http://127.0.0.1:5000 by default).
MLFLOW_TRACKING_URI = "http://127.0.0.1:5000"

mlflow.set_tracking_uri(MLFLOW_TRACKING_URI)

In [None]:
# Select the experiment. "Sales Forecasting" was created from the MLflow UI;
# mlflow.set_experiment would also create it if it did not exist yet.
MLFLOW_EXPERIMENT_NAME = "Sales Forecasting"
mlflow.set_experiment(MLFLOW_EXPERIMENT_NAME)       # Make it the active experiment for the runs below

In [None]:
# Enable MLflow autologging for scikit-learn (parameters, training metrics and the fitted model)
mlflow.sklearn.autolog()

## Train and evaluate the model

In this section we train and evaluate the model. We use a pipeline to preprocess the data and train the model.

- 👉 Create a Sklearn Pipeline:
  - 👉 Preprocessing: StandardScaler or MinMaxScaler
  - 👉 Model: LinearRegression, RandomForestRegressor, etc.
- 👉 Start a run in MLFlow
- 👉 Train the model using the train dataset
- 👉 Add convenient tags for PRODUCT_ID and PRODUCT_FAMILY
- 👉 Evaluate the model
- 💡 Remember this is a regression problem
- 💡 Autolog will automatically log metrics and model
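This is a regression problem, so the model is judged by error metrics rather than accuracy. A small sketch with three made-up values computes MAE, MSE, RMSE and R² by hand and compares them with `sklearn.metrics`. With these numbers MAE = 1, MSE = 5/3 and R² = -1/14: R² drops below zero whenever the predictions are worse than always predicting the mean.

```python
import numpy as np
from sklearn import metrics

# Hypothetical true and predicted daily sales
y_true = np.array([3.0, 5.0, 2.0])
y_pred = np.array([2.0, 5.0, 4.0])

errors = y_true - y_pred
mae = np.mean(np.abs(errors))        # mean absolute error, in units of sales
mse = np.mean(errors ** 2)           # mean squared error, penalizes large misses more
rmse = np.sqrt(mse)                  # back in units of sales
r2 = 1 - np.sum(errors ** 2) / np.sum((y_true - y_true.mean()) ** 2)

print(mae, mse, rmse, r2)
print(
    metrics.mean_absolute_error(y_true, y_pred),
    metrics.mean_squared_error(y_true, y_pred),
    metrics.r2_score(y_true, y_pred),
)
```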

In [11]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import MinMaxScaler
from sklearn.linear_model import LinearRegression
from sklearn import metrics

scaler = MinMaxScaler()
model = LinearRegression()
pipe = Pipeline([
    ("scaler", scaler),
    ("model", model)
])

with mlflow.start_run() as run:                             # Autolog records params, metrics and the model in this run

    pipe.fit(X_train, y_train)

    mlflow.set_tag("product_id", PRODUCT_ID)                # Tags appear as columns in the MLflow UI
    mlflow.set_tag("product_family", PRODUCT_FAMILY)

    # Evaluate on the validation set with regression metrics
    predictions = pipe.predict(X_val)
    mae = metrics.mean_absolute_error(y_val, predictions)
    mse = metrics.mean_squared_error(y_val, predictions)
    r2 = metrics.r2_score(y_val, predictions)

    # Log the validation metrics explicitly under clear names
    mlflow.log_metrics({"val_mae": mae, "val_mse": mse, "val_r2": r2})

print(f"MAE: {mae:.3f}  MSE: {mse:.3f}  R2: {r2:.3f}")


## Register the model

Promote the model to the Model Registry. For this section you can use either the MLflow UI or code snippets. If you choose the UI, you should provide screenshots.

![Model Sales Forecasting](data/model1_mlflow_production.PNG)

## Tag the Model

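We can assign a label to a model version to indicate that it is ready for production. In MLflow this label is an alias (here `production`), and it points to exactly one version at a time: when a new version (v1, v2, ...) is ready, we move the alias to it. This way we can deploy the model by referring to the alias (`models:/<model_name>@production`) instead of a specific version number.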

For this section you can use either the MLflow UI or code snippets. If you choose the UI, you should provide screenshots.

![Model Sales Forecasting Tag "production"](data/mlflow_2.PNG)

## Deploy the model

In a terminal, run the following commands to serve the model (replace `<model_name>` with the name you registered in the Model Registry):

```bash
export MLFLOW_TRACKING_URI=http://localhost:5000
mlflow models serve -m models:/<model_name>@production -p 5001 --env-manager local
```

You should see something like this:

```bash
[INFO] Starting gunicorn 21.2.0
[INFO] Listening at: http://127.0.0.1:5001 (236041)
[INFO] Using worker: sync
[INFO] Booting worker with pid: 236048
```

It means it's working correctly 🎉

## Make requests to the model

The model is now deployed and ready to receive requests. We will make a request to the model using the test set.

- 👉 Prepare the test set to be sent as JSON
- 👉 Make a POST request to the model
- 👉 Get the predictions from the response and show them
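The request body follows MLflow's `dataframe_split` format: column names, optional index labels and the rows as lists. A sketch of building it with a hypothetical helper, `to_dataframe_split`, on two made-up rows restricted to the feature columns, and of reading the `predictions` key from a sample response body (no server needed). `mlflow models serve` also accepts other input formats such as `dataframe_records`; this sketch sticks to the one used below.

```python
import json

import pandas as pd


def to_dataframe_split(df: pd.DataFrame, columns: list) -> dict:
    """Build a request body in the `dataframe_split` format accepted by `mlflow models serve`."""
    subset = df[columns]
    return {
        "dataframe_split": {
            "columns": subset.columns.tolist(),
            # Index labels as strings, so a datetime index is JSON-serializable too
            "index": subset.index.astype(str).tolist(),
            "data": subset.values.tolist(),
        }
    }


# Hypothetical test rows; "var1(t)" is the label and is deliberately left out of the request
df = pd.DataFrame(
    {"var1(t-1 day)": [4.0, 6.0], "dayofweek": [0, 1], "var1(t)": [5.0, 7.0]},
    index=pd.to_datetime(["2023-01-02", "2023-01-03"]),
)
payload = to_dataframe_split(df, ["var1(t-1 day)", "dayofweek"])
body = json.dumps(payload)  # what requests.post(..., data=body) would send

# The server replies with {"predictions": [...]}, one value per row (made-up numbers here)
response_text = '{"predictions": [5.1, 6.9]}'
predictions = json.loads(response_text)["predictions"]
print(payload["dataframe_split"]["columns"], predictions)
```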

In [12]:
import requests
import json


URL = "http://localhost:5001/invocations"
headers = {"Content-Type": "application/json"}

# Send only the feature columns the model was trained on
X_test = df_test[FEATURES_COLUMN]

data = {
    "dataframe_split": {
        "columns": X_test.columns.tolist(),
        "index": X_test.index.to_list(),
        "data": X_test.values.tolist()
    }
}

In [None]:
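# Health check: /invocations only accepts POST, so query the server's /health endpoint instead
response = requests.get("http://localhost:5001/health")
response.status_code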

In [14]:
response = requests.post(URL, headers=headers, data=json.dumps(data))
response.raise_for_status()                     # Fail loudly on 4xx/5xx responses (e.g. schema errors)
predictions = response.json()["predictions"]

## Push Results to Database

We push the results to the database so we can visualize them using other tools like Tableau, PowerBI, etc.
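`DatabaseConnection.insert_dataframe` below calls `DataFrame.to_sql` with `if_exists="replace"`, so each run overwrites the table instead of accumulating rows. A sketch of the difference against an in-memory SQLite database (a stand-in for the MySQL instance) with made-up prediction rows:

```python
import pandas as pd
import sqlalchemy as sa

# In-memory SQLite stands in for the MySQL database used in the notebook
engine = sa.create_engine("sqlite://")

# Hypothetical predictions with the same columns as df_article_prediction
df = pd.DataFrame({
    "fecha": ["2023-01-02", "2023-01-03"],
    "cantidad": [10.5, 12.0],
    "articulo": [3960, 3960],
    "familia": ["BOLLERIA", "BOLLERIA"],
})


def row_count(table: str) -> int:
    with engine.connect() as conn:
        return conn.execute(sa.text(f"SELECT COUNT(*) FROM {table}")).scalar()


# Two runs with "replace": the table is dropped and recreated, so it still holds 2 rows
df.to_sql("predictions", engine, if_exists="replace", index=False)
df.to_sql("predictions", engine, if_exists="replace", index=False)
after_replace = row_count("predictions")

# One more run with "append": rows are added to the existing table (now 4)
df.to_sql("predictions", engine, if_exists="append", index=False)
after_append = row_count("predictions")

print(after_replace, after_append)
```

Use `"append"` if every run's forecasts should be kept; then add a column such as a run date to tell them apart.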

In [15]:
# Helpful class used to connect to the database and push dataframes

import sqlalchemy as sa

class DatabaseConnection:

    def __init__(
        self,
        username: str,
        password: str,
        dialect: str = "mysql",
        driver: str = "pymysql",
        host: str = "database-1.cxlpff3hacbu.eu-west-3.rds.amazonaws.com",
        port: int = 3306,
        database: str = "sandbox",
    ) -> None:
        """Creates a connection to a database

        Args:
            username (str): username
            password (str): password
            dialect (str, optional): dialect. Defaults to "mysql".
            driver (str, optional): driver. Defaults to "pymysql".
            host (str, optional): host. Defaults to "database-1.cxlpff3hacbu.eu-west-3.rds.amazonaws.com".
            port (int, optional): port. Defaults to 3306.
            database (str, optional): database. Defaults to "sandbox".
        """
        connection_string = f"{dialect}+{driver}://{username}:{password}@{host}:{port}/{database}"
        self.engine = sa.create_engine(connection_string)

    def insert_dataframe(self, df: pd.DataFrame, table_name: str) -> None:
        """Inserts a dataframe into a table, replacing the table if it already exists
        
        Args:
            df (pd.DataFrame): dataframe to insert
            table_name (str): table name
        """
        df.to_sql(table_name, self.engine, if_exists="replace", index=False)

    def query_to_df(self, query: str) -> pd.DataFrame:
        """Retrieves a dataframe from a query.

        Args:
            query (str): query to perform.

        Returns:
            pd.DataFrame: dataframe with the results of the query.
        """
        with self.engine.connect() as conn:
            df = pd.read_sql_query(query, conn)
            return df

    def check_connection(self) -> bool:
        """Checks if the connection is working

        Returns:
            bool: True if the connection is working, False otherwise
        """
        try:
            with self.engine.connect():               # Open and immediately close a connection
                pass
            return True
        except Exception as e:
            print(e)
            return False

Prepare the dataframe to upload to the database

In [16]:
dates = df_test.index.to_list()

# Create a dataframe with the data to store
df_article_prediction = pd.DataFrame({
    "fecha": dates,
    "cantidad": predictions,
    "articulo": [PRODUCT_ID] * len(predictions),                                     # repeat the article for each date
    "familia": [PRODUCT_FAMILY] * len(predictions),                                  # repeat the family for each date
})

Push the dataframe to the database

In [17]:
# Load environment variables to hide credentials
import os
from dotenv import load_dotenv
load_dotenv(verbose=True)

True
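If a variable is missing from the `.env` file, `os.getenv` silently returns `None` and the failure only shows up later as a confusing login error in the connection string. A small hypothetical helper, `require_env`, fails early instead; the variable names in the usage comment are the ones used below (`EV_user`, `EV_password`).

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear message."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"Environment variable {name!r} is not set; add it to your .env file")
    return value


# Usage (after load_dotenv()):
# USER = require_env("EV_user")
# PASSWORD = require_env("EV_password")
```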

In [24]:
# Database credentials
USER = os.getenv("EV_user")
PASSWORD = os.getenv("EV_password")
NAME = "david97p"
table_name = f"Materials_Prediction_Group_{NAME}"

# Connect to the database
db = DatabaseConnection(USER, PASSWORD)
db.insert_dataframe(df_article_prediction, table_name)