In [1]:
#To measure the performance of the ALS model
from pyspark.ml.evaluation import RegressionEvaluator
#start a sessio to create a dataframe
from pyspark.sql import SparkSession
#ALS model for recommendation
from pyspark.ml.recommendation import ALS
#for parameter tuning and cross validation
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SQLContext
import pandas as pd
from sklearn.model_selection import train_test_split

In [2]:
#Load dataset
ratings = pd.read_csv('ratings.csv')

In [3]:
#Visualize top 5 rows
ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,296,5.0,1147880044
1,1,306,3.5,1147868817
2,1,307,5.0,1147868828
3,1,665,5.0,1147878820
4,1,899,3.5,1147868510


In [3]:
#Drop unwanted columns
ratings.drop(columns = 'timestamp',inplace=True)

In [4]:
#Split data into training and testing with 0.9 and 0.1 ratio
#Use stratified sampling
df_train, df_test = train_test_split(ratings, test_size=0.1, stratify=ratings['userId'])

In [5]:
print(df_train.shape)
print(df_test.shape)

(22500085, 3)
(2500010, 3)


In [6]:
#Store training and testing dataset as seperate csv files
df_train.to_csv('Training.csv',index=False)
df_test.to_csv('Testing.csv',index=False)

In [7]:
#create a spark session
spark = SparkSession.builder.master('local').appName("MovieRatings").getOrCreate()

In [8]:
#load Training.csv into df
df = spark.read.csv("Training.csv",inferSchema="true", header="true")

In [16]:
#View dataframe
df.show(10)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
| 34288|  60950|   2.0|
|155791|   2761|   4.0|
| 53777|   1032|   4.5|
|116950|    839|   4.0|
| 86698|   3263|   3.5|
|138557| 110882|   3.0|
| 31370|  72998|   3.5|
| 94952|    736|   5.0|
|160054|    919|   4.0|
| 17275|    515|   4.0|
+------+-------+------+
only showing top 10 rows



In [9]:
#load Testing.csv into df
test_df = spark.read.csv("Testing.csv",inferSchema="true", header="true")

In [11]:
# Build the recommendation model using ALS on the training data
# set cold start strategy to 'drop' to ensure there are no NaN evaluation metrics
# set nonnegative = True so that it does not return negative predictions

als = ALS(maxIter=15, regParam=0.015,rank =20, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop",nonnegative = True)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")


### Fit Model and calculate RMSE for testing Data

In [12]:
#fit cross validator on training set
model = als.fit(df)

#Generate predictions and evaluate the model by computing the RMSE on the test data
predictions = model.transform(test_df)
rmse = evaluator.evaluate(predictions)

print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.7868925798813587


In [13]:
#display predictions and actual rating for test data
predictions.sort(["userId","rating"]).show(10)

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|     1|   5269|   0.5| 3.5044727|
|     1|   6954|   3.5| 3.9630039|
|     1|   1217|   3.5|  3.849604|
|     1|   2843|   4.5|  4.351509|
|     1|   4973|   4.5| 4.5405316|
|     1|   6711|   5.0|   4.48783|
|     1|  32591|   5.0| 4.4634914|
|     2|  34162|   0.5|  2.888835|
|     2|   2987|   0.5| 3.5694652|
|     2|   1923|   0.5|  2.725802|
+------+-------+------+----------+
only showing top 10 rows



In [14]:
#Recommended top items for all users ,predicted ratings
recuser = model.recommendForAllUsers(10)

### To understand the recommended movies for a particular user
#### filter out movies rated by particular user from the ratings.csv file
#### Merge this dataset with movies.csv to include title and genere column


In [15]:
#Load movies dataset
pdf1 = pd.read_csv('movies.csv')

In [16]:
pdf1.tail()

Unnamed: 0,movieId,title,genres
62418,209157,We (2018),Drama
62419,209159,Window of the Soul (2001),Documentary
62420,209163,Bad Poems (2018),Comedy|Drama
62421,209169,A Girl Thing (2001),(no genres listed)
62422,209171,Women of Devil's Island (1962),Action|Adventure|Drama


In [49]:
# see the historical rating of the user in ratings.csv
user_history = ratings[ratings['userId']==20055]
user_history.reset_index(drop=True)

Unnamed: 0,userId,movieId,rating
0,20055,1,5.0
1,20055,2,3.0
2,20055,3,2.5
3,20055,5,1.5
4,20055,6,4.0
...,...,...,...
7483,20055,92643,4.0
7484,20055,93116,3.0
7485,20055,93838,4.5
7486,20055,99114,3.0


In [37]:
ratings['userId'].value_counts()

72315     32202
80974      9178
137293     8913
33844      7919
20055      7488
          ...  
52927        20
43931        20
27547        20
27546        20
39653        20
Name: userId, Length: 162541, dtype: int64

In [50]:
#Merge ratings and movies data to better understand what movies the particular user liked mostly
user_out = (user_history.merge(pdf1, left_on='movieId', right_on='movieId')
          .reindex(columns=['userId', 'movieId', 'rating', 'title','genres']))

In [57]:
user_out.sort_values(by='rating', ascending=False).head(25)

Unnamed: 0,userId,movieId,rating,title,genres
0,20055,1,5.0,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
578,20055,914,5.0,My Fair Lady (1964),Comedy|Drama|Musical|Romance
2386,20055,3418,5.0,Thelma & Louise (1991),Adventure|Crime|Drama
2384,20055,3415,5.0,"Mirror, The (Zerkalo) (1975)",Drama
2342,20055,3365,5.0,"Searchers, The (1956)",Drama|Western
2341,20055,3364,5.0,"Asphalt Jungle, The (1950)",Crime|Film-Noir
2340,20055,3363,5.0,American Graffiti (1973),Comedy|Drama
2339,20055,3362,5.0,Dog Day Afternoon (1975),Crime|Drama
562,20055,898,5.0,"Philadelphia Story, The (1940)",Comedy|Drama|Romance
563,20055,899,5.0,Singin' in the Rain (1952),Comedy|Musical|Romance


In [52]:
#Select a random user and collect the recommendations for that user
rec_sp = recuser.where("userId = 20055").collect()

In [53]:
#Convert the row to readable format using pandas df
column_names = ["movieId", "rating"]
rec_sp_df = pd.DataFrame(columns = column_names)
for i in range(len(rec_sp[0][1])):
    rec_sp_df = rec_sp_df.append({'movieId': rec_sp[0][1][i][0],'rating':rec_sp[0][1][i][1]  }, ignore_index=True)

In [54]:
rec_sp_df

Unnamed: 0,movieId,rating
0,193587.0,7.527944
1,138400.0,7.436458
2,196559.0,6.838944
3,198657.0,6.7822
4,196557.0,6.726239
5,185653.0,6.570718
6,133339.0,6.187793
7,155758.0,6.029768
8,122222.0,5.982402
9,117352.0,5.981124


In [55]:
#Merge the recommended movies with movies daatset for that particular user
user_recomm = (rec_sp_df.merge(pdf1, left_on='movieId', right_on='movieId')
          .reindex(columns=['movieId', 'rating', 'title','genres']))

In [56]:
user_recomm

Unnamed: 0,movieId,rating,title,genres
0,193587.0,7.527944,Bungo Stray Dogs: Dead Apple (2018),Action|Animation
1,138400.0,7.436458,Ammutta muddica al cinema (2013),Comedy
2,196559.0,6.838944,Adventures of Mowgli: Akela's Last Hunt (1969),Adventure|Animation|Children
3,198657.0,6.7822,Manikarnika (2019),Action|Drama
4,196557.0,6.726239,Adventures of Mowgli: Return to Mankind (1971),Adventure|Animation|Children
5,185653.0,6.570718,WWE: The Top 25 Rivalries in Wrestling History...,(no genres listed)
6,133339.0,6.187793,1915 (2015),Drama
7,155758.0,6.029768,Irudhi Suttru (2016),Action|Drama
8,122222.0,5.982402,The Bride Goes Wild (1948),Comedy|Romance
9,117352.0,5.981124,A Kind of America 2 (2008),Comedy


#### We can see that the user with id 20055 likes wide range of generes such as Drama,Comedy, Action movies the most
#### The movies recommended by ALS model is also represents that, so this makes sense