# 02-01 : Two Towers - Retrieval

Recommender systems are often composed of two stages:

1. The retrieval stage is responsible for selecting an initial set of hundreds of candidates from all possible candidates. The main objective of this model is to efficiently weed out all candidates that the user is not interested in. Because the retrieval model may be dealing with millions of candidates, it has to be computationally efficient.

2. The ranking stage takes the outputs of the retrieval model and fine-tunes them to select the best possible handful of recommendations. Its task is to narrow down the set of items the user may be interested in to a shortlist of likely candidates.

This notebook is going to focus on the first stage, retrieval.

Retrieval models are often composed of two sub-models:

1. A query model computing the query representation (normally a fixed-dimensionality embedding vector) using query features.

2. A candidate model computing the candidate representation (an equally-sized vector) using the candidate features.

The outputs of the two models are then combined, typically via a dot product, to give a query-candidate affinity score, with higher scores expressing a better match between the candidate and the query.
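As a rough illustration of the scoring step, the sketch below computes the affinity as a dot product between a query vector and each candidate vector. It uses plain Python lists and hand-picked three-dimensional embeddings; the names `affinity`, `query`, and `candidates` are illustrative assumptions and not part of any library.

```python
def affinity(query_vec, candidate_vec):
    """Dot product of two equally sized embedding vectors."""
    if len(query_vec) != len(candidate_vec):
        raise ValueError("query and candidate embeddings must have the same size")
    return sum(q * c for q, c in zip(query_vec, candidate_vec))


# Hypothetical embeddings, chosen by hand for illustration only.
query = [1.0, 0.0, 2.0]
candidates = {
    "item_a": [0.5, 1.0, 1.0],
    "item_b": [-1.0, 3.0, 0.0],
    "item_c": [0.0, 0.0, 0.5],
}

# Score every candidate against the query and rank from best to worst match.
scores = {item: affinity(query, vec) for item, vec in candidates.items()}
ranked = sorted(scores, key=scores.get, reverse=True)
print(ranked)
```

A real retrieval model learns the embeddings so that items a session interacted with end up with higher scores than the rest.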


## References

- [Recommending movies: retrieval](https://www.tensorflow.org/recommenders/examples/basic_retrieval#item-to-item_recommendation)
- [Recommending movies: retrieval using a sequential model](https://www.tensorflow.org/recommenders/examples/sequential_retrieval)
- [Item-to-item recommendation and sequential recommendation](https://www.youtube.com/watch?v=ZBaKzw938oM)

In [1]:
import pprint

from typing import Dict, Text

import pandas as pd
import tensorflow as tf
import tensorflow_recommenders as tfrs



## 1. The dataset

We will use the RetailRocket source dataset as prepared for the GRU4Rec paper:
https://github.com/JohnnyFoulds/GRU4Rec/blob/master/notebooks/01_%20preprocess/01-02_retailrocket.ipynb

The dataset is already split into training, validation, and test sets, stored as tab-separated files. The columns are:

- `SessionId` - the id of the session. In one session there are one or many items.
- `ItemId` - the id of the item.
- `Time` - the event time.
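To make the file layout concrete, here is a minimal sketch that parses a tab-separated snippet with these three columns and groups the items by session in event order. The rows are made up for illustration and are not taken from the dataset, and the notebook itself loads the files with pandas rather than the `csv` module.

```python
import csv
import io

# Made-up rows in the same three-column layout (not real data).
sample_tsv = (
    "SessionId\tItemId\tTime\n"
    "1\t100\t1433134557529\n"
    "1\t200\t1433134600000\n"
    "2\t100\t1440134153810\n"
)

rows = list(csv.DictReader(io.StringIO(sample_tsv), delimiter="\t"))

# Group the item ids by session, ordered by event time.
sessions = {}
for row in sorted(rows, key=lambda r: int(r["Time"])):
    sessions.setdefault(row["SessionId"], []).append(row["ItemId"])

print(sessions)
```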

In [2]:
data_path = '../../data/RetailRocket'
model_path = '../../models/RetailRocket'

# file paths for the data files
train_path = f'{data_path}/retailrocket_processed_view_train_tr.tsv'
validation_path = f'{data_path}/retailrocket_processed_view_train_valid.tsv'
test_path = f'{data_path}/retailrocket_processed_view_test.tsv'

In [3]:
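# Load the datasets. Only a random 30% sample of the training set is kept
# (fixed seed); the validation and test sets are loaded in full.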
df_train = pd.read_csv(train_path, sep='\t').sample(frac=0.3, random_state=42)
df_validation = pd.read_csv(validation_path, sep='\t')
df_test = pd.read_csv(test_path, sep='\t')

In [4]:
# convert SessionId and ItemId to string
df_train['SessionId'] = df_train['SessionId'].astype(str)
df_train['ItemId'] = df_train['ItemId'].astype(str)

df_validation['SessionId'] = df_validation['SessionId'].astype(str)
df_validation['ItemId'] = df_validation['ItemId'].astype(str)

df_test['SessionId'] = df_test['SessionId'].astype(str)
df_test['ItemId'] = df_test['ItemId'].astype(str)

In [5]:
# head of the training set
display(df_train.head())

| | SessionId | ItemId | Time |
|---|---|---|---|
| 150593 | 363804 | 451942 | 1433134557529 |
| 700326 | 1684469 | 441756 | 1440134153810 |
| 673447 | 1615632 | 357925 | 1434672250853 |
| 48855 | 117260 | 2129 | 1435772219980 |
| 515183 | 1231963 | 7804 | 1432004462904 |


## 2. Preparing the dataset

Next, let's collect the unique session ids and item ids present in the data.

This is important because we need to be able to map the raw values of our categorical features to embedding vectors in our models. To do that, we need a vocabulary that maps a raw feature value to an integer in a contiguous range: this allows us to look up the corresponding embeddings in our embedding tables.
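The idea of a vocabulary can be sketched in a few lines of plain Python. The helpers `build_vocabulary` and `lookup` below are hypothetical names used only for illustration; they reserve index 0 for unknown values, which is our understanding of how a Keras `StringLookup` layer with `mask_token=None` and a single out-of-vocabulary slot behaves.

```python
def build_vocabulary(values):
    """Map each distinct value to an index in 1..N; index 0 is kept for unknowns."""
    vocabulary = {}
    for value in values:
        if value not in vocabulary:
            vocabulary[value] = len(vocabulary) + 1
    return vocabulary


def lookup(vocabulary, value):
    """Return the integer index for a raw value, or 0 if it was never seen."""
    return vocabulary.get(value, 0)


# A few raw item ids, with one duplicate.
item_ids = ["451942", "441756", "357925", "451942"]
vocab = build_vocabulary(item_ids)

print(vocab)
print(lookup(vocab, "357925"), lookup(vocab, "999999"))
```

The resulting indices form a contiguous range, so each one can be used directly as a row number in an embedding table.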

Get a complete list of all the items in the datasets.

In [6]:
# get a list of the unique item ids across all datasets
unique_items = pd.concat([df_train, df_validation, df_test]).ItemId.unique()

# create a tensorflow dataset from the unique item ids
items = tf.data.Dataset.from_tensor_slices(unique_items)
#items = items.map(lambda x: {'ItemID': x})

for x in items.take(1).as_numpy_iterator():
  pprint.pprint(x)

b'451942'



Get a list of the unique session IDs in the datasets.

In [7]:
unique_session_ids = pd.concat([df_train, df_validation, df_test]).SessionId.unique()

In [8]:
unique_session_ids[:10]

array(['363804', '1684469', '1615632', '117260', '1231963', '600546',
       '1448680', '511006', '1425809', '422582'], dtype=object)

Create TensorFlow datasets from the pandas dataframes. 

In [9]:
ds_train = tf.data.Dataset.from_tensor_slices(dict(df_train))
ds_validation = tf.data.Dataset.from_tensor_slices(dict(df_validation))
ds_test = tf.data.Dataset.from_tensor_slices(dict(df_test))

for x in ds_train.take(1).as_numpy_iterator():
  pprint.pprint(x)

{'ItemId': b'451942', 'SessionId': b'363804', 'Time': 1433134557529}


In this example, we're going to focus on the sequential events only.

We keep only the `SessionId` and `ItemId` fields in the dataset.

In [10]:
train_events = ds_train.map(lambda x: {'session_id': x['SessionId'], 'item_id': x['ItemId']})
validation_events = ds_validation.map(lambda x: {'session_id': x['SessionId'], 'item_id': x['ItemId']})
test_events = ds_test.map(lambda x: {'session_id': x['SessionId'], 'item_id': x['ItemId']})

for x in train_events.take(1).as_numpy_iterator():
    pprint.pprint(x)

{'item_id': b'451942', 'session_id': b'363804'}


## 3. Implementing a model
Choosing the architecture of our model is a key part of modelling.

Because we are building a two-tower retrieval model, we can build each tower separately and then combine them in the final model.

### 3.1 The query tower
Let's start with the query tower.

The first step is to decide on the dimensionality of the query and candidate representations:

In [11]:
embedding_dimension = 32

Higher values will correspond to models that may be more accurate, but will also be slower to fit and more prone to overfitting.
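One way to see the cost side of this trade-off is to count the parameters in an embedding table, which grow linearly with the embedding dimension. The sketch below uses a hypothetical vocabulary size of 100,000 (not the size of this dataset) and one extra row for unknown tokens.

```python
def embedding_table_parameters(vocabulary_size, embedding_dimension, oov_slots=1):
    """Number of trainable weights in a (vocabulary + OOV) x dimension table."""
    return (vocabulary_size + oov_slots) * embedding_dimension


# Hypothetical vocabulary size, chosen only to make the numbers easy to read.
hypothetical_vocabulary_size = 100_000

for dim in (16, 32, 64):
    print(dim, embedding_table_parameters(hypothetical_vocabulary_size, dim))
```

Doubling the dimension doubles the number of weights to fit in each tower.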

The second is to define the model itself. Here, we're going to use Keras preprocessing layers to first convert session ids to integers, and then convert those to session embeddings via an `Embedding` layer. Note that we use the list of unique session ids we computed earlier as a vocabulary:

In [12]:
user_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_session_ids, mask_token=None),
  # We add an additional embedding to account for unknown tokens.
  tf.keras.layers.Embedding(len(unique_session_ids) + 1, embedding_dimension)
])

A simple model like this corresponds exactly to a classic [matrix factorization](https://ieeexplore.ieee.org/abstract/document/4781121) approach. While defining a subclass of `tf.keras.Model` for this simple model might be overkill, we can easily extend it to an arbitrarily complex model using standard Keras components, as long as we return an `embedding_dimension`-wide output at the end.

### 3.2 The candidate tower

We can do the same with the candidate tower.

In [13]:
item_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_items, mask_token=None),
  tf.keras.layers.Embedding(len(unique_items) + 1, embedding_dimension)
])

### 3.3 Metrics

In our training data we have positive (session, item) pairs. To figure out how good our model is, we need to compare the affinity score that the model calculates for this pair to the scores of all the other possible candidates: if the score for the positive pair is higher than for all other candidates, our model is highly accurate.
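The comparison described above can be sketched without any library. In the hypothetical helpers below, each example is a list of scores over all candidates plus the position of the positive item; an example counts as a hit when fewer than `k` candidates score strictly higher than the positive. Tie handling is a simplifying assumption of this sketch.

```python
def in_top_k(scores, positive_index, k):
    """True if the positive candidate's score is among the k highest."""
    positive_score = scores[positive_index]
    # Count candidates that score strictly higher than the positive one.
    higher = sum(1 for s in scores if s > positive_score)
    return higher < k


def top_k_accuracy(examples, k):
    """Fraction of examples whose positive candidate lands in the top k."""
    hits = sum(in_top_k(scores, pos, k) for scores, pos in examples)
    return hits / len(examples)


# Made-up scores over four candidates; the second element marks the positive.
examples = [
    ([0.9, 0.1, 0.3, 0.2], 0),  # positive ranked 1st
    ([0.2, 0.8, 0.5, 0.1], 2),  # positive ranked 2nd
    ([0.7, 0.6, 0.5, 0.1], 3),  # positive ranked 4th
    ([0.4, 0.3, 0.9, 0.8], 0),  # positive ranked 3rd
]

print(top_k_accuracy(examples, 1), top_k_accuracy(examples, 2))
```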

To do this, we can use the `tfrs.metrics.FactorizedTopK` metric. The metric has one required argument: the dataset of candidates that are used as implicit negatives for evaluation.

In our case, that's the `items` dataset, converted into embeddings via our item model:

In [14]:
# Peek at the first candidate id using the built-in next().
next(items.take(1).as_numpy_iterator())

b'451942'

In [15]:
metrics = tfrs.metrics.FactorizedTopK(
  candidates=items.batch(128).map(item_model)
)

### 3.4 Loss

The next component is the loss used to train our model. TFRS has several loss layers and tasks to make this easy.

In this instance, we'll make use of the `Retrieval` task object: a convenience wrapper that bundles together the loss function and metric computation:

In [16]:
task = tfrs.tasks.Retrieval(
  metrics=metrics
)

The task itself is a Keras layer that takes the query and candidate embeddings as arguments, and returns the computed loss: we'll use that to implement the model's training loop.
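As far as we understand it, the default loss of the `Retrieval` task is a softmax cross-entropy in which the other candidates in the same batch act as negatives, summed over the batch. The plain-Python sketch below illustrates that idea under this assumption; it is not the library's implementation and it omits the task's optional refinements. The function names are hypothetical.

```python
import math


def dot(a, b):
    """Dot product of two equally sized vectors."""
    return sum(x * y for x, y in zip(a, b))


def in_batch_softmax_loss(query_embeddings, candidate_embeddings):
    """Sum of cross-entropy losses, where row i's positive is candidate i."""
    total = 0.0
    for i, query in enumerate(query_embeddings):
        # Score this query against every candidate in the batch.
        logits = [dot(query, cand) for cand in candidate_embeddings]
        # Numerically stable log-sum-exp over the batch.
        m = max(logits)
        log_norm = m + math.log(sum(math.exp(l - m) for l in logits))
        total += log_norm - logits[i]
    return total


queries = [[1.0, 0.0], [0.0, 1.0]]
aligned = [[5.0, 0.0], [0.0, 5.0]]     # each positive matches its query
misaligned = [[0.0, 5.0], [5.0, 0.0]]  # each positive matches the other query

print(in_batch_softmax_loss(queries, aligned))
print(in_batch_softmax_loss(queries, misaligned))
```

The loss is small when each query scores its own positive above the other candidates in the batch, and large otherwise. Because the per-example losses are summed rather than averaged in this sketch, the value grows with the batch size.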

### 3.5 The full model

We can now put it all together into a model. TFRS exposes a base model class (`tfrs.models.Model`) which streamlines building models: all we need to do is to set up the components in the `__init__` method, and implement the `compute_loss` method, taking in the raw features and returning a loss value.

The base model will then take care of creating the appropriate training loop to fit our model.

In [17]:
class RetailRocketModel(tfrs.Model):

  def __init__(self, user_model, item_model):
    super().__init__()
    self.item_model: tf.keras.Model = item_model
    self.user_model: tf.keras.Model = user_model
    # `task` is the Retrieval task defined at module level above; reading a
    # module-level name needs no `global` declaration.
    self.task: tf.keras.layers.Layer = task

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    # We pick out the session features and pass them into the user model.
    user_embeddings = self.user_model(features["session_id"])
    # And pick out the item features and pass them into the item model,
    # getting embeddings back.
    positive_item_embeddings = self.item_model(features["item_id"])

    # The task computes the loss and the metrics.
    return self.task(user_embeddings, positive_item_embeddings)

The `tfrs.Model` base class is simply a convenience class: it allows us to compute both training and test losses using the same method.

Under the hood, it's still a plain Keras model. You could achieve the same functionality by inheriting from `tf.keras.Model` and overriding the `train_step` and `test_step` functions (see [the guide](https://www.tensorflow.org/guide/keras/customizing_what_happens_in_fit) for details).

In these tutorials, however, we stick to using the `tfrs.Model` base class to keep our focus on modelling and abstract away some of the boilerplate.

## 4. Fitting and evaluating

After defining the model, we can use standard Keras fitting and evaluation routines to fit and evaluate the model.

Let's first instantiate the model.

In [18]:
model = RetailRocketModel(user_model, item_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

Then shuffle, batch, and cache the training and evaluation data.

In [19]:
cached_train = train_events.shuffle(100_000).batch(8192).cache()
cached_test = test_events.batch(4096).cache()

Then train the model:

In [20]:
model.fit(cached_train, epochs=3)

Epoch 1/3




Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x7f5ed0b4bf40>

If you want to monitor the training process with TensorBoard, you can add a TensorBoard callback to the `fit()` call and then start TensorBoard using `%tensorboard --logdir logs/fit`. Please refer to the [TensorBoard documentation](https://www.tensorflow.org/tensorboard/get_started) for more details.

As the model trains, the loss is falling and a set of top-k retrieval metrics is updated. These tell us whether the true positive is in the top-k retrieved items from the entire candidate set. For example, a top-5 categorical accuracy metric of 0.2 would tell us that, on average, the true positive is in the top 5 retrieved items 20% of the time.

Note that, in this example, we evaluate the metrics during training as well as evaluation. Because this can be quite slow with large candidate sets, it may be prudent to turn metric calculation off in training, and only run it in evaluation.
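One way to do this, sketched below, is to pass `compute_metrics=not training` when calling the task, a keyword that the `tfrs.tasks.Retrieval` layer accepts to our knowledge. The sketch is written as a standalone function with a hypothetical name so that it is self-contained; in the notebook the same body would go in the model's `compute_loss` method, with `self` in place of `model`.

```python
def compute_loss_skipping_training_metrics(model, features, training=False):
    """Variant of compute_loss that skips the top-k metrics while training."""
    user_embeddings = model.user_model(features["session_id"])
    positive_item_embeddings = model.item_model(features["item_id"])
    # compute_metrics=False asks the task to skip the costly metric update;
    # metrics are still computed when training is False (evaluation).
    return model.task(
        user_embeddings,
        positive_item_embeddings,
        compute_metrics=not training,
    )
```

With this change, the top-k metrics reported during `fit` would stay at their initial values, and only `evaluate` would produce meaningful numbers.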

Finally, we can evaluate our model on the test set:

In [21]:
model.evaluate(cached_test, return_dict=True)



{'factorized_top_k/top_1_categorical_accuracy': 0.00013723068695981055,
 'factorized_top_k/top_5_categorical_accuracy': 0.0018183066276833415,
 'factorized_top_k/top_10_categorical_accuracy': 0.003327844198793173,
 'factorized_top_k/top_50_categorical_accuracy': 0.011047069914638996,
 'factorized_top_k/top_100_categorical_accuracy': 0.018251681700348854,
 'loss': 2933.805908203125,
 'regularization_loss': 0,
 'total_loss': 2933.805908203125}

## 5. Making predictions

Now that we have a model, we would like to be able to make predictions. We can use the `tfrs.layers.factorized_top_k.BruteForce` layer to do this.

In [22]:
# Create an index that takes in raw query features and
# recommends items out of the entire item catalogue.
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
index.index_from_dataset(
  tf.data.Dataset.zip((items.batch(100), items.batch(100).map(model.item_model)))
)

# Get recommendations for the session id "42".
_, item_ids = index(tf.constant(["42"]))
print(f"Recommendations for user 42: {item_ids[0, :3]}")

Recommendations for user 42: [b'320494' b'193828' b'422425']
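Conceptually, brute-force retrieval scores the query against every candidate and keeps the best `k`. The plain-Python sketch below illustrates that idea with made-up two-dimensional embeddings; `brute_force_top_k` is a hypothetical helper, not the TFRS layer, which performs the same kind of exhaustive search on tensors.

```python
import heapq


def brute_force_top_k(query_embedding, candidate_ids, candidate_embeddings, k):
    """Score every candidate with a dot product and return the k best."""
    scored = (
        (sum(q * c for q, c in zip(query_embedding, emb)), cid)
        for cid, emb in zip(candidate_ids, candidate_embeddings)
    )
    top = heapq.nlargest(k, scored)
    return [cid for _, cid in top], [score for score, _ in top]


# Made-up candidates and query, for illustration only.
candidate_ids = ["a", "b", "c", "d"]
candidate_embeddings = [[1.0, 0.0], [0.0, 1.0], [0.5, 0.5], [-1.0, 0.0]]

ids, scores = brute_force_top_k([2.0, 1.0], candidate_ids, candidate_embeddings, k=2)
print(ids, scores)
```

The cost of each query grows linearly with the number of candidates, which is why approximate nearest-neighbour indexes are often preferred for very large catalogues.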
