<a href="https://colab.research.google.com/github/timsetsfire/odsc-ml-drum/blob/main/Colab%20-%20DRUM%20Model%20Serving.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# DRUM - Automated Model Serving Made Easy

We'll get our hands dirty by:

* Building a simple regression model using scikit-learn
* Using DRUM for batch scoring
* Using DRUM to get a REST API endpoint
* Showing a simple example app connected to the REST API
* Scoring H2O, Keras, XGBoost, and DataRobot models with DRUM
* Adding a DataRobot remote agent if you are interested in further model monitoring


## Build a Model

In [2]:
!git clone https://github.com/timsetsfire/odsc-ml-drum.git

fatal: destination path 'odsc-ml-drum' already exists and is not an empty directory.


In [3]:
!pip install -r /content/odsc-ml-drum/colab-requirements.txt -q

  Building wheel for PyYAML (setup.py) ... done
  Building wheel for memory-profiler (setup.py) ... done
  Building wheel for progress (setup.py) ... done
  Building wheel for strictyaml (setup.

In [9]:
!sudo apt install -y nginx

Reading package lists... Done
Building dependency tree       
Reading state information... Done
The following additional packages will be installed:
  geoip-database iproute2 libatm1 libgeoip1 libmnl0 libnginx-mod-http-geoip
  libnginx-mod-http-image-filter libnginx-mod-http-xslt-filter
  libnginx-mod-mail libnginx-mod-stream libxtables12 nginx-common nginx-core
Suggested packages:
  iproute2-doc geoip-bin fcgiwrap nginx-doc ssl-cert
The following NEW packages will be installed:
  geoip-database iproute2 libatm1 libgeoip1 libmnl0 libnginx-mod-http-geoip
  libnginx-mod-http-image-filter libnginx-mod-http-xslt-filter
  libnginx-mod-mail libnginx-mod-stream libxtables12 nginx nginx-common
  nginx-core
0 upgraded, 14 newly installed, 0 to remove and 14 not upgraded.
Need to get 3,544 kB of archives.
After this operation, 11.8 MB of additional disk space will be used.
Get:1 http://archive.ubuntu.com/ubuntu bionic/main amd64 libmnl0 amd64 1.0.4-2 [12.3 kB]
Get:2 http://archive.ubuntu.com/ubu

In [4]:
import pandas as pd
from sklearn.ensemble import RandomForestRegressor
import pickle

## load data
df = pd.read_csv('/content/odsc-ml-drum/data/boston_housing.csv')

## set features and target (MEDV, the median home value, is the target)
X = df.drop('MEDV', axis=1)
y = df['MEDV']

## train the model
rf = RandomForestRegressor(n_estimators=20)
rf.fit(X, y)

## serialize the model into the directory DRUM will load it from
with open('/content/odsc-ml-drum/src/custom_model/rf.pkl', 'wb') as pkl:
    pickle.dump(rf, pkl)

# Batch Scoring with DRUM
<a id="setup_complete"></a>

At this point our model has been written to disk, and we want to start making predictions with it. DRUM handles scikit-learn models natively, so all we need to do is tell it where the model resides (`--code-dir`) and which data we wish to score (`--input`).

DRUM supports many frameworks natively. For frameworks it doesn't support off the shelf, we just need to write some custom hooks so DRUM knows how to load the model and make predictions with it. In this example, we'll highlight some very simple custom hooks, and we'll provide links to more complex examples.

In [6]:
%%sh 
drum score --code-dir /content/odsc-ml-drum/src/custom_model \
--input /content/odsc-ml-drum/data/boston_housing_inference.csv \
--output /content/odsc-ml-drum/data/predictions.csv --target-type regression

2020-12-03 23:19:27.029582: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
  defaults = yaml.load(f)


In [8]:
pd.read_csv("/content/odsc-ml-drum/data/predictions.csv").head()

|   | Predictions |
|---|---|
| 0 | 26.02 |
| 1 | 22.81 |
| 2 | 34.805 |
| 3 | 32.785 |
| 4 | 35.825 |


# Start the inference server locally

Batch scoring can be very useful, but DRUM offers more than that: we can also use it to serve our model as a RESTful API endpoint. The only thing that changes is the mode in the command: `server` instead of `score`. We'll also need to provide an address (host and port) that is not already in use. Because we pass `--production`, DRUM serves the model with uWSGI behind nginx, which is why we installed nginx earlier.

We start the server with `subprocess.Popen` so that it runs in the background while we keep interacting with it from this notebook.

In [10]:
import subprocess
import requests
import pandas as pd
from io import BytesIO
import yaml
import time
import os
import datarobot as dr
from pprint import pprint

In [11]:
run_inference_server = ["drum",
              "server",
              "--code-dir","/content/odsc-ml-drum/src/custom_model", 
              "--address", "0.0.0.0:6789", 
              "--show-perf",
              "--target-type", "regression",
              "--logging-level", "info",
              "--show-stacktrace",
              "--verbose",
              "--production", 
              "--max-workers", "5"
              ]

In [39]:
inference_server = subprocess.Popen(run_inference_server, stdout=subprocess.PIPE)

In [42]:
!sudo service nginx status

 * nginx is running


## Ping the Server to make sure it is running

In [43]:
## confirm the server is running
time.sleep(5) ## snoozing before pinging the server to give it time to actually start
print('check status')
requests.request("GET", "http://0.0.0.0:6789/").content

check status


b'{"message": "OK"}'
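A fixed `time.sleep(5)` works, but it either wastes time or is too short on a slow machine. A more robust alternative, sketched below, is to poll the health-check route until it answers. `wait_for_server` is a hypothetical helper: it uses the standard library's `urllib` so it needs no extra dependencies, and it assumes the server answers `GET /` with HTTP 200 once it is ready, as the `{"message": "OK"}` response above suggests.

```python
import time
import urllib.request


def wait_for_server(url, timeout=30.0, interval=0.5):
    """Poll `url` until it returns HTTP 200 or `timeout` seconds elapse.

    Returns True if the server answered in time, False otherwise.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except OSError:
            # Connection refused, timeouts and HTTP errors all derive from OSError:
            # the server is not ready yet, so wait and try again.
            pass
        time.sleep(interval)
    return False
```

In this notebook it would be called as `wait_for_server("http://localhost:6789/", timeout=60)` right after `subprocess.Popen(...)`.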

## Send data to server for inference

DRUM's `/predict/` endpoint expects our dataset as multipart form data. To send it, we'll create a simple Python function that serializes a DataFrame to CSV and posts it. We'll reuse the same function in our simple Flask app a little later.

In [44]:
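def score(data, port="6789"):
    """POST a DataFrame to the DRUM /predict/ endpoint as a CSV file upload."""
    # serialize the features to an in-memory CSV file
    b_buf = BytesIO()
    b_buf.write(data.to_csv(index=False).encode("utf-8"))
    b_buf.seek(0)

    url = "http://localhost:{}/predict/".format(port)
    # DRUM reads the input data from the multipart form field named "X"
    files = [("X", b_buf)]
    response = requests.post(url, files=files, timeout=None)
    return response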

In [45]:
# %%timeit
scoring_data = pd.read_csv("/content/odsc-ml-drum/data/boston_housing_inference.csv")
predictions = score(scoring_data).json()  ## score every row of the inference file
pprint(predictions)

{'predictions': [26.02,
                 22.81,
                 34.805,
                 32.785,
                 35.825,
                 26.335,
                 21.78,
                 25.885,
                 16.16]}
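The response body is a JSON object whose `predictions` list has one value per input row, in the same order, as the output above shows. A small hypothetical helper can attach those values back to the input frame; the column name `prediction` is an arbitrary choice for illustration.

```python
import pandas as pd


def attach_predictions(data, payload, column="prediction"):
    """Return a copy of `data` with the server's predictions as a new column.

    Assumes `payload` is the parsed JSON body, e.g. {"predictions": [...]},
    with one prediction per input row in the same order as `data`.
    """
    preds = payload["predictions"]
    if len(preds) != len(data):
        raise ValueError("expected {} predictions, got {}".format(len(data), len(preds)))
    out = data.copy()  # leave the caller's DataFrame untouched
    out[column] = preds
    return out
```

Usage: `scored = attach_predictions(scoring_data, predictions)`. The length check guards against silently misaligning rows if the server drops or adds records.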


In [46]:
requests.request("GET", "http://0.0.0.0:6789/").content

b'{"message": "OK"}'

In [47]:
inference_server.terminate()
inference_server.stdout.readlines()

[b'Name: uWSGI\n',
 b'Version: 2.0.19.1\n',
 b'Summary: The uWSGI server\n',
 b'Home-page: https://uwsgi-docs.readthedocs.io/en/latest/\n',
 b'Author: Unbit\n',
 b'Author-email: info@unbit.it\n',
 b'License: GPLv2+\n',
 b'Location: /usr/local/lib/python3.6/dist-packages\n',
 b'Requires: \n',
 b'Required-by: mlpiper\n',
 b'Detected REST server mode - this is an advanced option\n',
 b'\x1b[32m \x1b[0m\n',
 b'\x1b[32m \x1b[0m\n',
 b'\x1b[32mComponent: uwsgi_serving\x1b[0m\n',
 b'\x1b[32mLanguage:  Python\x1b[0m\n',
 b'\x1b[32mOutput:\x1b[0m\n',
 b'\x1b[32m------------------------------------------------------------\x1b[0m\n']

In [48]:
%%sh
nginx -s stop
sudo service nginx status

 * nginx is not running


In [34]:
# requests.request("POST", "http://0.0.0.0:6789/shutdown/").content

## Value Prop

One may ask: what is the benefit here? First off, there is no need for me to write an API to get the model up and running. Second, DRUM abstracts the framework away, provided I'm using one that is natively supported or I write enough Python hooks for DRUM to know how to work with the model.

For example, I could hot swap models as I see fit (see the examples in `./src/other_models`).

We'll run the following frameworks in `score` mode only, but you can bet they are supported in `server` mode as well!
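Because only `--code-dir` changes from one framework to the next, the cells below could also be driven from a single loop. This is a sketch: `drum_score_cmd` is a hypothetical helper, the directory list is copied from the cells below, and the loop only runs when the `drum` executable is on the `PATH`.

```python
import shutil
import subprocess

REPO = "/content/odsc-ml-drum"


def drum_score_cmd(code_dir, input_path, target_type="regression", output=None):
    """Build a `drum score` command; only the code directory varies per framework."""
    cmd = ["drum", "score",
           "--code-dir", code_dir,
           "--input", input_path,
           "--target-type", target_type]
    if output is not None:
        cmd += ["--output", output]  # otherwise DRUM prints predictions to stdout
    return cmd


model_dirs = [
    REPO + "/src/other_models/h2o_mojo/regression",
    REPO + "/src/other_models/python3_keras_joblib",
    REPO + "/src/other_models/python3_xgboost",
    REPO + "/src/other_models/dr_codegen",
]

if shutil.which("drum"):  # only run when the DRUM CLI is installed
    for code_dir in model_dirs:
        cmd = drum_score_cmd(code_dir, REPO + "/data/boston_housing_inference.csv")
        subprocess.run(cmd, check=True)
```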

#### H2O Mojo

In [None]:
!drum score --code-dir /content/odsc-ml-drum/src/other_models/h2o_mojo/regression --input /content/odsc-ml-drum/data/boston_housing_inference.csv --target-type regression


SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
   Predictions
0    24.504000
1    22.492000
2    34.554001
3    34.420001
4    35.289001
5    28.394001
6    21.936000
7    23.451000
8    17.065000


#### Keras

In [None]:
!drum score --code-dir /content/odsc-ml-drum/src/other_models/python3_keras_joblib --input /content/odsc-ml-drum/data/boston_housing_inference.csv --target-type regression


2020-12-02 00:52:35.879918: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-12-02 00:52:37.171689: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcuda.so.1
2020-12-02 00:52:37.240518: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:982] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2020-12-02 00:52:37.241161: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1716] Found device 0 with properties: 
pciBusID: 0000:00:04.0 name: Tesla T4 computeCapability: 7.5
coreClock: 1.59GHz coreCount: 40 deviceMemorySize: 14.73GiB deviceMemoryBandwidth: 298.08GiB/s
2020-12-02 00:52:37.241230: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
2020-12-02 00:52:37.463018: I tensorflow/stream_executor/platform/default

#### XGBoost

This example requires the `xgboost` package.

In [None]:
!drum score --code-dir /content/odsc-ml-drum/src/other_models/python3_xgboost --input /content/odsc-ml-drum/data/boston_housing_inference.csv --target-type regression


  defaults = yaml.load(f)
2020-12-02 00:52:46.610748: I tensorflow/stream_executor/platform/default/dso_loader.cc:48] Successfully opened dynamic library libcudart.so.10.1
   Predictions
0    24.541843
1    21.260277
2    34.018497
3    32.569200
4    34.248066
5    27.282364
6    20.803959
7    19.645220
8    16.968880


#### DataRobot Codegen

In [None]:
!drum score --code-dir /content/odsc-ml-drum/src/other_models/dr_codegen --input /content/odsc-ml-drum/data/boston_housing_inference.csv --target-type regression


   Predictions
0    24.258228
1    24.258228
2    32.451515
3    32.451515
4    32.451515
5    24.258228
6    21.078378
7    13.107812
8    13.107812


# Monitoring Deployments

What follows will require a DataRobot account.  You can get a trial account at [https://www.datarobot.com/trial/](https://www.datarobot.com/trial/)

Also, JDK 11 or 12 will be required.

The main idea: we'll start an agent service locally. This agent monitors a spooler, which could be something as simple as the local filesystem or, more realistically, a message broker (Pub/Sub, RabbitMQ, SQS).

Once the agent is running locally, we'll set a few environment variables that tell DRUM an agent is present and that it should buffer prediction data to the defined spool.

## Getting the monitoring agents



Currently, you have to go through the [UI](https://app2.datarobot.com/account/developer-tools) to download the agents.

In [None]:
token = "token"  # replace with your DataRobot API token
endpoint = "https://app2.datarobot.com"
## connect to the DataRobot platform with the Python client
client = dr.Client(token=token, endpoint="{}/api/v2".format(endpoint))
# mlops_agents_tb = client.get("mlopsInstaller")
# with open("/content/odsc-ml-drum/mlops-agent.tar.gz", "wb") as f:
#     f.write(mlops_agents_tb.content)

In [None]:
# !tar -xf /content/mlops-agent.tar.gz -C ..
!tar -xf /content/datarobot-mlops-agent-6.2.4-399.tar.gz -C .

## Configuring the Agent

To configure the agent, we only need to set the DataRobot MLOps URL and our API token. By default, the agent expects data to be spooled on the local filesystem, specifically in `/tmp/ta`, so we just need to make sure that directory exists.

In [None]:
!mkdir -p /tmp/ta

In [None]:
agents_dir = "/content/datarobot-mlops-agent-6.2.4"
config_path = "{}/conf/mlops.agent.conf.yaml".format(agents_dir)
with open(config_path) as file:
    documents = yaml.load(file, Loader=yaml.FullLoader)
## configure the location of the MLOps instance with which we'll communicate
documents['mlopsUrl'] = endpoint
# Set your API token
documents['apiToken'] = token
## write the configuration back to the same file we read it from
with open(config_path, "w") as f:
    yaml.dump(documents, f)

## Start the Agent Service

Next, we start the agent service and check that it comes up.

This requires a JDK (tested with versions 11 and 12).

In [None]:
## run agents service
subprocess.call("{}/bin/start-agent.sh".format(agents_dir))

0

In [None]:
## check status
check = subprocess.Popen(["{}/bin/status-agent.sh".format(agents_dir)], stdout=subprocess.PIPE)
print(check.stdout.readlines())
check.terminate()

[b'DataRobot MLOps-Agent is running as a service.\n']


In [None]:
## check log to see that the agent connected to DR MLOps
check = subprocess.Popen(["cat", "{}/logs/mlops.agent.log".format(agents_dir)], stdout=subprocess.PIPE)
for line in check.stdout.readlines():
    print(line)
check.terminate()

b'2020-11-16 19:01:02,449 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Found spooler of type FILESYSTEM\n'
b'2020-11-16 19:01:02,452 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting directory = /tmp/ta\n'
b'2020-11-16 19:01:02,453 INFO  com.datarobot.mlops.agent.config.channels.YamlBuilder        [] - Setting CHANNEL_NAME = filesystem\n'
b"2020-11-16 19:01:03,699 INFO  com.datarobot.mlops.common.client.MLOpsClient                [] - DataRobot Server API Version found: '2.22'\n"
b"2020-11-16 19:01:04,258 INFO  com.datarobot.mlops.common.client.MLOpsClient                [] - DataRobot Server API Version found: '2.22'\n"
b"2020-11-16 19:01:04,259 INFO  com.datarobot.mlops.agent.Agent                              [] - DataRobot server at 'https://app2.datarobot.com' is reachable.\n"
b'2020-11-16 19:01:04,259 INFO  com.datarobot.mlops.agent.Agent                              [] - DataRobot Monitoring Agent will process 100 records 

## DataRobot MLOps - Deploying External Model 

To communicate with DataRobot MLOps, we need the MLOps Python client installed. It ships as a wheel inside the downloaded agent tarball.

In [None]:
!pip install /content/datarobot-mlops-*/lib/datarobot_mlops-*.whl -q

ERROR: datascience 0.10.6 has requirement folium==0.2.1, but you'll have folium 0.8.3 which is incompatible.

In [None]:
from datarobot.mlops.mlops import MLOps
from datarobot.mlops.common.enums import OutputType
from datarobot.mlops.connected.client import MLOpsClient
from datarobot.mlops.common.exception import DRConnectedException
from datarobot.mlops.constants import Constants

In [None]:
DEPLOYMENT_NAME="Boston Housing Prices PGH Data Science Meetup"
TRAINING_DATA = '/content/odsc-ml-drum/data/boston_housing.csv'

In [None]:
model_info = {
        "name": "Boston Housing Prices",
        "modelDescription": {
            "description": "predict the price of a home"
        },
        "target": {
            "type": "Regression",
            "name": "MEDV",
        }
}

In [None]:
# Create connected client
mlops_client = MLOpsClient(endpoint, token)

# Add training_data to model configuration
print("Uploading training data - {}. This may take some time...".format(TRAINING_DATA))
dataset_id = mlops_client.upload_dataset(TRAINING_DATA)
print("Training dataset uploaded. Catalog ID {}.".format(dataset_id))
model_info["datasets"] = {"trainingDataCatalogId": dataset_id}

# Create the model package
print('Create model package')
model_pkg_id = mlops_client.create_model_package(model_info)
model_pkg = mlops_client.get_model_package(model_pkg_id)
model_id = model_pkg["modelId"]

# Deploy the model package
print('Deploy model package')
deployment_id = mlops_client.deploy_model_package(model_pkg["id"], DEPLOYMENT_NAME)

# Enable data drift tracking
print('Enable feature drift')
enable_feature_drift = TRAINING_DATA is not None
mlops_client.update_deployment_settings(deployment_id, target_drift=True,
                                        feature_drift=enable_feature_drift)
_ = mlops_client.get_deployment_settings(deployment_id)

print("\nDone.")
print("DEPLOYMENT_ID=%s, MODEL_ID=%s" % (deployment_id, model_id))

DEPLOYMENT_ID = deployment_id
MODEL_ID = model_id

Uploading training data - /content/odsc-ml-drum/data/boston_housing.csv. This may take some time...
Training dataset uploaded. Catalog ID 5fb2cc94e3a7e9072ed463fa.
Create model package
Deploy model package
Enable feature drift

Done.
DEPLOYMENT_ID=5fb2ccba6a2cd70255b0fa2c, MODEL_ID=5fb2ccb82133930df77dea02


In [None]:
from IPython.display import display, HTML
link = "{}/deployments/{}/overview".format(endpoint,deployment_id)
# display(HTML("""<a href="{link}">{link}</a>""".format( link=link )))
print(link)

https://app2.datarobot.com/deployments/5fb2ccba6a2cd70255b0fa2c/overview


# Adding Monitoring with MLOps Monitoring Agents

## Monitoring With DRUM

DRUM needs a few additional parameters to enable monitoring. We can pass them as command-line flags, or we can set the corresponding environment variables and let `drum` pick up the details from there.

```
  --monitor             Monitor predictions using DataRobot MLOps. True or
                        False. (env: MONITOR).Monitoring can not be used in
                        unstructured mode.
  --deployment-id DEPLOYMENT_ID
                        Deployment id to use for monitoring model predictions
                        (env: DEPLOYMENT_ID)
  --model-id MODEL_ID   MLOps model id to use for monitoring predictions (env:
                        MODEL_ID)
  --monitor-settings MONITOR_SETTINGS
                        MLOps setting to use for connecting with the MLOps
                        Agent (env: MONITOR_SETTINGS)
```
For today, we'll set environment variables to add monitoring. 


In [None]:
os.environ["MONITOR"] = "True"
os.environ["DEPLOYMENT_ID"] = deployment_id
os.environ["MODEL_ID"] = model_id
os.environ["MONITOR_SETTINGS"] = "spooler_type=filesystem;directory=/tmp/ta;max_files=5;file_max_size=1045876000"
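The `MONITOR_SETTINGS` value above is a semicolon-separated list of `key=value` pairs. As a sketch, the hypothetical helper `monitor_settings` below builds that string from keyword arguments, and `monitored_server` shows the equivalent command-line form using the flags from the help excerpt. Since `--monitor` is listed without a value placeholder, I assume it is a plain on/off flag; the `<deployment-id>` and `<model-id>` placeholders are only used if the environment variables are unset.

```python
import os


def monitor_settings(**settings):
    """Join key=value pairs with ';', the format used for MONITOR_SETTINGS above."""
    return ";".join("{}={}".format(key, value) for key, value in settings.items())


settings = monitor_settings(
    spooler_type="filesystem",
    directory="/tmp/ta",
    max_files=5,
    file_max_size=1045876000,
)

# Flag-based equivalent of the environment variables (assumes --monitor takes no value).
monitored_server = [
    "drum", "server",
    "--code-dir", "/content/odsc-ml-drum/src/custom_model",
    "--address", "0.0.0.0:43210",
    "--target-type", "regression",
    "--monitor",
    "--deployment-id", os.environ.get("DEPLOYMENT_ID", "<deployment-id>"),
    "--model-id", os.environ.get("MODEL_ID", "<model-id>"),
    "--monitor-settings", settings,
]
```

Flags keep the configuration visible in the command itself, while environment variables are convenient when the same settings apply to several `drum` invocations in one session.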

In [None]:
run_inference_server = ["drum",
              "server",
              "--code-dir","/content/odsc-ml-drum/src/custom_model", 
              "--address", "0.0.0.0:43210", 
              "--show-perf",
              "--target-type", "regression",
              "--logging-level", "info",
              "--show-stacktrace",
#               "--verbose"
              ]

In [None]:
inference_server_with_monitoring = subprocess.Popen(run_inference_server, stdout=subprocess.PIPE)

In [None]:
predictions = score(
    pd.read_csv("/content/odsc-ml-drum/data/boston_housing.csv").drop(["MEDV"],axis=1).head(100),
    "43210")

In [None]:
pd.DataFrame(predictions.json()).head()

|   | predictions |
|---|---|
| 0 | 26.345 |
| 1 | 22.18 |
| 2 | 34.64 |
| 3 | 33.845 |
| 4 | 35.32 |


In [None]:
requests.post("http://localhost:43210/shutdown/").content

b'Server shutting down...'

In [None]:
subprocess.call("{}/bin/stop-agent.sh".format(agents_dir))

0

In [None]:
## check that agent is stopped 
check = subprocess.Popen(["{}/bin/status-agent.sh".format(agents_dir)], stdout=subprocess.PIPE)
print(check.stdout.readlines())
check.terminate()

[b'DataRobot MLOps-Agent is not running as a service.\n']


In [None]:
deployment = dr.Deployment.get(deployment_id)
deployment.get_service_stats()

ServiceStats(5fb2ccb82133930df77dea02 | 2020-11-09 20:00:00+00:00 - 2020-11-16 20:00:00+00:00)

In [None]:
service_stats = deployment.get_service_stats()
service_stats.metrics

{'cacheHitRatio': 0,
 'executionTime': 15.6660079956055,
 'medianLoad': 0,
 'numConsumers': 1,
 'peakLoad': 1,
 'responseTime': 0,
 'serverErrorRate': 0,
 'slowRequests': 0,
 'totalPredictions': 100,
 'totalRequests': 1,
 'userErrorRate': 0}