# BentoML
Production-ready machine learning

In real business cases, once we have finished training, tuning and picking the best model,  
how do we get it to real people, for example those applying for a loan?  
![](./pic/1.png)

- One option is to create a web service for the model (for example with Flask) and deploy it to the cloud, then interact with that service
- But how do we make sure that the service is reliable enough to handle not 10 requests per second but 100 or even 1000, depending on the use case?

In this project, we will see how to serve ML models at scale:
- build and deploy an ML service
- customize the ML service to fit a specific use case
- make the service **production ready**

### What is production ready?
1. Scalability
2. Operational efficiency
    - being able to maintain the service without spending too much time on it
3. Repeatability (CI/CD)
    - what if we need to update the model every week?
4. Flexibility
    - meet business requirements in changing conditions
5. Resiliency
    - we need to be able to easily get back to a stable version
6. Ease of use

![](./pic/2.png)

- Bento - all the components of an ML service packed into a single deployable unit

# Building Service

We are going to take the model that we selected in the previous module (tree-based models), specifically the XGBoost model, and build a service on top of it.

In [4]:
#imports
import pandas as pd
import numpy as np

import xgboost as xgb
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction import DictVectorizer
from sklearn.metrics import roc_auc_score

In [5]:
#data loading and preparation:
df = pd.read_csv('./data/CreditScoring.csv')
df.columns = df.columns.str.lower()
# map target variable:
df['status'] = df['status'].map({
    1: 'ok',
    2: 'default',
    0: 'unk'
})
# map other features:
home_values = {
    1: 'rent',
    2: 'owner',
    3: 'private',
    4: 'ignore',
    5: 'parents',
    6: 'other',
    0: 'unk'
}
df['home'] = df['home'].map(home_values)
# marital:
marital_values = {
    1: 'single',
    2: 'married',
    3: 'widow',
    4: 'separated',
    5: 'divorced',
    0: 'unk'
}
df['marital'] = df['marital'].map(marital_values)
# records:
records_values = {
    1: 'no',
    2: 'yes',
    0: 'unk'
}
df['records'] = df['records'].map(records_values)
#jobs:
job_values = {
    1: 'fixed',
    2: 'partime',
    3: 'freelance',
    4: 'others',
    0: 'unk'
}
df['job'] = df['job'].map(job_values)

for column in ['income', 'assets', 'debt']:
    df[column] = df[column].replace(to_replace=99999999, value=np.nan)
df = df.fillna(0)
df = df[df['status'] != 'unk'].reset_index(drop=True)

df_full_train, df_test = train_test_split(df, test_size=0.2, random_state=11)
df_train, df_val = train_test_split(df_full_train, test_size=0.25, random_state=11)
#reset_indexes:
df_full_train = df_full_train.reset_index(drop=True)
df_test = df_test.reset_index(drop=True)
df_train = df_train.reset_index(drop=True)
df_val = df_val.reset_index(drop=True)

# convert into binary format:
df_full_train['status'] = (df_full_train['status'] == 'default').astype(int)
df_train['status'] = (df_train['status'] == 'default').astype(int)
df_val['status'] = (df_val['status'] == 'default').astype(int)
df_test['status'] = (df_test['status'] == 'default').astype(int)

#assign target variables separately:
y_full_train = df_full_train['status'].values
y_train = df_train['status'].values
y_val = df_val['status'].values
y_test = df_test['status'].values

# remove target from dataset:
del df_full_train['status']
del df_train['status']
del df_val['status']
del df_test['status']

# turn data into Dictionaries to use One-hot encoding later
train_dicts = df_train.to_dict(orient='records')
val_dicts = df_val.to_dict(orient='records')
test_dicts = df_test.to_dict(orient='records')
dicts_full_train = df_full_train.to_dict(orient='records')

# train DictVectorizer:
dv = DictVectorizer(sparse=False)
dv.fit(train_dicts)
X_train = dv.transform(train_dicts)
X_val = dv.transform(val_dicts)
X_test = dv.transform(test_dicts)
X_full_train = dv.transform(dicts_full_train)

#matrix for xgboost
dfulltrain = xgb.DMatrix(X_full_train, label=y_full_train)
dtest = xgb.DMatrix(X_test)

In [6]:
# xgboost:
xgb_params = {
    'eta': 0.1,
    'max_depth': 3,
    'min_child_weight': 1,
    'objective': 'binary:logistic',
    'eval_metric': 'auc',
    'nthread': 8,
    'seed': 1,
    'verbosity': 1,
}
model = xgb.train(xgb_params, dfulltrain, num_boost_round=175) # final model
y_pred = model.predict(dtest)
auc = roc_auc_score(y_test, y_pred)
print(f'auc score = {auc}')

auc score = 0.8324067738624701


#### How can we save this model and load it later to run an API service on top of it?  
1. One possible approach is to pickle the model into a file and then load it in a Flask application
    - The problem with this approach is that, depending on the ML framework, there may be specific steps required to save the model properly (the recommended way can even differ between versions of the same framework). Therefore, it is important to check the documentation.
    - BentoML provides a simple method for saving the model, which takes the necessary steps for us
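
The pickle approach from item 1 can be sketched with the standard library alone. The dictionary-based `model` and the `predict` helper below are hypothetical stand-ins for a trained model (they are not XGBoost), and the file name is made up for the example. A real framework may need its own save format, which is exactly the caveat above.

```python
import pickle
import tempfile
from pathlib import Path

# Hypothetical stand-in for a trained model: a few learned parameters.
model = {"weights": {"income": -0.01, "debt": 0.002}, "bias": 0.5}


def predict(model, features):
    """Toy linear score; higher means riskier."""
    score = model["bias"]
    for name, weight in model["weights"].items():
        score += weight * features.get(name, 0.0)
    return score


with tempfile.TemporaryDirectory() as tmp:
    model_path = Path(tmp) / "model.bin"

    # training side: serialize the model object to a file
    with open(model_path, "wb") as f_out:
        pickle.dump(model, f_out)

    # serving side (for example inside a Flask app): load it back
    with open(model_path, "rb") as f_in:
        loaded_model = pickle.load(f_in)

applicant = {"income": 100.0, "debt": 3000.0}
print(predict(loaded_model, applicant) == predict(model, applicant))
```

The loaded object behaves like the original, but nothing in the file records which library version produced it. That bookkeeping is left to us with this approach.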

*pip install bentoml*

In [7]:
import bentoml

In [8]:
bentoml.xgboost.save_model("credit_risk_model", model)

Model(tag="credit_risk_model:ypwmfnlaxwefgpcv", path="C:\Users\dein5\bentoml\models\credit_risk_model\ypwmfnlaxwefgpcv\")

Behind this one line of code:
- the model is saved in the way recommended for its framework
- it is tagged with an ID, which is unique every time we call the save_model method

Now we can create a service on top of our model

- create a file *service.py* with the code for our service
- to load the model, we can use BentoML's framework-specific get method: *bentoml.MLFramework.get* (in our case: *bentoml.xgboost.get()*)
    - this function takes the tag that we just created (for example credit_risk_model:ysdfym3aecblspcv)
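
To make the `name:version` tags and the `latest` alias concrete, here is a toy in-memory sketch. `ToyModelStore` is hypothetical and is not how BentoML stores models (the real store keeps files on disk and uses its own version format). It only illustrates that every save produces a new version and that `latest` resolves to the most recent one.

```python
import secrets


class ToyModelStore:
    """Hypothetical in-memory store that mimics name:version tags."""

    def __init__(self):
        self._versions = {}  # model name -> list of versions, oldest first

    def save(self, name):
        version = secrets.token_hex(8)  # a new random version on every save
        self._versions.setdefault(name, []).append(version)
        return f"{name}:{version}"

    def get(self, tag):
        name, _, version = tag.partition(":")
        versions = self._versions[name]
        if version in ("", "latest"):
            version = versions[-1]  # most recently saved version
        elif version not in versions:
            raise KeyError(tag)
        return f"{name}:{version}"


store = ToyModelStore()
first_tag = store.save("credit_risk_model")
second_tag = store.save("credit_risk_model")

print(first_tag != second_tag)
print(store.get("credit_risk_model:latest") == second_tag)
```

Pinning an explicit tag in the service keeps deployments reproducible, while `latest` is convenient during development.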

In [20]:
model_ref = bentoml.xgboost.get('credit_risk_model:latest')
model_ref

Model(tag="credit_risk_model:2th4ozdaxgkbopcv", path="C:\Users\dein5\bentoml\models\credit_risk_model\2th4ozdaxgkbopcv")

In [21]:
# to get access to that model:
model_runner = model_ref.to_runner()
model_runner

Runner(runnable_class=<class 'bentoml._internal.frameworks.xgboost.get_runnable.<locals>.XGBoostRunnable'>, runnable_init_params={}, name='credit_risk_model', models=[Model(tag="credit_risk_model:2th4ozdaxgkbopcv", path="C:\Users\dein5\bentoml\models\credit_risk_model\2th4ozdaxgkbopcv")], resource_config=None, runner_methods=[RunnerMethod(runner=..., name='predict', config=RunnableMethodConfig(batchable=False, batch_dim=(0, 0), input_spec=None, output_spec=None), max_batch_size=100, max_latency_ms=10000)], scheduling_strategy=<class 'bentoml._internal.runner.strategy.DefaultStrategy'>, _runner_handle=<bentoml._internal.runner.runner_handle.DummyRunnerHandle object at 0x0000023760800DD0>)

- a runner is BentoML's abstraction for the model itself
    - it allows us to scale the model separately from the rest of the service
    - it is very useful for high-performance scenarios
    - it is also the way to access the model

In [22]:
# create our service:
svc = bentoml.Service('credit_risk_classifier', runners=[model_runner])
svc

bentoml.Service(name="credit_risk_classifier", runners=[credit_risk_model])

As we can see, we provide the Service with its name and a list of runners. This is useful if we have multiple models: BentoML will pack them all together.

In [19]:
from bentoml.io import JSON

# service endpoint: JSON in, JSON out
@svc.api(input=JSON(), output=JSON())
def classify(application_data):
    prediction = model_runner.predict.run(application_data)
    # placeholder response; the prediction is not used yet
    return {"status": "Approved"}

- the runner exposes the same methods as the original model
- the only difference is that instead of calling predict directly, we need to use predict.run()
- this allows us to run predictions in a couple of different ways (which will help us improve the scalability of our service)

start the service with bentoml serve:  
- *bentoml serve service.py:svc*

after starting the service, we can see that it is running locally on port 3000:  
![](./pic/3.png)

if we open this URL in the browser (http://localhost:3000),  
we see Swagger UI, an automatically generated UI for the OpenAPI spec

we can test our service with sample data:

In [11]:
sample_data = {
    "seniority": 3,
    "home": "owner",
    "time": 36,
    "age": 26,
    "marital": "single",
    "records": "no",
    "job": "freelance",
    "expenses": 35,
    "income": 0.0,
    "assets": 60000.0,
    "debt": 3000.0,
    "amount": 800,
    "price": 1000
}

Swagger UI has a "Try it out" option to test our service in the browser.  
After we execute the request, we get an error:  
  ![](./pic/4.png)  
  ![](./pic/5.png)

the error message says: Not supported type for data: 'dict'  


this happens because during training we did not pass dictionaries directly to the model:  
we used DictVectorizer to transform the data into an array, with one-hot encoding for the categorical variables
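
To see what this transformation does, here is a simplified pure-Python sketch of the idea behind DictVectorizer. The helper names `fit_feature_names` and `transform` are made up for this example and the real class does more (sparse output, type handling), but the `key=value` column naming and the silent dropping of unseen features follow its behaviour.

```python
def feature_name(key, value):
    # categorical values become "key=value" columns, numbers keep their key
    return f"{key}={value}" if isinstance(value, str) else key


def fit_feature_names(records):
    """Collect the sorted list of column names seen in the training records."""
    return sorted({feature_name(k, v) for record in records for k, v in record.items()})


def transform(record, feature_names):
    """Turn one dict into a numeric row aligned with feature_names."""
    row = dict.fromkeys(feature_names, 0.0)
    for key, value in record.items():
        name = feature_name(key, value)
        if name in row:  # features not seen during fit are silently ignored
            row[name] = 1.0 if isinstance(value, str) else float(value)
    return [row[name] for name in feature_names]


train = [
    {"home": "owner", "income": 100},
    {"home": "rent", "income": 50},
]
feature_names = fit_feature_names(train)
print(feature_names)  # ['home=owner', 'home=rent', 'income']
print(transform({"home": "rent", "income": 70}, feature_names))  # [0.0, 1.0, 70.0]
```

The model was trained on rows like the last one, so the service has to apply the same transformation before calling predict.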

BentoML has a special mechanism for bundling extra objects like DictVectorizer with the model:
- we can pass the additional parameter *custom_objects* when we save the model with BentoML:

In [None]:
import bentoml
bentoml.xgboost.save_model("credit_risk_model", model,
                           custom_objects={
                                "dictVectorizer": dv
                           })

Model(tag="credit_risk_model:s4s6rbdaxgsacpcv", path="C:\Users\dein5\bentoml\models\credit_risk_model\s4s6rbdaxgsacpcv\")

now we have a new tag, and we will use it, since this version has the DictVectorizer in it:

In [25]:
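import bentoml
from bentoml.io import JSON

model_ref = bentoml.xgboost.get('credit_risk_model:latest')
# the DictVectorizer saved together with the model
dv = model_ref.custom_objects['dictVectorizer']

model_runner = model_ref.to_runner()

svc = bentoml.Service("credit_risk_classifier", runners=[model_runner])

@svc.api(input=JSON(), output=JSON())
def classify(application_data):
    # one-hot encode the incoming dict before passing it to the model
    vector = dv.transform(application_data)
    # call the model through the runner, not the raw Booster:
    # model.predict would require an xgb.DMatrix and fail on a NumPy array
    prediction = model_runner.predict.run(vector)
    # placeholder response; the prediction is not used yet
    return {"status": "Approved"}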

every time we change the service code, we would have to restart the service in the terminal where it is running  
to avoid this, we can run the service with the --reload option, so that it reloads automatically whenever the code changes

bentoml serve with reload:  
- **bentoml serve service.py:svc --reload**  

- if the port is busy, free it with either of: 
    - sudo kill -9 $(sudo lsof -t -i:3000)
    - fuser -k 3000/tcp

# Deploy BentoML Service

list the models saved with BentoML:  
- *bentoml models list*

get information about a model by its tag:  
- *bentoml models get TAG*
![](./pic/6.png)

BentoML saves various information about the model, including the framework version. This is very important: the framework version used for deployment should match the one the model was trained with, to avoid inconsistencies.

#### Build Bento unit
- we need to create a Bento file:
    - bentofile.yaml

In [1]:
service: "service.py:svc"
labels:
  owner: bentoml-team
  project: gallery
include:
- "*.py"
python:
  packages:
    - xgboost
    - scikit-learn


- service - the entry point for bentoml serve
- labels - anything that helps for business purposes to understand what this project is about and who is involved
- include  
- exclude 
    - these two sections control which files go into the Bento, which helps to organize the project
    - they help to keep our Bento lightweight, with just what we need
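
The effect of include and exclude patterns can be sketched with the standard library's `fnmatch`. This is only an illustration of pattern-based file selection: the `select_files` helper and the file list are hypothetical, and BentoML's own matching rules may differ in detail.

```python
from fnmatch import fnmatch


def select_files(paths, include, exclude=()):
    """Keep paths that match any include pattern and no exclude pattern."""
    selected = []
    for path in paths:
        included = any(fnmatch(path, pattern) for pattern in include)
        excluded = any(fnmatch(path, pattern) for pattern in exclude)
        if included and not excluded:
            selected.append(path)
    return selected


project_files = [
    "service.py",
    "train_save_model.py",
    "locustfile.py",
    "data/CreditScoring.csv",
    "notebook.ipynb",
]
print(select_files(project_files, include=["*.py"], exclude=["locustfile.py"]))
# ['service.py', 'train_save_model.py']
```

With only `"*.py"` included, the training data and the notebook stay out of the Bento, which is what keeps it small.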


after that we simply call: 
- bentoml build

and BentoML will create a packaged service

after that, if we look into the bentos folder, we can see what BentoML created for us:  
![](./pic/7.png)

- even a Dockerfile was created automatically, and there is still an option to customize it
- the python section holds the requirements with the specific versions of the frameworks
- models - the model itself (custom objects, metadata and the model)


so thanks to BentoML, everything required for the ML service is in one place and can be containerized and deployed to any environment

build a Docker image:
- bentoml containerize credit_risk_classifier:TAG

list all the Docker images that we have built:
- docker images

now that we have our Docker image, we can run it:
- docker run -it --rm -p 3000:3000 credit_risk_classifier:mifjnuda4geoeaav

when the Docker container is running, we can communicate with the service on localhost:3000
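
Besides Swagger UI, the running service can be called from any HTTP client. Below is a minimal sketch using only the standard library. It assumes the endpoint is served at `/classify` (the name of the endpoint function); `build_request` and `classify_remote` are hypothetical helper names.

```python
import json
import urllib.request

# Assumption: the service exposes the endpoint under the function name.
SERVICE_URL = "http://localhost:3000/classify"


def build_request(application, url=SERVICE_URL):
    """Prepare a JSON POST request for one credit application."""
    body = json.dumps(application).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def classify_remote(application, url=SERVICE_URL, timeout=5):
    """Send the application to the service and return the decoded JSON reply."""
    with urllib.request.urlopen(build_request(application, url), timeout=timeout) as response:
        return json.loads(response.read())


# Building the request does not need a running service.
request = build_request({"seniority": 3, "home": "owner"})
print(request.get_method(), request.full_url)
```

Once the container is up, `classify_remote(sample_data)` would send the full sample application and return the service's JSON response.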

# Validating Data

previously we were able to interact with the service running in a Docker container

but what if the input data has errors such as:
- a missing feature
- a field with a wrong (random) name

! the problem is that the service does not actually fail: it returns a normal-looking result, which may be even worse than a failure, since we have no idea that the input data was corrupted

- therefore, we want the service to fail if the input data does not look right
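
The "fail on bad input" idea can be sketched by hand with plain Python. The shortened `SCHEMA` and the `validate` helper are hypothetical and cover only three fields. A schema library does the same job more thoroughly and with better error messages.

```python
# Hypothetical, shortened schema: field name -> accepted Python type(s).
SCHEMA = {"seniority": int, "home": str, "income": (int, float)}


def validate(application):
    """Raise ValueError unless the application matches SCHEMA exactly."""
    missing = SCHEMA.keys() - application.keys()
    unknown = application.keys() - SCHEMA.keys()
    if missing or unknown:
        raise ValueError(
            f"missing fields: {sorted(missing)}, unknown fields: {sorted(unknown)}"
        )
    for name, expected in SCHEMA.items():
        if not isinstance(application[name], expected):
            raise ValueError(f"field {name!r} has the wrong type")
    return application


good = {"seniority": 3, "home": "owner", "income": 0.0}
bad = {"seniority": 3, "hoome": "owner", "income": 0.0}  # misspelled field name

validate(good)  # passes silently
try:
    validate(bad)
except ValueError as error:
    print(error)
```

The misspelled field is reported instead of being silently dropped, so the caller learns that the request was wrong.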

### Pydantic

this is where the Pydantic library comes into play

we need to change our service.py file:

In [1]:
from pydantic import BaseModel

BaseModel is the class from that library that we extend to create a **Data Schema**

In [2]:
class CreditApplication(BaseModel):
    seniority: int
    home: str
    age: int
    '...'

In [3]:
class CreditApplication(BaseModel):
    seniority: int
    home: str
    time: int
    age: int
    marital: str
    records: str
    job: str
    expenses: int
    income: float
    assets: float
    debt: float
    amount: int
    price: int

to make sure that our data is validated, we pass our data model to the JSON input descriptor

In [4]:
# @svc.api(input=JSON(pydantic_model=CreditApplication), output=JSON())

since the input is no longer a plain JSON object (it is now an instance of the CreditApplication class), we need to convert it back to a dictionary (with its .dict() method) before passing it to the DictVectorizer

now, if we pass wrong data (missing values etc.), we get an error  
saying that the input data is invalid
![](./pic/8.png)

# High Performance Services

- we will test the service with a high volume of traffic
- and then optimize the service

- **pip install locust**
this library generates traffic against our service to test it

we need to create a so-called *locustfile.py*

it contains a sample of the data that we want to send   
it is similar to Swagger UI, but Locust can send 100, 1000 or even more requests per second

after creating the locust file, we can run it in the terminal using one of these commands:
- locust -H http://localhost:3000
- if all requests fail, point Locust at the file explicitly: locust -H http://localhost:3000 -f locustfile.py

Open the browser at http://0.0.0.0:8089 (localhost:8089), set the desired number of users and the spawn rate for the load test in the web UI, and start swarming

![](./pic/9.png)

here we can choose how many users to simulate and the spawn rate (how many new users are started per second)

it is also important to remember that, with the default BentoML settings, a request counts as a failure if it takes longer than 1 second (this can be adjusted)

![](./pic/10.png)

after setting the number of users to 1000 and the spawn rate to 10, the service starts to fail at around 810 users; therefore, if we expect such high demand, we should optimize the service

## Optimization

1. Optimization - async/await  
by default, all requests in a BentoML service are queued: the service receives a request, processes it, and only then moves on to the next one  
async allows the service to process these requests concurrently

to implement it, we need to modify the *service.py* file:

add async before def classify  
add await before model_runner.predict  
and change the predict call from predict.run to predict.async_run

In [None]:
@svc.api(input=JSON(pydantic_model=CreditApplication), output=JSON()) # JSON input (validated against CreditApplication) and JSON output
async def classify(credit_application):
    application_data = credit_application.dict()
    vector = dv.transform(application_data)
    prediction = await model_runner.predict.async_run(vector)
    # placeholder response, as in the earlier version of the service
    return {"status": "Approved"}

- this parallelizes at the endpoint level (as in frameworks like FastAPI)
- BentoML additionally lets us parallelize at the **inference level** by calling **async_run** rather than run  
    - we still call the same method (predict), but we can now call it in different ways (async_run or a plain run)
    - it also allows us to fan out in more creative ways when the service is distributed across several machines
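
The difference between handling requests one by one and handling them concurrently can be seen in a small `asyncio` sketch. Here `fake_predict` is a hypothetical stand-in for `model_runner.predict.async_run`, with a sleep in place of real inference. No BentoML is involved.

```python
import asyncio
import time


async def fake_predict(vector):
    # stand-in for `await model_runner.predict.async_run(vector)`
    await asyncio.sleep(0.1)  # pretend inference takes 100 ms
    return sum(vector)


async def handle_sequentially(vectors):
    # each request waits for the previous one to finish
    return [await fake_predict(vector) for vector in vectors]


async def handle_concurrently(vectors):
    # all requests are in flight at the same time
    return await asyncio.gather(*(fake_predict(vector) for vector in vectors))


vectors = [[1, 2], [3, 4], [5, 6]]

start = time.perf_counter()
sequential = asyncio.run(handle_sequentially(vectors))
sequential_time = time.perf_counter() - start

start = time.perf_counter()
concurrent = asyncio.run(handle_concurrently(vectors))
concurrent_time = time.perf_counter() - start

print(sequential, f"{sequential_time:.2f}s")
print(concurrent, f"{concurrent_time:.2f}s")
```

Both versions return the same results, but the concurrent one should take roughly as long as a single call, because the waiting overlaps.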

![](./pic/11.png)

after implementing async, the service did not fail even with 1000 users and 1000 requests per second

Usually, it is better to run the traffic generator on a different machine from the one that serves the ML service

2. Optimization - fanning out the service across several processes, and **micro-batching**:  
    - when we run our service, we create a single process, so only 1 CPU (Central Processing Unit) core can run it
    - to take advantage of several CPU cores, we need to create several processes  
    - we can replicate our service and run several processes at a time, so that incoming requests can go to different CPU cores
      
      ![](./pic/12.png)

however, there are several problems with this approach in ML:
- if our model is big enough (gigabytes), we can only replicate it until we run out of memory

it is much more efficient to give the model several inputs at a time  
![](./pic/13.png)

if we combine requests into batches and then send them to the model, we can get a huge efficiency boost (micro-batching)  


to do this, we need to go back to the script where we saved the model:  
train_save_model.py

In [None]:
bentoml.xgboost.save_model("credit_risk_model", model,
                           custom_objects={
                                "dictVectorizer": dv # same key that the service reads from custom_objects
                           },
                           signatures={ # model signatures for runner inference
                               'predict': {
                                   'batchable': True,
                                   'batch_dim': 0 # 0 means BentoML will concatenate request arrays along the first dimension
                               }
                           })

the save_model function accepts a signatures argument, where we can specify the batching parameters

we save the new model, and change serve.py script to serve_batches.py (providing the new model tag: credit_risk_model:2xqrm5tfrkx3yaav)

then, to serve this model, we need the --production flag, which tells BentoML that we want more than one process for our workers:  
- **bentoml serve --production**  
- bentoml serve service_async_batches.py:svc --reload --production


the 2 most important batching parameters are:
- Max Batch Size - for example, do not pack more than 100 requests together
- Max Latency - for example, do not wait longer than 5 ms (milliseconds) before sending the batch to the model
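
How these two limits interact can be sketched with a small offline simulation. The `micro_batch` function is a hypothetical, simplified model: it groups requests by their arrival times, whereas BentoML's batching works on live traffic and is more elaborate.

```python
def micro_batch(arrivals, max_batch_size, max_latency_ms):
    """Group (arrival_ms, request) pairs, sorted by arrival time, into batches.

    A batch is closed when it already holds max_batch_size requests, or when
    the next request arrives more than max_latency_ms after the first request
    in the batch.
    """
    batches = []
    current = []
    window_start = None
    for arrival_ms, request in arrivals:
        is_full = len(current) == max_batch_size
        is_late = bool(current) and arrival_ms - window_start > max_latency_ms
        if is_full or is_late:
            batches.append(current)
            current = []
        if not current:
            window_start = arrival_ms  # the first request opens a new window
        current.append(request)
    if current:
        batches.append(current)
    return batches


arrivals = [(0, "a"), (1, "b"), (2, "c"), (3, "d"), (20, "e")]
print(micro_batch(arrivals, max_batch_size=3, max_latency_ms=5))
# [['a', 'b', 'c'], ['d'], ['e']]
```

The first batch closes because it is full, the second because the next request arrives too late. A larger batch size improves throughput, while a smaller latency limit keeps individual requests from waiting.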

we can configure these parameters by creating a *bentoconfiguration.yaml* file: