# Model serving

In [1]:
import ray
from ray import serve

In [2]:
# Print the version of the `python` executable found on the shell's PATH.
!python --version

Python 3.10.8


In [None]:
# Connect this notebook to the cluster at the address below.
RAY_CLIENT_ADDRESS = 'ray://ray-head:10001'
ray.init(address=RAY_CLIENT_ADDRESS)

In [None]:
# Start a non-detached Serve instance whose HTTP proxy listens on 0.0.0.0:5010.
SERVE_HTTP_OPTIONS = dict(host="0.0.0.0", port=5010)
serve.start(detached=False, http_options=SERVE_HTTP_OPTIONS)

[2m[36m(ServeController pid=317)[0m INFO 2023-01-06 08:38:39,253 controller 317 http_state.py:129 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:ibRHty:SERVE_PROXY_ACTOR-fbc3c0e34b760004e6910028d7da6776416926f68ffed9208ad8cde6' on node 'fbc3c0e34b760004e6910028d7da6776416926f68ffed9208ad8cde6' listening on '0.0.0.0:5010'


<ray.serve._private.client.ServeControllerClient at 0x7f592ab94580>

[2m[36m(HTTPProxyActor pid=345)[0m INFO:     Started server process [345]


## Server version 2: Ray + FastAPI

In [2]:
import fastapi

# Display the installed FastAPI version as the cell output.
fastapi.__version__

'0.88.0'

In [5]:
import os
import tempfile

import mlflow
import pandas as pd
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.responses import RedirectResponse
from pydantic import BaseModel, Field


def _positive_feature(example):
    """Return a required pydantic field constrained to be > 0, carrying `example`."""
    return Field(..., gt=0, example=example)


# Request body for the prediction endpoints: one sample described by its
# specimen number plus fourteen numeric features. Every field is a float and
# must be strictly positive. (Kept as comments rather than a class docstring
# so the generated schema description is not altered.)
class Input(BaseModel):
    specimen_number: float = _positive_feature(1)
    eccentricity: float = _positive_feature(0.86224)
    aspect_ratio: float = _positive_feature(2.0735)
    elongation: float = _positive_feature(0.52269)
    solidity: float = _positive_feature(0.98686)
    stochastic_convexity: float = _positive_feature(0.99474)
    isoperimetric_factor: float = _positive_feature(0.70529)
    maximal_indentation_depth: float = _positive_feature(0.010097)
    lobedness: float = _positive_feature(0.018554)
    average_intensity: float = _positive_feature(0.041404)
    average_contrast: float = _positive_feature(0.12163)
    smoothness: float = _positive_feature(0.014579)
    third_moment: float = _positive_feature(0.0048689)
    uniformity: float = _positive_feature(0.00027608)
    entropy: float = _positive_feature(0.9458)


# FastAPI application on which the deployment class below registers its routes.
app = FastAPI(
    title='Predictor API',
    description='Pipeline online inference',
)


@serve.deployment()
@serve.ingress(app)
class LeafDeployment:
    """Ray Serve deployment that serves three registered MLflow models via FastAPI.

    Routes (all POST bodies are validated against ``Input``):

    * ``GET  /``                         -> redirect to ``/docs``
    * ``POST /predict``                  -> ``self.predictor``  (Production stage)
    * ``POST /model2``                   -> ``self.predictor2`` (version 14)
    * ``POST /model3``                   -> ``self.predictor3`` (version 13)
    * ``POST /model_voting``             -> mode of the three models' predictions
    * ``POST /checker_integrity_model``  -> ``-1`` if ``entropy >= 10``, else
      ``self.predictor3``

    Endpoint methods are documented with ``#`` comments rather than docstrings
    so that the descriptions rendered in the OpenAPI docs stay unchanged.
    """

    def __init__(self):
        """Load the three ``extratree`` model versions from the MLflow registry."""
        model_name = "extratree"
        model_stage = "Production"
        self.predictor = mlflow.sklearn.load_model(
            model_uri=f"models:/{model_name}/{model_stage}")
        self.predictor2 = mlflow.sklearn.load_model(
            model_uri=f"models:/{model_name}/14")
        self.predictor3 = mlflow.sklearn.load_model(
            model_uri=f"models:/{model_name}/13")

    def preprocessing(self, df):
        """Drop ``specimen_number``, cast the remaining columns to float, and log to MLflow.

        Each call opens an MLflow run named ``preprocessing`` and records the
        cleaned frame as the artifact ``preprocessed_data.csv`` together with
        the ``n_samples`` and ``n_features`` params.

        Args:
            df: DataFrame holding the raw request features.

        Returns:
            A new float-typed DataFrame without the ``specimen_number`` column.
        """
        with mlflow.start_run(run_name='preprocessing'):
            df_cleaned = df.loc[:, df.columns != 'specimen_number'].astype(float)

            # Write the artifact inside a private temporary directory: a fixed
            # path in the working directory could be overwritten by another
            # request before it is logged, and would be left behind afterwards.
            with tempfile.TemporaryDirectory() as tmp_dir:
                artifact_path = os.path.join(tmp_dir, 'preprocessed_data.csv')
                df_cleaned.to_csv(artifact_path, index=False)
                mlflow.log_artifact(artifact_path)

            mlflow.log_param(key='n_samples', value=len(df_cleaned))
            mlflow.log_param(key='n_features', value=len(df_cleaned.columns))

            return df_cleaned

    def _prepare_features(self, request):
        """Turn a validated ``Input`` into the preprocessed one-row feature frame."""
        return self.preprocessing(pd.json_normalize(request.__dict__))

    @app.get('/', include_in_schema=False)
    async def docs_redirect(self):
        # Send visitors of the root path to the interactive API docs.
        return RedirectResponse(url='/docs')

    @app.post("/predict",
              tags=['Predictor 1 por default'],
              summary="Usa el modelo 1 para la predicción")
    def predict(self, request: Input):
        # Default model (Production stage).
        return self.predictor.predict(self._prepare_features(request))

    @app.post("/model2",
              tags=['Predictor 2'],
              summary="Usa el modelo 2 para la predicción")
    def predict_model2(self, request: Input):
        # Registry version 14.
        return self.predictor2.predict(self._prepare_features(request))

    @app.post("/model3",
              tags=['Predictor 3'],
              summary="Usa el modelo 3 para la predicción")
    def predict_model3(self, request: Input):
        # Registry version 13.
        return self.predictor3.predict(self._prepare_features(request))

    @app.post("/model_voting",
              tags=['Voting method'],
              summary="Calcula la moda de las predicciones")
    def predict_voting(self, request: Input):
        # Imported lazily, as before, so scipy is only needed where this runs.
        from scipy import stats

        features = self._prepare_features(request)
        predictions = [self.predictor.predict(features),
                       self.predictor2.predict(features),
                       self.predictor3.predict(features)]
        # Most frequent prediction among the three models.
        return stats.mode(predictions, keepdims=True).mode[0]

    @app.post("/checker_integrity_model",
              tags=['Checker Integrity + model'],
              summary="Ejecuta el checker de integridad y luego el modelo")
    def predict_with_integrity_check(self, request: Input):
        # Reject the sample before any preprocessing or MLflow logging happens.
        # NOTE(review): the rejection value is a bare -1 while successful
        # predictions are sequences, so clients indexing `[0]` must handle it.
        if request.entropy >= 10:
            return -1
        return self.predictor3.predict(self._prepare_features(request))

    # All endpoint methods used to share the name `call`, so the class
    # attribute resolved to the last one defined; keep that name available.
    call = predict_with_integrity_check
        
# Bind the deployment and run it on Serve; the call's return value is the cell output.
leaf_app = LeafDeployment.bind()
serve.run(leaf_app)

The new client HTTP config differs from the existing one in the following fields: ['host', 'port', 'location']. The new HTTP config is ignored.
[2m[36m(ServeController pid=1682)[0m INFO 2022-12-28 14:53:30,563 controller 1682 deployment_state.py:1310 - Adding 1 replica to deployment 'LeafDeployment'.
[2m[36m(ServeReplica:LeafDeployment pid=1840)[0m https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations
[2m[36m(ServeReplica:LeafDeployment pid=1840)[0m https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations


RayServeSyncHandle(deployment='LeafDeployment')

# Cliente

In [6]:
import pandas as pd
import requests

# Read the inference set and keep its first row as a one-element list of
# {column: value} records; `request[0]` is the JSON body sent to the API below.
filename = "serving/X_inference.csv"
df = pd.read_csv(filename)
request = df.iloc[:1].to_dict(orient='records')
request

[{'specimen_number': 1,
  'eccentricity': 0.86224,
  'aspect_ratio': 2.0735,
  'elongation': 0.52269,
  'solidity': 0.98686,
  'stochastic_convexity': 0.99474,
  'isoperimetric_factor': 0.70529,
  'maximal_indentation_depth': 0.010097,
  'lobedness': 0.018554,
  'average_intensity': 0.041404,
  'average_contrast': 0.12163,
  'smoothness': 0.014579,
  'third_moment': 0.0048689,
  'uniformity': 0.00027608,
  'entropy': 0.9458}]

## Llamar al modelo 1 (por defecto)

In [7]:
%%time

# POST the first record to the default model. Other cells in this notebook
# reach the same server through the host name `ray-head` instead of 0.0.0.0.
response = requests.post("http://0.0.0.0:5010/predict", json=request[0], timeout=30)
# Surface 4xx/5xx responses (e.g. a validation error) as an HTTPError instead
# of failing later while indexing an error payload.
response.raise_for_status()
result = response.json()[0]
result

CPU times: user 1.62 ms, sys: 4.35 ms, total: 5.98 ms
Wall time: 532 ms


32

[2m[36m(HTTPProxyActor pid=1734)[0m INFO 2022-12-28 14:55:41,997 http_proxy 172.20.0.6 http_proxy.py:315 - POST / 200 528.3ms
[2m[36m(ServeReplica:LeafDeployment pid=1840)[0m INFO 2022-12-28 14:55:41,995 LeafDeployment LeafDeployment#EHjEeW replica.py:505 - HANDLE __call__ OK 522.6ms


## Llamar al modelo 2

In [9]:
%%time

# POST the first record to the second model's endpoint.
response = requests.post("http://ray-head:5010/model2", json=request[0], timeout=30)
# Surface 4xx/5xx responses (e.g. a validation error) as an HTTPError instead
# of failing later while indexing an error payload.
response.raise_for_status()
result = response.json()[0]
result

CPU times: user 9.97 ms, sys: 699 µs, total: 10.7 ms
Wall time: 98.7 ms


32

[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 10:19:24,620 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 94.8ms
[2m[36m(ServeReplica:LeafDeployment pid=757)[0m INFO 2022-11-08 10:19:24,618 LeafDeployment LeafDeployment#tamUxz replica.py:505 - HANDLE __call__ OK 90.2ms


## Llamar al model_voting

In [22]:
%%time

# POST the first record to the voting endpoint (mode of the three models).
response = requests.post("http://ray-head:5010/model_voting", json=request[0], timeout=30)
# Surface 4xx/5xx responses (e.g. a validation error) as an HTTPError instead
# of failing later while indexing an error payload.
response.raise_for_status()
result = response.json()[0]
result

CPU times: user 5.25 ms, sys: 9.15 ms, total: 14.4 ms
Wall time: 167 ms


32

[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 11:42:53,742 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 162.7ms
[2m[36m(ServeReplica:LeafDeployment pid=2305)[0m INFO 2022-11-08 11:42:53,740 LeafDeployment LeafDeployment#FytfAS replica.py:505 - HANDLE __call__ OK 159.1ms
[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 11:43:41,085 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 112.0ms
[2m[36m(ServeReplica:LeafDeployment pid=2305)[0m INFO 2022-11-08 11:43:41,083 LeafDeployment LeafDeployment#FytfAS replica.py:505 - HANDLE __call__ OK 107.9ms
