In [None]:
#requirements to create spark session
!pip install ipython-autotime
%load_ext autotime

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://dlcdn.apache.org/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz

!tar xf /content/spark-3.0.3-bin-hadoop2.7.tgz

!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

import findspark
findspark.init()


from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("MovieRecommendation")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

Collecting ipython-autotime
  Downloading ipython_autotime-0.3.1-py2.py3-none-any.whl (6.8 kB)
Installing collected packages: ipython-autotime
Successfully installed ipython-autotime-0.3.1
time: 26.5 s (started: 2022-01-21 18:57:43 +00:00)


In [None]:
#unrar ratings and movies csv
!unrar e /content/drive/MyDrive/ratings.rar
!unrar e /content/drive/MyDrive/movies.rar


UNRAR 5.50 freeware      Copyright (c) 1993-2017 Alexander Roshal


Extracting from /content/drive/MyDrive/ratings.rar

Extracting  ratings.csv                                                    2%  4%  7%  9% 12% 14% 17% 19% 22% 24% 27% 29% 32% 34% 37% 39% 42% 44% 47% 49% 52% 54% 57% 59% 62% 64% 67% 69% 72% 74% 77% 79% 82% 84% 87% 89% 92% 94% 97% 99%  OK 
All OK

UNRAR 5.50 freeware      Copyright (c) 1993-2017 Alexander Roshal


Extracting from /content/drive/MyDrive/movies.rar

Extracting  movies.csv                                                    99%  OK 
All OK
time: 10.4 s (started: 2022-01-21 19:06:16 +00:00)


In [None]:
#read csv files as spark dataframe
# An explicit schema replaces inferSchema=True, which needs an extra full pass over
# this large file just to guess the column types. The types below mirror the
# columns ratingsDf.show() displays (integer ids/timestamp, fractional rating).
ratingsDf = spark.read.csv(
    "/content/ratings.csv",
    schema="userId INT, movieId INT, rating DOUBLE, `timestamp` INT",
    header=True,
)

time: 38.9 s (started: 2022-01-21 19:07:32 +00:00)


In [None]:
# Load the movie catalogue; the first line is the header and column types are inferred.
moviesDf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("/content/movies.csv")
)

time: 723 ms (started: 2022-01-21 19:08:14 +00:00)


In [None]:
#import ml functions
# The wildcard import is what provides `col` and `explode` to the later
# recommendation-flattening cells.
# NOTE(review): a star import from pyspark.sql.functions presumably also rebinds
# Python builtins of the same name (sum, min, max, ...) - confirm before relying
# on those builtins further down.
from pyspark.sql.functions import *
# RMSE scoring of the predictions and the collaborative-filtering estimator.
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

time: 242 ms (started: 2022-01-21 19:16:27 +00:00)


In [None]:
# Preview the first 20 ratings rows (the default row count, spelled out).
ratingsDf.show(n=20)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows

time: 194 ms (started: 2022-01-21 19:17:48 +00:00)


In [None]:
#join movies and ratings
# Left join: every rating row is kept, with movie columns attached when the movieId matches.
joinedDf = ratingsDf.join(
    moviesDf,
    on="movieId",
    how="left",
)

time: 33.1 ms (started: 2022-01-21 19:14:21 +00:00)


In [None]:
#split train and test data
# 80/20 split. The fixed seed makes the split (and therefore the reported RMSE)
# repeatable across Restart & Run All; without it each run samples differently.
(train,test) = joinedDf.randomSplit([0.8,0.2], seed=42)

time: 40.5 ms (started: 2022-01-21 19:14:55 +00:00)


In [None]:
#create ALS instance and evaluator instance
# Explicit-feedback ALS on (userId, movieId, rating). coldStartStrategy="drop"
# removes rows whose prediction would be NaN so the RMSE stays defined.
# seed pins the random factor initialisation so repeated runs train the same model.
als = ALS(
    maxIter=5,
    regParam=0.01,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop",
    seed=42,
)
# NOTE: `eval` shadows Python's builtin of the same name; the name is kept
# because the model-evaluation cell calls eval.evaluate(pred).
eval = RegressionEvaluator(metricName="rmse", labelCol = "rating", predictionCol="prediction")

time: 71.8 ms (started: 2022-01-21 19:21:34 +00:00)


In [None]:
#fit model
# Train the ALS estimator on the training split; this is the first long-running step.
model = als.fit(dataset=train)

time: 3min 20s (started: 2022-01-21 19:22:12 +00:00)


In [None]:
#predict on test set
# Adds a `prediction` column to the held-out rows (lazy - nothing is computed yet).
pred = model.transform(dataset=test)

time: 165 ms (started: 2022-01-21 19:26:10 +00:00)


In [None]:
# Preview the first 20 predictions next to the true ratings.
pred.show(n=20)

+-------+------+------+----------+--------------------+------+----------+
|movieId|userId|rating| timestamp|               title|genres|prediction|
+-------+------+------+----------+--------------------+------+----------+
|    148| 33138|   2.5|1120729036|Awfully Big Adven...| Drama|  2.616691|
|    148| 35969|   2.0| 835094487|Awfully Big Adven...| Drama| 2.8453736|
|    148| 60359|   3.0| 895230335|Awfully Big Adven...| Drama| 3.0563157|
|    148| 72637|   3.0| 845637336|Awfully Big Adven...| Drama| 2.7196202|
|    148| 14831|   3.0| 944148276|Awfully Big Adven...| Drama| 2.5379426|
|    148| 31171|   2.0| 855003432|Awfully Big Adven...| Drama| 2.6036816|
|    148|145182|   3.0| 944952722|Awfully Big Adven...| Drama|  2.604998|
|    148|122011|   2.0|1030400425|Awfully Big Adven...| Drama| 1.9040611|
|    148|104825|   4.0| 950909863|Awfully Big Adven...| Drama| 3.0337806|
|    148| 80974|   3.5|1138041704|Awfully Big Adven...| Drama|  2.595951|
|    148| 84667|   5.0| 832207176|Awfu

In [None]:
#model evaluation
# Root-mean-squared error between `rating` and `prediction` on the test predictions.
rmse = eval.evaluate(dataset=pred)
print(f"RMSE : {rmse}")

RMSE : 0.8206877198091518
time: 1min 47s (started: 2022-01-21 19:28:37 +00:00)


In [None]:
#generate 10 recommendation for each user
# The result is lazy and expensive: without caching, every downstream action
# (the preview, the exploded preview, the CSV write) recomputes the full
# user x item scoring from scratch. cache() keeps the computed partitions so
# later actions reuse them.
recs = model.recommendForAllUsers(10).cache()

time: 739 ms (started: 2022-01-21 19:35:38 +00:00)


In [None]:
# Preview the first 20 users with their (truncated) recommendation arrays.
recs.show(n=20)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   148|[[152890, 11.8044...|
|   463|[[169268, 21.6869...|
|   471|[[159467, 16.7493...|
|   496|[[146240, 17.0542...|
|   833|[[153550, 14.0488...|
|  1088|[[168314, 18.3119...|
|  1238|[[134673, 13.1662...|
|  1342|[[134673, 10.9081...|
|  1580|[[135757, 23.2020...|
|  1591|[[140353, 22.6117...|
|  1645|[[135757, 21.3701...|
|  1829|[[135757, 16.0226...|
|  1959|[[159467, 19.1480...|
|  2122|[[194268, 20.5756...|
|  2142|[[159467, 21.5881...|
|  2366|[[162740, 14.2473...|
|  2659|[[194268, 22.6220...|
|  2866|[[152890, 17.2809...|
|  3175|[[135757, 15.6132...|
|  3749|[[200196, 25.5390...|
+------+--------------------+
only showing top 20 rows

time: 9min 34s (started: 2022-01-21 19:35:44 +00:00)


In [None]:
#create new column movieid_rating
# One output row per (user, recommended item): explode unpacks the recommendations array.
df = recs.withColumn(
    "movieid_rating",
    explode(col("recommendations")),
)

time: 56.8 ms (started: 2022-01-21 19:45:54 +00:00)


In [None]:
#select the necessary columns
# Flatten the struct: keep the user id plus the movieId and rating fields of each recommendation.
df2 = df.select(
    "userId",
    "movieid_rating.movieId",
    "movieid_rating.rating",
)

time: 30.9 ms (started: 2022-01-21 20:03:43 +00:00)


In [None]:
# Preview the first 50 flattened (userId, movieId, rating) rows.
df2.show(n=50)

+------+-------+----------+
|userId|movieId|    rating|
+------+-------+----------+
|   148| 152890|  11.80447|
|   148| 153550| 11.354564|
|   148| 159467| 10.674363|
|   148| 197597| 10.130451|
|   148| 137174| 10.082955|
|   148| 134673| 10.074032|
|   148| 165715|  9.947281|
|   148| 197331|  9.735102|
|   148| 151965|  9.631022|
|   148| 146240| 9.2446575|
|   463| 169268| 21.686922|
|   463| 185669|  20.21122|
|   463| 135757| 19.756544|
|   463| 159133| 19.094116|
|   463| 159137|  18.43109|
|   463| 192689| 18.152624|
|   463|  95697|  17.78563|
|   463| 187503| 17.603071|
|   463| 133387|  17.41904|
|   463| 197931| 17.093155|
|   471| 159467|  16.74932|
|   471| 180169| 14.600527|
|   471|  26968| 14.406885|
|   471|  84996| 14.167627|
|   471|  95697| 14.051222|
|   471| 152890| 14.030488|
|   471| 165715| 13.888521|
|   471| 102980|13.6552925|
|   471| 175275| 13.514925|
|   471| 148791|   13.2492|
|   496| 146240| 17.054262|
|   496| 152890| 15.787232|
|   496| 189159| 14.

In [None]:
#write data frame into csv
# Spark writes a *directory* of part files at this path (one per partition),
# each starting with its own header row.
# mode("overwrite") lets the cell be re-run: the default mode raises an error
# when the output directory already exists.
df2.write.mode("overwrite").option("header",True).csv("/content/drive/MyDrive/data_images/recommendations.csv")

time: 9min 59s (started: 2022-01-21 20:36:46 +00:00)


In [None]:
#concatenate files
# Merge Spark's part files into one CSV. Every part file starts with its own
# header row, so a plain `cat` would scatter repeated header lines through the
# merged file; here the header is written once and skipped in all other parts.
import glob
import shutil

part_paths = sorted(glob.glob("/content/drive/MyDrive/data_images/recommendations.csv/p*"))
if not part_paths:
    # Fail loudly instead of silently producing an empty merged file.
    raise FileNotFoundError("no part files found under /content/drive/MyDrive/data_images/recommendations.csv")

# Binary mode copies the bytes untouched (no newline or encoding translation).
with open("/content/drive/MyDrive/recommendations.csv", "wb") as merged_file:
    header_written = False
    for part_path in part_paths:
        with open(part_path, "rb") as part_file:
            header_line = part_file.readline()
            if not header_line:
                continue  # empty part file: nothing to copy
            if not header_written:
                merged_file.write(header_line)
                header_written = True
            # Copy the remaining data rows after the header.
            shutil.copyfileobj(part_file, merged_file)

0

time: 854 ms (started: 2022-01-21 20:47:21 +00:00)
