In Spark, there is one paritcular recommendation algorithm, Alternating Least Squares (ALS). This algorithm leverages collaborative filtering, which makes recommendations based only on which items users interacted with in the past. That is, it does not require or use any additional features about the users or the items.

In [2]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr


# select 4 cores to process this
spark = SparkSession\
        .builder\
        .appName("ALSExample")\
        .config("spark.executor.cores", '4')\
        .getOrCreate()

# Loading data

In [4]:
ratings = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/goodbooks-10k-master/ratings.csv")
ratings.printSchema()
ratings.createOrReplaceTempView("dfTable")

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: integer (nullable = true)



In [5]:
ratings.show(3)

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      1|    258|     5|
|      2|   4081|     4|
|      2|    260|     5|
+-------+-------+------+
only showing top 3 rows



In [8]:
books = spark.read.format("csv")\
  .option("header", "true")\
  .option("inferSchema", "true")\
  .load("data/goodbooks-10k-master/books.csv")
books.printSchema()
books.createOrReplaceTempView("dfTable")

root
 |-- book_id: integer (nullable = true)
 |-- goodreads_book_id: integer (nullable = true)
 |-- best_book_id: integer (nullable = true)
 |-- work_id: integer (nullable = true)
 |-- books_count: integer (nullable = true)
 |-- isbn: string (nullable = true)
 |-- isbn13: double (nullable = true)
 |-- authors: string (nullable = true)
 |-- original_publication_year: double (nullable = true)
 |-- original_title: string (nullable = true)
 |-- title: string (nullable = true)
 |-- language_code: string (nullable = true)
 |-- average_rating: string (nullable = true)
 |-- ratings_count: string (nullable = true)
 |-- work_ratings_count: string (nullable = true)
 |-- work_text_reviews_count: string (nullable = true)
 |-- ratings_1: double (nullable = true)
 |-- ratings_2: integer (nullable = true)
 |-- ratings_3: integer (nullable = true)
 |-- ratings_4: integer (nullable = true)
 |-- ratings_5: integer (nullable = true)
 |-- image_url: string (nullable = true)
 |-- small_image_url: string (nu

In [9]:
book_names = books.select("book_id", "title", "authors")
book_names.show(5)

+-------+--------------------+--------------------+
|book_id|               title|             authors|
+-------+--------------------+--------------------+
|      1|The Hunger Games ...|     Suzanne Collins|
|      2|Harry Potter and ...|J.K. Rowling, Mar...|
|      3|Twilight (Twiligh...|     Stephenie Meyer|
|      4|To Kill a Mocking...|          Harper Lee|
|      5|    The Great Gatsby| F. Scott Fitzgerald|
+-------+--------------------+--------------------+
only showing top 5 rows



In [10]:
ratings.groupby("book_id").count().orderBy("count", ascending=False).show(10,False)

+-------+-----+
|book_id|count|
+-------+-----+
|1      |22806|
|2      |21850|
|4      |19088|
|3      |16931|
|5      |16604|
|17     |16549|
|20     |15953|
|18     |15855|
|23     |15657|
|7      |15558|
+-------+-----+
only showing top 10 rows



### if you don't have a indexed book_id, use StringIndexer to change into indexes

# Build Model and Fit

In [11]:
from pyspark.ml.recommendation import ALS

#split data into training and test set
training, test = ratings.randomSplit([0.8, 0.2])

als = ALS()\
  .setMaxIter(5)\
  .setRegParam(0.01)\
  .setUserCol("user_id")\
  .setItemCol("book_id")\
  .setRatingCol("rating")
# print(als.explainParams())

In [12]:
alsModel = als.fit(training)
predictions = alsModel.transform(test)

ALS effectively predicted the ratings that a user would give for a particular book

In [13]:
predictions.show(10)

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|   9427|    148|     3| 3.8146927|
|  24354|    148|     5| 4.0027723|
|  34061|    148|     4| 3.5079865|
|  50223|    148|     4|   3.97144|
|   1721|    148|     4|  3.206058|
|  29827|    148|     4|  4.491929|
|  31928|    148|     5|  4.469343|
|  35868|    148|     3| 3.3751755|
|  40199|    148|     4| 4.4291224|
|  10798|    148|     3| 3.7368596|
+-------+-------+------+----------+
only showing top 10 rows



# Evaluate

When covering the cold-start strategy, we can set up an automatic model evaluator when working with ALS. One thing that may not be immediately obvious is that this recommendation problem is really just a kind of regression problem. Since we’re predicting values (ratings) for given users, we want to optimize for reducing the total difference between our users’ ratings and the true values. We can do this using the RegressionEvaluator.

In [14]:
# in Python
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator()\
              .setMetricName("rmse")\
              .setLabelCol("rating")\
              .setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rmse)

Root-mean-square error = 0.840571


This error isn't the best! The model can be improvied further by tuning the hyperparameters

# Cross-Val and Param-Grid

In [16]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

parameter_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [1, 5, 10])
    .addGrid(als.maxIter, [20])
    .addGrid(als.regParam, [0.05, 0.1])
    .build()
)

In [17]:
from pprint import pprint

pprint(parameter_grid)

[{Param(parent='ALS_201113d65e36', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_201113d65e36', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_201113d65e36', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='ALS_201113d65e36', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_201113d65e36', name='rank', doc='rank of the factorization'): 1,
  Param(parent='ALS_201113d65e36', name='regParam', doc='regularization parameter (>= 0).'): 0.1},
 {Param(parent='ALS_201113d65e36', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_201113d65e36', name='rank', doc='rank of the factorization'): 5,
  Param(parent='ALS_201113d65e36', name='regParam', doc='regularization parameter (>= 0).'): 0.05},
 {Param(parent='ALS_201113d65e36', name='maxIter', doc='max number of iterations (>= 0).'): 20,
  Param(parent='ALS_201113d65e36', name='rank', doc='rank of t

In [20]:
crossvalidator = CrossValidator(
    estimator=als,
    estimatorParamMaps=parameter_grid,
    evaluator=evaluator,
    numFolds=2
)

crossval_model = crossvalidator.fit(training)
model = crossval_model.bestModel
predictions = model.transform(test)

In [21]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = %f" % rmse)

Root-mean-square error = 0.816494


some improvement! 

# Recommendation Results

We can now output the top 𝘬 recommendations for each user or book. The model’s recommendForAllUsers method returns a DataFrame of a user_id, an array of recommendations, as well as a rating for each of those books. recommendForAllItems returns a DataFrame of a book_id, as well as the top users for that book:

In [23]:
from pyspark.sql.functions import col

# generate top 10 book recs for each user
alsModel.recommendForAllUsers(10)\
  .selectExpr("user_id", "explode(recommendations)").show(5)

# generate top 10 user recommendations for each book 
alsModel.recommendForAllItems(10)\
  .selectExpr("book_id", "explode(recommendations)").show(5)

+-------+-----------------+
|user_id|              col|
+-------+-----------------+
|    148|[8616, 5.0621395]|
|    148|[7135, 4.8263083]|
|    148| [7929, 4.778312]|
|    148| [5794, 4.752222]|
|    148|[7537, 4.7467303]|
+-------+-----------------+
only showing top 5 rows

+-------+------------------+
|book_id|               col|
+-------+------------------+
|   1580|[35641, 6.0571265]|
|   1580|[28734, 5.6772246]|
|   1580| [43656, 5.641564]|
|   1580| [48984, 5.636069]|
|   1580|  [48155, 5.49555]|
+-------+------------------+
only showing top 5 rows



### select test user

In [24]:
test_user_id = 78

test_user = ratings.filter(ratings['user_id'] == test_user_id)

joinExpression = test_user["book_id"] == book_names['book_id']
joinType = 'left'

test_user_profile = test_user.join(book_names, joinExpression, joinType)\
 .orderBy('rating', ascending = False)

test_user_profile.show(truncate = False)

+-------+-------+------+-------+-------------------------------------------------------------------+--------------------------------------------------------------------------------+
|user_id|book_id|rating|book_id|title                                                              |authors                                                                         |
+-------+-------+------+-------+-------------------------------------------------------------------+--------------------------------------------------------------------------------+
|78     |1135   |5     |1135   |Dubliners                                                          |James Joyce, Jeri Johnson                                                       |
|78     |161    |5     |161    |The Return of the King (The Lord of the Rings, #3)                 |J.R.R. Tolkien                                                                  |
|78     |3785   |5     |3785   |On Photography                                            

In [25]:
unique_books = books.select("title").distinct()


books_read = test_user_profile.select("title").distinct()


### filter for results and join with book names

In [26]:
userRecs = alsModel.recommendForAllUsers(10)

test_userRecs = userRecs.filter(userRecs['user_id'] == test_user_id)\
                    .selectExpr("user_id", "explode(recommendations)")

test_userRecs = test_userRecs.select("user_id", 'col.*')

In [27]:
joinExpression = test_userRecs["book_id"] == book_names['book_id']
joinType = "inner"

recommended_books = test_userRecs.join(book_names, joinExpression, joinType)

recommended_books.select("title", "rating").show(truncate = False)

+----------------------------------------------------------------------------+---------+
|title                                                                       |rating   |
+----------------------------------------------------------------------------+---------+
|Damaged: The Heartbreaking True Story of a Forgotten Child                  |5.907332 |
|أقوم قيلا                                                                   |5.806222 |
|رباعيات خيام                                                                |5.7730765|
|Shadow & Claw (The Book of the New Sun #1-2)                                |5.3648663|
|The Harbinger: The Ancient Mystery that Holds the Secret of America's Future|5.3429446|
|Novecento. Un monologo                                                      |5.236621 |
|On the Genealogy of Morals                                                  |5.0507164|
|Hopscotch                                                                   |5.0181146|
|The Magic Mountain  

### can also find top users for a given book

In [16]:
test_book_id = 177

book_names.filter(book_names['book_id'] == test_book_id).show()

+-------+--------------------+--------------------+
|book_id|               title|             authors|
+-------+--------------------+--------------------+
|    177|Crime and Punishment|Fyodor Dostoyevsk...|
+-------+--------------------+--------------------+



In [17]:
bookRecs = alsModel.recommendForAllItems(10)\
                    .selectExpr("book_id", "explode(recommendations)")

test_bookRec = bookRecs.filter(bookRecs['book_id'] == test_book_id)\
                        .select("book_id", "col.*")

test_bookRec.show()

+-------+-------+---------+
|book_id|user_id|   rating|
+-------+-------+---------+
|    177|  35904| 5.975092|
|    177|  48743| 5.798015|
|    177|  46273| 5.786284|
|    177|  45135|5.7776213|
|    177|  33840|5.7763076|
|    177|  37025| 5.760677|
|    177|   3597|5.7047806|
|    177|  39883| 5.692147|
|    177|  51097|5.6818557|
|    177|  24063| 5.635712|
+-------+-------+---------+



# Further evaluation metrics

A RankingMetric allows us to compare our recommendations with an actual set of ratings (or preferences) expressed by a given user. RankingMetric does not focus on the value of the rank but rather whether or not our algorithm recommends an already ranked item again to a user. 

First, we need to collect a set of highly ranked movies for a given user. In our case, we’re going to use a rather low threshold: movies ranked above 2.5. Tuning this value will largely be a business decision:

In [104]:
# in Python
from pyspark.mllib.evaluation import RankingMetrics, RegressionMetrics
from pyspark.sql.functions import col, expr
perUserActual = predictions\
  .where("rating > 2.5")\
  .groupBy("user_id")\
  .agg(expr("collect_set(book_id) as books"))

At this point, we have a collection of users, along with a truth set of previously ranked movies for each user. Now we will get our top 10 recommendations from our algorithm on a per-user basis. We will then see if the top 10 recommendations show up in our truth set. If we have a well-trained model, it will correctly recommend the movies a user already liked. If it doesn’t, it may not have learned enough about each particular user to successfully reflect their preferences:

In [105]:
perUserPredictions = predictions\
  .orderBy(col("user_id"), expr("prediction DESC"))\
  .groupBy("user_id")\
  .agg(expr("collect_list(book_id) as books"))

Now we have two DataFrames, one of predictions and another the top-ranked items for a particular user. We can pass them into the RankingMetrics object. This object accepts an RDD of these combinations, as you can see in the following join and RDD conversion:

In [108]:
# in Python
perUserActualvPred = perUserActual.join(perUserPredictions, ["user_id"]).rdd\
  .map(lambda row: (row[1], row[2][:15]))
ranks = RankingMetrics(perUserActualvPred)

Now we can see the metrics from that ranking. For instance, we can see how precise our algorithm is with the mean average precision. We can also get the precision at certain ranking points, for instance, to see where the majority of the positive recommendations fall:

In [109]:
ranks.meanAveragePrecision
ranks.precisionAt(5)

0.6791110445413869