## Book recommendation engine

In this notebook I will build a book recommendation engine using `PySpark` and `ALS`. I will use the **GoodReads 10k** dataset to train the model. This dataset contains the following files:
1. `book_tags.csv` - Contains user-assigned genre tags for each book.
2. `books.csv` - Contains book metadata like `title`, `author`, `ISBN`, etc.
3. `ratings.csv` - Contains explicit 1-5 star ratings given by users to books, used for training the explicit rating model.
4. `tags.csv` - Contains all the `genres` and their corresponding `tag_id`.
5. `to_read.csv` - Lists books marked as *to read* by users, used (together with the ratings) for training the implicit rating model.

The dataset contains 10,000 books and around 53,400 users. I will train both an explicit-rating and an implicit-rating model with ALS and compare their performance. The better model will be saved and used later in the book recommendation app.
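For reference, the two model types optimize different objectives. The formulas below are the standard forms (the implicit one is the implicit-feedback formulation that Spark's `implicitPrefs=True` mode is documented to follow); $x_u$ and $y_i$ are the user and book factor vectors, $\lambda$ plays the role of `regParam` and $\alpha$ the role of `alpha`. Spark additionally scales $\lambda$ by the number of ratings per user or book, which is left out here for readability.

Explicit ratings, fitted only on the set $\mathcal{R}$ of observed (user, book) pairs:

$$
\min_{X, Y} \sum_{(u,i) \in \mathcal{R}} \left(r_{ui} - x_u^\top y_i\right)^2 + \lambda \left(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\right)
$$

Implicit feedback, fitted on all (user, book) pairs, with $r_{ui} = 0$ for pairs without any interaction:

$$
\min_{X, Y} \sum_{u,i} c_{ui} \left(p_{ui} - x_u^\top y_i\right)^2 + \lambda \left(\sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2\right),
\qquad p_{ui} = \begin{cases} 1 & r_{ui} > 0 \\ 0 & \text{otherwise} \end{cases},
\qquad c_{ui} = 1 + \alpha \lvert r_{ui} \rvert
$$

The practical difference is that the implicit objective also pushes the scores of books a user never interacted with toward 0, which tends to make it better suited to ranking than to predicting rating values.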

In [None]:
# Importing required libraries

import os
import zipfile
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

import shutil
from google.colab import files

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import DoubleType
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator, RankingEvaluator

In [None]:
spark = SparkSession.builder.appName("good_reads").getOrCreate()

In [None]:
# Loading the dataset into spark dataframes

books_df = spark.read.csv("Books.csv", header=True, inferSchema=True)
ratings_df = spark.read.csv("Ratings.csv", header=True, inferSchema=True)
to_read_df = spark.read.csv("to_read.csv", header=True, inferSchema=True)

In [None]:
# Exploring ratings dataset

r, c = ratings_df.count(), len(ratings_df.columns)
ratings_df.show(5)
print(f"Shape of ratings: {r, c}")
print(f"Number of unique books: {ratings_df.select('book_id').distinct().count()}")
print(f"Number of unique users: {ratings_df.select('user_id').distinct().count()}")
print(f"Average rating: {ratings_df.agg(F.avg('rating')).collect()[0][0]:.2f}")

In [None]:
# Let's see how many ratings each book has

book_ratings = ratings_df.groupBy("book_id").agg(F.count("rating").alias("num_ratings"))
book_ratings_pd = book_ratings.toPandas()

# Plotting
plt.figure(figsize=(10, 6))
sns.scatterplot(data=book_ratings_pd, x="book_id", y="num_ratings")
plt.title("Number of Ratings for each Book")
plt.xlabel("Book ID")
plt.ylabel("Number of Ratings")
plt.show();

So most books in the dataset have a substantial number of ratings, with a few outliers that have fewer than 20. Let us explore the `to_read` dataframe next.
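To put a number on the outliers instead of reading them off the scatter plot, the pandas frame can be filtered directly. A minimal sketch, using a made-up toy frame in place of `book_ratings_pd` and a hypothetical `THRESHOLD` of 20:

```python
import pandas as pd

# Toy stand-in for book_ratings_pd (columns: book_id, num_ratings); the values are made up
book_ratings_pd = pd.DataFrame({
    "book_id": [1, 2, 3, 4, 5],
    "num_ratings": [100, 100, 8, 95, 17],
})

THRESHOLD = 20  # hypothetical cut-off matching the "fewer than 20 ratings" observation
sparse_books = book_ratings_pd[book_ratings_pd["num_ratings"] < THRESHOLD]
share = len(sparse_books) / len(book_ratings_pd)
print(f"{len(sparse_books)} books ({share:.1%}) have fewer than {THRESHOLD} ratings")
```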

In [None]:
# Exploring to_read dataset

r, c = to_read_df.count(), len(to_read_df.columns)
to_read_df.show(5)
n = to_read_df.groupBy("user_id").agg(F.count("book_id").alias("num_books_to_read")) \
              .agg(F.avg("num_books_to_read")).collect()[0][0]
print(f"Shape of to_read: {r, c}")
print(f"Number of unique books: {to_read_df.select('book_id').distinct().count()}")
print(f"Number of unique users: {to_read_df.select('user_id').distinct().count()}")
print(f"On average each user has {n:.2f} books in their to read list.")

So not every book appears in a user's to-read list, and not every user has a to-read list. Let us now build the **explicit rating** recommendation model.
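Both observations can be expressed as coverage ratios. The sketch below shows the set logic on hypothetical toy `(user_id, book_id)` pairs; on the real data the same sets would come from `select(...).distinct()` on `ratings_df` and `to_read_df`.

```python
# Hypothetical toy (user_id, book_id) pairs standing in for ratings_df and to_read_df
ratings_pairs = [(1, 10), (1, 11), (2, 10), (3, 12), (4, 13)]
to_read_pairs = [(1, 14), (3, 10)]

rating_users = {user for user, _ in ratings_pairs}
to_read_users = {user for user, _ in to_read_pairs}
to_read_books = {book for _, book in to_read_pairs}
all_books = {book for _, book in ratings_pairs} | to_read_books

# Share of rating users who also have a to-read list
user_coverage = len(rating_users & to_read_users) / len(rating_users)
# Share of all known books that appear in at least one to-read list
book_coverage = len(to_read_books) / len(all_books)
print(f"user coverage: {user_coverage:.2f}, book coverage: {book_coverage:.2f}")
```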

In [None]:
%%time

ratings_train, ratings_test = ratings_df.randomSplit([0.8, 0.2])

als = ALS(userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, rank=64,
          regParam=0.08)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

model = als.fit(ratings_train)
predictions = model.transform(ratings_test)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse:.4f}")

In [None]:
# Calculating Precision@K

K = 5
predictions = model.recommendForAllUsers(K)
actual_user_books = ratings_test.groupBy("user_id") \
                                .agg(F.collect_set("book_id").alias("actual_books"))
actual_and_preds = actual_user_books.join(predictions, on="user_id", how="inner")

In [None]:
def precision_at_k(actual, pred):
    actual_set = set(actual)
    pred_set = set([x.book_id for x in pred])

    if not actual_set or not pred_set:
        return 0.0

    return len(pred_set & actual_set) / len(pred_set)


def recall_at_k(actual, pred):
    actual_set = set(actual)
    pred_set = set([x.book_id for x in pred])

    if not actual_set:
        return 0.0

    return len(pred_set & actual_set) / len(actual_set)


precision_udf = F.udf(precision_at_k, DoubleType())
recall_udf = F.udf(recall_at_k, DoubleType())
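As a quick sanity check of the two definitions, here is the arithmetic for one hypothetical user: precision divides the hits by the number of recommendations, recall divides them by the number of held-out books.

```python
# Toy example (hypothetical values): 4 held-out books for a user and top-5 recommendations
actual_books = {1, 2, 3, 4}
recommended = [1, 5, 6, 2, 7]

hits = actual_books & set(recommended)          # {1, 2}
precision_at_5 = len(hits) / len(recommended)   # 2 / 5 = 0.4
recall_at_5 = len(hits) / len(actual_books)     # 2 / 4 = 0.5
print(precision_at_5, recall_at_5)
```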

In [None]:
metrics_df = actual_and_preds.withColumn("precision_at_5", precision_udf("actual_books", "recommendations")) \
                             .withColumn("recall_at_5", recall_udf("actual_books", "recommendations"))

precision_5 = metrics_df.agg(F.avg("precision_at_5")).first()[0]
recall_5 = metrics_df.agg(F.avg("recall_at_5")).first()[0]

print(f"Average precision at 5: {precision_5:.4f}")
print(f"Average recall at 5: {recall_5:.4f}")

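While the explicit rating model achieves a good RMSE, its precision@K and recall@K are abysmal. This is expected: the model is trained to predict rating values on observed (user, book) pairs, not to rank the few books a user will actually interact with, and it tends to recommend popular books instead of personalizing the recommendations. Now let's work on an implicit model using both the `ratings` and `to_read` data.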
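The popularity explanation is a hypothesis that can be checked: if it holds, a large share of the recommended books should fall in the most-rated head of the catalog. A minimal sketch on made-up data; `TOP_N` and the toy dictionaries are hypothetical, and on the real data `rating_counts` would come from `book_ratings` and the lists from the `recommendations` column produced by `recommendForAllUsers`.

```python
from collections import Counter

# Hypothetical toy inputs: per-book rating counts and the top-5 lists produced for a few users
rating_counts = Counter({1: 500, 2: 450, 3: 400, 4: 20, 5: 15, 6: 10, 7: 5})
recommendations = {
    101: [1, 2, 3, 4, 5],
    102: [1, 2, 3, 6, 7],
    103: [2, 3, 1, 4, 6],
}

TOP_N = 3  # hypothetical size of the "popular" head of the catalog
popular = {book for book, _ in rating_counts.most_common(TOP_N)}

# Flatten all recommendation lists and measure how many items come from the popular head
all_recs = [book for recs in recommendations.values() for book in recs]
popular_share = sum(book in popular for book in all_recs) / len(all_recs)
print(f"{popular_share:.0%} of recommendations come from the top {TOP_N} books")
```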

In [None]:
weights = {
    "TO_READ": 2,
    "RATING_HIGH": 6,
    "RATING_MID": 2,
    "RATING_LOW": 0.2
}


# Taking only the highest rating for a book by a user
ratings_collapsed = ratings_df.groupBy("user_id", "book_id").agg(F.max("rating").alias("rating"))

# Applying weights to ratings
ratings_weighted = ratings_collapsed.withColumn(
    "w",
    F.when(F.col("rating") >= 4, F.lit(weights["RATING_HIGH"]))
     .when(F.col("rating") == 3, F.lit(weights["RATING_MID"]))
     .otherwise(F.lit(weights["RATING_LOW"]))
).select("user_id", "book_id", "w")

# To read weights
to_read_weighted = to_read_df.withColumn("w", F.lit(weights["TO_READ"]))

events = ratings_weighted.unionByName(to_read_weighted)
events.show(5)

+-------+-------+---+
|user_id|book_id|  w|
+-------+-------+---+
|  33697|      4|6.0|
|  40490|      6|2.0|
|  23303|     15|6.0|
|  28767|     16|6.0|
|  51460|     16|6.0|
+-------+-------+---+
only showing top 5 rows



In [None]:
# Aggregating weighted preferences per (user, book) pair and capping them at 10

preferences = events.groupBy("user_id", "book_id") \
                   .agg(F.sum("w").alias("preference")) \
                   .withColumn("preference", F.least(F.col("preference"), F.lit(10)))
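To see what values the `preference` column can take, the weighting and capping above can be mirrored in plain Python for a single (user, book) pair. This is an illustrative re-implementation, not the Spark code; `rating_weight` and `preference` are hypothetical helpers.

```python
# Plain-Python mirror of the weighting logic above, for a single (user, book) pair
weights = {"TO_READ": 2, "RATING_HIGH": 6, "RATING_MID": 2, "RATING_LOW": 0.2}
CAP = 10.0  # same cap as F.least(..., F.lit(10))


def rating_weight(rating):
    """Weight for the user's highest rating of the book."""
    if rating >= 4:
        return weights["RATING_HIGH"]
    if rating == 3:
        return weights["RATING_MID"]
    return weights["RATING_LOW"]


def preference(max_rating=None, to_read_count=0):
    """Sum of the event weights for one (user, book) pair, capped at CAP."""
    total = 0.0
    if max_rating is not None:
        total += rating_weight(max_rating)
    total += to_read_count * weights["TO_READ"]
    return min(total, CAP)


print(preference(5, to_read_count=1))  # 6 + 2 = 8.0
print(preference(3))                   # 2.0
print(preference(1, to_read_count=1))  # 0.2 + 2 = 2.2
print(preference(5, to_read_count=3))  # 6 + 3 * 2 = 12, capped at 10.0
```

Since ratings are collapsed to one row per pair, a rating plus a single to-read entry sums to at most 8, so the cap of 10 only takes effect if the same pair appears several times in `to_read`.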

In [None]:
pref_train, pref_test = preferences.randomSplit([0.8, 0.2])

als_imp = ALS(userCol="user_id", itemCol="book_id", ratingCol="preference",
              rank=48, regParam=0.2, alpha=80, implicitPrefs=True,
              nonnegative=True, coldStartStrategy="drop", maxIter=12)

implicit_model = als_imp.fit(pref_train)
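With `implicitPrefs=True`, the `preference` values are not fitted directly. Assuming Spark's documented convention (binary preference p = 1 when the value is positive, confidence c = 1 + alpha * |value|), the sketch below shows what `alpha=80` does to the capped preferences; `to_preference_and_confidence` is a hypothetical helper for illustration.

```python
ALPHA = 80  # same value as alpha=80 passed to als_imp


def to_preference_and_confidence(r, alpha=ALPHA):
    """Binary preference and confidence for one input value (assumed Spark convention)."""
    p = 1.0 if r > 0 else 0.0
    c = 1.0 + alpha * abs(r)
    return p, c


# 0.0 stands for a (user, book) pair with no events at all
for r in [0.0, 0.2, 2.0, 6.0, 8.0, 10.0]:
    p, c = to_preference_and_confidence(r)
    print(f"r={r:<4} -> p={p:.0f}, c={c:g}")
```

Under this convention even a `RATING_LOW` pair (r = 0.2) is a positive preference (p = 1) with 17 times the confidence of an unobserved pair, so low ratings reduce the weight of a book but do not mark it as disliked.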

In [None]:
test_users = pref_test.select("user_id").distinct()
predictions = implicit_model.recommendForUserSubset(test_users, numItems=5)
predictions.show(5)

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      1|[{359, 0.7740007}...|
|      3|[{1035, 0.8792589...|
|      5|[{5828, 0.9602315...|
|      6|[{472, 1.0375816}...|
|      9|[{2664, 1.0572494...|
+-------+--------------------+
only showing top 5 rows



In [None]:
# Getting all the book_ids from recommendations

predictions = predictions.withColumn(
    "predicted_book_ids", F.expr("transform(recommendations, x -> x.book_id)")
)

In [None]:
# Grouping the held-out (test) books per user

per_user_books = pref_test.groupBy("user_id") \
                          .agg(F.collect_set("book_id").alias("actual_book_ids"))

In [None]:
# Joining the two data frames

joined = per_user_books.join(predictions, on="user_id", how="inner") \
                       .select("user_id", "actual_book_ids", "predicted_book_ids")
joined.show(5)

+-------+--------------------+--------------------+
|user_id|     actual_book_ids|  predicted_book_ids|
+-------+--------------------+--------------------+
|      1|[6285, 3334, 533,...|[359, 5628, 9173,...|
|      3|              [5448]| [1035, 1, 5, 4, 14]|
|      5|              [3074]|[5828, 6965, 5762...|
|      6|              [5820]|[472, 2036, 385, ...|
|      9|[7285, 230, 2516,...|[2664, 822, 638, ...|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Evaluating precision@K and recall@K

joined = joined.where(F.size("actual_book_ids") > 0)
joined = joined.withColumn("actual_book_ids", F.expr("transform(actual_book_ids, x -> double(x))")) \
               .withColumn("predicted_book_ids", F.expr("transform(predicted_book_ids, x -> double(x))"))

evaluator = RankingEvaluator(metricName="precisionAtK", k=5,
                             labelCol="actual_book_ids",
                             predictionCol="predicted_book_ids")

print(f"Average precision at 5: {evaluator.evaluate(joined):.4f}")

evaluator.setMetricName("recallAtK").setK(5)
print(f"Average recall at 5: {evaluator.evaluate(joined):.4f}")

Average precision at 5: 0.0150
Average recall at 5: 0.0117
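To make these two numbers easier to interpret, here is a plain-Python sketch of what `precisionAtK` and `recallAtK` compute, following the definitions in Spark's `RankingMetrics` documentation: precision@k divides the hits in the top k by k (even if fewer than k items were predicted), recall@k divides them by the number of relevant items, users with no relevant items score 0, and both are averaged over users. The rows are hypothetical.

```python
def _hits(actual_set, predicted, k):
    """Number of the first k predictions that are relevant."""
    return sum(1 for item in predicted[:k] if item in actual_set)


def mean_precision_at_k(rows, k):
    """Mean precision@k; like Spark's RankingMetrics, it divides by k, not by len(predicted)."""
    scores = []
    for actual, predicted in rows:
        actual_set = set(actual)
        scores.append(_hits(actual_set, predicted, k) / k if actual_set else 0.0)
    return sum(scores) / len(scores)


def mean_recall_at_k(rows, k):
    """Mean recall@k: relevant items in the top k divided by the number of relevant items."""
    scores = []
    for actual, predicted in rows:
        actual_set = set(actual)
        scores.append(_hits(actual_set, predicted, k) / len(actual_set) if actual_set else 0.0)
    return sum(scores) / len(scores)


# Hypothetical rows shaped like `joined`: (actual_book_ids, predicted_book_ids)
rows = [
    ([10.0], [1.0, 2.0, 3.0, 4.0, 5.0]),                  # 0 hits, 1 relevant
    ([1.0, 20.0], [1.0, 6.0, 7.0, 8.0, 9.0]),             # 1 hit, 2 relevant
    ([1.0, 2.0, 30.0, 40.0], [1.0, 2.0, 3.0, 4.0, 5.0]),  # 2 hits, 4 relevant
]
print(f"precision@5: {mean_precision_at_k(rows, 5):.4f}")  # (0 + 0.2 + 0.4) / 3
print(f"recall@5: {mean_recall_at_k(rows, 5):.4f}")        # (0 + 0.5 + 0.5) / 3
```

Because precision@5 divides by 5, a user with a single held-out book contributes at most 0.2, and several users in the `joined` preview above have just one held-out book, so the attainable average precision here is well below 1.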


In [None]:
# Checking that the book IDs in itemFactors match those in the training set

unique_if_ids = implicit_model.itemFactors.select("id").distinct()
unique_pref_ids = pref_train.select("book_id").distinct()

print(f"Number of books in model.itemFactors: {unique_if_ids.count()}")
print(f"Number of books in pref_train: {unique_pref_ids.count()}")

common_ids = unique_if_ids.join(unique_pref_ids, unique_if_ids.id == unique_pref_ids.book_id, how="inner")
print(f"Number of books in both: {common_ids.count()}")

Number of books in model.itemFactors: 10000
Number of books in pref_train: 10000
Number of books in both: 10000


In [None]:
# Saving the models

implicit_model.write().overwrite().save("models/als_implicit")
implicit_model.itemFactors.write.mode("overwrite").parquet("models/item_factors")
books_df.select("book_id","Title","Author").write.mode("overwrite").parquet("models/book_metadata")

In [None]:
# Downloading the models

shutil.make_archive("models", "zip", "models")
files.download("models.zip")


Now that we have built and saved the model, it's time to write functions that recommend books. My use case takes a book title as input and recommends 5 similar books based on it. For this I will use the item factors learned by ALS (`ALSModel.itemFactors`) and cosine similarity. Let's get started.
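For reference, the cosine similarity between books $i$ and $j$ compares the directions of their item factor vectors $y_i, y_j \in \mathbb{R}^{r}$ (the `features` column of `itemFactors`, with $r$ equal to the model rank):

$$
\operatorname{sim}(i, j) = \frac{y_i^\top y_j}{\lVert y_i \rVert \, \lVert y_j \rVert} = \frac{\sum_{d=1}^{r} y_{id}\, y_{jd}}{\sqrt{\sum_{d=1}^{r} y_{id}^2}\; \sqrt{\sum_{d=1}^{r} y_{jd}^2}}
$$

For example, with $y_i = (1, 0, 2)$ and $y_j = (2, 1, 0)$ the dot product is $2$ and both norms are $\sqrt{5}$, so $\operatorname{sim}(i, j) = 2/5 = 0.4$. Because the implicit model was trained with `nonnegative=True`, every factor entry is $\ge 0$, so all similarities fall in $[0, 1]$ rather than $[-1, 1]$.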

In [None]:
# Extracting our model checkpoints

extract_to = "models"
zip_path = "models.zip"

os.makedirs(extract_to, exist_ok=True)

with zipfile.ZipFile(zip_path, "r") as f:
    f.extractall(extract_to)

In [None]:
# Loading the models

model = ALSModel.load("models/als_implicit")
item_factors = spark.read.parquet("models/item_factors")
books_df = spark.read.parquet("models/book_metadata")

In [None]:
item_factors.show(5)
books_df.show(5)

+---+--------------------+
| id|            features|
+---+--------------------+
|  7|[0.104591504, 0.0...|
| 17|[0.0024172193, 0....|
| 27|[0.17495234, 0.0,...|
| 37|[0.0, 0.0, 0.0, 0...|
| 47|[0.27273846, 0.11...|
+---+--------------------+
only showing top 5 rows

+-------+--------------------+--------------------+
|book_id|               Title|              Author|
+-------+--------------------+--------------------+
|      1|The Hunger Games ...|     Suzanne Collins|
|      2|Harry Potter and ...|J.K. Rowling, Mar...|
|      3|Twilight (Twiligh...|     Stephenie Meyer|
|      4|To Kill a Mocking...|          Harper Lee|
|      5|    The Great Gatsby| F. Scott Fitzgerald|
+-------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# Creating magnitude column for cosine similarity

def norm(values):
    return sum(x * x for x in values) ** 0.5

norm_udf = F.udf(norm, T.DoubleType())
item_factors = item_factors.withColumn("magnitude", norm_udf("features")).cache()

In [None]:
def recommend_similar_books(book_title: str, k: int = 5):
    """Recommends similar books based on a given book title using cosine similarity.

    Args:
        book_title: The title of the book to find similar recommendations for.
        k: The number of recommendations to return.

    Returns:
        A list of rows containing book information (book_id, Title, Author, similarity)
        for the top k recommended books. Returns an empty list if the book is
        not found or the book vector is not found.
    """
    # Check if book exists in the dataset
    book_id = books_df.filter(books_df["Title"] == book_title) \
                      .select("book_id") \
                      .first()

    if not book_id:
        print("book name not found")
        return []

    # Get the associated item vector
    book_vector = item_factors.filter(item_factors["id"] == book_id[0]) \
                              .select("features", "magnitude") \
                              .first()

    if not book_vector:
        print("book vector not found")
        return []

    def cosine_similarity(vec, mag):
        numerator = sum(x * y for x, y in zip(vec, book_vector.features))
        denominator = mag * book_vector.magnitude or 1e-9
        return numerator * 1.0 / denominator

    udf_cosine_sim = F.udf(cosine_similarity, T.DoubleType())
    similarities = item_factors.withColumn("similarity", udf_cosine_sim("features", "magnitude")) \
                               .filter(F.col("id") != book_id[0]) \
                               .orderBy(F.col("similarity").desc()) \
                               .limit(k + 10)

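    # Re-sort after the join: Spark does not preserve row order through a join
    similar_books = similarities.join(books_df, similarities.id == books_df.book_id, how="inner") \
                                .select("book_id", "Title", "Author", "similarity") \
                                .orderBy(F.col("similarity").desc()) \
                                .limit(k) \
                                .collect()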
    return similar_books

In [None]:
recs = recommend_similar_books("A Walk to Remember")

In [None]:
[r.Title for r in recs]

['Twilight (Twilight, #1)',
 'Romeo and Juliet',
 'The Lion, the Witch, and the Wardrobe (Chronicles of Narnia, #1)',
 'The Notebook (The Notebook, #1)',
 'New Moon (Twilight, #2)']

In [None]:
# Checking sparsity of the book vectors

EPS = 1e-8

item_sparsity = (item_factors
  .withColumn("dim", F.size("features"))
  .withColumn(
      "nnz",
      F.expr(f"aggregate(transform(features, x -> IF(ABS(x) < {EPS}, 0, 1)), 0, (acc, v) -> acc + v)")
  )
  .withColumn("sparsity", (F.col("dim") - F.col("nnz")) / F.col("dim"))
)

avg_item_sparsity = item_sparsity.select(F.avg("sparsity").alias("avg_sparsity")).first()[0]
print("Average sparsity over all items:", avg_item_sparsity)

Average sparsity over all items: 0.8249479166666664
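On average about 82% of the factor entries are zero. This is plausibly a side effect of `nonnegative=True`, which clamps many coordinates at exactly 0, though the notebook does not verify that. A related check that matters for cosine similarity is whether any book vector is entirely zero: its norm is 0, so the similarity is undefined and the `or 1e-9` guard in the recommendation functions turns it into 0. A NumPy sketch of both checks on hypothetical toy factors (on the real data the matrix would be built from the `features` column):

```python
import numpy as np

EPS = 1e-8  # same tolerance as the Spark expression above

# Toy item factors (hypothetical values), one row per book
factors = np.array([
    [0.10, 0.0, 0.0, 0.0],
    [0.0, 0.25, 0.0, 0.3],
    [0.0, 0.0, 0.0, 0.0],
])

zero_mask = np.abs(factors) < EPS
per_item_sparsity = zero_mask.mean(axis=1)          # fraction of zero dimensions per book
avg_sparsity = per_item_sparsity.mean()             # same quantity as avg_item_sparsity
all_zero_items = int(zero_mask.all(axis=1).sum())   # books whose whole vector is zero
print(per_item_sparsity, avg_sparsity, all_zero_items)
```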


Now that the recommender is trained and can make recommendations given a title, I want to convert the item factors and book metadata to pandas for serving. While Spark is powerful at processing large amounts of data quickly, it carries a lot of overhead, since it needs a JDK and its dependencies. Converting the data to pandas will let me serve recommendations from a lightweight `FastAPI` app.

In [None]:
if_pd = item_factors.toPandas()
if_pd.head()

|   | id | features | magnitude |
|---|----|----------|-----------|
| 0 | 7 | [0.104591503739357, 0.0, 0.0, 0.0, 0.0, 0.0, 0... | 1.6897 |
| 1 | 17 | [0.0024172193370759487, 0.0, 0.0, 0.0, 0.0, 0.... | 1.796059 |
| 2 | 27 | [0.17495234310626984, 0.0, 0.0, 0.0, 0.0, 0.0,... | 1.617949 |
| 3 | 37 | [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.22473175... | 1.617251 |
| 4 | 47 | [0.2727384567260742, 0.11225360631942749, 0.0,... | 1.66997 |


In [None]:
books_pd = books_df.toPandas()
books_pd.head()

|   | book_id | Title | Author |
|---|---------|-------|--------|
| 0 | 1 | The Hunger Games (The Hunger Games, #1) | Suzanne Collins |
| 1 | 2 | Harry Potter and the Sorcerer's Stone (Harry P... | J.K. Rowling, Mary GrandPré |
| 2 | 3 | Twilight (Twilight, #1) | Stephenie Meyer |
| 3 | 4 | To Kill a Mockingbird | Harper Lee |
| 4 | 5 | The Great Gatsby | F. Scott Fitzgerald |


In [None]:
# Pandas recommend method

def recommend_books_pd(book_name: str, k: int = 5):
    """Recommends similar books based on a given book title using cosine similarity.

    Args:
        book_name: The title of the book to find similar recommendations for.
        k: The number of recommendations to return.

    Returns:
        A pandas DataFrame containing the Title, Author, and similarity score
        of the top k recommended books. Returns an empty list if the book is
        not found or the book vector is not found.
    """
    book_id = books_pd.loc[books_pd["Title"] == book_name]

    if book_id.empty:
        print("book name not found")
        return []

    book_id = book_id["book_id"].values[0]
    book_vector = if_pd.loc[if_pd["id"] == book_id]

    if book_vector.empty:
        print("book vector not found")
        return []

    book_vector = book_vector["features"].values[0]

    def cosine_similarity(vec):
        numerator = np.dot(vec, book_vector)
        denominator = np.linalg.norm(vec) * np.linalg.norm(book_vector) or 1e-9
        return numerator / denominator

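    # Compute similarities on a copy so the shared if_pd frame is not mutated per request
    similarities = if_pd.assign(similarity=if_pd["features"].apply(cosine_similarity))
    similar_books = similarities.sort_values(by="similarity", ascending=False) \
                                .head(k + 10)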

    similar_books = similar_books[similar_books["id"] != book_id]

    similar_books = similar_books.merge(books_pd, left_on="id", right_on="book_id") \
                                 .sort_values(by="similarity", ascending=False) \
                                 .head(k)

    return similar_books[["Title", "Author", "similarity"]]
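`recommend_books_pd` recomputes both norms for all 10,000 vectors with a row-wise `apply` on every call. A hedged alternative for the FastAPI serving path is to normalize the factors once into a NumPy matrix and answer each query with a single matrix-vector product. `build_index` and `similar_ids` are hypothetical helper names, and the toy frame stands in for `if_pd`; the title lookup and the merge with `books_pd` would stay as in the function above.

```python
import numpy as np
import pandas as pd


def build_index(if_pd: pd.DataFrame):
    """Stack the item factors into a row-normalized matrix once, at startup."""
    ids = if_pd["id"].to_numpy()
    matrix = np.asarray(if_pd["features"].tolist(), dtype=np.float64)
    norms = np.linalg.norm(matrix, axis=1, keepdims=True)
    normalized = matrix / np.where(norms == 0, 1.0, norms)  # all-zero rows stay zero
    return ids, normalized


def similar_ids(book_id, ids, normalized, k=5):
    """Return (ids, similarities) of the k books most similar to book_id, excluding itself."""
    matches = np.flatnonzero(ids == book_id)
    if matches.size == 0:
        return np.array([], dtype=ids.dtype), np.array([])
    row = matches[0]
    sims = normalized @ normalized[row]   # cosine similarity against every book
    sims[row] = -np.inf                   # never recommend the query book itself
    top = np.argsort(-sims)[:k]
    return ids[top], sims[top]


# Toy item factors (hypothetical), shaped like if_pd
toy = pd.DataFrame({
    "id": [7, 17, 27, 37],
    "features": [[1.0, 0.0], [0.9, 0.1], [0.0, 1.0], [0.5, 0.5]],
})
ids, normalized = build_index(toy)
top_ids, top_sims = similar_ids(7, ids, normalized, k=2)
print(top_ids, top_sims)
```

The normalized matrix is built once when the app starts, so each request costs one dot product per book instead of recomputing norms.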

In [None]:
if_pd.to_csv("item_factors.csv", index=False)
books_pd.to_csv("books.csv", index=False)
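One caveat when the FastAPI app reloads these files: `to_csv` stores the `features` lists as their string representation, so after `pd.read_csv` the column contains strings such as `"[0.104591503739357, 0.0, ...]"` rather than lists. A minimal sketch of the round trip and one way to parse the column back, assuming the strings are valid JSON arrays (true for ordinary floats, but not if any value is `nan`):

```python
import io
import json

import numpy as np
import pandas as pd

# Simulate what if_pd.to_csv(...) writes: the list column becomes its string representation
original = pd.DataFrame({
    "id": [7, 17],
    "features": [[0.1, 0.0, 0.2], [0.0, 0.3, 0.0]],
    "magnitude": [0.2236, 0.3],
})
buffer = io.StringIO()
original.to_csv(buffer, index=False)
buffer.seek(0)

loaded = pd.read_csv(buffer)
raw_type = type(loaded.loc[0, "features"])
print(raw_type)  # the features column comes back as strings

# Parse the "[0.1, 0.0, 0.2]" strings back into lists, then stack them into a float matrix
loaded["features"] = loaded["features"].apply(json.loads)
matrix = np.asarray(loaded["features"].tolist(), dtype=np.float64)
print(matrix.shape)
```

Writing Parquet with `DataFrame.to_parquet` instead would keep the list column intact, if `pyarrow` is available in the serving environment.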