# How to prepare an MLflow Model for Navio Deployment

This notebook shows how to wrap a custom inference pipeline into an MLflow model that is ready for deployment on navio.
The packages mlflow, xgboost, pandas, scikit-learn, numpy and cloudpickle need to be installed.

In [None]:
# Imports used throughout the notebook
import xgboost as xgb
import joblib
import pandas as pd
import numpy as np
import sklearn
import pickle
import sys
import os
import shutil
from collections import defaultdict
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import mlflow

### Load Dataset

In [None]:
from sklearn.datasets import load_iris

# Load the iris dataset and wrap features and target in DataFrames. The feature
# columns are named x1..x4, matching the column names in the navio example request.
iris = load_iris()
x = pd.DataFrame(data=iris['data'], columns=["x1", "x2", "x3", "x4"])
y = pd.DataFrame(data=iris['target'], columns=["y"])

# Hold out 20% of the rows; the fixed random_state keeps the split reproducible.
# The held-out labels are bound to `y_test` instead of `_`, because assigning to
# `_` at the top level of a notebook shadows IPython's "last output" variable.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2, random_state=42)

### Model Training

In [None]:
# Standardise the features. The scaler is fitted on the training split only and
# then applied unchanged to the test split.
sc = StandardScaler()
x_train = sc.fit_transform(x_train)
x_test = sc.transform(x_test)

# Train the classifier. Hyperparameters are passed as keyword arguments:
# XGBClassifier has no `params` argument, so `params={'max_depth': 10}` was
# never applied as the tree depth.
xgb_model = xgb.XGBClassifier(max_depth=10)
xgb_model.fit(
    x_train,
    # Flatten the single-column label frame into a 1-D array of class labels.
    y_train.values.ravel(),
)

### Save the artifacts

In [None]:
# Start from an empty scratch directory so files from a previous run cannot
# end up among the artifacts written below.
if os.path.exists("temp"):
    shutil.rmtree("temp")
os.makedirs("temp")

# Persist the fitted model and the fitted scaler. The `with` blocks close each
# file handle deterministically instead of leaving it open until garbage
# collection, so the pickles are fully written before anything reads them.
xgb_model_path = "temp/xgb_model.pkl"
with open(xgb_model_path, 'wb') as model_file:
    pickle.dump(xgb_model, model_file)

standard_scaler_path = 'temp/scaler.pkl'
with open(standard_scaler_path, 'wb') as scaler_file:
    pickle.dump(sc, scaler_file)

# Inference Pipeline
This is how a data scientist needs to design their pipeline.

In [None]:
#import mlflow.pyfunc
import json
import cloudpickle
import pip
from sklearn.preprocessing import StandardScaler

### Provide schema for navio. 

This schema is only consumed by navio; MLflow itself does not use it.

In [None]:

# Schema for navio (not used by mlflow itself): one entry per feature column and
# one for the target column. Every column is declared as a non-nullable float
# together with a single sample value.
example_request = {
    "featureColumns": [
        {"name": column, "sampleData": sample, "type": "float", "nullable": False}
        for column, sample in [("x1", 5.1), ("x2", 3.5), ("x3", 1.4), ("x4", 0.2)]
    ],
    "targetColumns": [
        {"name": "y", "sampleData": 5.1, "type": "float", "nullable": False},
    ],
}

EXAMPLE_REQUEST_PATH = 'temp/example_request.json'

# Serialise the schema to JSON inside the scratch directory.
with open(EXAMPLE_REQUEST_PATH, 'w') as file:
    file.write(json.dumps(example_request))

In [None]:
# Create a Conda environment for the new MLflow Model. Every library the saved
# model needs at load or predict time is pinned to the version running in this
# kernel: XGBoost and scikit-learn for the pickled artifacts, CloudPickle because
# the header of this cell lists it as required, and pandas/numpy for the data
# passed to and returned from `predict`.
conda_env = {
    'channels': ['defaults'],
    'dependencies': [
        'python={}'.format(sys.version.split(' ')[0]),
        'pip={}'.format(pip.__version__),
        {
            'pip': [
                'mlflow=={}'.format(mlflow.__version__), # otherwise --install-mlflow is required
                'cloudpickle=={}'.format(cloudpickle.__version__),
                'scikit-learn=={}'.format(sklearn.__version__),
                'xgboost=={}'.format(xgb.__version__),
                'pandas=={}'.format(pd.__version__),
                'numpy=={}'.format(np.__version__)
            ]
        }
    ],
    'name': 'my_env'
}
# Echo the environment spec as the cell output for inspection.
conda_env

In [None]:
# Give each file written earlier in this notebook a unique artifact name.
# The mapping is handed to `mlflow.pyfunc.save_model`, which copies the files into
# the new MLflow Model's directory; the model later reads them via `context.artifacts`.
artifacts = dict(
    xgb_model=xgb_model_path,
    standard_scaler=standard_scaler_path,
    example_request=EXAMPLE_REQUEST_PATH,
)

In [None]:
# Directory with the project's Python code; it is passed to
# `mlflow.pyfunc.save_model` as `code_path` further down in this notebook.
src_path = "./src" # This is where the actual code is.

In [None]:
from src.inference.custom_pipeline import CustomPipeline

In [None]:
# Echo the imported class as the cell output (quick check that the import from `src` resolved).
CustomPipeline

In [None]:
import mlflow.pyfunc
"""
Generic Mlflow Model Class that just wraps the custom class

If this class is not defined in __main__ mlflow will not find it.
-> Solution: Add it to the code_path. if it is within src it would also work for example
"""


class MlFlowInference(mlflow.pyfunc.PythonModel):
    """Thin MLflow wrapper that delegates all inference work to ``CustomPipeline``."""

    def load_context(self, context) -> None:
        """Build the wrapped pipeline from the artifacts MLflow provides.

        ``CustomPipeline`` is imported inside this method rather than at class
        definition time, so the import runs when the model context is loaded.
        """
        from src.inference.custom_pipeline import CustomPipeline

        pipeline = CustomPipeline(artifacts=context.artifacts)
        self.model = pipeline

    def predict(self, context, model_input: pd.DataFrame) -> dict:
        """Run the pipeline on ``model_input`` and return ``{"prediction": [...]}``."""
        raw_output = self.model.predict(model_input)
        return dict(prediction=raw_output.tolist())

### Save the MLflow Model

In [None]:

mlflow_pyfunc_model_path = "mlflow-template/mlflow-custom-pipeline"

# Drop the output of a previous run, if any, before writing the model again.
if os.path.exists(mlflow_pyfunc_model_path):
    shutil.rmtree(mlflow_pyfunc_model_path)

# Package the wrapper class together with the project code, the environment
# spec and the saved artifacts into one MLflow model directory.
mlflow.pyfunc.save_model(
    mlflow_pyfunc_model_path,
    python_model=MlFlowInference(),
    code_path=[src_path],
    conda_env=conda_env,
    artifacts=artifacts,
)

### Make a zip for navio

In [None]:

# Zip the contents of the saved model directory; the archive is created next to
# the directory under the same name with a ".zip" extension.
shutil.make_archive(
    base_name=mlflow_pyfunc_model_path,
    format='zip',
    root_dir=mlflow_pyfunc_model_path,
)

### Test inference in python

In [None]:
# Reload the saved model from disk in `python_function` format
loaded_model = mlflow.pyfunc.load_model(model_uri=mlflow_pyfunc_model_path)

In [None]:
# Build a test request from the first ten rows of the unscaled feature frame.
# The "split"-oriented dict is what we send in the json request.
test_input = x.iloc[:10].to_dict(orient='split')
# Rebuild a DataFrame from the payload; this is how mlflow loads it in python.
test_input_df = pd.DataFrame(data=test_input["data"], columns=test_input["columns"])

In [None]:
# Evaluate the model: run the reloaded model on the test frame and print what it returns
test_predictions = loaded_model.predict(test_input_df)
print(test_predictions)

In [None]:
# cleanup notebook: remove the scratch "temp" directory if it still exists
if os.path.exists("temp"):
    shutil.rmtree("temp")


# Test serving before deploying on navio

On the command line, run:

    mlflow models serve -m mlflow-template/mlflow-custom-pipeline -p 5001 --install-mlflow
    curl http://127.0.0.1:5001/invocations -H 'Content-Type: application/json' -d '{"columns": ["x1", "x2", "x3", "x4"],"data": [[4.1, 5.1, 6.1, -4.1], [4.1, 5.1, 6.1, -4.1]]}'



