# 3. Use collaborative filtering to build a movie recommendation system

Collaborative filtering is a technique that can filter out items that a user might like on the basis of reactions by similar users.

It works by searching a large group of people and finding a smaller set of users with tastes similar to a particular user. It looks at the items they like and combines them to create a ranked list of suggestions.

To experiment with recommendation algorithms, we need data that contains a set of items and a set of users who have reacted to some of the items.

The user-item matrix is a matrix where the rows represent users and the columns represent items. 

Workflow : 
1. create a user-item matrix 
2. cluster both users and items dataset
2. prediction system : find the rate given by user 2 for item 7 <br>
    a. find cluster of user 2 and item 7 <br>
    b. aggregate ratings from corresponding clusters (agg fn: med, mean...)


In [2]:
#get java and spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget	-q	https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip 
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment 
import os

os.environ["JAVA_HOME"]	= "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"]	= "/content/spark-3.0.0-bin-hadoop3.2"

!pip install -q findspark

import findspark 
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()

spark.version

'3.0.0'

In [3]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col

In [4]:
from google.colab import drive
drive.mount('/content/drive');

# This file will be used for generating recommendation. This is of samller size to help support processing capabilities
ratings_df=spark.read\
.format("com.databricks.spark.csv")\
.option("multiline",True)\
.option("header",True)\
.option("escape", "\"")\
.option("inferschema",True)\
.csv("/content/drive/MyDrive/Colab Notebooks/archive/ratings_small.csv")

print((ratings_df.count(), len(ratings_df.columns)))
ratings_df.show(5, truncate=False)

Mounted at /content/drive
(100004, 4)
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
+------+-------+------+----------+
only showing top 5 rows



Even with the ratings sample dataset, matrix operations take ages to run. I have no choice but to subset the ratings sample dataset in order to be able to test different solutions. In a production context, once we have interated through the tests, we re run the cells with the full dataset.

In [28]:
from pyspark.sql.functions import *

subset = ratings_df.limit(10000)
print((subset.count(), len(subset.columns)))
subset.show(5, truncate=False)

(10000, 4)
+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|1     |31     |2.5   |1260759144|
|1     |1029   |3.0   |1260759179|
|1     |1061   |3.0   |1260759182|
|1     |1129   |2.0   |1260759185|
|1     |1172   |4.0   |1260759205|
+------+-------+------+----------+
only showing top 5 rows



In [29]:
from pyspark.sql import functions as F

# Create a pivot table with user_id as index, item_id as columns and rating as values
matrix = subset.groupBy("userId").pivot("movieId").agg(F.first("rating"))

# Fill the null values with 0
matrix = matrix.na.fill(0)

In [30]:
matrix.show(5)

+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

### We use the K-means algorithm to cluster the users in the user-item matrix

In [31]:
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator 
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler


# convert the dataframe to a vector format
assembler = VectorAssembler(inputCols=matrix.columns, outputCol="features")
matrix = assembler.transform(matrix)

In [32]:
matrix.show(5)

+------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+

In [None]:
# figuring out the best number of clusters k


silhouette_score = []
evaluator = ClusteringEvaluator(predictionCol='prediction', featuresCol='standardized', metricName='silhouette', distanceMeasure='squaredEuclidean')

for k in range(5,20,2):
    
    # fit the model
    kmeans = KMeans().setK(k).setSeed(1)
    model = kmeans.fit(matrix)
    pred = model.transform(matrix)
    score = evaluator.evaluate(pred)
    silhouette_score.append(score)
    print("Silhouette Score:",score)

In [33]:
# cluster the users in the user-item matrix into k clusters

# set the number of clusters
k = 10

# fit the model
kmeans = KMeans().setK(k).setSeed(1)
model = kmeans.fit(matrix)

In [34]:
# predict the cluster for each user
clusters = model.transform(matrix)

In [35]:
evaluator = ClusteringEvaluator()

# compute the silhouette score
silhouette = evaluator.evaluate(clusters)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.04057771563124111


### We use the K-means algorithm to cluster the items in the user-item matrix

In [36]:
# Create a pivot table with user_id as index, item_id as columns and rating as values
matrix_items = subset.groupBy("movieId").pivot("userId").agg(F.first("rating"))

# Fill the null values with 0
matrix_items = matrix_items.na.fill(0)


In [37]:
matrix_items.show(5)

+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|movieId|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 27| 28| 29| 30| 31| 32| 33| 34| 35| 36| 37| 38| 39| 40| 41| 42| 43| 44| 45| 46| 47| 48| 49| 50| 51| 52| 53| 54| 55| 56| 57| 58| 59| 60| 61| 62| 63| 64| 65| 66| 67| 68| 69| 70|
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
|     31|2.5|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|4

In [38]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# convert the dataframe to a vector format
assembler = VectorAssembler(inputCols=matrix_items.columns, outputCol="features")
matrix_items = assembler.transform(matrix_items)

In [39]:
matrix_items.show(5)

+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------------------+
|movieId|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 27| 28| 29| 30| 31| 32| 33| 34| 35| 36| 37| 38| 39| 40| 41| 42| 43| 44| 45| 46| 47| 48| 49| 50| 51| 52| 53| 54| 55| 56| 57| 58| 59| 60| 61| 62| 63| 64| 65| 66| 67| 68| 69| 70|            features|
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------------------+
|     31|2.5|0.0|0.0|0.0|0.0|0.0|3.0|0.0|0.0|0.0|0.0|0.0|0.0|0.0|0.

In [40]:
# cluster the items in the user-item matrix into k clusters

# set the number of clusters
k = 15

# fit the model
kmeans_items = KMeans().setK(k).setSeed(1)
model_items = kmeans_items.fit(matrix_items)

In [41]:
# predict the cluster for each item
clusters_items = model_items.transform(matrix_items)

In [42]:
evaluator = ClusteringEvaluator()

# compute the silhouette score
silhouette = evaluator.evaluate(clusters_items)
print("Silhouette with squared euclidean distance = " + str(silhouette))

Silhouette with squared euclidean distance = 0.7551391162251584


### Give recommendations to a specific user

Once we have clustered the users and items, we can give movie recommendations to a specific user by recommending items that are popular among other users in the same cluster as the target user. 

* We need to find the cluster that the target user belongs to
* Then find the items that are in the same cluster as the target user 
* Then filter out the items that the target user has already interacted with
* Join the ratings dataframe with the items dataframe
* Then group the items by id and aggregate the ratings by average (or other stat)
* Finaly sort the items by average rating in descending order.

In [43]:
# Find the cluster that the target user belongs to
target_user_id = 5
target_user_cluster = clusters.filter(clusters.userId == target_user_id).select("prediction").first()[0]
target_user_cluster

4

In [44]:
# Find the items that are popular among users in the same cluster as the target user
cluster_it = clusters_items.filter(clusters_items.prediction == target_user_cluster)
cluster_it.show(5)

+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------------------+----------+
|movieId|  1|  2|  3|  4|  5|  6|  7|  8|  9| 10| 11| 12| 13| 14| 15| 16| 17| 18| 19| 20| 21| 22| 23| 24| 25| 26| 27| 28| 29| 30| 31| 32| 33| 34| 35| 36| 37| 38| 39| 40| 41| 42| 43| 44| 45| 46| 47| 48| 49| 50| 51| 52| 53| 54| 55| 56| 57| 58| 59| 60| 61| 62| 63| 64| 65| 66| 67| 68| 69| 70|            features|prediction|
+-------+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+--------------------+----------+
|  84236|0.0|0.0|4.0|0.0|0.0|0.0|0

In [58]:
# Extract the item_id from the cluster_items
cluster_movies = cluster_it.select('movieId')

In [59]:
# Filter the items that the target user has not interacted with yet 
# and join the ratings dataframe with the items dataframe
target_user_items = subset.filter(subset.userId != target_user_id).select('movieId')
cluster_movies_ = cluster_movies.subtract(target_user_items)

# join cluster_items (popular items among user's cluster) and ratings_df on movieId column
cluster_movies = cluster_movies.join(subset, 'movieId')
cluster_movies.show(5)

+-------+------+------+----------+
|movieId|userId|rating| timestamp|
+-------+------+------+----------+
|  84236|     3|   4.0|1298922130|
|  77455|    11|   4.5|1391658141|
|  79132|    11|   4.0|1391658115|
|  80489|    11|   4.5|1391658399|
|  80906|    11|   3.0|1391658137|
+-------+------+------+----------+
only showing top 5 rows



In [60]:
# group the items by item_id and aggregate the ratings by average
cluster_movies = cluster_movies.groupBy("movieId").agg({"rating": "avg"})

# show the average rating for each item
cluster_items = cluster_movies.sort(col("avg(rating)").desc())
cluster_items.show(5)

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
|  79677|        5.0|
|  77846|        5.0|
|  85438|        5.0|
|  86000|        5.0|
|  81156|        5.0|
+-------+-----------+
only showing top 5 rows



In [61]:
# Making a proper function out of this code

def suggest_top_n_movies(target_user_id: int, nb_movies: int):

  # Find the cluster that the target user belongs to
  target_user_cluster = clusters.filter(clusters.userId == target_user_id).select("prediction").first()[0]

  # Find the items that are popular among users in the same cluster as the target user
  cluster_it = clusters_items.filter(clusters_items.prediction == target_user_cluster)

  # Extract the item_id from the cluster_items
  cluster_movies = cluster_it.select('movieId')

  # Filter the items that the target user has not interacted with yet 
  # and join the ratings dataframe with the items dataframe
  target_user_items = subset.filter(subset.userId != target_user_id).select('movieId')
  cluster_movies_ = cluster_movies.subtract(target_user_items)

  # join cluster_items (popular items among user's cluster) and ratings_df on movieId column
  cluster_movies = cluster_movies.join(subset, 'movieId')

  # group the items by item_id and aggregate the ratings by average
  cluster_movies = cluster_movies.groupBy("movieId").agg({"rating": "avg"})

  # show the average rating for each item
  cluster_items = cluster_movies.sort(col("avg(rating)").desc())
  cluster_items.show(nb_movies)


suggest_top_n_movies(5, 10)

+-------+-----------+
|movieId|avg(rating)|
+-------+-----------+
|  81156|        5.0|
|  77846|        5.0|
|  85438|        5.0|
|  79677|        5.0|
|  86000|        5.0|
|  86781|       4.75|
|  78574|        4.5|
|  84187|        4.5|
|  89492|        4.5|
|  90428|        4.5|
+-------+-----------+
only showing top 10 rows

