### Moive Recommendation System

In this project, we will use an Alternating Least Squares (ALS) algorithm to predict the ratings for the movies in [MovieLens small dataset](https://grouplens.org/datasets/movielens/latest/).

In [2]:
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math

from pyspark.sql.functions import col

In [3]:
import os
os.environ["PYSPARK_PYTHON"] = "python3"

## Part1: Data ETL and Data Exploration

In [5]:
from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("moive analysis") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [6]:
movies = spark.read.load("/FileStore/tables/movies.csv", format='csv', header = True)
ratings = spark.read.load("/FileStore/tables/ratings.csv", format='csv', header = True)
links = spark.read.load("/FileStore/tables/links.csv", format='csv', header = True)
tags = spark.read.load("/FileStore/tables/tags.csv", format='csv', header = True)

In [7]:
movies.show(5)

In [8]:
ratings.show(5)

In [9]:
tmp1 = ratings.groupBy("userID").count().toPandas()['count'].min()
tmp2 = ratings.groupBy("movieId").count().toPandas()['count'].min()
print('For the users that rated movies and the movies that were rated:')
print('Minimum number of ratings per user is {}'.format(tmp1))
print('Minimum number of ratings per movie is {}'.format(tmp2))

In [10]:
tmp1 = sum(ratings.groupBy("movieId").count().toPandas()['count'] == 1)
tmp2 = ratings.select('movieId').distinct().count()
print('{} out of {} movies are rated by only one user'.format(tmp1, tmp2))

## Part 2: Spark SQL and OLAP

#### The number of Users

In [13]:
q1_res = ratings.select('userId').distinct().count()
print('The number of users is {}.'.format(q1_res))

#### The number of Movies

In [15]:
q2_res = movies.select('movieId').distinct().count()
print('The number of movies is {}.'.format(q2_res))

### The number of movies are rated by users

In [17]:
q3_res_1 = ratings.select('movieId').distinct().count()
print('The number of movies have been rated is {}.'.format(q3_res_1))

The movies have not been rated.

In [19]:
movies.createOrReplaceTempView('movies')
ratings.createOrReplaceTempView('ratings')

q3_res_2 = spark.sql(" SELECT movieId, title FROM movies WHERE movieId NOT IN (select distinct movieId from ratings)")
display(q3_res_2)

movieId,title
25817,Break of Hearts (1935)
26361,Baby Blue Marine (1976)
27153,Can't Be Heaven (Forever Together) (2000)
27433,Bark! (2002)
31945,Always a Bridesmaid (2000)
52696,"Thousand and One Nights, A (1001 Nights) (1945)"
58209,Alex in Wonder (Sex and a Girl) (2001)
60234,"Shock, The (1923)"
69565,Bling: A Planet Rock (2007)
69834,Agency (1980)


#### List Movie Genres

In [21]:
display(movies.select('genres').where(col('genres').contains('(no genres listed)') == False)).distinct().orderBy("genres", ascending=False)

genres
Adventure|Animation|Children|Comedy|Fantasy
Adventure|Children|Fantasy
Comedy|Romance
Comedy|Drama|Romance
Comedy
Action|Crime|Thriller
Comedy|Romance
Adventure|Children
Action
Action|Adventure|Thriller


In [22]:
Genres = set(movies.select('genres')\
                   .where(col('genres').contains('(no genres listed)') == False)\
                   .distinct().rdd.flatMap(lambda x: x).flatMap(lambda x: x.split('|')).collect())
Genres

#### Movie for Each Category

In [24]:
d = {}
for genre in Genres:
    d[genre] = movies.where(col('genres').contains(genre)).select('title')

In [25]:
d['Action'].show()

## Part2: Spark ALS based on Spark RDD

In [27]:
from pyspark.mllib.recommendation import ALS

In [28]:
movie_rating = sc.textFile("/FileStore/tables/ratings_small.csv")
rating_small = spark.read.load("/FileStore/tables/ratings_small.csv", format='csv', header = True)
movies_small = spark.read.load("/FileStore/tables/movies_small.csv", format='csv', header = True)

In [29]:
header = movie_rating.take(1)[0]
rating_data = movie_rating.filter(lambda line: line!=header).map(lambda line: line.split(",")).map(lambda tokens: (tokens[0],tokens[1],tokens[2])).cache()


In [30]:
# check three rows
rating_data.take(3)

Now we split the data into training/validation/testing sets using a 6/2/2 ratio.

In [32]:
train, validation, test = rating_data.randomSplit([6,2,2],seed = 7856)

In [33]:
train.cache()

In [34]:
validation.cache()

In [35]:
test.cache()

### ALS Model Selection and Evaluation

With the ALS model, we can use a grid search to find the optimal hyperparameters.

In [37]:
def train_ALS(train_data, validation_data, num_iters, reg_param, ranks):
    min_error = float('inf')
    best_rank = -1
    best_regularization = 0
    best_model = None
    for rank in ranks:
        for reg in reg_param:
            # write your approach to train ALS model
            als = ALS()
            model = als.train(train_data, rank = rank, iterations= num_iters, lambda_= reg)
            # make prediction
            predect = model.predictAll(validation_data.map(lambda x: (x[0],x[1]))).map(lambda x: ((x[0],x[1]),x[2]))
#             print(predect.collect())
            # get the rating result
            rating = validation_data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predect)
            # get the RMSE
#             print(rating.take(5))
            error = np.sqrt(rating.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean())
            print ('{} latent factors and regularization = {}: validation RMSE is {}'.format(rank, reg, error))
            if error < min_error:
                min_error = error
                best_rank = rank
                best_regularization = reg
                best_model = model
    print ('\nThe best model has {} latent factors and regularization = {}'.format(best_rank, best_regularization))
    return best_model

In [38]:
num_iterations = 10
ranks = [6, 8, 10, 12, 14]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

import time
start_time = time.time()
final_model = train_ALS(train, validation, num_iterations, reg_params, ranks)

print ('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

In [39]:
iter_array = [1, 2, 5, 10]

def plot_learning_curve(iter_array, train_data, validation_data, reg, rank):
    val_err = []
    train_err = []
    for iter in iter_array:
        # write your approach to train ALS model
        model = ALS().train(train_data, rank = rank, iterations= iter, lambda_= reg)
        # make prediction
        predect_val = model.predictAll(validation_data.map(lambda x: (x[0],x[1]))).map(lambda x: ((x[0],x[1]),x[2]))
        predect_train = model.predictAll(train_data.map(lambda x: (x[0],x[1]))).map(lambda x: ((x[0],x[1]),x[2]))

        # get the rating result
        rating_val = validation_data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predect_val)
        rating_train = train_data.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predect_train)

        # get the RMSE
        error_val = np.sqrt(rating_val.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean())
        error_train = np.sqrt(rating_train.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean())  
        val_err.append(error_val)
        train_err.append(error_train)
    plt.figure()
    plt.plot(iter_array, val_err, label = 'val_err')
    plt.plot(iter_array, train_err, label = 'train_err')
    plt.legend()
    display()
plot_learning_curve(iter_array, train, validation, 0.2, 8)
# write your function to plot the learning curve 

### Model testing on the test data
And finally, wite your code to make a prediction and check the testing error.

In [41]:
model = ALS().train(train, rank = 8, iterations= 10, lambda_= 0.2)
# make prediction
predect_test = model.predictAll(test.map(lambda x: (x[0],x[1]))).map(lambda x: ((x[0],x[1]),x[2]))
rating_test = test.map(lambda x: ((int(x[0]), int(x[1])), float(x[2]))).join(predect_test)
error_test = np.sqrt(rating_test.map(lambda x: (x[1][0] - x[1][1]) ** 2).mean())
print ('\nThe rmase of testinf on best model is {}.'.format(error_test))       

In [42]:
movieFeature = spark.createDataFrame(model.productFeatures(), ['movieId', 'feature'])
userFeature = spark.createDataFrame(model.userFeatures(), ['userId', 'feature'])

In [43]:
userFeature.show()

In [44]:
def movieName(movieId):
  name = movies_small.where(col('movieId').isin(movieId)).select('title').toPandas()['title'].tolist()
  return list(zip(movieId,name))

def recommendaedMovie(userid,num):
  recommendaedMovies = pd.DataFrame(model.recommendProducts(user = userid, num = num))['product'].tolist()
  return movieName(recommendaedMovies)

def ratedMovie(userid,num):
  ratedMovies_rating = rating_small.where(col('userId') == userid).select('movieId','rating').orderBy('rating',ascending = False).limit(num).toPandas()
  ratedMovies = ratedMovies_rating['movieId'].tolist()
  rating = ratedMovies_rating['rating'].tolist()
  return list(zip(movieName(ratedMovies),rating))

def similartiy(movieId1, movieId2, userId):
  movieFeatures = movieFeature.where(col('movieId').isin(movieId1,movieId2)).select('feature').collect()
  movie1 = np.array(movieFeatures[0][0])
  movie2 = np.array(movieFeatures[1][0])
  user = np.array(userFeature.where(col('userId') == userId).select('feature').collect()[0][0])
  movieSim = np.dot(movie1,movie2)
  userMov1 = np.dot(movie1,user)
  userMov2 = np.dot(movie2,user)
  print('movieSim:' + str(movieSim))
  print('userMov1:' + str(userMov1))
  print('userMov2:' + str(userMov2))

In [45]:
userid = 600
num = 5
print('This user like the following movies.')
print(ratedMovie(userid, num))
print('\nThese movies have been recommended to this user.')
print(recommendaedMovie(userid, num))

In [46]:
movieId1 = 1073
movieId2 = 170355dd
similartiy(movieId1, movieId2, userid)

### Conclusion. 
Did Data ETL and Data Exploration on 1GB MovieLens movie rating dataset using Spark SQL and Spark Dataframe.  
Implemented Alternating Least Square (ALS) model using Spark.MLlib to achieve a movie recommendation system.  
Tuned the hyper-parameters of ALS based on Spark RDD and Dataframe and learned the latent factors of users and items.   
Based on the recommendation system, recommended personalized movies for each user and analyzed underlying factors.