## <span style="color:#ff5f27">👨🏻‍🏫 Create Deployment </span>

Creating a deployment for the recommendation system.


## <span style="color:#ff5f27">📝 Imports </span>


In [5]:
import os
import hopsworks

## <span style="color:#ff5f27">🔮 Connect to Hopsworks </span>

In [6]:
import hopsworks

project = hopsworks.login()

# Connect to Hopsworks Model Registry
mr = project.get_model_registry()

dataset_api = project.get_dataset_api()

2025-06-15 16:16:21,943 INFO: Initializing external client
2025-06-15 16:16:21,946 INFO: Base URL: https://c.app.hopsworks.ai:443
2025-06-15 16:16:23,829 INFO: Python Engine initialized.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/1220788


## <span style="color:#ff5f27">🚀 Ranking Model Deployment </span>


Since the ranking model is a CatBoost model we need to implement a `Predict` class that tells Hopsworks how to load the model and how to use it.


In [7]:
weather_ranking_model = mr.get_best_model(
    name="weather_ranking_model", 
    metric="fscore", 
    direction="max",
)
weather_ranking_model


Model(name: 'weather_ranking_model', version: 1)

In [8]:
no_weather_ranking_model = mr.get_best_model(
    name="no_weather_ranking_model", 
    metric="fscore", 
    direction="max",
)
no_weather_ranking_model

Model(name: 'no_weather_ranking_model', version: 1)

In [108]:
%%writefile weather_ranking_transformer.py

import os
import pandas as pd
import datetime
import hopsworks
import logging

class Transformer(object):
    def __init__(self):
        # Connect to Hopsworks
        project = hopsworks.connection().get_project()
        self.fs = project.get_feature_store()
        self.events_fv = self.fs.get_feature_view(name="events", version=1)
        self.event_features = [feat.name for feat in self.events_fv.schema]
        self.users_fv = self.fs.get_feature_view(name="users", version=1)
        self.candidate_index = self.fs.get_feature_view(name="candidate_embeddings", version=1)
        mr = project.get_model_registry()
        model = mr.get_model(name="weather_ranking_model", version=1)
        input_schema = model.model_schema["input_schema"]["columnar_schema"]
        self.ranking_model_feature_names = [feat["name"] for feat in input_schema]
        self.current_event_ids = []

    def preprocess(self, inputs):
        print("Transformer preprocess input:", inputs)
        if isinstance(inputs, dict) and "instances" in inputs:
            instance = inputs["instances"][0]
        else:
            instance = inputs[0] if isinstance(inputs, list) else inputs

        user_id = instance.get("user_id")
        query_emb = instance.get("query_emb")
        if user_id is None or query_emb is None:
            raise ValueError("user_id or query_emb missing in input")

        # Find candidate event IDs
        neighbors = self.candidate_index.find_neighbors(query_emb, k=100)
        candidate_ids = [n[0] for n in neighbors]

        # Retrieve event data
        events_data = [self.events_fv.get_feature_vector({"event_id": eid}) for eid in candidate_ids]
        events_df = pd.DataFrame(events_data, columns=self.event_features)

        # Filter to future events only
        current_date = datetime.datetime.now().date()
        events_df["start_date"] = pd.to_datetime(events_df["start_time"]).dt.date
        valid_events = events_df[events_df["start_date"] >= current_date]

        # If no valid events, return empty instances and store empty event_ids
        if valid_events.empty:
            self.current_event_ids = []
            print("No valid events found for user", user_id)
            return {"instances": []}

        # Merge user features
        user_features = self.users_fv.get_feature_vector({"user_id": user_id}, return_type="pandas")
        required_user_cols = [
            "user_id", "user_city", "age", "user_interests", "indoor_outdoor_preference",
            "user_weather_condition", "user_temperature", "user_precipitation"
        ]
        for col in required_user_cols:
            if col not in user_features.columns:
                raise ValueError(f"Missing user feature: {col}")
        valid_events[required_user_cols] = user_features[required_user_cols].values[0]

        # Select only the features required by the ranking model
        ranking_features = valid_events[self.ranking_model_feature_names]

        # Store event IDs for postprocess
        self.current_event_ids = valid_events["event_id"].tolist()
        print("Number of valid events:", len(self.current_event_ids))
        print("Output from preprocess:", {"instances": ranking_features.values.tolist()})

        return {"instances": ranking_features.values.tolist()}

    def postprocess(self, outputs):
        print("Transformer postprocess input:", outputs)
        predictions = outputs.get("predictions", [])
        if len(predictions) != len(self.current_event_ids):
            print("Mismatch between predictions and event IDs")
            raise ValueError("Mismatch between predictions and event IDs")
        ranking = list(zip(predictions, self.current_event_ids))
        ranking.sort(reverse=True)
        print("Postprocess returning", len(ranking), "ranked results")
        # Return both ranking and debug info
        return {
            "ranking": ranking,
            "debug": f"Number of valid events: {len(self.current_event_ids)}"
        }



Overwriting weather_ranking_transformer.py


In [109]:
# Upload the transformer script
weather_uploaded_file_path = dataset_api.upload(
    "weather_ranking_transformer.py",  # File name to be uploaded
    "Resources",                       # Destination directory in Hopsworks File System
    overwrite=True,                    # Overwrite the file if it already exists
)

# Construct the path to the uploaded transformer script
weather_transformer_script_path = os.path.join(
    "/Projects",                       # Root directory for projects in Hopsworks
    project.name,                      # Name of the current project
    weather_uploaded_file_path,        # Path to the uploaded file within the project
)

print("Transformer script path in Hopsworks:", weather_transformer_script_path)


Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py: 0.000%|         …

Transformer script path in Hopsworks: /Projects/Weather_BasedEventRecSys/Resources/weather_ranking_transformer.py


In [110]:
%%writefile weather_ranking_predictor.py

import os
import joblib
import numpy as np
import logging

class Predict(object):
    def __init__(self):
        # Load the model from the environment variable
        model_path = os.environ["MODEL_FILES_PATH"]
        self.model = joblib.load(os.path.join(model_path, "weather_ranking_model.pkl"))
        logging.info("Model loaded successfully")

    def predict(self, inputs):
        # The inputs will be a dict with a list of lists under "instances"
        # Example: {"instances": [[feature1, feature2, ...], ...]}
        features = inputs["instances"]
        logging.info(f"Predict received {len(features)} instances")
        logging.info(f"Feature shape (if available): {np.array(features).shape if features else 'empty'}")

        # Predict probabilities for the positive class
        # (Assuming your model is a binary classifier with predict_proba)
        scores = self.model.predict_proba(features)[:, 1].tolist()

        # Return the scores (event_ids are not passed here, handle in postprocessing if needed)
        return {"predictions": scores}


Overwriting weather_ranking_predictor.py


In [112]:
predictor_uploaded_file_path = dataset_api.upload(
    "weather_ranking_predictor.py",
    "Resources",
    overwrite=True,
)

weather_predictor_script_path = os.path.join(
    "/Projects",
    project.name,
    predictor_uploaded_file_path,
)


Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_predictor.py: 0.000%|          |…

In [113]:
from hsml.transformer import Transformer

ranking_deployment_name = "weatherrankingdeployment"

# Define transformer
weather_ranking_transformer=Transformer(
    script_file=weather_transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
weather_ranking_deployment = weather_ranking_model.deploy(
    name=ranking_deployment_name,
    description="Deployment that search for event candidates and scores them based on user metadata",
    script_file=weather_predictor_script_path,
    resources={"num_instances": 0},
    transformer=weather_ranking_transformer,
)


Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1220788/deployments/375818
Before making predictions, start the deployment by using `.start()`


In [114]:
weather_ranking_deployment.start()


  0%|          | 0/6 [00:00<?, ?it/s]

Start making predictions by using `.predict()`


In [115]:
def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates['ranking'][:k]]

test_ranking_input = {
    "instances": [{
        "user_id": "LT819S",
        "query_emb": [0.214135289, 0.571055949, 0.330709577, -0.225899458, -0.308674961, 
                 -0.0115124583, 0.0730511621, -0.495835781, 0.625569344, -0.0438038409, 
                 0.263472944, -0.58485353, -0.307070434, 0.0414443575, -0.321789205, 
                 0.966559, 0.127463, -0.392714, 0.845132, -0.512387, 0.253901, 
                 -0.764589, 0.431267, 0.087342, -0.629045, 0.318976, -0.146782, 
                 0.573921, -0.087625, 0.934261, -0.271843, 0.652197]  # Your full query_emb
    }]
}

ranked_candidates = weather_ranking_deployment.predict(test_ranking_input)
recommendations = get_top_recommendations(ranked_candidates, k=3)
print(recommendations)


ModelServingException: Instances field should contain a 2-dim list.

In [61]:
%%writefile weather_ranking_transformer.py

import os
import pandas as pd
import datetime
import hopsworks
from opensearchpy import OpenSearch

import logging

class Transformer(object):
    def __init__(self):
        project = hopsworks.connection().get_project()
        self.fs = project.get_feature_store()
        self.events_fv = self.fs.get_feature_view(name="events", version=1)
        self.event_features = [feat.name for feat in self.events_fv.schema]
        self.users_fv = self.fs.get_feature_view(name="users", version=1)
        self.candidate_index = self.fs.get_feature_view(name="candidate_embeddings", version=1)
        mr = project.get_model_registry()
        model = mr.get_model(name="weather_ranking_model", version=1)
        input_schema = model.model_schema["input_schema"]["columnar_schema"]
        self.ranking_model_feature_names = [feat["name"] for feat in input_schema]

    def preprocess(self, inputs):
        # Extract the input instance (supports both list and dict input)
        if isinstance(inputs, dict) and "instances" in inputs:
            instance = inputs["instances"][0]
        else:
            instance = inputs[0] if isinstance(inputs, list) else inputs
        # Get user_id and query_emb (handle both dict and list of dicts)
        user_id = instance.get("user_id") if isinstance(instance, dict) else instance[0].get("user_id")
        query_emb = instance.get("query_emb") if isinstance(instance, dict) else instance[0].get("query_emb")
        if user_id is None or query_emb is None:
            raise ValueError("user_id or query_emb missing in input")

        # Get candidate event IDs
        neighbors = self.candidate_index.find_neighbors(query_emb, k=100)
        candidate_ids = [n[0] for n in neighbors]

        # Get full event data
        events_data = [self.events_fv.get_feature_vector({"event_id": eid}) for eid in candidate_ids]
        events_df = pd.DataFrame(events_data, columns=self.event_features)

        # Filter future events
        current_date = datetime.datetime.now().date()
        events_df["start_date"] = pd.to_datetime(events_df["start_time"]).dt.date
        valid_events = events_df[events_df["start_date"] >= current_date]

        # Merge user features
        user_features = self.users_fv.get_feature_vector({"user_id": user_id}, return_type="pandas")
        # Make sure user_features is not empty and has the required columns
        required_user_cols = ["user_id", "user_city", "age", "user_interests", "indoor_outdoor_preference",
                             "user_weather_condition", "user_temperature", "user_precipitation"]
        for col in required_user_cols:
            if col not in user_features.columns:
                raise ValueError(f"Missing user feature: {col}")
        # Assign user features to all rows
        valid_events[required_user_cols] = user_features[required_user_cols].values[0]

        # Select only the features required by the ranking model
        ranking_features = valid_events[self.ranking_model_feature_names]
        return {
            "inputs": [{
                "ranking_features": ranking_features.values.tolist(),
                "event_ids": valid_events["event_id"].tolist()
            }]
        }

    def postprocess(self, outputs):
        preds = outputs["predictions"]
        ranking = list(zip(preds["scores"], preds["event_ids"]))
        ranking.sort(reverse=True)
        return {"ranking": ranking}



Overwriting weather_ranking_transformer.py


In [62]:
# Copy transformer file into Hopsworks File System 
weather_uploaded_file_path = dataset_api.upload(
    "weather_ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded transformer script
weather_transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    weather_uploaded_file_path,          # Path to the uploaded file within the project
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py: 0.000%|         …

In [63]:

%%writefile weather_ranking_predictor.py

import os
import joblib
import numpy as np

import logging

class Predict(object):
    def __init__(self):
        model_path = os.environ["MODEL_FILES_PATH"]
        self.model = joblib.load(os.path.join(model_path, "weather_ranking_model.pkl"))

    def predict(self, inputs):
        if isinstance(inputs, list) and len(inputs) > 0 and isinstance(inputs[0], dict):
            features = inputs[0].pop("ranking_features")
            event_ids = inputs[0].pop("event_ids")
        else:
            raise ValueError("Inputs must be a list with one dict")
        logging.info(f"predict -> features shape: {len(features)}x{len(features[0]) if features else 0}")
        scores = self.model.predict_proba(features)[:, 1].tolist()
        return {"scores": scores, "event_ids": event_ids}


Overwriting weather_ranking_predictor.py


In [64]:

# Upload predictor file to Hopsworks
weather_uploaded_file_path = dataset_api.upload(
    "weather_ranking_predictor.py", 
    "Resources", 
    overwrite=True,
)

# Construct the path to the uploaded script
weather_predictor_script_path = os.path.join(
    "/Projects", 
    project.name, 
    weather_uploaded_file_path,
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_predictor.py: 0.000%|          |…

In [65]:

from hsml.transformer import Transformer

ranking_deployment_name = "weatherrankingdeployment"

# Define transformer
weather_ranking_transformer=Transformer(
    script_file=weather_transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
weather_ranking_deployment = weather_ranking_model.deploy(
    name=ranking_deployment_name,
    description="Deployment that search for event candidates and scores them based on user metadata",
    script_file=weather_predictor_script_path,
    resources={"num_instances": 0},
    transformer=weather_ranking_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1220788/deployments/374791
Before making predictions, start the deployment by using `.start()`


In [66]:
# Start the deployment
weather_ranking_deployment.start()

  0%|          | 0/6 [00:00<?, ?it/s]

Start making predictions by using `.predict()`


In [None]:

def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates['ranking'][:k]]


# Define a test input example for the ranking model
test_ranking_input = {"instances": [[{
    "user_id": "LT819S",  
    "query_emb": [0.214135289, 0.571055949, 0.330709577, -0.225899458, -0.308674961, 
                 -0.0115124583, 0.0730511621, -0.495835781, 0.625569344, -0.0438038409, 
                 0.263472944, -0.58485353, -0.307070434, 0.0414443575, -0.321789205, 
                 0.966559, 0.127463, -0.392714, 0.845132, -0.512387, 0.253901, 
                 -0.764589, 0.431267, 0.087342, -0.629045, 0.318976, -0.146782, 
                 0.573921, -0.087625, 0.934261, -0.271843, 0.652197]
}]]}

# Test ranking deployment
ranked_candidates = weather_ranking_deployment.predict(test_ranking_input)

# Retrieve event IDs of the top recommended items
recommendations = get_top_recommendations(ranked_candidates, k=3)
recommendations

def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates['ranking'][:k]]

# Test ranking deployment
test_ranking_input = {
    "instances": [{
        "user_id": "LT819S",
        "query_emb": [0.214135289, 0.571055949, 0.330709577, -0.225899458, -0.308674961, 
                 -0.0115124583, 0.0730511621, -0.495835781, 0.625569344, -0.0438038409, 
                 0.263472944, -0.58485353, -0.307070434, 0.0414443575, -0.321789205, 
                 0.966559, 0.127463, -0.392714, 0.845132, -0.512387, 0.253901, 
                 -0.764589, 0.431267, 0.087342, -0.629045, 0.318976, -0.146782, 
                 0.573921, -0.087625, 0.934261, -0.271843, 0.652197]
    }]
}

ranked_candidates = weather_ranking_deployment.predict(test_ranking_input)
recommendations = get_top_recommendations(ranked_candidates, k=3)
print(recommendations)

ModelServingException: Instances field should contain a 2-dim list.

In [37]:
# Check logs in case of failure
weather_ranking_deployment.get_logs(component="predictor", tail=200)

Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/1220788/deployments/374789



RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1220788/serving/374789/logs). Server response: 
HTTP code: 404, HTTP reason: Not Found, body: b'{"errorCode":240027,"errorMsg":"Server logs not available"}', error code: 240027, error msg: Server logs not available, user msg: 

In [192]:
%%writefile weather_ranking_transformer.py

import os
import pandas as pd
import logging
from datetime import datetime

import hopsworks
from opensearchpy import OpenSearch

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class Transformer(object):
    
    def __init__(self):
        try:
            # Connect to Hopsworks
            project = hopsworks.connection().get_project()
            self.fs = project.get_feature_store()
            
            # Retrieve the 'events' feature view
            self.events_fv = self.fs.get_feature_view(
                name="events", 
                version=1,
            )
            
            # Get list of feature names for events
            self.event_features = [feat.name for feat in self.events_fv.schema]
            logger.info(f"Event features: {self.event_features}")
            
            # Retrieve the 'users' feature view
            self.users_fv = self.fs.get_feature_view(
                name="users", 
                version=1,
            )
            
            # Get list of user features
            self.user_features_list = [feat.name for feat in self.users_fv.schema]
            logger.info(f"User features: {self.user_features_list}")

            # Retrieve the 'candidate_embeddings' feature view
            self.candidate_index = self.fs.get_feature_view(
                name="candidate_embeddings", 
                version=1,
            )

            # Retrieve ranking model
            mr = project.get_model_registry()
            model = mr.get_model(
                name="weather_ranking_model", 
                version=1,
            )
            
            # Extract input schema from the model
            input_schema = model.model_schema["input_schema"]["columnar_schema"]
            
            # Get the names of features expected by the ranking model
            self.ranking_model_feature_names = [feat["name"] for feat in input_schema]
            logger.info(f"Ranking model features: {self.ranking_model_feature_names}")
            
            # Define query and candidate features
            self.query_features = ["user_id", "user_city", "age", "user_interests"]
            self.candidate_features = ["event_id", "title", "event_type", "event_city"]
            
            logger.info("Transformer initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing transformer: {str(e)}")
            raise
            
    def preprocess(self, inputs):
        try:
            logger.info(f"Input structure: {inputs}")
            
            # Handle different input formats
            if isinstance(inputs["instances"], list):
                if len(inputs["instances"]) > 0:
                    if isinstance(inputs["instances"][0], list):
                        # Handle double nested list format: {"instances": [[{...}]]}
                        instance = inputs["instances"][0][0]
                    else:
                        # Handle single nested list format: {"instances": [{...}]}
                        instance = inputs["instances"][0]
                else:
                    logger.warning("Empty instances list")
                    return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            else:
                # Handle direct format: {"instances": {...}}
                instance = inputs["instances"]

            # Extract user_id from inputs
            user_id = instance["user_id"]
            logger.info(f"Processing for user_id: {user_id}")
            
            # Check if query_emb exists
            if "query_emb" not in instance:
                logger.error("No query_emb found in input")
                return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            
            # Log query embedding size
            logger.info(f"Query embedding size: {len(instance['query_emb'])}")
            
            # Search for candidate items
            try:
                neighbors = self.candidate_index.find_neighbors(
                    instance["query_emb"], 
                    k=100,
                )
                neighbors = [neighbor[0] for neighbor in neighbors]
                logger.info(f"Found {len(neighbors)} neighbors")
            except Exception as e:
                logger.error(f"Error finding neighbors: {str(e)}")
                return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            
            # If no neighbors found, return empty result
            if not neighbors:
                logger.warning("No neighbors found")
                return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            
            # Filter candidate events 
            event_id_list = [event_id for event_id in neighbors]
            event_id_df = pd.DataFrame({"event_id": event_id_list})
            
            # Retrieve event data for candidate events
            events_data = []
            for event_id in event_id_list:
                try:
                    event = self.events_fv.get_feature_vector({"event_id": event_id})
                    events_data.append(event)
                except Exception as e:
                    logger.warning(f"Could not get features for event {event_id}: {str(e)}")
            
            # If no events data, return empty result
            if not events_data:
                logger.warning("No valid events found")
                return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            
            events_df = pd.DataFrame(
                data=events_data, 
                columns=self.event_features,
            )
            logger.info(f"Retrieved {len(events_df)} events")
            
            # Join candidate items with their features
            ranking_model_inputs = event_id_df.merge(
                events_df, 
                on="event_id", 
                how="inner",
            )
            logger.info(f"After merge: {len(ranking_model_inputs)} events")
            
            # Add user features
            try:
                user_features = self.users_fv.get_feature_vector(
                    {"user_id": user_id}, 
                    return_type="pandas",
                )
                logger.info(f"User features columns: {user_features.columns.tolist()}")
            except Exception as e:
                logger.error(f"Error getting user features: {str(e)}")
                # Continue with empty user features
                user_features = pd.DataFrame()
            
            # Add user features from query features list
            for feature in self.query_features:
                if feature in user_features.columns and len(user_features[feature].values) > 0:
                    ranking_model_inputs[feature] = user_features[feature].values[0]
                else:
                    # Add default value if feature not found
                    ranking_model_inputs[feature] = None
                    logger.warning(f"User feature {feature} not found, using None")
            
            # Check if we have all required features
            missing_features = [f for f in self.ranking_model_feature_names if f not in ranking_model_inputs.columns]
            if missing_features:
                logger.warning(f"Missing features: {missing_features}")
                # Add missing features with None values
                for feature in missing_features:
                    ranking_model_inputs[feature] = None
            
            # Select only the features required by the ranking model
            available_features = [f for f in self.ranking_model_feature_names if f in ranking_model_inputs.columns]
            
            # If no available features, return empty result
            if not available_features:
                logger.error("No available features for ranking model")
                return {"inputs": [{"ranking_features": [], "event_ids": []}]}
            
            ranking_model_inputs = ranking_model_inputs[available_features]
            
            # Check for NaN values and replace with default values
            ranking_model_inputs = ranking_model_inputs.fillna(0)


Overwriting weather_ranking_transformer.py


In [193]:
# Copy transformer file into Hopsworks File System 
uploaded_file_path = dataset_api.upload(
    "/home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded transformer script
transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    uploaded_file_path,          # Path to the uploaded file within the project
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py: 0.000%|         …

In [194]:
%%writefile weather_ranking_predictor.py

import os
import joblib
import numpy as np
import logging

class Predict(object):
    
    def __init__(self):
        # List the directory contents to debug
        artifact_path = os.environ["ARTIFACT_FILES_PATH"]
        model_path = os.environ["MODEL_FILES_PATH"]
        logging.info(f"Artifact path contents: {os.listdir(artifact_path)}")
        logging.info(f"Model path contents: {os.listdir(model_path)}")
        
        # Try loading from MODEL_FILES_PATH instead
        self.model = joblib.load(os.path.join(model_path, "weather_ranking_model.pkl"))

    def predict(self, inputs):
        try:
            # Add detailed logging to help diagnose the issue
            logging.info(f"Input structure: {inputs}")
            
            # Handle different input formats
            if isinstance(inputs, list) and len(inputs) > 0:
                # If inputs is a list, get the first element
                input_data = inputs[0]
            else:
                # If inputs is not a list, use it directly
                input_data = inputs
            
            # Check if input_data has the expected keys
            if not isinstance(input_data, dict) or "ranking_features" not in input_data or "event_ids" not in input_data:
                logging.error(f"Invalid input format: {input_data}")
                return {"scores": [], "event_ids": []}
            
            # Extract ranking features and event IDs
            features = input_data["ranking_features"]
            event_ids = input_data["event_ids"]
            
            # Log the extracted features
            logging.info(f"Features: {features}")
            logging.info(f"Event IDs: {event_ids}")
            
            # If features is empty, return empty results
            if not features:
                return {"scores": [], "event_ids": []}
            
            # Predict probabilities for the positive class
            scores = self.model.predict_proba(features).tolist()
            
            # Get scores of positive class
            scores = np.asarray(scores)[:,1].tolist() 
            
            # Return the predicted scores along with the corresponding event IDs
            return {
                "scores": scores, 
                "event_ids": event_ids,
            }
        except Exception as e:
            # Add detailed logging to help diagnose the issue
            logging.error(f"Error in predict: {str(e)}")
            logging.error(f"Input structure: {inputs}")
            # Return empty result on error
            return {"scores": [], "event_ids": []}


Overwriting weather_ranking_predictor.py


In [None]:
# %%writefile weather_ranking_transformer.py

# import os
# import pandas as pd
# from datetime import datetime

# import hopsworks
# from opensearchpy import OpenSearch

# import logging


# class Transformer(object):
    
#     def __init__(self):
#         # Connect to Hopsworks
#         project = hopsworks.connection().get_project()
#         self.fs = project.get_feature_store()
        
#         # Retrieve the 'events' feature view
#         self.events_fv = self.fs.get_feature_view(
#             name="events", 
#             version=1,
#         )
        
#         # Get list of feature names for events
#         self.event_features = [feat.name for feat in self.events_fv.schema]
        
#         # Retrieve the 'users' feature view
#         self.users_fv = self.fs.get_feature_view(
#             name="users", 
#             version=1,
#         )

#         # Retrieve the 'candidate_embeddings' feature view
#         self.candidate_index = self.fs.get_feature_view(
#             name="candidate_embeddings", 
#             version=1,
#         )

#         # Retrieve ranking model
#         mr = project.get_model_registry()
#         model = mr.get_model(
#             name="weather_ranking_model", 
#             version=1,
#         )
        
#         # Extract input schema from the model
#         input_schema = model.model_schema["input_schema"]["columnar_schema"]
        
#         # Get the names of features expected by the ranking model
#         self.ranking_model_feature_names = [feat["name"] for feat in input_schema]
        
#         # Define specific features we need based on the provided lists
#         self.query_features = ["user_id", "user_city", "age", "user_interests"]
#         self.candidate_features = ["event_id", "title", "event_type", "event_city"]
            
#     def preprocess(self, inputs):
#         try:
#             # Extract the input instance - handle both formats
#             if isinstance(inputs["instances"], list) and len(inputs["instances"]) > 0:
#                 if isinstance(inputs["instances"][0], list):
#                     # Handle double nested list format: {"instances": [[{...}]]}
#                     instance = inputs["instances"][0][0]
#                 else:
#                     # Handle single nested list format: {"instances": [{...}]}
#                     instance = inputs["instances"][0]
#             else:
#                 # Handle direct format: {"instances": {...}}
#                 instance = inputs["instances"]

#             # Extract user_id from inputs
#             user_id = instance["user_id"]
            
#             # Search for candidate items
#             neighbors = self.candidate_index.find_neighbors(
#                 instance["query_emb"], 
#                 k=100,
#             )
#             neighbors = [neighbor[0] for neighbor in neighbors]
            
#             # Get user features - focusing on the specific query features
#             user_features = self.users_fv.get_feature_vector(
#                 {"user_id": user_id}, 
#                 return_type="pandas",
#             )
            
#             # Get user interests
#             user_interests = user_features["user_interests"].values[0].split(",") if "user_interests" in user_features.columns else []
            
#             # Get current date
#             current_date = datetime.now()
            
#             # Retrieve event data for candidate events
#             events_data = [
#                 self.events_fv.get_feature_vector({"event_id": event_id}) 
#                 for event_id 
#                 in neighbors
#             ]

#             events_df = pd.DataFrame(
#                 data=events_data, 
#                 columns=self.event_features,
#             )
            
#             # Filter logic implementation
#             filtered_events = []
#             filtered_event_ids = []
            
#             for index, row in events_df.iterrows():
#                 event_id = row["event_id"]
#                 event_type = row["event_type"] if "event_type" in events_df.columns else None
                
#                 # Skip if event category doesn't match user interests and user has interests
#                 if event_type and user_interests and event_type not in user_interests:
#                     continue
                    
#                 # If passed all filters, add to filtered list
#                 filtered_events.append(row)
#                 filtered_event_ids.append(event_id)
            
#             # Create DataFrame from filtered events
#             filtered_events_df = pd.DataFrame(filtered_events) if filtered_events else pd.DataFrame(columns=self.event_features)
#             event_id_df = pd.DataFrame({"event_id": filtered_event_ids})
            
#             # If no events passed the filters, return empty result
#             if filtered_events_df.empty:
#                 return {
#                     "inputs": [{"ranking_features": [], "event_ids": []}]
#                 }
            
#             # Join candidate items with their features - focus on candidate features
#             candidate_df = filtered_events_df[self.candidate_features].copy() if all(feat in filtered_events_df.columns for feat in self.candidate_features) else pd.DataFrame()
            
#             # Create the ranking model inputs
#             ranking_model_inputs = candidate_df.copy()
            
#             # Add user features - focus on query features
#             for feature in self.query_features:
#                 if feature in user_features.columns:
#                     ranking_model_inputs[feature] = user_features[feature].values[0]
            
#             # Select only the features required by the ranking model
#             available_features = [f for f in self.ranking_model_feature_names if f in ranking_model_inputs.columns]
#             ranking_model_inputs = ranking_model_inputs[available_features]
                    
#             return { 
#                 "inputs": [{"ranking_features": ranking_model_inputs.values.tolist(), "event_ids": filtered_event_ids}]
#             }
#         except Exception as e:
#             # Add detailed logging to help diagnose the issue
#             logging.error(f"Error in preprocess: {str(e)}")
#             logging.error(f"Input structure: {inputs}")
#             raise e

#     def postprocess(self, outputs):
#         try:
#             # Extract predictions from the outputs
#             preds = outputs["predictions"]
            
#             # Merge prediction scores and corresponding event IDs into a list of tuples
#             ranking = list(zip(preds["scores"], preds["event_ids"]))
            
#             # Sort the ranking list by score in descending order
#             ranking.sort(reverse=True)
            
#             # Return the sorted ranking list
#             return { 
#                 "ranking": ranking,
#             }
#         except Exception as e:
#             # Add detailed logging to help diagnose the issue
#             logging.error(f"Error in postprocess: {str(e)}")
#             logging.error(f"Output structure: {outputs}")
#             raise e


Writing weather_ranking_transformer.py


Overwriting no_weather_ranking_transformer.py


In [185]:
# Copy weather transformer file into Hopsworks File System 
weather_uploaded_file_path = dataset_api.upload(
    "/home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded weather transformer script
weather_transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    weather_uploaded_file_path,          # Path to the uploaded file within the project
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_transformer.py: 0.000%|         …

In [122]:
# Copy no_weather transformer file into Hopsworks File System 
no_weather_uploaded_file_path = dataset_api.upload(
    "/home/nkama/masters_thesis_project/thesis/src/no_weather_ranking_transformer.py",    # File name to be uploaded
    "Resources",                 # Destination directory in Hopsworks File System 
    overwrite=True,              # Overwrite the file if it already exists
) 

# Construct the path to the uploaded no_weather transformer script
no_weather_transformer_script_path = os.path.join(
    "/Projects",                 # Root directory for projects in Hopsworks
    project.name,                # Name of the current project
    no_weather_uploaded_file_path,          # Path to the uploaded file within the project
)

Uploading /home/nkama/masters_thesis_project/thesis/src/no_weather_ranking_transformer.py: 0.000%|          | …

In [143]:
%%writefile weather_ranking_predictor.py

import os
import joblib
import numpy as np
import logging

class Predict(object):
    
    def __init__(self):
        # List the directory contents to debug
        artifact_path = os.environ["ARTIFACT_FILES_PATH"]
        model_path = os.environ["MODEL_FILES_PATH"]
        logging.info(f"Artifact path contents: {os.listdir(artifact_path)}")
        logging.info(f"Model path contents: {os.listdir(model_path)}")
        
        # Try loading from MODEL_FILES_PATH instead
        self.model = joblib.load(os.path.join(model_path, "weather_ranking_model.pkl"))

    def predict(self, inputs):
        # Extract ranking features and event IDs from the inputs
        features = inputs[0]["ranking_features"]
        event_ids = inputs[0]["event_ids"]
        
        # Log the extracted features
        logging.info("predict -> " + str(features))

        # Predict probabilities for the positive class
        scores = self.model.predict_proba(features).tolist()
        
        # Get scores of positive class
        scores = np.asarray(scores)[:,1].tolist() 

        # Return the predicted scores along with the corresponding event IDs
        return {
            "scores": scores, 
            "event_ids": event_ids,
        }


Overwriting weather_ranking_predictor.py


In [125]:
%%writefile no_weather_ranking_predictor.py

import os
import joblib
import numpy as np
import logging

class Predict(object):
    
    def __init__(self):
        # List the directory contents to debug
        artifact_path = os.environ["ARTIFACT_FILES_PATH"]
        model_path = os.environ["MODEL_FILES_PATH"]
        logging.info(f"Artifact path contents: {os.listdir(artifact_path)}")
        logging.info(f"Model path contents: {os.listdir(model_path)}")
        
        # Try loading from MODEL_FILES_PATH instead
        self.model = joblib.load(os.path.join(model_path, "no_weather_ranking_model.pkl"))

    def predict(self, inputs):
        # Extract ranking features and event IDs from the inputs
        features = inputs[0]["ranking_features"]
        event_ids = inputs[0]["event_ids"]
        
        # Log the extracted features
        logging.info("predict -> " + str(features))

        # Predict probabilities for the positive class
        scores = self.model.predict_proba(features).tolist()
        
        # Get scores of positive class
        scores = np.asarray(scores)[:,1].tolist() 

        # Return the predicted scores along with the corresponding event IDs
        return {
            "scores": scores, 
            "event_ids": event_ids,
        }


Overwriting no_weather_ranking_predictor.py


In [195]:
# Upload weather predictor file to Hopsworks
weather_uploaded_file_path = dataset_api.upload(
    "/home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_predictor.py", 
    "Resources", 
    overwrite=True,
)

# Construct the path to the uploaded script
weather_predictor_script_path = os.path.join(
    "/Projects", 
    project.name, 
    weather_uploaded_file_path,
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/weather_ranking_predictor.py: 0.000%|          |…

In [127]:
# Upload no-weather predictor file to Hopsworks
no_weather_uploaded_file_path = dataset_api.upload(
    "no_weather_ranking_predictor.py", 
    "Resources", 
    overwrite=True,
)

# Construct the path to the uploaded script
no_weather_predictor_script_path = os.path.join(
    "/Projects", 
    project.name, 
    no_weather_uploaded_file_path,
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/no_weather_ranking_predictor.py: 0.000%|        …

## Deploy models.


In [196]:

from hsml.transformer import Transformer

weather_ranking_deployment_name = "weatherrankingdeployment"

# Define transformer
weather_ranking_transformer=Transformer(
    script_file=weather_transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
weather_ranking_deployment = weather_ranking_model.deploy(
    name=weather_ranking_deployment_name,
    description="Deployment that search for event candidates and scores them based on user metadata",
    script_file=weather_predictor_script_path,
    resources={"num_instances": 0},
    transformer=weather_ranking_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1220788/deployments/372738
Before making predictions, start the deployment by using `.start()`


In [129]:
no_weather_ranking_deployment_name = "noweatherrankingdeployment"

# Define transformer
no_weather_ranking_transformer=Transformer(
    script_file=no_weather_transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy ranking model
no_weather_ranking_deployment = no_weather_ranking_model.deploy(
    name=no_weather_ranking_deployment_name,
    description="Deployment that search for event candidates and scores them based on user metadata",
    script_file=no_weather_predictor_script_path,
    resources={"num_instances": 0},
    transformer=no_weather_ranking_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1220788/deployments/371728
Before making predictions, start the deployment by using `.start()`


In [197]:
# Start the deployment
weather_ranking_deployment.start()


  0%|          | 0/6 [00:00<?, ?it/s]

ModelServingException: Failed to run: transformer terminated unsuccessfully. 

INFO:root:Loading component module...
Traceback (most recent call last):
  File "/serving/workspace/../kserve_server.py", line 146, in <module>
    model = _load_component_module(args)
  File "/serving/workspace/../kserve_server.py", line 99, in _load_component_module
    spec.loader.exec_module(mod)
  File "<frozen importlib._bootstrap_external>", line 879, in exec_module
  File "<frozen importlib._bootstrap_external>", line 1017, in get_code
  File "<frozen importlib._bootstrap_external>", line 947, in source_to_code
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/mnt/artifacts/transformer-weather_ranking_transformer.py", line 192
    ranking_model_inputs = ranking_model_inputs.fillna(0)
                                                         ^
SyntaxError: expected 'except' or 'finally' block
. Please, check the server logs using `.get_logs(component='transformer')`

In [131]:
no_weather_ranking_deployment.start()

Deployment is already running


In [58]:
# Stop the ranking model deployment
weather_ranking_deployment.stop()

# Stop the query model deployment
no_weather_ranking_deployment.stop()

  0%|          | 0/2 [00:00<?, ?it/s]

  0%|          | 0/4 [00:00<?, ?it/s]

# Query model Deployment

In [59]:
# Retrieve the 'query_model' from the Model Registry
query_model = mr.get_model(
    name="query_model",
    version=1,
)

In [146]:
%%writefile querymodel_transformer.py

import os
import numpy as np
import pandas as pd
from datetime import datetime

import hopsworks

import logging


class Transformer(object):
    
    def __init__(self): 
        # Connect to the Hopsworks
        project = hopsworks.connection().get_project()
        ms = project.get_model_serving()
        
        # Retrieve the 'users' feature view
        fs = project.get_feature_store()
        self.users_fv = fs.get_feature_view(
            name="users", 
            version=1,
        )
        # Retrieve the ranking deployment 
        self.ranking_server = ms.get_deployment("weatherrankingdeployment")
        
    
    def preprocess(self, inputs):
        # Check if the input data contains a key named "instances"
        # and extract the actual data if present
        inputs = inputs["instances"] if "instances" in inputs else inputs

        # Extract user_id from the inputs
        user_id = inputs["user_id"]

        # Get user features
        user_features = self.users_fv.get_feature_vector(
            {"user_id": user_id}, 
            return_type="pandas",
        )

        # Enrich inputs with user features
        inputs["user_city"] = user_features['user_city'].values[0]
        inputs["age"] = user_features['age'].values[0] 
        inputs["user_interests"] = user_features['user_interests'].values[0]
        
        return {
            "instances": [inputs]
        }
    
    def postprocess(self, outputs):
        # Return ordered ranking predictions
        return {
            "predictions": self.ranking_server.predict({"instances": outputs["predictions"]}),
        }


Overwriting querymodel_transformer.py


In [147]:
# Copy transformer file into Hopsworks File System
uploaded_file_path = dataset_api.upload(
    "querymodel_transformer.py", 
    "Models", 
    overwrite=True,
)

# Construct the path to the uploaded script
transformer_script_path = os.path.join(
    "/Projects", 
    project.name, 
    uploaded_file_path,
)

Uploading /home/nkama/masters_thesis_project/thesis/notebooks/querymodel_transformer.py: 0.000%|          | 0/…

In [148]:
from hsml.transformer import Transformer

query_model_deployment_name = "querydeployment"

# Define transformer
query_model_transformer=Transformer(
    script_file=transformer_script_path, 
    resources={"num_instances": 0},
)

# Deploy the query model
query_model_deployment = query_model.deploy(
    name=query_model_deployment_name,
    description="Deployment that generates query embeddings from user and event features using the query model",
    resources={"num_instances": 0},
    transformer=query_model_transformer,
)

Deployment created, explore it at https://c.app.hopsworks.ai:443/p/1220788/deployments/371731
Before making predictions, start the deployment by using `.start()`


In [150]:
# Start the deployment
query_model_deployment.start()

  0%|          | 0/6 [00:00<?, ?it/s]

Start making predictions by using `.predict()`


In [66]:
def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[-1] for candidate in ranked_candidates['ranking'][:k]]

In [190]:
def get_top_recommendations(ranked_candidates, k=3):
    return [candidate[1] for candidate in ranked_candidates['ranking'][:k]]

# Define a test input example for the ranking model
test_ranking_input = {"instances": [[{
    "user_id": "LT819S",  
    "query_emb": [0.214135289, 0.571055949, 0.330709577, -0.225899458, -0.308674961, 
                 -0.0115124583, 0.0730511621, -0.495835781, 0.625569344, -0.0438038409, 
                 0.263472944, -0.58485353, -0.307070434, 0.0414443575, -0.321789205, 
                 0.966559, 0.127463, -0.392714, 0.845132, -0.512387, 0.253901, 
                 -0.764589, 0.431267, 0.087342, -0.629045, 0.318976, -0.146782, 
                 0.573921, -0.087625, 0.934261, -0.271843, 0.652197, -0.418359, 
                 0.123456, -0.789012, 0.345678, -0.901234, 0.567890, -0.234567, 
                 0.890123, -0.456789, 0.012345, -0.678901, 0.234567, -0.890123, 
                 0.456789, -0.012345, 0.678901, -0.234567, 0.890123, -0.456789, 
                 0.012345, -0.678901, 0.234567, -0.890123, 0.456789, -0.012345, 
                 0.678901, -0.234567, 0.890123, -0.456789, 0.012345, -0.678901]
}]]}

# Test ranking deployment
ranked_candidates = weather_ranking_deployment.predict(test_ranking_input)

# Retrieve event IDs of the top recommended items
recommendations = get_top_recommendations(ranked_candidates, k=3)
recommendations


KeyError: 'ranking'

In [191]:
ranked_candidates
Output:
{'predictions': {'ranking': []}}

{'predictions': {'ranking': []}}

In [None]:
# Define a test input example for the query model
data = {
    "instances": [[{
        "user_id": "UP287J"
    }]]
}

# Test the deployment
ranked_candidates = query_model_deployment.predict(data)

# Retrieve event IDs of the top recommended items
recommendations = get_top_recommendations(
    ranked_candidates['predictions'], 
    k=3,
)
recommendations


In [183]:
weather_ranking_deployment.get_logs(component="predictor", tail=200)


Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/1220788/deployments/371733

DeployableComponentLogs(instance_name: 'weatherrankingdeployment-predictor-00001-deployment-6bb688n8znf', date: datetime.datetime(2025, 5, 21, 23, 37, 16, 64328)) 
INFO:root:Loading component module...
INFO:root:[PredictorModel] Initializing predictor for model: weatherrankingdeployment
INFO:root:[HopsworksModel] Initializing for model: weatherrankingdeployment
INFO:root:Artifact path contents: ['predictor-weather_ranking_predictor.py', 'transformer-simple_weather_transformer.py']
INFO:root:Model path contents: ['weather_ranking_model.pkl']
... execution time: 1.603723 seconds
INFO:root:Starting KServe server...
2025-05-21 21:36:33.556 8 kserve INFO [model_server.py:register_model():363] Registering model: weatherrankingdeployment
2025-05-21 21:36:33.556 8 kserve INFO [model_server.py:start():298] Setting max asyncio worker threads as 12
2025-05-21 21:36:33.557 8 kserve I

In [182]:
weather_ranking_deployment.get_logs(component="transformer", tail=200)


Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/1220788/deployments/371733

DeployableComponentLogs(instance_name: 'weatherrankingdeployment-transformer-00001-deployment-6896zq4tj', date: datetime.datetime(2025, 5, 21, 23, 37, 6, 848375)) 
INFO:root:Loading component module...
INFO:root:[TransformerModel] Initializing transformer for model: weatherrankingdeployment
INFO:root:[HopsworksModel] Initializing for model: weatherrankingdeployment
INFO:hsfs.engine.python:Python Engine initialized.
Downloading: 100.000%|██████████| 1094/1094 elapsed<00:00 remaining<00:00
INFO:transformer-simple_weather_transformer:Transformer initialized successfully
... execution time: 7.065520 seconds
INFO:root:Starting KServe server...
2025-05-21 21:35:37.328 8 kserve INFO [model_server.py:register_model():363] Registering model: weatherrankingdeployment
2025-05-21 21:35:37.328 8 kserve INFO [model_server.py:start():298] Setting max asyncio worker threads as 12
2025-05

In [85]:
# Test ranking deployment
ranked_candidates = weather_ranking_deployment.predict(test_ranking_input)

# Retrieve article ids of the top recommended items
recommendations = get_top_recommendations(ranked_candidates, k=3)
recommendations

RestAPIError: Metadata operation error: (url: http://15.235.46.163/v1/models/weatherrankingdeployment:predict). Server response: 
HTTP code: 500, HTTP reason: Internal Server Error, body: b'{"error":"HTTPError : HTTP 500: list indices must be integers or slices, not str"}', error code: , error msg: , user msg: 

 Check the model server logs by using `.get_logs()`

In [84]:
# Define a test input example
data = {
    "instances": {
        "user_id": "ZO502T", #TODO - make this a valid 'user_id' value from your feature group
    }
}

# Test the deployment
ranked_candidates = query_model_deployment.predict(data)

# Retrieve article ids of the top recommended items
recommendations = get_top_recommendations(
    ranked_candidates['predictions'], 
    k=3,
)
recommendations

ModelServingException: Instances field should contain a 2-dim list.

In [83]:
weather_ranking_deployment.get_logs()


Explore all the logs and filters in the Kibana logs at https://c.app.hopsworks.ai:443/p/1220788/deployments/371721



RestAPIError: Metadata operation error: (url: https://c.app.hopsworks.ai/hopsworks-api/api/project/1220788/serving/371721/logs). Server response: 
HTTP code: 404, HTTP reason: Not Found, body: b'{"errorCode":240027,"errorMsg":"Server logs not available"}', error code: 240027, error msg: Server logs not available, user msg: 