In [None]:
# Spark could not be installed on the local laptop, so this notebook runs on Google Colab.
# Mount Google Drive so the MovieLens files under /content/drive/MyDrive are reachable.
from google.colab import drive

drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Install specific Java and Spark for Python.
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java
!java -version
!pip install pyspark

Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Get:2 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Get:4 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
Ign:5 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Get:6 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release [696 B]
Hit:7 http://archive.ubuntu.com/ubuntu bionic InRelease
Hit:8 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Get:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release.gpg [836 B]
Get:10 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Hit:11 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Get:12 http://security.ubuntu.com/ubuntu bionic-securi

In [None]:
# DataFrame object will be generated by using SparkSession
# All required libraries are imported
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder
from pyspark.sql.functions import isnan, when, count, col, explode

# Create (or reuse, if one already exists) the Spark session that backs every DataFrame below.
spark = (
    SparkSession.builder
    .appName("ALS_Recommendation")
    .getOrCreate()
)

In [None]:
!unzip "/content/drive/MyDrive/Spark Notebooks/ml-25m.zip" -d "/content/drive/MyDrive/Spark Notebooks"

In [None]:
# Load the MovieLens ratings from Drive: column names come from the header row and
# Spark infers each column's type from the data.
ratings = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/drive/MyDrive/Spark Notebooks/ratings.csv")
)

In [None]:
# Preview the first rows of the raw ratings (Spark's default preview: 20 rows, long values truncated).
ratings.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Print the column names, inferred types and nullability of the loaded ratings DataFrame.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [None]:
# Cast the id columns to integer and the rating to float, then drop the timestamp column,
# which no later cell uses.
target_types = {'userId': 'integer', 'movieId': 'integer', 'rating': 'float'}
for column_name, spark_type in target_types.items():
    ratings = ratings.withColumn(column_name, col(column_name).cast(spark_type))
ratings = ratings.drop('timestamp')
ratings.show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
|     1|    665|   5.0|
|     1|    899|   3.5|
|     1|   1088|   4.0|
|     1|   1175|   3.5|
|     1|   1217|   3.5|
|     1|   1237|   5.0|
|     1|   1250|   4.0|
|     1|   1260|   3.5|
|     1|   1653|   4.0|
|     1|   2011|   2.5|
|     1|   2012|   2.5|
|     1|   2068|   2.5|
|     1|   2161|   3.5|
|     1|   2351|   4.5|
|     1|   2573|   4.0|
|     1|   2632|   5.0|
|     1|   2692|   5.0|
+------+-------+------+
only showing top 20 rows



In [None]:
# Dataset shape: the row count triggers a Spark job; the column count comes from the schema.
n_rows = ratings.count()
print("Number of rows: ", n_rows)
n_cols = len(ratings.columns)
print("Number of columns: ", n_cols)

Number of rows:  819474
Number of columns:  3


In [None]:
# Null analysis: for every column, count the rows whose value is NaN or null.
null_nan_counts = [
    count(when(isnan(column_name) | col(column_name).isNull(), column_name)).alias(column_name)
    for column_name in ratings.columns
]
ratings.select(null_nan_counts).show()

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     0|      0|     0|
+------+-------+------+



In [None]:
# What is the sparsity of the data?
# Sparsity = share of the (distinct users x distinct movies) matrix that has no rating.
# All three counts come from a single aggregation pass over the data.
from pyspark.sql.functions import countDistinct

counts = ratings.agg(
    count("*").alias("n_ratings"),              # total number of ratings
    countDistinct("userId").alias("n_users"),   # number of distinct userIds
    countDistinct("movieId").alias("n_movies"), # number of distinct movieIds
).first()
rating_n, user_n, movie_n = counts["n_ratings"], counts["n_users"], counts["n_movies"]
expected_n = user_n * movie_n

# Guard against an empty DataFrame (no users/movies -> zero-sized matrix).
spars_perc = (1.0 - rating_n / expected_n) * 100 if expected_n else float("nan")
print("MovieLens data is ", "%.3f" % spars_perc + "% sparse.")

Movilens data is  99.281% sparse.


In [None]:
# Fixed seed for reproducible random operations (train/test split, cross-validation folds).
SEED = 12345

# Hold out 20% of the ratings as a test set.
trainDF, testDF = ratings.randomSplit([0.8, 0.2], seed=SEED)

# RMSE between observed and predicted ratings: the usual metric for explicit-feedback ALS.
eva = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [None]:
# Baseline ALS with default rank / regParam.
# nonnegative=True keeps latent factors >= 0. coldStartStrategy="drop" removes prediction rows
# that are NaN (user or movie unseen in training), so the RMSE is computed over valid rows only.
# seed=SEED pins the factor initialization; otherwise PySpark derives a default seed from
# Python string hashing, which changes between kernel restarts.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True,
          implicitPrefs=False, coldStartStrategy="drop", seed=SEED)

model_simple = als.fit(trainDF)

train_rmse = eva.evaluate(model_simple.transform(trainDF))
print("Training Set RMSE: ", train_rmse)

# resultDF keeps the *test-set* predictions so the next cell can display them.
resultDF = model_simple.transform(testDF)
test_rmse = eva.evaluate(resultDF)
print("Test Set RMSE: ", test_rmse)
# In the recorded run, test RMSE (~0.83) was clearly above training RMSE (~0.73),
# which suggests some overfitting to trainDF.

Training Set RMSE:  0.7269250636804417
Test Set RMSE:  0.830943993512455


In [None]:
# Sample of test-set predictions next to the observed ratings (default preview: 20 rows, truncated).
resultDF.show(n=20, truncate=True)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|  1238|  32460|   4.0|  4.455361|
|  1829|   1580|   3.0| 3.7102973|
|   858|   1342|   0.5| 2.3080688|
|   858|   2142|   4.0| 2.5964804|
|   897|   3175|   3.5|  3.110111|
|  2811|   1088|   2.5| 3.1564202|
|   516|    833|   3.0| 2.5749044|
|  1339|   1580|   3.0| 2.9960284|
|  1975|   3175|   4.5| 3.5792768|
|  2025|   1580|   4.0| 3.4870303|
|  2025|   1959|   3.0|  3.382549|
|   879|  68135|   5.0| 4.9348774|
|  1977|   4519|   3.0| 2.7046783|
|  1977|   5300|   4.0| 2.3205984|
|  1977|   8638|   3.0| 3.3015082|
|   481|   1580|   4.0| 3.6451063|
|   588|   1580|   2.5| 2.7053306|
|  2247|   1088|   4.0| 2.5497398|
|  2559|    471|   3.0| 3.0884092|
|  3105|  33722|   4.0| 2.9010663|
+------+-------+------+----------+
only showing top 20 rows



In [None]:
# Top-N recommendations for every user seen in training; with N = 1 each user gets
# only the single highest-scored movie.
top_n = 1
nrec = model_simple.recommendForAllUsers(top_n)
nrec.limit(10).show(truncate=False)



+------+---------------------+
|userId|recommendations      |
+------+---------------------+
|1     |[{69699, 5.6115956}] |
|3     |[{168760, 5.506734}] |
|5     |[{101850, 5.347312}] |
|6     |[{198185, 5.3276887}]|
|9     |[{26325, 6.0876036}] |
|12    |[{168760, 4.9787526}]|
|13    |[{127019, 5.327953}] |
|15    |[{69699, 6.190282}]  |
|16    |[{69699, 5.562439}]  |
|17    |[{26325, 5.607745}]  |
+------+---------------------+



In [None]:
# Flatten the array-of-structs `recommendations` column into one (userId, movieId, rating) row
# per recommendation. `rating` here is the model's predicted score (it can exceed 5), not an
# observed rating.
# The guard keeps the cell idempotent: re-running it on the already-flattened frame is a no-op
# instead of failing on the missing `recommendations` column.
if "recommendations" in nrec.columns:
    nrec = (
        nrec
        .withColumn("rec_exp", explode("recommendations"))
        .select("userId", col("rec_exp.movieId"), col("rec_exp.rating"))
    )

nrec.limit(10).show(truncate=False)

+------+-------+---------+
|userId|movieId|rating   |
+------+-------+---------+
|1     |69699  |5.6115956|
|3     |168760 |5.506734 |
|5     |101850 |5.347312 |
|6     |198185 |5.3276887|
|9     |26325  |6.0876036|
|12    |168760 |4.9787526|
|13    |127019 |5.327953 |
|15    |69699  |6.190282 |
|16    |69699  |5.562439 |
|17    |26325  |5.607745 |
+------+-------+---------+



In [None]:
# Hyperparameter tuning: 5-fold cross-validated grid search over ALS rank and regParam.
# nonnegative=True forces non-negative latent factors.
# coldStartStrategy="drop" drops prediction rows containing NaN so the evaluation metric is
# computed over the non-NaN predictions only.
# seed=SEED makes each fitted factorization reproducible across kernel restarts.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True,
          implicitPrefs=False, coldStartStrategy="drop", seed=SEED)

# rank = number of latent factors (ALS default 10); regParam = regularization strength.
params = (ParamGridBuilder()
          .addGrid(als.rank, [2, 5, 10])
          .addGrid(als.regParam, [0.1, 1, 3, 10])
          .build())

# collectSubModels=True retains every fold's fitted model in model.subModels
# (memory cost grows with numFolds x grid size).
validator = CrossValidator(estimator=als,
                           estimatorParamMaps=params,
                           evaluator=eva,
                           numFolds=5,
                           parallelism=4,
                           seed=SEED,
                           collectSubModels=True)

model = validator.fit(trainDF)

print("Num models to be tested without considering folds: ", len(params))

# Mean cross-validated RMSE for each parameter combination (lower is better).
for param_map, cv_rmse in zip(params, model.avgMetrics):
    print("rank=%d, regParam=%s -> CV RMSE: %.4f"
          % (param_map[als.rank], param_map[als.regParam], cv_rmse))

# RMSE is minimized, so the winning combination is the argmin of avgMetrics (the same rule
# CrossValidator uses to pick bestModel). Public attributes replace the private _java_obj access.
best_idx = min(range(len(model.avgMetrics)), key=model.avgMetrics.__getitem__)
best_params = params[best_idx]
print("Rank (number of latent factors): ", model.bestModel.rank)
print("Regularization parameter: ", best_params[als.regParam])

# CrossValidatorModel.transform() uses the best model, refit on all of trainDF.
resultDF = model.transform(testDF)

rmse = eva.evaluate(resultDF)
print("Test Set RMSE: ", rmse)