In [1]:
import os
os.environ['NUMBA_DISABLE_JIT'] = "1"
os.environ["NUMBA_DISABLE_CUDA"] = "1"

import numpy as np
import pandas as pd
import feast
import seedir as sd
from nvtabular import ColumnSchema, Schema

from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.softmax_sampling import SoftmaxSampling
from merlin.systems.dag.ops.tensorflow import PredictTensorflow
from merlin.systems.dag.ops.unroll_features import UnrollFeatures
from merlin.systems.triton.utils import send_triton_request
from merlin.systems.dag.ops.workflow import TransformWorkflow

  import distutils as _distutils
2024-09-23 09:11:19.636891: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-23 09:11:19.869471: I tensorflow/core/platform/cpu_feature_guard.cc:183] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  warn(f"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}")


## Register our features on feature store

The Feast feature registry is a central catalog of all feature definitions and their related metadata (read more in the [Feast registry documentation](https://docs.feast.dev/getting-started/architecture-and-components/registry)). We have defined our user and item feature definitions in the `user_features.py` and `item_features.py` files. With `FeatureView()`, users can register data sources in their organizations into Feast, and then use those data sources for both training and online inference. In the `user_features.py` and `item_features.py` files, we tell Feast where to find the user and item features.


Before we move on to the next steps, we need to run the `feast apply` command as shown below. This registers our features: the entity and feature view definitions are stored in the feature registry (`registry.db`), and the corresponding tables are created in a local SQLite online store (`online_store.db`).



In [5]:
# Root directories of the demo; either can be overridden by exporting an
# environment variable with the same name before starting the kernel.
BASE_DIR = os.getenv("BASE_DIR", "/try-merlin/")
DATA_FOLDER = os.getenv("DATA_FOLDER", "/try-merlin/data/")

# Feast feature repository: the directory holding feature_store.yaml and the
# user/item feature view definitions (see the directory tree further down).
feast_repo_path = os.path.join(BASE_DIR, "feast_repo/feature_repo/")

In [6]:
# Register the entities and feature views defined in the repo (user_features.py,
# item_features.py) and create the SQLite online-store tables.
# %cd changes the kernel's working directory so the feast CLI runs from inside
# the feature repo.
%cd $feast_repo_path
!feast apply

/try-merlin/feast_repo/feature_repo


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


Created entity [1m[32muser_id[0m
Created entity [1m[32mitem_id[0m
Created feature view [1m[32muser_features[0m
Created feature view [1m[32mitem_features[0m

Created sqlite table [1m[32mfeast_repo_item_features[0m
Created sqlite table [1m[32mfeast_repo_user_features[0m



## Loading features from offline store into an online store

After running `feast apply` to register our features and create the local online store, we now need to perform a *materialization* operation. Materialization keeps the online store up to date and ready for prediction: we run a job that loads feature data from our feature view sources into the online store. As new feature data is added to the offline store, we can materialize again to keep the online store current with the latest feature values for each entity (user or item).

https://docs.feast.dev/how-to-guides/running-feast-in-production

In [7]:
%cd $feast_repo_path
!feast materialize 1995-01-01T01:01:01 2025-01-01T01:01:01

/try-merlin/feast_repo/feature_repo
Materializing [1m[32m2[0m feature views from [1m[32m1995-01-01 01:01:01+00:00[0m to [1m[32m2025-01-01 01:01:01+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32muser_features[0m:
100%|███████████████████████████████████████████████████████████| 446/446 [00:00<00:00, 3079.47it/s]
[1m[32mitem_features[0m:
100%|███████████████████████████████████████████████████████████| 453/453 [00:00<00:00, 8700.76it/s]


In [8]:
# Print the layout of the Feast repository, including the registry.db and
# online_store.db files under data/ that the feast commands above produced.
sd.seedir(
    os.path.join(BASE_DIR, "feast_repo"),
    style="lines",
    itemlimit=10,
    depthlimit=5,
    exclude_folders=[".ipynb_checkpoints", "__pycache__"],
    sort=True,
)

feast_repo/
├─README.md
├─__init__.py
└─feature_repo/
  ├─__init__.py
  ├─data/
  │ ├─item_features.parquet
  │ ├─online_store.db
  │ ├─registry.db
  │ └─user_features.parquet
  ├─feature_store.yaml
  ├─item_features.py
  ├─test_workflow.py
  └─user_features.py


## Set up Faiss index, create feature store client and objects for the Triton ensemble

In [9]:
# Directory that will hold the Faiss index file written by setup_faiss.
# exist_ok=True keeps the cell idempotent on re-runs and avoids the
# check-then-create race of a separate isdir() test; it still raises if the
# path exists but is not a directory.
os.makedirs(os.path.join(BASE_DIR, 'faiss_index'), exist_ok=True)

In [10]:
# Locations of the artifacts consumed by the ensemble operators below:
#   faiss_index_path     - index file written by setup_faiss, read by QueryFaiss
#   retrieval_model_path - retrieval (query tower) model for PredictTensorflow
#   ranking_model_path   - DLRM ranking model for PredictTensorflow
faiss_index_path, retrieval_model_path, ranking_model_path = (
    os.path.join(BASE_DIR, *parts)
    for parts in (
        ("faiss_index", "index.faiss"),
        ("query_tower/",),
        ("dlrm/",),
    )
)

The `QueryFaiss` operator creates an interface between a FAISS Approximate Nearest Neighbors (ANN) index and Triton Inference Server. For a given input query vector, it runs an ANN search to find the ids of the top-k nearest items in the index.

`setup_faiss` is a utility function that creates a Faiss index from a set of embedding vectors using L2 distance.

In [11]:
from merlin.systems.dag.ops.faiss import QueryFaiss, setup_faiss

# Build the Faiss index from the item embeddings (stored in column "output_1")
# and write it to faiss_index_path, where QueryFaiss will load it from.
item_embeddings_file = os.path.join(BASE_DIR, "item_embeddings.parquet")
item_embeddings = pd.read_parquet(item_embeddings_file)
setup_faiss(item_embeddings, faiss_index_path, embedding_column="output_1")



In [12]:
# Feast client for the repo registered above; the QueryFeast operators below
# receive it as `store=` for their online feature lookups.
feature_store = feast.FeatureStore(feast_repo_path)

We fetch user features from the feature store with the `QueryFeast` operator. `QueryFeast` ensures that our Feast feature store can communicate correctly with `tritonserver` for the feature lookups performed by the ensemble.



In [13]:
from merlin.systems.dag.ops.feast import QueryFeast

# Online lookup of the "user_features" view, keyed by the request's user_id.
# NOTE(review): include_id=True presumably passes user_id through as an output
# column (user_id is unrolled again further down) — confirm in QueryFeast.
user_feature_lookup = QueryFeast.from_feature_view(
    store=feature_store,
    view="user_features",
    column="user_id",
    include_id=True,
)
user_attributes = ["user_id"] >> user_feature_lookup

Materializing [1m[32m1[0m feature views to [1m[32m2024-09-23 09:12:26+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32muser_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-09-23 09:12:26+00:00[0m:


0it [00:00, ?it/s]


In [14]:
from nvtabular import Workflow

# Load the saved NVTabular workflow and apply only its "user" sub-graph to the
# raw user attributes fetched from Feast.
workflow_dir = os.path.join(DATA_FOLDER, "processed_nvt/workflow")
nvt_workflow = Workflow.load(workflow_dir)
user_subgraph = nvt_workflow.get_subworkflow("user")
user_features = user_attributes >> TransformWorkflow(user_subgraph)

Next, we use the retrieval model to retrieve the top-K candidate items that are relevant to a given user. We use the `PredictTensorflow()` operator, which takes a TensorFlow model and packages it correctly for Triton Inference Server (TIS) to run with the TensorFlow backend.

In [15]:
# Configure TensorFlow so that it does not claim all GPU memory up front.
from merlin.dataloader.tf_utils import configure_tensorflow

# The trailing ";" stops the notebook from displaying the call's return value
# (otherwise a `<function ...from_dlpack>` repr is printed as cell output).
configure_tensorflow();

<function tensorflow.python.dlpack.dlpack.from_dlpack(dlcapsule)>

In [16]:
# Number of candidates returned by the Faiss ANN search per request
# (override with the "topk_retrieval" environment variable).
topk_retrieval = int(os.environ.get("topk_retrieval", "100"))

# user features -> retrieval (query tower) model -> top-k nearest items in Faiss
query_embedding = user_features >> PredictTensorflow(retrieval_model_path)
retrieval = query_embedding >> QueryFaiss(faiss_index_path, topk=topk_retrieval)



INFO:tensorflow:Assets written to: /tmp/tmpz7hstpie/assets


INFO:tensorflow:Assets written to: /tmp/tmpz7hstpie/assets


In [17]:
# Fetch the "item_features" view for every candidate id returned by Faiss.
# NOTE(review): output_prefix="item" presumably prefixes the fetched column
# names with "item" (e.g. item_brand, item_category) — confirm in QueryFeast.
candidate_ids = retrieval["candidate_ids"]
item_feature_lookup = QueryFeast.from_feature_view(
    store=feature_store,
    view="item_features",
    column="candidate_ids",
    output_prefix="item",
    include_id=True,
)
item_attributes = candidate_ids >> item_feature_lookup

Materializing [1m[32m1[0m feature views to [1m[32m2024-09-23 09:12:34+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mitem_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-09-23 09:12:34+00:00[0m:


0it [00:00, ?it/s]


In [18]:
# Apply the "item" sub-graph of the same NVTabular workflow to the item
# attributes fetched from Feast.
item_subgraph = nvt_workflow.get_subworkflow("item")
item_transform = TransformWorkflow(item_subgraph)
item_features = item_attributes >> item_transform

We merge the user and item features into the full set of combined features that was used in model training. To do this, we use the `UnrollFeatures` operator. It takes a target column and joins the “unroll” columns to the target, which broadcasts a series of user features onto a set of items.

In [19]:
# User columns to broadcast ("unroll") onto every candidate item row, so each
# row carries the combined user + item features used in model training.
user_features_to_unroll = [
    "user_" + name
    for name in (
        "id",
        "shops",
        "profile",
        "group",
        "gender",
        "age",
        "consumption_2",
        "is_occupied",
        "geography",
        "intentions",
        "brands",
        "categories",
    )
]

# Join the selected user columns onto the item_id target column.
unroll_source = user_features[user_features_to_unroll]
combined_features = item_features >> UnrollFeatures("item_id", unroll_source)

Rank the combined features using the trained ranking model, which is a DLRM model in this example. We pass the path of the ranking model to the `PredictTensorflow()` operator.

In [20]:
import tensorflow as tf  # not referenced directly in this cell

# Score every (user, candidate item) row with the DLRM ranking model.
ranking_predictor = PredictTensorflow(ranking_model_path)
ranking = combined_features >> ranking_predictor



INFO:tensorflow:Assets written to: /tmp/tmpyslm4f7x/assets


INFO:tensorflow:Assets written to: /tmp/tmpyslm4f7x/assets


For the ordering, we use the `SoftmaxSampling()` operator. Given the input ids and their predicted relevance scores, it orders the items in descending order while introducing some randomization: items are sampled from the softmax of the predicted relevance scores. It then returns the top-k ordered items.



In [21]:
# Number of ordered items returned to the caller.
top_k = 10
# Softmax temperature for SoftmaxSampling (same value as 0.00000001).
# NOTE(review): a near-zero temperature is presumably meant to make sampling
# almost deterministic (close to a sort by score) — confirm in SoftmaxSampling.
softmax_temperature = 1e-8

ordering = combined_features["item_id"] >> SoftmaxSampling(
    relevance_col=ranking["click/binary_classification_task"],
    topk=top_k,
    temperature=softmax_temperature,
)


## Export Graph as Ensemble

The last step is to create the ensemble artifacts that TIS can consume. To create these artifacts, we import the `Ensemble` class. This class represents an entire ensemble consisting of multiple models that run sequentially in TIS, initiated by an inference request. It is responsible for interpreting the graph and exporting the correct files for TIS.

When we create an `Ensemble` object, we pass it the graph and a schema representing the starting input of the graph. After creating the ensemble object, we export the graph by supplying an export path to the `ensemble.export()` function. This returns an ensemble config, which represents the entire inference pipeline, and a list of node-specific configs.

Create the folder into which the models and config files will be exported.



In [22]:
# Output directory for the exported Triton model repository.
# exist_ok=True keeps the cell idempotent on re-runs and avoids the
# check-then-create race of a separate isdir() test; it still raises if the
# path exists but is not a directory.
os.makedirs(os.path.join(BASE_DIR, 'poc_ensemble'), exist_ok=True)

In [23]:
# Input schema of the ensemble: each request carries a single int32 user_id.
request_schema = Schema([ColumnSchema("user_id", dtype=np.int32)])

In [24]:
# Directory that receives the exported model repository; the tree below shows
# one sub-directory per exported model.
export_path = os.path.join(BASE_DIR, "poc_ensemble")

# Wrap the final `ordering` node together with the request schema and export
# the ensemble config plus the node-specific configs.
ensemble = Ensemble(ordering, request_schema)
export_result = ensemble.export(export_path)
ens_config, node_configs = export_result

# Names of the ensemble's output columns; they are requested from Triton later.
outputs = ensemble.graph.output_schema.column_names
print(outputs)

['ordered_ids', 'ordered_scores']


In [25]:
# Print the layout of the exported Triton model repository.
sd.seedir(
    export_path,
    style="lines",
    itemlimit=10,
    depthlimit=5,
    exclude_folders=[".ipynb_checkpoints", "__pycache__"],
    sort=True,
)

poc_ensemble/
├─0_transformworkflowtriton/
│ ├─1/
│ │ ├─model.py
│ │ └─workflow/
│ │   ├─categories/
│ │   │ ├─unique.user_age.parquet
│ │   │ ├─unique.user_brands.parquet
│ │   │ ├─unique.user_categories.parquet
│ │   │ ├─unique.user_consumption_2.parquet
│ │   │ ├─unique.user_gender.parquet
│ │   │ ├─unique.user_geography.parquet
│ │   │ ├─unique.user_group.parquet
│ │   │ ├─unique.user_id.parquet
│ │   │ ├─unique.user_intentions.parquet
│ │   │ └─unique.user_is_occupied.parquet
│ │   ├─metadata.json
│ │   └─workflow.pkl
│ └─config.pbtxt
├─1_predicttensorflowtriton/
│ ├─1/
│ │ └─model.savedmodel/
│ │   ├─assets/
│ │   ├─fingerprint.pb
│ │   ├─keras_metadata.pb
│ │   ├─saved_model.pb
│ │   └─variables/
│ │     ├─variables.data-00000-of-00001
│ │     └─variables.index
│ └─config.pbtxt
├─2_transformworkflowtriton/
│ ├─1/
│ │ ├─model.py
│ │ └─workflow/
│ │   ├─categories/
│ │   │ ├─unique.item_brand.parquet
│ │   │ ├─unique.item_category.parquet
│ │   │ ├─unique.item_id.parquet
│ │   │ └

## Starting Triton Server

In [25]:
!tritonserver --model-repository=/try-merlin/poc_ensemble --backend-config=tensorflow,version=2

W0922 05:19:29.380894 388 pinned_memory_manager.cc:236] Unable to allocate pinned system memory, pinned memory pool will not be available: CUDA driver version is insufficient for CUDA runtime version
I0922 05:19:29.381689 388 cuda_memory_manager.cc:115] CUDA memory pool disabled
I0922 05:19:29.620446 388 model_lifecycle.cc:462] loading: 3_predicttensorflowtriton:1
I0922 05:19:29.623060 388 model_lifecycle.cc:462] loading: 2_transformworkflowtriton:1
I0922 05:19:29.625543 388 model_lifecycle.cc:462] loading: 1_predicttensorflowtriton:1
I0922 05:19:29.628507 388 model_lifecycle.cc:462] loading: executor_model:1
I0922 05:19:29.631335 388 model_lifecycle.cc:462] loading: 0_transformworkflowtriton:1
I0922 05:19:29.969806 388 tensorflow.cc:2577] TRITONBACKEND_Initialize: tensorflow
I0922 05:19:29.969854 388 tensorflow.cc:2587] Triton TRITONBACKEND API version: 1.13
I0922 05:19:29.969857 388 tensorflow.cc:2593] 'tensorflow' TRITONBACKEND API version: 1.13
I0922 05:19:29.969860 388 tensorflow.

In [26]:
# Build a one-row request for user 7; the column is cast to int32 so that it
# matches the dtype declared in request_schema.
import numpy as np
from merlin.core.dispatch import make_df

request = make_df({"user_id": [7]}).astype({"user_id": np.int32})
print(request)

   user_id
0        7


In [27]:
# Send the request to the running Triton server and ask for the ensemble's
# output columns (`outputs`); the response maps each output name to an array.
# NOTE(review): the sample response shows item id 2 four times with identical
# scores, and the scores are not in descending order. Verify whether retrieved
# candidates that lack item features collapse to a shared default id.
response = send_triton_request(request_schema, request, outputs)
response

{'ordered_ids': array([[360,  17,   2, 268, 114,   2, 296,   2, 139,   2]], dtype=int32),
 'ordered_scores': array([[0.4980855 , 0.4982919 , 0.49818763, 0.49815258, 0.49854738,
         0.49818763, 0.49851397, 0.49818763, 0.49850938, 0.49818763]],
       dtype=float32)}

Unnamed: 0,user_id
0,7
