In [1]:
# Attach Google Drive to the Colab runtime's filesystem so files stored there
# are reachable under the mount point below.
from google.colab import drive

DRIVE_MOUNT_POINT = '/content/drive'
drive.mount(DRIVE_MOUNT_POINT)

Mounted at /content/drive


In [2]:
# install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=b8801b1326f9f8cd924c1178d09d114acee5222f62cc5bb5952187d6694102f8
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
import pyspark
from pyspark.sql import SparkSession, SQLContext

In [4]:
# Reuse the active Spark session if one exists, otherwise start a new one
# registered under this application name.
spark = (
    SparkSession.builder
    .appName('Movie Recommendation')
    .getOrCreate()
)

In [7]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

def _split_fields(text_row):
    """Split one raw text row into its "::"-separated string fields."""
    return text_row.value.split("::")


def _to_rating_row(fields):
    """Build a typed Row from the first four fields, each parsed as an int.

    Field order is userId, movieId, rating, timestamp.
    """
    return Row(
        userId=int(fields[0]),
        movieId=int(fields[1]),
        rating=int(fields[2]),
        timestamp=int(fields[3]),
    )


# Load the ratings file as an RDD of single-column text rows, then tokenise
# and convert each line. Both steps are lazy; nothing is read until an action runs.
lines = spark.read.text("/content/ratings.dat").rdd
parts = lines.map(_split_fields)
ratingsRDD = parts.map(_to_rating_row)

In [8]:
import math

# Fixed seed so the train/test split and the ALS factor initialisation are
# reproducible across kernel restarts.
SEED = 42

ratings = spark.createDataFrame(ratingsRDD)
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SEED)

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" removes test rows whose user or movie was never seen
# during training; with the default ("nan") those rows receive a NaN prediction,
# which would turn any aggregate error metric into NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=SEED)
model = als.fit(training)

# Score the held-out ratings and preview the first rows.
predictions = model.transform(test)
predictions.show()

# Squared error of every scored test row. The NaN filter is kept as a defensive
# guard; with coldStartStrategy="drop" it is not expected to remove anything.
result = (predictions.rdd
          .map(lambda row: row['prediction'] - row['rating'])
          .map(lambda x: x*x)
          .filter(lambda x: not math.isnan(x)))

# NOTE: despite its name, `mse` holds the SUM of squared errors, not the mean.
# Divide by result.count() to get the MSE, and take math.sqrt of that for RMSE.
mse = result.reduce(lambda x,y: x+y)

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|    260|     4|978300760| 4.2940583|
|     1|    588|     4|978824268| 4.0924025|
|     1|    594|     4|978302268|  4.315671|
|     1|    745|     3|978824268|  4.079152|
|     1|    914|     3|978301968|  4.632781|
|     1|    938|     4|978301752|  4.889193|
|     1|   1246|     4|978302091| 4.7879314|
|     1|   1907|     4|978824330| 4.7883625|
|     1|   2791|     4|978302188| 3.1937318|
|     1|   2918|     4|978302124|  4.152903|
|     1|   3186|     4|978300019|   4.19529|
|     2|    163|     4|978299809| 2.6491513|
|     2|    235|     3|978299351| 2.4286256|
|     2|    265|     4|978299026| 3.9984334|
|     2|    442|     3|978300025| 2.9161165|
|     2|    457|     4|978299773|  4.170656|
|     2|    593|     5|978298517| 4.1588864|
|     2|    902|     2|978298905| 4.4323697|
|     2|   1090|     2|978298580| 4.1181197|
|     2|  