* Recommendation System using the Alternating Least Squares (ALS) algorithm with PySpark

Courtesy: https://github.com/KevinLiao159/MyDataSciencePortfolio/blob/master/movie_recommender/movie_recommendation_using_ALS.ipynb

Blog Post: https://towardsdatascience.com/prototyping-a-recommender-system-step-by-step-part-2-alternating-least-square-als-matrix-4a76c58714a1


## Imports

In [47]:
import os
import time
from IPython.core.display import display, HTML
import findspark
findspark.init()
# spark imports
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.recommendation import ALS
from pyspark import SQLContext
from IPython.display import Image
# data science imports
import requests, json
import numpy as np
import pandas as pd

# visualization imports
import seaborn as sns
import matplotlib.pyplot as plt


%matplotlib inline

## Create Spark Session

In [2]:
# Build (or reuse) the SparkSession used by every cell below.
spark = (
    SparkSession.builder
    .appName("ALS_Movie_Recommendor")
    .config("spark.executor.memory", "16g")
    .config("spark.master", "local[*]")
    .getOrCreate()
)

# Grab the underlying SparkContext and register the directory that
# checkpoint() calls will write to.
sc = spark.sparkContext
sc.setCheckpointDir('checkpoint')

## Read the data as Spark DataFrames

In [3]:
# Folder that holds the four CSV files read below.
data_path = os.path.abspath("./small-data/")

# Each file is read the same way (header row present, column types inferred),
# in the same order as the names on the left-hand side.
movies, ratings, links, tags = (
    spark.read.load(
        os.path.join(data_path, csv_name),
        format='csv',
        header=True,
        inferSchema=True,
    )
    for csv_name in ("movies.csv", "ratings.csv", "links.csv", "tags.csv")
)

##  Peek into data

In [4]:
# Show an HTML heading followed by the first three rows of every table.
for heading, frame in (
    ("Movies", movies),
    ("Ratings", ratings),
    ("Links", links),
    ("Tags", tags),
):
    display(HTML("<h3>{}</h3>".format(heading)))
    frame.show(3)


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
+-------+--------------------+--------------------+
only showing top 3 rows



+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
+------+-------+------+---------+
only showing top 3 rows



+-------+------+------+
|movieId|imdbId|tmdbId|
+-------+------+------+
|      1|114709|   862|
|      2|113497|  8844|
|      3|113228| 15602|
+-------+------+------+
only showing top 3 rows



+------+-------+---------------+----------+
|userId|movieId|            tag| timestamp|
+------+-------+---------------+----------+
|     2|  60756|          funny|1445714994|
|     2|  60756|Highly quotable|1445714996|
|     2|  60756|   will ferrell|1445714992|
+------+-------+---------------+----------+
only showing top 3 rows



## Sparsity of ratings matrix

\begin{align*}
\text{Sparsity}&=1-\frac{\text{Actual number of ratings in matrix}}{\text{(Number of Users)} \times \text{(Number of Movies)}}
\end{align*}

In [5]:
# Filled cells of the user x movie matrix: one per stored rating.
numerator = ratings.count()

# Matrix dimensions: distinct users (rows) and distinct movies (columns).
distinct_users, distinct_movies = (
    ratings.select(id_column).distinct().count()
    for id_column in ('userId', 'movieId')
)
denominator = distinct_users * distinct_movies

# Share of the matrix that holds no rating.
sparsity = 1 - float(numerator) / denominator

print(sparsity)

0.9830003169443864


## Splitting data

In [6]:
# `ratings` is already a DataFrame, so it can be split directly. The previous
# `test = ratings.toDF(...)` assignment was dead code: its result was
# overwritten by the split on the very next statement.
# 80% of the rows go to `train`, 20% to `test`; a fixed seed is passed so the
# split can be repeated.
(train, test) = ratings.randomSplit([0.8, 0.2], seed = 1234)

## Creating ALS model

In [7]:
# `time` is already imported in the imports cell; start the wall-clock timer
# for the whole grid search.
start_time = time.time()

# Base estimator; the grid below overrides rank, maxIter and regParam.
# coldStartStrategy="drop" removes rows whose prediction would be NaN
# (users/movies unseen during fitting) so they do not poison the RMSE.
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True, coldStartStrategy="drop")

# Hyperparameter grid: 3 ranks x 3 iteration counts x 3 regularisation values.
param_grid = (
    ParamGridBuilder()
    .addGrid(als.rank, [12, 13, 14])
    .addGrid(als.maxIter, [18, 19, 20])
    .addGrid(als.regParam, [.17, .18, .19])
    .build()
)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("Num models to be tested: ", len(param_grid))

cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=3)

# DataFrame.checkpoint() returns a NEW checkpointed DataFrame; it does not
# modify `train`. The original call discarded that return value, so the
# cross-validation still ran on the un-checkpointed frame. Keep the result
# under its own name and fit on it.
train_checkpointed = train.checkpoint()

models = cv.fit(train_checkpointed)
best_model = models.bestModel

print("--- %s seconds ---" % (time.time() - start_time))

Num models to be tested:  27
--- 560.3325228691101 seconds ---


## Best Model


In [8]:
# The hyperparameter getters are read from the object returned by the model's
# JVM-side `parent()` (presumably the ALS estimator that produced the model;
# verify against the Spark version in use). Look it up once and reuse it.
tuned_als = best_model._java_obj.parent()

# Bare rank first, then the labelled summary.
print(tuned_als.getRank())

print("**Best Model**")
print("  Rank:", tuned_als.getRank())
print("  MaxIter:", tuned_als.getMaxIter())
print("  RegParam:", tuned_als.getRegParam())

14
**Best Model**
  Rank: 14
  MaxIter: 18
  RegParam: 0.17


## Predictions

In [9]:
# Score the held-out ratings with the tuned model, replace any missing
# numeric values with 0, and report the RMSE on the result.
predictions = best_model.transform(test).na.fill(0)
rmse = evaluator.evaluate(predictions)
print(rmse)


0.867480767564069


## Get Predictions for all users

In [22]:
# SQLContext handle built on the existing SparkContext.
sqlContext = SQLContext(sc)

# Top-15 recommended movies for every user known to the model.
recommendations = best_model.recommendForAllUsers(numItems=15)
def get_recommendation_for_user(recs):
    """Flatten one user's recommendation row into a (movieId, ratings) DataFrame.

    Parameters
    ----------
    recs : DataFrame
        Output of ``recommendForAllUsers`` filtered down to a single user. It
        must expose a ``recommendations`` column holding a list of structs
        with ``movieId`` and ``rating`` fields. Only the first row is used.

    Returns
    -------
    DataFrame
        One row per recommended movie with columns ``movieId`` (long) and
        ``ratings`` (double). Empty when ``recs`` has no rows, instead of
        raising ``IndexError`` as the earlier pandas-based version did.
    """
    # One Spark action instead of two toPandas() round trips.
    first_row = recs.select("recommendations").first()

    # No row (unknown user) or a null list both mean "nothing to recommend".
    rec_structs = (first_row["recommendations"] or []) if first_row is not None else []

    # Local names deliberately avoid shadowing the module-level `movies` /
    # `ratings` DataFrames.
    rows = [(int(rec["movieId"]), float(rec["rating"])) for rec in rec_structs]

    # An explicit schema keeps createDataFrame working for an empty list, and
    # the SparkSession replaces the deprecated SQLContext entry point.
    return spark.createDataFrame(rows, schema="movieId long, ratings double")

## Check predictions and Actual values for top 15 movies of user 101

In [24]:
# Ratings user 101 actually gave, highest first (top 15 rows).
display(HTML("<h3>Actual Ratings for user 101 </h3>"))
(
    movies.join(ratings, ["movieId"], "left")
    .filter(ratings.userId == 101)
    .sort("rating", ascending=False)
    .select("movieId", "rating", "title", "genres")
    .show(15)
)

# The model's suggestions for the same user, highest predicted rating first,
# joined with the movie titles and genres.
display(HTML("<h3>Suggestions for user 101 </h3>"))
user_101 = get_recommendation_for_user(
    recommendations.filter(recommendations.userId == 101)
).sort("ratings", ascending=False)
user_101.join(movies, ["movieId"], "left").show()

+-------+------+--------------------+--------------------+
|movieId|rating|               title|              genres|
+-------+------+--------------------+--------------------+
|   1093|   5.0|   Doors, The (1991)|               Drama|
|   2395|   5.0|     Rushmore (1998)|        Comedy|Drama|
|   3174|   5.0|Man on the Moon (...|        Comedy|Drama|
|   2599|   5.0|     Election (1999)|              Comedy|
|   1719|   5.0|Sweet Hereafter, ...|               Drama|
|   2712|   5.0|Eyes Wide Shut (1...|Drama|Mystery|Thr...|
|   2318|   5.0|    Happiness (1998)|        Comedy|Drama|
|   2959|   5.0|   Fight Club (1999)|Action|Crime|Dram...|
|   2997|   5.0|Being John Malkov...|Comedy|Drama|Fantasy|
|   2692|   4.0|Run Lola Run (Lol...|        Action|Crime|
|    223|   4.0|       Clerks (1994)|              Comedy|
|   2700|   4.0|South Park: Bigge...|Animation|Comedy|...|
|   2706|   4.0| American Pie (1999)|      Comedy|Romance|
|   1127|   4.0|   Abyss, The (1989)|Action|Adventure|..

+-------+------------------+--------------------+--------------------+
|movieId|           ratings|               title|              genres|
+-------+------------------+--------------------+--------------------+
|   3379| 4.955814361572266| On the Beach (1959)|               Drama|
|   6533| 4.679059028625488|What's Up, Doc? (...|              Comedy|
|   4393| 4.679059028625488|Another Woman (1988)|               Drama|
|   3567| 4.653641700744629|   Bossa Nova (2000)|Comedy|Drama|Romance|
| 171495|4.6460394859313965|              Cosmos|  (no genres listed)|
|   6818|4.6061320304870605|Come and See (Idi...|           Drama|War|
|   8405| 4.593552112579346|Hour of the Wolf ...|        Drama|Horror|
|   2131| 4.589432239532471|Autumn Sonata (Hö...|               Drama|
| 102217| 4.587928771972656|Bill Hicks: Revel...|              Comedy|
|  92494| 4.587928771972656|Dylan Moran: Mons...|  Comedy|Documentary|
|   7767| 4.583174228668213|Best of Youth, Th...|               Drama|
|  867

## Posters

In [55]:
# Attach the external ids from `links` to each suggested movie; the left join
# keeps suggestions that have no row in `links`.
user_movies = user_101.join(links, on=["movieId"], how="left")

In [97]:
def print_posters(user_movies, api_key=None):
    """Display the OMDb poster of every movie in ``user_movies`` as one HTML strip.

    Parameters
    ----------
    user_movies : DataFrame
        Rows must expose an ``imdbId`` attribute (numeric IMDb id). Rows whose
        ``imdbId`` is null are skipped.
    api_key : str, optional
        OMDb API key. Defaults to the ``OMDB_API_KEY`` environment variable,
        then to the key that was previously hard-coded in this notebook.

    Returns
    -------
    None
        The posters are rendered with ``display(HTML(...))`` exactly once,
        after all lookups have finished.
    """
    # Local import keeps this cell self-contained; the notebook's import cell
    # does not bring in the stdlib `html` module.
    from html import escape

    # NOTE(review): the literal fallback is a credential committed to the
    # notebook. Rotate it and drop the fallback once OMDB_API_KEY is set
    # wherever this notebook runs.
    omdb_key = api_key or os.environ.get("OMDB_API_KEY", "a13a6d6c")

    img_template = "<img style='width: 150px; height: 225px; margin: 5px; float: left; border: 1px solid black;' src='{}' />"

    # Collect tags in a local list. The earlier version did `imagesList += ...`
    # on a module-level name without declaring it global, which raises
    # UnboundLocalError on the first poster found.
    image_tags = []
    for movie in user_movies.collect():
        if movie.imdbId is None:
            continue

        # IMDb ids are "tt" followed by at least seven digits. Reading the CSV
        # with inferSchema turns the id into an integer and drops the leading
        # zeros, so pad it back before querying.
        params = {'i': "tt{:07d}".format(int(movie.imdbId)), 'apiKey': omdb_key}

        # HTTPS keeps the API key off the wire in clear text; the timeout
        # stops one stalled request from hanging the whole cell.
        response = requests.get(url="https://www.omdbapi.com/", params=params, timeout=10)
        json_data = response.json()

        poster_url = json_data.get('Poster')
        # OMDb reports a missing poster as the literal string "N/A".
        if json_data.get('Response') != "False" and poster_url and poster_url != "N/A":
            # Escape the URL: it comes from a remote service and is placed
            # inside a quoted HTML attribute.
            image_tags.append(img_template.format(escape(poster_url, quote=True)))

    # Render once, after the loop; displaying inside the loop would repeat
    # the growing strip on every iteration.
    display(HTML("".join(image_tags)))