# 0. Problem Statement

We have user-interaction data (ratings) linking users to movies.
<br>The goal is to recommend to each user movies they have not yet watched but are likely to watch or enjoy.

We use the ALS (alternating least squares) matrix factorization model from Spark.
<br>ALS is a collaborative filtering model: it learns only from the user-movie rating matrix.
<br>More expressive approaches, such as deep learning recommenders and hybrid models, can capture more complex relationships between user features and item features.
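
To make the matrix factorization idea concrete, here is a minimal pure-Python sketch of alternating least squares with a single latent factor. It is an illustration under simplifying assumptions (rank 1, a toy rating list, plain L2 regularization) and not Spark's implementation; the names `als_rank1` and `predict` and the toy data are hypothetical.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=20):
    """Fit one latent factor per user and per item by alternating least squares.

    ratings: list of (user_index, item_index, rating) triples.
    Minimizes sum (r - u[i] * v[j])**2 + reg * (sum u**2 + sum v**2).
    """
    u = [1.0] * n_users  # user factors
    v = [1.0] * n_items  # item factors
    for _ in range(iters):
        # Hold item factors fixed: each user factor has a closed-form ridge solution.
        for i in range(n_users):
            rows = [(j, r) for (a, j, r) in ratings if a == i]
            num = sum(r * v[j] for j, r in rows)
            den = reg + sum(v[j] ** 2 for j, _ in rows)
            u[i] = num / den
        # Hold user factors fixed: update each item factor the same way.
        for j in range(n_items):
            cols = [(i, r) for (i, b, r) in ratings if b == j]
            num = sum(r * u[i] for i, r in cols)
            den = reg + sum(u[i] ** 2 for i, _ in cols)
            v[j] = num / den
    return u, v


def predict(u, v, user, item):
    """Predicted rating is the product of the user and item factors."""
    return u[user] * v[item]


# Toy data (hypothetical): user 1 has not rated item 2.
toy = [(0, 0, 1.0), (0, 1, 2.0), (0, 2, 3.0), (1, 0, 2.0), (1, 1, 4.0)]
u, v = als_rank1(toy, n_users=2, n_items=3, reg=0.01)
print(predict(u, v, 1, 2))  # estimate for the unobserved (user 1, item 2) pair
```

The "alternating" part is the two inner loops: with one side held fixed, the other side reduces to an ordinary regularized least-squares problem. Spark's `ALS` applies the same idea with multi-dimensional factor vectors and distributed updates.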

# 1. Import libraries

In [1]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.conf import SparkConf
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS

# 2. Initialize SparkSession for local driver

In [2]:
spark = SparkSession.builder \
        .appName("ALS") \
        .config("spark.driver.memory", "2g") \
        .config("spark.executor.memory", "2g") \
        .config("spark.executor.cores", "8") \
        .getOrCreate()

23/03/31 10:53:35 WARN Utils: Your hostname, pascal-G3-3579 resolves to a loopback address: 127.0.1.1; using 192.168.13.145 instead (on interface wlo1)
23/03/31 10:53:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/31 10:53:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Web UI access for Spark

Returns the URL (host and port) of the Spark web UI for this session.

In [None]:
spark.sparkContext.uiWebUrl

# 3. Import data

### User-interaction data

In [4]:
ratings = spark.read.csv('ratings.csv', header=True, inferSchema=True)

In [5]:
ratings.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    110|   1.0|1425941529|
|     1|    147|   4.5|1425942435|
|     1|    858|   5.0|1425941523|
|     1|   1221|   5.0|1425941546|
|     1|   1246|   5.0|1425941556|
|     1|   1968|   4.0|1425942148|
|     1|   2762|   4.5|1425941300|
|     1|   2918|   5.0|1425941593|
|     1|   2959|   4.0|1425941601|
|     1|   4226|   4.0|1425942228|
|     1|   4878|   5.0|1425941434|
|     1|   5577|   5.0|1425941397|
|     1|  33794|   4.0|1425942005|
|     1|  54503|   3.5|1425941313|
|     1|  58559|   4.0|1425942007|
|     1|  59315|   5.0|1425941502|
|     1|  68358|   5.0|1425941464|
|     1|  69844|   5.0|1425942139|
|     1|  73017|   5.0|1425942699|
|     1|  81834|   5.0|1425942133|
+------+-------+------+----------+
only showing top 20 rows



# 4. Preprocess data with a SQL query

In [6]:
ratings.createOrReplaceTempView('ratings_q')
data = spark.sql("SELECT userId, movieId, rating FROM ratings_q")

In [7]:
data.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [8]:
(training, test) = data.randomSplit([0.8, 0.2])
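
Without a seed, each run produces a different train/test split, so the RMSE values further down will vary between runs. `randomSplit` accepts an optional `seed` argument, for example `data.randomSplit([0.8, 0.2], seed=42)`. The plain-Python sketch below illustrates the idea of a seeded per-row split; the function name `random_split` and the stand-in rows are hypothetical, and the per-row draw is only an approximation of how Spark samples.

```python
import random


def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row independently to train or test with a seeded generator."""
    rng = random.Random(seed)  # a fixed seed makes the split repeatable
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))  # hypothetical stand-in for rating rows
train_a, test_a = random_split(rows, seed=42)
train_b, test_b = random_split(rows, seed=42)
print(train_a == train_b)         # same seed, same split
print(len(train_a), len(test_a))  # roughly 800 / 200; exact sizes are not guaranteed
```

Because every row is drawn independently, the 80/20 weights describe the expected proportions rather than exact counts.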

In [9]:
training.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    110|   1.0|
|     1|    147|   4.5|
|     1|   1221|   5.0|
|     1|   1246|   5.0|
|     1|   2762|   4.5|
+------+-------+------+
only showing top 5 rows

In [10]:
test.show(5)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    858|   5.0|
|     1|   1968|   4.0|
|     1|   2959|   4.0|
|     1|   4878|   5.0|
|     1|  33794|   4.0|
+------+-------+------+
only showing top 5 rows

# 5. ALS Model training

In [11]:
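# coldStartStrategy="drop" discards rows whose prediction is NaN (users or movies
# unseen during training), so the RMSE computed later is not NaN.
als = ALS(regParam=0.1, userCol="userId", itemCol="movieId", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training)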

# 6. Model Evaluation

In [12]:
p_t = model.transform(training)
p_t.createOrReplaceTempView('p_t_q')
# Keep only rows whose prediction lies in [0, 5]; rows outside this range are excluded from the RMSE
p_t = spark.sql('select * from p_t_q where prediction between 0 and 5')

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(p_t)
print("Root-mean-square error for training data = " + str(rmse))



Root-mean-square error for training data = 0.7647781702300647

In [13]:
predictions = model.transform(test)
predictions.createOrReplaceTempView('predictions_q')
# Note: the upper bound here is 6, whereas the training evaluation above uses 5,
# so the two RMSE values are computed over slightly different filters
predictions = spark.sql('select * from predictions_q where prediction between 0 and 6')

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error for test data = " + str(rmse))



Root-mean-square error for test data = 0.8142715685205713
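
The `between` filter in the evaluation cells removes out-of-range predictions before the RMSE is computed, so the reported error covers only the rows that survive the filter. One alternative worth considering is to clamp predictions into the valid range and keep every row. The sketch below contrasts the two in plain Python; the bounds 0 and 5 are taken from the training filter, while the sample values and the names `clip` and `filter_in_range` are hypothetical. In PySpark, something similar could likely be done with `greatest` and `least` from `pyspark.sql.functions`.

```python
def clip(value, low=0.0, high=5.0):
    """Clamp a predicted rating into the [low, high] range."""
    return max(low, min(high, value))


def filter_in_range(predictions, low=0.0, high=5.0):
    """Keep only predictions already inside the range (what a BETWEEN filter does)."""
    return [p for p in predictions if low <= p <= high]


raw = [3.99, 5.19, 6.97, -0.20, 4.34]  # hypothetical raw model outputs
print(filter_in_range(raw))    # out-of-range rows disappear from the evaluation
print([clip(p) for p in raw])  # every row is kept, values are bounded
```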

### An RMSE of about 0.81 on the test set is acceptable for recommendation

### It means predictions typically deviate from the actual rating by about 0.81 rating points (RMSE weights large errors more heavily than a plain average error would)
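
As a worked illustration of what the metric measures, the sketch below computes RMSE by hand, next to the mean absolute error for comparison. The (rating, prediction) pairs are copied from the first five rows of the test-set sample in this notebook; the helper names `rmse` and `mae` are hypothetical, and five rows are far too few to reproduce the full-dataset figure.

```python
import math


def rmse(actual, predicted):
    """Root-mean-square error: square root of the mean squared difference."""
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))


def mae(actual, predicted):
    """Mean absolute error, shown for comparison."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)


# First five (rating, prediction) rows of the test-set sample in this notebook.
ratings = [2.0, 5.0, 4.0, 5.0, 4.0]
predictions = [3.9930055, 3.57819, 4.344831, 3.9484487, 4.1938753]

print("RMSE:", rmse(ratings, predictions))
print("MAE: ", mae(ratings, predictions))
```

RMSE is never smaller than MAE on the same data, and the gap widens when a few predictions are far off, such as the first row here (actual 2.0, predicted about 3.99).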

# 7. Sample predictions on the test set

In [14]:
model.transform(test).show(20)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    12|     32|   2.0| 3.9930055|
|    12|     34|   5.0|   3.57819|
|    12|     97|   4.0|  4.344831|
|    12|    125|   5.0| 3.9484487|
|    12|    232|   4.0| 4.1938753|
|    12|    235|   5.0| 4.0297747|
|    12|    293|   4.0|  4.035114|
|    12|    307|   5.0| 4.3809977|
|    12|    337|   4.0|  3.970195|
|    12|    501|   5.0| 4.1721406|
|    12|    529|   4.0| 3.7749987|
|    12|    581|   5.0|  4.140504|
|    12|    866|   3.0|  3.782192|
|    12|    916|   4.0| 4.0173793|
|    12|   1036|   4.0| 3.4112544|
|    12|   1059|   4.0| 3.6425056|
|    12|   1086|   4.0|  4.060757|
|    12|   1111|   5.0|  4.253432|
|    12|   1185|   4.0| 4.0638943|
|    12|   1207|   4.0|  4.278233|
+------+-------+------+----------+
only showing top 20 rows



# 8. Generate recommendations

In [15]:
userRecs = model.recommendForAllUsers(10)
movieRecs = model.recommendForAllItems(10)
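
The problem statement asks for movies the user has not watched. As far as I can tell, `recommendForAllUsers` ranks movies by predicted score and may include ones the user has already rated, so a post-filter may be needed. The sketch below shows that filter in plain Python for a single user; the function name `unseen_top_k` and all sample values are hypothetical.

```python
def unseen_top_k(recommendations, seen_movie_ids, k=10):
    """Return the k highest-scored (movieId, score) pairs the user has not rated."""
    unseen = [(m, s) for m, s in recommendations if m not in seen_movie_ids]
    unseen.sort(key=lambda pair: pair[1], reverse=True)
    return unseen[:k]


# Hypothetical data for one user: candidate (movieId, predicted rating) pairs
# and the set of movieIds the user has already rated.
candidates = [(858, 4.9), (101862, 4.7), (1221, 4.6), (135113, 4.5)]
already_rated = {858, 1221}

print(unseen_top_k(candidates, already_rated, k=2))
```

In practice this usually means requesting more than 10 candidates per user, so that 10 remain after the already-rated movies are removed.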

# 9. Sample recommendations

In [16]:
temp = userRecs.toPandas()

### Top 10 recommended movies for userId = 1 (`rating` is the predicted rating):

In [17]:
temp[temp['userId']==1].iloc[0,1]

[Row(movieId=101862, rating=6.973142623901367),
 Row(movieId=135113, rating=6.735901355743408),
 Row(movieId=107252, rating=6.522273540496826),
 Row(movieId=159761, rating=6.400022506713867),
 Row(movieId=74061, rating=6.370952606201172),
 Row(movieId=147124, rating=6.20601749420166),
 Row(movieId=173871, rating=6.117490768432617),
 Row(movieId=98275, rating=6.113006114959717),
 Row(movieId=118236, rating=6.102718353271484),
 Row(movieId=146501, rating=6.072976112365723)]

### Top 10 recommended movies for userId = 2 (`rating` is the predicted rating):

In [18]:
temp[temp['userId']==2].iloc[0,1]

[Row(movieId=159761, rating=5.1935553550720215),
 Row(movieId=151615, rating=4.9627509117126465),
 Row(movieId=131586, rating=4.884283542633057),
 Row(movieId=146724, rating=4.865076065063477),
 Row(movieId=166812, rating=4.859807968139648),
 Row(movieId=107252, rating=4.8042073249816895),
 Row(movieId=164937, rating=4.7965922355651855),
 Row(movieId=163799, rating=4.793788909912109),
 Row(movieId=154588, rating=4.752204418182373),
 Row(movieId=173153, rating=4.71326208114624)]

### Top 10 recommended movies for userId = 3 (`rating` is the predicted rating):

In [19]:
temp[temp['userId']==3].iloc[0,1]

[Row(movieId=175335, rating=4.224606990814209),
 Row(movieId=66389, rating=4.212313652038574),
 Row(movieId=101862, rating=4.198924541473389),
 Row(movieId=165689, rating=4.176628589630127),
 Row(movieId=158571, rating=4.096182346343994),
 Row(movieId=144202, rating=4.0868659019470215),
 Row(movieId=170683, rating=4.065147399902344),
 Row(movieId=166812, rating=4.059095859527588),
 Row(movieId=164937, rating=4.054503917694092),
 Row(movieId=150228, rating=4.051459312438965)]