In [80]:
import os
import sys
from pyspark.rdd import RDD
from pyspark.sql import Row
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col
from pyspark.sql.functions import desc
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.recommendation import ALS
import numpy as np
#Show image
from IPython.display import Image, display


In [4]:
#Initialize a spark session.
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark Book Recommendation") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [122]:
spark = init_spark()

In [50]:
ratings_df = spark.read.csv("./archive/ratings.csv", header = True, inferSchema = True)
books_df = spark.read.csv("./archive/books.csv", header = True, inferSchema = True)

In [51]:
ratings_df.show(2)
ratings_df.printSchema()

+-------+-------+------+
|book_id|user_id|rating|
+-------+-------+------+
|      1|    314|     5|
|      1|    439|     3|
+-------+-------+------+
only showing top 2 rows

root
 |-- book_id: integer (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [52]:
(training_df, validatioin_df) = ratings_df.randomSplit([.8, .2])

# Hyperparameter Tuning
### numBlocks: is the number of blocks used to parallelize computation (set to -1 to auto-configure).
### rank: is the number of features to use (also referred to as the number of latent factors).
### iterations: is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less. this work with grediant decent means how many backwork or forward should be there to adjust the tuning parameter to adjust the error.
### seed: to keep the splited data same everytime when I run the command
### regParam: regularization parameter means in each iteration how much point or value we want to change.

numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).

rank is the number of latent factors in the model (defaults to 10).

maxIter is the maximum number of iterations to run (defaults to 10).

regParam specifies the regularization parameter in ALS (defaults to 1.0).

implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).

alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).

http://restanalytics.com/2019-02-27-Hyperparameter-Tuning-Alternating-Least-Squares-Recommender-System/

In [53]:
#hyperParameter of ALS model
maxIteration = 10
rank = 4
regParam = 0.1
seed = 123
error = []

In [68]:
als = ALS(maxIter = maxIteration, rank= rank, regParam=regParam, userCol="user_id", itemCol="book_id", 
          ratingCol="rating", coldStartStrategy="drop", seed=seed)
model = als.fit(training_df)
predictions = model.transform(validatioin_df)
new_predictions = predictions.filter(col('prediction') != np.nan)
evaluator = RegressionEvaluator(metricName = "rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(new_predictions)
print("rmse before cross validations: " + str(rmse))

rmse before cross validations: 0.905563529523405


In [69]:
for rank in range(4, 7):
    als = ALS(maxIter = maxIteration, rank= rank, regParam=regParam, userCol="user_id", itemCol="book_id", 
          ratingCol="rating", coldStartStrategy="drop", seed=seed)
    model = als.fit(training_df)
    predictions = model.transform(validatioin_df)
    new_predictions = predictions.filter(col('prediction') != np.nan)
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    print("rmse before cross validations: " + str(rmse))

rmse before cross validations: 0.8929300591009571
rmse before cross validations: 0.9059769309808305
rmse before cross validations: 0.905563529523405


# Cross Validation
Fitting the model tuning the parameter of ALS

 A CrossValidator requires an Estimator, a set of Estimator ParamMaps, and an Evaluator.
 We use a ParamGridBuilder to construct a grid of parameters to search over.
 With 2 values for rank and 2 values for regParam, 2 values for maxIter,2 values for alpha
 this grid will have 2 x 2 x 2 x 2 = 16 parameter settings for CrossValidator to choose from.


In [64]:
als = ALS(maxIter = maxIteration, rank= rank, regParam=regParam, userCol="user_id", itemCol="book_id", 
          ratingCol="rating", coldStartStrategy="drop", seed=seed)

paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.1, 0.01, 0.001, 0.18]) \
    .addGrid(als.rank, range(4, 10)) \
    .addGrid(als.maxIter, [5,10]) \
    .addGrid(als.alpha, [2.0,3.0]) \
    .build()
evaluator = RegressionEvaluator(metricName = "rmse", labelCol="rating", predictionCol="prediction")
crossVal = CrossValidator(estimator = als,
                          estimatorParamMaps = paramGrid,
                          evaluator = evaluator,
                          numFolds = 5)
cvModel = crossVal.fit(training_df)
 

In [100]:
    predictions = cvModel.bestModel.transform(validatioin_df)
    new_predictions = predictions.filter(col('prediction') != np.nan)
    
    evaluator = RegressionEvaluator(metricName = "rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(new_predictions)
    print("rmse after cross validations: " + str(rmse))

rmse after cross validations: 0.8754686781368951


In [103]:
cvPrediction = cvModel.bestModel.transform(validatioin_df)
cvPrediction.show(5)

+-------+-------+------+----------+
|book_id|user_id|rating|prediction|
+-------+-------+------+----------+
|    148|  26629|     4|   3.70065|
|    148|  41282|     3| 4.0466104|
|    148|  24326|     5| 3.3590097|
|    148|  13034|     4| 3.3892398|
|    148|  32055|     3| 3.0925324|
+-------+-------+------+----------+
only showing top 5 rows



### Join preedicted value with books

In [104]:
cvPrediction.join(books_df, "book_id").select("user_id", "title", "prediction").show(5)

+-------+----------+----------+
|user_id|     title|prediction|
+-------+----------+----------+
|   6460|Lysistrata| 4.0269313|
|  40820|Lysistrata| 3.2764637|
|   2331|Lysistrata| 3.2153287|
|  11326|Lysistrata| 4.0256095|
|  26933|Lysistrata| 3.8841832|
+-------+----------+----------+
only showing top 5 rows



### Predict books for one specific user

In [106]:
for_one_user = cvPrediction.filter(col("user_id") == 40820) \
                         .join(books_df, "book_id").select("user_id", "title", "prediction", "image_url")
for_one_user.show(5)

+-------+--------------------+----------+--------------------+
|user_id|               title|prediction|           image_url|
+-------+--------------------+----------+--------------------+
|  40820|          Lysistrata| 3.2764637|https://images.gr...|
|  40820|A Tale of Two Cities|  2.991767|https://images.gr...|
+-------+--------------------+----------+--------------------+



### See the images of those books that have been recommended

In [107]:
for book in for_one_user.take(10):
    print(book.title)
    display(Image(url=book.image_url))

Lysistrata


A Tale of Two Cities


# Genereate top 10 books recommendation for each users

In [110]:
userRecs = cvModel.bestModel.recommendForAllUsers(10)
userRecs.printSchema()
userRecs.select("user_id", "recommendations.book_id").show(10, False)

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+-------+------------------------------------------------------------+
|user_id|book_id                                                     |
+-------+------------------------------------------------------------+
|148    |[862, 6590, 3628, 1788, 5207, 6920, 4483, 8187, 9842, 4868] |
|463    |[862, 6590, 5207, 6920, 1788, 3628, 4483, 8109, 3395, 9566] |
|471    |[8109, 8663, 6642, 7254, 6920, 5207, 3395, 6590, 2937, 8978]|
|496    |[8109, 7368, 6642, 8663, 4881, 5879, 8249, 1308, 7014, 5752]|
|833    |[4868, 7844, 9842, 862, 3628, 8233, 3491, 8187, 3953, 2205] |
|1088   |[4868, 7844, 3628, 5207, 6590, 7275, 4336, 9842, 9566, 1380]|
|1238   |[8548, 862, 4344, 8187, 9182, 562, 7424, 5435, 1029, 7373]  |
|1342   |[4868, 7844, 3628, 9842, 5207, 6590, 7275, 

# Genereate top 10 user recommendation for each books

In [111]:
bookRecs = cvModel.bestModel.recommendForAllItems(10)
bookRecs.printSchema()
bookRecs.select("book_id", "recommendations.user_id").show(10, False)

root
 |-- book_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+-------+----------------------------------------------------------------------+
|book_id|user_id                                                               |
+-------+----------------------------------------------------------------------+
|1580   |[38201, 29642, 46874, 30038, 12906, 32731, 49857, 11100, 51626, 46594]|
|4900   |[50307, 21791, 41819, 49857, 504, 48202, 49374, 51739, 7571, 36622]   |
|5300   |[42979, 29500, 28146, 44520, 20324, 16555, 36411, 9219, 46457, 52593] |
|6620   |[36647, 46594, 49857, 30757, 14734, 49001, 46874, 11100, 21720, 29642]|
|7240   |[46594, 7571, 32731, 48416, 16392, 49857, 41819, 27206, 46874, 11260] |
|7340   |[32731, 29983, 19217, 38866, 16392, 36622, 14816, 47036, 49374, 46343]|
|7880   |[19217, 51218, 4421, 345

# Generate top 10 user recommendations for a specified set of books 

In [113]:
items = ratings_df.select(als.getItemCol()).distinct().limit(5)
itemSubsetRecs = cvModel.bestModel.recommendForItemSubset(items, 10)
itemSubsetRecs.printSchema()
itemSubsetRecs.select("book_id", "recommendations.user_id").show(10, False)

root
 |-- book_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- user_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+-------+---------------------------------------------------------------------+
|book_id|user_id                                                              |
+-------+---------------------------------------------------------------------+
|471    |[49857, 50307, 32731, 49374, 19217, 29642, 5527, 46874, 36622, 38866]|
|463    |[50307, 49857, 49374, 5527, 2137, 19217, 504, 51739, 43247, 36622]   |
|833    |[50307, 32731, 49857, 49374, 36622, 38866, 5527, 16392, 19217, 2404] |
|496    |[49857, 32731, 29642, 50307, 49374, 46874, 19217, 51626, 30038, 5527]|
|148    |[49857, 32731, 29642, 49374, 50307, 46874, 19217, 51626, 30038, 5527]|
+-------+---------------------------------------------------------------------+



# Generate top 10 books recommendations for a specified set of users

In [120]:
users = ratings_df.select(als.getUserCol()).distinct().limit(5)
usersSubsetRecs = cvModel.bestModel.recommendForUserSubset(users, 10)
usersSubsetRecs.printSchema()
usersSubsetRecs.select("user_id", "recommendations.book_id").show(10, False)

root
 |-- user_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- book_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)

+-------+------------------------------------------------------------+
|user_id|book_id                                                     |
+-------+------------------------------------------------------------+
|32592  |[862, 6590, 5207, 6920, 1788, 3628, 4483, 2244, 3753, 6228] |
|35982  |[4868, 7844, 3628, 9842, 5207, 6590, 9566, 6920, 9076, 1788]|
|19984  |[862, 6590, 6920, 5207, 8109, 1788, 1308, 8663, 6642, 4483] |
|1088   |[4868, 7844, 3628, 5207, 6590, 7275, 4336, 9842, 9566, 1380]|
|3918   |[4868, 3628, 5207, 7254, 6920, 7844, 9566, 6590, 5580, 8978]|
+-------+------------------------------------------------------------+



# Predictions some books for specific users

In [119]:
books = ratings_df.select(als.getItemCol()).distinct().limit(10)
users = ratings_df.select(als.getUserCol()).distinct().limit(10)
book_ids = []
user_ids = []
    
for i in range(0, books.count()):
        book_ids.append(books.collect()[i][0])
        user_ids.append(users.collect()[i][0])

newUserPred = spark.createDataFrame(zip(book_ids, user_ids), schema = ['book_id', 'user_id'])
newPredictions = cvModel.bestModel.transform(newUserPred)

newPredictions.printSchema()
newPredictions.select("book_id", "user_id", "prediction").show(10, False)

root
 |-- book_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- prediction: float (nullable = false)

+-------+-------+----------+
|book_id|user_id|prediction|
+-------+-------+----------+
|148    |32592  |3.5472047 |
|463    |19984  |5.1802907 |
|471    |35982  |3.6147094 |
|496    |1088   |3.5657008 |
|833    |3918   |2.846635  |
|1088   |6397   |3.2996466 |
|1238   |6658   |3.486772  |
|1342   |4900   |3.9429505 |
|1580   |11317  |3.2816837 |
|1591   |15727  |3.8160303 |
+-------+-------+----------+



# Save Model

In [121]:
#save to folder models which is created in the writing process
path="./PythonRecommenderSystem/models"

model = cvModel
model.write().overwrite().save(path)

#load the model 
sameModel = cvModel.load(path)
sameModel

CrossValidatorModel_b9c2951edd8a

In [None]:
#stop the spark cluster here
pool.close()
spark.stop()