In [5]:
import os
import pprint
import tempfile

from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs

# Preparing the dataset

- Use the same data as the retrieval tutorial
- Keep the ratings

In [6]:
ratings = tfds.load('movielens/100k-ratings', split = 'train')

ratings = ratings.map(lambda x: {
    'movie_title': x['movie_title'],
    'user_id': x['user_id'],
    'user_rating': x['user_rating']
})

In [8]:
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed = 42, reshuffle_each_iteration = False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

- Figure out unique user ids and movie titles present in the data
- Need to be able to map the raw values of our categorical features to embedding vectors
    - Need a vocabulary that maps a raw feature value to an integer in a contiguous range: Allow us to look up the corresponding embeddings in our embedding tables

In [9]:
movie_titles = ratings.batch(1_000_000).map(lambda x: x['movie_title'])
user_ids = ratings.batch(1_000_000).map(lambda x: x['user_id'])

unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

# Implemeting a model

- Architecture
    - Ranking models: 
        - Dot not face the same efficiency constraints as retrieval models do
        - Have a little more freedom in our choice of architectures

In [13]:
class RankingModel(tf.keras.Model):
    def __init__(self):
        super().__init__()
        embedding_dimension = 32

        # Compute embeddings for users.
        self.user_embeddings = tf.keras.Sequential([
            tf.keras.layers.StringLookup(
                vocabulary = unique_user_ids, mask_token = None
            ),
            tf.keras.layers.Embedding(
                len(unique_user_ids) + 1, embedding_dimension
            )
        ])

        # Compute embedding for movies.
        self.movie_embedding = tf.keras.Sequential([
            tf.keras.layers.StringLookup(
                vocabulary = unique_movie_titles, mask_token = None
            ),
            tf.keras.layers.Embedding(
                len(unique_movie_titles) + 1, embedding_dimension
            )
        ])

        # Compute predictions
        self.ratings = tf.keras.Sequential([
            # Learn multiple dense layers.
            tf.keras.layers.Dense(256, activation = 'relu'),
            tf.keras.layers.Dense(64, activation = 'relu'),
            # Make rating predictions in the final layer.
            tf.keras.layers.Dense(1)
        ])

    def call(self, inputs):
        user_id, movie_title = inputs

        user_embedding = self.user_embeddings(user_id)
        movie_embedding = self.movie_embedding(movie_title)

        return self.ratings(tf.concat([user_embedding, movie_embedding], axis = 1))

In [14]:
# Take user ids and movie titles
# Outputs a predicted rating
RankingModel()((['42'], ["One Flew Over the Cuckoo's Nest (1975)"]))

<tf.Tensor: shape=(1, 1), dtype=float32, numpy=array([[-0.01269129]], dtype=float32)>

# Loss and metrics

- Loss used to train our model
- TFRS has several loss layers and tasks to make this easy
- Make use of the Ranking task object:
    - A convenience wrapper that bundles together the loss function and metric computation
    - Use with MeanSquaredError Keras loss in order to predict the ratings

In [16]:
task = tfrs.tasks.Ranking(
    loss = tf.keras.losses.MeanSquaredError(),
    metrics = [tf.keras.metrics.RootMeanSquaredError()]
)

- The task itself is a Keras layer that takes true and predicted as arguments
    - Returns the computed loss

# The full model

In [18]:
class MovielensModel(tfrs.models.Model):
    def __init__(self):
        super().__init__()
        self.ranking_model: tf.keras.Model = RankingModel()
        self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
            loss = tf.keras.losses.MeanSquaredError(),
            metrics = [tf.keras.metrics.RootMeanSquaredError()]
        )
    
    def call(self, features: Dict[str, tf.Tensor]) -> tf.Tensor:
        return self.ranking_model(
            (features['user_id'], features['movie_title'])
        )
    
    def compute_loss(self, features: Dict[Text, tf.Tensor], training = False) -> tf.Tensor:
        labels = features.pop('user_rating')
        
        rating_predictions = self(features)

        # The task computes the loss and the metrics.
        return self.task(labels = labels, predictions = rating_predictions)

# Fitting and evaluating

In [20]:
# Instantitate the model
model = MovielensModel()
model.compile(optimizer = tf.keras.optimizers.Adagrad(learning_rate=0.1))

In [21]:
# Shuffle, batch and cache the training and evaluation data
cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()

In [22]:
# Train the model
model.fit(cached_train, epochs = 3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.src.callbacks.History at 0x2562b7e16f0>

In [23]:
# Evaluate model on the test set
model.evaluate(cached_test, return_dict = True)



{'root_mean_squared_error': 1.114282250404358,
 'loss': 1.2173455953598022,
 'regularization_loss': 0,
 'total_loss': 1.2173455953598022}

- Lower RMSE = Better predicting ratings

# Testing the ranking model

In [25]:
test_ratings = {}
test_movie_titles = ["M*A*S*H (1970)", "Dances with Wolves (1990)", "Speed (1994)"]
for movie_title in test_movie_titles:
    test_ratings[movie_title] = model({
        'user_id': np.array(['42']),
        'movie_title': np.array([movie_title])
    })
    print('Ratings:')
    for title, score in sorted(test_ratings.items(), key = lambda x: x[1], reverse = True):
        print(f'{title}: {score}')

Ratings:
M*A*S*H (1970): [[3.760667]]
Ratings:
M*A*S*H (1970): [[3.760667]]
Dances with Wolves (1990): [[3.7566967]]
Ratings:
M*A*S*H (1970): [[3.760667]]
Dances with Wolves (1990): [[3.7566967]]
Speed (1994): [[3.6384823]]


# Exporting for serving

In [26]:
tf.saved_model.save(model, 'export')

INFO:tensorflow:Assets written to: export\assets


INFO:tensorflow:Assets written to: export\assets


In [27]:
loaded = tf.saved_model.load('export')
loaded({
    'user_id': np.array(['42']),
    'movie_title': ["Speed (1994)"]
}).numpy()

array([[3.6384823]], dtype=float32)

# Convert the model to TensorFlow Lite

- Converted the trained ranking model to TensorFlow Lite
- Run it on-device (for better user privacy privacy and lower latency)

In [28]:
converter = tf.lite.TFLiteConverter.from_saved_model('export')
tflite_model = converter.convert()
open('converted_model.tflite', 'wb').write(tflite_model)

544544

- Can run it like regular TensorFlow Lite models

In [30]:
intepreter = tf.lite.Interpreter(model_path = 'converted_model.tflite')
intepreter.allocate_tensors()

# Get input and output tensors.
input_details = intepreter.get_input_details()
output_details = intepreter.get_output_details()

# Test the model.
if input_details[0]['name'] == 'serving_default_movie_title:0':
    intepreter.set_tensor(input_details[0]['index'], np.array(["Speed (1994)"]))
    intepreter.set_tensor(input_details[1]['index'], np.array(['42']))
else:
    intepreter.set_tensor(input_details[0]['index'], np.array(['42']))
    intepreter.set_tensor(input_details[1]['index'], np.array(["Speed (1994)"]))
intepreter.invoke()

rating = intepreter.get_tensor(output_details[0]['index'])
print(rating)

[[3.6384823]]
