In [None]:
appname = "Recommendation systems - Movielens"

# Look into https://spark.apache.org/downloads.html for the latest version
spark_mirror = "https://mirrors.sonic.net/apache/spark"
spark_version = "3.3.1"
hadoop_version = "3"

# Install Java 8 (Spark does not work with newer Java versions)
! apt-get update
! apt-get install openjdk-8-jdk-headless -qq > /dev/null

# Download and extract Spark binary distribution
! rm -rf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz spark-{spark_version}-bin-hadoop{hadoop_version}
! wget -q {spark_mirror}/spark-{spark_version}/spark-{spark_version}-bin-hadoop{hadoop_version}.tgz
! tar xzf spark-{spark_version}-bin-hadoop{hadoop_version}.tgz

# The only 2 environment variables needed to set up Java and Spark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/spark-{spark_version}-bin-hadoop{hadoop_version}"

# Set up the Spark environment based on the environment variable SPARK_HOME 
! pip install -q findspark
import findspark
findspark.init()

# Get the Spark session object (basic entry point for every operation)
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(appname).master("local[*]").getOrCreate()

Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Hit:2 http://archive.ubuntu.com/ubuntu bionic InRelease
Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Get:6 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:8 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:9 http://archive.ubuntu.com/ubuntu bionic-backports InRelease [83.3 kB]
Hit:10 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:11 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:12 http://ppa.launchpad.net/graphics-drivers/ppa/ubuntu bionic InRelease [21.3 kB]
Get:14 http://archive.ubuntu.co

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import os
os.environ["KAGGLE_CONFIG_DIR"] = "/content/drive/MyDrive/kaggle"

from kaggle.api.kaggle_api_extended import KaggleApi
api = KaggleApi()
api.authenticate()
! rm -f movielens-20m-dataset.zip 
! rm -f genome_scores.csv genome_tags.csv link.csv movie.csv rating.csv tag.csv
api.dataset_download_files("grouplens/movielens-20m-dataset")
! unzip movielens-20m-dataset.zip
df = spark.read.format('csv').options(inferSchema=True, header=True).load('rating.csv')
movies = spark.read.format('csv').options(inferSchema=True, header=True).load('movie.csv')

Archive:  movielens-20m-dataset.zip
  inflating: genome_scores.csv       
  inflating: genome_tags.csv         
  inflating: link.csv                
  inflating: movie.csv               
  inflating: rating.csv              
  inflating: tag.csv                 


In [None]:
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)



In [None]:
df.na.drop().count()

20000263

In [None]:
df.describe().show()

+-------+-----------------+------------------+------------------+
|summary|           userId|           movieId|            rating|
+-------+-----------------+------------------+------------------+
|  count|         20000263|          20000263|          20000263|
|   mean|69045.87258292554| 9041.567330339605|3.5255285642993797|
| stddev|40038.62665316267|19789.477445413264| 1.051988919294229|
|    min|                1|                 1|               0.5|
|    max|           138493|            131262|               5.0|
+-------+-----------------+------------------+------------------+



In [None]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

estimator = ALS(userCol='userId', 
                itemCol='movieId', 
                ratingCol='rating', 
                coldStartStrategy='drop')

evaluator = RegressionEvaluator(metricName='rmse', 
                                labelCol='rating', 
                                predictionCol='prediction')

params = ParamGridBuilder() \
            .addGrid(estimator.rank, [5]) \ #You should always try increasing ranks until reaching an elbow (because more ranks implies more computing time)
            .addGrid(estimator.regParam, [.1]) \
            .build()

crossval = CrossValidator(estimator=estimator,
                          estimatorParamMaps=params,
                          evaluator=evaluator,
                          numFolds=3)

model = crossval.fit(df)
results = model.transform(df)

In [None]:
import numpy as np
for param, value in model.getEstimatorParamMaps()[np.argmax(model.avgMetrics)].items():
  print (f"{param}: {value}")

ALS_1f8aa27056d5__rank: 5
ALS_1f8aa27056d5__regParam: 0.1


In [None]:
results.show(10)

+------+-------+------+-------------------+----------+
|userId|movieId|rating|          timestamp|prediction|
+------+-------+------+-------------------+----------+
|   148|      7|   4.0|2002-04-16 14:27:44| 3.8135636|
|   148|     17|   5.0|2002-04-16 14:22:47|  4.403224|
|   148|     18|   1.0|2002-04-16 14:07:27|   2.39471|
|   148|     25|   2.0|2002-04-16 14:28:15| 3.1632519|
|   148|     39|   4.0|2002-04-16 14:24:46| 3.5958822|
|   148|     46|   4.0|2002-04-16 14:25:24| 3.5198455|
|   148|     58|   4.0|2002-04-16 14:29:01| 4.0450335|
|   148|     74|   3.0|2002-04-16 14:29:01| 3.4309003|
|   148|     86|   2.0|2002-04-16 14:06:04| 3.5072398|
|   148|    163|   1.0|2002-04-16 14:29:01| 2.8306003|
+------+-------+------+-------------------+----------+
only showing top 10 rows



In [None]:
evaluator.evaluate(results)

0.792542699982798

In [None]:
# Generate 3 Recommendations for all users
recs = model.bestModel.recommendForAllUsers(3)
recs.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{129536, 5.54601...|
|     3|[{129536, 6.26761...|
|     5|[{126219, 8.08693...|
|     6|[{126219, 7.29649...|
|     9|[{121029, 5.40190...|
+------+--------------------+
only showing top 5 rows



In [None]:
recs.printSchema()

root
 |-- userId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- movieId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



This is a way to divide a struct(element of an array made by two elements)

In [None]:
from pyspark.sql.functions import explode, col
nrecs = recs.withColumn("rec", explode("recommendations"))\
    .select('userId', col("rec.movieId"), col("rec.rating")).limit(10)
nrecs.show()

+------+-------+---------+
|userId|movieId|   rating|
+------+-------+---------+
|     1| 129536|5.5460114|
|     1| 117907| 5.444763|
|     1| 112577|5.3575153|
|     3| 129536|6.2676115|
|     3| 126219|6.2378845|
|     3| 117907|6.1491456|
|     5| 126219| 8.086935|
|     5| 121029|6.2746572|
|     5|  77736| 6.208368|
|     6| 126219|7.2964945|
+------+-------+---------+



In [None]:
nrecs.join(movies, on='movieId').filter('userId = 3').show()

+-------+------+---------+--------------------+------------------+
|movieId|userId|   rating|               title|            genres|
+-------+------+---------+--------------------+------------------+
| 117907|     3|6.1491456|My Brother Tom (2...|             Drama|
| 126219|     3|6.2378845|    Marihuana (1936)| Documentary|Drama|
| 129536|     3|6.2676115|Code Name Coq Rou...|(no genres listed)|
+-------+------+---------+--------------------+------------------+



In [None]:
# Generate top 10 movie recommendations for a specified set of users
users = df.select(estimator.getUserCol()).distinct().limit(3)
userSubsetRecs = model.bestModel.recommendForUserSubset(users, 10)

userSubsetRecs.withColumn("rec", explode("recommendations"))\
    .select('userId', col("rec.movieId"), col("rec.rating"))\
    .join(movies, on='movieId').show(truncate=False)

+-------+------+---------+--------------------+--------------------+
|movieId|userId|   rating|               title|              genres|
+-------+------+---------+--------------------+--------------------+
| 126219|   471| 6.386113|    Marihuana (1936)|   Documentary|Drama|
|  77736|   471|5.1473484|Crazy Stone (Feng...|        Comedy|Crime|
| 101855|   471|5.0245543|Shepard & Dark (2...|         Documentary|
| 120821|   471|  4.95501|The War at Home (...|     Documentary|War|
| 112942|   471|4.9274178|   Sky Murder (1940)|Action|Adventure|...|
| 117907|   471| 4.902201|My Brother Tom (2...|               Drama|
|  86237|   471|4.8977923|  Connections (1978)|         Documentary|
| 128187|   471|4.8617964| Freedom Song (2000)|               Drama|
| 112473|   471|  4.80062|Stuart: A Life Ba...|               Drama|
| 121029|   471|4.7975054|No Distance Left ...|         Documentary|
| 126219|   463|6.6757703|    Marihuana (1936)|   Documentary|Drama|
| 112473|   463|5.6879425|Stuart: 

In [None]:
# Generate top 10 user recommendations for each movie
movieRecs = model.bestModel.recommendForAllItems(10)
movieRecs.show(5)

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{138139, 5.52531...|
|      3|[{108389, 4.95321...|
|      5|[{92390, 5.029114...|
|      6|[{30542, 5.624197...|
|      9|[{122674, 4.92038...|
+-------+--------------------+
only showing top 5 rows



In [None]:
movieRecs.printSchema()

root
 |-- movieId: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- userId: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [None]:
# Generate top 10 user recommendations for a specified set of movies
movieSubset = df.select(estimator.getItemCol()).distinct().limit(3)
movieSubsetRecs = model.bestModel.recommendForItemSubset(movies, 10)

movieSubsetRecs.withColumn("rec", explode("recommendations"))\
    .select('movieId', col("rec.userId"), col("rec.rating")).show()

+-------+------+---------+
|movieId|userId|   rating|
+-------+------+---------+
|      1|138139|5.5253105|
|      1| 61498| 5.478763|
|      1|104672|5.4407544|
|      1| 76693|5.4375834|
|      1| 99259|5.4229274|
|      1|108993|5.4214816|
|      1|122277| 5.420915|
|      1| 23589| 5.419697|
|      1|129211| 5.419454|
|      1| 34232|5.4126673|
|      3|108389| 4.953211|
|      3| 95066| 4.930213|
|      3| 30542|4.8926744|
|      3| 96157|4.8886056|
|      3|122674|4.8782015|
|      3| 21838|  4.86653|
|      3| 76693|4.8639727|
|      3|122030|4.8629923|
|      3|119345|4.8525047|
|      3|129211| 4.843232|
+-------+------+---------+
only showing top 20 rows

