# Spark Collaborative Filtering

Collaborative Filtering with Spark 3.0

In [1]:
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.master("spark://spark-master:7077").getOrCreate()   

## Getting the MovieLens Data

In [10]:
!wget http://files.grouplens.org/datasets/movielens/ml-25m.zip -P /opt/spark-data/
!wget http://files.grouplens.org/datasets/movielens/ml-25m.zip.md5 -P /opt/spark-data/
!md5sum -c /opt/spark-data/*.md5 

--2020-06-23 01:12:55--  http://files.grouplens.org/datasets/movielens/ml-25m.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 261978986 (250M) [application/zip]
Saving to: ‘/opt/spark-data/ml-25m.zip’


2020-06-23 01:14:15 (3.11 MB/s) - ‘/opt/spark-data/ml-25m.zip’ saved [261978986/261978986]

--2020-06-23 01:14:16--  http://files.grouplens.org/datasets/movielens/ml-25m.zip.md5
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 45 [application/zip]
Saving to: ‘/opt/spark-data/ml-25m.zip.md5’


2020-06-23 01:14:17 (7.22 MB/s) - ‘/opt/spark-data/ml-25m.zip.md5’ saved [45/45]

md5sum: ml-25m.zip: No such file or directory
ml-25m.zip: FAILED open or read


In [12]:
!ls /opt/spark-data/ml-25m

genome-scores.csv  links.csv   ratings.csv  tags.csv
genome-tags.csv    movies.csv  README.txt


# Explore and build model

In [3]:
# Create Schema
from pyspark.sql.types import *

In [4]:
schema = StructType([
    StructField("userId", IntegerType(), True),
    StructField("movieId", IntegerType(), True),
    StructField("rating", DecimalType(), True),
    StructField("timestamp", DoubleType(), True),
])

In [5]:
RawLines = spark.read.schema(schema).csv("/opt/spark-data/ml-25m/ratings.csv", header=True)

In [6]:
RawLines.take(10)

[Row(userId=1, movieId=296, rating=Decimal('5'), timestamp=1147880044.0),
 Row(userId=1, movieId=306, rating=Decimal('4'), timestamp=1147868817.0),
 Row(userId=1, movieId=307, rating=Decimal('5'), timestamp=1147868828.0),
 Row(userId=1, movieId=665, rating=Decimal('5'), timestamp=1147878820.0),
 Row(userId=1, movieId=899, rating=Decimal('4'), timestamp=1147868510.0),
 Row(userId=1, movieId=1088, rating=Decimal('4'), timestamp=1147868495.0),
 Row(userId=1, movieId=1175, rating=Decimal('4'), timestamp=1147868826.0),
 Row(userId=1, movieId=1217, rating=Decimal('4'), timestamp=1147878326.0),
 Row(userId=1, movieId=1237, rating=Decimal('5'), timestamp=1147868839.0),
 Row(userId=1, movieId=1250, rating=Decimal('4'), timestamp=1147868414.0)]

In [7]:
(training, test) = RawLines.randomSplit([0.8, 0.2])

# Modelling with ALS

In [8]:
from pyspark.ml.recommendation import ALS

In [9]:
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

# Examine Predictions

In [10]:
from pyspark.ml.evaluation import RegressionEvaluator

In [11]:
# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

In [12]:
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.8267054042943826


Close Out

In [13]:
# Commented out so that we can run all without stopping our session
# spark.stop