# Model serving

In [1]:
import ray
from ray import serve

In [2]:
!python --version

Python 3.8.13


In [3]:
ray_head = "ray-head"
ray.init(address=f'ray://{ray_head}:10001')

0,1
Python version:,3.8.13
Ray version:,2.2.0
Dashboard:,http://172.21.0.5:8265


In [4]:
serve.start(detached=False, http_options={'host':"0.0.0.0", 'port':5010})
# serve.start(detached=True, http_options={'host':"0.0.0.0", 'port':5010}) #production


[2m[36m(ServeController pid=1067)[0m INFO 2023-04-05 06:57:56,381 controller 1067 http_state.py:129 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:qHVFWr:SERVE_PROXY_ACTOR-420d9d1091ca36a6e68aabdb533fc05558895484176a41c5d7ac871e' on node '420d9d1091ca36a6e68aabdb533fc05558895484176a41c5d7ac871e' listening on '0.0.0.0:5010'


<ray.serve._private.client.ServeControllerClient at 0x7f18307edc70>

In [5]:
import requests

requests.post("http://ray-head:5010/")

<Response [404]>

In [6]:
import requests

try:
    response = requests.get(f"http://{ray_head}:5010")
    #response.raise_for_status()
    print("Instancia empezada")
    #serve.shutdown()
except requests.exceptions.RequestException as e:
    print("Instancia no esta empezada. ", e)
    print("Desplegando el modelo:")
    serve.start(detached=True, http_options={'host':"0.0.0.0", 'port':5010}) # En produccion, ver bien el puerto

Instancia empezada


[2m[36m(HTTPProxyActor pid=1131)[0m INFO:     Started server process [1131]


## Server version 2: Ray + FastAPI

In [7]:
import fastapi

fastapi.__version__

'0.95.0'

In [8]:
import mlflow
import pandas as pd

from fastapi import FastAPI, Query
from pydantic import BaseModel#, Field
from fastapi.responses import HTMLResponse
from fastapi.responses import RedirectResponse


example_values = {
    "specimen_number": 1,
    "eccentricity": 0.86224,
    "aspect_ratio": 2.0735,
    "elongation": 0.52269,
    "solidity": 0.98686,
    "stochastic_convexity": 0.99474,
    "isoperimetric_factor": 0.70529,
    "maximal_indentation_depth": 0.010097,
    "lobedness": 0.018554,
    "average_intensity": 0.041404,
    "average_contrast": 0.12163,
    "smoothness": 0.014579,
    "third_moment": 0.0048689,
    "uniformity": 0.00027608,
    "entropy": 0.9458
}

class Input(BaseModel):
    specimen_number: float = Query(..., gt=0, example=example_values['specimen_number'])
    eccentricity: float = Query(..., gt=0, example=example_values['eccentricity'])
    aspect_ratio: float = Query(..., gt=0, example=example_values['aspect_ratio'])
    elongation: float = Query(..., gt=0, example=example_values['elongation'])
    solidity: float = Query(..., gt=0, example=example_values['solidity'])
    stochastic_convexity: float = Query(..., gt=0, example=example_values['stochastic_convexity'])
    isoperimetric_factor: float = Query(..., gt=0, example=example_values['isoperimetric_factor'])
    maximal_indentation_depth: float = Query(..., gt=0, example=example_values['maximal_indentation_depth'])
    lobedness: float = Query(..., gt=0, example=example_values['lobedness'])
    average_intensity: float = Query(..., gt=0, example=example_values['average_intensity'])
    average_contrast: float = Query(..., gt=0, example=example_values['average_contrast'])
    smoothness: float = Query(..., gt=0, example=example_values['smoothness'])
    third_moment: float = Query(..., gt=0, example=example_values['third_moment'])
    uniformity: float = Query(..., gt=0, example=example_values['uniformity'])
    entropy: float = Query(..., gt=0, example=example_values['entropy'])

    class Config:
        schema_extra = {
            "example": example_values
        }


app = FastAPI(title='Predictor API',
              description='Pipeline online inference')


@serve.deployment()
@serve.ingress(app)
# @serve.deployment()
class LeafDeployment:
    def __init__(self):
        
        # Inicializa los modelos. Los carga desde el MLflow.
        model_name = "extratree"
        model_stage = "Production"
        self.predictor = mlflow.sklearn.load_model(
                                model_uri=f"models:/{model_name}/{model_stage}")
        self.predictor2 = mlflow.sklearn.load_model(
                                model_uri=f"models:/{model_name}/13")
        self.predictor3 = mlflow.sklearn.load_model(
                                model_uri=f"models:/{model_name}/14")

    # Aqui van los metodos que tenga tu pipeline
    def preprocessing(self, df):
        with mlflow.start_run(run_name='preprocessing') as mlrun:
            df_cleaned = df.loc[:, df.columns != 'specimen_number'].copy()
            df_cleaned[df_cleaned.columns] = df_cleaned[df_cleaned.columns].astype(float)
            df_cleaned.to_csv('preprocessed_data.csv', index=False)
            mlflow.log_artifact('preprocessed_data.csv')

            # logging
            mlflow.log_param(key='n_samples', value=len(df_cleaned))
            mlflow.log_param(key='n_features', value=len(df_cleaned.columns))

            return df_cleaned

    @app.get('/', include_in_schema=False)
    async def docs_redirect(self):
        return RedirectResponse(url='/docs')

    @app.post("/predict",
         tags=['Predictor 1 por default'],
         summary="Usa el modelo 1 para la predicción")
    def call(self, request: Input):

        data = request.__dict__
        data = pd.json_normalize(data)
        preprocessed = self.preprocessing(data)
        result = self.predictor.predict(preprocessed)
        
        return result
    
    @app.post("/model2",
         tags=['Predictor 2'],
         summary="Usa el modelo 2 para la predicción")
    def call(self, request: Input):
    # async def __call__(self, request):
        # data = await request.json()
        data = request.__dict__
        data = pd.json_normalize(data)
        preprocessed = self.preprocessing(data)
        result = self.predictor2.predict(preprocessed)
        return result
    
    @app.post("/model3",
         tags=['Predictor 3'],
         summary="Usa el modelo 3 para la predicción")
    def call(self, request: Input):

        # data = await request.json()
        data = request.__dict__
        data = pd.json_normalize(data)
        preprocessed = self.preprocessing(data)
        result = self.predictor3.predict(preprocessed)
        return result
    
    @app.post("/model_voting",
         tags=['Voting method'],
         summary="Calcula la moda de las predicciones")
    def call(self, request: Input):
        from scipy import stats
        
        data = request.__dict__
        data = pd.json_normalize(data)
        preprocessed = self.preprocessing(data)
        predicts = [self.predictor.predict(preprocessed),
                    # 1 if max(preprocessed['monto']) > 100000 else 0,
                    self.predictor2.predict(preprocessed),
                    self.predictor3.predict(preprocessed)]
        result = stats.mode(predicts, keepdims=True).mode[0]
        return result
    
    @app.post("/checker_integrity_model",
         tags=['Checker Integrity + model'],
         summary="Ejecuta el checker de integridad y luego el modelo")
    def call(self, request: Input):
        from scipy import stats
        
        data = request.__dict__
        if data['entropy'] >= 10:
            return -1
        data = pd.json_normalize(data)
        preprocessed = self.preprocessing(data)
        result = self.predictor3.predict(preprocessed)
        return result
        
# LeafDeployment.deploy()
serve.run(LeafDeployment.bind())

The new client HTTP config differs from the existing one in the following fields: ['host', 'port', 'location']. The new HTTP config is ignored.
[2m[36m(ServeController pid=1067)[0m INFO 2023-04-05 06:58:11,192 controller 1067 deployment_state.py:1310 - Adding 1 replica to deployment 'LeafDeployment'.
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m Exception in 'lifespan' protocol
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m Traceback (most recent call last):
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m   File "/home/ray/anaconda3/lib/python3.8/site-packages/uvicorn/lifespan/on.py", line 86, in main
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m     await app(scope, self.receive, self.send)
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m   File "/home/ray/anaconda3/lib/python3.8/site-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
[2m[36m(ServeReplica:LeafDeployment pid=1216)[0m     return await self.app(scope, receive, send)
[2m[36

RayServeSyncHandle(deployment='LeafDeployment')

# Cliente

In [9]:
import pandas as pd
import requests

filename = "X_inference.csv"
df = pd.read_csv(filename)
# request = df.head(1).to_dict('list')
request = df.head(1).to_dict('records')
request

[{'specimen_number': 1,
  'eccentricity': 0.86224,
  'aspect_ratio': 2.0735,
  'elongation': 0.52269,
  'solidity': 0.98686,
  'stochastic_convexity': 0.99474,
  'isoperimetric_factor': 0.70529,
  'maximal_indentation_depth': 0.010097,
  'lobedness': 0.018554,
  'average_intensity': 0.041404,
  'average_contrast': 0.12163,
  'smoothness': 0.014579,
  'third_moment': 0.0048689,
  'uniformity': 0.00027608,
  'entropy': 0.9458}]

## Llamar al modelo 1, por default

In [10]:
%%time

# response = requests.post("http://0.0.0.0:5010/predict", json=request[0])
response = requests.post("http://ray-head:5010/predict", json=request[0])
# response = requests.post("http://ray-head:5010/LeafDeployment", json=request)
result = response.json()[0]
result

JSONDecodeError: Expecting value: line 1 column 1 (char 0)

[2m[36m(HTTPProxyActor pid=869)[0m INFO 2023-04-03 10:00:03,427 http_proxy 172.18.0.4 http_proxy.py:361 - POST / 500 7.3ms
[2m[36m(HTTPProxyActor pid=869)[0m Task exception was never retrieved
[2m[36m(HTTPProxyActor pid=869)[0m future: <Task finished name='Task-12' coro=<_wrap_awaitable() done, defined at /home/ray/anaconda3/lib/python3.8/asyncio/tasks.py:688> exception=RayTaskError(TypeError)(TypeError("'NoneType' object is not callable"))>
[2m[36m(HTTPProxyActor pid=869)[0m Traceback (most recent call last):
[2m[36m(HTTPProxyActor pid=869)[0m   File "/home/ray/anaconda3/lib/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
[2m[36m(HTTPProxyActor pid=869)[0m     return (yield from awaitable.__await__())
[2m[36m(HTTPProxyActor pid=869)[0m ray.exceptions.RayTaskError(TypeError): [36mray::ServeReplica:LeafDeployment.handle_request()[39m (pid=963, ip=172.18.0.4)
[2m[36m(HTTPProxyActor pid=869)[0m   File "/home/ray/anaconda3/lib/python3.8/site-packages/ray

## Llamar al modelo 2

In [9]:
%%time

response = requests.post("http://ray-head:5010/model2", json=request[0])
# response = requests.post("http://ray-head:5010/LeafDeployment", json=request)
result = response.json()[0]
result

CPU times: user 9.97 ms, sys: 699 µs, total: 10.7 ms
Wall time: 98.7 ms


32

[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 10:19:24,620 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 94.8ms
[2m[36m(ServeReplica:LeafDeployment pid=757)[0m INFO 2022-11-08 10:19:24,618 LeafDeployment LeafDeployment#tamUxz replica.py:505 - HANDLE __call__ OK 90.2ms


## Llamar al model_voting

In [22]:
%%time

response = requests.post("http://ray-head:5010/model_voting", json=request[0])
result = response.json()[0]
result

CPU times: user 5.25 ms, sys: 9.15 ms, total: 14.4 ms
Wall time: 167 ms


32

[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 11:42:53,742 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 162.7ms
[2m[36m(ServeReplica:LeafDeployment pid=2305)[0m INFO 2022-11-08 11:42:53,740 LeafDeployment LeafDeployment#FytfAS replica.py:505 - HANDLE __call__ OK 159.1ms
[2m[36m(HTTPProxyActor pid=341)[0m INFO 2022-11-08 11:43:41,085 http_proxy 172.22.0.4 http_proxy.py:315 - POST / 200 112.0ms
[2m[36m(ServeReplica:LeafDeployment pid=2305)[0m INFO 2022-11-08 11:43:41,083 LeafDeployment LeafDeployment#FytfAS replica.py:505 - HANDLE __call__ OK 107.9ms


In [15]:
import requests
from starlette.requests import Request
from typing import Dict

from ray import serve


# 1: Define a Ray Serve deployment.
@serve.deployment(route_prefix="/")
class MyModelDeployment:
    def __init__(self, msg: str):
        # Initialize model state: could be very large neural net weights.
        self._msg = msg

    def __call__(self, request: Request) -> Dict:
        return {"result": self._msg}


# 2: Deploy the model.
serve.start(detached=False, http_options={'host':"0.0.0.0", 'port':5011})


The new client HTTP config differs from the existing one in the following fields: ['port']. The new HTTP config is ignored.


<ray.serve._private.client.ServeControllerClient at 0x7fdae5705940>

In [16]:
serve.run(MyModelDeployment.bind(msg="Hello world!"))

# 3: Query the deployment and print the result.
# print(requests.get("http://localhost:5011/").json())
print(requests.post("http://ray-head:5011/", json=request[0]))
# {'result': 'Hello world!'}

The new client HTTP config differs from the existing one in the following fields: ['host', 'port', 'location']. The new HTTP config is ignored.
[2m[36m(ServeController pid=806)[0m INFO 2023-04-03 10:06:59,554 controller 806 deployment_state.py:1214 - Stopping 1 replicas of deployment 'MyModelDeployment' with outdated versions.
[2m[36m(ServeController pid=806)[0m INFO 2023-04-03 10:07:01,710 controller 806 deployment_state.py:1310 - Adding 1 replica to deployment 'MyModelDeployment'.


ConnectionError: HTTPConnectionPool(host='ray-head', port=5011): Max retries exceeded with url: / (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x7fd9e1d78e50>: Failed to establish a new connection: [Errno 111] Connection refused'))