In [2]:
import numpy as np
import pandas as pd
import random
import tensorflow as tf
import tensorflow_recommenders as tfrs
from typing import Dict, Text

In [5]:
PREPROCESSED_DATA_PATH = './data/feature_store'

USER_DATA_PATH = f'{PREPROCESSED_DATA_PATH}/users.csv'
ITEM_DATA_PATH = f'{PREPROCESSED_DATA_PATH}/movies.csv'
RATINGS_DATA_PATH = f'{PREPROCESSED_DATA_PATH}/ratings.csv'

MODEL_PATH = './model/tfrs-ranking-with-sf'

### Load data

In [10]:
# user data
users_df = pd.read_csv(USER_DATA_PATH)

# cast data type
users_df["user_id"] = users_df["user_id"].astype(str)
users_df["age"] = users_df["age"].astype(str)
users_df["gender"] = users_df["gender"].astype(str)
users_df["occupation"] = users_df["occupation"].astype(str)
# select only the required columns
users_df = users_df[['user_id', 'age', 'gender', 'occupation']]

# movie data
movies_df = pd.read_csv(ITEM_DATA_PATH)

# cast data type
movies_df["title"] = movies_df["title"].astype(str)
movies_df["genres"] = movies_df["title"].astype(str)
# select only the required columns
movies_df = movies_df[['movie_id', 'title', 'genres', 'year']]

# ratings data
ratings_df = pd.read_csv(RATINGS_DATA_PATH)

# cast data type
ratings_df["user_id"] = ratings_df["user_id"].astype(str)  # for StringLookup
ratings_df["title"] = ratings_df["title"].astype(str) # for StringLookup
ratings_df["rating"] = ratings_df["rating"].astype(float)

In [11]:
# # Load data
# users_df = pd.read_csv(USER_DATA_PATH)
# movies_df = pd.read_csv(ITEM_DATA_PATH)
# ratings_df = pd.read_csv(RATINGS_DATA_PATH)

# # create user features
# users_df["user_id"] = users_df["user_id"].astype(str)
# users_df["age"] = users_df["age"].astype(str)  # treat as categorical
# users_df["gender"] = users_df["gender"].astype(str)
# users_df["occupation"] = users_df["occupation"].astype(str)

# users_df = users_df[['user_id', 'age', 'gender', 'occupation']]

# # create item features
# movies_df["title"] = movies_df["title"].astype(str)

# # combine genres into one string label per movie
# genre_cols = ['unknown', 'Action', 'Adventure', 'Animation', "Children's", 'Comedy', 'Crime', 
#             'Documentary', 'Drama', 'Fantasy', 'Film-Noir', 'Horror', 'Musical', 'Mystery', 
#             'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']

# movies_df["genres"] = movies_df[genre_cols].apply(
#     lambda row: "|".join([genre for genre, val in row.items() if val == 1]), axis=1)

# # extract release year
# movies_df["year"] = movies_df["release_date"].str.extract(r"(\d{4})").fillna("unknown")

# movies_df = movies_df[['movie_id', 'title', 'genres', 'year']]

# # ratings: join user and movie side features to ratings_df
# ratings_df["user_id"] = ratings_df["user_id"].astype(str)  # for StringLookup
# ratings_df["title"] = ratings_df["title"].astype(str) # for StringLookup
# ratings_df["rating"] = ratings_df["rating"].astype(float)
# ratings_df = ratings_df.merge(users_df, on="user_id", how="left")
# ratings_df = ratings_df.merge(movies_df[["title", "genres", "year"]], on="title", how="left")

In [12]:
# convert to tf datasets
ratings = tf.data.Dataset.from_tensor_slices(dict(ratings_df))

ratings = ratings.map(lambda x: {
    "movie_title": x["title"],
    "user_id": x["user_id"],
    "user_rating": x["rating"],
    "gender": x["gender"],
    "occupation": x["occupation"],
    "genres": x["genres"]
})


In [13]:
type(ratings)

tensorflow.python.data.ops.map_op._MapDataset

In [14]:
# train/test split
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

In [15]:
# vocabulary
movie_titles = ratings.batch(1_000_000).map(lambda x: x["movie_title"])
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])

unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

unique_genders = ratings_df["gender"].dropna().unique().tolist()
unique_occupations = ratings_df["occupation"].dropna().unique().tolist()
unique_ages = ratings_df["age"].dropna().unique().tolist()
unique_genres = ratings_df["genres"].dropna().unique().tolist()
unique_years = ratings_df["year"].dropna().unique().tolist()

### Implement Ranking Model

In [16]:
# movie model
class MovieModel(tf.keras.Model):
    def __init__(self, unique_movie_titles, unique_genres):
        super().__init__()

        max_tokens = 10_000

        # Movie title embedding
        self.title_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_movie_titles, mask_token=None),
            tf.keras.layers.Embedding(len(unique_movie_titles) + 1, 32)
        ])

        # Movie title text embedding
        self.title_vectorizer = tf.keras.layers.TextVectorization(max_tokens=max_tokens)
        self.title_text_embedding = tf.keras.Sequential([
            self.title_vectorizer,
            tf.keras.layers.Embedding(max_tokens, 32, mask_zero=True),
            tf.keras.layers.GlobalAveragePooling1D(),
        ])

        # Genre text vectorization and embedding
        self.genre_vectorizer = tf.keras.layers.TextVectorization(max_tokens=max_tokens)
        self.genre_embedding = tf.keras.Sequential([
            self.genre_vectorizer,
            tf.keras.layers.Embedding(max_tokens, 16, mask_zero=True),
            tf.keras.layers.GlobalAveragePooling1D()
        ])

        # Adapt vectorizers
        self.genre_vectorizer.adapt(unique_genres)
        self.title_vectorizer.adapt(unique_movie_titles)

    def call(self, inputs):  # inputs is a dict with 'movie_title' and 'genres'
        return tf.concat([
            self.title_embedding(inputs["movie_title"]),
            self.title_text_embedding(inputs["movie_title"]),
            self.genre_embedding(inputs["genres"])
        ], axis=1)

# user model
class UserModel(tf.keras.Model):
    def __init__(self, unique_user_ids, unique_genders, unique_occupations):
        super().__init__()

        self.user_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_user_ids, mask_token=None),
            tf.keras.layers.Embedding(len(unique_user_ids) + 1, 32)
        ])

        self.gender_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_genders, mask_token=None),
            tf.keras.layers.Embedding(len(unique_genders) + 1, 8)
        ])

        self.occupation_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(vocabulary=unique_occupations, mask_token=None),
            tf.keras.layers.Embedding(len(unique_occupations) + 1, 8)
        ])

    def call(self, inputs):  # inputs is a dict with 'user_id', 'gender', 'occupation'
        return tf.concat([
            self.user_embedding(inputs["user_id"]),
            self.gender_embedding(inputs["gender"]),
            self.occupation_embedding(inputs["occupation"])
        ], axis=1)


# ranking model
class RankingModel(tf.keras.Model):
    def __init__(self, user_model, movie_model):
        super().__init__()
        self.user_model = user_model
        self.movie_model = movie_model

        self.mlp = tf.keras.Sequential([
            tf.keras.layers.Dense(256, activation="relu"),
            tf.keras.layers.Dense(64, activation="relu"),
            tf.keras.layers.Dense(1)
        ])

    def call(self, inputs):
        user_repr = self.user_model({
            "user_id": inputs["user_id"],
            "gender": inputs["gender"],
            "occupation": inputs["occupation"]
        })

        movie_repr = self.movie_model({
            "movie_title": inputs["movie_title"],
            "genres": inputs["genres"]
        })

        return self.mlp(tf.concat([user_repr, movie_repr], axis=1))
  
# full model
class RecommendationModel(tfrs.models.Model):
    def __init__(self, ranking_model):
        super().__init__()
        self.ranking_model = ranking_model
        self.task = tfrs.tasks.Ranking(
            loss=tf.keras.losses.MeanSquaredError(),
            metrics=[tf.keras.metrics.RootMeanSquaredError()]
        )

    def call(self, features):
        return self.ranking_model(features)

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        labels = features["user_rating"]
        inputs = {key: features[key] for key in features if key != "user_rating"}

        rating_predictions = self(inputs)

        # The task computes the loss and the metrics.
        return self.task(labels=labels, predictions=rating_predictions)


### Train

In [17]:
LR = 0.05
EPOCHS = 15

user_model = UserModel(unique_user_ids, unique_genders, unique_occupations)
movie_model = MovieModel(unique_movie_titles, unique_genres)
ranking_model = RankingModel(user_model, movie_model)
model = RecommendationModel(ranking_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=LR))

cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()

model.fit(cached_train, epochs=EPOCHS)

Epoch 1/15




[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m4s[0m 66ms/step - loss: 4.2733 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 2.6436 - total_loss: 4.2733
Epoch 2/15
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 62ms/step - loss: 1.2883 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 1.1354 - total_loss: 1.2883
Epoch 3/15
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 62ms/step - loss: 1.2743 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 1.1356 - total_loss: 1.2743
Epoch 4/15
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 63ms/step - loss: 1.2388 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 1.1187 - total_loss: 1.2388
Epoch 5/15
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 62ms/step - loss: 1.2152 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 1.1072 - total_loss: 1.2152
Epoch 6/15
[1m10/10[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [

<keras.src.callbacks.history.History at 0x212c61e1690>

### Evaluate

In [20]:
model.evaluate(cached_test, return_dict=True)

[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 27ms/step - loss: 1.0204 - regularization_loss: 0.0000e+00 - root_mean_squared_error: 1.0099 - total_loss: 1.0204


{'loss': <tf.Tensor: shape=(), dtype=float32, numpy=1.0102993249893188>,
 'root_mean_squared_error': <tf.Tensor: shape=(), dtype=float32, numpy=1.011264443397522>,
 'regularization_loss': <tf.Tensor: shape=(), dtype=int32, numpy=0>,
 'total_loss': <tf.Tensor: shape=(), dtype=float32, numpy=1.0102993249893188>}

### Inference

In [21]:
DEFAULT_GENDER = "M",
DEFAULT_OCCUPATION = "other",
DEFAULT_GENRE = "unknown"

def rank_movies_for_user(model, user_id:str, candidate_titles:list) -> list:
    # if user_id not found
    if user_id not in users_df["user_id"].values:
        raise ValueError(f"user_id {user_id} not found")

    # Get user side features or fallback to default feature values
    user_row = users_df[users_df["user_id"] == user_id]
    if not user_row.empty:
        gender = user_row["gender"].iloc[0] if pd.notna(user_row["gender"].iloc[0]) else DEFAULT_GENDER
        occupation = user_row["occupation"].iloc[0] if pd.notna(user_row["occupation"].iloc[0]) else DEFAULT_OCCUPATION
    else:
        gender = DEFAULT_GENDER
        occupation = DEFAULT_OCCUPATION

    movie_titles = []
    movie_genres = []

    for title in candidate_titles:
        movie_row = movies_df[movies_df["title"] == title]
        genre = (
            movie_row["genres"].iloc[0] if not movie_row.empty and pd.notna(movie_row["genres"].iloc[0])
            else DEFAULT_GENRE
        )
        movie_titles.append(title)
        movie_genres.append(genre)

    n = len(candidate_titles)
    model_inputs = {
        "user_id": tf.constant([user_id] * n),
        "gender": tf.constant([gender] * n),
        "occupation": tf.constant([occupation] * n),
        "movie_title": tf.constant(movie_titles),
        "genres": tf.constant(movie_genres)
    }

    # Predict
    predictions = model(model_inputs)
    scores = tf.squeeze(predictions, axis=1).numpy()

    # Rank
    ranked = sorted(zip(candidate_titles, scores), key=lambda x: x[1], reverse=True)
    ranked = [{"movie_title": title, "score": round(float(score), 4)} for title, score in ranked] # return a list of dicts
    return ranked


In [42]:
candidate_titles = [
    "Star Wars (1977)",
    "Toy Story (1995)",
    "Fargo (1996)",
    "L.A. Confidential (1997)",
    "Titanic (1997)"
]

user_id = "82"

ranked_results = rank_movies_for_user(model, user_id, candidate_titles)
print(ranked_results)
# for title, score in ranked_results:
#     print(f"{title}: {score:.3f}")

[{'movie_title': 'Star Wars (1977)', 'score': 4.102}, {'movie_title': 'Titanic (1997)', 'score': 3.9679}, {'movie_title': 'Fargo (1996)', 'score': 3.9383}, {'movie_title': 'L.A. Confidential (1997)', 'score': 3.8208}, {'movie_title': 'Toy Story (1995)', 'score': 3.7696}]


### Export model

In [44]:
@tf.function(input_signature=[
    tf.TensorSpec([None], tf.string, name="user_id"),
    tf.TensorSpec([None], tf.string, name="gender"),
    tf.TensorSpec([None], tf.string, name="occupation"),
    tf.TensorSpec([None], tf.string, name="movie_title"),
    tf.TensorSpec([None], tf.string, name="genres"),
])
def serve_fn(user_id, gender, occupation, movie_title, genres):
    features = {
        "user_id": user_id,
        "gender": gender,
        "occupation": occupation,
        "movie_title": movie_title,
        "genres": genres,
    }
    return model(features)

tf.saved_model.save(model, export_dir=MODEL_PATH, signatures={"serving_default": serve_fn})

# tf.saved_model.save(model, MODEL_PATH)

INFO:tensorflow:Assets written to: ./model/tfrs-ranking-with-sf\assets


INFO:tensorflow:Assets written to: ./model/tfrs-ranking-with-sf\assets


In [52]:
DEFAULT_GENDER = "M"
DEFAULT_OCCUPATION = "other"
DEFAULT_GENRE = "unknown"

def rank_movies_for_user_with_loaded_model(model, user_id: str, candidate_titles: list) -> list:
    # Check if user_id exists
    if user_id not in users_df["user_id"].values:
        raise ValueError(f"user_id {user_id} not found")

    # Get user side features or use defaults
    user_row = users_df[users_df["user_id"] == user_id]
    gender = user_row["gender"].iloc[0] if not user_row.empty and pd.notna(user_row["gender"].iloc[0]) else DEFAULT_GENDER
    occupation = user_row["occupation"].iloc[0] if not user_row.empty and pd.notna(user_row["occupation"].iloc[0]) else DEFAULT_OCCUPATION

    # Prepare movie inputs
    movie_titles = []
    movie_genres = []
    for title in candidate_titles:
        movie_row = movies_df[movies_df["title"] == title]
        genre = (
            movie_row["genres"].iloc[0]
            if not movie_row.empty and pd.notna(movie_row["genres"].iloc[0])
            else DEFAULT_GENRE
        )
        movie_titles.append(title)
        movie_genres.append(genre)

    n = len(candidate_titles)

    # Run inference using individual keyword arguments
    predictions = model(
        user_id=tf.constant([user_id] * n),
        gender=tf.constant([gender] * n),
        occupation=tf.constant([occupation] * n),
        movie_title=tf.constant(movie_titles),
        genres=tf.constant(movie_genres)
    )

    scores = tf.squeeze(predictions["output_0"], axis=1).numpy()

    # Rank and return
    ranked = sorted(zip(candidate_titles, scores), key=lambda x: x[1], reverse=True)
    ranked = [{"movie_title": title, "score": round(float(score), 4)} for title, score in ranked]
    return ranked

In [56]:
# load the saved model and perform inference
tfrs_model = tf.saved_model.load(MODEL_PATH).signatures["serving_default"]
tfrs_model

<ConcreteFunction (*, gender: TensorSpec(shape=(None,), dtype=tf.string, name='gender'), genres: TensorSpec(shape=(None,), dtype=tf.string, name='genres'), movie_title: TensorSpec(shape=(None,), dtype=tf.string, name='movie_title'), occupation: TensorSpec(shape=(None,), dtype=tf.string, name='occupation'), user_id: TensorSpec(shape=(None,), dtype=tf.string, name='user_id')) -> Dict[['output_0', TensorSpec(shape=(None, 1), dtype=tf.float32, name='output_0')]] at 0x212E682FB50>

In [58]:
# Combine all features
result = tfrs_model(
    user_id=tf.constant(["42"]),
    gender=tf.constant(["F"]),
    occupation=tf.constant(["student"]),
    movie_title=tf.constant(["Titanic (1997)"]),
    genres=tf.constant(["Action"])
)

print(result)

{'output_0': <tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[3.6877604]], dtype=float32)>}


In [59]:
user_id = "72"
candidate_movie_list = ["Speed (1994)", "Titanic (1997)"]

ranked_recs = rank_movies_for_user_with_loaded_model(tfrs_model, user_id, candidate_movie_list)
print(ranked_recs)
# for title, score in ranked_recs:
#     print(f"{title}: {score:.3f}")

[{'movie_title': 'Titanic (1997)', 'score': 4.3067}, {'movie_title': 'Speed (1994)', 'score': 3.8966}]


### evaluation

In [39]:
from collections import defaultdict

def build_user_seen_dict(dataset):
    user_seen = defaultdict(set)
    for x in dataset:
        user_seen[x["user_id"].numpy().decode("utf-8")].add(x["movie_title"].numpy().decode("utf-8"))
    return user_seen

train_user_seen = build_user_seen_dict(train)
test_user_seen = build_user_seen_dict(test)

In [42]:
def evaluate_ranking_model(model, test_user_seen, train_user_seen, all_movies, k=10):
    hits, precision_sum, recall_sum, ndcg_sum = 0, 0.0, 0.0, 0.0
    total_users = 0

    for user_id, true_movies in test_user_seen.items():
        # Remove movies already seen in training set
        seen_train_movies = train_user_seen.get(user_id, set())
        candidate_movies = [title for title in all_movies if title not in seen_train_movies]

        # Get top-k predictions
        ranked = rank_movies_for_user(model, user_id, candidate_movies)
        top_k_preds = [title for title, score in ranked[:k]]

        hit_set = true_movies & set(top_k_preds)
        num_hits = len(hit_set)
        hits += int(num_hits > 0)
        precision_sum += num_hits / k
        recall_sum += num_hits / len(true_movies)

        # NDCG@k
        dcg = 0.0
        for i, movie in enumerate(top_k_preds):
            if movie in true_movies:
                dcg += 1 / np.log2(i + 2)
        idcg = sum(1 / np.log2(i + 2) for i in range(min(len(true_movies), k)))
        ndcg = dcg / idcg if idcg > 0 else 0
        ndcg_sum += ndcg

        total_users += 1

    return {
        'HitRate@k': hits / total_users,
        'Precision@k': precision_sum / total_users,
        'Recall@k': recall_sum / total_users,
        'NDCG@k': ndcg_sum / total_users
    }


In [43]:
# Get all unique movie titles from the dataset
all_movie_titles = list(set(movies_df["title"].tolist()))

# Evaluate
K = 10
metrics = evaluate_ranking_model(
    model=model,
    test_user_seen=test_user_seen,
    train_user_seen=train_user_seen,
    all_movies=all_movie_titles,
    k=K
)

# Print results
print(f"Ranking Model Evaluation (k={K}):")
for metric, value in metrics.items():
    print(f"{metric}: {value:.4f}")


Ranking Model Evaluation (k=10):
HitRate@k: 0.4846
Precision@k: 0.1001
Recall@k: 0.0445
NDCG@k: 0.1236
