In [1]:
import pandas as pd
import tensorflow as tf
import os
import pprint
import tensorflow_datasets as tfds
import numpy as np
import tempfile
from typing import Dict, Text
import tensorflow_recommenders as tfrs


2024-09-22 15:27:38.848331: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-22 15:27:38.954791: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-09-22 15:27:39.071166: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:485] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-09-22 15:27:39.164991: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:8454] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-09-22 15:27:39.192606: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1452] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
2024-09-22 15:27:39.428396: I tensorflow/core/platform/cpu_feature_gu

In [2]:
def read_ratings(ratings_csv: str, data_dir: str = "/home/antoine/Ml_Ops_Movies_Reco/src/data/data/raw") -> pd.DataFrame:
    """
    Lit le fichier CSV contenant les évaluations des films.

    :param ratings_csv: Nom du fichier CSV contenant les évaluations.
    :param data_dir: Répertoire où se trouve le fichier CSV.
    :return: DataFrame contenant les évaluations.
    """
    data = pd.read_csv(os.path.join(data_dir, ratings_csv))
    print("Dataset ratings chargé")
    return data

def read_movies(movies_csv: str, data_dir: str = "/home/antoine/Ml_Ops_Movies_Reco/src/data/data/raw") -> pd.DataFrame:
    """
    Lit le fichier CSV contenant les informations sur les films.

    :param movies_csv: Nom du fichier CSV contenant les informations sur les films.
    :param data_dir: Répertoire où se trouve le fichier CSV.
    :return: DataFrame contenant les informations sur les films.
    """
    df = pd.read_csv(os.path.join(data_dir, movies_csv))
    print("Dataset movies chargé")
    return df

In [31]:
ratings = read_ratings('ratings.csv')
movies = read_movies('movies.csv')

Dataset ratings chargé
Dataset movies chargé


In [32]:
ratings = ratings.merge(movies, on = "movieId", how = "left")
ratings = ratings[ratings["userId"] < 25000]
ratings.movieId = ratings.movieId.astype('str')
ratings.userId = ratings.userId.astype('str')
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp,title,genres
0,1,2,3.5,1112486027,Jumanji (1995),Adventure|Children|Fantasy
1,1,29,3.5,1112484676,"City of Lost Children, The (CitÃ© des enfants ...",Adventure|Drama|Fantasy|Mystery|Sci-Fi
2,1,32,3.5,1112484819,Twelve Monkeys (a.k.a. 12 Monkeys) (1995),Mystery|Sci-Fi|Thriller
3,1,47,3.5,1112484727,Seven (a.k.a. Se7en) (1995),Mystery|Thriller
4,1,50,3.5,1112484580,"Usual Suspects, The (1995)",Crime|Mystery|Thriller


In [33]:
# transformation en datset tensorflow
ratings = tf.data.Dataset.from_tensor_slices((dict(ratings[['rating', 'movieId','title', 'userId']])))

In [34]:
for x in ratings.take(1).as_numpy_iterator():
  pprint.pprint(x)

{'movieId': b'2', 'rating': 3.5, 'title': b'Jumanji (1995)', 'userId': b'1'}


In [35]:
movies.movieId = movies.movieId.astype('str')

movies = movies[['movieId', 'title']]
movies.head()

Unnamed: 0,movieId,title
0,1,Toy Story (1995)
1,2,Jumanji (1995)
2,3,Grumpier Old Men (1995)
3,4,Waiting to Exhale (1995)
4,5,Father of the Bride Part II (1995)


In [36]:
# transformation en datset tensorflow
movies = tf.data.Dataset.from_tensor_slices((dict(movies[['title', "movieId"]])))

In [37]:
for x in movies.take(1).as_numpy_iterator():
  pprint.pprint(x)

{'movieId': b'1', 'title': b'Toy Story (1995)'}


2024-09-22 17:10:55.932015: I tensorflow/core/framework/local_rendezvous.cc:404] Local rendezvous is aborting with status: OUT_OF_RANGE: End of sequence


In [38]:
# Nous ne gardons que le user_id et movie_title champs dans l'ensemble de données.
ratings = ratings.map(lambda x: {
    "movie_title": x["title"],
    "user_id": x["userId"],
})
movies = movies.map(lambda x: x["title"])

In [39]:
for x in ratings.take(1).as_numpy_iterator():
  pprint.pprint(x)

{'movie_title': b'Jumanji (1995)', 'user_id': b'1'}


In [40]:
for x in movies.take(1).as_numpy_iterator():
  pprint.pprint(x)

b'Toy Story (1995)'


In [41]:
user_ids_vocabulary = tf.keras.layers.StringLookup(mask_token=None)
user_ids_vocabulary.adapt(ratings.map(lambda x: x["user_id"]))

movie_titles_vocabulary = tf.keras.layers.StringLookup(mask_token=None)
movie_titles_vocabulary.adapt(movies)

In [None]:
class MovieLensModel(tfrs.Model):
  # We derive from a custom base class to help reduce boilerplate. Under the hood,
  # these are still plain Keras Models.

  def __init__(
      self,
      user_model: tf.keras.Model,
      movie_model: tf.keras.Model,
      task: tfrs.tasks.Retrieval):
    super().__init__()

    # Set up user and movie representations.
    self.user_model = user_model
    self.movie_model = movie_model

    # Set up a retrieval task.
    self.task = task

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    # Define how the loss is computed.

    user_embeddings = self.user_model(features["user_id"])
    movie_embeddings = self.movie_model(features["movie_title"])

    return self.task(user_embeddings, movie_embeddings)

In [None]:
# Define user and movie models.
user_model = tf.keras.Sequential([
    user_ids_vocabulary,
    tf.keras.layers.Embedding(user_ids_vocabulary.vocab_size(), 64)
])
movie_model = tf.keras.Sequential([
    movie_titles_vocabulary,
    tf.keras.layers.Embedding(movie_titles_vocabulary.vocab_size(), 64)
])

# Define your objectives.
task = tfrs.tasks.Retrieval(metrics=tfrs.metrics.FactorizedTopK(
    movies.batch(128).map(movie_model)
  )
)

In [None]:
# Create a retrieval model.
model = MovieLensModel(user_model, movie_model, task)
model.compile(optimizer=tf.keras.optimizers.Adagrad(0.5))

# Train for 3 epochs.
model.fit(ratings.batch(4096), epochs=3)

# Use brute-force search to set up retrieval using the trained representations.
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
index.index_from_dataset(
    movies.batch(100).map(lambda title: (title, model.movie_model(title))))

# Get some recommendations.
_, titles = index(np.array(["42"]))
print(f"Top 10 recommendations for user 42: {titles[0, :10]}")

Pour ajuster et évaluer le modèle, nous devons le diviser en un ensemble de formation et d'évaluation. Dans un système de recommender industriel, ce serait très probablement fait par le temps: les données jusqu'à temps 
 serait utilisé pour prédire les interactions après 
.

Dans cet exemple simple, cependant, utilisons une répartition aléatoire, mettant 80% des notes dans l'ensemble de train et 20% dans l'ensemble de test.

In [31]:
tf.random.set_seed(42)
shuffled = ratings.shuffle(100_000, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

Déterminons également les identifiants d'utilisateur uniques et les titres de films présents dans les données.

Ceci est important car nous devons être en mesure de mapper les valeurs brutes de nos caractéristiques catégorielles aux vecteurs intégrés dans nos modèles. Pour ce faire, nous avons besoin d'un vocabulaire qui mappe une valeur de caractéristique brute à un entier dans une plage contiguë : cela nous permet de rechercher les plongements correspondants dans nos tables de plongement.

In [60]:
movie_titles = movies.batch(1_000)
user_ids = ratings.batch(1_000_000).map(lambda x: x["user_id"])

unique_movie_titles = np.unique(np.concatenate(list(movie_titles)))
unique_user_ids = np.unique(np.concatenate(list(user_ids)))

unique_movie_titles[:10]

array([b'"Great Performances" Cats (1998)',
       b'#chicagoGirl: The Social Network Takes on a Dictator (2013)',
       b'$ (Dollars) (1971)', b'$5 a Day (2008)', b'$9.99 (2008)',
       b'$ellebrity (Sellebrity) (2012)', b"'71 (2014)",
       b"'Hellboy': The Seeds of Creation (2004)",
       b"'Human' Factor, The (Human Factor, The) (1975)",
       b"'Neath the Arizona Skies (1934)"], dtype=object)

Le choix de l'architecture de notre modèle est un élément clé de la modélisation.

Parce que nous construisons un modèle de récupération à deux tours, nous pouvons construire chaque tour séparément, puis les combiner dans le modèle final.

### Le tour d'interrogation

La première étape consiste à décider de la dimensionnalité de la requête et des représentations candidates :

In [61]:
embedding_dimension = 32

Des valeurs plus élevées correspondront à des modèles qui peuvent être plus précis, mais seront également plus lents à s'adapter et plus sujets au surajustement.

La seconde est de définir le modèle lui-même. Ici, nous allons utiliser des couches de pré - traitement KERAS premier utilisateur à convertir ids en entiers, puis convertir ceux incorporations utilisateur via une Embedding couche. Notez que nous utilisons la liste des identifiants d'utilisateur uniques que nous avons calculés précédemment comme vocabulaire :

In [62]:
user_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_user_ids, mask_token=None),
  # We add an additional embedding to account for unknown tokens.
  tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
])

Un modèle simple comme cela correspond exactement à un classique factorisation matrice approche. Lors de la définition d' une sous - classe de tf.keras.Model pour ce modèle simple pourrait être surpuissant, nous pouvons facilement l' étendre à un modèle arbitrairement complexe en utilisant des composants standards KERAS, aussi longtemps que nous revenons d' une embedding_dimension sortie -Wide à la fin.

### Le tour des candidats

In [63]:
movie_model = tf.keras.Sequential([
  tf.keras.layers.StringLookup(
      vocabulary=unique_movie_titles, mask_token=None),
  tf.keras.layers.Embedding(len(unique_movie_titles) + 1, embedding_dimension)
])

In [64]:
# Example input for movie_model
sample_movies = tf.constant(["Toy Story (1995)", "Jumanji (1995)"])
output = movie_model(sample_movies)
print(output.shape)  # Check the shape of the output

(2, 32)


### Metrics

Dans nos données d'entraînement, nous avons des paires positives (utilisateur, film). Pour déterminer la qualité de notre modèle, nous devons comparer le score d'affinité que le modèle calcule pour cette paire aux scores de tous les autres candidats possibles : si le score de la paire positive est plus élevé que pour tous les autres candidats, notre modèle est très précis.

Pour ce faire, nous pouvons utiliser la tfrs.metrics.FactorizedTopK métrique. La métrique a un argument obligatoire : l'ensemble de données des candidats qui sont utilisés comme négatifs implicites pour l'évaluation.

Dans notre cas, c'est le movies ensemble de données, converti en incorporations via notre modèle de film:

In [65]:
candidates = movies.batch(128).map(movie_model)

In [66]:
metrics = tfrs.metrics.FactorizedTopK(candidates=candidates)

ValueError: Cannot convert '('c', 'o', 'u', 'n', 't', 'e', 'r')' to a shape. Found invalid entry 'c' of type '<class 'str'>'. 

### Perte

Le composant suivant est la perte utilisée pour entraîner notre modèle. TFRS a plusieurs couches et tâches de perte pour rendre cela facile.

Dans ce cas, nous utilisons la Retrieval objet tâche: commodité emballage que liasses la fonction de perte et calcul métrique:

In [52]:
task = tfrs.tasks.Retrieval(
  metrics=metrics
)

NameError: name 'metrics' is not defined

### Modèle complet:

Nous pouvons maintenant mettre le tout ensemble dans un modèle. TFRS expose une classe de modèle de base ( tfrs.models.Model ) qui rationalise la construction de modèles: tout ce que nous devons faire est de mettre en place les composants dans la __init__ méthode, et mettre en œuvre la compute_loss méthode, en prenant les fonctions premières et de retourner une valeur de perte .

Le modèle de base s'occupera ensuite de créer la boucle d'entraînement appropriée pour s'adapter à notre modèle.

In [None]:
class MovielensModel(tfrs.Model):

  def __init__(self, user_model, movie_model):
    super().__init__()
    self.movie_model: tf.keras.Model = movie_model
    self.user_model: tf.keras.Model = user_model
    self.task: tf.keras.layers.Layer = task

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    # We pick out the user features and pass them into the user model.
    user_embeddings = self.user_model(features["user_id"])
    # And pick out the movie features and pass them into the movie model,
    # getting embeddings back.
    positive_movie_embeddings = self.movie_model(features["movie_title"])

    # The task computes the loss and the metrics.
    return self.task(user_embeddings, positive_movie_embeddings)

La tfrs.Model classe de base est une classe tout simplement pratique: il nous permet de calculer les pertes de formation et de test en utilisant la même méthode.

Sous le capot, c'est toujours un modèle Keras sobre. Vous pouvez obtenir la même fonctionnalité en héritant de tf.keras.Model et en remplaçant les train_step et test_step fonctions (voir le guide pour les détails):


In [None]:
class NoBaseClassMovielensModel(tf.keras.Model):

  def __init__(self, user_model, movie_model):
    super().__init__()
    self.movie_model: tf.keras.Model = movie_model
    self.user_model: tf.keras.Model = user_model
    self.task: tf.keras.layers.Layer = task

  def train_step(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:

    # Set up a gradient tape to record gradients.
    with tf.GradientTape() as tape:

      # Loss computation.
      user_embeddings = self.user_model(features["user_id"])
      positive_movie_embeddings = self.movie_model(features["movie_title"])
      loss = self.task(user_embeddings, positive_movie_embeddings)

      # Handle regularization losses as well.
      regularization_loss = sum(self.losses)

      total_loss = loss + regularization_loss

    gradients = tape.gradient(total_loss, self.trainable_variables)
    self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

    metrics = {metric.name: metric.result() for metric in self.metrics}
    metrics["loss"] = loss
    metrics["regularization_loss"] = regularization_loss
    metrics["total_loss"] = total_loss

    return metrics

  def test_step(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:

    # Loss computation.
    user_embeddings = self.user_model(features["user_id"])
    positive_movie_embeddings = self.movie_model(features["movie_title"])
    loss = self.task(user_embeddings, positive_movie_embeddings)

    # Handle regularization losses as well.
    regularization_loss = sum(self.losses)

    total_loss = loss + regularization_loss

    metrics = {metric.name: metric.result() for metric in self.metrics}
    metrics["loss"] = loss
    metrics["regularization_loss"] = regularization_loss
    metrics["total_loss"] = total_loss

    return metrics

### Ajustement et évaluation

Après avoir défini le modèle, nous pouvons utiliser les routines d'ajustement et d'évaluation Keras standard pour ajuster et évaluer le modèle.

Commençons par instancier le modèle.

In [None]:
model = MovielensModel(user_model, movie_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))

# mélangez, regroupez et cachez les données d'entraînement et d'évaluation.
cached_train = train.shuffle(100_000).batch(8192).cache()
cached_test = test.batch(4096).cache()

# entraînez le modèle :
model.fit(cached_train, epochs=3)

Au fur et à mesure que le modèle s'entraîne, la perte diminue et un ensemble de métriques de récupération top-k est mis à jour. Ceux-ci nous indiquent si le vrai positif se trouve dans le top-k des éléments récupérés de l'ensemble des candidats. Par exemple, une métrique de précision catégorielle des 5 premiers de 0,2 nous indiquerait qu'en moyenne, le vrai positif se trouve dans les 5 premiers éléments récupérés 20 % du temps.

Notez que, dans cet exemple, nous évaluons les métriques pendant la formation ainsi que l'évaluation. Étant donné que cela peut être assez lent avec de grands ensembles de candidats, il peut être prudent de désactiver le calcul des métriques lors de la formation et de ne l'exécuter que lors de l'évaluation.

Enfin, nous pouvons évaluer notre modèle sur l'ensemble de test :

In [None]:
model.evaluate(cached_test, return_dict=True)

### Faire des prédictions
Maintenant que nous avons un modèle, nous aimerions pouvoir faire des prédictions. Nous pouvons utiliser la tfrs.layers.factorized_top_k.BruteForce couche pour ce faire.


In [None]:
# Create a model that takes in raw query features, and
index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
# recommends movies out of the entire movies dataset.
index.index_from_dataset(
  tf.data.Dataset.zip((movies.batch(100), movies.batch(100).map(model.movie_model)))
)

# Get recommendations.
_, titles = index(tf.constant(["42"]))
print(f"Recommendations for user 42: {titles[0, :10]}")

### Modèle au service

Une fois le modèle formé, nous avons besoin d'un moyen de le déployer.

Dans un modèle de récupération à deux tours, la diffusion comporte deux composants :

- un modèle de requête de service, prenant en compte les caractéristiques de la requête et les transformant en une incorporation de requête, et
- un modèle de candidat au service. Cela prend le plus souvent la forme d'un index approximatif des voisins les plus proches (ANN) qui permet une recherche approximative rapide des candidats en réponse à une requête produite par le modèle de requête.
Dans TFRS, les deux composants peuvent être regroupés dans un seul modèle exportable, ce qui nous donne un modèle qui prend l'identifiant utilisateur brut et renvoie les titres des meilleurs films pour cet utilisateur. Cela se fait via l' exportation du modèle à un SavedModel format, ce qui permet de servir en utilisant tensorflow service .

Pour déployer un modèle comme celui - ci, nous avons simplement exporter la BruteForce couche que nous avons créé ci - dessus:

In [None]:
# Export the query model.
with tempfile.TemporaryDirectory() as tmp:
  path = os.path.join(tmp, "model")

  # Save the index.
  tf.saved_model.save(index, path)

  # Load it back; can also be done in TensorFlow Serving.
  loaded = tf.saved_model.load(path)

  # Pass a user id in, get top predicted movie titles back.
  scores, titles = loaded(["42"])

  print(f"Recommendations: {titles[0][:3]}")