In [1]:
# Mount Google Drive so the notebook can read the ratings file stored under MyDrive.
import google.colab.drive as drive

drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 310.8/310.8 MB 4.5 MB/s eta 0:00:00
  Preparing metadata (setup.py) ... done
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=f64cacec0e3b7cd4dbe1a0129bcc083129af92cba59396a3cd3ba2e117766eec
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [3]:
import pyspark
from pyspark.sql import SparkSession, SQLContext

In [4]:
# Start the Spark session used to load and model the ratings below
# (getOrCreate returns the already-running session if this cell is re-run).
spark = (
    SparkSession.builder
    .appName('Movie Recommendation')
    .getOrCreate()
)

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

# Read ratings.dat as raw text lines, split each line on the "::" delimiter, and
# turn fields 0-3 into an all-integer Row(userId, movieId, rating, timestamp).
ratings_path = "/content/drive/MyDrive/Colab Notebooks/Big Data/ratings.dat"
lines = spark.read.text(ratings_path).rdd
parts = lines.map(lambda line: line.value.split("::"))
ratingsRDD = parts.map(
    lambda fields: Row(
        userId=int(fields[0]),
        movieId=int(fields[1]),
        rating=int(fields[2]),
        timestamp=int(fields[3]),
    )
)

In [6]:
ratings = spark.createDataFrame(ratingsRDD)

# Fixed seed: makes the 80/20 split and the ALS factor initialisation reproducible
# under Restart & Run All (unseeded, every run trains/tests on different rows).
SEED = 42
(training, test) = ratings.randomSplit([0.8, 0.2], seed=SEED)

# Build the recommendation model using ALS on the training data.
# coldStartStrategy="drop" makes model.transform() drop rows whose user or movie
# never appeared in `training`; with the default ("nan") ALS predicts NaN for them
# and any RMSE computed over the predictions becomes NaN.
als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId",
          ratingCol="rating", coldStartStrategy="drop", seed=SEED)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
# Evaluate the model by computing the RMSE on the test data.
predictions = model.transform(test)
predictions.show()

import math

# Test rows for users/movies unseen in training get a NaN prediction unless the
# model drops them (coldStartStrategy="drop"); exclude them here so the metrics
# stay finite either way. na.drop removes rows with null or NaN in `prediction`.
scored = predictions.na.drop(subset=["prediction"])

# Squared error for each scored test row (the isnan filter is a cheap extra guard).
result = (scored.rdd
          .map(lambda row: (row['prediction'] - row['rating']) ** 2)
          .filter(lambda x: not math.isnan(x)))

# Previously `mse` was the *sum* of squared errors (reduce with +), not the mean,
# and RMSE was never computed. RegressionEvaluator computes the mean-based metric.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(scored)
mse = rmse ** 2
print(f"Test MSE = {mse:.4f}, RMSE = {rmse:.4f}")

+------+-------+------+---------+----------+
|userId|movieId|rating|timestamp|prediction|
+------+-------+------+---------+----------+
|     1|     48|     5|978824351| 3.2903128|
|     1|    588|     4|978824268| 4.0765696|
|     1|   1197|     3|978302268| 4.5495815|
|     1|   1287|     5|978302039|   4.26719|
|     1|   1545|     4|978824139| 2.9463923|
|     1|   2791|     4|978302188| 4.4776206|
|     1|   2797|     4|978302039| 4.5382385|
|     1|   3105|     5|978301713|  4.193651|
|     2|    265|     4|978299026| 3.8823757|
|     2|    368|     4|978300002| 3.4251528|
|     2|    442|     3|978300025| 2.6461937|
|     2|    480|     5|978299809| 3.7572517|
|     2|    647|     3|978299351| 3.6202981|
|     2|    736|     4|978300100|  3.233206|
|     2|    780|     3|978299966| 3.5382962|
|     2|   1193|     5|978298413|  4.145354|
|     2|   1213|     2|978298458| 3.6673465|
|     2|   1245|     2|978299200| 3.2155285|
|     2|   1293|     5|978298261|  4.049032|
|     2|  