In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext, Row
from pyspark.sql.functions import col, min, max, avg
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import time
from pyspark.sql.types import *

In [2]:
sc = SparkContext()

In [3]:
sqlContext = SQLContext(sc)

In [4]:
spark = SparkSession.builder.appName("movie-recommendation-systems").getOrCreate()

In [5]:
ratings_local = spark.read.csv('ratings-2M.csv',header=True)

In [6]:
movies = spark.read.csv('movies-27m.csv',header=True)

# EDA

In [7]:
movies.show(5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [8]:
ratings_local.show(5)

+---+------+-------+------+----------+
|_c0|userId|movieId|rating| timestamp|
+---+------+-------+------+----------+
|  0|     1|    307|   3.5|1256677221|
|  1|     1|    481|   3.5|1256677456|
|  2|     1|   1091|   1.5|1256677471|
|  3|     1|   1257|   4.5|1256677460|
|  4|     1|   1449|   4.5|1256677264|
+---+------+-------+------+----------+
only showing top 5 rows



In [9]:
total_instances = ratings_local.count()
print("Total instances: ", total_instances)

Total instances:  2000000


In [10]:
users_count = ratings_local.select('userId').distinct().count()
print("Number of users: ", users_count)

Number of users:  20507


In [11]:
movies_count = ratings_local.select('movieId').distinct().count()
print("Number of movies: ", movies_count)

Number of movies:  26030


In [15]:
#Number of movie rated by each user
ratings_count = ratings_local.groupby('userId').count()
ratings_count.show(5)

+------+-----+
|userId|count|
+------+-----+
|   296|   10|
|   467|   10|
|   675|    4|
|   691|   15|
|   829|   15|
+------+-----+
only showing top 5 rows



In [16]:
#Minimum ratings for movies by user
ratings_count.select(min('count')).show()

+----------+
|min(count)|
+----------+
|         1|
+----------+



In [17]:
#Maximum ratings for movies by user
ratings_count.select(max('count')).show()

+----------+
|max(count)|
+----------+
|      4874|
+----------+



In [18]:
#Average ratings for movies by user
ratings_count.select(avg('count')).show()

+----------------+
|      avg(count)|
+----------------+
|97.5276734773492|
+----------------+



In [12]:
#Number of ratings in matrix
numerator = ratings_local.count()
print("Actual number of ratings: ", numerator)

Actual number of ratings:  2000000


In [13]:
#Number of ratings matrix could contain if no empty cells
denominator = users_count * movies_count
print("Number of ratings if every user watch every moive: ", denominator)

Number of ratings if every user watch every moive:  533797210


In [14]:
#Caculating the sparsity of ratings_new
sparsity = 1 - (numerator*1.0 / denominator)
print("Sparsity: ", sparsity)

Sparsity:  0.9962532587984115


In [19]:
#What if remove users with less than 20 ratings?
ratings_filted = ratings_local.groupby('userId').count().filter(col('count') >= 20)
greater20 = ratings_filted.count()/users_count
print("Number of users ratings more than 20 movies: ", ratings_filted.count())
print("Proportion of users who have no less than 20 ratings:", greater20)

Number of users ratings more than 20 movies:  12556
Proportion of users who have no less than 20 ratings: 0.6122787340907982


In [20]:
ratings_local =ratings_local.withColumn('userId', ratings_local['userId'].cast('integer'))
ratings_local =ratings_local.withColumn('movieId', ratings_local['movieId'].cast('integer'))
ratings_local =ratings_local.withColumn('rating', ratings_local['rating'].cast('float'))
ratings_local =ratings_local.withColumn('timestamp', ratings_local['timestamp'].cast('integer'))

In [21]:
#Confirm the columns are now in the correct format
ratings_local.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: integer (nullable = true)



In [22]:
#Remove unnecssary features from ratings_local
ratings_local = ratings_local.select('userId','movieId','rating')
ratings_local.show(3)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    307|   3.5|
|     1|    481|   3.5|
|     1|   1091|   1.5|
+------+-------+------+
only showing top 3 rows



In [23]:
#Convert the type of movieId in movies table
movies =movies.withColumn('movieId', movies['movieId'].cast('integer'))

In [24]:
movies.printSchema()

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)



In [25]:
#Join the movies table and the ratings table together
movie_ratings = ratings_local.join(movies, on='movieId', how='leftouter')

In [26]:
movie_ratings.show(5)

+-------+------+------+--------------------+--------------+
|movieId|userId|rating|               title|        genres|
+-------+------+------+--------------------+--------------+
|    307|     1|   3.5|Three Colors: Blu...|         Drama|
|    481|     1|   3.5|   Kalifornia (1993)|Drama|Thriller|
|   1091|     1|   1.5|Weekend at Bernie...|        Comedy|
|   1257|     1|   4.5|Better Off Dead.....|Comedy|Romance|
|   1449|     1|   4.5|Waiting for Guffm...|        Comedy|
+-------+------+------+--------------------+--------------+
only showing top 5 rows



# Building the ALS Recommendation Engines

In [27]:
csv_num = [500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000, 256000, 512000, 1000000]

sparsity_combination = []
maxIter_combination = []
regParam_combination = []
time_combination = []
RMSE_combination = []
rank_combination = []


for i in csv_num:
    
    #Take samlpes
    ratings_sample = ratings_local.rdd.takeSample(False, i, 1234)

    #Create spark dataframes
    ratings = sqlContext.createDataFrame(ratings_sample)
    
    #Record start time
    ALS_start = time.time()
    
    #Split the dataset 
    (training_data, test_data) = ratings.randomSplit([0.8, 0.2], seed=1234)
    
    #Build a generic ALS model without hyperparameters
    als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative = True,\
          implicitPrefs = False)
    
    # Add hyperparameters and their respective values to param_grid
    param_grid= ParamGridBuilder() \
            .addGrid(als.rank, [10, 15, 20]) \
            .addGrid(als.maxIter, [5, 10, 20]) \
            .addGrid(als.regParam, [0.05, 0.1, 0.15]) \
            .build()
    
    # Define evaluator as RMSE
    evaluator = RegressionEvaluator(metricName="rmse", \
                                    labelCol="rating", predictionCol="prediction")
    
    #Build cross validation using CrossValidator
    cv = CrossValidator(estimator=als, estimatorParamMaps=param_grid, evaluator=evaluator, numFolds=5)
    
    #Run the cross validation on the training data
    model = cv.fit(training_data)
    
    #Extract best combination of values from cross validation
    best_model = model.bestModel
    
    #Generate test set predictions
    predictions = best_model.transform(test_data)
    predictions = predictions.na.drop()
    
    #Evaluate the predictions using RMSE
    rmse = evaluator.evaluate(predictions)
    ALS_end = time.time()
    #print("RMSE for size", i, ": ", rmse)
    
    rank_combination.append((i, best_model.rank))
    maxIter_combination.append((i, best_model._java_obj.parent().getMaxIter()))
    regParam_combination.append((i, best_model._java_obj.parent().getRegParam()))
    
    
    model_time = ALS_end - ALS_start
    #print("ALS cost time for size", i, ": ", model_time)
    
    time_combination.append((i, model_time))
    RMSE_combination.append((i, rmse))
        
#df_time = pd.DataFrame(time_combination, columns = ['size', 'model_cost_time']).set_index('size')
#df_rmse = pd.DataFrame(RMSE_combination, columns = ['size', 'rmse']).set_index('size')
cSchema_time = StructType([StructField("size", IntegerType()),StructField("model_cost_time", FloatType())])
cSchema_rmse = StructType([StructField("size", IntegerType()),StructField("rmse", FloatType())])

df_time = spark.createDataFrame(time_combination, schema = cSchema_time)
df_rmse = spark.createDataFrame(RMSE_combination, schema = cSchema_rmse)

df_time.show()
df_rmse.show()

+-----+---------------+
| size|model_cost_time|
+-----+---------------+
|  500|      25.155996|
| 1000|      20.992178|
| 2000|       19.99443|
| 4000|      21.106209|
| 8000|       22.90486|
|16000|      23.987976|
|32000|      27.568935|
|64000|      30.975883|
+-----+---------------+

+-----+---------+
| size|     rmse|
+-----+---------+
|  500| 4.230382|
| 1000|3.2854357|
| 2000|2.7728326|
| 4000|2.4852667|
| 8000|2.1477742|
|16000| 1.833819|
|32000|1.6032212|
|64000|1.3736886|
+-----+---------+



# Model Performance Evaluation

In [28]:
#Generate 3 recommendations for all users
ALS_recommendations = best_model.recommendForAllUsers(3)

In [29]:
ALS_recommendations.show(5)

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|  1580|[[104074, 11.8679...|
|  4900|[[26395, 8.163957...|
|  7880|[[26395, 5.075081...|
| 14570|[[565, 7.171178],...|
| 15790|[[55253, 4.84224]...|
+------+--------------------+
only showing top 5 rows



In [30]:
ALS_recommendations.registerTempTable('ALS_recs_temp')

In [31]:
#Convert the recommder to database type of view
convert_recs = spark.sql("SELECT userId,\
                               movieIds_and_ratings.movieId AS movieId,\
                               movieIds_and_ratings.rating AS prediction\
                       FROM ALS_recs_temp\
                       LATERAL VIEW explode(recommendations) exploded_table\
                       AS movieIds_and_ratings")

In [32]:
convert_recs.show(5)

+------+-------+----------+
|userId|movieId|prediction|
+------+-------+----------+
|  1580| 104074| 11.867983|
|  1580|   1783|  9.381773|
|  1580|   7759|  8.470671|
|  4900|  26395|  8.163957|
|  4900|   2357|  7.987455|
+------+-------+----------+
only showing top 5 rows



In [33]:
#Join the recommder to movie table
full_recs = convert_recs.join(movies, ['movieId'], "left")
full_recs.show(5)

+-------+------+----------+--------------------+--------------------+
|movieId|userId|prediction|               title|              genres|
+-------+------+----------+--------------------+--------------------+
| 104074|  1580| 11.867983|Percy Jackson: Se...|Adventure|Childre...|
|   1783|  1580|  9.381773|     Palmetto (1998)|Crime|Drama|Myste...|
|   7759|  1580|  8.470671|   Nostalghia (1983)|               Drama|
|  26395|  4900|  8.163957|Rutles: All You N...|              Comedy|
|   2357|  4900|  7.987455|Central Station (...|               Drama|
+-------+------+----------+--------------------+--------------------+
only showing top 5 rows



In [37]:
#Filter the movie the each user have seen before
clean_recs = full_recs.join(ratings, ["userId", 'movieId'], "left").filter(ratings.rating.isNull())

In [39]:
clean_recs.show(5)

+------+-------+----------+--------------------+--------------------+------+
|userId|movieId|prediction|               title|              genres|rating|
+------+-------+----------+--------------------+--------------------+------+
|   251| 114893|  8.871814|         Sins (1986)|       Drama|Romance|  null|
|   319| 168492|  8.319116|Call Me by Your N...|       Drama|Romance|  null|
|   342|  73000|  5.594062|         Nine (2009)|Drama|Musical|Rom...|  null|
|   467|   4513|  6.996867|House on Carroll ...|            Thriller|  null|
|   549|  82152|   7.58963|      Beastly (2011)|Drama|Fantasy|Rom...|  null|
+------+-------+----------+--------------------+--------------------+------+
only showing top 5 rows

