# Recommendation system using ALS algorithm in Spark (Explicit feedback)

### I have used the MovieLens (100k) dataset to build the recommendation system

In [2]:
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
from pyspark.sql.functions import col, avg
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format

In [3]:
spark_1= SparkSession.builder.appName('Recommender_system').getOrCreate()

Reading our CSV 

In [4]:
df=spark_1.read.csv('ratings.csv',header=True,inferSchema=True)
df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|     31|   2.5|1260759144|
|     1|   1029|   3.0|1260759179|
|     1|   1061|   3.0|1260759182|
|     1|   1129|   2.0|1260759185|
|     1|   1172|   4.0|1260759205|
+------+-------+------+----------+
only showing top 5 rows



We will not be using the timestamp column for our application.

In [5]:
#Number of distinct users
df.select('userId').distinct().count()

671

ALS works well with sparse datasets.
Sparsity is the share of empty cells in the user-movie rating matrix: divide the number of ratings by the product of the number of users and the number of movies, then subtract the result from 1.

In [6]:
num= df.select('rating').count()
user_no= df.select('userId').distinct().count()
movie_no= df.select('movieId').distinct().count()

sparsity= (1.0-(num*1.0)/(user_no*movie_no))*100

print("The movie ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The movie ratings dataframe is  98.36% empty.
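
As a sanity check of the sparsity formula, here is a minimal plain-Python sketch on a hand-made toy dataset. The function name `sparsity_percent` and the toy ratings are assumptions made for illustration and are not part of the notebook.

```python
def sparsity_percent(ratings):
    """Return the percentage of empty cells in the user x item rating matrix.

    `ratings` is an iterable of (user_id, item_id, rating) tuples.
    """
    ratings = list(ratings)
    users = {u for u, _, _ in ratings}
    items = {i for _, i, _ in ratings}
    # Count distinct (user, item) pairs so a repeated pair is not counted twice
    filled = len({(u, i) for u, i, _ in ratings})
    total_cells = len(users) * len(items)
    return (1.0 - filled / total_cells) * 100


# Toy data (made up): 3 users x 4 movies = 12 cells, 5 of them rated
toy = [(1, 10, 4.0), (1, 20, 3.5), (2, 10, 5.0), (3, 30, 2.0), (3, 40, 4.5)]
print("%.2f%% empty" % sparsity_percent(toy))  # 58.33% empty
```

With 5 of 12 cells filled, the matrix is 1 - 5/12 = 58.33% empty, which matches the formula used in the cell above.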


#### Exploring our data 

In [7]:
#Let us see how many movies each userId has rated
df.groupBy('userId').count().show()

+------+-----+
|userId|count|
+------+-----+
|     1|   20|
|     2|   76|
|     3|   51|
|     4|  204|
|     5|  100|
|     6|   44|
|     7|   88|
|     8|  116|
|     9|   45|
|    10|   46|
|    11|   38|
|    12|   61|
|    13|   53|
|    14|   20|
|    15| 1700|
|    16|   29|
|    17|  363|
|    18|   51|
|    19|  423|
|    20|   98|
+------+-----+
only showing top 20 rows



In [8]:
# Avg num ratings per movie
print("Average num ratings per movie: ")
df.groupBy("movieId").count().select(avg("count")).show()


# Avg num ratings per users
print("Avg num ratings per user: ")
df.groupBy("userId").count().select(avg("count")).show()

Average num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|11.030664019413193|
+------------------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|149.03725782414307|
+------------------+



In [9]:
#Average number of ratings per distinct rating value
print('The avg number of ratings per rating value is: ')
df.groupBy('rating').count().select(avg('count')).show()

The avg number of ratings per rating value is: 
+----------+
|avg(count)|
+----------+
|   10000.4|
+----------+



In [10]:
#Checking the schema of our dataframe 
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [11]:
#drop() returns a new DataFrame; df itself keeps the timestamp column unless the result is assigned back (df = df.drop('timestamp'))
df.drop('timestamp')

DataFrame[userId: int, movieId: int, rating: double]

## Modelling 

### Apache Spark ML implements collaborative filtering with alternating least squares (ALS), which we will use for our application

In [12]:
# Create training and testing data
(train, test) = df.randomSplit([0.7, 0.3], seed = 567)

In [13]:
als = ALS(maxIter=10,regParam=0.1,rank=5,userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")
type(als)

pyspark.ml.recommendation.ALS

In [14]:
model = als.fit(train)
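
To give a feel for what "alternating" means, here is a minimal plain-Python sketch of ALS restricted to rank 1, where each user and each movie gets a single number and a rating is approximated by their product. It is not Spark's implementation: the function name `als_rank1`, the toy ratings and the plain ridge penalty are assumptions made for illustration.

```python
import random


def als_rank1(ratings, reg=0.1, iters=10, seed=0):
    """Fit rating(u, i) ~ x[u] * y[i] by alternating regularised least squares.

    `ratings` is a list of (user_id, item_id, rating) tuples.
    """
    rng = random.Random(seed)
    users = sorted({u for u, _, _ in ratings})
    items = sorted({i for _, i, _ in ratings})
    x = {u: rng.uniform(0.5, 1.5) for u in users}
    y = {i: rng.uniform(0.5, 1.5) for i in items}
    for _ in range(iters):
        # Hold item factors fixed: each user's factor has a closed-form ridge solution
        for u in users:
            num = sum(r * y[i] for uu, i, r in ratings if uu == u)
            den = reg + sum(y[i] ** 2 for uu, i, _ in ratings if uu == u)
            x[u] = num / den
        # Hold user factors fixed: solve each item's factor the same way
        for i in items:
            num = sum(r * x[u] for u, ii, r in ratings if ii == i)
            den = reg + sum(x[u] ** 2 for u, ii, _ in ratings if ii == i)
            y[i] = num / den
    return x, y


# Toy ratings (made up) taken from the rank-1 matrix [[1, 2], [2, 4]];
# the rating of user 2 for item 20 (true value 4) is held out.
observed = [(1, 10, 1.0), (1, 20, 2.0), (2, 10, 2.0)]
x, y = als_rank1(observed, reg=0.01, iters=50)
print("predicted rating for user 2, item 20:", x[2] * y[20])
```

The held-out prediction should land close to 4, slightly shrunk by the regularisation. With `rank=5`, as in the cell above, each factor becomes a vector and each closed-form division becomes a small linear solve, but the alternation between users and items is the same idea.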

In [15]:
predictions = model.transform(test)

In [16]:
#Evaluating our model performance using root mean squared error 
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9245592708112048


In [17]:
predictions.show(n=10)

+------+-------+------+----------+----------+
|userId|movieId|rating| timestamp|prediction|
+------+-------+------+----------+----------+
|   380|    463|   3.0| 968949106| 2.9645996|
|   242|    463|   4.0| 956685706| 3.9067533|
|   440|    471|   3.0| 835337519| 3.0974295|
|   452|    471|   3.0| 976422396|  3.473732|
|   299|    471|   4.5|1344186741|  4.303039|
|    15|    471|   3.0|1166586067| 2.7023463|
|   358|    471|   5.0| 957479605| 3.6117182|
|   502|    471|   4.0| 861322541| 4.2407923|
|   537|    471|   5.0| 879502608| 3.7298114|
|   514|    471|   4.0| 853893788|  3.761124|
+------+-------+------+----------+----------+
only showing top 10 rows
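
The RMSE reported by `RegressionEvaluator` is the square root of the mean squared difference between `rating` and `prediction`. The plain-Python sketch below applies that definition to the ten rows shown above. The helper name `rmse` is an assumption for illustration, and because it uses only ten rows its result will differ from the full test-set value.

```python
import math


def rmse(pairs):
    """Root-mean-square error over (actual, predicted) pairs."""
    squared_errors = [(actual - predicted) ** 2 for actual, predicted in pairs]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# (rating, prediction) pairs copied from the predictions.show(n=10) output
sample = [
    (3.0, 2.9645996), (4.0, 3.9067533), (3.0, 3.0974295), (3.0, 3.473732),
    (4.5, 4.303039), (3.0, 2.7023463), (5.0, 3.6117182), (4.0, 4.2407923),
    (5.0, 3.7298114), (4.0, 3.761124),
]
print("RMSE on the ten displayed rows:", rmse(sample))
```

Squaring the errors means the two rows where a 5.0 rating was predicted as roughly 3.6 to 3.7 dominate the result.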



### To make the output more readable and detailed, I will read the movies, tags and links files into separate dataframes and join them on their common movie ID.

In [19]:
movies_df= spark_1.read.csv('datasets_66613_153886_movies.csv', inferSchema=True,header=True)
movies_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [20]:
links_df= spark_1.read.csv('links.csv', inferSchema=True,header=True)
links_df.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- imdbId: integer (nullable = true)
 |-- tmdbId: integer (nullable = true)



In [23]:
tags_df= spark_1.read.csv('tags.csv', inferSchema=True,header=True)
tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: integer (nullable = true)



In [26]:
tags_df1= tags_df.drop('userId','timestamp')

In [28]:
predictions.join(movies_df,'movieId').join(tags_df1,'movieId').select('userId','title','genres','tag','prediction').show(10)

+------+--------------------+--------------------+----------------+----------+
|userId|               title|              genres|             tag|prediction|
+------+--------------------+--------------------+----------------+----------+
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|   rich families|  4.292309|
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|musical parodies|  4.292309|
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|           music|  4.292309|
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|    girlie movie|  4.292309|
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|           dance|  4.292309|
|   372|Dirty Dancing (1987)|Drama|Musical|Rom...|    80's classic|  4.292309|
|    15|Dirty Dancing (1987)|Drama|Musical|Rom...|   rich families| 1.4738356|
|    15|Dirty Dancing (1987)|Drama|Musical|Rom...|musical parodies| 1.4738356|
|    15|Dirty Dancing (1987)|Drama|Musical|Rom...|           music| 1.4738356|
|    15|Dirty Dancing (1987)|Drama|Musical|Rom...|  

### Let us obtain the predicted ratings for particular users (514 and 60)

In [30]:
for_514= predictions.filter(col('userId')==514).join(movies_df,'movieId').join(links_df,'movieId').select('userId','title','genres','tmdbId','prediction')
for_514.show(5)

for_60= predictions.filter(col('userId')==60).join(movies_df,'movieId').join(links_df,'movieId').select('userId','title','genres','tmdbId','prediction')
for_60.show(5)


+------+--------------------+--------------------+------+----------+
|userId|               title|              genres|tmdbId|prediction|
+------+--------------------+--------------------+------+----------+
|   514|Hudsucker Proxy, ...|              Comedy| 11934|  3.761124|
|   514|     Candyman (1992)|     Horror|Thriller|  9529| 3.1466775|
|   514|Sword in the Ston...|Animation|Childre...|  9078| 3.2427876|
|   514|Back to the Futur...|Adventure|Comedy|...|   105| 3.8019257|
|   514|Groundhog Day (1993)|Comedy|Fantasy|Ro...|   137| 3.7863579|
+------+--------------------+--------------------+------+----------+
only showing top 5 rows

+------+--------------------+--------------------+------+----------+
|userId|               title|              genres|tmdbId|prediction|
+------+--------------------+--------------------+------+----------+
|    60|Sea Inside, The (...|               Drama|  1913|  4.585308|
|    60|      Gattaca (1997)|Drama|Sci-Fi|Thri...|   782| 4.0912075|
|    60| 

### Let us obtain top 10 movie recommendations for each user

In [33]:
userReco = model.recommendForAllUsers(10)
userReco.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|   471|[[3414, 5.341756]...|
|   463|[[67504, 5.396364...|
|   496|[[3414, 6.153739]...|
|   148|[[67504, 5.622106...|
|   540|[[8530, 6.8435507...|
|   392|[[92494, 5.321902...|
|   243|[[67504, 5.354881...|
|   623|[[67504, 6.065434...|
|    31|[[67504, 5.406029...|
|   516|[[83318, 5.212477...|
|   580|[[67504, 5.266863...|
|   251|[[79824, 5.674916...|
|   451|[[8535, 5.69648],...|
|    85|[[3414, 6.5535035...|
|   137|[[67504, 5.350032...|
|    65|[[40412, 5.993492...|
|   458|[[3414, 5.0007744...|
|   481|[[67504, 6.33814]...|
|    53|[[3181, 6.2962766...|
|   255|[[67504, 6.233779...|
+------+--------------------+
only showing top 20 rows
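
Conceptually, each entry in the `recommendations` column pairs a movie ID with a score computed from the learned factors. The plain-Python sketch below shows that idea: score every movie for a user with a dot product and keep the N highest. The function name `recommend_top_n` and the rank-2 factor values are made up for illustration, and this is not how Spark computes the result internally.

```python
import heapq


def recommend_top_n(user_factors, item_factors, n):
    """For each user, score every item by dot product and keep the n best."""
    recs = {}
    for user, uvec in user_factors.items():
        scored = (
            (item, sum(a * b for a, b in zip(uvec, ivec)))
            for item, ivec in item_factors.items()
        )
        # nlargest returns (item, score) pairs sorted by score, highest first
        recs[user] = heapq.nlargest(n, scored, key=lambda pair: pair[1])
    return recs


# Hypothetical rank-2 factors, invented for this example
user_factors = {1: [1.0, 0.0], 2: [0.5, 1.5]}
item_factors = {10: [4.0, 1.0], 20: [1.0, 3.0], 30: [2.0, 2.0]}
top2 = recommend_top_n(user_factors, item_factors, 2)
print(top2[1])  # [(10, 4.0), (30, 2.0)]
print(top2[2])  # [(20, 5.0), (30, 4.0)]
```

Because the score is an unconstrained dot product, nothing limits it to the rating scale, which is why values above 5 (for example 6.153739) appear in the output above.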



### Let us obtain top 10 user recommendations for each movie

In [34]:
movieReco = model.recommendForAllItems(10)
movieReco.show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|   1580|[[113, 5.158268],...|
|   5300|[[154, 5.5862513]...|
|   6620|[[545, 4.897958],...|
|   7340|[[228, 5.1227913]...|
|  54190|[[261, 5.6989427]...|
|    471|[[70, 4.740661], ...|
|   1591|[[113, 4.2510023]...|
|   4101|[[4, 5.6551213], ...|
|   1342|[[401, 4.211051],...|
|   2122|[[46, 3.3270643],...|
|   2142|[[112, 4.729786],...|
|   7982|[[113, 5.2086515]...|
|  44022|[[46, 5.17425], [...|
| 141422|[[228, 3.079749],...|
|    463|[[46, 5.2227902],...|
|    833|[[113, 1.2288729]...|
|   5803|[[113, 3.0634222]...|
|   7833|[[113, 4.906775],...|
| 160563|[[145, 4.045021],...|
|   3794|[[484, 4.5837765]...|
+-------+--------------------+
only showing top 20 rows



### Now we will open each movie's web page using its tmdbId, which is the last path segment after 'https://www.themoviedb.org/movie/'

In [32]:
import webbrowser
link= 'https://www.themoviedb.org/movie/'
for movie in for_514.take(2):
    URL= link+ str(movie.tmdbId)
    print(movie.title)
    webbrowser.open(URL)
    

Hudsucker Proxy, The (1994)
Candyman (1992)


For userId 514 we take the first two rows of the predictions; running this block opens each movie's page on themoviedb.org in the browser.
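
If you only want the links, without launching a browser, the URL construction can be separated out. This is a minimal sketch: the helper name `tmdb_url` is an assumption, and the guard for a missing ID covers the case where a row has no `tmdbId`.

```python
def tmdb_url(tmdb_id, base="https://www.themoviedb.org/movie/"):
    """Build the TMDB page URL for a movie, or return None if the ID is missing."""
    if tmdb_id is None:
        return None
    return base + str(int(tmdb_id))


# (title, tmdbId) pairs taken from the for_514 output above
rows = [("Hudsucker Proxy, The (1994)", 11934), ("Candyman (1992)", 9529)]
for title, tmdb_id in rows:
    print(title, "->", tmdb_url(tmdb_id))
```

Each resulting URL can then be passed to `webbrowser.open` as in the cell above, or stored alongside the predictions.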

# THANK YOU