References: https://www.tensorflow.org/recommenders/examples/basic_retrieval

# **RecSys Model 2: Retrieval**

Real-world recommender systems are often composed of two stages:

1.   **The retrieval stage** is responsible for selecting an initial set of hundreds of candidates from all possible candidates. The main objective of this model is to efficiently weed out all candidates that the user is not interested in. Because the retrieval model may be dealing with millions of candidates, it has to be computationally efficient.

2.   **The ranking stage** takes the outputs of the retrieval model and fine-tunes them to select the best possible handful of recommendations. Its task is to narrow down the set of items the user may be interested in to a shortlist of likely candidates.

In this notebook, we're going to build the first stage, retrieval.

Retrieval models are often composed of two sub-models:

1.   **A query model** computing the query representation (normally a fixed-dimensionality embedding vector) using query features.

2.   **A candidate model** computing the candidate representation (an equally-sized vector) using the candidate features.

The outputs of the two models are then multiplied together to give a query-candidate affinity score, with higher scores expressing a better match between the candidate and the query.
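
To make the affinity score concrete, here is a minimal sketch that treats "multiplied together" as a dot product between the query vector and each candidate vector. The embedding values and the candidate names are hypothetical; in the real model the towers learn them.

```python
# Hypothetical 4-dimensional embeddings; real towers learn these values.
query_embedding = [0.5, -1.0, 0.25, 2.0]
candidate_embeddings = {
    "candidate_a": [0.5, -1.0, 0.0, 1.0],
    "candidate_b": [-0.5, 1.0, 0.25, -2.0],
}

def affinity(query, candidate):
    """Dot product of two equally sized vectors."""
    return sum(q * c for q, c in zip(query, candidate))

# Score every candidate against the query and pick the best match.
scores = {name: affinity(query_embedding, emb)
          for name, emb in candidate_embeddings.items()}
best = max(scores, key=scores.get)
print(scores, best)
```

A candidate whose vector points in a similar direction to the query vector gets a higher score, which is why `candidate_a` wins here.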

# Imports

In [1]:
# Temporary solution for a bug in the implementation of the tfrs.layers.factorized_top_k module.
# https://github.com/tensorflow/recommenders/issues/712#issuecomment-2041163592

!pip uninstall tensorflow -y
!pip uninstall tensorflow-recommenders -y
#!pip uninstall tensorflow-datasets -y


import os
os.environ['TF_USE_LEGACY_KERAS'] = '1'

Found existing installation: tensorflow 2.17.1
Uninstalling tensorflow-2.17.1:
  Successfully uninstalled tensorflow-2.17.1

In [2]:
!pip install -q tensorflow==2.17
!pip install -q tensorflow-recommenders==0.7.3

#!pip install -q --upgrade tensorflow-datasets
!pip install -q scann

In [3]:
import pprint
import tempfile

from typing import Dict, Text

import numpy as np
import tensorflow as tf
#import tensorflow_datasets as tfds

import json
import pandas as pd
from google.colab import drive

In [4]:
import tensorflow_recommenders as tfrs

In [5]:
print(tf.__version__)

2.17.0


In [6]:
print(tfrs.__version__)

v0.7.3


# Importing and preprocessing the dataset

In [7]:
drive.mount('/content/drive')

Mounted at /content/drive


In [8]:
JSON_FILE = '/content/drive/My Drive/yelp_academic_dataset_review.json'

In [9]:
# Define the number of lines to read
#n_lines = 1000000

# Read the specified number of lines into a list of dictionaries
#with open(JSON_FILE, "r") as file:
#    data = [json.loads(next(file)) for _ in range(n_lines)]

# Convert the list of dictionaries into a DataFrame
#df = pd.DataFrame(data)

In [10]:
# Read the JSON lines file directly into a pandas DataFrame
#df = pd.read_json(JSON_FILE, lines=True)

In [11]:
def get_data(filename):
  # Initialize an empty list to store selected attributes
  filtered_data = []

  # Open and process JSON file line by line
  with open(filename, 'r') as file:
      for line in file:
          record = json.loads(line)
          # Extract only specific attributes
          filtered_data.append({'user_id': record['user_id'], 'business_id': record['business_id'], 'stars': record['stars']})

  # Create a DataFrame
  return pd.DataFrame(filtered_data)

In [12]:
df = get_data(JSON_FILE)

In [13]:
# Display the first few rows
print(df.head())

                  user_id             business_id  stars
0  mh_-eMZ6K5RLWhZyISBhwA  XQfwVwDr-v0ZS3_CbbE5Xw    3.0
1  OyoGAe7OKpv6SyGZT5g77Q  7ATYjTIgM3jUlt4UM3IypQ    5.0
2  8g_iMtfSiwikVnbP2etR0A  YjUWPpI6HXG530lwP-fb2A    3.0
3  _7bHUi9Uuf5__HHc_Q8guQ  kxX2SOes4o-D3ZQBkiMRfA    5.0
4  bcjbaE6dDog4jkNY91ncLQ  e4Vwtrqf-wpJfwesgvdgxQ    4.0


In [14]:
print(len(df)) # total number of entries

6990280


In [15]:
def get_employee_ids_with_null_categories():
  JSON_FILE = '/content/drive/My Drive/yelp_academic_dataset_business.json'

  df = pd.read_json(JSON_FILE, lines=True)

  # Extract business_ids where categories is null (NaN)
  business_ids_with_null_categories = df.loc[df['categories'].isna(), 'business_id'].to_numpy()

  return business_ids_with_null_categories

In [16]:
employee_ids_with_null_categories = get_employee_ids_with_null_categories()

# Remove rows where business_id matches any value in employee_ids_with_null_categories
df = df[~df['business_id'].isin(employee_ids_with_null_categories)]

In [17]:
print(len(df))  # number of entries after removing employees who have null 'categories'

6989591


In [18]:
# Rename columns
df = df.rename(columns={'user_id': 'customer_id', 'business_id': 'employee_id'})

# Display the result
print(df.head())

              customer_id             employee_id  stars
0  mh_-eMZ6K5RLWhZyISBhwA  XQfwVwDr-v0ZS3_CbbE5Xw    3.0
1  OyoGAe7OKpv6SyGZT5g77Q  7ATYjTIgM3jUlt4UM3IypQ    5.0
2  8g_iMtfSiwikVnbP2etR0A  YjUWPpI6HXG530lwP-fb2A    3.0
3  _7bHUi9Uuf5__HHc_Q8guQ  kxX2SOes4o-D3ZQBkiMRfA    5.0
4  bcjbaE6dDog4jkNY91ncLQ  e4Vwtrqf-wpJfwesgvdgxQ    4.0


In [19]:
# Create TensorFlow Dataset using tf.data
tf_dataset = tf.data.Dataset.from_tensor_slices((
    {'customer_id': df['customer_id'].astype(str).values,      # Ensure conversion to strings
    'employee_id': df['employee_id'].astype(str).values,   # Ensure conversion to strings
    'stars': df['stars'].astype(float).values}  # Ensure conversion to floats
))

In [20]:
# Displaying a sample from the TensorFlow Dataset using pprint
for x in tf_dataset.take(1).as_numpy_iterator():
    pprint.pprint(x)

{'customer_id': b'mh_-eMZ6K5RLWhZyISBhwA',
 'employee_id': b'XQfwVwDr-v0ZS3_CbbE5Xw',
 'stars': 3.0}


Let's figure out **unique employee ids** and **customer ids** present in the data.

This is important because we **need to be able to map the raw values of our categorical features to embedding vectors** in our models. To do that, we **need a vocabulary that maps a raw feature value to an integer in a contiguous range**: *this allows us to look up the corresponding embeddings in our embedding tables*.
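
As a rough illustration of what such a vocabulary does, the sketch below maps raw ids to contiguous integers in plain Python. The ids are hypothetical, and reserving index 0 for unknown values is an assumption chosen to mirror how a `StringLookup` layer with `mask_token=None` and a single out-of-vocabulary slot behaves.

```python
# Hypothetical raw ids; the notebook builds its vocabulary from the Yelp data.
raw_ids = ["user_c", "user_a", "user_b", "user_a"]

# Sort the unique ids so that the mapping is deterministic.
vocabulary = sorted(set(raw_ids))

# Reserve index 0 for out-of-vocabulary (OOV) values, so known ids map to 1..N.
OOV_INDEX = 0
id_to_index = {raw_id: i + 1 for i, raw_id in enumerate(vocabulary)}

def lookup(raw_id):
    """Return the contiguous integer index for a raw id, or the OOV index."""
    return id_to_index.get(raw_id, OOV_INDEX)

print([lookup(x) for x in ["user_a", "user_b", "user_c", "user_unseen"]])
```

Because of the reserved slot, the embedding table needs one more row than there are known ids.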

In [21]:
# Extracting & processing data to build vocabularies (for query and candidate towers)

customers = tf_dataset.map(lambda x: x["customer_id"])
employees = tf_dataset.map(lambda x: x["employee_id"])

customer_ids = customers.batch(1_000)
employee_ids = employees.batch(1_000)

unique_customer_ids = np.unique(np.concatenate(list(customer_ids))) # vocabulary for the query tower
unique_employee_ids = np.unique(np.concatenate(list(employee_ids))) # vocabulary for the candidate tower

In [22]:
unique_customer_ids[:10]

array([b'---1lKK3aKOuomHnwAkAow', b'---2PmXbF47D870stH1jqA',
       b'---UgP94gokyCDuB5zUssA', b'---fa6ZK37T9NjkGKI4oSg',
       b'---r61b7EpVPkb4UVme5tA', b'---zemaUC8WeJeWKqS6p9Q',
       b'--034gGozmK4y5txuPsdAA', b'--0DrQkM0FT-yCQRWw82uQ',
       b'--0FNOzZkEQlz8WzS3WttQ', b'--0Jj_J_MmUJ51f1Y394Uw'], dtype=object)

In [23]:
print(len(unique_customer_ids))

1987685


In [24]:
unique_employee_ids[:10]

array([b'---kPU91CF4Lq2-WlRu9Lw', b'--0iUa4sNDFiZFrAdIWhZQ',
       b'--30_8IhuyMHbSOcNWd6DQ', b'--7PUidqRWpRSpXebiyxTg',
       b'--7jw19RH9JKXgFohspgQw', b'--8IbOsAAxjKRoYsBFL-PA',
       b'--9osgUCSDUWUkoTLdvYhQ', b'--ARBQr1WMsTWiwOKOj-FQ',
       b'--FWWsIwxRwuw9vIMImcQg', b'--FcbSxK1AoEtEAxOgBaCw'], dtype=object)

In [25]:
print(len(unique_employee_ids))

150243


In [26]:
# Data to train/test the model
tf_dataset = tf_dataset.map(lambda x: {
    "customer_id": x["customer_id"],
    "employee_id": x["employee_id"]
})

In [27]:
# Split data into a training and evaluation set

tf.random.set_seed(42)
shuffled = tf_dataset.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

'''FOLLOWING IS NOT APPLICABLE FOR THIS MODEL 2'''
# Since this model only creates a retrieval index, it is suitable to use the test dataset for training as well, so that those examples are indexed too.
# Because no unseen data/queries are given as input to this model under any circumstance, the model doesn't need to generalise to unseen data.
# Therefore, the following code snippets to create train and test splits are omitted during execution.

trainset_size = round(len(shuffled) * 0.8)
testset_size = round(len(shuffled) * 0.2)

train = shuffled.take(trainset_size)
test = shuffled.skip(trainset_size).take(testset_size)
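
The take/skip pattern above can be sketched in plain Python as follows. The record list is a hypothetical stand-in for the interactions, and `random.Random` replaces the buffered `tf.data` shuffle, so this shows the idea rather than the exact ordering TensorFlow would produce.

```python
import random

# Hypothetical stand-in for the interaction records.
records = list(range(10))

# Shuffle once with a fixed seed so the split is reproducible.
rng = random.Random(42)
shuffled_records = records[:]
rng.shuffle(shuffled_records)

train_size = round(len(shuffled_records) * 0.8)

# "take" the first 80% for training and "skip" past them for the test set.
train_records = shuffled_records[:train_size]
test_records = shuffled_records[train_size:]

print(len(train_records), len(test_records))
```

Shuffling exactly once matters: if the order changed between the two slices, the same record could end up in both sets, which is what `reshuffle_each_iteration=False` guards against in the notebook code.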

# Implementing a model
Because this is a two-tower retrieval model, we can build each tower separately and then combine them in the final model.

In [28]:
# The dimensionality of the query and candidate representations
embedding_dimension = 32

## The query tower
A query model computing the query representation (normally a fixed-dimensionality embedding vector) using query features.


We use Keras preprocessing layers to first convert customer ids to integers, and then convert those to customer id embeddings via an `Embedding` layer. Note that we use the list of unique customer ids we computed earlier as a vocabulary.

In [29]:
customer_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_customer_ids, mask_token=None),
  # We add an additional embedding to account for unknown tokens (to handle unseen or out-of-vocabulary (OOV) data.)
  tf.keras.layers.Embedding(len(unique_customer_ids) + 1, embedding_dimension)
])

## The candidate tower
A candidate model computing the candidate representation (an equally-sized vector) using the candidate features.

We use Keras preprocessing layers to first convert employee ids to integers, and then convert those to employee id embeddings via an `Embedding` layer. Note that we use the list of unique employee ids we computed earlier as a vocabulary.

In [30]:
employee_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_employee_ids, mask_token=None),
  # We add an additional embedding to account for unknown tokens (to handle unseen or out-of-vocabulary (OOV) data.)
  tf.keras.layers.Embedding(len(unique_employee_ids) + 1, embedding_dimension)
])

## Metrics

In our training data we have positive (customer, employee) pairs. To figure out how good our model is, we need to compare the affinity score that the model calculates for this pair to the scores of all the other possible candidates: if the score for the positive pair is higher than for all other candidates, our model is highly accurate.

To do this, we can use the `tfrs.metrics.FactorizedTopK` metric. The metric has one required argument: the dataset of candidates that are used as implicit negatives for evaluation.

In our case, that's the employee ids dataset, converted into embeddings via our employee model:

In [31]:
metrics = tfrs.metrics.FactorizedTopK(
  candidates=employees.batch(128).map(employee_model)
)
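
To illustrate what a top-k accuracy of this kind measures, here is a minimal plain-Python sketch. The score matrix and the true indices are hypothetical, and the helper `top_k_accuracy` is written for illustration only; it is not the TFRS implementation.

```python
def top_k_accuracy(score_rows, true_indices, k):
    """Fraction of queries whose true candidate is among the k highest scores."""
    hits = 0
    for scores, true_index in zip(score_rows, true_indices):
        # Indices of the k highest-scoring candidates for this query.
        top_k = sorted(range(len(scores)), key=lambda i: scores[i], reverse=True)[:k]
        hits += true_index in top_k
    return hits / len(score_rows)

# Hypothetical scores: one row per query, one column per candidate.
score_rows = [
    [0.9, 0.1, 0.3, 0.2],  # true candidate 0 is ranked first
    [0.2, 0.8, 0.5, 0.1],  # true candidate 2 is ranked second
    [0.7, 0.6, 0.5, 0.1],  # true candidate 3 is ranked last
]
true_indices = [0, 2, 3]

print(top_k_accuracy(score_rows, true_indices, k=1))
print(top_k_accuracy(score_rows, true_indices, k=2))
```

Only the first query counts as a hit at k=1, while the second one also becomes a hit at k=2, so accuracy can only stay the same or grow as k increases.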

## Loss

The next component is the loss used to train our model. TFRS has several loss layers and tasks to make this easy.

In this instance, we'll make use of the Retrieval task object: a convenience wrapper that bundles together the loss function and metric computation:

In [32]:
task = tfrs.tasks.Retrieval(
  metrics=metrics
)

The task itself is a Keras layer that takes the query and candidate embeddings as arguments, and returns the computed loss: we'll use that to implement the model's training loop.
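
The sketch below shows one way such a loss can be computed, assuming the task's default is a softmax cross-entropy over in-batch scores in which every other candidate in the batch acts as a negative and the per-example losses are summed. That assumption reflects our understanding of the default behaviour; the function name and the tiny embeddings are hypothetical.

```python
import math

def in_batch_softmax_loss(query_embeddings, candidate_embeddings):
    """Sum over the batch of -log softmax(score of the positive pair)."""
    total = 0.0
    for i, query in enumerate(query_embeddings):
        # Score this query against every candidate in the batch.
        scores = [sum(q * c for q, c in zip(query, cand))
                  for cand in candidate_embeddings]
        # Numerically stable log-sum-exp over all in-batch candidates.
        max_score = max(scores)
        log_sum_exp = max_score + math.log(sum(math.exp(s - max_score) for s in scores))
        # The positive candidate for query i sits at position i.
        total += log_sum_exp - scores[i]
    return total

# Hypothetical batch of two queries with 2-dimensional embeddings.
aligned = [[1.0, 0.0], [0.0, 1.0]]
loss_good = in_batch_softmax_loss(aligned, aligned)        # positives score highest
loss_bad = in_batch_softmax_loss(aligned, aligned[::-1])   # positives score lowest
print(loss_good, loss_bad)
```

Summing rather than averaging over a large batch would also be consistent with the large absolute loss values seen later in this notebook.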

## The full model

We can now put it all together into a model. TFRS exposes a base model class (`tfrs.models.Model`) which streamlines building models: all we need to do is set up the components in the `__init__` method and implement the `compute_loss` method, taking in the raw features and returning a loss value.

The base model will then take care of creating the appropriate training loop to fit our model.

In [45]:
class YelpModel(tfrs.Model):

  def __init__(self, customer_model, employee_model):
    super().__init__()
    self.customer_model: tf.keras.Model = customer_model
    self.employee_model: tf.keras.Model = employee_model
    self.task: tf.keras.layers.Layer = task

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    # We pick out the customer features and pass them into the customer model.
    customer_embeddings = self.customer_model(features["customer_id"])
    # And pick out the employee features and pass them into the employee model,
    # getting embeddings back.
    positive_employee_embeddings = self.employee_model(features["employee_id"])

    #if training:
      # The task computes the loss and not the metrics during training to speed up the process.
    #  return self.task(customer_embeddings, positive_employee_embeddings, compute_metrics=False)


    # Metric computation is switched off (compute_metrics=False), so the task returns only the loss.
    return self.task(customer_embeddings, positive_employee_embeddings, compute_metrics=False)

The `tfrs.Model` base class is simply a convenience class: it allows us to compute both training and test losses using the same method.

# Fitting and evaluating

After defining the model, we can use standard Keras fitting and evaluation routines to fit and evaluate the model.

Let's first instantiate the model.

In [46]:
model = YelpModel(customer_model, employee_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

Then shuffle, batch, and cache the training and evaluation data:

In [47]:
'''FOLLOWING IS NOT APPLICABLE FOR THIS MODEL 2'''
# Since this model only creates a retrieval index, it is suitable to use the test dataset for training as well, so that those examples are indexed too.
# Because no unseen data/queries are given as input to this model under any circumstance, the model doesn't need to generalise to unseen data.
# Therefore, the following code snippets to create train and test splits are omitted during execution.

cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()

#cached_train = shuffled.shuffle(100_000).batch(8192).cache()

Then train the model:

In [48]:
model.fit(cached_train, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<tf_keras.src.callbacks.History at 0x7df3137f3370>

As the model trains, the loss falls. When metric computation is enabled, a set of top-k retrieval metrics is updated as well. These tell us whether the true positive is in the top-k retrieved items from the entire candidate set. For example, a top-5 categorical accuracy metric of 0.2 would tell us that, on average, the true positive is in the top 5 retrieved items 20% of the time.

Note that `compute_loss` in this notebook passes `compute_metrics=False` to the task during both training and evaluation, because computing the metrics can be quite slow with large candidate sets. As a consequence, the `factorized_top_k` metrics are never updated and are reported as 0.0 in the evaluation output. To measure retrieval accuracy, turn metric calculation off in training only and run it in evaluation.

Finally, we can evaluate our model on the test set:

In [49]:
'''FOLLOWING IS NOT APPLICABLE FOR THIS MODEL 2'''
# Since this model only creates a retrieval index, it is suitable to use the test dataset for training as well, so that those examples are indexed too.
# Because no unseen data/queries are given as input to this model under any circumstance, the model doesn't need to generalise to unseen data.
# Therefore, the following code snippet to test the model is omitted during execution.


model.evaluate(cached_test, return_dict=True)



{'factorized_top_k/top_1_categorical_accuracy': 0.0,
 'factorized_top_k/top_5_categorical_accuracy': 0.0,
 'factorized_top_k/top_10_categorical_accuracy': 0.0,
 'factorized_top_k/top_50_categorical_accuracy': 0.0,
 'factorized_top_k/top_100_categorical_accuracy': 0.0,
 'loss': 8363.802734375,
 'regularization_loss': 0,
 'total_loss': 8363.802734375}

## Making predictions

Now that we have a model, we would like to be able to make predictions. We can use the `tfrs.layers.factorized_top_k.BruteForce` layer to do this.

In [50]:
unique_employee_ids = tf.constant(unique_employee_ids)  # Convert to Tensor to make the data (numpy array) ready for subsequent TensorFlow operations
unique_employee_ids = tf.data.Dataset.from_tensor_slices(unique_employee_ids)  # Convert the tensor into a Dataset

In [51]:
# Create a model that takes in raw query features, and
index = tfrs.layers.factorized_top_k.BruteForce(model.customer_model, k=1000)
# recommends employees out of the entire unique employee dataset.
index.index_from_dataset(
  tf.data.Dataset.zip((unique_employee_ids.batch(1000), unique_employee_ids.batch(1000).map(model.employee_model)))
)

# Get recommendations.
_, employee_ids = index(tf.constant(["Ha3iJu77CxlrFm-vQRs_8g"]), k=20)
print(f"Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': {employee_ids[0, :5]}")

Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': [b'YbnJYHNp_fHbI-hcFg48vQ' b'DD3TxygdxBxKh9gbjCuLDA'
 b'1bJxvwuMTyXmQGu90WLPhA' b'W0vdz23JQtVQX5vJkiCj3g'
 b'lTCoYu00AUV0SHxOa-XXBw']


In [52]:
print(len(employee_ids[0]))

20
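
Conceptually, brute-force retrieval scores every candidate against the query and keeps the k best. The following is a minimal plain-Python sketch of that idea; the function `brute_force_top_k` and the embeddings are hypothetical and do not reproduce the TFRS layer.

```python
import heapq

def brute_force_top_k(query, candidates, k):
    """Score every candidate against the query and return the k best."""
    scored = ((sum(q * c for q, c in zip(query, emb)), cand_id)
              for cand_id, emb in candidates.items())
    # nlargest keeps only the k highest (score, id) pairs, best first.
    return heapq.nlargest(k, scored)

# Hypothetical 2-dimensional candidate embeddings.
candidates = {
    "emp_1": [1.0, 0.0],
    "emp_2": [0.5, 0.5],
    "emp_3": [0.0, 1.0],
    "emp_4": [-1.0, 0.0],
}
top = brute_force_top_k([2.0, 1.0], candidates, k=2)
print(top)
```

Every query touches every candidate, so the cost per query grows linearly with the number of candidates and with the embedding dimension.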


Of course, the `BruteForce` layer is going to be too slow to serve a model with many possible candidates. The following section shows how to speed this up by using an approximate retrieval index.

An approximate retrieval index speeds up predictions, which makes it possible to efficiently surface recommendations from sets of tens of millions of candidates.

To do so, we can use the `scann` package. This is an optional dependency of TFRS, and we installed it separately at the beginning of this notebook by calling `!pip install -q scann`.

Once installed we can use the TFRS `ScaNN` layer:

In [53]:
# Create a model that takes in raw query features, and
scann_index = tfrs.layers.factorized_top_k.ScaNN(model.customer_model, k=1000)
# recommends employees out of the entire unique employee dataset.
scann_index.index_from_dataset(
  tf.data.Dataset.zip((unique_employee_ids.batch(1000), unique_employee_ids.batch(1000).map(model.employee_model)))
)

<tensorflow_recommenders.layers.factorized_top_k.ScaNN at 0x7df31381ad10>

This layer will perform approximate lookups: this makes retrieval slightly less accurate, but orders of magnitude faster on large candidate sets.

In [54]:
# Get recommendations.
_, employee_ids = scann_index(tf.constant(["Ha3iJu77CxlrFm-vQRs_8g"]), k=20)
print(f"Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': {employee_ids[0, :5]}")

Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': [b'W0vdz23JQtVQX5vJkiCj3g' b'SbdL-8NSmTWgSwdGZBa7WQ'
 b'lTCoYu00AUV0SHxOa-XXBw' b'H-1qpp_77KggOAr9htUrEw'
 b'lS1dmSXpAtQqT04eRm9kiA']


In [55]:
print(len(employee_ids[0]))

20
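
One simple way to see the accuracy trade-off is to compare the ids returned by the two indexes. The sketch below does this for the two top-5 lists printed above. It only looks at the five ids that were displayed, so an id missing from one list may still appear further down in that index's 20 results.

```python
# Top-5 ids printed above for customer 'Ha3iJu77CxlrFm-vQRs_8g'.
brute_force_top5 = [
    "YbnJYHNp_fHbI-hcFg48vQ", "DD3TxygdxBxKh9gbjCuLDA",
    "1bJxvwuMTyXmQGu90WLPhA", "W0vdz23JQtVQX5vJkiCj3g",
    "lTCoYu00AUV0SHxOa-XXBw",
]
scann_top5 = [
    "W0vdz23JQtVQX5vJkiCj3g", "SbdL-8NSmTWgSwdGZBa7WQ",
    "lTCoYu00AUV0SHxOa-XXBw", "H-1qpp_77KggOAr9htUrEw",
    "lS1dmSXpAtQqT04eRm9kiA",
]

# Ids that both indexes placed in their displayed top 5.
shared = set(brute_force_top5) & set(scann_top5)
overlap = len(shared) / len(brute_force_top5)
print(sorted(shared), overlap)
```

Running the same comparison over the full result lists and over many customers would give a more reliable estimate of how much the approximation costs.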


# Model serving

After the model is trained, we need a way to deploy it.

In a two-tower retrieval model, serving has two components:


*   **a serving query model**, taking in features of the query and transforming them into a query embedding, and
*   **a serving candidate model**. This most often takes the form of an approximate nearest neighbours (ANN) index which allows fast approximate lookup of candidates in response to a query produced by the query model.


In TFRS, both components can be packaged into a single exportable model, giving us a model that takes a raw customer id and returns the ids of the top employees for that customer. This is done by exporting the model to the `SavedModel` format, which makes it possible to serve it using TensorFlow Serving.

To deploy a model like this, we simply export the `BruteForce` layer and/or `ScaNN` layer we created above:

In [56]:
# Export the query model.
with tempfile.TemporaryDirectory() as tmp:
  path = os.path.join(tmp, "model")

  # Save the index.
  tf.saved_model.save(
      scann_index,
      path,
      options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
  )

  # Load it back; can also be done in TensorFlow Serving.
  loaded = tf.saved_model.load(path)

  # Pass a customer id in, get top predicted employee ids back.
  scores, employee_ids = loaded(tf.constant(["Ha3iJu77CxlrFm-vQRs_8g"]))

  print(f"Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': {employee_ids[0][:5]}")



Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': [b'W0vdz23JQtVQX5vJkiCj3g' b'SbdL-8NSmTWgSwdGZBa7WQ'
 b'lTCoYu00AUV0SHxOa-XXBw' b'H-1qpp_77KggOAr9htUrEw'
 b'lS1dmSXpAtQqT04eRm9kiA']


In [57]:
print(len(employee_ids[0]))

1000


In [58]:
# Define the folder path for saving the model
save_dir = '/content/drive/My Drive/Colab Notebooks/Saved Models'
#save_dir = '/content/Saved Model'

# Ensure the folder exists
os.makedirs(save_dir, exist_ok=True)

# Path to save the model
model_path = os.path.join(save_dir, "recsys_model_two_retrieval")

# Save the ScaNN index
tf.saved_model.save(
    scann_index,
    model_path,
    options=tf.saved_model.SaveOptions(namespace_whitelist=["Scann"])
)

# Load the model back
loaded = tf.saved_model.load(model_path)

# Pass a customer id in, get top predicted employee ids back.
scores, employee_ids = loaded(tf.constant(["Ha3iJu77CxlrFm-vQRs_8g"]))

print(f"Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': {employee_ids[0][:5]}")



Recommendations for customer 'Ha3iJu77CxlrFm-vQRs_8g': [b'W0vdz23JQtVQX5vJkiCj3g' b'SbdL-8NSmTWgSwdGZBa7WQ'
 b'lTCoYu00AUV0SHxOa-XXBw' b'H-1qpp_77KggOAr9htUrEw'
 b'lS1dmSXpAtQqT04eRm9kiA']


In [59]:
print(len(employee_ids[0]))

1000
