# Collaborative Filtering

## Low-level Spark API (pyspark.mllib)

In [1]:
from pyspark import SparkContext
from pyspark.mllib.recommendation import Rating
from pyspark.mllib.recommendation import ALS
import os

os.environ['PYSPARK_PYTHON'] = 'python'
# sc.stop()
sc = SparkContext()

In [2]:
# https://grouplens.org/datasets/movielens/ ml-latest-small.zip (size: 1 MB)
file_path = "data//ratings.csv"

# Create the RDD from a csv file
ratings = sc.textFile(file_path)

In [3]:
ratings = ratings.map(lambda l: l.split(','))
# csv has header
header = ratings.first()
ratings = ratings.filter(lambda l: l != header)
ratings = ratings.map(lambda line: (int(line[0]), (int(line[1]), float(line[2]))))

In [4]:
print(f"There are {ratings.count()} ratings")
for u, r in ratings.map(lambda x: x[0]).countByValue().items():
    if u < 6:
        print(f"There are {r} ratings for user #{u}")

There are 100836 ratings
There are 232 ratings for user #1
There are 29 ratings for user #2
There are 39 ratings for user #3
There are 216 ratings for user #4
There are 44 ratings for user #5


In [5]:
ratings = ratings.sortByKey(ascending=True)
ratings = ratings.map(lambda line: Rating(line[0], line[1][0], line[1][1]))
ratings.take(5)

[Rating(user=1, product=1, rating=4.0),
 Rating(user=1, product=3, rating=4.0),
 Rating(user=1, product=6, rating=4.0),
 Rating(user=1, product=47, rating=5.0),
 Rating(user=1, product=50, rating=5.0)]

In [6]:
training_data, test_data = ratings.randomSplit([0.8, 0.2])
test_norating_data = test_data.map(lambda p: (p[0], p[1]))

In [7]:
print(f"Train size: {training_data.count()}")
print(f"Test size: {test_data.count()}")

Train size: 80819
Test size: 20017


In [8]:
model = ALS.train(training_data, rank=10, iterations=10)
predictions = model.predictAll(test_norating_data)

In [9]:
predictions.take(1)

[Rating(user=368, product=3272, rating=3.0845464892288605)]

In [10]:
# create tuples where user-product is a key
# join rates and preds by key
# afer joining we have
# ((user, product), (y, y_hat))
rates = test_data.map(lambda x: ((x[0], x[1]), x[2]))
preds = predictions.map(lambda x: ((x[0], x[1]), x[2]))
rates_and_preds = rates.join(preds)

In [11]:
rates_and_preds.take(1)

[((1, 1089), (5.0, 4.412471655306398))]

In [12]:
MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print(f"MSE: {MSE:.2f}")

MSE: 1.29


## High-level Spark APIs (pyspark.ml)

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, FloatType
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

In [14]:
spark = SparkSession.builder.getOrCreate()

In [15]:
schema = StructType() \
      .add('userId', IntegerType(), True) \
      .add('movieId', IntegerType(), True) \
      .add('rating', FloatType(), True)

ratings_df = spark.read.options(header='True') \
                       .schema(schema) \
                       .csv(file_path) \
                       .drop('timestamp')

In [16]:
ratings_df.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|      1|   4.0|
|     1|      3|   4.0|
|     1|      6|   4.0|
|     1|     47|   5.0|
|     1|     50|   5.0|
+------+-------+------+
only showing top 5 rows



In [17]:
training, test = ratings_df.randomSplit([0.8, 0.2])
als = ALS(rank=10, maxIter=10, userCol='userId', itemCol='movieId', ratingCol='rating', coldStartStrategy='drop')
model = als.fit(training)
predictions = model.transform(test)
predictions.show(5)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|   5618|   3.0| 3.1956534|
|   148|  30816|   5.0| 3.2353656|
|   148|  44191|   4.0| 3.7005756|
|   148|  69757|   3.5|  3.922342|
|   148|  98491|   5.0|  3.826993|
+------+-------+------+----------+
only showing top 5 rows



In [18]:
evaluator = RegressionEvaluator(metricName='mae', labelCol='rating',
                                predictionCol='prediction')
MSE = evaluator.evaluate(predictions)
print(f"MSE: {MSE:.2f}")

MSE: 0.68
