In [1]:
# Copyright 2022 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

<img src="http://developer.download.nvidia.com/compute/machine-learning/frameworks/nvidia_logo.png" style="width: 90px; float: right;">

This notebook was created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container. It demonstrates how to deploy an `XGBoost` model to Triton Inference Server (TIS) and generate predictions for a given query.

To begin, we download the `MovieLens 100k Dataset` and train an `XGBoost` model to predict the rating a user is likely to give a movie.

In this notebook, we focus on deploying the model and only briefly cover data preprocessing and model training.

If you would like to learn more about training an `XGBoost` model using the Merlin Framework, please consult [this tutorial](https://github.com/NVIDIA-Merlin/models/blob/main/examples/07-Train-an-xgboost-model-using-the-Merlin-Models-API.ipynb).

Let's begin by downloading the data.

# Download Data

In [2]:
from merlin.core.utils import Distributed
from merlin.models.xgb import XGBoost
import nvtabular as nvt
import numpy as np

from merlin.datasets.entertainment import get_movielens

train, _ = get_movielens(variant='ml-100k')



# Data Preprocessing

Let us now preprocess our data.

We capture the preprocessing steps in an NVTabular workflow so that we can reuse them to preprocess incoming requests during serving.

In [3]:
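# Encode the ID and genre columns as int32 category indices; categories seen
# fewer than 2 times during fitting are mapped to a shared "null" index.
preprocessing_steps = ['movieId', 'userId', 'genres'] >> nvt.ops.Categorify(freq_threshold=2, dtype=np.int32)

# Also include the remaining input columns (including the `rating` target),
# dropping `rating_binary` and `title`, which the model does not use.
train_preprocessing_workflow = nvt.Workflow(
    preprocessing_steps
    + train.schema.remove_col('rating_binary').remove_col('title').column_names
)
# Fit the workflow (learning the category mappings) and transform the training data.
train_transformed = train_preprocessing_workflow.fit_transform(train)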
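The effect of `freq_threshold=2` can be illustrated with a minimal pure-Python sketch. The helpers `fit_categorify` and `transform_categorify` are hypothetical and only approximate `nvt.ops.Categorify`: we assume a single reserved index 0 for rare and unseen values, whereas the real op reserves its own special indices (which differ across NVTabular versions).

```python
from collections import Counter


def fit_categorify(values, freq_threshold=2):
    """Learn a category -> index mapping (hypothetical stand-in for Categorify.fit).

    Assumption: index 0 is reserved for rare and unseen values; real NVTabular
    reserves its own special indices, which differ across versions.
    """
    counts = Counter(values)
    # Keep categories seen at least `freq_threshold` times; sort for determinism.
    frequent = sorted(v for v, c in counts.items() if c >= freq_threshold)
    return {v: i + 1 for i, v in enumerate(frequent)}


def transform_categorify(mapping, values):
    # Rare categories (filtered out at fit time) and unseen ones both map to 0.
    return [mapping.get(v, 0) for v in values]


genres = ["Drama", "Comedy", "Drama", "Horror", "Comedy", "Drama"]
mapping = fit_categorify(genres, freq_threshold=2)
print(mapping)  # {'Comedy': 1, 'Drama': 2}
print(transform_categorify(mapping, ["Drama", "Horror", "Western"]))  # [2, 0, 0]
```

The mapping is learned once, at fit time, and then only applied; that is what allows the same fitted workflow to encode requests consistently at serving time.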

# Train an XGBoost Model

In [4]:
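# Distributed() provides a Dask cluster (starting a local one if needed) for training.
with Distributed():
    model = XGBoost(schema=train_transformed.schema)
    model.fit(
        train_transformed,
        num_boost_round=85,
        verbose_eval=20,  # log the training RMSE every 20 boosting rounds
    )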

2022-08-02 09:50:42,015 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
[09:50:43] task [xgboost.dask]:tcp://127.0.0.1:45235 got new rank 0


[0]	train-rmse:2.36952
[20]	train-rmse:0.95316
[40]	train-rmse:0.92447
[60]	train-rmse:0.90741
[80]	train-rmse:0.89437
[84]	train-rmse:0.89138


# Create the Ensemble Graph

Let us now define an `Ensemble` that will be used for serving predictions on the Triton Inference Server.

An `Ensemble` defines the operations to perform on incoming requests. It begins by specifying the input schema, i.e. the fields that each inference request will contain.

Our model was trained on data that included the target column, `rating`. In production, however, the target will not be available to us, so let us remove it from the input schema.

In [5]:
inference_schema = train_preprocessing_workflow.input_schema.remove_col('rating')
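The idea behind this step can be sketched in plain Python, assuming a simplified, hypothetical schema represented as a dict from column name to a set of tags. Selecting columns by a `target` tag rather than by name is one way to keep the inference schema in sync with the training schema; the notebook itself removes `rating` by name.

```python
# Hypothetical, simplified schema: column name -> set of tags.
schema = {
    "movieId": {"categorical", "item_id"},
    "userId": {"categorical", "user_id"},
    "genres": {"categorical"},
    "rating": {"regression", "target"},
}


def without_targets(schema):
    # Keep every column that is not tagged as a target; the input is not modified.
    return {name: tags for name, tags in schema.items() if "target" not in tags}


print(sorted(without_targets(schema)))  # ['genres', 'movieId', 'userId']
```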

In general, you want to define a preprocessing workflow once and apply it throughout the lifecycle of your model, from training all the way to serving in production. Redefining workflows ad hoc, or reimplementing these operations in custom code, can be a source of subtle bugs.
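The kind of subtle bug mentioned above can be reproduced in a few lines. In this hypothetical sketch (`fit_vocab` is illustrative, not an NVTabular API), re-fitting a categorical encoding on a serving request, instead of reusing the mapping learned during training, silently assigns different ids to the same movies:

```python
def fit_vocab(values):
    # Hypothetical helper: assign ids in sorted order of the unique values.
    return {v: i for i, v in enumerate(sorted(set(values)))}


train_movies = ["Alien", "Heat", "Jaws", "Up"]
request_movies = ["Jaws", "Up"]

fitted = fit_vocab(train_movies)   # learned once, during training
refit = fit_vocab(request_movies)  # the bug: re-fitting on the serving batch

served_ok = [fitted[m] for m in request_movies]
served_bad = [refit[m] for m in request_movies]
print(served_ok)   # [2, 3] -> the ids the model was trained on
print(served_bad)  # [0, 1] -> different ids for the same movies
```

No error is raised; the model simply receives ids that it associates with other movies.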

To ensure that we process data in production the same way as in training, let us derive the inference workflow from the training preprocessing workflow instead of writing a new one.

In [6]:
inference_preprocessing_workflow = train_preprocessing_workflow.remove_inputs(['rating'])

Equipped with the modified preprocessing workflow, let us define the full set of inference operations we want to run on the Triton Inference Server.

We begin by stating what data the server can expect (`inference_schema`). We then wrap our `inference_preprocessing_workflow` in `TransformWorkflow`, an operator that executes the workflow during serving.

Last but not least, having received and preprocessed the data, we instruct the Triton Inference Server to run inference with the model we trained, using the `PredictForest` operator.

In [7]:
from merlin.systems.dag.ops.fil import PredictForest
from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.workflow import TransformWorkflow

# Request columns -> NVTabular preprocessing -> forest inference with the trained booster.
inference_ops = (
    inference_schema.column_names
    >> TransformWorkflow(inference_preprocessing_workflow)
    >> PredictForest(model.booster, inference_preprocessing_workflow.output_schema)
)
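To make the `>>` chaining concrete, here is a toy model of how such an operator graph can be composed and executed. `Op` and `Pipeline` are hypothetical classes, not part of Merlin; they only show that a plain list of column names can sit on the left of `>>` because Python falls back to the right operand's `__rrshift__`, and that a request is reduced to the declared input columns before the operators run.

```python
class Pipeline:
    """Toy pipeline: input column names plus an ordered list of operators."""

    def __init__(self, columns, ops):
        self.columns = list(columns)
        self.ops = list(ops)

    def run(self, request):
        # Keep only the declared input columns, then apply each operator in turn.
        batch = {name: request[name] for name in self.columns}
        for op in self.ops:
            batch = op.fn(batch)
        return batch


class Op:
    """Toy operator wrapping a function from batch (dict of lists) to batch."""

    def __init__(self, fn):
        self.fn = fn

    def __rrshift__(self, left):
        # Neither `list` nor `Pipeline` defines `>>`, so Python calls
        # Op.__rrshift__ with the left operand.
        if isinstance(left, Pipeline):
            return Pipeline(left.columns, left.ops + [self])
        return Pipeline(left, [self])


# Hypothetical stand-ins for a preprocessing step and a model.
double = Op(lambda b: {k: [2 * x for x in v] for k, v in b.items()})
add_up = Op(lambda b: {"prediction": [sum(row) for row in zip(*b.values())]})

ops = ["movieId", "userId"] >> double >> add_up
request = {"movieId": [1, 2], "userId": [3, 4], "title": ["A", "B"]}
print(ops.run(request))  # {'prediction': [8, 12]}
```

Note that `title` is dropped simply because it is not among the declared input columns.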

With the inference operations defined, all that remains is to export the ensemble to disk so that Triton can load it on startup.

In [8]:
ensemble = Ensemble(inference_ops, inference_schema)
ensemble.export('ensemble');

# Starting the Triton Inference Server

We are now ready to start the Triton Inference Server. We launch it in a subprocess so that we can continue working in this notebook.

In [9]:
import subprocess
subprocess.Popen(["tritonserver", "--model-repository=ensemble"])

<subprocess.Popen at 0x7fa1b031e8e0>

I0802 09:50:45.025498 1057 pinned_memory_manager.cc:240] Pinned memory pool is created at '0x7f05a6000000' with size 268435456
I0802 09:50:45.025823 1057 cuda_memory_manager.cc:105] CUDA memory pool is created on device 0 with size 67108864
I0802 09:50:45.027726 1057 model_repository_manager.cc:1191] loading: 1_fil:1
I0802 09:50:45.127882 1057 model_repository_manager.cc:1191] loading: 0_transformworkflow:1
I0802 09:50:45.137409 1057 initialize.hpp:43] TRITONBACKEND_Initialize: fil
I0802 09:50:45.137421 1057 backend.hpp:47] Triton TRITONBACKEND API version: 1.9
I0802 09:50:45.137424 1057 backend.hpp:52] 'fil' TRITONBACKEND API version: 1.9
I0802 09:50:45.137694 1057 model_initialize.hpp:37] TRITONBACKEND_ModelInitialize: 1_fil (version 1)
I0802 09:50:45.138704 1057 instance_initialize.hpp:46] TRITONBACKEND_ModelInstanceInitialize: 1_fil_0 (GPU device 0)
I0802 09:50:45.156972 1057 model_repository_manager.cc:1345] successfully loaded '1_fil' version 1
I0802 09:50:45.228012 1057 model_re

The server is now running. We pointed it to our exported ensemble on disk with the `--model-repository` parameter.

Once loading completes, our inference pipeline is ready on the server to receive inference requests.
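Because `tritonserver` was launched asynchronously with `subprocess.Popen`, the models may still be loading when the next cell runs, and an early request can fail. One option is to poll until the server reports ready. The sketch below assumes nothing about Triton: `wait_until_ready` is a hypothetical helper that polls any zero-argument callable, and the example uses a fake check instead of a live server.

```python
import time


def wait_until_ready(check, timeout_s=60.0, interval_s=1.0):
    """Poll `check()` until it returns True or `timeout_s` elapses.

    Exceptions raised by `check` (e.g. connection refused while the server
    is still starting) are treated as "not ready yet".
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # not reachable yet; try again
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Fake readiness check that fails twice before succeeding.
calls = {"n": 0}


def fake_check():
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("server not up yet")
    return True


ready = wait_until_ready(fake_check, timeout_s=5.0, interval_s=0.01)
print(ready)  # True
```

With a live server, one could pass `lambda: client.is_server_ready()` for a `tritonclient.grpc.InferenceServerClient("localhost:8001")`, or `lambda: client.is_model_ready("ensemble_model")` to wait for the ensemble specifically.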

Let us issue a request and verify the results.

We begin by taking 10 examples from the training data.

In [10]:
ten_examples = train.compute().drop(columns=['rating', 'rating_binary', 'title'])[:10]
ten_examples.head()

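| | movieId | userId | genres | TE_movieId_rating | userId_count | gender | zip_code | age |
|---|---|---|---|---|---|---|---|---|
| 0 | 7 | 77 | 43 | 0.779876 | 5.572154 | 1 | 77 | 1 |
| 1 | 231 | 77 | 13 | -0.896619 | 5.572154 | 1 | 77 | 1 |
| 2 | 366 | 77 | 17 | -0.954632 | 5.572154 | 1 | 77 | 1 |
| 3 | 96 | 77 | 89 | -0.093809 | 5.572154 | 1 | 77 | 1 |
| 4 | 383 | 77 | 25 | -0.539376 | 5.572154 | 1 | 77 | 1 |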


Now let's package these examples as inputs and send them to Triton for inference.

In [11]:
from merlin.systems.triton import convert_df_to_triton_input
import tritonclient.grpc as grpcclient

ten_examples = train.compute().drop(columns=['rating', 'title', 'rating_binary'])[:10]
# Build one Triton input per column that the ensemble expects.
inputs = convert_df_to_triton_input(inference_schema.column_names, ten_examples, grpcclient.InferInput)

# Request every output column produced by the ensemble.
outputs = [
    grpcclient.InferRequestedOutput(col)
    for col in inference_ops.output_schema.column_names
]
# Send the request to Triton's gRPC endpoint, where the exported ensemble is served as "ensemble_model".
with grpcclient.InferenceServerClient("localhost:8001") as client:
    response = client.infer("ensemble_model", inputs, outputs=outputs)
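`convert_df_to_triton_input` turns the dataframe into one named input per requested column. The pure-Python sketch below imitates that bookkeeping only; `describe_inputs` is hypothetical, and both the type-to-datatype mapping and the `(n, 1)` shape are assumptions, not a description of Merlin's implementation. The datatype strings follow Triton's naming.

```python
def describe_inputs(columns, table):
    """Hypothetical helper: return one (name, shape, datatype) spec per column.

    `table` maps column name -> list of values. Sending each column as an
    (n, 1) tensor is an assumption of this sketch.
    """
    specs = []
    for name in columns:  # only the requested columns, in the requested order
        values = table[name]
        if not values:
            raise ValueError(f"column {name!r} is empty")
        first = values[0]
        if isinstance(first, bool):  # check bool before int: bool subclasses int
            dtype = "BOOL"
        elif isinstance(first, int):
            dtype = "INT64"
        elif isinstance(first, float):
            dtype = "FP64"
        elif isinstance(first, str):
            dtype = "BYTES"
        else:
            raise TypeError(f"unsupported type {type(first).__name__} in column {name!r}")
        specs.append((name, (len(values), 1), dtype))
    return specs


# Values copied from the first two rows of the examples shown above.
table = {
    "movieId": [7, 231],
    "TE_movieId_rating": [0.779876, -0.896619],
    "userId": [77, 77],
}
specs = describe_inputs(["movieId", "TE_movieId_rating"], table)
print(specs)  # [('movieId', (2, 1), 'INT64'), ('TE_movieId_rating', (2, 1), 'FP64')]
```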

Let us now extract the predictions from the response and compare them with predictions computed locally.

In [12]:
predictions_from_triton = response.as_numpy(outputs[0].name())

In [13]:
with Distributed():
    local_predictions = model.predict(train_transformed)[:10]

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43463 instead
2022-08-02 09:51:02,611 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize


In [15]:
assert np.allclose(predictions_from_triton, local_predictions)
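`np.allclose` compares values with a tolerance instead of requiring exact equality, which matters because predictions from Triton's FIL backend and from the local model may differ in the last few floating-point bits. With its defaults (`rtol=1e-05`, `atol=1e-08`), it checks `|a - b| <= atol + rtol * |b|` for every pair. The sketch below is a pure-Python version of that test for equal-length sequences; NumPy additionally broadcasts shapes and offers `equal_nan`.

```python
import math


def _close(x, y, rtol, atol):
    if math.isinf(x) or math.isinf(y):
        return x == y  # an infinity is close only to the identical infinity
    return abs(x - y) <= atol + rtol * abs(y)  # NaN always compares False


def allclose(a, b, rtol=1e-05, atol=1e-08):
    """Pure-Python sketch of numpy.allclose's default test for 1-D sequences.

    Note the asymmetry: the tolerance scales with |b|, the reference value.
    """
    if len(a) != len(b):
        raise ValueError("sequences must have the same length")
    return all(_close(x, y, rtol, atol) for x, y in zip(a, b))


print(allclose([1.0, 2.0], [1.0, 2.000001]))  # True: a 1e-6 difference is within ~2e-5
print(allclose([1.0], [1.001]))               # False: a 1e-3 difference is too large
```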

We preprocessed the data at serving time exactly as we did during training, and the predictions returned by Triton match those of the local model!