# Predicting user movie reviews using collaborative filtering

This exercise uses alternating least squares (ALS) to predict users' movie ratings.

Adapted from https://github.com/apache/spark/blob/master/examples/src/main/python/ml/als_example.py

# Install pyspark and download data

In [1]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
     |████████████████████████████████| 281.3 MB 44 kB/s 
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
     |████████████████████████████████| 199 kB 15.5 MB/s 
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.3.0-py2.py3-none-any.whl size=281764026 sha256=1ba9b8c6409bd11cff227b89a72ceeca7042eb4ee37c4ec3289fde40b65ea4c6
  Stored in directory: /root/.cache/pip/wheels/7a/8e/1b/f73a52650d2e5f337708d9f6a1750d451a7349a867f928b885
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.0


In [2]:
!wget https://github.com/garyongguanjie/learning-pyspark/raw/main/data/ml-latest-small.zip
!unzip ml-latest-small.zip

--2022-06-28 09:08:57--  https://github.com/garyongguanjie/learning-pyspark/raw/main/data/ml-latest-small.zip
Resolving github.com (github.com)... 192.30.255.113
Connecting to github.com (github.com)|192.30.255.113|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/garyongguanjie/learning-pyspark/main/data/ml-latest-small.zip [following]
--2022-06-28 09:08:57--  https://raw.githubusercontent.com/garyongguanjie/learning-pyspark/main/data/ml-latest-small.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.109.133, 185.199.111.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 978202 (955K) [application/zip]
Saving to: ‘ml-latest-small.zip’


2022-06-28 09:08:57 (32.1 MB/s) - ‘ml-latest-small.zip’ saved [978202/978202]

Archive:  ml-latest-small.zip
   creating: ml-lates

In [3]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.appName('movie-recommendation-example').getOrCreate()

In [4]:
df = spark.read.csv('ml-latest-small/ratings.csv',header=True,inferSchema=True)

In [5]:
df.show(5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [6]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



# EDA

In [7]:
df.describe().show()

+-------+------------------+----------------+------------------+--------------------+
|summary|            userId|         movieId|            rating|           timestamp|
+-------+------------------+----------------+------------------+--------------------+
|  count|            100836|          100836|            100836|              100836|
|   mean|326.12756356856676|19435.2957177992| 3.501556983616962|1.2059460873684695E9|
| stddev| 182.6184914635004|35530.9871987003|1.0425292390606342|2.1626103599513078E8|
|    min|                 1|               1|               0.5|           828124615|
|    max|               610|          193609|               5.0|          1537799250|
+-------+------------------+----------------+------------------+--------------------+



In [8]:
df.select('userId').distinct().count()

610

In [9]:
df.select('movieId').distinct().count()

9724
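The two distinct counts above, together with the 100,836 ratings reported by `describe()`, show how sparse the user-by-movie rating matrix is. That sparsity is the reason a matrix-factorization method such as ALS is a natural fit here. A minimal arithmetic sketch using only those three numbers:

```python
# Figures taken from the notebook outputs above
n_ratings = 100_836   # count from df.describe()
n_users = 610         # distinct userId values
n_movies = 9_724      # distinct movieId values

n_cells = n_users * n_movies      # size of the full user x movie matrix
density = n_ratings / n_cells     # fraction of cells that actually hold a rating

print(f"matrix cells: {n_cells:,}")     # matrix cells: 5,931,640
print(f"density: {density:.2%}")        # density: 1.70%
print(f"missing: {1 - density:.2%}")    # missing: 98.30%
```

In other words, roughly 98% of the possible (user, movie) pairs have no rating, and those are exactly the entries the model is asked to fill in.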

In [10]:
df.agg({'rating':'mean'}).show()

+-----------------+
|      avg(rating)|
+-----------------+
|3.501556983616962|
+-----------------+



In [11]:
df.groupBy('rating').count().sort('rating').show()

+------+-----+
|rating|count|
+------+-----+
|   0.5| 1370|
|   1.0| 2811|
|   1.5| 1791|
|   2.0| 7551|
|   2.5| 5550|
|   3.0|20047|
|   3.5|13136|
|   4.0|26818|
|   4.5| 8551|
|   5.0|13211|
+------+-----+



# Train test split

In [12]:
train_df,val_df = df.randomSplit([0.8,0.2])

We will use the alternating least squares algorithm for collaborative filtering.
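ALS factorizes the sparse rating matrix R into user factors U and movie factors V so that each observed rating is approximated by a dot product, r_ui ≈ u_u · v_i. Holding V fixed turns the problem into an independent ridge regression for each user, and holding U fixed does the same for each movie, so ALS alternates between these two closed-form solves. The following is a minimal NumPy sketch of that loop on a hypothetical 4x4 toy matrix. It assumes NumPy is installed, it is not Spark's implementation, and the rank, regularization and iteration values are illustrative only. Following the scaling described in Spark's collaborative filtering guide (the ALS-WR approach), the penalty in each solve is multiplied by the number of ratings involved.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, iters=50, seed=0):
    """Toy explicit-feedback ALS (illustrative sketch, not Spark's code).

    R    : (n_users, n_items) ratings; entries where mask is False are ignored
    mask : boolean array, True where a rating was observed
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(iters):
        # Fix V: ridge regression for each user over the movies they rated
        for u in range(n_users):
            idx = mask[u]
            n = idx.sum()
            if n == 0:
                continue  # no ratings: nothing to learn from (cold start)
            Vu = V[idx]
            U[u] = np.linalg.solve(Vu.T @ Vu + reg * n * eye, Vu.T @ R[u, idx])
        # Fix U: ridge regression for each movie over the users who rated it
        for i in range(n_items):
            idx = mask[:, i]
            n = idx.sum()
            if n == 0:
                continue
            Ui = U[idx]
            V[i] = np.linalg.solve(Ui.T @ Ui + reg * n * eye, Ui.T @ R[idx, i])
    return U, V

# Hypothetical toy ratings; 0.0 marks "not rated" (real ratings start at 0.5)
R = np.array([
    [5.0, 4.0, 0.0, 1.0],
    [4.0, 0.0, 1.0, 1.0],
    [1.0, 1.0, 0.0, 5.0],
    [0.0, 1.0, 5.0, 4.0],
])
mask = R > 0

U, V = als(R, mask)
pred = U @ V.T  # dense matrix of predicted ratings, including the unrated cells
rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print(f"training RMSE: {rmse:.3f}")
print(np.round(pred, 2))
```

The cells that were 0.0 in `R` now hold predicted ratings; in the notebook, Spark does the same thing at scale and `model.transform` reads those predictions back out for the validation pairs.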

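For this exercise we will evaluate with root mean squared error (RMSE), which is the default metric of `RegressionEvaluator`. To report MSE instead, pass `metricName="mse"`.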
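RMSE is simply the square root of MSE, which puts the error back into rating units (stars). A small plain-Python sketch of both metrics, using the first five (rating, prediction) pairs from the `pred.show()` output further down:

```python
import math

# (rating, prediction) pairs copied from the first rows of pred.show()
pairs = [
    (4.0, 3.4411955),
    (3.0, 3.382751),
    (4.0, 3.6174788),
    (5.0, 3.0144217),
    (4.0, 3.496603),
]

mse = sum((r - p) ** 2 for r, p in pairs) / len(pairs)  # mean squared error
rmse = math.sqrt(mse)  # same units as the ratings
print(f"MSE={mse:.4f} RMSE={rmse:.4f}")
```

Because errors are squared before averaging, the single large miss (a 5.0 predicted as about 3.01) contributes most of the total here; both metrics weight large errors heavily.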

In [13]:
from pyspark.ml.recommendation import ALS

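Collaborative filtering recommends movies to a user based on the ratings of other users with similar tastes, so ALS can only make predictions for users and movies it saw during training. Because `randomSplit` assigns rows at random, a user or movie in the validation set may not appear in the training set at all, and by default the model returns `NaN` for such rows. We therefore set a cold start strategy: with `coldStartStrategy='drop'`, rows whose prediction is `NaN` are dropped, so the evaluation metric is computed only over users and movies the model knows.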
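To illustrate what the cold start strategy does, here is a plain-Python sketch with made-up factor vectors and IDs (all hypothetical): a prediction exists only when both the user and the movie have learned factors; otherwise it is NaN. Spark's documentation notes that with the default strategy (`'nan'`) such rows make metrics like RMSE evaluate to NaN, which is why `'drop'` is used here.

```python
import math

# Hypothetical learned factors (rank 2); only IDs seen in training have an entry
user_factors = {1: [1.2, 0.4], 2: [0.9, 1.1]}
item_factors = {10: [2.1, 1.3], 20: [1.8, 1.6]}

# Hypothetical validation rows (userId, movieId, rating).
# User 99 and movie 77 never appeared in the training data.
val_rows = [(1, 10, 4.0), (2, 20, 3.0), (99, 10, 3.5), (1, 77, 2.0)]

def predict(user, item):
    """Dot product of the two factor vectors, or NaN if either is unknown."""
    u, v = user_factors.get(user), item_factors.get(item)
    if u is None or v is None:
        return math.nan
    return sum(a * b for a, b in zip(u, v))

preds = [(u, i, r, predict(u, i)) for u, i, r in val_rows]

# 'nan' strategy: keep every row, NaN predictions included
# 'drop' strategy: discard rows whose prediction is NaN
dropped = [row for row in preds if not math.isnan(row[3])]

print(len(preds), len(dropped))  # 4 2
```

Note that an unseen movie triggers the same behaviour as an unseen user.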

In [14]:
als = ALS(userCol='userId',itemCol='movieId',ratingCol='rating',coldStartStrategy='drop')

In [15]:
model = als.fit(train_df)

In [16]:
pred = model.transform(val_df)

In [17]:
pred.show()

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   148|   4896|   4.0|1482548717| 3.4411955|
|   148|   5618|   3.0|1482548682|  3.382751|
|   148|   8368|   4.0|1482548676| 3.6174788|
|   148|  30816|   5.0|1482548570| 3.0144217|
|   148|  40815|   4.0|1482548512|  3.496603|
|   148|  54001|   4.0|1482548674| 3.7294972|
|   148|  72998|   4.0|1482548525| 3.0137384|
|   148|  79091|   3.5|1482548714| 3.2940521|
|   148|  81834|   4.0|1482548498|  4.002914|
|   148|  88125|   4.0|1482548673|  4.130434|
|   148| 134853|   4.0|1482548516| 3.6821988|
|   148| 152081|   4.0|1482548452| 3.3737414|
|   148| 160718|   4.5|1482548446|  3.682309|
|   463|    110|   4.5|1145460295|  4.328841|
|   463|   1690|   4.0|1145460012| 2.9821324|
|   463|   2006|   3.0|1145460032| 3.3196847|
|   463|   2028|   4.5|1145460281|  4.344501|
|   463|   2167|   3.0|1145460039| 3.4716434|
|   463|   3448|   3.0|1145459479|

In [18]:
from pyspark.ml.evaluation import RegressionEvaluator

In [19]:
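# RMSE is RegressionEvaluator's default metric; it is set explicitly here for clarity
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")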

In [20]:
evaluator.evaluate(pred)

0.8807787731484015

As a baseline, predict a constant rating of 3.5 for every row. ALS does better than this naive baseline: its RMSE on the validation set is about 0.881, versus about 1.035 for the constant prediction.
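The constant 3.5 is a sensible choice because it is essentially the mean rating, and the mean is the constant that minimizes squared error. A plain-Python sketch that uses the rating histogram from the EDA section to recover the mean and the RMSE of always predicting 3.5 on the full dataset:

```python
import math

# Rating histogram copied from df.groupBy('rating').count() above
counts = {
    0.5: 1370, 1.0: 2811, 1.5: 1791, 2.0: 7551, 2.5: 5550,
    3.0: 20047, 3.5: 13136, 4.0: 26818, 4.5: 8551, 5.0: 13211,
}

n = sum(counts.values())
mean = sum(r * c for r, c in counts.items()) / n

def constant_rmse(c):
    """RMSE of predicting the constant c for every rating in the histogram."""
    return math.sqrt(sum(cnt * (r - c) ** 2 for r, cnt in counts.items()) / n)

print(n)                              # 100836
print(round(mean, 4))                 # 3.5016
print(round(constant_rmse(3.5), 4))   # 1.0425
```

The full-dataset figure of about 1.04 matches the standard deviation from `describe()` and is close to the 1.035 measured on the validation split, which is what you would expect from a predictor that ignores both the user and the movie.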

In [21]:
baseline = pred.withColumn("dummy",F.lit(3.5))

In [22]:
evaluator = RegressionEvaluator(labelCol="rating",predictionCol="dummy")

In [23]:
evaluator.evaluate(baseline)

1.0354849785111764

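Recommend the top 10 movies for every user in the training set. The scores are raw predicted ratings from the factor model and are not clipped to the 0.5 to 5.0 rating scale, so some of them exceed 5.0.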
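Conceptually, a top-k recommendation scores every movie for a user with the dot product of their factor vectors and keeps the k highest scores. A plain-Python sketch with hypothetical factors (the IDs and values are made up, and this is not how Spark computes it internally):

```python
import heapq

# Hypothetical factors for one user and four movies (rank 2)
user_vec = [1.1, 0.6]
item_factors = {
    101: [2.0, 1.5],
    102: [0.5, 3.0],
    103: [3.5, 0.2],
    104: [1.0, 1.0],
}

def top_k(user_vec, item_factors, k):
    """Score every item by dot product and return the k best (item, score) pairs."""
    scores = ((item, sum(a * b for a, b in zip(user_vec, vec)))
              for item, vec in item_factors.items())
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])

print([item for item, _ in top_k(user_vec, item_factors, 2)])  # [103, 101]
```

Nothing in the dot product bounds the score, which is why values above 5.0 can appear in the recommendations. This sketch also does not exclude movies the user has already rated.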

In [24]:
userRecs = model.recommendForAllUsers(10)

In [25]:
userRecs.show(truncate=100)

+------+----------------------------------------------------------------------------------------------------+
|userId|                                                                                     recommendations|
+------+----------------------------------------------------------------------------------------------------+
|     1|[{170355, 5.856256}, {123, 5.639745}, {3925, 5.61079}, {2239, 5.582789}, {5490, 5.5450077}, {1323...|
|     3|[{6835, 4.9420767}, {5919, 4.9420767}, {5181, 4.8933578}, {70946, 4.8860064}, {2851, 4.822424}, {...|
|     5|[{5915, 4.6417694}, {132333, 4.5926046}, {5490, 4.5926046}, {170355, 4.585514}, {720, 4.5403166},...|
|     6|[{6732, 5.0526934}, {32892, 4.9636126}, {69134, 4.9060764}, {3142, 4.875879}, {720, 4.8332124}, {...|
|     9|[{3358, 5.069122}, {6442, 5.049187}, {33649, 4.995799}, {158872, 4.9945307}, {2202, 4.9684615}, {...|
|    12|[{3086, 5.813661}, {3682, 5.6294136}, {3155, 5.611012}, {3142, 5.5831394}, {3925, 5.5637584}, {58...|
|    13|[{

In [26]:
movieRecs = model.recommendForAllItems(10)
movieRecs.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{43, 5.29557}, {...|
|      3|[{43, 4.8339076},...|
|      5|[{224, 4.2960553}...|
|      6|[{53, 5.2866116},...|
|      9|[{267, 4.655725},...|
|     12|[{53, 3.970046}, ...|
|     13|[{360, 4.2680173}...|
|     15|[{243, 4.3894143}...|
|     16|[{53, 5.1450443},...|
|     17|[{413, 5.493391},...|
|     19|[{240, 4.0130186}...|
|     20|[{535, 3.9775367}...|
|     22|[{543, 4.742965},...|
|     26|[{43, 4.7028913},...|
|     27|[{584, 4.6478004}...|
|     28|[{456, 5.622594},...|
|     31|[{12, 4.605619}, ...|
|     34|[{360, 4.6275034}...|
|     40|[{413, 5.209754},...|
|     41|[{572, 4.7742066}...|
+-------+--------------------+
only showing top 20 rows



In [27]:
# Generate top 10 movie recommendations for a specified set of users
users = df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = df.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

In [28]:
userSubsetRecs.show()
movieSubSetRecs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[{177593, 4.81930...|
|   463|[{32892, 4.782992...|
|   148|[{72171, 4.991331...|
+------+--------------------+

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[{53, 4.9529014},...|
|   3175|[{53, 5.0497837},...|
|   2366|[{418, 4.7256117}...|
+-------+--------------------+



# Conclusion

This concludes the introduction to PySpark ML. If you spot any errors or possible improvements, please submit an issue or a pull request.