In [6]:
import time

notebook_start_time = time.time()

# Set up environment

In [7]:
import sys
from pathlib import Path


def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False


def clone_repository() -> None:
    !git clone https://github.com/decodingml/hands-on-recommender-system.git
    %cd hands-on-recommender-system/


def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml


if is_google_colab():
    clone_repository()
    install_dependencies()

    root_dir = str(Path().absolute())
    print("⛳️ Google Colab environment")
else:
    root_dir = str(Path().absolute().parent)
    print("⛳️ Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    print(f"Adding the following directory to the PYTHONPATH: {root_dir}")
    sys.path.append(root_dir)

⛳️ Local environment


# Inference pipeline: Deploying and testing the inference pipeline 

In this notebook, we will dig into the inference pipeline and deploy it to Hopsworks as a real-time service.

## 📝 Imports

In [8]:
import warnings

warnings.filterwarnings("ignore")

from loguru import logger

from recsys import hopsworks_integration

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [10]:
project, fs = hopsworks_integration.get_feature_store()

[32m2025-01-01 02:29:14.820[0m | [1mINFO    [0m | [36mrecsys.hopsworks_integration.feature_store[0m:[36mget_feature_store[0m:[36m15[0m - [1mLoging to Hopsworks using HOPSWORKS_API_KEY env var.[0m


2025-01-01 02:29:14,822 INFO: Initializing external client
2025-01-01 02:29:14,823 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-01-01 02:29:16,380 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1197208


# Deploying the ranking inference pipeline
You start by deploying your ranking model. Since it is a CatBoost model you need to implement a `Predict` class that tells Hopsworks how to load the model and how to use it:

In [11]:
ranking_deployment = hopsworks_integration.ranking_serving.HopsworksRankingModel.deploy(
    project=project
)

Uploading: 100.000%|██████████| 4427/4427 elapsed<00:02 remaining<00:00
Uploading: 100.000%|██████████| 1113/1113 elapsed<00:01 remaining<00:00


Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1197208/deployments/353350
Before making predictions, start the deployment by using `.start()`


Now, we have to explicitly start the deployment:

In [12]:
ranking_deployment.start()

Deployment is ready: 100%|██████████| 6/6 [00:26<00:00,  4.41s/it]    

Start making predictions by using `.predict()`





In [None]:
# Check logs in case of failure
ranking_deployment.get_logs(component="transformer", tail=200)

## <span style="color:#ff5f27"> Test the ranking inference pipeline</span>


In [19]:

# get Hopsworks Model Serving
ms = project.get_model_serving()

# get deployment object
ranking_deployment = ms.get_deployment("ranking")
# get Hopsworks Model Registry
mr = project.get_model_registry()
  
# get model
model = mr.get_model(ranking_deployment.model_name, ranking_deployment.model_version)


In [13]:
def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates["ranking"][:k]]

Let's define a dummy test example to test our ranking deployment (only the `user_id` has to match):

In [18]:
test_ranking_input = [
        {
            "user_id": "a926fcb4-fcb9-461c-bcf3-94e99087488a",
            "query_emb": [-0.055558718740940094, 
            0.265354186296463,0.14594393968582153, 
            0.5653434991836548, 0.02886250615119934, 
            0.06645234674215317, -0.11563731729984283, 
            -0.11164169758558273, 0.03345891833305359, 
            -0.1873944103717804, -0.23655693233013153, 
            -0.2209630161523819, -0.04303218796849251, 
            0.05191042274236679, -0.13368633389472961, 0.011423708871006966]
        }
    ]

# Test ranking deployment
ranked_candidates = ranking_deployment.predict(inputs=test_ranking_input)

# Retrieve artwork ids of the top recommended items
recommendations = get_top_recommendations(ranked_candidates["predictions"], k=3)
recommendations

['515d04725eeb1c524c0020f4',
 '516c96a30f8b7850670001ee',
 '515b6eb41b12b0244a000661']

Check logs in case of failure:

In [15]:
ranking_deployment.get_logs(component="transformer", tail=200)

Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/1197208/deployments/353350

DeployableComponentLogs(instance_name: 'ranking-transformer-00001-deployment-f97cb5bbf-p5h76', date: datetime.datetime(2025, 1, 1, 2, 30, 34, 787575)) 
INFO:root:Loading component module...
INFO:root:[TransformerModel] Initializing transformer for model: ranking
INFO:root:[HopsworksModel] Initializing for model: ranking
INFO:hsfs.engine.python:Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai/p/1197208
... execution time: 7.677094 seconds
INFO:root:Starting KServe server...
2025-01-01 01:30:02.215 8 kserve INFO [model_server.py:register_model():363] Registering model: ranking
2025-01-01 01:30:02.216 8 kserve INFO [model_server.py:start():298] Setting max asyncio worker threads as 12
2025-01-01 01:30:02.216 8 kserve INFO [model_server.py:_serve_rest():244] Starting uvicorn with 1 workers


2025-01-01 01:30:02.251 uvicorn.error INF

# Deploying the query inference pipeline

In [16]:
query_model_deployment = (
    hopsworks_integration.two_tower_serving.HopsworksQueryModel.deploy(project=project)
)

Uploading: 100.000%|██████████| 2469/2469 elapsed<00:02 remaining<00:00


Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1197208/deployments/352324
Before making predictions, start the deployment by using `.start()`


In [17]:
query_model_deployment.start()

Deployment is ready: 100%|██████████| 6/6 [00:26<00:00,  4.47s/it]    

Start making predictions by using `.predict()`





In [None]:
# Check logs in case of failure
query_model_deployment.get_logs(component="transformer", tail=20)

In [19]:
test_ranking_input = [
        {
            "user_id": "a926fcb4-fcb9-461c-bcf3-94e99087488a",
        }
    ]

# Test ranking deployment
ranked_candidates = query_model_deployment.predict(inputs=test_ranking_input)

# Retrieve artwork ids of the top recommended items
recommendations = get_top_recommendations(ranked_candidates["predictions"], k=3)
recommendations

ConnectionError: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))

# <span style="color:#ff5f27"> Stopping the Hopsworks deployments </span>
Stop the deployment when you're not using it.

In [None]:
ranking_deployment.stop()
query_model_deployment.stop()