In [1]:
# Copyright 2021 NVIDIA Corporation. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ================================

# Each user is responsible for checking the content of datasets and the
# applicable licenses and determining if suitable for the intended use.

<img src="https://developer.download.nvidia.com/notebooks/dlsw-notebooks/merlin_merlin_02-deploying-multi-stage-recsys-with-merlin-systems/nvidia_logo.png" style="width: 90px; float: right;">

## Deploying a Multi-Stage RecSys into Production with Merlin Systems and Triton Inference Server

This notebook was created using the latest stable [merlin-tensorflow](https://catalog.ngc.nvidia.com/orgs/nvidia/teams/merlin/containers/merlin-tensorflow/tags) container.

Before you start this notebook, we expect that you have already executed the first notebook `01-Building-Recommender-Systems-with-Merlin.ipynb` and exported all the required files and models.

We are going to generate recommended items for a given user query (user_id) by following the steps described in the figure below.

![tritonensemble](../images/triton_ensemble.png)

The Merlin Systems library provides a set of operators for serving multi-stage recommender systems built with TensorFlow on [Triton Inference Server](https://github.com/triton-inference-server/server) (TIS) easily and efficiently. Below, we go through these operators and demonstrate their usage in serving a multi-stage system on Triton.

### Import required libraries and functions

At this step, we assume you already installed the `feast` library when running the first notebook `01-Building-Recommender-Systems-with-Merlin.ipynb`.

In case you need to install it for running this example, execute the following script in a cell:
```
%pip install feast==0.18.1
```
`Milvus` requires a Milvus server to be installed and listening for requests in order to build and query vector database indexes. If you have not already installed the `Milvus` server and the `pymilvus` API in the first notebook, execute these lines:
```
%pip install milvus
%pip install pymilvus
```
Note that the version of Milvus used in this notebook does not support GPU acceleration.

In [2]:
# %pip install feast==0.18.1
# %pip install milvus
# %pip install pymilvus

In [3]:
import os
os.environ["TF_GPU_ALLOCATOR"]="cuda_malloc_async"
import numpy as np
import pandas as pd
import feast
import seedir as sd
from nvtabular import ColumnSchema, Schema

from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.session_filter import FilterCandidates
from merlin.systems.dag.ops.softmax_sampling import SoftmaxSampling
from merlin.systems.dag.ops.tensorflow import PredictTensorflow
from merlin.systems.dag.ops.unroll_features import UnrollFeatures
from merlin.systems.triton.utils import send_triton_request



Execute the following cell to start the milvus server (for performing embedding vector ANN search) on the local server. Set the default server base directory to any local directory of your choice (this is where milvus creates logs, indices, etc. to make the ANN search possible).

In [4]:
import time
from milvus import default_server
from pymilvus import (connections, FieldSchema, CollectionSchema, DataType, Collection, utility)

BASE_DIR = os.environ.get("BASE_DIR", "/workspace/data/multistage_milvus/")
DATA_SET = os.environ.get("DATA_SET", "YOOCHOOSE")        # if using aliccp, replace "YOOCHOOSE" with "ALICCP"

default_server.set_base_dir(os.path.join(BASE_DIR, 'server'))
default_server.cleanup()    # this will remove all collections and indexes previously created
default_server.start()
_HOST = '127.0.0.1'
_PORT = default_server.listen_port

fmt = "\n=== {:30} ===\n"
search_latency_fmt = "search latency = {:.4f}s"



    __  _________ _   ____  ______
   /  |/  /  _/ /| | / / / / / __/
  / /|_/ // // /_| |/ / /_/ /\ \
 /_/  /_/___/____/___/\____/___/ {Lite}

 Welcome to use Milvus!

 Version:   v2.2.8-lite
 Process:   167004
 Started:   2023-05-12 19:20:13
 Config:    /workspace/systems/examples/multistage_milvus/server/configs/milvus.yaml
 Logs:      /workspace/systems/examples/multistage_milvus/server/logs

 Ctrl+C to exit ...


You can use the test script below to create a connection to the `milvus` server and test it.

In [5]:
#connections.connect(host=_HOST, port=_PORT)
#print(connections.list_connections())

### Register our features on feature store

The Feast feature registry is a central catalog of all the feature definitions and their related metadata (read more [here](https://docs.feast.dev/getting-started/architecture-and-components/registry)). We have defined our user and item feature definitions in the `user_features.py` and `item_features.py` files. With `FeatureView()`, users can register data sources in their organizations into Feast, and then use those data sources for both training and online inference. In the `user_features.py` and `item_features.py` files, we tell Feast where to find the user and item features.

Before we move on to the next steps, we need to run the `feast apply` command as shown below. This registers our features, creates our feature registry, and stores all entity and feature view definitions in a local SQLite online store called `online_store.db`.

In [6]:
# define feature repo path
feast_repo_path = os.path.join(BASE_DIR, "feature_repo/")

In [7]:
%cd $feast_repo_path
!feast apply

/workspace/systems/examples/multistage_milvus/feature_repo
[1m[94mNo changes to registry
[1m[94mNo changes to infrastructure


### Loading features from offline store into an online store 

After we execute `apply`, register our features, and create our local online store, we need to perform the [materialization](https://docs.feast.dev/how-to-guides/running-feast-in-production) operation. This keeps our online store up to date and ready for prediction. To do so, we run a job that loads feature data from our feature view sources into our online store. As we add new features to our offline stores, we can continuously materialize them to keep our online store up to date by finding the latest feature values for each user.

When you run the `feast materialize ..` command below, a message like <i>Materializing 2 feature views from 1995-01-01 01:01:01+00:00 to 2025-01-01 01:01:01+00:00 into the sqlite online store</i> is printed out.

Note that the materialization step takes some time.

In [8]:
!feast materialize 1995-01-01T01:01:01 2025-01-01T01:01:01

Materializing [1m[32m2[0m feature views from [1m[32m1995-01-01 01:01:01+00:00[0m to [1m[32m2025-01-01 01:01:01+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mitem_features[0m:
  return last_values_df[columns_to_extract]
100%|███████████████████████████████████████████████████████| 47400/47400 [00:15<00:00, 2975.60it/s]
[1m[32muser_features[0m:
  return last_values_df[columns_to_extract]
  8%|████                                                | 494657/6345199 [04:06<55:29, 1756.98it/s]^C
  8%|████                                                | 494976/6345199 [04:07<48:41, 2002.46it/s]

Aborted!
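
To make "finding the latest feature values for each user" concrete, here is a minimal, conceptual sketch in plain Python. It is not Feast's implementation: the `timestamp` field name, the inclusive window boundaries, and the `latest_values` helper are assumptions made for illustration only.

```python
from datetime import datetime, timezone


def latest_values(rows, start, end):
    """Keep, per user_id, the newest row whose timestamp lies in [start, end]."""
    latest = {}
    for row in rows:
        ts = row["timestamp"]
        if not (start <= ts <= end):
            continue  # outside the materialization window
        current = latest.get(row["user_id"])
        if current is None or ts > current["timestamp"]:
            latest[row["user_id"]] = row
    return latest


utc = timezone.utc
# Toy offline-store rows (hypothetical values).
rows = [
    {"user_id": 1, "user_age": 30, "timestamp": datetime(2020, 1, 1, tzinfo=utc)},
    {"user_id": 1, "user_age": 31, "timestamp": datetime(2021, 1, 1, tzinfo=utc)},
    {"user_id": 2, "user_age": 45, "timestamp": datetime(2019, 6, 1, tzinfo=utc)},
    {"user_id": 2, "user_age": 46, "timestamp": datetime(2030, 1, 1, tzinfo=utc)},  # after the window
]
window_start = datetime(1995, 1, 1, 1, 1, 1, tzinfo=utc)
window_end = datetime(2025, 1, 1, 1, 1, 1, tzinfo=utc)
online_rows = latest_values(rows, window_start, window_end)
```

In this toy example, the online store would end up with one row per user: the 2021 row for user 1 and the 2019 row for user 2, because the 2030 row falls outside the window passed to the command.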


Now, let's check our feature_repo structure again after we ran `apply` and `materialize` commands.

In [9]:
# set up the base dir for the feature store
feature_repo_path = os.path.join(BASE_DIR, 'feature_repo')
sd.seedir(feature_repo_path, style='lines', itemlimit=10, depthlimit=5, exclude_folders=['.ipynb_checkpoints', '__pycache__'], sort=True)

feature_repo/
├─__init__.py
├─data/
│ ├─item_features.parquet
│ ├─online_store.db
│ ├─registry.db
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
└─user_features.py


### Set up Milvus index, create feature store client and objects for the Triton ensemble

To set up an index with `milvus`, you need to retrieve the item embeddings and build a `milvus` collection with them. The function `setup_milvus` below takes as input an item embeddings matrix along with the `milvus` server connection parameters (host and port) and a name `coll_name` for the collection to build. The function `query_milvus` takes as input a user embedding vector and queries it against the index built by `setup_milvus`.

In [10]:
def setup_milvus(item_vector, HOST, PORT, coll_name):
    """
    Function that will create a Milvus index from an embedding vector. Currently only supports L2 distance.
    Parameters
    ----------
    item_vector : Numpy.ndarray
        This is a matrix representing all the nodes embeddings, represented as a numpy ndarray.
    HOST : str
        milvus server host ip address
    PORT : integer
        milvus server listening port number
    coll_name : string
        name of milvus collection to create and store/index item embeddings
    """
    
    n_dim = item_vector.shape[1]-1     # size of the embedding vector - 1 (first value is item_id)

    #######################################################################################
    # 1. connect to Milvus
    # Add a new connection alias `default` for Milvus server in `localhost:19530`
    # Actually the "default" alias is built into PyMilvus.
    # If the address of Milvus is the same as `localhost:19530`, you can omit all
    # parameters and call the method as: `connections.connect()`.
    # Note: the `using` parameter of the following methods is default to "default".

    print(fmt.format("Connecting to Milvus..."))
    connections.connect("default", host=HOST, port=PORT)
    has = utility.has_collection(coll_name)
    print(f"Does collection {coll_name} exist in Milvus: {has}")
    
    #######################################################################################
    # 2. create collection
    # We're going to create a collection with 2 fields.
    # +-+-------------------+-------------+------------------+----------------------------+
    # | |    field name     | field type  | other attributes |      field description     |
    # +-+-------------------+-------------+------------------+----------------------------+
    # |1|       "pk"        |    Int64    |  is_primary=True |      "primary key"         |
    # | |                   |             |   auto_id=False  |                            |
    # +-+-------------------+-------------+------------------+----------------------------+
    # |2| "item_embeddings" | FloatVector |     dim=64       | "float vector with dim 64" |
    # +-+-------------------+-------------+------------------+----------------------------+

    fields = [
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="item_embeddings", dtype=DataType.FLOAT_VECTOR, dim=n_dim)
    ]
    schema = CollectionSchema(fields, "fields that represent RecSys item embeddings.")

    print(fmt.format("Creating collection..."))
    coll_milvus = Collection(coll_name, schema, consistency_level="Strong")
    print("Collection created.")
    
    ######################################################################################
    # 3. insert data
    # We are going to insert item embeddings into `coll_milvus`
    # Data to be inserted must be organized in fields.
    #
    # The insert() method returns:
    # - either automatically generated primary keys by Milvus if auto_id=True in the schema;
    # - or the existing primary key field from the entities if auto_id=False in the schema.

    print(fmt.format("Inserting embeddings..."))
    num_entities = item_vector.shape[0]
    entities = [
        # provide the pk field, which is item id, because `auto_id` is set to False
        [item_vector[i][0].astype(np.int64) for i in range(num_entities)],
        [[item_vector[i][j+1] for j in range(n_dim)] for i in range(num_entities)],  # field embeddings
    ]
    insert_result = coll_milvus.insert(entities)
    coll_milvus.flush()
    print(f"Number of entities in Milvus: {coll_milvus.num_entities}")  # check number inserted

    #######################################################################################
    # 4. create index
    # We are going to create an IVF_FLAT index for coll_milvus collection.
    # create_index() can only be applied to `FloatVector` and `BinaryVector` fields.
    print(fmt.format("Creating index IVF_FLAT..."))
    index = {
        "index_type": "IVF_FLAT",
        "metric_type": "L2",
        "params": {"nlist": 128},
    }
    start_time = time.time()
    coll_milvus.create_index("item_embeddings", index)   # will save the index to a default location of milvus installation
    end_time = time.time()
    print(f"Indexing latency: {end_time - start_time}")

def query_milvus(user_vector, coll_name, topk=10):
    """
    Function to query input user vector against an indexed set item embedding vectors.
    Parameters
    ----------
    user_vector : Numpy.ndarray
        This is a 2D matrix representing the user embedding vectors to be queried, represented as a numpy ndarray
    coll_name : str
        Name of milvus collection where item embeddings are stored and indexed
    topk : integer
        number of most similar items to return
    """
    print(fmt.format("Loading collection and index..."))
    coll_milvus = Collection(coll_name)      # Get an existing collection.    
    coll_milvus.load()
    
    print(fmt.format("Searching based on vector similarity..."))
    search_params = {
        "metric_type": "L2",
        "params": {"nprobe": 10},
    }
    
    start_time = time.time()
    result = coll_milvus.search(user_vector, "item_embeddings", search_params, limit=topk, output_fields=["pk"])
    end_time = time.time()

    for i, hits in enumerate(result, start=1):
        print(f"Query for user {i}:")
        for hit in hits:
            print(f"hit: {hit}, pk: {hit.entity.get('pk')}")

    print(search_latency_fmt.format(end_time - start_time))

Now use these two functions to set up the Milvus index over the item embeddings and to query the similarity between user embeddings and the item embeddings.

In [None]:
item_embeddings = np.ascontiguousarray(
    pd.read_parquet(os.path.join(BASE_DIR, "item_embeddings.parquet")).to_numpy()
)
setup_milvus(item_embeddings, _HOST, _PORT, "coll_milvus")

In [None]:
user_embeddings = np.ascontiguousarray(
    pd.read_parquet(os.path.join(BASE_DIR, "user_embeddings.parquet")).to_numpy()
)
query_milvus(user_embeddings[0:2,1:], "coll_milvus", 10)  # query embedding vectors for the first 2 users, ignoring first value which is user_id
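
The IVF_FLAT index performs an approximate search. As a sanity check on a small sample, it can help to compare its results with an exact, brute-force L2 search. The sketch below is a plain-Python illustration under stated assumptions: `exact_topk_l2` is a hypothetical helper (not part of `pymilvus`), and it ranks by squared L2 distance, which gives the same ordering as the L2 distance itself.

```python
import heapq


def exact_topk_l2(query, item_rows, topk=10):
    """Return the `topk` (item_id, squared_l2_distance) pairs closest to `query`.

    Each row in `item_rows` is [item_id, e_1, ..., e_d], i.e. the id sits in
    the first column, the same layout that `setup_milvus` expects.
    """
    scored = []
    for row in item_rows:
        item_id, embedding = int(row[0]), row[1:]
        if len(embedding) != len(query):
            raise ValueError("query and item embeddings must have the same dimension")
        dist = sum((q - e) ** 2 for q, e in zip(query, embedding))
        scored.append((item_id, dist))
    # Smallest distance first.
    return heapq.nsmallest(topk, scored, key=lambda pair: pair[1])


# Toy 2-dimensional embeddings (hypothetical values).
toy_items = [
    [101, 0.0, 0.0],
    [102, 1.0, 0.0],
    [103, 0.0, 2.0],
    [104, 3.0, 3.0],
]
nearest = exact_topk_l2([0.9, 0.1], toy_items, topk=2)
```

Here item 102 is the closest to the query and item 101 the second closest. Because the helper only relies on indexing and slicing, rows of a NumPy array such as `item_embeddings` should work as well, although a brute-force scan is only practical for small samples.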

Now define the paths for the ranking model and the retrieval model.

In [None]:
retrieval_model_path = os.path.join(BASE_DIR, "query_tower/")
ranking_model_path = os.path.join(BASE_DIR, "dlrm/")

The `QueryMilvus` operator defined below creates an interface between a `Milvus` IVF_FLAT index and Triton Inference Server. For a given input query vector, it runs a nearest neighbor search to find the ids of the top-k nearby embeddings in the index.

In [None]:
import json
import os
import pathlib
from pathlib import Path
from typing import Dict, List

import numpy as np

from merlin.core.dispatch import HAS_GPU
from merlin.core.protocols import Transformable
from merlin.dag import ColumnSelector
from merlin.schema import ColumnSchema, Schema
from merlin.systems.dag.ops.operator import InferenceOperator

import pymilvus

class QueryMilvus(InferenceOperator):
    """
    This operator creates an interface between a Milvus Approximate Nearest Neighbors (ANN)
    Index and Triton Inference Server. The operator allows users to add different supported
    types of Nearest Neighbor search to an ensemble. For an input query vector, we do an ANN
    search query to find the ids of the top-k nearby nodes in the index.
    """

    def __init__(self, milvus_host: str, milvus_port: str, collection_name: str, index_name: str, topk=10):
        """
        Creates a QueryMilvus Pipelineable Inference Operator.
        Parameters
        ----------
        milvus_host: str
            Milvus host name (eg. 127.0.0.1)
        milvus_port: str
            Milvus port number (eg. 19530)
        collection_name: str
            Milvus collection name where item embeddings have been previously stored and indexed
        index_name: str
            Name of the milvus index for item embeddings
        topk : int, optional
            The number of results we should receive from query to Milvus as output, by default 10
        """
        print("QueryMilvus operator initializing...")
        super().__init__()
        self.mlv_host = milvus_host
        self.mlv_port = milvus_port
        self.col_name = collection_name
        self.idx_name = index_name
        self.topk = topk

    def load_artifacts(self, artifact_path):
        """
        Makes a connection to the milvus server and loads the indexed embeddings collection
        Parameters
        ----------
        artifact_path: str
            Unused - pass empty string
        """
        self.mlv_con = pymilvus.connections
        self.mlv_con.connect(host=self.mlv_host, port=self.mlv_port)
        self.mlv_coll = pymilvus.Collection(self.col_name)      # Get the right milvus collection.
        self.mlv_coll.load()

    def transform(
        self, col_selector: ColumnSelector, transformable: Transformable
    ) -> Transformable:
        """
        Transform input dataframe to output dataframe using function logic.
        In the case of QueryMilvus operator, it takes the input user embeddings vector,
        queries the most similar k item embeddings and returns their id's.
        """
        print(fmt.format("QueryMilvus operator transforming..."))
        user_vector = list(transformable.values())[0]
        user_vector = user_vector.values

        print(fmt.format("Searching based on vector similarity..."))
        search_params = {
            "metric_type": "L2",
            "params": {"nprobe": 10},
        }

        start_time = time.time()
        result = self.mlv_coll.search(user_vector, self.idx_name, search_params, limit=self.topk, output_fields=["pk"])
        end_time = time.time()
        print(f"Search latency: {end_time - start_time}")
        
        candidate_ids = []
        for hits in result:
            for hit in hits:
                candidate_ids.append(hit.entity.get('pk'))
        candidate_ids = np.array(candidate_ids).T.astype(np.int32)

        return type(transformable)({"candidate_ids": candidate_ids})

    def compute_input_schema(
        self,
        root_schema: Schema,
        parents_schema: Schema,
        deps_schema: Schema,
        selector: ColumnSelector,
    ) -> Schema:
        """
        Compute the input schema of this node given the root, parents and dependencies schemas of
        all ancestor nodes.
        Parameters
        ----------
        root_schema : Schema
            The schema representing the input columns to the graph
        parents_schema : Schema
            A schema representing all the output columns of the ancestors of this node.
        deps_schema : Schema
            A schema representing the dependencies of this node.
        selector : ColumnSelector
            A column selector representing a target subset of columns necessary for this node's
            operator
        Returns
        -------
        Schema
            A schema that has the correct representation of all the incoming columns necessary for
            this node's operator to complete its transform.
        Raises
        ------
        ValueError
            Cannot receive more than one input for this node
        """
        input_schema = super().compute_input_schema(
            root_schema, parents_schema, deps_schema, selector
        )
        return input_schema

    def compute_output_schema(
        self, input_schema: Schema, col_selector: ColumnSelector, prev_output_schema: Schema = None
    ) -> Schema:
        """
        Compute the output schema of this node given its input schema.
        Parameters
        ----------
        input_schema : Schema
            The schema representing the input columns to this node
        col_selector : ColumnSelector
            A column selector representing a target subset of columns necessary for this node's
            operator
        prev_output_schema : Schema
            A schema representing the output of the previous node.
        Returns
        -------
        Schema
            A schema object representing all outputs of this node.
        """
        return Schema(
            [
                ColumnSchema("candidate_ids", dtype=np.int32),
            ]
        )

    def validate_schemas(
        self, parents_schema, deps_schema, input_schema, output_schema, strict_dtypes=False
    ):
        if len(input_schema.column_schemas) > 1:
            raise ValueError(
                "More than one input has been detected for this node, "
                f"inputs received: {input_schema.column_names}"
            )

Now create the feature store client.

In [None]:
feature_store = feast.FeatureStore(feast_repo_path)

Fetch user features from the feature store with the `QueryFeast` operator. The `QueryFeast` operator is responsible for ensuring that our Feast feature store can communicate correctly with tritonserver for the feature lookups in the ensemble.

In [None]:
from merlin.systems.dag.ops.feast import QueryFeast 

user_features = ["user_id_raw"] >> QueryFeast.from_feature_view(
    store=feature_store,
    view="user_features",
    column="user_id_raw",
    include_id=False,
)

Retrieve the top-K candidate items that are relevant for a given user using the retrieval model. We use the `PredictTensorflow()` operator, which takes a TensorFlow model and packages it correctly for TIS to run with the TensorFlow backend.

In [None]:
# prevent TF from claiming all GPU memory
from merlin.dataloader.tf_utils import configure_tensorflow

configure_tensorflow()

In [None]:
topk_retrieval = int(
    os.environ.get("topk_retrieval", "100")
)
retrieval = (
    user_features
    >> PredictTensorflow(retrieval_model_path)
    >> QueryMilvus("127.0.0.1", str(default_server.listen_port), "coll_milvus", "item_embeddings", topk=topk_retrieval)
)

Fetch item features for the candidate items that are retrieved from the retrieval step above from the feature store.

In [None]:
item_features = retrieval["candidate_ids"] >> QueryFeast.from_feature_view(
    store=feature_store,
    view="item_features",
    column="candidate_ids",
    output_prefix="item",
    include_id=True,
)

Merge the user features and item features to create the full set of combined features that were used in model training, using the `UnrollFeatures` operator, which takes a target column and joins the "unroll" columns to the target. This helps when broadcasting a series of user features to a set of items.

In [None]:
if DATA_SET == "ALICCP":
    user_features_to_unroll = [
        "user_id",
        "user_shops",
        "user_profile",
        "user_group",
        "user_gender",
        "user_age",
        "user_consumption_2",
        "user_is_occupied",
        "user_geography",
        "user_intentions",
        "user_brands",
        "user_categories",
    ]
else:
    user_features_to_unroll = [
        "user_id",
        "user_age",
    ]

combined_features = item_features >> UnrollFeatures(
    "item_id", user_features[user_features_to_unroll]
)
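
The broadcasting idea can be illustrated with a small, self-contained sketch. This is not the `UnrollFeatures` implementation; the `unroll_features` helper and the toy values are assumptions chosen only to show how a single row of user features is repeated for every candidate item.

```python
def unroll_features(item_columns, user_columns, target="item_id"):
    """Repeat each single-valued user column once per row of the target item column."""
    n_items = len(item_columns[target])
    # Start from a copy of the item columns, then add the repeated user columns.
    combined = {name: list(values) for name, values in item_columns.items()}
    for name, values in user_columns.items():
        if len(values) != 1:
            raise ValueError(f"expected one value for user column {name!r}")
        combined[name] = [values[0]] * n_items
    return combined


# Three candidate items and one user (hypothetical values).
items = {"item_id": [7, 12, 31], "item_category": [2, 2, 5]}
user = {"user_id": [42], "user_age": [3]}
combined = unroll_features(items, user)
```

After the call, every column in `combined` has three rows, so each candidate item is paired with the same `user_id` and `user_age` values, which is the shape a ranking model expects for scoring user-item pairs.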

Rank the combined features using the trained ranking model, which is a DLRM model for this example. We feed the path of the ranking model to `PredictTensorflow()` operator.

In [None]:
ranking = combined_features >> PredictTensorflow(ranking_model_path)

For the ordering we use the `SoftmaxSampling()` operator. Given the input ids and predictions, this operator sorts the inputs in descending order while introducing some randomization by sampling items from the softmax of the predicted relevance scores, and finally returns the top-k ordered items.

In [None]:
top_k=10
ordering = combined_features["item_id_raw"] >> SoftmaxSampling(
    relevance_col=ranking["click/binary_classification_task"], topk=top_k, temperature=20.0
)
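
To build intuition for sampling from the softmax of the scores, here is a minimal sketch in plain Python. It is not the Merlin operator's code: the `softmax_sample_order` helper is hypothetical, and the way `temperature` enters the weights (multiplying the score inside the exponential) is an assumption of this sketch, so the operator's exact formula may differ.

```python
import math
import random


def softmax_sample_order(item_ids, scores, topk, temperature=1.0, seed=None):
    """Draw up to `topk` items without replacement, with probability proportional
    to exp(temperature * score), and return (ids, scores) in draw order."""
    if len(item_ids) != len(scores):
        raise ValueError("item_ids and scores must have the same length")
    if not scores:
        return [], []
    rng = random.Random(seed)
    max_score = max(scores)
    # Subtract the max for numerical stability; relative weights are unchanged.
    weights = [math.exp(temperature * (s - max_score)) for s in scores]
    remaining = list(range(len(item_ids)))
    picked = []
    for _ in range(min(topk, len(remaining))):
        w = [weights[i] for i in remaining]
        if sum(w) == 0.0:  # all remaining weights underflowed: fall back to uniform
            w = [1.0] * len(remaining)
        chosen = rng.choices(remaining, weights=w, k=1)[0]
        picked.append(chosen)
        remaining.remove(chosen)
    return [item_ids[i] for i in picked], [scores[i] for i in picked]


# Toy candidates (hypothetical ids and scores).
ids = [11, 22, 33, 44]
scores = [0.9, 0.5, 0.5, 0.1]
ordered_ids, ordered_scores = softmax_sample_order(
    ids, scores, topk=3, temperature=20.0, seed=0
)
```

Under this assumption, a larger multiplier makes high-scoring items much more likely to be drawn first, while a multiplier near zero approaches a uniform shuffle. When all scores are equal, the weights are equal too, so every ordering is equally likely.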

### Export Graph as Ensemble
The last step is to create the ensemble artifacts that TIS can consume. To make these artifacts, we use the `Ensemble` class imported earlier. This class represents an entire ensemble consisting of multiple models that run sequentially in TIS, initiated by an inference request. It is responsible for interpreting the graph and exporting the correct files for TIS.

When we create an Ensemble object we feed the graph and a schema representing the starting input of the graph.  After we create the ensemble object, we export the graph, supplying an export path for the `ensemble.export()` function. This returns an ensemble config which represents the entire inference pipeline and a list of node-specific configs.

Create the folder to export the models and config files (remove the folder first, if it exists).

In [69]:
if os.path.isdir(os.path.join(BASE_DIR, 'poc_ensemble')):
    !rm -r {os.path.join(BASE_DIR, 'poc_ensemble')}
os.makedirs(os.path.join(BASE_DIR, 'poc_ensemble'))

Create a request schema that we are going to use when sending a request to Triton Inference Server (TIS).

In [70]:
request_schema = Schema(
    [
        ColumnSchema("user_id_raw", dtype=np.int32),
    ]
)

In [71]:
# define the path where all the models and config files are exported to
export_path = os.path.join(BASE_DIR, 'poc_ensemble')

ensemble = Ensemble(ordering, request_schema)

In [73]:
PredictTensorflow(ranking_model_path).input_schema

| | name | tags | dtype | is_list | is_ragged |
|---|---|---|---|---|---|
| 0 | item_category | () | `DType(name='int32', element_type=<ElementType....` | False | False |
| 1 | item_id | () | `DType(name='int32', element_type=<ElementType....` | False | False |
| 2 | item_id_raw | () | `DType(name='int32', element_type=<ElementType....` | False | False |
| 3 | user_age | () | `DType(name='int32', element_type=<ElementType....` | False | False |
| 4 | user_id | () | `DType(name='int32', element_type=<ElementType....` | False | False |
| 5 | user_id_raw | () | `DType(name='int32', element_type=<ElementType....` | False | False |


In [74]:
ens_config, node_configs = ensemble.export(export_path)

In [75]:
# return the output column name
outputs = ensemble.graph.output_schema.column_names
print(outputs)

['ordered_ids', 'ordered_scores']


### Starting Triton Server

It is time to deploy all the models as an ensemble model to Triton Inference Server ([TIS](https://github.com/triton-inference-server)). After we export the ensemble, we are ready to start TIS. You can start Triton server by using the following command in your terminal:

```
tritonserver --model-repository=/ensemble_export_path/ --backend-config=tensorflow,version=2
```

For the `--model-repository` argument, specify the same path as the `export_path` that you specified previously in the `ensemble.export` method. This command will launch the server and load all the models to the server. Once all the models are loaded successfully, you should see `READY` status printed out in the terminal for each loaded model.
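
If you prefer to assemble the command from Python, so that the model repository always matches the export path, a small sketch could look like the following. The `tritonserver_command` helper is hypothetical, and the path passed to it is only an example built from the default `BASE_DIR` and the `poc_ensemble` folder used above; the flags are the same two shown in the command above.

```python
import shlex


def tritonserver_command(model_repository):
    """Build the argument list for the `tritonserver` command shown above."""
    return [
        "tritonserver",
        f"--model-repository={model_repository}",
        "--backend-config=tensorflow,version=2",
    ]


# Example path: default BASE_DIR joined with the export folder name.
cmd = tritonserver_command("/workspace/data/multistage_milvus/poc_ensemble")
print(shlex.join(cmd))  # the shell command line to run in a terminal
```

The resulting list can also be handed to `subprocess.Popen(cmd)` if you want to launch the server from a script rather than from a terminal.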

### Retrieving Recommendations from Triton

Once our models are successfully loaded into TIS, we can send a request to TIS and get a response for our query with the `send_triton_request` utility function.

Let's send a request to TIS for a given `user_id_raw` value.

In [76]:
# read in data for request
from merlin.core.dispatch import make_df

# create a request to be sent to TIS
request = make_df({"user_id_raw": [2]})
request["user_id_raw"] = request["user_id_raw"].astype(np.int32)
print(request)

   user_id_raw
0            2


Let's return raw item ids from TIS as top-k recommended items per given request.

In [77]:
response = send_triton_request(request_schema, request, outputs)
response

{'ordered_ids': array([[214807367, 214753912, 214812896, 214850497, 214822940, 214747460,
         214769032, 214793102, 214759412, 214735509]], dtype=int32),
 'ordered_scores': array([[0.50041765, 0.50041765, 0.50041765, 0.50041765, 0.50041765,
         0.50041765, 0.50041765, 0.50041765, 0.50041765, 0.50041765]],
       dtype=float32)}
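
The response holds one row of ids and one row of scores per request. A small helper can pair them into a ranked list for display. This is a sketch: `ranked_recommendations` is a hypothetical helper, and the values below are copied from the first three entries of the response shown above.

```python
def ranked_recommendations(ordered_ids, ordered_scores):
    """Turn one row of ids and one row of scores into [(rank, item_id, score), ...]."""
    if len(ordered_ids) != len(ordered_scores):
        raise ValueError("ids and scores must have the same length")
    return [
        (rank, int(item_id), float(score))
        for rank, (item_id, score) in enumerate(zip(ordered_ids, ordered_scores), start=1)
    ]


# First three entries of the response shown above.
ids_row = [214807367, 214753912, 214812896]
scores_row = [0.50041765, 0.50041765, 0.50041765]
ranked = ranked_recommendations(ids_row, scores_row)
for rank, item_id, score in ranked:
    print(rank, item_id, round(score, 4))
```

With the actual response, the first row of each array can be passed directly, for example `ranked_recommendations(response["ordered_ids"][0], response["ordered_scores"][0])`.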

That's it! You have finished deploying a multi-stage recommender system on Triton Inference Server using the Merlin framework.