In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import functions as F
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import IntegerType
from pyspark.ml.feature import StringIndexer

In [2]:
# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Book Recommendation System") \
    .master("local[*]") \
    .getOrCreate()

# Load dataset
ratings = spark.read.csv('artifacts/Books_rating.csv', header=True, inferSchema=True)
ratings = ratings.selectExpr("cast(Id as int) as bookid", "User_id as userid", "cast(`review/score` as float) as rating")

In [3]:
# Remove rows with null values in bookid, userid, or rating columns
ratings = ratings.dropna(subset=["bookid", "userid", "rating"])

In [4]:
ratings.show(5)

+----------+--------------+------+
|    bookid|        userid|rating|
+----------+--------------+------+
|1882931173| AVCGYZL8FQQTD|   4.0|
| 826414346|A30TK6U7DNS82R|   5.0|
| 826414346|A3UH4UZ4RSVO82|   5.0|
| 826414346|A2MVUWT453QH61|   4.0|
| 826414346|A22X4XUPKF66MR|   4.0|
+----------+--------------+------+
only showing top 5 rows



In [5]:
# Apply StringIndexer to `userid`
indexer = StringIndexer(inputCol="userid", outputCol="userIndex")
ratings = indexer.fit(ratings).transform(ratings)

# Convert `userIndex` to integer, if required
ratings = ratings.withColumn("userIndex", ratings["userIndex"].cast(IntegerType()))

In [6]:
# Min-Max scaling
min_rating, max_rating = ratings.agg(F.min("rating"), F.max("rating")).first()
ratings = ratings.withColumn(
    "rating_normalized", (ratings["rating"] - min_rating) / ( max_rating - min_rating)
)

In [7]:
ratings.printSchema()

root
 |-- bookid: integer (nullable = true)
 |-- userid: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- userIndex: integer (nullable = true)
 |-- rating_normalized: double (nullable = true)



In [8]:
ratings.show(5)

+----------+--------------+------+---------+-----------------+
|    bookid|        userid|rating|userIndex|rating_normalized|
+----------+--------------+------+---------+-----------------+
|1882931173| AVCGYZL8FQQTD|   4.0|   125092|             0.75|
| 826414346|A30TK6U7DNS82R|   5.0|       32|              1.0|
| 826414346|A3UH4UZ4RSVO82|   5.0|    49888|              1.0|
| 826414346|A2MVUWT453QH61|   4.0|     2820|             0.75|
| 826414346|A22X4XUPKF66MR|   4.0|    20168|             0.75|
+----------+--------------+------+---------+-----------------+
only showing top 5 rows



In [9]:
# Split data into training and test sets
training, test = ratings.randomSplit([0.8, 0.2])

In [10]:
print(f"Training data count: {training.count()}")
print(f"Test data count: {test.count()}")

Training data count: 839370
Test data count: 209825


In [11]:
# Define the ALS model
als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="userIndex",
    itemCol="bookid",
    ratingCol="rating_normalized",
    coldStartStrategy="drop"
)

In [12]:
# Fit the model on the training data
model = als.fit(training)

In [13]:
# Evaluate the model on the test data
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating_normalized",
    predictionCol="prediction"
)
predictions = model.transform(test)
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.42631756329224046


In [14]:
# Select the first 10 users from the DataFrame 
users_subset = ratings.select("userIndex").distinct().limit(10)

In [15]:
# Get top 5 book recommendations for selected users
user_recommendations = model.recommendForUserSubset(users_subset, 5)
user_recommendations.show(truncate=False)

+---------+------------------------------------------------------------------------------------------------------------------------------+
|userIndex|recommendations                                                                                                               |
+---------+------------------------------------------------------------------------------------------------------------------------------+
|409910   |[{976203952, 0.969468}, {1930429487, 0.67282873}, {671505076, 0.63108337}, {595219152, 0.62023413}, {1401899226, 0.6188895}]  |
|442022   |[{976203952, 1.3084507}, {801484294, 1.0401274}, {966765907, 1.0350088}, {874805635, 1.0350088}, {1843920247, 0.9633909}]     |
|3794     |[{976203952, 1.521776}, {801484294, 0.90918815}, {1572240326, 0.861024}, {1561631809, 0.860424}, {810817411, 0.8502454}]      |
|18654    |[{1841450251, 0.92235994}, {1858288789, 0.89032066}, {791454428, 0.88175905}, {851706622, 0.8805537}, {940567067, 0.86939144}]|
|443065   |[{976203952, 0.6

In [16]:
items_subset = ratings.select("bookid").distinct().limit(10)

In [17]:
# Get top 5 user recommendations for the selected books
item_recommendations = model.recommendForItemSubset(items_subset, 5)
item_recommendations.show(truncate=False)

+----------+------------------------------------------------------------------------------------------------------------+
|bookid    |recommendations                                                                                             |
+----------+------------------------------------------------------------------------------------------------------------+
|1585972800|[{608149, 0.92461187}, {601901, 0.92461187}, {487598, 0.92461187}, {38884, 0.9227219}, {579405, 0.89940804}]|
|691005141 |[{43125, 0.6886607}, {256427, 0.68246627}, {437648, 0.68052465}, {194090, 0.68052465}, {507223, 0.6734114}] |
|711934924 |[{179401, 0.8830842}, {350948, 0.86518353}, {71131, 0.8271331}, {371197, 0.8085005}, {200382, 0.80259246}]  |
|815155565 |[{380120, 0.6774991}, {320124, 0.65003926}, {400122, 0.64351976}, {317910, 0.64351976}, {282357, 0.6198892}]|
|880010495 |[{41495, 0.99852574}, {437648, 0.96930134}, {194090, 0.96930134}, {488433, 0.9495964}, {58245, 0.9465443}]  |
|807041076 |[{330691, 1.

In [18]:
spark.stop()