### Recommender Systems with PySpark: MovieLens Dataset

#### Collaborative Filtering: Alternating Least Squares (ALS)

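ALS exposes the following parameters. The names below follow Spark's RDD-based API; the `pyspark.ml` equivalents are given in parentheses.

- numBlocks (`numUserBlocks`, `numItemBlocks`): number of blocks used to parallelize the computation; -1 implies auto-configuration in the RDD-based API
- rank (`rank`): number of latent factors in the model
- iterations (`maxIter`): number of ALS iterations to run
- lambda (`regParam`): regularization parameter
- implicitPrefs (`implicitPrefs`): whether to treat the data as implicit feedback instead of explicit ratings
- alpha (`alpha`): confidence weight applied to observations in the implicit-feedback variant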
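To make the roles of the iteration count and the regularization term concrete, here is a minimal pure-Python sketch of the alternating idea. It is not Spark's implementation: it assumes a single latent factor (rank 1) so that each least-squares update reduces to a scalar formula, it runs on one machine, and the names `als_rank1`, `objective`, and `toy_ratings` are hypothetical.

```python
def objective(ratings, x, y, lam):
    """Regularized squared error for rank-1 factors x (users) and y (items)."""
    fit = sum((r - x[u] * y[i]) ** 2 for (u, i), r in ratings.items())
    penalty = lam * (sum(v ** 2 for v in x.values()) + sum(v ** 2 for v in y.values()))
    return fit + penalty


def als_rank1(ratings, iterations=10, lam=0.1):
    """Rank-1 ALS on a dict {(user, item): rating}.

    Returns the user factors, the item factors, and the objective value
    recorded after each full iteration.
    """
    users = sorted({u for u, _ in ratings})
    items = sorted({i for _, i in ratings})
    x = {u: 1.0 for u in users}  # one latent factor per user
    y = {i: 1.0 for i in items}  # one latent factor per item
    history = []
    for _ in range(iterations):
        # Hold item factors fixed; each user factor has a closed-form solution.
        for u in users:
            num = sum(r * y[i] for (uu, i), r in ratings.items() if uu == u)
            den = lam + sum(y[i] ** 2 for (uu, i) in ratings if uu == u)
            x[u] = num / den
        # Hold user factors fixed; solve for each item factor the same way.
        for i in items:
            num = sum(r * x[u] for (u, ii), r in ratings.items() if ii == i)
            den = lam + sum(x[u] ** 2 for (u, ii) in ratings if ii == i)
            y[i] = num / den
        history.append(objective(ratings, x, y, lam))
    return x, y, history


# Hypothetical toy data, not taken from ratings.csv.
toy_ratings = {
    ("u1", "m1"): 5.0, ("u1", "m2"): 3.0,
    ("u2", "m1"): 4.0, ("u2", "m3"): 2.0,
    ("u3", "m2"): 4.5, ("u3", "m3"): 1.0,
}
user_f, item_f, history = als_rank1(toy_ratings, iterations=8, lam=0.1)
print("objective per iteration:", [round(h, 4) for h in history])
print("predicted rating for (u1, m3):", round(user_f["u1"] * item_f["m3"], 3))
```

Because each half-step exactly minimizes the objective over one set of factors while the other is fixed, the recorded objective never increases from one iteration to the next. A larger `lam` shrinks the factors toward zero, which is the effect `regParam` controls in `pyspark.ml`.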

In [24]:
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [25]:
spark = SparkSession.builder.appName('movielens').getOrCreate()

In [26]:
df = spark.read.csv("ratings.csv", inferSchema=True, header=True)
df.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [27]:
df.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [28]:
(train, test) = df.randomSplit([0.7, 0.3], seed=42)

In [29]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")

In [35]:
model = als.fit(train)
model.setColdStartStrategy("drop")  # drop rows whose prediction is NaN (users or movies not seen during training)

ALSModel: uid=ALS_a66ca433e500, rank=10

In [36]:
pred = model.transform(test)

In [37]:
pred.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   463|   1088|   3.5|1145460096| 3.2061002|
|   580|   3175|   2.5|1167792674| 4.3968816|
|   580|  44022|   3.5|1167792560|  3.859246|
|   362|   1645|   5.0|1530641485| 3.7456007|
|   597|   1959|   4.0| 941640006| 4.3160577|
|   155|   3175|   4.0| 961861723|  4.446461|
|   368|   1645|   3.0| 975828061| 2.7419548|
|   368|   2122|   2.0| 971277319| 2.5674756|
|   115|   1645|   4.0| 957648208| 1.5511395|
|   115|   3175|   4.0| 965425216|  4.373758|
|    28|   1580|   3.0|1234516117| 2.8019404|
|    28|   1645|   2.5|1242033151|  2.740254|
|    28|   3175|   1.5|1242290498| 3.3846095|
|   587|   1580|   4.0| 953138475| 4.3159933|
|   587|   3175|   5.0| 953139667| 3.9039378|
|   332|   1645|   3.5|1352672578| 3.0574317|
|   332|   2366|   3.5|1352672632| 4.6366754|
|   577|   1580|   3.0| 945965825| 3.6822178|
|   577|   1959|   4.0| 945978449|

In [38]:
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [39]:
rmse = evaluator.evaluate(pred)
print(f"RMSE: {rmse}")

RMSE: 1.0974903513817016


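RMSE expresses the prediction error in the same units as the `rating` column (stars): a value of about 1.10 means the predictions are typically off by roughly one star.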
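As a rough illustration of what the evaluator computes, the sketch below calculates RMSE by hand in plain Python. The `rmse` helper is a hypothetical name, and the sample values are the first three rows of the prediction table shown earlier, so the result is not comparable to the full test-set figure.

```python
import math


def rmse(labels, predictions):
    """Root mean squared error between two equal-length, non-empty sequences."""
    if not labels or len(labels) != len(predictions):
        raise ValueError("labels and predictions must be non-empty and the same length")
    squared_errors = [(y - p) ** 2 for y, p in zip(labels, predictions)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# (rating, prediction) pairs copied from the first three rows of pred.show().
sample_ratings = [3.5, 2.5, 3.5]
sample_predictions = [3.2061002, 4.3968816, 3.859246]
print(f"RMSE on three sample rows: {rmse(sample_ratings, sample_predictions):.3f}")
```

Squaring the errors before averaging means that a single large miss, such as the second row here, weighs more heavily than several small ones.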

In [40]:
user_1 = test.filter(test['userId'] == 1).select(['movieId', 'userId'])
user_1.show()

+-------+------+
|movieId|userId|
+-------+------+
|      6|     1|
|    101|     1|
|    151|     1|
|    157|     1|
|    231|     1|
|    235|     1|
|    260|     1|
|    349|     1|
|    362|     1|
|    423|     1|
|    441|     1|
|    527|     1|
|    543|     1|
|    552|     1|
|    590|     1|
|    593|     1|
|    596|     1|
|    673|     1|
|    780|     1|
|    804|     1|
+-------+------+
only showing top 20 rows



In [41]:
rec = model.transform(user_1)

In [42]:
rec.orderBy("prediction", ascending=False).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|    940|     1|  5.715617|
|    101|     1| 5.4101367|
|   1197|     1|  5.223727|
|   1031|     1| 5.2082744|
|   1214|     1|  5.120736|
|   1258|     1| 5.1121025|
|    527|     1|  5.017513|
|    543|     1| 4.9827476|
|   2959|     1|  4.981103|
|    260|     1| 4.9798326|
|   1198|     1| 4.9572086|
|   1089|     1| 4.9299936|
|      6|     1|  4.863474|
|    441|     1| 4.8511815|
|    593|     1| 4.7646637|
|   3062|     1| 4.7323914|
|   1396|     1| 4.7208004|
|   3479|     1| 4.7125273|
|    362|     1| 4.7007127|
|   1220|     1|  4.692816|
+-------+------+----------+
only showing top 20 rows

