In [1]:
import mlflow

# Bare expression so the notebook displays the installed MLflow version
mlflow.__version__

'2.21.3'

In [2]:
from src.utils.folder_operations import get_project_root

# Use the `mlflow_new/mlruns` directory under the project root as the MLflow tracking URI
mlruns_dir = get_project_root() / 'mlflow_new/mlruns'
mlflow.set_tracking_uri(mlruns_dir.as_uri())

# Creating custom Pyfunc models
MLflow’s persistence modules provide convenience functions for creating models with the `pyfunc` flavor in a variety of machine learning frameworks (scikit-learn, Keras, PyTorch, and more); however, they do not cover every use case. For example, you may want to create an MLflow model with the `pyfunc` flavor using a framework that MLflow does not natively support. Alternatively, you may want to build an MLflow model that executes custom logic when evaluating queries, such as preprocessing and postprocessing routines. Therefore, `mlflow.pyfunc` provides utilities for creating `pyfunc` models from arbitrary code and model data.

In [3]:
import pandas as pd

# Define a simple function to log
def predict(model_input):
    """
    Predicts the input multiplied by 2

    The multiplication is done with a single vectorised pandas operation
    instead of calling a Python lambda through ``apply``.

    :param model_input: The input to the model (a pandas Series or DataFrame)
    :return: A new object of the same kind with every value multiplied by 2
    """
    # No type hints on purpose: MLflow inspects them on function-based models.
    return model_input * 2


# Log the plain function as a pyfunc model and keep the id of the run that holds it
with mlflow.start_run(run_name="function_model"):
    mlflow.pyfunc.log_model(
        "model",
        python_model=predict,
        pip_requirements=["pandas"],
    )
    run_id = mlflow.active_run().info.run_id

# Read the model back through its run URI
model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")

# Score a small series with the reloaded model and show the result
x_new = pd.Series([1, 2, 3, 4, 5])
prediction = model.predict(x_new)
print(prediction)



0     2
1     4
2     6
3     8
4    10
dtype: int64


# Class-based Model

If you’re looking to serialize a more complex object, for instance a class that handles preprocessing, complex prediction logic, or custom serialization, you should subclass the PythonModel class.

```python
import mlflow
import pandas as pd

class MyModel(mlflow.pyfunc.PythonModel):
    """Minimal class-based pyfunc model that doubles each element of its input."""

    def predict(self, context, model_input, params=None):
        """
        Doubles every element of the input

        :param context: Passed in by the caller; not used here
        :param model_input: Iterable of values to double
        :param params: Optional inference parameters; not used here
        :return: A list with each element of the input multiplied by 2
        """
        doubled = []
        for value in model_input:
            doubled.append(value * 2)
        return doubled

# Log an instance of the class-based model
with mlflow.start_run():
    mlflow.pyfunc.log_model("model", python_model=MyModel(), pip_requirements=["pandas"])
    run_id = mlflow.active_run().info.run_id

# Load the model from the tracking server and perform inference
model = mlflow.pyfunc.load_model(f"runs:/{run_id}/model")
x_new = pd.Series([1, 2, 3])

# The line break is written as an escape sequence: a single-quoted f-string
# cannot span two physical lines.
print(f"Prediction:\n    {model.predict(x_new)}")
```

## Basic Guidelines for a PythonModel

The guidelines for this approach are as follows:

- Your class must be a subclass of mlflow.pyfunc.PythonModel

- Your class must implement a predict method

- The predict method must adhere to the requirements of the Inference API.

- The `predict` method must accept `context` as its first named argument (after `self`).

- If you wish to provide parameters with your model, these must be defined as part of the model signature. The signature must be saved along with the model.

- If you intend to have additional functionality execute when loading the model (such as loading additional dependent files), you may decide to define the load_context method in your class.

In [4]:
class CustomModel(mlflow.pyfunc.PythonModel):
    """Pyfunc model that reports on its input and then doubles every value."""

    def _preprocess(self, model_input):
        """
        Prints a progress message and the length of the input

        :param model_input: The input to the model
        """
        print("processing input....")
        print("Input Length: ", len(model_input))

    def predict(self, context, model_input):
        """
        Runs the preprocessing step, then predicts the input multiplied by 2

        :param context: Passed in by the caller; not used here
        :param model_input: The input to the model (must provide ``apply``)
        :return: The input multiplied by 2
        """
        self._preprocess(model_input)

        def double(x):
            return x * 2

        return model_input.apply(double)



In [5]:
# Log an instance of the class-based model; the printed run id identifies the run
with mlflow.start_run(run_name="class_model") as run:
    print(run.info.run_id)
    mlflow.pyfunc.log_model(
        "model",
        python_model=CustomModel(),
        pip_requirements=["pandas"],
    )



ef4b98a3a66741eeb362754e7aacd726


In [6]:
# Fetch the logged model back from its run and score the existing `x_new` input
model_uri = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.pyfunc.load_model(model_uri)
prediction = loaded_model.predict(x_new)
print(prediction)

processing input....
Input Length:  5
0     2
1     4
2     6
3     8
4    10
dtype: int64


# Obtaining the original Class

In [7]:
class CustomModel(mlflow.pyfunc.PythonModel):
    """Pyfunc model that doubles its input and carries one extra, non-pyfunc method."""

    def __init__(self):
        """Creates the model; there is no state to initialise."""

    def additional_method(self):
        """Prints a message; exists to show that extra methods survive logging."""
        print("Running an additional method")

    def _preprocess(self, model_input):
        """
        Prints a progress message and the length of the input

        :param model_input: The input to the model
        """
        print("processing input....")
        print("Input Length: ", len(model_input))

    def predict(self, context, model_input):
        """
        Runs the preprocessing step, then predicts the input multiplied by 2

        :param context: Passed in by the caller; not used here
        :param model_input: The input to the model (must provide ``apply``)
        :return: The input multiplied by 2
        """
        self._preprocess(model_input)

        def double(x):
            return x * 2

        return model_input.apply(double)



In [8]:
# Log the model that carries the extra method; the printed run id identifies the run
with mlflow.start_run(run_name="class_model_with_additional_methods") as run:
    print(run.info.run_id)
    mlflow.pyfunc.log_model(
        "model",
        python_model=CustomModel(),
        pip_requirements=["pandas"],
    )
    



20bceea277734d2981cbdd13e585612c


In [9]:
# Reload the model logged in the run above and score the existing `x_new` input
model_uri = f"runs:/{run.info.run_id}/model"
loaded_model = mlflow.pyfunc.load_model(model_uri)

prediction = loaded_model.predict(x_new)
print(prediction)

processing input....
Input Length:  5
0     2
1     4
2     6
3     8
4    10
dtype: int64


In [10]:
# The object returned by load_model is MLflow's generic wrapper, not CustomModel itself
type(loaded_model)

mlflow.pyfunc.PyFuncModel

In [11]:
# "unwrap_python_model" returns the underlying Python model object, so methods outside
# the pyfunc interface (such as `additional_method`) can be called on it.
# NOTE: despite the variable name, this is an instance of the model class, not the class itself.
original_class = loaded_model.unwrap_python_model()
original_class.additional_method()

Running an additional method


In [12]:
# Underscore-prefixed helpers are reachable on the unwrapped object as well
original_class._preprocess(x_new)

processing input....
Input Length:  5


In [13]:
# The unwrapped object is of the user-defined class rather than the PyFuncModel wrapper
type(original_class)

__main__.CustomModel

# Custom Models with Signature

In [14]:
from mlflow.models import ModelSignature
from mlflow.types.schema import Schema 
from mlflow.types.schema import ColSpec
from mlflow.types.schema import ParamSchema
from mlflow.types.schema import ParamSpec

In [15]:
# Input: one required integer column named "input"
input_schema = Schema([ColSpec("integer", name="input", required=True)])

# Declared output: one required long column named "output"
# NOTE(review): the predictions printed later in this notebook keep the column name
# "input" — confirm whether "output" is the intended name here.
output_schema = Schema([ColSpec("long", name="output", required=True)])

# Inference-time parameter "factor": an integer that defaults to 1
param = ParamSchema([ParamSpec("factor", "integer", default=1)])

model_signature = ModelSignature(
    inputs=input_schema,
    outputs=output_schema,
    params=param,
)

# Bare expression so the notebook displays the serialised signature
model_signature.to_dict()

{'inputs': '[{"type": "integer", "name": "input", "required": true}]',
 'outputs': '[{"type": "long", "name": "output", "required": true}]',
 'params': '[{"name": "factor", "default": 1, "shape": null, "type": "integer"}]'}

In [16]:
class CustomModel(mlflow.pyfunc.PythonModel):
    """Pyfunc model that multiplies its input by a ``factor`` inference parameter."""

    def __init__(self):
        pass

    def predict(self, context, model_input, params=None):
        """
        Predicts the input multiplied by ``factor``

        :param context: Passed in by the caller; not used here
        :param model_input: The input to the model (must provide ``apply``)
        :param params: Optional dict of inference parameters. Only ``factor`` is
            read; when ``params`` is None/empty or has no ``factor``, 1 is used.
        :return: The input multiplied by ``factor``
        """
        self._preprocess(model_input)
        self.__preprocess_params(params)
        # `params` defaults to None (e.g. when predict is called directly on the
        # unwrapped model), so fall back to an empty dict before reading from it.
        factor = (params or {}).get("factor", 1)
        return model_input.apply(lambda x: x * factor)

    def __preprocess_params(self, params):
        """
        Prints the received parameters, or a notice when none were provided

        :param params: The inference parameters passed to ``predict`` (may be None)
        """
        if params:
            print("Processing params....")
            print(params)
        else:
            print("No params provided")

    def _preprocess(self, model_input):
        """
        Prints a progress message and the length of the input

        :param model_input: The input to the model
        """
        print("processing input....")
        print("Input Length: ", len(model_input))

    def additional_method(self):
        """Prints a message; not part of the pyfunc interface."""
        print("Running an additional method")



In [17]:
# Log the parameterised model together with its signature, which declares the
# `factor` inference parameter
with mlflow.start_run(run_name="class_model_with_parameters") as run:
    print(run.info.run_id)
    mlflow.pyfunc.log_model(
        "model",
        python_model=CustomModel(),
        signature=model_signature,
        # Declare the requirement explicitly, like the other log_model calls in this notebook
        pip_requirements=["pandas"],
    )

b5727f1e709c4c74b775525db873d7e9




In [18]:
# Reload the parameterised model from the run logged above
run_id = run.info.run_id
model_uri = f"runs:/{run_id}/model"
loaded_model = mlflow.pyfunc.load_model(model_uri)

# Build the input as an int32 frame with a single "input" column
x_new = pd.DataFrame(
    {"input": [1, 2, 3, 4, 5]},
    dtype="int32",
)

In [19]:
# Inference without passing any parameters explicitly
prediction = loaded_model.predict(x_new, params=None)
print(prediction)

processing input....
Input Length:  5
Processing params....
{'factor': 1}
   input
0      1
1      2
2      3
3      4
4      5


In [20]:
# Inference with an explicit value for the `factor` parameter
prediction = loaded_model.predict(x_new, params={"factor": 10})
print(prediction)

processing input....
Input Length:  5
Processing params....
{'factor': 10}
   input
0     10
1     20
2     30
3     40
4     50
