<a href="https://colab.research.google.com/github/Ujjwal1khadka/blog/blob/main/Spark_Book_review_ALS.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [161]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count
from pyspark.sql.functions import col


from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Start (or reuse) the Spark session that every later cell relies on,
# then display it as the cell output.
spark = (
    SparkSession.builder
    .appName('Recommender')
    .getOrCreate()
)
spark

In [162]:
# Load the raw ratings. An explicit schema (matching the integer columns that
# inferSchema produced for this file) skips the extra inference pass over the
# CSV and pins integer ids for ALS's userCol/itemCol.
# Column order follows the CSV header: book_id, user_id, rating.
RATINGS_PATH = "/content/book_ratings.csv"  # Colab upload location; change when running elsewhere
RATINGS_SCHEMA = "book_id INT, user_id INT, rating INT"
df = spark.read.csv(RATINGS_PATH, header=True, schema=RATINGS_SCHEMA)

In [163]:
# Shape of the raw data as (rows, columns). It is left as the cell's last
# expression so Jupyter renders it rather than printing to stdout.
(df.count(), len(df.columns))

(981756, 3)


In [164]:
# Discard every row that has a null in any column (dropna defaults to how="any").
df = df.dropna()

In [165]:
# Row count after dropping nulls. Compare it with the pre-drop count from the
# shape cell above: an unchanged count means the file had no null values.
df.count()

981756

In [187]:
# Peek at the first 50 rows.
# NOTE(review): this cell's execution count (187) is later than the cells
# below it, so its recorded output reflects state after those cells ran.
# Re-run the notebook top-to-bottom before trusting it.
df.show(n=50)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|     10|   7001|     4|
|     13|  29819|     4|
|     13|  49297|     5|
|     16|  28158|     4|
|     18|  15318|     3|
|     19|   3922|     5|
|     20|  10288|     2|
|     20|  47730|     5|
|     22|  16377|     5|
|     25|  24582|     5|
|     28|  47800|     5|
|     30|   6630|     3|
|     30|  10610|     5|
|     31|   5379|     4|
|     31|  19526|     5|
|     35|   1169|     1|
|     35|  11285|     3|
|     38|  36099|     4|
|     40|  14248|     3|
|     41|  10249|     5|
|     42|  38798|     3|
|     46|  13544|     4|
|     48|  21733|     4|
|     49|  23612|     3|
|     50|   3022|     3|
|     51|  50289|     2|
|     53|  26942|     3|
|     55|  11868|     5|
|     55|  32748|     5|
|     56|   9804|     3|
|     58|   4536|     2|
|     58|  12455|     4|
|     58|  17228|     5|
|     62|  10140|     2|
|     62|  13360|     4|
|     65|  21217|     3|
|     65|  27934|     4|


In [167]:
# Remove exact duplicate rows (all three columns equal), then preview 20 rows.
# NOTE(review): rows with the same user_id/book_id but different ratings
# survive this step and could end up in both train and test. Consider
# deduplicating on ["user_id", "book_id"] instead.
df = df.drop_duplicates()
df.show(n=20)

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|     10|   7001|     4|
|     13|  29819|     4|
|     13|  49297|     5|
|     16|  28158|     4|
|     18|  15318|     3|
|     19|   3922|     5|
|     20|  10288|     2|
|     20|  47730|     5|
|     22|  16377|     5|
|     25|  24582|     5|
|     28|  47800|     5|
|     30|   6630|     3|
|     30|  10610|     5|
|     31|   5379|     4|
|     31|  19526|     5|
|     35|   1169|     1|
|     35|  11285|     3|
|     38|  36099|     4|
|     40|  14248|     3|
|     41|  10249|     5|
+-------+-------+------+
only showing top 20 rows



In [168]:
# Row count after de-duplication. Per the recorded outputs it is 980112,
# down from 981756, so 1644 exact duplicate rows were removed.
df.count()

980112

In [169]:
# Confirm the column types before modelling; ALS needs numeric user/item ids
# and a numeric rating column.
df.printSchema()


root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [170]:
# 80/20 train/test split. A fixed seed makes the split, and therefore the
# reported RMSE, reproducible across Restart & Run All. Without it, every run
# draws a different split.
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)


In [171]:
# Explicit-feedback ALS over (user_id, book_id, rating).
# - coldStartStrategy="drop": discard rows whose user or book was not seen in
#   training, so they don't produce NaN predictions that would make RMSE NaN.
# - nonnegative=True: constrain the latent factors to be non-negative.
# - seed: ALS initialises its factor matrices randomly; fixing the seed makes
#   the fitted model reproducible.
als = ALS(userCol="user_id", itemCol="book_id", ratingCol="rating",
          coldStartStrategy="drop", nonnegative=True, implicitPrefs=False,
          seed=42)

In [172]:
# Sanity check: show the class of the estimator object bound to `als`.
als.__class__


In [173]:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# Hyperparameter search space: 4 ranks x 4 iteration counts x 4 regularisation
# strengths = 64 candidate combinations.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [10, 50, 75, 100])
    .addGrid(als.maxIter, [5, 50, 75, 100])
    .addGrid(als.regParam, [.01, .05, .1, .15])
    .build()
)

In [174]:

# RMSE between the observed `rating` and the model's `prediction` column.
# Lower is better; CrossValidator uses it to choose among the param maps.
evaluator = RegressionEvaluator(metricName="rmse",
                                labelCol="rating",
                                predictionCol="prediction")
# Number of hyperparameter combinations in the grid (not the evaluator).
# Each combination is fitted once per CV fold, so the full search performs
# len(param_grid) * numFolds fits.
print("Num models to be tested using param_grid: ", len(param_grid))

Num models to be tested using param_grid:  64


In [175]:

# 5-fold cross-validation of `als` over every combination in `param_grid`,
# scored with the RMSE evaluator. A fixed seed makes the fold assignment
# reproducible.
# NOTE: 64 combinations x 5 folds = 320 ALS fits, several with rank/maxIter
# up to 100. Expect this to be very slow on a single machine.
cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid,
                    evaluator=evaluator,
                    numFolds=5,
                    seed=42)


In [176]:
# Spark ML Params objects print as their uid, so print that identifier directly.
print(cv.uid)


CrossValidator_1de0cd9d72b8


In [177]:
# Fit the recommender. The CrossValidator defined above is expensive
# (320 ALS fits), so it only runs when RUN_CV is True. Otherwise the ALS
# estimator is fitted directly with its own configured/default
# hyperparameters. Either way `model` is an ALSModel, so the downstream
# cells work unchanged.
RUN_CV = False
if RUN_CV:
    cv_model = cv.fit(train_data)
    model = cv_model.bestModel  # best param map, refit on all of train_data
else:
    model = als.fit(train_data)


In [178]:
# Score every held-out (user, book) pair; this appends a `prediction` column.
predictions = model.transform(dataset=test_data)


In [179]:
# Preview the first ten scored rows.
predictions.show(n=10)


+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|      1|  10335|     4| 3.9540668|
|      1|  10944|     5|  4.206169|
|      1|  17434|     5|  4.104765|
|      1|  26145|     4| 3.3919623|
|      1|  32923|     5| 3.9538445|
|      1|  33890|     3| 3.3281388|
|      1|  37834|     5| 4.4845634|
|      1|  44397|     5|  4.837988|
|      1|  49298|     4| 3.8482459|
|      1|  51460|     3| 4.1033874|
+-------+-------+------+----------+
only showing top 10 rows



In [180]:
# Expose the predictions DataFrame to Spark SQL under the name "predictions".
predictions.createOrReplaceTempView(name="predictions")


In [181]:
# Query the registered view with Spark SQL (here simply every row and column).
sql_query = "\nSELECT *\nFROM predictions\n"
result = spark.sql(sql_query)


In [182]:
# Display the first 50 rows returned by the SQL query.
result.show(n=50)

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|      1|  10335|     4| 3.9540668|
|      1|  10944|     5|  4.206169|
|      1|  17434|     5|  4.104765|
|      1|  26145|     4| 3.3919623|
|      1|  32923|     5| 3.9538445|
|      1|  33890|     3| 3.3281388|
|      1|  37834|     5| 4.4845634|
|      1|  44397|     5|  4.837988|
|      1|  49298|     4| 3.8482459|
|      1|  51460|     3| 4.1033874|
|      2|   6342|     3|  4.122509|
|      2|  10140|     3| 3.1439147|
|      2|  10288|     5|  4.598488|
|      2|  10751|     3|  4.042369|
|      2|  17434|     5| 3.9249997|
|      2|  17566|     4| 3.7731078|
|      2|  19724|     5|  4.557031|
|      2|  21487|     5| 4.0531945|
|      2|  27499|     5|  4.637166|
|      2|  33716|     5| 3.9981987|
|      2|  47800|     5|  4.359119|
|      2|  48687|     1| 4.0429144|
|      3|   5885|     4| 2.8628042|
|      3|   9731|     2|  2.373184|
|      3|  10246|     1|  2.

In [183]:
# Overall test-set RMSE. Rows for users/books unseen in training were already
# removed from `predictions` by the ALS coldStartStrategy="drop" setting.
evaluator = RegressionEvaluator(labelCol="rating",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 0.9163827741959887


In [184]:
# Score one user's held-out books (user_id 32592) with the trained model and
# list them from highest to lowest predicted rating.
user1_id = 32592

# The (book, user) pairs for this user that were held out in test_data.
user1 = test_data.filter(test_data['user_id'] == user1_id).select(['book_id', 'user_id'])
user1.show()

# ALSModel.transform only needs the user/item columns to produce `prediction`.
user1_predictions = model.transform(user1).orderBy('prediction', ascending=False)
user1_predictions.show()

+-------+-------+
|book_id|user_id|
+-------+-------+
|     20|  32592|
|     31|  32592|
|     40|  32592|
|     46|  32592|
|     62|  32592|
|    100|  32592|
|    111|  32592|
|    137|  32592|
|    172|  32592|
|    225|  32592|
|    256|  32592|
|    325|  32592|
|    395|  32592|
|    478|  32592|
|    482|  32592|
|    501|  32592|
|    559|  32592|
|    768|  32592|
|    834|  32592|
|    864|  32592|
+-------+-------+
only showing top 20 rows



In [188]:
## Score and recommend books for several users at once.

# Users to inspect.
# NOTE(review): an id is not guaranteed to appear in test_data (or in
# training). Users unseen in training get no rows from transform() because of
# coldStartStrategy="drop", and get no rows from recommendForUserSubset().
user_ids = [5461, 32592, 1001, 999]

# Held-out (book, user) pairs for these users.
user_data = test_data.filter(test_data['user_id'].isin(user_ids)).select(['book_id', 'user_id'])
user_data.show(100)

# Predicted ratings for those held-out pairs, best first within each user.
user_predictions = model.transform(user_data).orderBy(
    'user_id', 'prediction', ascending=[True, False])
user_predictions.show(100)

# Top-N recommendations over the whole catalogue for each of these users.
# These may include books the user already rated in training.
NUM_RECS = 5
user_recs = model.recommendForUserSubset(user_data.select('user_id').distinct(), NUM_RECS)
user_recs.show(truncate=False)

+-------+-------+
|book_id|user_id|
+-------+-------+
|      5|   5461|
|      7|   5461|
|     20|  32592|
|     31|  32592|
|     40|  32592|
|     46|  32592|
|     62|  32592|
|     70|   5461|
|    100|  32592|
|    111|  32592|
|    121|   5461|
|    137|  32592|
|    172|  32592|
|    181|   5461|
|    225|  32592|
|    256|  32592|
|    296|   5461|
|    323|   5461|
|    325|  32592|
|    395|  32592|
|    478|  32592|
|    482|  32592|
|    501|  32592|
|    521|   5461|
|    559|  32592|
|    561|   5461|
|    768|  32592|
|    834|  32592|
|    864|  32592|
|    966|   5461|
|   1161|   5461|
|   1266|   5461|
|   1275|  32592|
|   1360|  32592|
|   2322|  32592|
|   3579|   5461|
|   5211|   5461|
|   9406|  32592|
|      1|  32592|
|      3|   5461|
|      4|  32592|
|      9|   5461|
|     11|  32592|
|     14|   5461|
|     22|  32592|
|     55|   5461|
|     60|   5461|
|     86|  32592|
|     89|  32592|
|    113|  32592|
|    133|  32592|
|    138|  32592|
|    178| 