# **BOOK RECOMMENDER SYSTEM**

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.stat import Correlation
ratings_path = "/content/ratings.csv"
books_path = "/content/books.csv"

### **Memory Allocation for Spark Session**

In [None]:

spark = SparkSession.builder.appName("BooksRecommendationSystem") \
    .config("spark.driver.memory", "8g") \
    .config("spark.executor.memory", "8g") \
    .config("spark.driver.maxResultSize", "4g") \
    .config("spark.memory.fraction", "0.8") \
    .config("spark.memory.storageFraction", "0.5") \
    .config("spark.sql.shuffle.partitions", "200") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.initialExecutors", "2") \
    .config("spark.dynamicAllocation.minExecutors", "1") \
    .config("spark.dynamicAllocation.maxExecutors", "10") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.shuffle.compress", "true") \
    .config("spark.io.compression.codec", "snappy") \
    .config("spark.speculation", "true") \
    .config("spark.network.timeout", "600s") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.default.parallelism", "400") \
    .getOrCreate()


spark.catalog.clearCache()


### **Data Preprossessing**

In [None]:
ratings_df = spark.read.option("header", "true").option("delimiter", ";").csv(ratings_path, inferSchema=True)
books_df = spark.read.option("header", "true").option("delimiter", ";").csv(books_path, inferSchema=True)

In [None]:
ratings_df.limit(5).show()

+-------+----------+-----------+
|User-ID|      ISBN|Book-Rating|
+-------+----------+-----------+
| 276725|034545104X|          0|
| 276726|0155061224|          5|
| 276727|0446520802|          0|
| 276729|052165615X|          3|
| 276729|0521795028|          6|
+-------+----------+-----------+



In [None]:
books_df.limit(5).show()

+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|      ISBN|          Book-Title|         Book-Author|Year-Of-Publication|           Publisher|         Image-URL-S|         Image-URL-M|         Image-URL-L|
+----------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+--------------------+
|0195153448| Classical Mythology|  Mark P. O. Morford|               2002|Oxford University...|http://images.ama...|http://images.ama...|http://images.ama...|
|0002005018|        Clara Callan|Richard Bruce Wright|               2001|HarperFlamingo Ca...|http://images.ama...|http://images.ama...|http://images.ama...|
|0060973129|Decision in Normandy|        Carlo D'Este|               1991|     HarperPerennial|http://images.ama...|http://images.ama...|http://images.ama...|
|0374157065|Flu: The Story of...|    Gina Bari

In [None]:
ratings_df = ratings_df.withColumnRenamed("User-ID", "userid") \
                      .withColumnRenamed("ISBN", "bookid") \
                      .withColumnRenamed("Book-Rating", "rating")

books_df = books_df.withColumnRenamed("ISBN", "bookid") \
                  .withColumnRenamed("Book-Title", "title")

In [None]:
from pyspark.sql.functions import col
ratings_df = ratings_df.withColumn("userid", col("userid").cast("integer")) \
                      .withColumn("bookid", col("bookid").cast("integer")) \
                      .withColumn("rating", col("rating").cast("float"))

In [None]:
# Keep users who rated at least 5 books
ratings_count = ratings_df.groupBy("userid").count().filter(col("count") >= 20)
ratings_df = ratings_df.join(ratings_count, "userid", "inner")

# Keep books with at least 10 ratings
book_ratings_count = ratings_df.groupBy("bookid").count().filter(col("count") >= 30)
ratings_df = ratings_df.join(book_ratings_count, "bookid", "inner")


In [None]:
ratings_df.cache()
books_df.cache()


DataFrame[bookid: string, title: string, Book-Author: string, Year-Of-Publication: int, Publisher: string, Image-URL-S: string, Image-URL-M: string, Image-URL-L: string]

### **Item Based Collaborative Filtering**

In [None]:
print(books_df.columns)
print(ratings_df.columns)

['bookid', 'title', 'Book-Author', 'Year-Of-Publication', 'Publisher', 'Image-URL-S', 'Image-URL-M', 'Image-URL-L']
['bookid', 'userid', 'rating', 'count', 'count']


In [None]:
from pyspark.sql import functions as F

# Compute item-item similarities using self-join
item_similarities = (
    ratings_df.alias("a")
    .join(ratings_df.alias("b"), "userid")
    .filter(F.col("a.bookid") != F.col("b.bookid"))
    .groupBy(F.col("a.bookid").alias("bookid1"), F.col("b.bookid").alias("bookid2"))
    .agg(F.corr("a.rating", "b.rating").alias("similarity"))
    .filter(F.col("similarity").isNotNull())
    .orderBy(F.desc("similarity"))
)

# Join with books_df to get book titles
item_similarities = (
    item_similarities
    .join(books_df.alias("book1"), F.col("bookid1") == F.col("book1.bookid"), "inner")
    .join(books_df.alias("book2"), F.col("bookid2") == F.col("book2.bookid"), "inner")
    .select(
        F.col("book1.title").alias("Book 1"),
        F.col("book2.title").alias("Book 2"),
        "similarity"
    )
)

# Show top similar books
item_similarities.show(10, False)  # Show top 10 similar books


+---------------------------------------------------------------+-------------------+-------------------+
|Book 1                                                         |Book 2             |similarity         |
+---------------------------------------------------------------+-------------------+-------------------+
|The Magic Circle                                               |Mariette in Ecstasy|1.0                |
|Sideways Stories from Wayside School (Wayside School)          |Mariette in Ecstasy|0.5765566601970551 |
|Moon Music (Peter Decker &amp; Rina Lazarus Novels (Paperback))|Mariette in Ecstasy|-0.9819805060619657|
|Summer Sisters                                                 |Mariette in Ecstasy|0.4006756718521207 |
|The Plains of Passage (Earth's Children (Paperback))           |Mariette in Ecstasy|0.0                |
|The Mummy or Ramses the Damned                                 |Mariette in Ecstasy|1.0                |
|A Cup of Tea (Ballantine Reader's Circle)    

In [None]:
spark.catalog.clearCache()

### **User Based Collaborative Filtering (Using Pearson Correlation)**

In [None]:
from pyspark.sql import functions as F

# Compute user similarities using self-join
user_similarities = (
    ratings_df.alias("a")
    .join(ratings_df.alias("b"), "bookid")  # Use "bookid" instead of "ISBN"
    .filter(F.col("a.userid") != F.col("b.userid"))  # Use "userid" instead of "User-ID"
    .groupBy("a.userid", "b.userid")  # Use "userid"
    .agg(F.corr("a.rating", "b.rating").alias("similarity"))  # Use "rating" instead of "Book-Rating"
    .orderBy(F.desc("similarity"))
)

# Select the most similar users for each user
top_k_users = (
    user_similarities.groupBy("a.userid")
    .agg(F.collect_list("b.userid").alias("similar_users"))
)

# Explode similar_users array to join correctly
top_k_users_exploded = top_k_users.withColumn("similar_user", F.explode(F.col("similar_users")))

# Join with ratings to get books rated by similar users
user_based_recommendations = (
    top_k_users_exploded.alias("users")
    .join(ratings_df.alias("ratings"), F.col("users.similar_user") == F.col("ratings.userid"))
    .groupBy("users.userid", "ratings.bookid")
    .agg(F.avg("ratings.rating").alias("predicted_rating"))  # Use "rating"
)

# Join with books_df to get book titles
user_based_recommendations = (
    user_based_recommendations
    .join(books_df, "bookid")  # Use "bookid"
    .select(
        F.col("userid"),
        F.col("title").alias("Recommended Book"),  # Use "title" instead of "Book-Title"
        "predicted_rating"
    )
)

# Collect recommended books per user
final_recommendations = (
    user_based_recommendations.groupBy("userid")
    .agg(F.collect_list("Recommended Book").alias("recommended_books"))
)

# Show recommendations
final_recommendations.show(5, False)


+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
spark.catalog.clearCache()

### **Slope One Algorithm**

In [None]:
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.window import Window

# Step 1: Compute rating deviations for each book pair
ratings_pairs = (
    ratings_df.alias("a")
    .join(ratings_df.alias("b"), "userid")
    .filter(F.col("a.bookid") < F.col("b.bookid"))
)

# Compute rating difference and count occurrences
diffs = (
    ratings_pairs.withColumn("diff", F.col("a.rating") - F.col("b.rating"))
    .groupBy(F.col("a.bookid").alias("bookid1"), F.col("b.bookid").alias("bookid2"))
    .agg(F.avg("diff").alias("avg_diff"), F.count("diff").alias("count"))
)

# Step 2: Apply Slope One for all users
user_ratings = ratings_df.select("userid", "bookid", "rating")

# Join user ratings with diffs to compute recommendations for all users
predictions = (
    diffs.alias("d")
    .join(user_ratings.alias("ur"), F.col("d.bookid2") == F.col("ur.bookid"))
    .withColumn("weighted_sum", (F.col("ur.rating") + F.col("d.avg_diff")) * F.col("d.count"))
    .withColumn("count_sum", F.col("d.count"))
    .groupBy("userid", "bookid1")
    .agg(F.sum("weighted_sum").alias("total_weighted_sum"), F.sum("count_sum").alias("total_count"))
    .withColumn("rating", (F.col("total_weighted_sum") / F.col("total_count")).cast("float"))
    .select(F.col("userid"), F.col("bookid1").alias("bookid"), "rating")
)

# Show results
predictions.show()


+------+--------+------------+
|userid|  bookid|      rating|
+------+--------+------------+
| 22089|60173289|    0.140625|
| 85526|60926317|   3.0857143|
| 27193|60959037|   2.1505377|
|116183|60540753|    6.366667|
| 35859|20868308|   2.7775817|
| 52584|60740450|  0.21169505|
| 92346|60512822|         4.5|
| 52584|60392452|   2.3621655|
| 28204|61000043|   1.3894324|
| 91100|60740450| -0.85714287|
| 28204|61044431|-0.096062995|
|  3373|61093106|   4.0833335|
| 74691|61094129|   4.6666665|
|104636|60927569|   4.3560863|
| 72238|60958022|   0.6571429|
| 76710|60930535|   1.2020619|
| 28360|61000175|   0.7948718|
|131594|60198133|    2.755319|
|264321| 7154615|   3.6759582|
|198711|60557257| -0.99245286|
+------+--------+------------+
only showing top 20 rows



In [None]:
spark.catalog.clearCache()

## **ALS (Alternative Least Squares - Matrix Factorization)**

In [None]:
als = ALS(
    userCol="userid",
    itemCol="bookid",
    ratingCol="rating",
    rank=10,
    maxIter=10,
    regParam=0.1,
    coldStartStrategy="drop"
)
(training, test) = ratings_df.randomSplit([0.8, 0.2], seed=42)
model = als.fit(training)

# Make Predictions
predictions = model.transform(test)
predictions.show(5)

+--------+------+------+-----+-----+----------+
|  bookid|userid|rating|count|count|prediction|
+--------+------+------+-----+-----+----------+
|60168013| 23768|   0.0| 1708|   30| 0.8221905|
|60168013| 60583|   0.0|   56|   30| 2.2437608|
|60168013| 94347|   0.0|  744|   30|-5.0139685|
|60168013|132930|   9.0|   56|   30| 6.5926194|
|60168013|153513|   0.0|  117|   30| 1.6251277|
+--------+------+------+-----+-----+----------+
only showing top 5 rows



In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 4.330088702610726


### **USER RECOMMENDATION**

In [None]:
user_recommendations = model.recommendForAllUsers(5)
user_recommendations.show(5, False)

+------+-------------------------------------------------------------------------------------------------------------------------+
|userid|recommendations                                                                                                          |
+------+-------------------------------------------------------------------------------------------------------------------------+
|243   |[{811825558, 11.609499}, {345318854, 11.276466}, {440174643, 10.064649}, {446365386, 9.908981}, {141301155, 9.826984}]   |
|388   |[{618129022, 18.472477}, {684801221, 15.61836}, {60809833, 15.204291}, {449215024, 14.390121}, {385314698, 14.372648}]   |
|626   |[{60987561, 0.0}, {60936231, 0.0}, {60934417, 0.0}, {60930187, 0.0}, {60929871, 0.0}]                                    |
|1025  |[{380791978, 17.051695}, {811802981, 15.885312}, {451526341, 15.352804}, {552546933, 15.102768}, {140386335, 14.97331}]  |
|1848  |[{688163165, 18.161911}, {894803204, 15.910597}, {670032379, 14.568929}, {3

### **BOOK RECOMMENDATION**

In [None]:
book_recommendations = model.recommendForAllItems(5)
book_recommendations.show(5, False)

+--------+------------------------------------------------------------------------------------------------------+
|bookid  |recommendations                                                                                       |
+--------+------------------------------------------------------------------------------------------------------+
|1400    |[{265656, 13.827921}, {86489, 13.237922}, {11245, 12.047387}, {36817, 11.583971}, {165512, 11.316549}]|
|20198906|[{185722, 20.27707}, {276670, 18.887873}, {57105, 18.08065}, {178035, 17.06899}, {10030, 16.819187}]  |
|20442505|[{47128, 14.085605}, {266109, 14.056491}, {104334, 12.536686}, {75119, 12.53152}, {236948, 12.42187}] |
|20868308|[{87949, 18.30235}, {13221, 16.930986}, {29526, 16.704365}, {190185, 15.351534}, {3167, 15.115448}]   |
|60085444|[{154781, 16.09783}, {155810, 15.134682}, {66518, 14.56434}, {232066, 13.778312}, {262541, 13.777999}]|
+--------+------------------------------------------------------------------------------

In [None]:
spark.stop()