## Requirements

- Authenticated to gcloud (```gcloud auth application-default login```)

This notebook demonstrate how to develop a python function based model that supports UPI protocol.
The model that we are going to develop is a simple iris classifier based on xgboost.

In [None]:
!pip install --upgrade -r requirements.txt > /dev/null

In [None]:
import merlin
import warnings
import os
import xgboost as xgb
import uuid
from sklearn.datasets import load_iris
from merlin.model import ModelType
warnings.filterwarnings('ignore')

## 1. Initialize

In [None]:
# initialize merlin url
merlin.set_url("https://my-merlin-domain/api/merlin")
# set active project
merlin.set_project("sample")
# set active model
merlin.set_model("pyfunc-upi", ModelType.PYFUNC)

## 2. Train Model

Train iris classifier using sample dataset provided by sklearn package

In [None]:
iris = load_iris()
y = iris['target']
X = iris['data']

param = {
            'max_depth': 6,
            'eta': 0.1,
            'nthread': 4,
            'num_class': 3,
            'objective': 'multi:softprob'
        }

dtrain = xgb.DMatrix(X, label=y)
xgb_model = xgb.train(params=param, dtrain=dtrain)

Save the trained model under `./xgboost-model/model.json`

In [None]:
model_dir = "xgboost-model"
model_filename = "model.json"
model_path = os.path.join(model_dir, model_filename)
xgb_model.save_model(model_path)

## 3. Create PyFunc Model

To create a PyFunc model you'll have to extend `merlin.PyFuncModel` class and implement its `initialize` and one of `infer` or `upiv1_infer` method.

`initialize` will be called once during model initialization. The argument to `initialize` is a dictionary containing a key value pair of artifact name and its URL. The artifact's keys are the same value as received by `log_pyfunc_model`.

`infer` method is the prediction method that needs to be implemented when `HTTP_JSON` protocol is used (the default protocol). It accept a dictionary type argument which represent incoming request body. `infer` should return a dictionary object which correspond to response body of prediction result.

`upiv1_infer` method is the prediction method that needs to be implemented when `UPI_V1` protocol is used.

In this example, `IrisClassifier` class implements both `infer` and `upiv1_infer` methods which allow the model to be deployed with both `UPI_V1` and `HTTP_JSON` protocol.

In [None]:
from merlin.model import PyFuncModel
from merlin.model import PyFuncModel
from caraml.upi.utils import df_to_table, table_to_df
from caraml.upi.v1 import upi_pb2, upi_pb2_grpc
import grpc

import xgboost as xgb
import pandas as pd
import numpy as np

class IrisClassifier(PyFuncModel):
    MODEL_ARTIFACT_KEY = "xgb_model"

    feature_names = [
        "sepal-length",
        "sepal-width",
        "petal-length",
        "petal-width"
    ]

    class_names = [
        "setosa",
        "versicolor",
        "virginica"
    ]

    target_name = "iris-species"

    def initialize(self, artifacts):
        self._model = xgb.Booster(model_file=artifacts[self.MODEL_ARTIFACT_KEY])

    def infer(self, request: dict, **kwargs):
        """
        infer is the entry point for HTTP_JSON protocol.
        """
        result = self._predict(request['instances'])
        return {"predictions": result.to_numpy().tolist()} # not the most efficient

    def upiv1_infer(self, request: upi_pb2.PredictValuesRequest,
                    context: grpc.ServicerContext) -> upi_pb2.PredictValuesResponse:
        """
        Perform prediction when using UPI_V1 protocol.
        The method accept request in form of PredictValuesRequest proto and should return PredictValuesResponse response proto.

        :param request: Inference request as PredictValuesRequest
        :param context: grpc context
        :return: Prediction result as PredictValuesResponse proto
        """
        if not self._validate_request(request, context):
            return upi_pb2.PredictValuesResponse()

        features_df, _ = table_to_df(request.prediction_table)
        prediction_result_df = self._predict(features_df)
        return self._create_response(prediction_result_df, request)

    def _create_response(self, predictions: pd.DataFrame, request: upi_pb2.PredictValuesRequest) -> upi_pb2.PredictValuesResponse:
        """
        Convert predictions result to upi response ( PredictValuesResponse )

        :param predictions: predictions calculated by the model. (pd.DataFrame)
        :param request: incoming request (PredictValuesRequest).
        :return: PredictValuesResponse
        """
        prediction_result_table = df_to_table(df=predictions, table_name="prediction_result")
        response_metadata = upi_pb2.ResponseMetadata(prediction_id=request.metadata.prediction_id,
                                                     models=[self._create_model_metadata()])
        return upi_pb2.PredictValuesResponse(prediction_result_table=prediction_result_table, target_name=self.target_name, metadata=response_metadata)

    def _predict(self, features: pd.DataFrame) -> pd.DataFrame:
        """
        Perform prediction. This shared method that will be called by `infer` and `upiv1_infer`
        :param features: features dataframe
        :return: prediction result
        """
        features_matrix = xgb.DMatrix(features)
        return pd.DataFrame(self._model.predict(features_matrix), columns = self.class_names, dtype=np.float64)

    def _create_model_metadata(self):
        """
        create model metadata to be used in response.
        """
        return upi_pb2.ModelMetadata(
            name=os.getenv("CARAML_MODEL_NAME", "iris-classifier"),
            version=os.getenv("CARAML_MODEL_VERSION", "1")
        )

    def _validate_request(self, request: upi_pb2.PredictValuesRequest, context: grpc.ServicerContext):
        """
        Perform request validation

        :param request: incoming request
        :param context: grpc context

        :return: True if request is valid, return False otherwise
        """
        # Check target name matches
        if request.target_name != self.target_name:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(f"Invalid target_name, got: {request.target_name}, expected: {self.target_name}")
            return False

        return True

Let's test `infer` and `upiv1_infer` locally

In [None]:
m = IrisClassifier()
m.initialize({IrisClassifier.MODEL_ARTIFACT_KEY: model_path})
m.infer({"instances": [[1,2,3,4], [2,1,2,4]] })

In [None]:
def create_upi_request_from_iris_dataset() -> upi_pb2.PredictValuesRequest:
    iris_dataset = load_iris()
    X = iris_dataset['data']
    df = pd.DataFrame(X, columns=IrisClassifier.feature_names)

    prediction_table = df_to_table(df, "prediction_table")
    return upi_pb2.PredictValuesRequest(
        target_name=IrisClassifier.target_name,
        prediction_table=prediction_table,
        metadata=upi_pb2.RequestMetadata(
            prediction_id=str(uuid.uuid1())
        )
    )

In [None]:
m.upiv1_infer(create_upi_request_from_iris_dataset(), {})

## 4. Deploy Model

### 4.1 Create Model Version and Upload

In [None]:
with merlin.new_model_version() as v:    
    merlin.log_pyfunc_model(model_instance=IrisClassifier(),
                            conda_env="env.yaml",
                            artifacts={IrisClassifier.MODEL_ARTIFACT_KEY: model_path})

### 4.2 Deploy Model as UPI Compatible service

Each of a deployed model version will have its own generated url

In [None]:
from merlin.protocol import Protocol

endpoint = merlin.deploy(v, protocol = Protocol.UPI_V1)

In [None]:
endpoint

### 4.3 Send Test Request

In [None]:
import grpc

channel = grpc.insecure_channel(f"{endpoint.url}:80") # Note to add :80 at the end of URL
stub = upi_pb2_grpc.UniversalPredictionServiceStub(channel)

request = create_upi_request_from_iris_dataset()
request

In [None]:
response = stub.PredictValues(request)
response

### 4.4 Delete Deployment

In [None]:
merlin.undeploy(v)

### 4.5 Deploy Model as HTTP service

Since the pyfunc model implement `infer` method, it is also possible to deploy the same model using `HTTP_JSON` protocol

In [None]:
endpoint = merlin.deploy(v) # if protocol is not set, it will default to HTTP_JSON

Send test request

In [None]:
%%bash -s "$endpoint.url"
curl POST $1 -d '{
  "instances": [
    [2.8,  1.0,  6.8,  0.4],
    [3.1,  1.4,  4.5,  1.6]
  ]
}'

## 4.6 Delete Deployment

In [None]:
merlin.undeploy(v)