In [161]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
#it is important to set enough memory to SparkSession
#or else it will trigure outOfMemoryError
spark = SparkSession.builder.appName('BG_final_project')\
                            .config("spark.driver.memory", "8g")\
                            .config("spark.executor.memory", "8g")\
                            .getOrCreate()

Load data

In [162]:
pre = spark.read.option('header', True)\
                .option("inferschema",True)\
                .csv("preprocess_20.csv")

In [163]:
pre = pre.withColumn('Title', regexp_replace(col('Title'), '""', '"'))
pre = pre.withColumn('Title', regexp_replace(col('Title'), '^"|"$', ''))

In [164]:
#pre.count()

In [165]:
#pre.show(5,truncate=False)

In [166]:
#pre.printSchema()

Recommender model

In [167]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer

In [168]:
#map Title and User_id to index using StringIndexer so that bid, uid can fit in ML model
indexer = StringIndexer(inputCol="Title", outputCol="bid")
indexedData = indexer.fit(pre).transform(pre)
#indexedData.show(5)

In [169]:
#StrinIndexer can not handle null value, use setHandleInvalid to skip the invalid values 
user_indexer = StringIndexer(inputCol="User_id", outputCol="uid")
pre_id = user_indexer.setHandleInvalid("skip").fit(indexedData).transform(indexedData)
#pre_id.show(5)

In [170]:
pre_id.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- authors: string (nullable = true)
 |-- categories: string (nullable = true)
 |-- rates: double (nullable = true)
 |-- bid: double (nullable = false)
 |-- uid: double (nullable = false)



In [207]:
data=pre_id[["uid","bid","rates"]]

In [208]:
(train_df,val_df,test_df)= data.randomSplit([0.6, 0.2,0.2])

In [180]:
'''from pyspark.sql.types import IntegerType
data =data.withColumn("rates",data["rates"].cast(IntegerType()))'''

'from pyspark.sql.types import IntegerType\ndata =data.withColumn("rates",data["rates"].cast(IntegerType()))'

In [181]:
als = ALS(maxIter=10, 
          regParam=0.01, 
          userCol="uid", 
          itemCol="bid", 
          ratingCol="rates", 
          coldStartStrategy="drop")

In [182]:
als_model = als.fit(train_df)

In [209]:
als_predictions = als_model.transform(val_df)

In [184]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rates", predictionCol="prediction")

In [185]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# create the parameter grid
params = ParamGridBuilder().addGrid(als.regParam, [.01, .05, .1, .15])\
                           .addGrid(als.rank, [10, 50, 100, 150])\
                           .build()

#instantiating crossvalidator estimator
cv = CrossValidator(estimator=als, 
                    estimatorParamMaps=params, 
                    evaluator=evaluator, 
                    parallelism=4,
                    numFolds=5)

best_model = cv.fit(train_df)
cv_model = best_model.bestModel


In [231]:
'''
#instantiating crossvalidator estimator
cv_1 = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, parallelism=4)
cv_1_best_model = cv_1.fit(train_df)
cv_1_model = cv_1_best_model.bestModel'''

In [237]:
cv_3 = CrossValidator(estimator=als, estimatorParamMaps=params, evaluator=evaluator, parallelism=4,numFolds=3)
cv_3_best_model = cv_3.fit(train_df)
cv_3_model = cv_3_best_model.bestModel

In [232]:
#cv_1_model

ALSModel: uid=ALS_1440c846b0e6, rank=150

In [238]:
cv_3_model

ALSModel: uid=ALS_1440c846b0e6, rank=150

In [239]:
cv_3_predictions = cv_3_model.transform(val_df)

cv_3_rmse = evaluator.evaluate(cv_3_predictions)
print("Root Mean Square Error for 3-fold cross validation on validation data = " + str(cv_3_rmse))

Root Mean Square Error for 3-fold cross validation on validation data = 0.9693235416495645


In [186]:
cv_model

ALSModel: uid=ALS_1440c846b0e6, rank=150

In [210]:
cv_predictions = cv_model.transform(val_df)

In [233]:
'''cv_1_predictions = cv_1_model.transform(val_df)

cv_1_rmse = evaluator.evaluate(cv_1_predictions)
print("Root Mean Square Error for cross validation on validation data = " + str(cv_1_rmse))'''

Root Mean Square Error for cross validation on validation data = 0.9693235416495645


In [235]:
als_rmse = evaluator.evaluate(als_predictions)
print("Root Mean Square Error for ALS on validation data = " + str(als_rmse))

cv_rmse = evaluator.evaluate(cv_predictions)
print("Root Mean Square Error for 5-fold cross validation on validation data = " + str(cv_rmse))

Root Mean Square Error for ALS on validation data = 0.2740378532403792
Root Mean Square Error for 5-fold cross validation on validation data = 0.3628177611688855


In [236]:
als_test_predictions=als_model.transform(test_df)

als_test_rmse = evaluator.evaluate(als_test_predictions)
print("Root Mean Square Error for ALS on test data = " + str(als_test_rmse))

cv_test_predictions=cv_model.transform(test_df)

cv_test_rmse = evaluator.evaluate(cv_test_predictions)
print("Root Mean Square Error for 5-fold cross validation on test data = " + str(cv_test_rmse))

Root Mean Square Error for ALS on test data = 0.26911412723790346
Root Mean Square Error for 5-fold cross validation on test data = 0.36014514156426813


In [250]:
# Generate top 10 book recommendations for each user
userRecs = cv_model.recommendForUserSubset(cv_test_predictions.select("uid"),10)

In [253]:
#userRecs.show(truncate=False)

+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|uid|recommendations                                                                                                                                                                                             |
+---+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|12 |[{139391, 6.5770655}, {38611, 6.145306}, {126020, 6.1290016}, {19582, 5.922312}, {202935, 5.5806875}, {7262, 5.4845986}, {60689, 5.386907}, {25127, 5.3861046}, {180252, 5.356891}, {18776, 5.318654}]      |
|22 |[{38734, 4.805154}, {38603, 4.805154}, {120293, 4.805154}, {182922, 4.805154}, {30663, 4.805154}, {194290, 4.805154}, {179150, 4.805154}, {92730, 4.805

In [192]:
#userRecs.printSchema()

root
 |-- uid: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- bid: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [217]:
userMapping = pre_id.select("User_id", "uid").distinct()

In [190]:
#userMapping.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- uid: double (nullable = false)



In [254]:
userMapping = userMapping.alias("mapping")
userRecs = userRecs.alias("recs")

In [255]:
indexedUserRecs = userRecs.join(userMapping, col("recs.uid") == col("mapping.uid"), "left_outer")
indexedUserRecs = indexedUserRecs.select(col("mapping.User_id"), col("recs.recommendations.bid"),col("recs.recommendations.rating"))

In [256]:
#indexedUserRecs.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- bid: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- rating: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [257]:
#indexedUserRecs.show(5)

+--------------+--------------------+--------------------+
|       User_id|                 bid|              rating|
+--------------+--------------------+--------------------+
|A2NJO6YE954DBH|[139391, 38611, 1...|[6.5770655, 6.145...|
|A1RAUVCWYHTQI4|[38734, 38603, 12...|[4.805154, 4.8051...|
|A34UTL4AVX80MK|[139391, 38611, 1...|[6.7133007, 6.652...|
| AUTBHG6070SL4|[139391, 38611, 1...|[6.1642537, 5.850...|
|A22DUZU3XVA8HA|[38611, 126020, 1...|[6.4856834, 6.342...|
+--------------+--------------------+--------------------+
only showing top 5 rows



In [271]:
# Explode the recommendations column to show each recommendation in a separate row
explodedRecs = indexedUserRecs.select("User_id",  explode("bid").alias("bid"))
#explodedRecs=explodedRecs.select("User_id","book_id",explode("rating").alias("rating"))

In [260]:
#explodedRecs.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
#explodedRecs.show(5,truncate=False)

In [222]:
bookMapping = pre_id.select("Title", "bid").distinct()

In [227]:
bookMapping = bookMapping.alias("book")
explodedRecs = explodedRecs.alias("exrecs")

In [228]:
indexeded_explodedRecs = explodedRecs.join(bookMapping, col("exrecs.book_id") == col("book.bid"), "left_outer")
indexeded_explodedRecs = indexeded_explodedRecs.select(col("exrecs.User_id"), col("book.Title"))

In [229]:
#indexeded_explodedRecs.show(5,truncate=False)

+-------------+-------------------------------------+
|User_id      |Title                                |
+-------------+-------------------------------------+
|AUTBHG6070SL4|Penny Plain [Hardcover] by O. Douglas|
|AUTBHG6070SL4|The Diary of Anne Frank.             |
|AUTBHG6070SL4|Messages from Water, Vol. 1          |
|AUTBHG6070SL4|Transmetropolitan Vol. 5: Lonely City|
|AUTBHG6070SL4|Vagueness: A Reader (Bradford Books) |
+-------------+-------------------------------------+
only showing top 5 rows



In [265]:
#user_info=pre_id.filter(col("User_id")==random_user).select("Title","rates")


In [268]:
#user_info.sort("Title").show(user_info.count(),truncate=False)

+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|Title                                                                                                                                                                                             |rates|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-----+
|1601 And Is Shakespeare Dead? (Mark Twain Works)                                                                                                                                                  |4.0  |
|A Bit of Madness                                                                                                                                                                           

In [230]:
#change userId to any userId you want in the validation data
#and you can see the result of 10 books recommended to the specific userId

userId = "A14OJS0VWMOSWO"
# or you can randomly select one User_id from the dataframe
random_user = indexeded_explodedRecs.select(col("User_id")).sample(False, 0.1).limit(1).first()["User_id"]

rec = indexeded_explodedRecs.filter(col("User_id") == random_user).select("Title")
rec.show(truncate=False)

+----------------------------------------------------------------------------+
|Title                                                                       |
+----------------------------------------------------------------------------+
|Penny Plain [Hardcover] by O. Douglas                                       |
|The Diary of Anne Frank.                                                    |
|Messages from Water, Vol. 1                                                 |
|Transmetropolitan Vol. 5: Lonely City                                       |
|Vagueness: A Reader (Bradford Books)                                        |
|The Cake Bible                                                              |
|Betsy and the Great World (Betsy-Tacy Ser.)                                 |
|Pajama Time!                                                                |
|The Rough Guide to Russian Dictionary Phrasebook 2 (Rough Guide Phrasebooks)|
|Jocks: True Stories of America's Gay Male Athletes 

In [280]:
rec_split = userRecs.withColumn("rec", explode("recommendations"))
rec_split=rec_split.select("uid", "rec.bid", "rec.rating")

In [281]:
#rec_split.show(5)

+---+------+---------+
|uid|   bid|   rating|
+---+------+---------+
| 12|139391|6.5770655|
| 12| 38611| 6.145306|
| 12|126020|6.1290016|
| 12| 19582| 5.922312|
| 12|202935|5.5806875|
+---+------+---------+
only showing top 5 rows



In [283]:
rec_split = rec_split.alias("rec_split")

In [285]:
rec_map_user = rec_split.join(userMapping, col("rec_split.uid") == col("mapping.uid"), "left_outer")
rec_map_user = rec_map_user.select(col("mapping.User_id"), col("rec_split.bid"),col("rec_split.rating"))

In [286]:
#rec_map_user.show(5)

+--------------+------+---------+
|       User_id|   bid|   rating|
+--------------+------+---------+
|A2NJO6YE954DBH|139391|6.5770655|
|A2NJO6YE954DBH| 38611| 6.145306|
|A2NJO6YE954DBH|126020|6.1290016|
|A2NJO6YE954DBH| 19582| 5.922312|
|A2NJO6YE954DBH|202935|5.5806875|
+--------------+------+---------+
only showing top 5 rows



In [293]:
rec_map_book = rec_map_user.join(bookMapping, col("rec_split.bid") == col("book.bid"), "left_outer")
rec_map_book = rec_map_book.select(col("mapping.User_id"),col("book.Title") ,col("rec_split.rating"))

In [294]:
#rec_map_book.show(5)

+--------------+--------------------+---------+
|       User_id|               Title|   rating|
+--------------+--------------------+---------+
|A2NJO6YE954DBH|Penny Plain [Hard...|6.5770655|
|A2NJO6YE954DBH|The Diary of Anne...| 6.145306|
|A2NJO6YE954DBH|Messages from Wat...|6.1290016|
|A2NJO6YE954DBH|Transmetropolitan...| 5.922312|
|A2NJO6YE954DBH|Vagueness: A Read...|5.5806875|
+--------------+--------------------+---------+
only showing top 5 rows



In [295]:
#rec_map_book.printSchema()

root
 |-- User_id: string (nullable = true)
 |-- Title: string (nullable = true)
 |-- rating: float (nullable = true)



In [296]:
# Define the minimum and maximum values for the rating range
min_rating = rec_map_book.selectExpr("MIN(rating)").collect()[0][0]
max_rating = rec_map_book.selectExpr("MAX(rating)").collect()[0][0]

In [307]:
print("min_rating = ",min_rating)
print("max_rating = ",max_rating)

min_rating =  1.0409730672836304
max_rating =  7.3838210105896


In [299]:
# Linearly transform the rating column to the range 1-5
linear_ranged_rec = rec_map_book.withColumn("transformed_rating", 
                    1 + (5 - 1) * ((col("rating") - min_rating) / (max_rating - min_rating)))

In [300]:
#linear_ranged_rec.show(5)

+--------------+--------------------+---------+------------------+
|       User_id|               Title|   rating|transformed_rating|
+--------------+--------------------+---------+------------------+
|A2NJO6YE954DBH|Penny Plain [Hard...|6.5770655| 4.491234505404437|
|A2NJO6YE954DBH|The Diary of Anne...| 6.145306| 4.218953434622625|
|A2NJO6YE954DBH|Messages from Wat...|6.1290016| 4.208671307038188|
|A2NJO6YE954DBH|Transmetropolitan...| 5.922312| 4.078326177252845|
|A2NJO6YE954DBH|Vagueness: A Read...|5.5806875| 3.862887142294254|
+--------------+--------------------+---------+------------------+
only showing top 5 rows



In [303]:
result = linear_ranged_rec.filter(col("User_id") == random_user)\
                       .select("Title",col("transformed_rating").alias("predicted_rating"))
result.show(truncate=False)

+----------------------------------------------------------------------------+------------------+
|Title                                                                       |predicted_rating  |
+----------------------------------------------------------------------------+------------------+
|Penny Plain [Hardcover] by O. Douglas                                       |4.23090238972148  |
|The Diary of Anne Frank.                                                    |4.032834987214718 |
|Messages from Water, Vol. 1                                                 |4.0238636482168495|
|Transmetropolitan Vol. 5: Lonely City                                       |3.8759736778526177|
|Vagueness: A Reader (Bradford Books)                                        |3.666720217848312 |
|The Cake Bible                                                              |3.6033128121671942|
|Betsy and the Great World (Betsy-Tacy Ser.)                                 |3.536531456511012 |
|Pajama Time!       

In [350]:
total_rate = result.select(round(sum("predicted_rating"),3)).collect()[0][0]

In [351]:
#total_rate

37.397

In [343]:
from pyspark.sql.functions import round
rounded_acc = result.select(round(sum("predicted_rating")/50*100, 3)).collect()[0][0]


In [344]:
#rounded_acc

74.795

In [352]:
print(f"The prediction gets {total_rate} stars out of 50 stars.")
print(f"Predicted Accuracy = {rounded_acc}%")

The prediction gets 37.397 stars out of 50 stars.
Predicted Accuracy = 74.795%
