### starter code found at https://www.kaggle.com/vchulski/tutorial-collaborative-filtering-with-pyspark

In [1]:
# start Jupyter Notebook with this command - jupyter notebook --NotebookApp.iopub_data_rate_limit=100000000
import numpy as np
import pandas as pd
import os
import gc #??? what's this for?

%env JOBLIB_TEMP_FOLDER = /tmp

env: JOBLIB_TEMP_FOLDER=/tmp


In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import SparkFiles

sc = SparkContext('local')
spark = SparkSession(sc)

In [3]:
# load in data - interactions for collaborative filtering, books for content filtering (too big?)

In [4]:
# continued failures led me to trim back on size of dataset - choose 10k
books10k = spark.read.csv('books10k.csv', header = True)
books10k.show()

+-------+-----------------+------------+--------+-----------+----------+-----------------+--------------------+-------------------------+--------------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------+---------+---------+---------+--------------------+--------------------+
|book_id|goodreads_book_id|best_book_id| work_id|books_count|      isbn|           isbn13|             authors|original_publication_year|      original_title|               title|language_code|average_rating|ratings_count|work_ratings_count|work_text_reviews_count|ratings_1|ratings_2|ratings_3|ratings_4|ratings_5|           image_url|     small_image_url|
+-------+-----------------+------------+--------+-----------+----------+-----------------+--------------------+-------------------------+--------------------+--------------------+-------------+--------------+-------------+------------------+-----------------------+---------+---------

In [5]:
# continued failures led me to trim back on size of dataset - choose 10k
ratings10k = spark.read.csv('ratings10k.csv', header = True)
ratings10k.show()

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      1|    258|     5|
|      2|   4081|     4|
|      2|    260|     5|
|      2|   9296|     5|
|      2|   2318|     3|
|      2|     26|     4|
|      2|    315|     3|
|      2|     33|     4|
|      2|    301|     5|
|      2|   2686|     5|
|      2|   3753|     5|
|      2|   8519|     5|
|      4|     70|     4|
|      4|    264|     3|
|      4|    388|     4|
|      4|     18|     5|
|      4|     27|     5|
|      4|     21|     5|
|      4|      2|     5|
|      4|     23|     5|
+-------+-------+------+
only showing top 20 rows



In [6]:
# calculate sparsity
numerator = ratings10k.select("rating").count()
num_users = ratings10k.select("user_id").distinct().count()
num_books = ratings10k.select("book_id").distinct().count()
denominator = num_users * num_books
sparsity = (1.0 - (numerator * 1.0)/denominator) * 100
print("The ratings10k dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings10k dataframe is  98.94% empty.


In [7]:
# Avg num ratings per book
print("Avg num ratings per book: ")
ratings10k.groupBy("book_id").count().select(avg("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings10k.groupBy("user_id").count().select(avg("count")).show()

Avg num ratings per book: 
+-----------------+
|       avg(count)|
+-----------------+
|40.15235939404492|
+-----------------+

Avg num ratings per user: 
+-----------------+
|       avg(count)|
+-----------------+
|60.87513199577614|
+-----------------+



In [8]:
ratings10k = ratings10k.select(ratings10k.user_id.cast("integer"),
                                        ratings10k.book_id.cast("integer"),
                                        ratings10k.rating.cast("float"))
ratings10k.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- book_id: integer (nullable = true)
 |-- rating: float (nullable = true)



In [9]:
# correct the format to include zeros

users = ratings10k.select("user_id").distinct()
books = ratings10k.select("book_id").distinct()

# Cross join users and products
cj = users.crossJoin(books)
ratings = cj.join(ratings10k, ["user_id", "book_id"], "left").fillna(0)
ratings.show()

+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|    463|    471|   4.0|
|    463|    148|   0.0|
|    463|   2142|   0.0|
|    463|   3997|   0.0|
|    463|    496|   0.0|
|    463|   1580|   0.0|
|    463|   2366|   0.0|
|    463|    463|   0.0|
|    463|   1238|   0.0|
|    463|    833|   0.0|
|    463|   1088|   0.0|
|    463|   6620|   0.0|
|    463|   1591|   0.0|
|    463|   9852|   0.0|
|    463|   4101|   0.0|
|    463|   3918|   0.0|
|    463|   6397|   0.0|
|    463|   1342|   0.0|
|    463|   7253|   0.0|
|    463|   3794|   0.0|
+-------+-------+------+
only showing top 20 rows



In [11]:
(train, test) = ratings.randomSplit([0.80, 0.20], seed=731)

In [12]:
als_model = ALS(userCol = "user_id", itemCol = "book_id", ratingCol = "rating",
               nonnegative = True,
               coldStartStrategy = "drop",
               implicitPrefs = False)
model = als_model.fit(train)
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating',
                               predictionCol = 'prediction')
rmse = evaluator.evaluate(predictions)
print("RMSE: "), rmse

RMSE: 


(None, 0.39640662340632277)

In [13]:
predictions.show()

+-------+-------+------+------------+
|user_id|book_id|rating|  prediction|
+-------+-------+------+------------+
|   1645|    148|   4.0| 0.049995024|
|   3175|    148|   0.0|  0.05377537|
|   3918|    148|   0.0|   0.1438313|
|   5300|    148|   0.0| 0.065395646|
|   1025|    148|   0.0| 0.051750187|
|   1127|    148|   0.0|  0.07879922|
|   1507|    148|   0.0|  0.16217102|
|   2387|    148|   0.0| 0.017485976|
|   2563|    148|   0.0|  0.01884679|
|   3475|    148|   0.0|  0.03814276|
|   4190|    148|   0.0| 0.064677484|
|   4929|    148|   0.0|  0.03870188|
|   1143|    148|   0.0|   0.1420587|
|   3000|    148|   0.0|8.5675006E-4|
|    808|    148|   0.0|  0.09464113|
|   1265|    148|   0.0|  0.05013421|
|   3098|    148|   0.0|  0.03183744|
|   4078|    148|   0.0|  0.07300787|
|   4684|    148|   0.0|  0.09861391|
|   5223|    148|   0.0|  0.15989798|
+-------+-------+------+------------+
only showing top 20 rows



In [13]:
# tweak model by playing with rank, MaxIter, RegParam, goal = lowest RMSE

In [14]:
# change rank only (chose 16 b/c it was recommended by Goodreads paper)
als_model2 = ALS(userCol = "user_id", itemCol = "book_id", ratingCol = "rating",
                 rank = 16, maxIter = 10, regParam = 1,
               nonnegative = True,
               coldStartStrategy = "drop",
               implicitPrefs = False)
model2 = als_model2.fit(train)
predictions2 = model2.transform(test)
evaluator = RegressionEvaluator(metricName = 'rmse', labelCol = 'rating',
                               predictionCol = 'prediction')
rmse2 = evaluator.evaluate(predictions2)
print("RMSE: "), rmse2

RMSE: 


(None, 0.40559786366728084)

In [15]:
predictions2.show()

+-------+-------+------+----------+
|user_id|book_id|rating|prediction|
+-------+-------+------+----------+
|   1645|    148|   4.0|       0.0|
|   3175|    148|   0.0|       0.0|
|   3918|    148|   0.0|       0.0|
|   5300|    148|   0.0|       0.0|
|   1025|    148|   0.0|       0.0|
|   1127|    148|   0.0|       0.0|
|   1507|    148|   0.0|       0.0|
|   2387|    148|   0.0|       0.0|
|   2563|    148|   0.0|       0.0|
|   3475|    148|   0.0|       0.0|
|   4190|    148|   0.0|       0.0|
|   4929|    148|   0.0|       0.0|
|   1143|    148|   0.0|       0.0|
|   3000|    148|   0.0|       0.0|
|    808|    148|   0.0|       0.0|
|   1265|    148|   0.0|       0.0|
|   3098|    148|   0.0|       0.0|
|   4078|    148|   0.0|       0.0|
|   4684|    148|   0.0|       0.0|
|   5223|    148|   0.0|       0.0|
+-------+-------+------+----------+
only showing top 20 rows



In [16]:
param_grid = ParamGridBuilder().addGrid(als_model.rank, [5, 10, 15, 20]).addGrid(
    als_model.maxIter, [5, 10]).addGrid(als_model.regParam, [0.01, 0.05, 0.1, 0.15]).build()
evaluator = RegressionEvaluator(metricName = "rmse", labelCol = "rating",
                               predictionCol = "prediction")
cv = CrossValidator(estimator = als_model,
                   estimatorParamMaps = param_grid,
                   evaluator = evaluator,
                   numFolds = 5)
print ("Num models to be tested: ", len(param_grid))

Num models to be tested:  32


In [17]:
modelcv = cv.fit(train)

In [18]:
best_model = modelcv.bestModel
print(type(best_model))

<class 'pyspark.ml.recommendation.ALSModel'>


In [20]:
test_predictions = best_model.transform(test)
rmse = evaluator.evaluate(test_predictions)
print(rmse)

0.3622062135603919


In [21]:
print(best_model.rank) # k value (# of latent features)

25


In [22]:
print(best_model._java_obj.parent().getMaxIter())
print(best_model._java_obj.parent().getRegParam())

10
0.01


In [None]:
# best model: k = 25, maxIter = 10, regParam = 0.01, RMSE = 0.3622

In [23]:
test_predictions.show()

+-------+-------+------+-----------+
|user_id|book_id|rating| prediction|
+-------+-------+------+-----------+
|   1645|    148|   4.0| 0.70989245|
|   3175|    148|   0.0| 0.15258428|
|   3918|    148|   0.0| 0.71268547|
|   5300|    148|   0.0|   0.302438|
|   1025|    148|   0.0|   0.245731|
|   1127|    148|   0.0| 0.13814119|
|   1507|    148|   0.0| 0.27556774|
|   2387|    148|   0.0| 0.13265418|
|   2563|    148|   0.0|   0.218646|
|   3475|    148|   0.0| 0.22585842|
|   4190|    148|   0.0| 0.69588304|
|   4929|    148|   0.0| 0.15377276|
|   1143|    148|   0.0| 0.58321583|
|   3000|    148|   0.0|0.012677923|
|    808|    148|   0.0| 0.49552304|
|   1265|    148|   0.0|  0.0973807|
|   3098|    148|   0.0| 0.16174576|
|   4078|    148|   0.0| 0.02927421|
|   4684|    148|   0.0|0.107607946|
|   5223|    148|   0.0|  0.9644963|
+-------+-------+------+-----------+
only showing top 20 rows



In [26]:
# view recommendations
userRecs = best_model.recommendForAllUsers(10)
userRecs.show()

+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|   1580|[[11, 1.0400147],...|
|   5300|[[37, 2.0233123],...|
|   1591|[[167, 0.19397956...|
|   4101|[[11, 1.3917072],...|
|   1342|[[476, 0.63664454...|
|   2122|[[4, 1.1893904], ...|
|    463|[[7, 4.087946], [...|
|    833|[[94, 1.4590017],...|
|   3794|[[168, 1.8770207]...|
|   1645|[[11, 2.7405286],...|
|   3175|[[19, 4.241493], ...|
|   2366|[[205, 4.1261687]...|
|   5156|[[65, 3.4123454],...|
|   3997|[[10, 0.30373782]...|
|   1238|[[11, 3.7416406],...|
|   3918|[[50, 4.152341], ...|
|   4818|[[26, 1.1737348],...|
|   5518|[[125, 1.6194955]...|
|   1829|[[50, 1.9392428],...|
|   3749|[[168, 0.92164904...|
+-------+--------------------+
only showing top 20 rows



In [36]:
# Look at user 60's ratings
print("User 2's Ratings:")
ratings.filter(col("user_id") == 2).sort("rating", ascending = False).show()

# Look at the movies recommended to user 60
print("User 2's Recommendations:")
userRecs.filter(col("user_id") == 2).show()

# Look at user 63's ratings
print("User 63's Ratings:")
ratings.filter(col("user_id") == 63).sort("rating", ascending = False).show()

# Look at the movies recommended to user 63
print("User 63's Recommendations:")
userRecs.filter(col("user_id") == 63).show()

User 60's Ratings:
+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|      2|    260|   5.0|
|      2|   3753|   5.0|
|      2|   9296|   5.0|
|      2|   8519|   5.0|
|      2|   2686|   5.0|
|      2|    301|   5.0|
|      2|   4081|   4.0|
|      2|     33|   4.0|
|      2|     26|   4.0|
|      2|   2318|   3.0|
|      2|    315|   3.0|
|      2|    471|   0.0|
|      2|    496|   0.0|
|      2|    148|   0.0|
|      2|   1580|   0.0|
|      2|   1238|   0.0|
|      2|   2142|   0.0|
|      2|   2366|   0.0|
|      2|    833|   0.0|
|      2|   3997|   0.0|
+-------+-------+------+
only showing top 20 rows

User 60s Recommendations:
+-------+--------------------+
|user_id|     recommendations|
+-------+--------------------+
|      2|[[11, 0.77537215]...|
+-------+--------------------+

User 63's Ratings:
+-------+-------+------+
|user_id|book_id|rating|
+-------+-------+------+
|     63|    323|   5.0|
|     63|   6772|   5.0|
|     63|    592|   5.0|
|   

In [50]:
exploded_recs = spark.sql("SELECT user_id, explode(recommendations) AS BookRec FROM ALS_recs_temp")
exploded_recs.show()

+-------+----------------+
|user_id|         BookRec|
+-------+----------------+
|   1580| [11, 1.0400147]|
|   1580|[33, 0.65387654]|
|   1580|[100, 0.5300547]|
|   1580|[38, 0.52326894]|
|   1580| [67, 0.4987593]|
|   1580|[57, 0.47979054]|
|   1580|[45, 0.47413552]|
|   1580| [4, 0.46988013]|
|   1580| [22, 0.4662242]|
|   1580|[26, 0.45919847]|
|   5300| [37, 2.0233123]|
|   5300| [58, 1.9057353]|
|   5300| [59, 1.7356001]|
|   5300|  [29, 1.725448]|
|   5300|[138, 1.6259873]|
|   5300| [50, 1.6185813]|
|   5300|[102, 1.6020172]|
|   5300| [15, 1.5831457]|
|   5300|[117, 1.5628898]|
|   5300| [85, 1.5290784]|
+-------+----------------+
only showing top 20 rows

