In [1]:
from pyspark.sql import SparkSession

from typing import Dict, Text

import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
import pandas as pd

In [2]:
# Start the Spark session used to read the ratings data, or reuse the one
# that is already active in this kernel.
spark = (
    SparkSession.builder
    .appName("retrival")
    .getOrCreate()
)

In [6]:
# HDFS directory holding the dataset, and the ratings file inside it.
dataPath = "hdfs://localhost:9000/user/nhom7/book/data/"
ratingsFilePath = f"{dataPath}BX-Book-Ratings.csv"

# The CSV has a header row and ';'-separated fields; Spark infers column types.
rating_df = (
    spark.read
    .options(inferSchema="true", header="true", delimiter=';')
    .csv(ratingsFilePath)
)

In [7]:
def _select_rating_features(row):
  """Keep the three columns the model consumes, under snake_case keys."""
  return {
      "isbn": row["ISBN"],
      "user_id": row["User-ID"],
      "user_rating": row["Book-Rating"],
  }


# Collect the Spark DataFrame to pandas on the driver, slice it into a
# tf.data.Dataset of per-row dicts, then rename/select the columns.
ratings = tf.data.Dataset.from_tensor_slices(
    dict(rating_df.toPandas())
).map(_select_rating_features)

Instructions for updating:
Lambda fuctions will be no more assumed to be used in the statement where they are used, or at least in the same block. https://github.com/tensorflow/tensorflow/issues/56089


In [8]:
# Seed TensorFlow's global RNG in addition to the op-level seed below.
tf.random.set_seed(42)

# reshuffle_each_iteration=False keeps one fixed shuffled order, so the
# take/skip slices below select the same examples on every iteration.
shuffled = ratings.shuffle(
    buffer_size=100_000,
    seed=42,
    reshuffle_each_iteration=False,
)

# First 80,000 shuffled examples for training, the next 20,000 for testing.
train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

In [9]:
# Pull each id column out of the full ratings dataset in large batches.
book_isbns = ratings.batch(1_000_000).map(lambda batch: batch["isbn"])
user_ids = ratings.batch(1_000_000).map(lambda batch: batch["user_id"])

# Materialize the batches, join them into one array per column and
# de-duplicate; these arrays are the vocabularies for the lookup layers.
isbn_batches = list(book_isbns)
user_id_batches = list(user_ids)

unique_book_isbns = np.unique(np.concatenate(isbn_batches))
unique_user_ids = np.unique(np.concatenate(user_id_batches))

In [16]:
class RankingModel(tf.keras.Model):
  """Predicts a rating for a (user id, book ISBN) pair.

  User ids and ISBNs are mapped to learned embeddings; the two embeddings
  are concatenated and passed through a small stack of dense layers that
  ends in a single linear unit.

  The vocabularies are read from the notebook-level arrays
  `unique_user_ids` and `unique_book_isbns`, which must exist before the
  model is constructed.

  Args:
    embedding_dimension: Width of both the user and the book embedding
      vectors. Defaults to 32, the value previously hard-coded here.
  """

  def __init__(self, embedding_dimension: int = 32):
    super().__init__()

    # Compute embeddings for users.
    self.user_embeddings = tf.keras.Sequential([
      tf.keras.layers.IntegerLookup(
        vocabulary=unique_user_ids, mask_token=None),
      # One extra row beyond the vocabulary for the lookup layer's
      # out-of-vocabulary index.
      tf.keras.layers.Embedding(len(unique_user_ids) + 1, embedding_dimension)
    ])

    # Compute embeddings for books.
    self.book_embeddings = tf.keras.Sequential([
      tf.keras.layers.StringLookup(
        vocabulary=unique_book_isbns, mask_token=None),
      # One extra row beyond the vocabulary for the lookup layer's
      # out-of-vocabulary index.
      tf.keras.layers.Embedding(len(unique_book_isbns) + 1, embedding_dimension)
    ])

    # Compute predictions.
    self.ratings = tf.keras.Sequential([
      # Learn multiple dense layers.
      tf.keras.layers.Dense(256, activation="relu"),
      tf.keras.layers.Dense(64, activation="relu"),
      # Make rating predictions in the final layer.
      tf.keras.layers.Dense(1)
    ])

  def call(self, inputs):
    """Return predicted ratings for a batch of (user_id, book_isbn) pairs.

    Args:
      inputs: A 2-tuple `(user_id, book_isbn)`.
        NOTE(review): the axis=1 concat below assumes both are 1-D batches,
        so each embedding is (batch, embedding_dimension) - confirm.

    Returns:
      The output of the dense stack, one value per example.
    """
    user_id, book_isbn = inputs

    user_embedding = self.user_embeddings(user_id)
    book_embedding = self.book_embeddings(book_isbn)

    # Join the two embeddings along the feature axis before the dense layers.
    return self.ratings(tf.concat([user_embedding, book_embedding], axis=1))

In [17]:
class BookRecModel(tfrs.models.Model):
  """TFRS model that trains `RankingModel` to regress book ratings.

  The loss is mean squared error and the reported metric is root mean
  squared error, both handled by a `tfrs.tasks.Ranking` task.
  """

  def __init__(self):
    super().__init__()
    self.ranking_model: tf.keras.Model = RankingModel()
    self.task: tf.keras.layers.Layer = tfrs.tasks.Ranking(
      loss = tf.keras.losses.MeanSquaredError(),
      metrics=[tf.keras.metrics.RootMeanSquaredError()]
    )

  def call(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:
    """Predict ratings from a feature dict.

    Args:
      features: Mapping that contains at least the keys "user_id" and
        "isbn"; any other keys are ignored.

    Returns:
      The ranking model's predictions for the given users and books.
    """
    return self.ranking_model(
        (features["user_id"], features["isbn"]))

  def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
    """Compute the training/evaluation loss for one batch.

    Args:
      features: Mapping with the model inputs plus the label under
        "user_rating". The mapping is not modified.
      training: Accepted for the TFRS interface; not used here.

    Returns:
      The loss produced by the ranking task.
    """
    labels = features["user_rating"]

    # Build a label-free copy rather than pop()-ing the label: popping
    # mutated the caller's dict, so reusing the same batch dict for a
    # second call raised KeyError. The model still receives exactly the
    # non-label features, as before.
    model_inputs = {
        name: tensor
        for name, tensor in features.items()
        if name != "user_rating"
    }

    rating_predictions = self(model_inputs)

    # The task computes the loss and the metrics.
    return self.task(labels=labels, predictions=rating_predictions)

In [18]:
# Instantiate the recommender and attach the optimizer; the loss and metrics
# come from the model's own compute_loss, so none are passed to compile().
model = BookRecModel()
model.compile(
    optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1),
)

In [19]:
# Training pipeline: shuffle, batch, then cache the batched result.
cached_train = (
    train
    .shuffle(100_000)
    .batch(8192)
    .cache()
)

# Evaluation pipeline: no shuffling, smaller batches, cached as well.
cached_test = (
    test
    .batch(4096)
    .cache()
)

In [20]:
# Train for three passes over the cached training batches.
model.fit(x=cached_train, epochs=3)

Epoch 1/3
Epoch 2/3
Epoch 3/3


<keras.callbacks.History at 0x1861e2c3b20>

In [21]:
# Evaluate on the held-out batches; return_dict=True yields named metrics.
model.evaluate(x=cached_test, return_dict=True)



{'root_mean_squared_error': 3.626434564590454,
 'loss': 13.36085033416748,
 'regularization_loss': 0,
 'total_loss': 13.36085033416748}

In [24]:
# A handful of ISBNs to score for a single user.
test_book_isbns = [
    "0345404793",
    "0380841940",
    "0451129040",
    "0812510488",
    "0553114271",
]

# Predict a rating for user 12 on each sample book, one model call per ISBN.
test_ratings = {
    book_isbn: model({
        "user_id": np.array([12]),
        "isbn": np.array([book_isbn]),
    })
    for book_isbn in test_book_isbns
}

print("Ratings:")
# Highest predicted rating first.
ranked_ratings = sorted(
    test_ratings.items(), key=lambda pair: pair[1], reverse=True
)
for isbn, score in ranked_ratings:
  print(f"{isbn}: {score}")

Ratings:
0812510488: [[2.9836402]]
0451129040: [[2.972382]]
0380841940: [[2.9580398]]
0345404793: [[2.9505556]]
0553114271: [[2.936958]]


In [23]:
# Export the trained model in SavedModel format to the directory below.
path = "./model/ranking_model/1/"
tf.saved_model.save(model, export_dir=path)



INFO:tensorflow:Assets written to: ./model/ranking_model/1/assets


INFO:tensorflow:Assets written to: ./model/ranking_model/1/assets


In [25]:
# Scratch check: the user_id array passed to the model for the sample predictions.
np.asarray([12])

array([12])