# Déploiement de la pipeline de recommandation personnalisée

Ce notebook a pour but de créer un ensemble Triton pour déployer le système de recommandation.

In [2]:
# Optional one-off dependency installation, kept commented out.
# Uncomment the lines below to install the packages from inside the notebook.
#%pip install "feast<0.31" faiss-gpu
#!pip install seedir

In [3]:
import os
import numpy as np
import pandas as pd
import feast
import seedir as sd
from nvtabular import ColumnSchema, Schema

from merlin.systems.dag.ensemble import Ensemble
from merlin.systems.dag.ops.softmax_sampling import SoftmaxSampling
from merlin.systems.dag.ops.tensorflow import PredictTensorflow
from merlin.systems.dag.ops.unroll_features import UnrollFeatures
from merlin.systems.triton.utils import send_triton_request
from merlin.systems.dag.ops.workflow import TransformWorkflow

  import distutils as _distutils
2024-09-16 13:42:48.964376: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-09-16 13:42:49.011759: I tensorflow/core/platform/cpu_feature_guard.cc:183] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: SSE3 SSE4.1 SSE4.2 AVX, in other operations, rebuild TensorFlow with the appropriate compiler flags.
  warn(f"PyTorch dtype mappings did not load successfully due to an error: {exc.msg}")


## Enregistrement des features dans le feature store

In [2]:
# Filesystem locations used throughout the notebook. Each value is read from the
# environment variable named in the lookup and falls back to the default shown.
INPUT_DATA_DIR = os.getenv("INPUT_DATA_DIR", "/root/Data/Row/")
DATA_FOLDER = os.getenv("DATA_FOLDER", "/root/Data/")
# Note: this one is read from the variable MODELS, not MODELS_FOLDER.
MODELS_FOLDER = os.getenv("MODELS", "/root/Models/")
PROCESSED_FOLDER = os.getenv("PROCESSED_FOLDER", "/root/Data/Processed/")
feature_repo_path = os.getenv("FEAST_PATH", "/root/Data/feast_repo/feature_repo")

In [3]:
import seedir as sd

# Print the Feast repository tree: entries sorted, at most 10 items per folder,
# at most 3 levels deep.
feature_repo_path = os.path.join(feature_repo_path)
sd.seedir(feature_repo_path, style="lines", itemlimit=10, depthlimit=3, sort=True)

feature_repo/
├─__init__.py
├─__pycache__/
│ ├─__init__.cpython-310.pyc
│ ├─example_repo.cpython-310.pyc
│ └─test_workflow.cpython-310.pyc
├─cufile.log
├─data/
│ ├─item_features.parquet
│ ├─online_store.db
│ ├─registry.db
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
├─test_workflow.py
└─user_features.py


Enregistrement des données dans le feature store.
Le feature registry de Feast est le catalogue central des définitions des features et de leurs métadonnées.

Chargement des features depuis l'offline store vers l'online store

In [4]:
# Move into the Feast repository so the commands below run from that folder.
%cd $feature_repo_path
# Delete any Jupyter checkpoint folders found under the repository.
!find . -name ".ipynb_checkpoints" -exec rm -r {} +
# Register the repository's definitions (entities, feature views) with Feast.
!feast apply

#!find . -name ".ipynb_checkpoints" -exec rm -r {} +

/root/Data/feast_repo/feature_repo


  self.shell.db['dhist'] = compress_dhist(dhist)[-100:]


Created entity [1m[32mitem_id[0m
Created entity [1m[32muser_id[0m
Created feature view [1m[32mitem_features[0m
Created feature view [1m[32muser_features[0m

Created sqlite table [1m[32mfeast_repo_item_features[0m
Created sqlite table [1m[32mfeast_repo_user_features[0m



In [5]:
# Load feature values for the given time window into the online store.
# NOTE(review): the end timestamp is hard-coded to 2025-01-01 — confirm it still covers the data being served.
!feast materialize 1995-01-01T01:01:01 2025-01-01T01:01:01

Materializing [1m[32m2[0m feature views from [1m[32m1995-01-01 01:01:01+00:00[0m to [1m[32m2025-01-01 01:01:01+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mitem_features[0m:
100%|███████████████████████████████████████████████████████| 23417/23417 [00:13<00:00, 1762.09it/s]
[1m[32muser_features[0m:
100%|█████████████████████████████████████████████████████| 442707/442707 [04:56<00:00, 1493.12it/s]


In [6]:
# Display the feature repository tree again, this time up to 5 levels deep.
sd.seedir(
    os.path.join(feature_repo_path),
    style="lines",
    itemlimit=10,
    depthlimit=5,
    sort=True,
)

feature_repo/
├─__init__.py
├─__pycache__/
│ ├─__init__.cpython-310.pyc
│ ├─example_repo.cpython-310.pyc
│ └─test_workflow.cpython-310.pyc
├─data/
│ ├─item_features.parquet
│ ├─online_store.db
│ ├─registry.db
│ └─user_features.parquet
├─feature_store.yaml
├─item_features.py
├─test_workflow.py
└─user_features.py


Création du client Feast

In [7]:
# Feast client bound to the feature repository configured above.
feature_store = feast.FeatureStore(repo_path=feature_repo_path)

Définition des chemins

In [156]:
# Make sure the folder that will hold the Faiss index exists.
# exist_ok=True replaces the separate isdir() check: the cell can be re-run
# safely and there is no gap between checking and creating the folder.
os.makedirs(os.path.join(DATA_FOLDER, "faiss_index"), exist_ok=True)

In [157]:
# Location of the Faiss index file inside the data folder.
faiss_index_path = os.path.join(DATA_FOLDER, "faiss_index", "index.faiss")

# Locations of the two saved models, both under the models folder.
retrieval_model_path, ranking_model_path = (
    os.path.join(MODELS_FOLDER, model_dir) for model_dir in ("query_tower/", "dlrm/")
)

Création de l'index Faiss à partir des embeddings

In [158]:
from merlin.systems.dag.ops.faiss import QueryFaiss, setup_faiss

# Read the item embeddings and hand them to setup_faiss together with the index
# path; the embedding vectors are taken from the "output_1" column.
# NOTE(review): the path hard-codes "Processed/" under DATA_FOLDER instead of using
# PROCESSED_FOLDER — confirm both always point to the same folder.
item_embeddings = pd.read_parquet(
    os.path.join(DATA_FOLDER, "Processed/item_embeddings.parquet")
)
setup_faiss(
    item_embeddings,
    faiss_index_path,
    embedding_column="output_1",
)

## Définition de la pipeline de recommandation complète avec NVTabular

In [205]:
import logging
import sys
import time
import warnings
from typing import List

# Imported here so the cell also works on a fresh kernel: the class below
# subclasses QueryFeast at definition time.
from merlin.systems.dag.ops.feast import QueryFeast

logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')


class EnhancedTimedQueryFeast(QueryFeast):
    """QueryFeast variant that reports how long its calls take.

    Start and end messages for ``transform`` and ``compute_output_schema`` are
    written to ``queryfeast_logs.txt`` (truncated each time an operator is
    created) and echoed on stdout. ``transform`` additionally emits two
    error-level log records.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # UTF-8 is forced because the messages contain accented characters,
        # which would fail to encode under a non-UTF-8 default locale.
        self.log_file = open('queryfeast_logs.txt', 'w', encoding='utf-8')

    def _get_logger(self):
        """Return the operator's own ``logger`` when it has one, else a module-level logger."""
        return getattr(self, 'logger', None) or logging.getLogger(__name__)

    def log_message(self, message):
        """Write ``message`` to the log file (when it is open) and to stdout."""
        # Same guard as __del__: the attribute may be missing or already closed,
        # and logging should not be what breaks the pipeline.
        log_file = getattr(self, 'log_file', None)
        if log_file is not None and not log_file.closed:
            print(message, file=log_file, flush=True)
        print(message, file=sys.stdout, flush=True)

    def transform(self, col_selector: List[str], df: "cudf.DataFrame") -> "cudf.DataFrame":
        """Run the parent ``transform`` and log its start time and duration.

        Returns whatever the parent implementation returns, unchanged.
        """
        logger = self._get_logger()
        start_time = time.time()
        # The duration comes from a monotonic clock so that system clock
        # adjustments cannot distort it; start_time is only used for display.
        tick = time.perf_counter()
        self.log_message(f"Début de QueryFeast.transform() à {start_time}")
        # NOTE(review): timing records are emitted at ERROR level — confirm this is intentional.
        logger.error(f"Debut Feast {time.time()}")
        result = super().transform(col_selector, df)

        duration = time.perf_counter() - tick
        self.log_message(f"Fin de QueryFeast.transform(). Durée: {duration:.4f} secondes")
        logger.error(f"Fin Feast {time.time()}")
        return result

    def compute_output_schema(self, input_schema: Schema, col_selector: List[str], prev_output_schema: Schema = None) -> Schema:
        """Run the parent ``compute_output_schema`` and log its start time and duration.

        Returns whatever the parent implementation returns, unchanged.
        """
        start_time = time.time()
        tick = time.perf_counter()
        self.log_message(f"Début de QueryFeast.compute_output_schema() à {start_time}")

        result = super().compute_output_schema(input_schema, col_selector, prev_output_schema)

        duration = time.perf_counter() - tick
        self.log_message(f"Fin de QueryFeast.compute_output_schema(). Durée: {duration:.4f} secondes")

        return result

    def __del__(self):
        # Close the log file if __init__ got far enough to open it.
        log_file = getattr(self, 'log_file', None)
        if log_file is not None and not log_file.closed:
            log_file.close()


In [192]:
from merlin.core.dispatch import make_df
from merlin.dataloader.tf_utils import configure_tensorflow
from merlin.systems.dag.ops.feast import QueryFeast
from nvtabular import ColumnSelector, Workflow, Dataset


# Test input: a single request for user 11, with the id cast to int32.
request = make_df({"user_id": [11]})
request["user_id"] = request["user_id"].astype(np.int32)
test_dataset = Dataset(request)


# Raw user attributes looked up in the Feast "user_features" view.
user_attributes = ["user_id"] >> EnhancedTimedQueryFeast.from_feature_view(
    store=feature_store,
    view="user_features",
    column="user_id",
    include_id=True,
)

# Apply the "user" sub-workflow of the saved NVTabular workflow.
nvt_workflow = Workflow.load(os.path.join(MODELS_FOLDER, 'general_workflow'))
user_subgraph = nvt_workflow.get_subworkflow("user")
user_features = user_attributes >> TransformWorkflow(user_subgraph)

# configure_tensorflow is now imported at the top of this cell; previously it
# was only imported in a later cell, so this call failed on a fresh kernel.
configure_tensorflow()

topk_retrieval = int(
    os.environ.get("topk_retrieval", "100")
)
# First TensorFlow prediction, followed by the Faiss lookup.
retrieval = (
    user_features
    >> PredictTensorflow(retrieval_model_path)
    >> QueryFaiss(faiss_index_path, topk=topk_retrieval)
)

item_attributes = retrieval["candidate_ids"] >> QueryFeast.from_feature_view(
    store=feature_store,
    view="item_features",
    column="candidate_ids",
    include_id=True,
)

# TEST_workflow was used without ever being defined in the notebook.
# The recorded output of this cell lists the transformed user features, so the
# workflow is assumed to wrap `user_features` — TODO confirm.
TEST_workflow = Workflow(user_features)
output = TEST_workflow.fit_transform(test_dataset)


print(output.to_ddf().compute())

Materializing [1m[32m1[0m feature views to [1m[32m2024-08-30 14:54:56+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32muser_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-08-30 14:54:56+00:00[0m:


0it [00:00, ?it/s]


INFO:tensorflow:Assets written to: /tmp/tmp_nrnrr16/assets


INFO:tensorflow:Assets written to: /tmp/tmp_nrnrr16/assets


Materializing [1m[32m1[0m feature views to [1m[32m2024-08-30 14:55:09+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mitem_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-08-30 14:55:09+00:00[0m:


0it [00:00, ?it/s]


   user_id  FN  Active  club_member_status  fashion_news_frequency  \
0       11   4       3                   3                       4   

   postal_code  popular_product_type  2nd_popular_product_type  \
0           99                    14                         6   

   popular_department_no  2nd_popular_department_no  popular_section_no  \
0                     13                         35                   8   

   2nd_popular_section_no  last_product_code  2nd_last_product_code  \
0                       4                 55                    382   

   last_product_type  2nd_last_product_type       age  frequency    amount  \
0                 17                      7 -0.544461   1.025946  0.784323   

    recency  
0 -0.594213  


Premier opérateur : récupération des features brutes d'un client à partir d'un user_id

In [206]:
from merlin.systems.dag.ops.feast import QueryFeast

# First operator of the graph: query the "user_features" view by user_id and
# keep the id column in the result.
user_attributes = ["user_id"] >> EnhancedTimedQueryFeast.from_feature_view(
    store=feature_store, view="user_features", column="user_id", include_id=True
)

Materializing [1m[32m1[0m feature views to [1m[32m2024-08-30 15:37:52+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32muser_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-08-30 15:37:52+00:00[0m:


 ... (more hidden) ...


Préparation des données pour le Two Tower model

In [207]:
from nvtabular import Workflow

# Reload the saved NVTabular workflow, take its "user" sub-workflow and apply
# it to the raw user attributes.
nvt_workflow = Workflow.load(os.path.join(MODELS_FOLDER, "general_workflow"))
user_subgraph = nvt_workflow.get_subworkflow("user")
user_features = user_attributes >> TransformWorkflow(user_subgraph)

In [208]:
# Keep TensorFlow from claiming all of the GPU memory.
from merlin.dataloader.tf_utils import configure_tensorflow

configure_tensorflow()

<function tensorflow.python.dlpack.dlpack.from_dlpack(dlcapsule)>

On récupère les items candidats en comparant l'embedding du client aux embeddings des produits via faiss.

On récupère ensuite les features de ces items dans feast.

In [209]:
# Number of candidates requested from Faiss; read from the environment
# variable "topk_retrieval", 100 when it is not set.
topk_retrieval = int(os.environ.get("topk_retrieval", "100"))

# First TensorFlow prediction (model at retrieval_model_path), then the Faiss lookup.
retrieval = (
    user_features
    >> PredictTensorflow(retrieval_model_path)
    >> QueryFaiss(faiss_index_path, topk=topk_retrieval)
)



INFO:tensorflow:Assets written to: /tmp/tmp_bfchr44/assets


INFO:tensorflow:Assets written to: /tmp/tmp_bfchr44/assets


In [210]:
# Query the "item_features" view for the retrieved candidate ids, keeping the
# id column in the result.
item_attributes = retrieval["candidate_ids"] >> EnhancedTimedQueryFeast.from_feature_view(
    store=feature_store, view="item_features", column="candidate_ids", include_id=True
)

Materializing [1m[32m1[0m feature views to [1m[32m2024-08-30 15:38:05+00:00[0m into the [1m[32msqlite[0m online store.

[1m[32mitem_features[0m from [1m[32m2025-01-01 01:01:01+00:00[0m to [1m[32m2024-08-30 15:38:05+00:00[0m:


 ... (more hidden) ...


In [211]:
# Apply the "item" sub-workflow of the saved NVTabular workflow to the
# candidate item attributes.
item_subgraph = nvt_workflow.get_subworkflow("item")
item_features = item_attributes >> TransformWorkflow(item_subgraph)

In [212]:
# User columns passed to UnrollFeatures alongside the "item_id" column name.
user_features_to_unroll = [
    "user_id",
    "FN",
    "Active",
    "club_member_status",
    "fashion_news_frequency",
    "age",
    "postal_code",
    "recency",
    "frequency",
    "amount",
    "popular_product_type",
    "2nd_popular_product_type",
    "popular_department_no",
    "2nd_popular_department_no",
    "popular_section_no",
    "2nd_popular_section_no",
    "last_product_code",
    "2nd_last_product_code",
    "last_product_type",
    "2nd_last_product_type",
]

selected_user_columns = user_features[user_features_to_unroll]
combined_features = item_features >> UnrollFeatures("item_id", selected_user_columns)

Scoring avec le modèle DLRM sauvegardé précédemment

In [213]:
# Feed the combined features to the TensorFlow model stored at ranking_model_path.
ranking = combined_features >> PredictTensorflow(ranking_model_path)



INFO:tensorflow:Assets written to: /tmp/tmpgj0cxcdf/assets


INFO:tensorflow:Assets written to: /tmp/tmpgj0cxcdf/assets


In [214]:
from merlin.core.dispatch import make_df
from merlin.systems.dag.ops.feast import QueryFeast
from nvtabular import ColumnSelector, Workflow, Dataset

# Final operator: SoftmaxSampling over the item ids, using the ranking model's
# "Target/binary_output" column as relevance and keeping top_k items.
top_k = 12
ordering = combined_features["item_id"] >> SoftmaxSampling(
    relevance_col=ranking["Target/binary_output"],
    topk=top_k,
    temperature=1e-8,
)

# Test input: a single request for user 11, with the id cast to int32.
request = make_df({"user_id": [11]})
request["user_id"] = request["user_id"].astype(np.int32)
test_dataset = Dataset(request)

# Run the whole pipeline on the test request.
similarity_workflow = Workflow(ordering)
output = similarity_workflow.transform(test_dataset)

print(output.to_ddf().compute())

## Sauvegarde de la pipeline complète

Create folder to export graphs and config

In [None]:
# Create the export folder if it is missing. exist_ok=True replaces the
# separate isdir() check, and the single-argument os.path.join() was a no-op.
os.makedirs("/root/Triton_models", exist_ok=True)

Création du schéma de l'input qu'on va donner à Triton.

In [216]:
# Schema of the request sent to Triton: a single int32 "user_id" column.
user_id_column = ColumnSchema("user_id", dtype=np.int32)
request_schema = Schema([user_id_column])
request_schema

Unnamed: 0,name,tags,dtype,is_list,is_ragged
0,user_id,(),"DType(name='int32', element_type=<ElementType....",False,False


Création de l'ensemble python (la pipeline au complet)

In [217]:
%%time
# define the path where all the models and config files exported to
export_path = os.path.join('/root/Triton_models_test_operateur_perso')

ensemble = Ensemble(ordering, request_schema)
ens_config, node_configs = ensemble.export(export_path)

# return the output column name
outputs = ensemble.graph.output_schema.column_names
print(outputs)

['ordered_ids', 'ordered_scores']
CPU times: user 32.6 s, sys: 254 ms, total: 32.9 s
Wall time: 726 ms


In [11]:
# Display the exported model repository (at most 10 items per folder, 5 levels deep).
sd.seedir(
    os.path.join("/root/Triton_models_test_operateur_perso"),
    style="lines",
    itemlimit=10,
    depthlimit=5,
    sort=True,
)

Triton_models_test_operateur_perso/
├─0_transformworkflowtriton/
│ ├─1/
│ │ ├─__pycache__/
│ │ │ └─model.cpython-310.pyc
│ │ ├─model.py
│ │ └─workflow/
│ │   ├─categories/
│ │   │ ├─unique.2nd_last_product_code.parquet
│ │   │ ├─unique.2nd_last_product_type.parquet
│ │   │ ├─unique.2nd_popular_department_no.parquet
│ │   │ ├─unique.2nd_popular_product_type.parquet
│ │   │ ├─unique.2nd_popular_section_no.parquet
│ │   │ ├─unique.Active.parquet
│ │   │ ├─unique.FN.parquet
│ │   │ ├─unique.club_member_status.parquet
│ │   │ ├─unique.fashion_news_frequency.parquet
│ │   │ └─unique.last_product_code.parquet
│ │   ├─metadata.json
│ │   └─workflow.pkl
│ └─config.pbtxt
├─1_predicttensorflowtriton/
│ ├─1/
│ │ └─model.savedmodel/
│ │   ├─.merlin/
│ │   │ ├─input_schema.json
│ │   │ └─output_schema.json
│ │   ├─assets/
│ │   ├─fingerprint.pb
│ │   ├─keras_metadata.pb
│ │   ├─saved_model.pb
│ │   └─variables/
│ │     ├─variables.data-00000-of-00001
│ │     └─variables.index
│ └─config.pbtxt
├─2_tr

## Démarrage du Triton Server

Let's clean some useless files

In [219]:
import shutil
export_path = os.path.join('/root/Triton_models_test_operateur_perso')


def remove_checkpoints(dir_path, target_name='.ipynb_checkpoints'):
    """Delete every directory called ``target_name`` found under ``dir_path``.

    Args:
        dir_path: Root of the tree to clean. A path that does not exist is
            ignored, because ``os.walk`` yields nothing for it.
        target_name: Name of the directories to delete. Defaults to the
            Jupyter checkpoint folder name.

    Each removed path is printed. Nothing is returned.
    """
    for root, dirs, _files in os.walk(dir_path):
        if target_name not in dirs:
            continue
        dir_to_remove = os.path.join(root, target_name)
        print(f"Removing: {dir_to_remove}")
        shutil.rmtree(dir_to_remove)
        # Prune the entry in place so os.walk does not try to descend into
        # the directory that was just deleted.
        dirs.remove(target_name)


remove_checkpoints(export_path)

#sd.seedir(export_path, style='lines', itemlimit=10, depthlimit=5, sort=True)

Pour démarrer Triton Inference Server, il faut exécuter la commande suivante dans un terminal de ce conteneur (`--model-repository` doit pointer vers le dossier d'export utilisé plus haut, par exemple `/root/Triton_models_test_operateur_perso`) :

tritonserver --model-repository=/root/Triton_models/ --backend-config=tensorflow,version=2

Il faut attendre que tous les modèles soient à l'état « ready » dans le terminal avant d'exécuter les tests (cela peut être vraiment long : 20 à 30 min).

In [4]:
from merlin.core.dispatch import make_df

# Build the request sent to Triton Inference Server: one row for user 9,
# with the id cast to int32.
request = make_df({"user_id": [9]})
request["user_id"] = request["user_id"].astype(np.int32)
print(request)

# Output columns asked from the ensemble.
outputs = ["ordered_ids", "ordered_scores"]

# Schema describing the request: a single int32 "user_id" column.
request_schema = Schema([ColumnSchema("user_id", dtype=np.int32)])

response = send_triton_request(request_schema, request, outputs)
response

   user_id
0        9


{'ordered_ids': array([[837283001, 812530002, 912075004, 867948001, 830016003, 915412002,
         873279001, 898703001, 873279003, 855769002, 909916002, 772785005]],
       dtype=int32),
 'ordered_scores': array([[0.97283494, 0.9176917 , 0.967815  , 0.99024385, 0.9676408 ,
         0.95743316, 0.9999999 , 0.9802405 , 0.9999999 , 0.9687073 ,
         0.99999964, 0.9999497 ]], dtype=float32)}

In [5]:
from merlin.core.dispatch import make_df

# Build the request sent to Triton Inference Server: one row for user 9,
# with the id cast to int32.
request = make_df({"user_id": [9]})
request["user_id"] = request["user_id"].astype(np.int32)
print(request)

# Output columns asked from the ensemble.
outputs = ["ordered_ids", "ordered_scores"]

# Schema describing the request: a single int32 "user_id" column.
request_schema = Schema([ColumnSchema("user_id", dtype=np.int32)])

   user_id
0        9


In [6]:
%%time
# Send the request to Triton and display the response; %%time reports how long the cell takes.
response = send_triton_request(request_schema, request, outputs)
response

CPU times: user 13.1 ms, sys: 3.19 ms, total: 16.3 ms
Wall time: 792 ms


{'ordered_ids': array([[873884006, 781613013, 914351004, 673677011, 873279001, 911870009,
         859805007, 885910001, 873679001, 781613015, 858313001, 749699024]],
       dtype=int32),
 'ordered_scores': array([[0.9886706 , 0.9668843 , 0.97886753, 0.99835396, 0.9999999 ,
         0.99999976, 0.9623392 , 0.99990916, 0.9898747 , 0.975149  ,
         0.9585596 , 0.930302  ]], dtype=float32)}