## Moive Recommendation

In [2]:
import math
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import re
import seaborn as sns


In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /FileStore/tables/elastic/elasticsearch_spark_20_2_11_7_5_1-cdf5c.jar pyspark-shell'

## Part1: Data ETL and Exploration

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
movies = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
display(movies.head(5))

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


In [8]:
display(movies.head(5))

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy


In [9]:
display(ratings.describe())

summary,userId,movieId,rating,timestamp
count,27753444.0,27753444.0,27753444.0,27753444.0
mean,141942.01557064414,18487.99983414671,3.530445212493268,1193121854.9319255
stddev,81707.400091494,35102.625247468335,1.066352750231989,216048228.52234188
min,1.0,1.0,0.5,1000000065.0
max,99999.0,99999.0,5.0,999999978.0


In [10]:
# to see if there is null in ratings
ratings.where(ratings['rating'].isNull()).count()

In [11]:
tmp1 = ratings.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [12]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Spark SQL and OLAP

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *


In [15]:
movies.createOrReplaceTempView('movies')
ratings.createOrReplaceTempView('ratings')

#### The number of Users

In [17]:
num_user = ratings.select('userId').distinct().count()
num_rating = ratings.select('rating').count()
print('number of users are: {}'.format(num_user))
print('total rating number is: {}'.format(num_rating))

#### The number of Movies

In [19]:
num_mv = movies.select('movieId').distinct().count()
print('number of Movies are: {}'.format(num_mv))

#### Number of movies rated by users. Also list movies not rated before (for better visualization, only show 15 movies of them).

In [21]:
%sql

Select count(distinct movieId) as rated_num
From ratings
Where rating is not null

rated_num
53889


In [22]:
%sql
-- method1 slower

Select distinct movieId, title
From movies
Where movieId not in (Select distinct movieId From ratings Where rating is not null)
Order by movieId
Limit 15

movieId,title
100609,Fambul Tok (2011)
100642,Sunday Lovers (1980)
101216,Shadow Boxers (1999)
101229,May the Best Man Win (2009)
101237,9500 Liberty (2009)
101369,Shadow Zone: My Teacher Ate My Homework (1997)
101391,"Ascent, The (1994)"
101407,Laffghanistan: Comedy Down Range (2009)
101437,"Baby Dance, The (1998)"
101441,"Inspectors 2: A Shred of Evidence, The (2000)"


In [23]:
%sql
-- method2 faster

Select distinct m.movieId, title
From movies m
Left Join ratings r on m.movieId=r.movieId
Where r.rating is null
Order by movieId
Limit 15

movieId,title
100609,Fambul Tok (2011)
100642,Sunday Lovers (1980)
101216,Shadow Boxers (1999)
101229,May the Best Man Win (2009)
101237,9500 Liberty (2009)
101369,Shadow Zone: My Teacher Ate My Homework (1997)
101391,"Ascent, The (1994)"
101407,Laffghanistan: Comedy Down Range (2009)
101437,"Baby Dance, The (1998)"
101441,"Inspectors 2: A Shred of Evidence, The (2000)"


#### conclusion 1

In this dataset, we have 58098 movies, only 53889 of them are rated. There are 4209 of them are not rated by users.

#### List Movie Genres

In [26]:
%sql

Select distinct explode(split(genres,'\\|')) as categories
From movies

categories
Crime
Romance
Thriller
Adventure
Drama
War
Documentary
Fantasy
Mystery
Musical


### Number of movies and average rating for each category

In [28]:
categories = spark.sql(
                   '''Select cte.category, count(*) as rate_num, avg(rating) as avg_rate
                      From movies m
                      Join
                        (Select movieId, title, explode(split(genres,'\\\\|')) as category
                        From movies) cte
                      on m.movieId=cte.movieId
                      Join ratings r on m.movieId=r.movieId
                      Group by cte.category
                      Order by 2 Desc''')

In [29]:
display(categories.head(5))

category,rate_num,avg_rate
Drama,12191048,3.676370727110581
Comedy,9999184,3.419786704595095
Action,8214956,3.462565532913384
Thriller,7489628,3.5188010672893237
Adventure,6452309,3.513283616764169


In [30]:
fig, (ax1, ax2) = plt.subplots(figsize=(10,6), ncols=2, constrained_layout=True)
categories_p = categories.toPandas()

xticks = [cate[0] for cate in categories.select("category").collect()]

ax1.plot('category','avg_rate',data=categories_p)
# plt.plot('iter_num','validation_mse',data=categories_p)
ax1.set_xlabel("categories")
ax1.set_xticklabels(xticks,rotation=90)
ax1.set_ylabel("average rating")
ax1.set_title("average rating for each categories", y=1.08)
ax1.legend()

ax2.plot('category','rate_num',data=categories_p)
ax2.set_xlabel("categories")
ax2.set_xticklabels(xticks,rotation=90)
ax2.set_ylabel("rating number")
ax2.set_title("rating number for each categories", y=1.08)
ax2.legend()

display(fig.show())

#### Conclusion 2

We have 20 categories of movies in the dataset. **Drama**, **Comedy** and **Action** are top 3 categories received most rating (around 10 million) by users. **Drama**, **Crime**, **Mystery**, **War** and **Film-Noir** are categories rated by relatively higher score. The average rating scores for all these categories are between 3.3 and 3.9 (rating in 0-5 range).

### Number of each rating score

In [33]:
%sql

Select rating, count(*) as num
From ratings
Group by rating
Order by 1

rating,num
0.5,442388
1.0,886233
1.5,441354
2.0,1850627
2.5,1373419
3.0,5515668
3.5,3404360
4.0,7394710
4.5,2373550
5.0,4071135


### Top 1000 movies received most rating

In [35]:
top_rating_num = spark.sql(
                '''Select r.movieId, m.title, sum(case when rating is not null then 1 else 0 end) as rate_num
                From ratings r
                Join movies m on r.movieId=m.movieId
                Group by 1,2
                Order by 3 desc
                Limit 1000''')


In [36]:
def get_movie_year(title):
  matches = re.search("\(\d{4}\)", title)
  if matches:
    group = matches.group()
    year = group[1:-1]
#     start = matches.start()
    title = title[:matches.start()-1]
    return (title, year)
  else:
    return (title, None)
    
get_year = udf(get_movie_year,\
               StructType([StructField("title", StringType(), True),\
                           StructField("release_year", StringType(), True)]))


In [37]:
top_rating_num_movies = top_rating_num.select(
                                "movieId", get_year("title").title.alias("title"),\
                                get_year("title").release_year.alias("release_year"),\
                                "rate_num")

display(top_rating_num_movies)

top_rating_num_movies.createOrReplaceTempView('top_rate_num_movies')

movieId,title,release_year,rate_num
318,"Shawshank Redemption, The",1994,97999
356,Forrest Gump,1994,97040
296,Pulp Fiction,1994,92406
593,"Silence of the Lambs, The",1991,87899
2571,"Matrix, The",1999,84545
260,Star Wars: Episode IV - A New Hope,1977,81815
480,Jurassic Park,1993,76451
527,Schindler's List,1993,71516
110,Braveheart,1995,68803
1,Toy Story,1995,68469


In [38]:
%sql

Select release_year, count(*) as num_by_year
From top_rate_num_movies
Group by release_year
Order by 2 Desc

release_year,num_by_year
1995,68
1996,56
1999,51
1994,50
2000,48
1998,47
1997,41
2004,39
2001,38
1993,38


In [39]:
%sql

Select (case when release_year<1970 then 'before 1970' when release_year between 1970 and 1979 then '1970s' when release_year between 1980 and 1989 then '1980s' when release_year between 1990 and 1999 then '1990s' when release_year between 2000 and 2009 then '2000s' else 'after 2010' end) as decade, count(*) as num_by_decade
From top_rate_num_movies
Group by 1
Order by 2 Desc

decade,num_by_decade
1990s,413
2000s,277
1980s,136
before 1970,66
after 2010,66
1970s,42


### Top 1000 movies with highest average rating (received over 1000 ratings)

In [41]:

top_avg_rating_movies = spark.sql(
                  '''Select r.movieId, m.title, round(avg(rating), 2) as avg_rate, sum(case when r.rating is not null then 1 else 0 end) as rate_num
                  From ratings r
                  Join movies m on r.movieId=m.movieId
                  Group by 1,2
                  Having sum(case when r.rating is not null then 1 else 0 end)>=1000
                  Order by 3 desc
                  Limit 1000''')

In [42]:
top_avg_rating_movies = top_avg_rating_movies.select(
                                "movieId", get_year("title").title.alias("title"),\
                                get_year("title").release_year.alias("release_year"),\
                                "avg_rate", "rate_num")

top_avg_rating_movies.createOrReplaceTempView('top_avg_rate_movies')

display(top_avg_rating_movies)

movieId,title,release_year,avg_rate,rate_num
159817,Planet Earth,2006.0,4.46,1384
318,"Shawshank Redemption, The",1994.0,4.42,97999
174053,Black Mirror: White Christmas,2014.0,4.35,1074
858,"Godfather, The",1972.0,4.33,60904
50,"Usual Suspects, The",1995.0,4.29,62180
527,Schindler's List,1993.0,4.26,71516
1221,"Godfather: Part II, The",1974.0,4.26,38875
2019,Seven Samurai (Shichinin no samurai),1954.0,4.25,14578
1203,12 Angry Men,1957.0,4.24,17931
2959,Fight Club,1999.0,4.23,65678


In [43]:
%sql

Select (case when release_year<1970 then 'before 1970' when release_year between 1970 and 1979 then '1970s' when release_year between 1980 and 1989 then '1980s' when release_year between 1990 and 1999 then '1990s' when release_year between 2000 and 2009 then '2000s' else 'after 2010' end) as decade, count(*) as num_by_decade, round(sum(avg_rate*rate_num)/sum(rate_num), 2) as avg_score_by_decade
From top_avg_rate_movies
Group by 1
Order by 2 Desc, 3 Desc

decade,num_by_decade,avg_score_by_decade
2000s,234,3.92
before 1970,218,4.02
1990s,201,4.0
after 2010,138,3.91
1980s,125,3.96
1970s,84,4.04


#### Conclusion 3

Rating scores are mainly distributed between 3.0 and 5.0.   
The top 3 most popular movies are 'Shawshank Redemption', 'Forrest Gump' and 'Pulp Fiction' which receive more than 90,000 rating.   
1990s has 413 of top 1000 popular movies (received most rating in this dataset).   
'Planet Earth', 'Shawshank Redemption' and 'Black Mirror: White Christmas' are the top 3 movies with the highest average rating score around 4.4. While 'Shawshank Redemption', 'The Godfather' and 'The Usual Suspects' are movies both received high number of rating and average rating score. These movies should be recommended to users in general.

## Part2: Spark ALS based approach for training model

In [46]:
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import Row
from pyspark.sql.functions import arrays_zip, col, explode

In [47]:
ratings = ratings['userId','movieId','rating']

ratings = ratings.withColumn("userId", ratings["userId"].cast(LongType()))
ratings = ratings.withColumn("movieId", ratings["movieId"].cast(LongType()))
ratings = ratings.withColumn("rating", ratings["rating"].cast(FloatType()))


#### Now split the data into training/validation/testing sets using a 6/2/2 ratio. Also split it by a 3/1 ratio to use k-fold cross validation.

In [49]:
train, test = ratings.randomSplit([0.75,0.25],seed = 42)
training, validation, testing = ratings.randomSplit([0.6, 0.2, 0.2],seed = 40)

In [50]:
print('train size:', train.count())
print('test size:', test.count())

### ALS Model Selection and Evaluation

#### Method 1: use cross-validation

In [53]:
ranks = [6, 8, 10]
reg_params = [0.05, 0.1, 0.2]
als = ALS(maxIter=10, userCol="userId", itemCol="movieId", ratingCol="rating",
          coldStartStrategy="drop")

paramGrid = ParamGridBuilder() \
          .addGrid(ALS.rank, ranks) \
          .addGrid(ALS.regParam, reg_params) \
          .build()

mse_evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")

CV = CrossValidator(estimator=als,
                   estimatorParamMaps=paramGrid,
                   evaluator=mse_evaluator,
                   numFolds=3)

cvModel = CV.fit(train)

In [54]:
best_model = cvModel.bestModel
# best_model.rank
best_model.params

In [55]:
cvmodel_path = '/FileStore/tables/movieLen' + '/best_cvModel'
best_model.save(cvmodel_path)

In [56]:
# cvmodel_path = '/FileStore/tables/movieLen' + '/best_cvModel'
cv_model = ALSModel.load(cvmodel_path)
cv_model.rank

#### Method 2: use cross-evaluation toolbox

In [58]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # write your approach to train ALS model
            # make prediction
            # get the rating result
            # get the RMSE
            als = ALS(rank=rank, maxIter=num_iters, regParam=reg, userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop")
            model = als.fit(train_data)
            predictions = model.transform(validation_data)
            
            evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
            error = evaluator.evaluate(predictions)
            print ('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print ('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model, best_rank, best_regularization

In [59]:
num_iterations = 10
ranks = [6, 8, 10, 12, 14]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

import time
start_time = time.time()
final_model, final_rank, final_reg = train_ALS(training, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

#### Plot learning rate

In [61]:
def learning_curve(iter_array, train, validation, rank, reg):
  train_errors, val_errors = [], []
  for iter in iter_array:
    als = ALS(rank=rank, maxIter=iter, regParam=reg, userCol="userId", itemCol="movieId", ratingCol="rating",
                      coldStartStrategy="drop")
    als_model = als.fit(train)
    train_predict = als_model.transform(train)
    val_predict = als_model.transform(validation)
    train_errors.append(mse_evaluator.evaluate(train_predict))
    val_errors.append(mse_evaluator.evaluate(val_predict))
  
  return train_errors, val_errors
    
    

In [62]:
iter_array = [1, 2, 5, 10]
train_errors, val_errors = learning_curve(iter_array, train, validation, final_rank, final_reg)

In [63]:
def plot_lr_curve(iter_array, train_errors, val_errors):
  # create dataframe
  lst = [iter_array, train_errors, val_errors]
  R = Row('iter_num', 'train_mse', 'validation_mse')
  lr_df = sc.parallelize(R(*r) for r in zip(*lst)).toDF()

  # plot learning curve
  fig, axes = plt.subplots(1,1)
  lr_df_p = lr_df.toPandas()
  plt.plot('iter_num','train_mse',data=lr_df_p)
  plt.plot('iter_num','validation_mse',data=lr_df_p)
  plt.xlabel("num_iteration")
  plt.ylabel("RMSE")
  plt.title("learning curve")
  plt.legend()

  display(fig.show())

In [64]:
plot_lr_curve(iter_array, train_errors, val_errors)

In [65]:
model_path = '/FileStore/tables/movieLen' + '/bestModel2'
final_model.save(model_path)

In [66]:
# model_path = '/FileStore/tables/movieLen' + '/bestModel2'
final_model = ALSModel.load(model_path)

### Model testing
And finally, make a prediction and check the testing error.

In [68]:
rmse1 = mse_evaluator.evaluate(cv_model.transform(test))
rmse2 = mse_evaluator.evaluate(final_model.transform(testing))
print('test rmse for method 1 is: {}'.format(rmse1))
print('test rmse for method 2 is: {}'.format(rmse2))

### Apply the model to do recommendation

In [70]:
rec_to_user = final_model.recommendForAllUsers(10)

In [71]:
def get_recommend(recommend_to_user_df):
  recommend = (recommend_to_user_df
        .withColumn("tmp", arrays_zip("recommendations"))
        .withColumn("tmp", explode("tmp"))
        .select("userId", col("tmp.recommendations")['movieId'].alias('movieId'), col("tmp.recommendations")['rating'].alias('rating')))
  return recommend.join(movies, recommend.movieId==movies.movieId).select(recommend.userId, "title", "genres", "rating")

#### Recommend for all users (10 recommendations per user, only show recommendations to 2 user)

In [73]:
rec_to_user_df = get_recommend(rec_to_user)
display(rec_to_user_df.head(20))

userId,title,genres,rating
148,Heroes (2008),(no genres listed),5.751230239868164
148,The War at Home (1979),Documentary|War,5.672374725341797
148,Glenn Killing på Berns (1993),Comedy,5.396997928619385
148,The Great Piggy Bank Robbery (1946),Animation|Children|Comedy,5.376803398132324
148,"Rising Place, The (2002)",Drama,5.242195129394531
148,The Old Gun (1975),Drama|Thriller|War,5.179086685180664
148,Ali Baba Bunny (1957),Animation|Children|Comedy,5.108641147613525
148,Sharpe's Eagle (1993),Action|Adventure|War,5.011753559112549
148,The Yellow Rolls-Royce (1964),Comedy|Drama|Romance,5.000919818878174
148,Freedom on My Mind (1994),Documentary,4.993757247924805


In [74]:
display(rec_to_user_df.describe())

summary,userId,title,genres,rating
count,2798690.0,2798690,2798690,2798690.0
mean,141630.38205017347,,,5.646920058055335
stddev,81767.65366554947,,,0.8499038642426338
min,1.0,"""""""Great Performances"""" Cats (1998)""",(no genres listed),0.45407003
max,283228.0,チェブラーシカ (2010),Western,13.582502


#### Personalized recommendation

In [76]:
user_subset = ratings.where(ratings.userId == 1)
user_subset_recs = get_recommend(final_model.recommendForUserSubset(user_subset, 10))


In [77]:
display(user_subset_recs)

userId,title,genres,rating
1,Willie & Phil (1980),Comedy|Drama|Romance,5.9306693
1,O Pátio das Cantigas (1942),Comedy,5.832705
1,Let's Play Two (2017),Documentary,5.802419
1,The State I Am In (2000),Drama,5.708998
1,Pearl Jam: Immagine in Cornice - Live in Italy 2006 (2007),Documentary|Musical,5.60396
1,"What Did You Do in the War, Thanassi? (1971)",Comedy|Drama,5.575147
1,Ο Θανάσης στη χώρα της σφαλιάρας (1976),(no genres listed),5.5695586
1,Garbage Warrior (2007),Documentary,5.5587406
1,Return to Source: The Philosophy of The Matrix (2004),Documentary,5.5492215
1,Day of the Wacko (Dzien swira) (2002),Comedy|Drama,5.5443153


In [78]:
# recommend to new users

# 1. Adding the new ratings into the DataFrame (hint: look into using the .union() method)
# 2. Fitting the ALS model
# 3. Make recommendations for the user of choice
# 4. Print out the names of the top $n$ recommendations in a reader-friendly manner


# self_def_user = sc.parallelize([(7654321, 1, 4.0), (7654321, 16, 3.5), (7654321, 40, 4.5), (7654321, 78, 3.5), (7654321, 90, 2.0), 
#                                 (7654321, 100, 1.0), (7654321, 200, 5.0), (7654321, 352, 4.0), (7654321, 508, 4.5), (7654321, 1200, 3.5)]).toDF(("userId", "movieId", "rating"))

# rec_to_self_def_user = get_recommend(final_model.recommendForUserSubset(self_def_user, 2))

