# MACHINE LEARNING IN PRODUCTION MADRID - MLFLOW DEPLOYMENT

In previous lessons we saw how to put a simple Scikit-Learn model into production. In the real world, however, models tend to be more complicated: they may not be of the Sklearn flavor, and the input data often needs substantial feature engineering.

You can also handle that with MLFlow. We'll see how to do it in the following cells.

## Custom Model to Production

The first thing we need to do is define the paths to the pickle files we saved in previous lessons, so that the prediction pipeline can be reproduced.

In [1]:
pickle_data_path = '../output/pickle_data'

artifacts = {
    'encoder_path': f'{pickle_data_path}/encoder.pickle',
    'umap_path': f'{pickle_data_path}/umap.pickle',
    'hdbscan_path': f'{pickle_data_path}/hdbscan.pickle',
}
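
Since `mlflow.pyfunc.save_model` copies these files into the model directory, a wrong path only shows up when the model is saved. A minimal sketch of an earlier check, assuming an `artifacts` dictionary shaped like the one above; the helper name `missing_artifacts` is hypothetical and not part of MLFlow:

```python
from pathlib import Path


def missing_artifacts(artifacts):
    """Return the names of the artifacts whose file does not exist on disk."""
    return [name for name, path in artifacts.items() if not Path(path).is_file()]


# same layout as the notebook; the paths are only illustrative here
example_artifacts = {
    'encoder_path': '../output/pickle_data/encoder.pickle',
    'umap_path': '../output/pickle_data/umap.pickle',
    'hdbscan_path': '../output/pickle_data/hdbscan.pickle',
}

missing = missing_artifacts(example_artifacts)
if missing:
    print(f'Missing artifact files: {missing}')
```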

To put a model into production with MLFlow it is necessary to define a wrapper for it. With a Scikit-Learn model (KMeans from previous lessons) this was straightforward, since the Sklearn wrapper has already been defined by the MLFlow developers.

For a custom model there is no ready-made wrapper, so we write our own. The only thing we need to do is extend the mlflow.pyfunc.PythonModel class and override the predict method:

```python
import mlflow.pyfunc


class ModelWrapper(mlflow.pyfunc.PythonModel):

    def predict(self, context, model_input):
        # replace with your own prediction logic
        raise NotImplementedError
```

In the cell below, a custom mlflow.pyfunc.PythonModel has been defined. However, it is more complex than the previous definition since the feature engineering of the input data is also included here.

In [2]:
import mlflow.pyfunc

import numpy as np
import pandas as pd
import pickle
import hdbscan

class ModelWrapper(mlflow.pyfunc.PythonModel):

    # define some useful list of columns
    def __init__(self):

        self.columns_to_encode = ['origin', 'destination', 'train_type', 'train_class', 'fare']
        self.columns_to_remove = ['insert_date', 'start_date', 'end_date']

    # at the time of loading the MLFlow model, the pickle data from the baseline
    # pipeline has to be loaded
    def load_context(self, context):
        
        with open(context.artifacts['encoder_path'], 'rb') as f:
            self.encoder_m = pickle.load(f)
            
        with open(context.artifacts['umap_path'], 'rb') as f:
            self.umap_m = pickle.load(f)
        
        with open(context.artifacts['hdbscan_path'], 'rb') as f:
            self.hdbscan_m = pickle.load(f)
            
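    # the datetime columns could arrive in integer form (DataFrame.to_json writes
    # datetimes as epoch milliseconds by default), in that case convert them back
    # to datetime type
    def check_dt_type(self, model_input):

        if model_input[self.columns_to_remove[0]].dtype == 'int64':
            for col in self.columns_to_remove:
                # unit='ms' is needed: without it pd.to_datetime reads integers as nanoseconds
                model_input[col] = pd.to_datetime(model_input[col], unit='ms')

        return model_input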

    # the baseline transformations are done here
    def transform(self, model_input):
        
        model_input.dropna(inplace=True)
        
        model_input = self.check_dt_type(model_input)
        
        model_input.loc[:, self.columns_to_encode] = \
            self.encoder_m.transform(model_input[self.columns_to_encode])
        
        model_input['duration'] = (model_input['end_date'] - model_input['start_date']).dt.seconds / 3600

        model_input['time_to_departure'] = (model_input['start_date'].dt.tz_localize('Europe/Madrid').dt.tz_convert('UTC') \
                                   - model_input['insert_date'].dt.tz_localize('UTC')).dt.days

        model_input['hour'] = model_input['start_date'].dt.hour

        model_input['weekday'] = model_input['start_date'].dt.dayofweek

        model_input = model_input[[x for x in model_input.columns if x not in self.columns_to_remove]]
        
        return model_input

    # main method to override, the OrdinalEncoder and UMAP transformations are done along
    # with the HDBSCAN prediction over this embedding
    def predict(self, context, model_input):
        
        # allocate payload with return value for null
        payload = np.ones(len(model_input)) * -1
        
        preprocessed = self.transform(model_input.reset_index(drop=True))
        embedding = self.umap_m.transform(preprocessed)
        clusters, _ = hdbscan.approximate_predict(self.hdbscan_m, embedding)
        
        # fill not null records with their cluster
        payload[preprocessed.index] = clusters
        
        return payload
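
The integer case handled by `check_dt_type` is what typically reaches the endpoint: `DataFrame.to_json` serialises datetimes as epoch milliseconds by default, while `pd.to_datetime` reads integers as nanoseconds unless a unit is given. The sketch below uses only the standard library to show how large the resulting error is; the timestamp is an illustrative departure time, treated as UTC purely for simplicity.

```python
from datetime import datetime, timezone

# illustrative value: a departure time, treated as UTC for simplicity
start = datetime(2019, 9, 23, 7, 40, tzinfo=timezone.utc)

# epoch milliseconds, the integer form a JSON payload would carry
epoch_ms = int(start.timestamp() * 1000)

# correct reading: scale milliseconds to seconds
as_ms = datetime.fromtimestamp(epoch_ms / 1_000, tz=timezone.utc)

# wrong reading: the same integer taken as nanoseconds lands in January 1970
as_ns = datetime.fromtimestamp(epoch_ms / 1_000_000_000, tz=timezone.utc)

print(as_ms.isoformat())
print(as_ns.isoformat())
```

In pandas, the equivalent of the correct reading is `pd.to_datetime(series, unit='ms')`.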



After the custom model has been defined, it is necessary to pack everything together, both the model and the conda environment.

In [3]:
mlflow_pyfunc_model_path = '../output/custom_model'

# remove all models if already there
!rm -rf $mlflow_pyfunc_model_path

# conda environment definition
# the corporate proxy keeps us from reaching the internet, so the 'defaults'
# channel is replaced by the internal mirrors below
conda_env = {
    'channels': [
          'https://nexus.apps.usae-2.syngentaaws.org/repository/Anaconda/pkgs/main/',
          'https://nexus.apps.usae-2.syngentaaws.org/repository/Anaconda/pkgs/free/',
          'https://nexus.apps.usae-2.syngentaaws.org/repository/Anaconda/pkgs/r/',
          'https://nexus.apps.usae-2.syngentaaws.org/repository/Anaconda/pkgs/pro/',
          'https://nexus.apps.usae-2.syngentaaws.org/repository/Anaconda/pkgs/msys2/'],
    'dependencies': [
        'python',
        {'pip': [
            'mlflow',
            'umap-learn',
            'hdbscan',
          ]
        },
    ],
    'name': 'custom_env',
}

# finally save the model in the MLFlow model format into the output directory
mlflow.pyfunc.save_model(path=mlflow_pyfunc_model_path, 
                         python_model=ModelWrapper(),
                         conda_env=conda_env,
                         artifacts=artifacts)
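
The pickled encoder, UMAP and HDBSCAN objects are only guaranteed to load with library versions compatible with the ones that created them, whereas the environment above leaves every package unpinned. One possible way to pin them is sketched below, under the assumption that the notebook runs in the same environment that produced the pickles; `pin_installed` is a hypothetical helper, not an MLFlow function:

```python
from importlib import metadata


def pin_installed(packages):
    """Return pip requirement strings pinned to the locally installed versions.

    Packages that are not installed are returned unpinned.
    """
    pinned = []
    for name in packages:
        try:
            pinned.append(f'{name}=={metadata.version(name)}')
        except metadata.PackageNotFoundError:
            pinned.append(name)
    return pinned


pip_dependencies = pin_installed(['mlflow', 'umap-learn', 'hdbscan'])
print(pip_dependencies)
```

The resulting list could replace the list under the `'pip'` key of `conda_env`.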

## Setup Endpoint

In previous lessons we saw how to create an endpoint with MLFlow and the command line:

```bash
mlflow models serve -m path_to_your_model -h host -p port
```

However, it is desirable to keep this endpoint alive at all times and to restart it when it fails. This can be done with systemd and the following unit configuration:

```
[Unit]
Description=MLFlow model in production
After=network.target

[Service]
Restart=on-failure
RestartSec=30
StandardOutput=file:/path_to_your_logging_folder/stdout.log
StandardError=file:/path_to_your_logging_folder/stderr.log
Environment=MLFLOW_TRACKING_URI=http://host_ts:port_ts
Environment=MLFLOW_CONDA_HOME=/path_to_your_conda_installation
ExecStart=/bin/bash -c 'PATH=/path_to_your_conda_installation/envs/mlinproduction_env/bin/:$PATH exec mlflow models serve -m path_to_your_model -h host -p port'

[Install]
WantedBy=multi-user.target
```
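
Once the service is running, it can be useful to confirm that the endpoint answers before sending real data. The MLFlow scoring server exposes a `/ping` route for this purpose; the sketch below assumes that route is available in the installed version, and the helper name, host and port are placeholders:

```python
import urllib.request


def endpoint_is_alive(host, port, timeout=5):
    """Return True if GET /ping on the model server answers with HTTP 200."""
    url = f'http://{host}:{port}/ping'
    try:
        with urllib.request.urlopen(url, timeout=timeout) as response:
            return response.status == 200
    except OSError:
        # covers refused connections, timeouts and HTTP error statuses
        return False


# placeholder host and port: use the values passed to `mlflow models serve`
print(endpoint_is_alive('127.0.0.1', 8900, timeout=2))
```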



## Test Endpoint

Before testing the endpoint it is necessary to load some test data.

### Load Test Data

In [4]:
import pandas as pd

df = pd.read_parquet('../data/raw/renfe.parquet')

test_data = df.sample(10)

display(test_data)

| | insert_date | origin | destination | start_date | end_date | train_type | price | train_class | fare |
|---|---|---|---|---|---|---|---|---|---|
| 2654730 | 2019-09-17 09:15:29 | SEVILLA | MADRID | 2019-09-23 07:40:00 | 2019-09-23 10:05:00 | AVE | 79.65 | Preferente | Promo |
| 9343390 | 2019-06-18 01:00:55 | MADRID | SEVILLA | 2019-08-15 13:10:00 | 2019-08-15 20:51:00 | MD-LD | 34.35 | Turista con enlace | Promo + |
| 784700 | 2019-08-29 09:45:54 | MADRID | BARCELONA | 2019-09-21 08:30:00 | 2019-09-21 11:15:00 | AVE | 85.1 | Turista | Promo |
| 7666853 | 2019-05-30 06:00:56 | SEVILLA | MADRID | 2019-06-22 08:45:00 | 2019-06-22 20:16:00 | MD | 52.5 | Turista con enlace | Flexible |
| 4388336 | 2019-04-20 09:05:17 | MADRID | BARCELONA | 2019-05-30 19:30:00 | 2019-05-30 22:40:00 | AVE | 75.4 | Turista | Promo |
| 5556875 | 2019-05-05 09:26:47 | MADRID | SEVILLA | 2019-05-12 16:00:00 | 2019-05-12 18:30:00 | AVE | 76.3 | Turista | Flexible |
| 721056 | 2019-08-28 19:05:33 | BARCELONA | MADRID | 2019-10-06 20:00:00 | 2019-10-06 23:10:00 | AVE | 80.15 | Turista Plus | Promo |
| 3366351 | 2019-04-12 23:39:28 | SEVILLA | MADRID | 2019-05-25 19:45:00 | 2019-05-25 22:17:00 | AVE | NaN | Turista | Promo |
| 6943674 | 2019-05-22 17:49:39 | BARCELONA | MADRID | 2019-07-15 12:50:00 | 2019-07-15 15:45:00 | AVE-TGV | 75.4 | Turista | Promo |
| 3739460 | 2019-04-15 16:12:31 | VALENCIA | MADRID | 2019-04-30 18:10:00 | 2019-04-30 19:57:00 | AVE | 33.65 | Turista | Promo |


### Debug Model

If the endpoint is not working as expected, the model can be loaded into the Jupyter notebook with the MLFlow API and debugged with the following cell.

In [5]:
loaded_model = mlflow.pyfunc.load_model(mlflow_pyfunc_model_path)

print(f'Predictions: {loaded_model.predict(test_data)}')

Predictions: [ 9.  3.  5.  3.  9.  8.  9. -1.  9.  3.]
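
When reading this output, note that `-1` is ambiguous: the wrapper uses it for rows dropped because of null values (here the record without a price), and `hdbscan.approximate_predict` also returns `-1` for points it considers noise. A small sketch that summarises the predictions printed above:

```python
from collections import Counter

# predictions copied from the output above
predictions = [9., 3., 5., 3., 9., 8., 9., -1., 9., 3.]

# number of records per cluster label
counts = Counter(int(label) for label in predictions)

# positions of the records without a cluster (null input or HDBSCAN noise)
unassigned = [i for i, label in enumerate(predictions) if label == -1]

print(counts.most_common())
print(unassigned)
```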


### Query Endpoint

Here the request is sent with Python's requests library, but it can also be done with cURL or another tool.

In [1]:
import requests

host = 'usaeilidssbxd01.syngentaaws.org'
port = '8900'

url = f'http://{host}:{port}/invocations'

headers = {
    'Content-Type': 'application/json',
}

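# test_data is the sample from the "Load Test Data" cell; that cell has to be
# run first in the same kernel
r = requests.post(url=url, headers=headers, data=test_data.to_json(orient='split'))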

print(f'Predictions: {r.text}')


NameError: name 'test_data' is not defined
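
The request format depends on the MLFlow version behind the endpoint. The call above posts the split-oriented JSON directly, which matches the 1.x scoring server; to our knowledge, MLFlow 2.0 and later expect the same structure wrapped under a `dataframe_split` key. A minimal sketch of that wrapping, with a hypothetical helper name and a hand-written one-row payload standing in for `test_data.to_json(orient='split')`:

```python
import json


def wrap_split_payload(split_json):
    """Wrap a split-oriented DataFrame JSON string under 'dataframe_split'."""
    return json.dumps({'dataframe_split': json.loads(split_json)})


# hand-written stand-in for test_data.to_json(orient='split')
split_json = json.dumps({
    'columns': ['origin', 'destination', 'price'],
    'index': [0],
    'data': [['SEVILLA', 'MADRID', 79.65]],
})

body = wrap_split_payload(split_json)
print(body)
```

`body` would then be passed as `data=` to `requests.post`, with the same `Content-Type: application/json` header.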