<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Setup

In [1]:
!pip install hopsworks

Collecting hopsworks
  Using cached hopsworks-3.4.4-py3-none-any.whl
Collecting hsfs<3.5.0,>=3.4.0 (from hsfs[python]<3.5.0,>=3.4.0->hopsworks)
  Using cached hsfs-3.4.7-py3-none-any.whl
Collecting hsml<3.5.0,>=3.4.0 (from hopsworks)
  Using cached hsml-3.4.6-py3-none-any.whl
Collecting pyhumps==1.6.1 (from hopsworks)
  Using cached pyhumps-1.6.1-py3-none-any.whl.metadata (3.7 kB)
Collecting furl (from hopsworks)
  Using cached furl-2.1.3-py2.py3-none-any.whl.metadata (1.2 kB)
Collecting pyjks (from hopsworks)
  Downloading pyjks-20.0.0-py2.py3-none-any.whl.metadata (1.7 kB)
Collecting avro==1.11.0 (from hsfs<3.5.0,>=3.4.0->hsfs[python]<3.5.0,>=3.4.0->hopsworks)
  Using cached avro-1.11.0-py2.py3-none-any.whl
Collecting PyMySQL[rsa] (from hsfs<3.5.0,>=3.4.0->hsfs[python]<3.5.0,>=3.4.0->hopsworks)
  Using cached PyMySQL-1.1.0-py3-none-any.whl.metadata (4.4 kB)
Collecting great-expectations==0.14.13 (from hsfs<3.5.0,>=3.4.0->hsfs[python]<3.5.0,>=3.4.0->hopsworks)
  Using cached great_exp

<div style="background-color:teal; color:white; padding:0px; font-size:20px">
Imports

In [2]:
import os

<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Hopsworks Feature Store Connection

In [3]:
import hopsworks

project = hopsworks.login()

fs = project.get_feature_store()

Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/475285
Connected. Call `.close()` to terminate connection gracefully.


In [4]:
# Connect to Hopsworks Model Registry
mr = project.get_model_registry()

dataset_api = project.get_dataset_api()

Connected. Call `.close()` to terminate connection gracefully.


<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Ranking Transformer & Model Definition

In [5]:
ranking_model = mr.get_best_model(
    name="ranking_model", 
    metric="fscore", 
    direction="max",
)
ranking_model

Model(name: 'ranking_model', version: 1)

<div style="background-color: darkgreen ; color:white; padding:0px; font-size:15px">
Transformer Definition

In [5]:
%%writefile scripts/ranking_transformer.py

# PINECONE SETUP
# - - - - - - - - - - - - - - - - - - 
import subprocess
import sys

# Use subprocess to run the pip install command
subprocess.check_call([sys.executable, "-m", "pip", "install", "pinecone-client"])

import pinecone
from pinecone import Pinecone

# IMPORTS
# - - - - - - - - - - - - - - - - - - 

import os
import pandas as pd

import hopsworks
from opensearchpy import OpenSearch

import logging

class Transformer(object):
    
    def __init__(self):
        
        # Connect to Hopsworks
        project = hopsworks.connection().get_project()
        self.fs = project.get_feature_store()

        # Connect to Pinecone
        self.pc = Pinecone(api_key='83447319-a1d1-446b-bfbb-9fbce3071957')
        
        # Establish the 'articles' feature view
        self.articles_fv = self.fs.get_feature_view(
            name="articles", 
            version=1,
        )
        
        # Get list of feature names for articles
        self.articles_features = [feat.name for feat in self.articles_fv.schema]
        
        # Establish the 'customers' feature view
        self.customer_fv = self.fs.get_feature_view(
            name="customers", 
            version=1,
        )

        # Establish the 'candidate_embeddings' feature view
        self.pc_index = self.pc.Index("recsys-project")

        # Retrieve ranking model
        mr = project.get_model_registry()
        model = mr.get_model(
            name="ranking_model", 
            version=1,
        )
        
        # Extract input schema from the model
        input_schema = model.model_schema["input_schema"]["columnar_schema"]
        
        # Get the names of features expected by the ranking model
        self.ranking_model_feature_names = [feat["name"] for feat in input_schema]

    def preprocess(self, inputs):
        # Extract the input instance
        inputs = inputs["instances"][0]
        
        # Extract customer_id from inputs
        customer_id = inputs["customer_id"]

        # Retrieve the 'candidate_embeddings' feature view
        neighbors = self.pc_index.query(
            vector=inputs["query_emb"],
            top_k=100,
            include_values=True)

        # Extract item_ids from returned object
        neighbors = [match['id'] for match in neighbors['matches']]

        # BUILD ARTICLE DATA INPUTS
        # - - - - - - - - - - - - - - - - - - - - - - - - - -  
        # Filter Out previously purchased candidates
        # Enrich remaining with Article data
        
        # Get IDs of items already bought by the customer
        already_bought_items_ids = self.fs.sql(
            f"SELECT article_id from transactions_1 WHERE customer_id = '{customer_id}'"
        ).values.reshape(-1).tolist()

        # Filter candidate items to exclude those already bought by the customer
        item_id_list = [
            str(item_id) 
            for item_id 
            in neighbors 
            if str(item_id) 
            not in already_bought_items_ids
        ]
        # Create df from filtered list
        item_id_df = pd.DataFrame({"article_id" : item_id_list})
        
        # Retrieve Article data for candidate items
        articles_data = [
            self.articles_fv.get_feature_vector({"article_id": item_id}) 
            for item_id 
            in item_id_list
        ]

        articles_df = pd.DataFrame(
            data=articles_data, 
            columns=self.articles_features,
        )
        
        # Join candidate items with their features
        ranking_model_inputs = item_id_df.merge(
            articles_df, 
            on="article_id", 
            how="inner",
        )        

        # ADD CUSTOMER DATA
        # - - - - - - - - - - - - - - - - - - - - - 
        
        # Add customer features to ranking_model_inputs
        customer_features = self.customer_fv.get_feature_vector(
            {"customer_id": customer_id}, 
            return_type="pandas",
        )
        ranking_model_inputs["age"] = customer_features.age.values[0]   
        ranking_model_inputs["month_sin"] = inputs["month_sin"]
        ranking_model_inputs["month_cos"] = inputs["month_cos"]

        # Select only the features required by the ranking model
        ranking_model_inputs = ranking_model_inputs[self.ranking_model_feature_names]
                
        return { 
            "inputs" : [{"ranking_features": ranking_model_inputs.values.tolist(), "article_ids": item_id_list}]
        }
        
    # A simple postprocess method that just returns the model's output
    def postprocess(self, outputs):
        return outputs

Overwriting ranking_transformer.py


In [6]:
# Copy transformer file into Hopsworks File System 
uploaded_file_path = dataset_api.upload(
    "ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded transformer script
transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    uploaded_file_path,          # Path to the uploaded file within the project
)
print(transformer_script_path)

Uploading: 0.000%|          | 0/5404 elapsed<00:00 remaining<?

/Projects/RecSys_Project/Resources/ranking_transformer.py


<div style="background-color: darkgreen ; color:white; padding:0px; font-size:15px">
Ranking Predictor Definition

In [50]:
%%writefile scripts/ranking_predictor.py

import os
import joblib
import numpy as np

import logging

class Predict(object):
    
    def __init__(self):
        # Instantiate preloaded model through env. path
        self.model = joblib.load(os.environ["ARTIFACT_FILES_PATH"] + "/ranking_model.pkl")

    def predict(self, inputs):
        # Extract ranking features and article IDs from the inputs
        features = inputs[0].pop("ranking_features")
        article_ids = inputs[0].pop("article_ids")
        
        # Log the extracted features
        logging.info("predict -> " + str(features))

        # Predict probabilities for the positive class
        scores = self.model.predict_proba(features).tolist()
        
        # Get scores of positive class
        scores = np.asarray(scores)[:,1].tolist() 

        # Return the predicted scores along with the corresponding article IDs
        return {
            "scores": scores, 
            "article_ids": article_ids,
        }

Overwriting ranking_predictor.py


In [51]:
# Upload predictor file to Hopsworks
uploaded_file_path = dataset_api.upload(
    "ranking_predictor.py", 
    "Resources", 
    overwrite=True,
)

# Construct the path to the uploaded script
predictor_script_path = os.path.join(
    "/Projects", 
    project.name, 
    uploaded_file_path,
)

Uploading: 0.000%|          | 0/1056 elapsed<00:00 remaining<?

<div style="background-color:teal; color:white; padding:0px; font-size:15px">
Deploy Ranking Transformer & Predictor

In [52]:
from hsml.transformer import Transformer

ranking_deployment_name = "rankingdeployment"

# Define transformer
ranking_transformer=Transformer(
    script_file=transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
ranking_deployment = ranking_model.deploy(
    name=ranking_deployment_name,
    description="Deployment that search for item candidates and scores them based on customer metadata",
    script_file=predictor_script_path,
    resources={"num_instances": 0},
    transformer=ranking_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/475285/deployments/211971
Before making predictions, start the deployment by using `.start()`


In [53]:
# Start the deployment
ranking_deployment.start()

  0%|          | 0/6 [00:00<?, ?it/s]

Start making predictions by using `.predict()`


In [None]:
# Check logs in case of failure
# ranking_deployment.get_logs(component="predictor", tail=200)

<div style="background-color:teal; color:white; padding:0px; font-size:15px">
Test Input

In [54]:
# Define a test input example
test_ranking_input = {"instances": [{"customer_id": "641e6f3ef3a2d537140aaa0a06055ae328a0dddf2c2c0dd6e60eb0563c7cbba0",
    "month_sin": 1.2246467991473532e-16,
    "query_emb": [0.214135289,
     0.571055949,
     0.330709577,
     -0.225899458,
     -0.308674961,
     -0.0115124583,
     0.0730511621,
     -0.495835781,
     0.625569344,
     -0.0438038409,
     0.263472944,
     -0.58485353,
     -0.307070434,
     0.0414443575,
     -0.321789205,
     0.966559],
    "month_cos": -1.0}]}

# Test ranking deployment
ranked_candidates = ranking_deployment.predict(test_ranking_input)

In [56]:
print(len(ranked_candidates['predictions']['scores']))
ranked_candidates['predictions']['scores']

100


[0.7430244211692006,
 0.7585596262407897,
 0.6861102701865665,
 0.6455397102284,
 0.699930794440468,
 0.7130005673651948,
 0.7601510040212085,
 0.5539483210251976,
 0.7965213583317088,
 0.7331866512756948,
 0.1317626649365221,
 0.7585596262407897,
 0.6914248604130043,
 0.8200589437024415,
 0.7039788194004061,
 0.8688150609668962,
 0.7453963163379268,
 0.37384192492142393,
 0.48851115644954474,
 0.6393241343209795,
 0.699930794440468,
 0.740882735032415,
 0.7662364701556239,
 0.48018742725347224,
 0.8082301506408861,
 0.07972623701178268,
 0.7965213583317088,
 0.8086197542617813,
 0.8716521751272145,
 0.8132439697229588,
 0.6664842052627522,
 0.6145544189758932,
 0.7791381593522888,
 0.12047450744624971,
 0.7895187755856431,
 0.7148938628854001,
 0.7675905446370843,
 0.6495325600334888,
 0.8716521751272145,
 0.8122650444828086,
 0.6785211715004464,
 0.8785795693948232,
 0.7147594316631354,
 0.7662364701556239,
 0.8352648209651341,
 0.6311623789578753,
 0.5659960808467102,
 0.75635755410

In [25]:
# Check logs in case of failure
# ranking_deployment.get_logs(component="transformer",tail=200)

Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/475285/deployments/212993

Instance name: rankingdeployment2-transformer-default-00001-deployment-8cgmdq8
2024-02-28 22:37:59.752 7 root INFO [<module>():180] Loading serving script
Collecting pinecone-client
  Downloading pinecone_client-3.1.0-py3-none-any.whl (210 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 211.0/211.0 kB 5.0 MB/s eta 0:00:00
Installing collected packages: pinecone-client
Successfully installed pinecone-client-3.1.0
2024-02-28 22:38:06.915 7 root INFO [__init__():117] Initializing transformer for deployment: rankingdeployment2
Connected. Call `.close()` to terminate connection gracefully.
2024-02-28 22:38:09.200 7 root INFO [<module>():196] Starting KServe server
2024-02-28 22:38:09.201 7 root INFO [register_model():187] Registering model: rankingdeployment2
2024-02-28 22:38:09.201 7 root INFO [start():129] Setting max asyncio worker threads as 12
2024-02-28 22:38:09.202 7 ro

<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Query Model

In [36]:
# Retrieve the 'query_model' from the Model Registry
query_model = mr.get_model(
    name="query_model",
    version=1,
)

<div style="background-color: darkgreen ; color:white; padding:0px; font-size:15px">
Query Transformer Definition

In [37]:
%%writefile scripts/querymodel_transformer.py

import os
import numpy as np
import pandas as pd
from datetime import datetime

import hopsworks

import logging

class Transformer(object):
    
    def __init__(self):            
        # Connect to the Hopsworks
        project = hopsworks.connection().get_project()
        ms = project.get_model_serving()
    
        # Retrieve the 'customers' feature view
        fs = project.get_feature_store()
        self.customer_fv = fs.get_feature_view(
            name="customers", 
            version=1,
        )
        # Retrieve the ranking deployment 
        self.ranking_server = ms.get_deployment("rankingdeployment")

    def preprocess(self, inputs):
        # Check if the input data contains a key named "instances"
        # and extract the actual data if present
        inputs = inputs["instances"] if "instances" in inputs else inputs
        
        # Extract customer_id and transaction_date from the inputs
        customer_id = inputs["customer_id"]
        transaction_date = inputs["transaction_date"]
        
        # Extract month from the transaction_date
        month_of_purchase = datetime.fromisoformat(inputs.pop("transaction_date"))
        
        # Get customer features
        customer_features = self.customer_fv.get_feature_vector(
            {"customer_id": customer_id}, 
            return_type="pandas",
        )
        
        # Enrich inputs with customer age
        inputs["age"] = customer_features.age.values[0]   
        
        # Calculate the sine and cosine of the month_of_purchase
        month_of_purchase = datetime.strptime(transaction_date, "%Y-%m-%dT%H:%M:%S.%f").month
        
        # Calculate a coefficient for adjusting the periodicity of the month
        coef = np.random.uniform(0, 2 * np.pi) / 12
        
        # Calculate the sine and cosine components for the month_of_purchase
        inputs["month_sin"] = float(np.sin(month_of_purchase * coef)) 
        inputs["month_cos"] = float(np.cos(month_of_purchase * coef))
                
        return {
            "instances" : [inputs]
        }

    def postprocess(self, outputs):
        # Return ordered ranking predictions        
        return {
            "predictions": self.ranking_server.predict({ "instances": outputs["predictions"]}),
        }

Writing querymodel_transformer.py


In [38]:
# Copy transformer file into Hopsworks File System
uploaded_file_path = dataset_api.upload(
    "querymodel_transformer.py", 
    "Models", 
    overwrite=True,
)

# Construct the path to the uploaded script
transformer_script_path = os.path.join(
    "/Projects", 
    project.name, 
    uploaded_file_path,
)

Uploading: 0.000%|          | 0/2301 elapsed<00:00 remaining<?

In [40]:
from hsml.transformer import Transformer

query_model_deployment_name = "querydeployment"

# Define transformer
query_model_transformer=Transformer(
    script_file=transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy the query model
query_model_deployment = query_model.deploy(
    name=query_model_deployment_name,
    description="Deployment that generates query embeddings from customer and item features using the query model",
    resources={"num_instances": 0},
    transformer=query_model_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/475285/deployments/212994
Before making predictions, start the deployment by using `.start()`


<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Run Registered Deployment

In [41]:
# Start the deployment
query_model_deployment.start()

  0%|          | 0/6 [00:00<?, ?it/s]

Start making predictions by using `.predict()`


In [57]:
query_model_deployment.get_state()

PredictorState(status: 'Running')

In [None]:
# Check logs in case of failure
# query_model_deployment.get_logs(component="transformer", tail=20)

<div style="background-color:teal; color:white; padding:0px; font-size:15px">
Test Input

In [58]:
# Define a test input example
data = {"instances": {"customer_id": "641e6f3ef3a2d537140aaa0a06055ae328a0dddf2c2c0dd6e60eb0563c7cbba0", "transaction_date": "2022-11-15T12:16:25.330916"}}

# Test the deployment
ranked_candidates = query_model_deployment.predict(data)

In [46]:
# Check logs in case of failure
# query_model_deployment.get_logs(component="transformer",tail=200)

Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/475285/deployments/212994

Instance name: querydeployment-transformer-default-00001-deployment-756bfmpr7s
2024-02-28 22:47:49.850 7 root INFO [<module>():180] Loading serving script
2024-02-28 22:47:52.548 7 root INFO [__init__():117] Initializing transformer for deployment: querydeployment
Connected. Call `.close()` to terminate connection gracefully.
2024-02-28 22:47:54.252 7 root INFO [<module>():196] Starting KServe server
2024-02-28 22:47:54.252 7 root INFO [register_model():187] Registering model: querydeployment
2024-02-28 22:47:54.252 7 root INFO [start():129] Setting max asyncio worker threads as 12
2024-02-28 22:47:54.253 7 root INFO [serve():139] Starting uvicorn with 1 workers
2024-02-28 22:47:54.277 7 uvicorn.error INFO [serve():84] Started server process [7]
2024-02-28 22:47:54.278 7 uvicorn.error INFO [startup():45] Waiting for application startup.
2024-02-28 22:47:54.280 7 root INFO 

In [59]:
ranked_candidates

{'predictions': {'predictions': {'scores': [0.23599776567707298,
    0.19932391500208757,
    0.5942909856232121,
    0.23599776567707298,
    0.23987238926958487,
    0.2364033224912844,
    0.20707352284616973,
    0.2366265728914199,
    0.20225402051433128,
    0.5655834042601063,
    0.26274826372743915,
    0.38158253989696667,
    0.403598433212939,
    0.23599776567707298,
    0.33799600895184473,
    0.22853893786044238,
    0.2435052652018657,
    0.6589744285442811,
    0.30261287712302587,
    0.30564657570210196,
    0.45809089362495237,
    0.24674364979348845,
    0.49932652478589273,
    0.203628937594705,
    0.20436548906506452,
    0.19779895434749692,
    0.6176950253223634,
    0.30153787619906763,
    0.23153043265445283,
    0.30261287712302587,
    0.2597973345593323,
    0.2220562931959128,
    0.27632656576659503,
    0.3849930121081394,
    0.7063842671822252,
    0.1282713555390683,
    0.26353636169276234,
    0.2977738624990898,
    0.6407987819605172,
   

<div style="background-color:moccasin; color:black; padding:5px; font-size:20px">
Stop Deployment

In [60]:
# Stop the ranking model deployment
ranking_deployment.stop()

# Stop the query model deployment
query_model_deployment.stop()

  0%|          | 0/4 [00:00<?, ?it/s]

  0%|          | 0/3 [00:00<?, ?it/s]