In [1]:
import time

notebook_start_time = time.time()

In [None]:
import sys
from pathlib import Path

def is_google_colab() -> bool:
    if "google.colab" in str(get_ipython()):
        return True
    return False

def clone_repository() -> None:
    !git clone https://github.com/decodingml/hands-on-recommender-system.git
    %cd hands-on-recommender-system/

def install_dependencies() -> None:
    !pip install --upgrade uv
    !uv pip install --all-extras --system --requirement pyproject.toml

if is_google_colab():
    clone_repository()
    install_dependencies()

    root_dir = str(Path().absolute()) 
    print("⛳️ Google Colab environment")
else:
    root_dir = str(Path().absolute().parent)
    print("⛳️ Local environment")

# Add the root directory to the `PYTHONPATH` to use the `recsys` Python module from the notebook.
if root_dir not in sys.path:
    sys.path.append(root_dir)

## <span style="color:#ff5f27">🧬 Train Retrieval Model </span>

In this notebook, you will train a retrieval model that quickly generates a small subset of candidate items from a large collection of items. The model is based on the *two-tower architecture*, which embeds queries and candidates (keys) into a shared low-dimensional vector space. Here, a query consists of features of a customer and a transaction (e.g., the timestamp of the purchase), whereas a candidate consists of features of a particular item. Every query has a user ID and every candidate has an item ID, and the model is trained so that a user's embedding is close to the embeddings of the items that user has previously bought.
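A common way to pull a user's embedding toward the items they bought is an in-batch softmax loss: within a batch of (query, purchased item) pairs, each query should score its own item higher than the other items in the batch, which act as negatives. The NumPy sketch below illustrates that objective with dot-product scores; it is an assumption for illustration, not necessarily the exact loss implemented in `recsys.models.two_tower`.

```python
import numpy as np


def in_batch_softmax_loss(query_emb: np.ndarray, item_emb: np.ndarray) -> float:
    """Mean cross-entropy where row i of item_emb is the positive for row i of query_emb.

    Every other item in the batch serves as a negative for that query.
    """
    # (batch, batch) matrix of dot-product scores; the diagonal holds the positives.
    logits = query_emb @ item_emb.T
    # Numerically stable log-softmax over each row.
    logits = logits - logits.max(axis=1, keepdims=True)
    log_probs = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))
    # Negative log-likelihood of the positive (diagonal) entries.
    return float(-np.mean(np.diag(log_probs)))


rng = np.random.default_rng(0)
queries = rng.normal(size=(4, 16))
aligned = in_batch_softmax_loss(queries, queries * 3.0)         # each query matches its own item
shuffled = in_batch_softmax_loss(queries, queries[::-1] * 3.0)  # positives deliberately mismatched
print(aligned < shuffled)
```

Minimizing this loss pushes each diagonal score up relative to the rest of its row, which is what "the user embedding is close to the embeddings of purchased items" means in practice.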

After training the model, you will save and upload its components to the Hopsworks Model Registry.

Let's go ahead and load the data.

## <span style="color:#ff5f27">📝 Imports </span>

In [4]:
%load_ext autoreload
%autoreload 2

import warnings

import tensorflow as tf

warnings.filterwarnings("ignore")

from recsys import utils
from recsys.data import retrieval
from recsys.models import two_tower, two_tower_serving

## <span style="color:#ff5f27">🔮 Connect to Hopsworks Feature Store </span>

In [6]:
project, fs = utils.get_hopsworks_feature_store()

2024-11-08 19:33:41.345 | INFO     | recsys.utils:get_hopsworks_feature_store:10 - Loging to Hopsworks using HOPSWORKS_API_KEY env var.


Connection closed.
Connected. Call `.close()` to terminate connection gracefully.

Logged in to project, explore it here https://c.app.hopsworks.ai:443/p/15551
Connected. Call `.close()` to terminate connection gracefully.


## <span style="color:#ff5f27">⚙️ Feature View Creation </span>

In Hopsworks, you write features to feature groups (where the features are stored) and read features from feature views. A feature view is a logical view over features stored in one or more feature groups, and it typically contains the features used by a specific model. This way, features stored in different feature groups can be reused across many different models.
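As a loose analogy only (not how Hopsworks is implemented), a feature view can be pictured as a stored query that joins feature groups on their keys and selects a subset of columns, without copying the underlying data. The pure-Python sketch below uses hypothetical feature groups `customers_fg` and `transactions_fg` and a hypothetical `read_view` helper.

```python
# Two hypothetical feature groups: customers keyed by ID, and a transaction log.
customers_fg = {
    "c1": {"age": 25, "postal_code": "A"},
    "c2": {"age": 41, "postal_code": "B"},
}
transactions_fg = [
    {"customer_id": "c1", "article_id": "a9", "price": 19.9},
    {"customer_id": "c2", "article_id": "a3", "price": 5.0},
]


def read_view(selected_tx: list[str], selected_customer: list[str]) -> list[dict]:
    """Join each transaction with its customer's features, keeping only selected columns."""
    rows = []
    for tx in transactions_fg:
        customer = customers_fg[tx["customer_id"]]  # join on the customer key
        row = {name: tx[name] for name in selected_tx}
        row.update({name: customer[name] for name in selected_customer})
        rows.append(row)
    return rows


# A "retrieval" view selects only the features this model needs.
retrieval_view = read_view(["customer_id", "article_id"], ["age"])
print(retrieval_view)
```

A different model could define another view over the same two groups (for example selecting `price` and `postal_code`) while the stored features stay in one place.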

In [7]:
feature_view = retrieval.create_feature_view(fs)

To view and explore the data in the feature view, you can retrieve batch data using the `get_batch_data()` method.

## <span style="color:#ff5f27">🏋️ Training Dataset </span>


In [8]:
train_df, val_df, test_df, _, _, _ = feature_view.train_validation_test_split(
    validation_size=0.1,
    test_size=0.1,
    description="Retrieval dataset splits",
)

Finished: Reading data from Hopsworks, using Hopsworks Feature Query Service (247.85s) 

CPU times: user 823 ms, sys: 197 ms, total: 1.02 s
Wall time: 4min 18s


You will train your retrieval model with a subset of features.

For the query embedding you will use:
- `customer_id`: ID of the customer.
- `age`: age of the customer at the time of purchase.
- `month_sin`, `month_cos`: sine/cosine encoding of the month in which the purchase was made (time of year).

For the candidate embedding you will use:
- `article_id`: ID of the item.
- `garment_group_name`: type of garment.
- `index_group_name`: menswear/ladieswear etc.
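The `month_sin` and `month_cos` names suggest a cyclical encoding of the month, so that December and January end up close together instead of 11 units apart. A minimal sketch, assuming months numbered 1 to 12 and a period of 12 (the exact formula used when these features were created is not shown in this notebook):

```python
import math


def encode_month(month: int) -> tuple[float, float]:
    """Map a month (1-12) onto the unit circle as (sin, cos)."""
    angle = 2 * math.pi * month / 12
    return math.sin(angle), math.cos(angle)


def circle_distance(a: tuple[float, float], b: tuple[float, float]) -> float:
    """Euclidean distance between two encoded months."""
    return math.dist(a, b)


dec, jan, jun = encode_month(12), encode_month(1), encode_month(6)
print(round(circle_distance(dec, jan), 3))  # 0.518: neighbouring months are close
print(round(circle_distance(dec, jun), 3))  # 2.0: opposite months are far apart
```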

In [9]:
query_features = ["customer_id", "age", "month_sin", "month_cos"]
candidate_features = ["article_id", "garment_group_name", "index_group_name"]


def df_to_ds(df):
    return tf.data.Dataset.from_tensor_slices({col: df[col] for col in df})


BATCH_SIZE = 2048
train_ds = df_to_ds(train_df).batch(BATCH_SIZE).cache().shuffle(BATCH_SIZE * 10)
val_ds = df_to_ds(val_df).batch(BATCH_SIZE).cache()

You will need the lists of unique user and item IDs (as well as the garment and index group names) to initialize the model's lookup and embedding layers.

In [10]:
# Retrieve unique customer IDs and article IDs from the training dataset
user_id_list = train_df["customer_id"].unique().tolist()
item_id_list = train_df["article_id"].unique().tolist()

# Retrieve unique garment group names and index group names from the training dataset
garment_group_list = train_df["garment_group_name"].unique().tolist()
index_group_list = train_df["index_group_name"].unique().tolist()

# Print the number of transactions, number of users, number of items, and unique garment group names
print(f"Number of transactions: {len(train_df):,}")
print(f"Number of users: {len(user_id_list):,}")
print(f"Number of items: {len(item_id_list):,}")
print(garment_group_list)

Number of transactions: 88,244
Number of users: 4,831
Number of items: 32,051
['Jersey Fancy', 'Knitwear', 'Accessories', 'Trousers Denim', 'Jersey Basic', 'Trousers', 'Outdoor', 'Dresses Ladies', 'Shoes', 'Blouses', 'Under-, Nightwear', 'Woven/Jersey/Knitted mix Baby', 'Socks and Tights', 'Swimwear', 'Dressed', 'Skirts', 'Shorts', 'Shirts', 'Special Offers', 'Unknown', 'Dresses/Skirts girls']
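The lists above become vocabularies that map raw IDs to rows of an embedding table. The exported model's op names (see the `string_lookup` entry in the save log further down) suggest a Keras `StringLookup` layer, whose default reserves index 0 for out-of-vocabulary (OOV) values. The pure-Python sketch below mimics that behaviour with hypothetical IDs and, assuming one embedding row per lookup index, computes the size of the user embedding table for `EMB_DIM = 16`.

```python
def build_lookup(vocabulary: list[str]) -> dict[str, int]:
    """Map each known ID to an index starting at 1; index 0 is reserved for OOV IDs."""
    return {token: i + 1 for i, token in enumerate(vocabulary)}


def lookup(table: dict[str, int], token: str) -> int:
    """Return the token's index, or 0 for an ID that was not seen in training."""
    return table.get(token, 0)


user_ids = ["u_a", "u_b", "u_c"]  # hypothetical IDs standing in for user_id_list
user_lookup = build_lookup(user_ids)
print(lookup(user_lookup, "u_b"))    # 2
print(lookup(user_lookup, "u_new"))  # 0 (out of vocabulary)

# Assuming one embedding row per lookup index (vocabulary plus the OOV row):
num_users, emb_dim = 4_831, 16
print(f"{(num_users + 1) * emb_dim:,} user-embedding weights")  # 77,312 user-embedding weights
```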


## <span style="color:#ff5f27">🏰 Two Tower Model </span>

The two-tower model consists of two models:
- Query model: Generates a query representation given user and transaction features.
- Candidate model: Generates an item representation given item features.

**Both models produce embeddings that live in the same embedding space**. You keep this space low-dimensional to prevent overfitting on the training data; otherwise, the model might simply memorize previous purchases and end up recommending items customers have already bought.
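Because both towers output vectors in the same space, the affinity between a query and an item can be scored with a dot product. The NumPy sketch below illustrates that structure with random, untrained weights. The tower layouts (an ID embedding concatenated with numeric features and then a dense projection for the query; an ID embedding plus projection for the item) are assumptions for illustration, not the exact `QueryTower`/`ItemTower` code.

```python
import numpy as np

EMB_DIM = 16
rng = np.random.default_rng(42)

# Hypothetical, untrained parameters for each tower.
user_table = rng.normal(size=(5, EMB_DIM))            # 4 users + 1 OOV row
query_proj = rng.normal(size=(EMB_DIM + 3, EMB_DIM))  # ID embedding + age, month_sin, month_cos
item_table = rng.normal(size=(8, EMB_DIM))            # 7 items + 1 OOV row
item_proj = rng.normal(size=(EMB_DIM, EMB_DIM))


def query_tower(user_idx: int, age_norm: float, month_sin: float, month_cos: float) -> np.ndarray:
    """Embed the user ID, append the numeric features, and project to EMB_DIM."""
    features = np.concatenate([user_table[user_idx], [age_norm, month_sin, month_cos]])
    return features @ query_proj


def item_tower(item_indices: np.ndarray) -> np.ndarray:
    """Embed item IDs and project them into the same EMB_DIM space."""
    return item_table[item_indices] @ item_proj


query = query_tower(user_idx=2, age_norm=0.3, month_sin=0.5, month_cos=0.87)
items = item_tower(np.arange(1, 8))  # lookup indices 1..7 (skipping the OOV row)
scores = items @ query               # one dot-product score per candidate item
print(query.shape, items.shape, scores.shape)
print(np.argsort(-scores)[:3] + 1)   # lookup indices of the 3 best-scoring items
```

Since items never see the query during encoding, all item embeddings can be precomputed once and reused for every query.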

In [11]:
EMB_DIM = 16

Start by creating the query model.

In [12]:
query_model = two_tower.QueryTower(user_ids=user_id_list, emb_dim=EMB_DIM)
# TODO: Move this inside model
query_model.normalized_age.adapt(train_ds.map(lambda x: x["age"]))

# Initialize model with inputs.
query_df = train_df[query_features]
query_ds = df_to_ds(query_df).batch(1)
query_model(next(iter(query_ds)))

<tf.Tensor: shape=(1, 16), dtype=float32, numpy=
array([[ 0.1764864 , -0.00185757, -0.12651373, -0.04278179, -0.03960358,
         0.27857444,  0.16759372,  0.20195682, -0.08409421,  0.28024834,
         0.05457634, -0.11726266, -0.08702008,  0.07779447, -0.12798475,
        -0.00588928]], dtype=float32)>
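The `normalized_age.adapt(...)` call computes statistics of `age` from the training data before the model is used. Assuming `normalized_age` is a Keras `Normalization` layer (the name suggests it, but its definition lives in `recsys.models.two_tower`), `adapt` stores the mean and variance, and the layer then outputs the standardized value `(x - mean) / sqrt(variance)`. A pure-Python stand-in with hypothetical ages:

```python
import math


class AgeNormalizer:
    """Minimal stand-in for a standardization layer: adapt() learns mean and variance."""

    def __init__(self, epsilon: float = 1e-7) -> None:
        self.epsilon = epsilon
        self.mean = 0.0
        self.variance = 1.0

    def adapt(self, values: list[float]) -> None:
        # Population statistics over the whole training column.
        n = len(values)
        self.mean = sum(values) / n
        self.variance = sum((v - self.mean) ** 2 for v in values) / n

    def __call__(self, value: float) -> float:
        # Guard against division by zero for a constant column.
        return (value - self.mean) / max(math.sqrt(self.variance), self.epsilon)


normalizer = AgeNormalizer()
normalizer.adapt([20.0, 30.0, 40.0])  # hypothetical ages
print(normalizer(30.0), round(normalizer(40.0), 4))  # 0.0 1.2247
```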

The candidate model is very similar to the query model. A difference is that it has two categorical features as input, which you one-hot encode.

In [13]:
item_model = two_tower.ItemTower(
    item_ids=item_id_list,
    garment_groups=garment_group_list,
    index_groups=index_group_list,
    emb_dim=EMB_DIM,
)
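One-hot encoding turns each category into a vector with a single 1 at that category's position in the vocabulary. The sketch below uses three of the garment groups printed earlier and reserves position 0 for unseen categories, mirroring the OOV convention of Keras `StringLookup`; whether `ItemTower` uses exactly this layout is an assumption.

```python
def one_hot(value: str, vocabulary: list[str]) -> list[int]:
    """Return a one-hot vector; position 0 is an out-of-vocabulary slot."""
    vector = [0] * (len(vocabulary) + 1)
    index = vocabulary.index(value) + 1 if value in vocabulary else 0
    vector[index] = 1
    return vector


garment_groups = ["Jersey Fancy", "Knitwear", "Accessories"]  # subset of garment_group_list
print(one_hot("Knitwear", garment_groups))  # [0, 0, 1, 0]
print(one_hot("Swimwear", garment_groups))  # [1, 0, 0, 0] (not in this subset)
```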

You will evaluate the two-tower model using *top-100 accuracy*. That is, for each transaction in the validation data, you generate the associated query embedding and retrieve the 100 items closest to this query in the embedding space. Top-100 accuracy measures how often the item that was actually bought is part of this subset. To evaluate this, you create a dataset of all unique items in the training data.

In [14]:
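# One row per unique article: the candidate pool used for evaluation.
# Chaining drop_duplicates avoids modifying a slice of train_df in place.
item_df = train_df[candidate_features].drop_duplicates(subset="article_id")
item_ds = df_to_ds(item_df)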
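Conceptually, top-k accuracy scores every candidate item against each validation query, keeps the k highest scores, and checks whether the purchased item is among them. The NumPy sketch below shows that brute-force computation on toy embeddings; `TwoTowerModel` computes its metric internally, so this is an illustration rather than its exact implementation.

```python
import numpy as np


def top_k_accuracy(query_emb: np.ndarray, item_emb: np.ndarray,
                   true_items: np.ndarray, k: int) -> float:
    """Fraction of queries whose purchased item is among the k highest-scoring items.

    query_emb: (num_queries, dim); item_emb: (num_items, dim);
    true_items: (num_queries,) row index into item_emb of the item actually bought.
    """
    scores = query_emb @ item_emb.T  # brute force: score every item for every query
    top_k = np.argpartition(-scores, kth=k - 1, axis=1)[:, :k]  # unordered top-k per row
    hits = (top_k == true_items[:, None]).any(axis=1)
    return float(hits.mean())


item_emb = np.eye(4)                          # 4 items in a toy 4-d space
query_emb = np.array([[1.0, 0.2, 0.0, 0.0],   # closest to item 0, then item 1
                      [0.0, 0.0, 0.1, 1.0]])  # closest to item 3, then item 2
true_items = np.array([1, 0])
print(top_k_accuracy(query_emb, item_emb, true_items, k=1))  # 0.0
print(top_k_accuracy(query_emb, item_emb, true_items, k=2))  # 0.5
```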

### <span style="color:#ff5f27">🏃🏻‍♂️ Model Training </span>

You'll train your model using the AdamW optimizer, which applies decoupled weight decay (a form of weight regularization) during training.

In [15]:
# Create a TwoTowerModel with the specified query_model and item_model
model = two_tower.TwoTowerModel(
    query_model, item_model, item_ds=item_ds, batch_size=BATCH_SIZE
)

# Define an AdamW optimizer with a learning rate of 0.01 and a weight decay of 0.001
optimizer = tf.keras.optimizers.AdamW(weight_decay=0.001, learning_rate=0.01)

# Compile the model using the specified optimizer
model.compile(optimizer=optimizer)
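AdamW differs from Adam with an L2 penalty in that the weight decay is applied directly to the weights ("decoupled") instead of being added to the gradient, so it is not rescaled by Adam's adaptive step size. Below is a scalar sketch of one update with the hyperparameters used above; it applies the decay before the Adam step, and Keras's tensor-based implementation has additional options, so treat it as an illustration.

```python
import math


def adamw_step(param: float, grad: float, m: float, v: float, step: int,
               lr: float = 0.01, weight_decay: float = 0.001,
               beta1: float = 0.9, beta2: float = 0.999, eps: float = 1e-7):
    """One AdamW update for a scalar parameter; returns (param, m, v)."""
    # Decoupled weight decay: shrink the parameter directly, independent of the gradient.
    param -= lr * weight_decay * param
    # Standard Adam moment estimates with bias correction.
    m = beta1 * m + (1 - beta1) * grad
    v = beta2 * v + (1 - beta2) * grad ** 2
    m_hat = m / (1 - beta1 ** step)
    v_hat = v / (1 - beta2 ** step)
    param -= lr * m_hat / (math.sqrt(v_hat) + eps)
    return param, m, v


# With a zero gradient, only the weight decay acts: 1.0 -> 1.0 - 0.01 * 0.001.
print(adamw_step(1.0, 0.0, 0.0, 0.0, step=1)[0])
# With a unit gradient on the first step, Adam adds a step of roughly lr = 0.01.
print(adamw_step(1.0, 1.0, 0.0, 0.0, step=1)[0])
```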



In [16]:
model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=5,
)

Epoch 1/5
Epoch 2/5
Epoch 3/5
Epoch 4/5
Epoch 5/5


<keras.src.callbacks.History at 0x3629bcc90>

## <span style="color:#ff5f27">🗄️ Upload Model to Model Registry </span>

One of the features in Hopsworks is the model registry. This is where you can store different versions of models and compare their performance. Models from the registry can then be served as API endpoints.

Let's connect to the model registry using the [HSML library](https://docs.hopsworks.ai/machine-learning-api/latest) from Hopsworks.

In [17]:
mr = project.get_model_registry()

Connected. Call `.close()` to terminate connection gracefully.


Next, wrap each trained tower in a serving module, save it locally, and upload it to the model registry.

In [19]:
query_model = two_tower_serving.QueryModelModule(model.query_model)
query_model.save_to_hopsworks(mr=mr, query_df=query_df, emb_dim=EMB_DIM)

item_model = two_tower_serving.CandidateModelModule(model.item_model)
item_model.save_to_hopsworks(mr=mr, item_df=item_df, emb_dim=EMB_DIM)

2024-11-08 19:40:43,015 INFO: Function `compute_emb` contains input name(s) table_handle, 4347, resource with unsupported characters which will be renamed to query_tower_sequential_string_lookup_none_lookup_lookuptablefindv2_table_handle, query_tower_sequential_embedding_embedding_lookup_4347, query_tower_sequential_1_dense_1_biasadd_readvariableop_resource in the SavedModel.
INFO:tensorflow:Assets written to: query_model/assets
2024-11-08 19:40:43,405 INFO: Assets written to: query_model/assets


Model created, explore it at https://c.app.hopsworks.ai:443/p/15551/models/query_model/4
INFO:tensorflow:Assets written to: candidate_model/assets
2024-11-08 19:41:03,055 INFO: Assets written to: candidate_model/assets


Model created, explore it at https://c.app.hopsworks.ai:443/p/15551/models/candidate_model/4


---

In [20]:
# End the timer
notebook_end_time = time.time()

# Calculate and print the execution time
notebook_execution_time = notebook_end_time - notebook_start_time
print(f"⌛️ Notebook Execution time: {notebook_execution_time:.2f} seconds")

⌛️ Notebook Execution time: 483.72 seconds


---
## <span style="color:#ff5f27">⏩️ Next Steps </span>

Retrieving the top-k closest candidate embeddings by brute force (computing the distance between the query embedding and every candidate embedding) is too expensive in practice, because the cost of each query grows linearly with the number of candidates. In the next notebook, you will compute the embeddings and create a feature view that lets you retrieve candidates with very low latency.