# Movie Recommendation by Collaborative Filtering using the Netflix Data

# Predicting Movie Ratings

One of the most common uses of big data is to predict what users want; Netflix, for example, uses it to recommend movies that you might like. This lab demonstrates how we can use Apache Spark to recommend movies to a user. We will start with some basic techniques, and then use the Spark ML library's Alternating Least Squares (ALS) method to make more sophisticated predictions.

In this project, we will use MLlib to make personalized movie recommendations. We will work with about 3.2 million ratings of 1,821 movies by 28,978 users.

You will need to spawn a Spark cluster and a Jupyter Notebook server using the instructions provided in Sparkify 8, and upload a copy of this notebook to that Spark Notebook server. You will also have to upload the dataset (the .txt files) to S3. The following cell defines the locations of the data files; adjust the paths below to match your own bucket.


In [1]:
# Change to the location of your data files (dbfs_dir already ends with a slash)
dbfs_dir = 's3://aguonyinyechidsci6007netflix/Netflix/Netflix/'
movie_titles_filename = dbfs_dir + 'movie_titles.txt'
TrainingRatings_filename = dbfs_dir + 'TrainingRatings.txt'
TestingRatings_filename = dbfs_dir + 'TestingRatings.txt'


We're going to speed things up further by specifying the DataFrame schema explicitly. (When the Spark CSV reader infers the schema from a .txt file, it has to make an extra pass over the file. That would slow things down here, and it isn't necessary.)

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType

# movie_titles.txt: movie ID, year of release, title
movie_titles_df_schema = StructType(
  [StructField('ID', IntegerType()),
   StructField('movie_year', IntegerType()),
   StructField('movie_title', StringType())]
)
# TrainingRatings.txt and TestingRatings.txt share one layout: movie ID, user ID, rating
TrainingRatings_df_schema = StructType(
  [StructField('movie_Id', IntegerType()),
   StructField('user_Id', IntegerType()),
   StructField('movie_ratings', FloatType())]
)
TestingRatings_df_schema = StructType(
  [StructField('movie_Id', IntegerType()),
   StructField('user_Id', IntegerType()),
   StructField('movie_ratings', FloatType())]
)
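The schemas above describe the layout of the raw text files: title lines look like `1,2003,Dinosaur Planet` and rating lines like `8,1744889,1.0`. The plain-Python sketch below (the helpers `parse_title_line` and `parse_rating_line` are hypothetical, not part of Spark) shows the same typed parsing for a single line. It splits a title line at most twice, so a title that itself contains a comma stays whole; in its default PERMISSIVE mode, Spark's CSV reader drops tokens beyond the schema length, so such a title would likely be cut at its first comma.

```python
def parse_title_line(line):
    """Parse 'ID,year,title' into (int, int or None, str), keeping commas inside the title."""
    # maxsplit=2: everything after the second comma belongs to the title
    movie_id, year, title = line.rstrip("\n").split(",", 2)
    # A year that is not an integer becomes None (the equivalent of a null field)
    return int(movie_id), (int(year) if year.strip().isdigit() else None), title


def parse_rating_line(line):
    """Parse 'movie_Id,user_Id,rating' into (int, int, float), matching the ratings schema."""
    movie_id, user_id, rating = line.strip().split(",")
    return int(movie_id), int(user_id), float(rating)
```

For example, `parse_rating_line("8,1744889,1.0")` gives `(8, 1744889, 1.0)`, the first row of the training output shown below.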


# Load and Cache
By now, your datasets should be hosted on S3. We're going to access this data a lot, so rather than reading it from S3 over and over again, we'll cache the movie titles DataFrame and both ratings DataFrames in memory.

In [3]:
# Read each file with its explicit schema (no schema-inference pass)
raw_movie_titles_df = sqlContext.read.format('csv').options(inferSchema=False).schema(movie_titles_df_schema).load(movie_titles_filename)
movie_titles_df = raw_movie_titles_df

raw_TrainingRatings_df = sqlContext.read.format('csv').options(inferSchema=False).schema(TrainingRatings_df_schema).load(TrainingRatings_filename)
TrainingRatings_df = raw_TrainingRatings_df

raw_TestingRatings_df = sqlContext.read.format('csv').options(inferSchema=False).schema(TestingRatings_df_schema).load(TestingRatings_filename)
TestingRatings_df = raw_TestingRatings_df

# Mark the DataFrames for caching; they are materialized by the first action (count) below
movie_titles_df.cache()
TrainingRatings_df.cache()
TestingRatings_df.cache()

assert movie_titles_df.is_cached
assert TrainingRatings_df.is_cached
assert TestingRatings_df.is_cached

raw_movie_titles_count = raw_movie_titles_df.count()
movie_titles_count = movie_titles_df.count()

raw_TrainingRatings_count = raw_TrainingRatings_df.count()
TrainingRatings_count = TrainingRatings_df.count()

raw_TestingRatings_count = raw_TestingRatings_df.count()
TestingRatings_count = TestingRatings_df.count()


print('There are %s movie titles, %s training ratings and %s testing ratings in the datasets' % (movie_titles_count, TrainingRatings_count, TestingRatings_count))
print('Movie_titles:')
movie_titles_df.show(3)
print('TrainingRatings:')
TrainingRatings_df.show(3, truncate=False)
print('TestingRatings:')
TestingRatings_df.show(3, truncate=False)

assert raw_movie_titles_count == movie_titles_count
assert raw_TrainingRatings_count == TrainingRatings_count
assert raw_TestingRatings_count == TestingRatings_count


There are 17770 movie titles, 3255352 training ratings and 100478 testing ratings in the datasets
Movie_titles:
+---+----------+--------------------+
| ID|movie_year|         movie_title|
+---+----------+--------------------+
|  1|      2003|     Dinosaur Planet|
|  2|      2004|Isle of Man TT 20...|
|  3|      1997|           Character|
+---+----------+--------------------+
only showing top 3 rows

TrainingRatings:
+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|8       |1744889|1.0          |
|8       |1395430|2.0          |
|8       |1205593|4.0          |
+--------+-------+-------------+
only showing top 3 rows

TestingRatings:
+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|8       |573364 |1.0          |
|8       |2149668|3.0          |
|8       |1089184|3.0          |
+--------+-------+-------------+
only showing top 3 rows



To get an idea of which similarity (user-based or item-based) can be estimated more accurately, pick a user from the test set, extract the items this user has rated in the training set, and compute how many other users in the training set have rated the same items. Let's call this quantity the overlap of an item for a given user. To get a comparison statistic, average it over all the items of that user, and repeat the computation for multiple users from the test set to estimate the average overlap of items for users. Then do the same for items instead of users to estimate the average overlap of users for items. Which number do you expect to be higher?

In [4]:
from pyspark.sql import functions as F

# Movies that test user 2149668 has rated in the training set
user_ratings_df = TrainingRatings_df.filter("user_Id == 2149668")
user_ratings_df.show()

user_movie_ids = [int(row.movie_Id) for row in user_ratings_df.select('movie_Id').collect()]

# All training ratings of those movies, i.e. the users who overlap with this user
users = TrainingRatings_df[TrainingRatings_df['movie_Id'].isin(user_movie_ids)]
users.show(10)

# Per overlapping user: how many of these movies they rated and their average rating
itemsusers = users.groupBy('user_Id').agg(F.count(users.movie_ratings).alias("count"), F.avg(users.movie_ratings).alias("average"))
print('overlapping_users_with_counts_df:')
itemsusers.show(3, truncate=False)


+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|     992|2149668|          3.0|
|    1202|2149668|          3.0|
|    1289|2149668|          1.0|
|    1305|2149668|          3.0|
|    2015|2149668|          5.0|
|    2212|2149668|          3.0|
|    2342|2149668|          4.0|
|    2601|2149668|          3.0|
|    2675|2149668|          3.0|
|    2755|2149668|          3.0|
|    2913|2149668|          5.0|
|    2955|2149668|          5.0|
|    3151|2149668|          4.0|
|    3253|2149668|          4.0|
|    3274|2149668|          2.0|
|    3290|2149668|          5.0|
|    3355|2149668|          5.0|
|    3538|2149668|          4.0|
|    4847|2149668|          3.0|
|    4849|2149668|          3.0|
+--------+-------+-------------+
only showing top 20 rows

+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|     992| 306466|          3.0|
|     992| 765331|          4.0|
|     992|  41412

In [5]:
from pyspark.sql import functions as F

# Users who rated movie 8 in the training set
movie_ratings_df = TrainingRatings_df.filter("movie_Id == 8")
movie_ratings_df.show()

movie_user_ids = [int(row.user_Id) for row in movie_ratings_df.select('user_Id').collect()]

# All training ratings by those users, i.e. the movies that overlap with movie 8
movies = TrainingRatings_df[TrainingRatings_df['user_Id'].isin(movie_user_ids)]
movies.show(10)

# Per overlapping movie: how many of these users rated it and its average rating
itemsusers = movies.groupBy('movie_Id').agg(F.count(movies.movie_ratings).alias("count"), F.avg(movies.movie_ratings).alias("average"))
print('overlapping_movies_with_counts_df:')
itemsusers.show(3, truncate=False)

+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|       8|1744889|          1.0|
|       8|1395430|          2.0|
|       8|1205593|          4.0|
|       8|1488844|          4.0|
|       8|1447354|          1.0|
|       8| 306466|          4.0|
|       8|1331154|          4.0|
|       8|1818178|          3.0|
|       8| 991725|          4.0|
|       8|1987434|          4.0|
|       8|1765381|          4.0|
|       8| 433803|          3.0|
|       8|1148143|          2.0|
|       8|1174811|          5.0|
|       8|1684516|          3.0|
|       8| 754781|          4.0|
|       8| 567025|          4.0|
|       8|1623132|          4.0|
|       8|1567095|          3.0|
|       8|1666394|          5.0|
+--------+-------+-------------+
only showing top 20 rows

+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|       8|1744889|          1.0|
|       8|1395430|          2.0|
|       8|1205593
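The two cells above list the overlapping ratings but stop short of the averages the exercise asks for. A minimal plain-Python sketch of those averages, assuming the training ratings are available as `(movie_Id, user_Id, rating)` triples (for example, collected from `TrainingRatings_df`); the helper names are hypothetical. The overlap of an item for a given user counts the *other* users who rated it, hence the `- 1` that excludes the user themselves, and symmetrically for items.

```python
from collections import defaultdict
from statistics import mean


def build_indexes(ratings):
    """Index (movie_id, user_id, rating) triples by user and by movie."""
    movies_by_user = defaultdict(set)
    users_by_movie = defaultdict(set)
    for movie_id, user_id, _rating in ratings:
        movies_by_user[user_id].add(movie_id)
        users_by_movie[movie_id].add(user_id)
    return movies_by_user, users_by_movie


def item_overlap_for_user(user_id, movies_by_user, users_by_movie):
    """Average, over the movies user_id rated, of how many other users rated each movie."""
    movies = movies_by_user.get(user_id)
    if not movies:
        return 0.0
    return mean(len(users_by_movie[m]) - 1 for m in movies)


def user_overlap_for_item(movie_id, movies_by_user, users_by_movie):
    """Average, over the users who rated movie_id, of how many other movies each one rated."""
    users = users_by_movie.get(movie_id)
    if not users:
        return 0.0
    return mean(len(movies_by_user[u]) - 1 for u in users)
```

To get the comparison statistics, average `item_overlap_for_user` over a sample of test users and `user_overlap_for_item` over a sample of test movies, e.g. `mean(item_overlap_for_user(u, mbu, ubm) for u in sample_users)`.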

In [8]:

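# Collect the training ratings to the driver as a pandas DataFrame (about 3.2 million rows)
trainingratings = TrainingRatings_df.toPandas()

#testingratings = TestingRatings_df.toPandas()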

In [12]:
#number of rated movies per user:
number_rated_movies = trainingratings.groupby("user_Id")["movie_ratings"].count().sort_values(ascending = False)
number_rated_movies.head()

user_Id
305344     1757
387418     1744
2439493    1640
1664010    1535
2118461    1481
Name: movie_ratings, dtype: int64

In [13]:
# Number of ratings per movie (most-rated movies first):
rated_movies = trainingratings.groupby("movie_Id")["movie_ratings"].count().sort_values(ascending = False)
rated_movies.head()

movie_Id
6971    25468
6287    24393
4640    23525
9728    23184
8596    23005
Name: movie_ratings, dtype: int64

In [14]:
# One row per user, one column per movie; cells a user has not rated are NaN
userItemRating = trainingratings.pivot_table(index='user_Id', columns='movie_Id', values='movie_ratings')
userItemRating.head()


movie_Id      8     28     43     48     61     64     66     92     96    111  ...  17654  17660  17689  17693  17706  17725  17728  17734  17741  17742
user_Id
7           5.0    4.0    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN  ...    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN
79          NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN  ...    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN
199         NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    4.0  ...    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN
481         NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    5.0  ...    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN
769         NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN  ...    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN    NaN


In [15]:
userItemRating.fillna(0, inplace = True)
userItemRating.head()

#userMovieRating = userItemRating.astype(np.int32)
#userMovieRating.head()

movie_Id      8     28     43     48     61     64     66     92     96    111  ...  17654  17660  17689  17693  17706  17725  17728  17734  17741  17742
user_Id
7           5.0    4.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
79          0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
199         0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    4.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
481         0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    5.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
769         0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0  ...    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0    0.0
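The pivot table above is mostly empty, because each user has rated only a small fraction of the movies. A short plain-Python sketch (the helper names are hypothetical) measures that sparsity for `(movie_Id, user_Id, rating)` triples and estimates the memory of the dense float64 table. Assuming the training set covers the 28,978 users and 1,821 movies mentioned in the introduction, the dense table has 52,768,938 cells (about 422 MB at 8 bytes per value), while only about 3.26 million of them hold ratings, a density of roughly 6.2%. Note also that 0 is not a valid rating on the 1 to 5 scale used here, so after `fillna(0)` a zero means "not rated", not "disliked".

```python
def rating_density(ratings):
    """Fraction of (user, movie) cells in the pivot table that hold an actual rating."""
    users = {u for _, u, _ in ratings}
    movies = {m for m, _, _ in ratings}
    observed = {(u, m) for m, u, _ in ratings}  # dedupe repeated (user, movie) pairs
    return len(observed) / (len(users) * len(movies))


def dense_matrix_bytes(n_users, n_movies, bytes_per_value=8):
    """Memory needed to store every cell of a dense users x movies table (float64 by default)."""
    return n_users * n_movies * bytes_per_value
```

For example, `dense_matrix_bytes(28978, 1821)` returns 422,151,504 bytes.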


In [24]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [25]:
# Attach movie titles to each rating (a rating's movie_Id matches a title's ID)
Train = TrainingRatings_df.join(movie_titles_df, TrainingRatings_df.movie_Id == movie_titles_df.ID)

test = TestingRatings_df.join(movie_titles_df, TestingRatings_df.movie_Id == movie_titles_df.ID)

# Splitting the Dataset

In [26]:
# randomSplit normalizes the weights, so [7.5, 2.5] gives roughly a 75% / 25% split
(split_75_df, split_25_df) = TrainingRatings_df.randomSplit([7.5, 2.5])

# Let's cache these datasets for performance
train_df = split_75_df.cache()
validation_df = split_25_df.cache()

print('Training: {0}, validation: {1}\n'.format(
  train_df.count(), validation_df.count())
)
train_df.show(3)
validation_df.show(3)

Training: 2442110, validation: 813242

+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|       8|   1333|          3.0|
|       8|   3321|          1.0|
|       8|   3363|          2.0|
+--------+-------+-------------+
only showing top 3 rows

+--------+-------+-------------+
|movie_Id|user_Id|movie_ratings|
+--------+-------+-------------+
|       8|      7|          5.0|
|       8|   5980|          3.0|
|       8|  13197|          3.0|
+--------+-------+-------------+
only showing top 3 rows
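Because each row is assigned to a split at random, the resulting sizes are only approximately proportional to the normalized weights: above, 2,442,110 of the 3,255,352 training ratings (about 75.02%) landed in the training split. The sketch below is a simplified plain-Python illustration of weighted random splitting, not Spark's actual implementation; the function name and its `seed` handling are assumptions. Spark's `randomSplit` also accepts a `seed` argument, which makes the split reproducible across runs.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to one split with probability proportional to its weight."""
    total = sum(weights)
    # Cumulative upper bounds of each split on [0, 1), e.g. [0.75, 1.0] for [7.5, 2.5]
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)

    rng = random.Random(seed)
    splits = [[] for _ in weights]
    for row in rows:
        x = rng.random()
        for k, upper in enumerate(bounds):
            if x < upper:
                splits[k].append(row)
                break
        else:
            splits[-1].append(row)  # guard against floating-point round-off in the last bound
    return splits
```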



# ALS Approach

In [28]:
als = ALS(maxIter=10, regParam=0.5, userCol="user_Id",
          itemCol="movie_Id", ratingCol="movie_ratings", coldStartStrategy="drop")

model = als.fit(train_df)

# Generate predictions for the validation split; coldStartStrategy="drop" removes rows
# whose user or movie was not seen during training (their prediction would be NaN)
prediction = model.transform(validation_df)

evaluate1 = RegressionEvaluator(metricName="mse", labelCol="movie_ratings",
                                predictionCol="prediction")
evaluate2 = RegressionEvaluator(metricName="rmse", labelCol="movie_ratings",
                                predictionCol="prediction")

mse = evaluate1.evaluate(prediction)
rmse = evaluate2.evaluate(prediction)

print('Mean squared error = {0}'.format(mse))
print('Root-mean-square error = {0}'.format(rmse))

Mean squared error = 1.068186779786811
Root-mean-square error = 1.0335312185835566
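ALS factors the user-by-movie rating matrix into a user factor vector and a movie factor vector (Spark's default rank is 10) and alternates: with the movie factors fixed, each user's factor is an ordinary regularized least-squares solve, and vice versa. The plain-Python sketch below uses rank 1 so that each solve has a closed form and needs only the standard library; it is an illustration of the technique, not Spark's implementation, and the function names are hypothetical. Following how the Spark MLlib documentation describes its regularization, the sketch multiplies the regularization term by the number of ratings of each user or movie.

```python
def als_rank1(ratings, reg=0.01, iters=20):
    """Rank-1 ALS on (movie_id, user_id, rating) triples.

    Each half-step is an exact least-squares solve with the other side held fixed:
      x_u = sum_i r_ui * y_i / (sum_i y_i^2 + reg * n_u)
      y_i = sum_u r_ui * x_u / (sum_u x_u^2 + reg * n_i)
    """
    by_user, by_movie = {}, {}
    for movie_id, user_id, rating in ratings:
        by_user.setdefault(user_id, []).append((movie_id, rating))
        by_movie.setdefault(movie_id, []).append((user_id, rating))

    x = {u: 1.0 for u in by_user}   # user factors
    y = {m: 1.0 for m in by_movie}  # movie factors (deterministic start)
    for _ in range(iters):
        # Fix the movie factors and solve for every user factor
        for u, rows in by_user.items():
            num = sum(r * y[m] for m, r in rows)
            den = sum(y[m] ** 2 for m, _ in rows) + reg * len(rows)
            x[u] = num / den
        # Fix the user factors and solve for every movie factor
        for m, rows in by_movie.items():
            num = sum(r * x[u] for u, r in rows)
            den = sum(x[u] ** 2 for u, _ in rows) + reg * len(rows)
            y[m] = num / den
    return x, y


def factor_rmse(ratings, x, y):
    """RMSE of the predictions x_u * y_i on the given (movie_id, user_id, rating) triples."""
    squared = [(x[u] * y[m] - r) ** 2 for m, u, r in ratings]
    return (sum(squared) / len(squared)) ** 0.5
```

A larger `reg` shrinks the factors toward zero, trading training fit for less overfitting, which is the role `regParam=0.5` plays in the Spark model above.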


In [32]:
## Test data
from pyspark.sql import functions as F

predict_df = model.transform(TestingRatings_df)

# Remove NaN predictions (due to SPARK-14489); coldStartStrategy="drop" should already
# have removed them, so this filter is only a safeguard
predicted_test_df = predict_df.filter(~F.isnan(predict_df.prediction))

# Reuse the MSE (evaluate1) and RMSE (evaluate2) evaluators defined above
test_MSE = evaluate1.evaluate(predicted_test_df)
test_RMSE = evaluate2.evaluate(predicted_test_df)

print('The model had an MSE on the test set of {0}'.format(test_MSE))
print('The model had an RMSE on the test set of {0}'.format(test_RMSE))

The model had an MSE on the test set of 1.0718980704883263
The model had an RMSE on the test set of 1.0353251037661195
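`RegressionEvaluator` computes these metrics from the `movie_ratings` and `prediction` columns. A plain-Python sketch of the same formulas (the helper name is hypothetical) makes their relationship explicit: RMSE is the square root of MSE, which gives a quick consistency check on the printed numbers (1.0353251037661195² ≈ 1.0718981). If mean absolute error is wanted as well, `RegressionEvaluator` also accepts `metricName="mae"`.

```python
import math


def regression_metrics(labels, predictions):
    """MSE, RMSE and MAE of predicted ratings against true ratings."""
    errors = [p - y for y, p in zip(labels, predictions)]
    if not errors:
        raise ValueError("need at least one (label, prediction) pair")
    mse = sum(e * e for e in errors) / len(errors)
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),  # RMSE is always the square root of MSE
        "mae": sum(abs(e) for e in errors) / len(errors),
    }
```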


In [33]:
userRecommend = model.recommendForAllUsers(10)
movieRecommend = model.recommendForAllItems(10)
userRecommend.show()
movieRecommend.show()

+-------+--------------------+
|user_Id|     recommendations|
+-------+--------------------+
|    481|[[3033, 4.8184724...|
|   2678|[[3033, 3.8027837...|
|   3595|[[3033, 3.4540896...|
|   6460|[[3033, 3.9813197...|
|   7284|[[3033, 4.2775936...|
|   7576|[[3033, 4.167266]...|
|   9597|[[3033, 3.7623503...|
|  15191|[[3033, 4.0348988...|
|  15846|[[3033, 4.372589]...|
|  20461|[[3033, 3.8701422...|
|  20774|[[3033, 3.494145]...|
|  26258|[[3033, 4.073724]...|
|  27608|[[3033, 4.1057005...|
|  28346|[[3033, 3.7849407...|
|  30941|[[3033, 3.8398561...|
|  30976|[[3033, 4.201834]...|
|  31203|[[3033, 3.7107627...|
|  36822|[[3033, 4.242998]...|
|  40851|[[3033, 3.864068]...|
|  41068|[[3033, 3.2144418...|
+-------+--------------------+
only showing top 20 rows

+--------+--------------------+
|movie_Id|     recommendations|
+--------+--------------------+
|    4190|[[1482568, 3.8478...|
|    3220|[[1482568, 3.7492...|
|   11240|[[1482568, 4.1383...|
|    6110|[[1482568, 4.5736...|
|    8
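An ALS model predicts a rating as the dot product of a user's factor vector and a movie's factor vector, and `recommendForAllUsers(10)` returns, for each user, the 10 movies with the highest predicted ratings as (movie ID, predicted rating) pairs; `recommendForAllItems(10)` does the same from the movie side. A plain-Python sketch of that top-k step for one user, with small hypothetical factor vectors standing in for the model's `userFactors` and `itemFactors`:

```python
import heapq


def recommend_top_k(user_vector, item_factors, k=10):
    """Return the k (movie_id, predicted_rating) pairs with the largest dot products.

    user_vector: list of floats for one user.
    item_factors: dict mapping movie_id -> list of floats of the same length.
    """
    def dot(a, b):
        return sum(p * q for p, q in zip(a, b))

    scores = ((movie_id, dot(user_vector, v)) for movie_id, v in item_factors.items())
    # nlargest avoids sorting every movie when only the top k are needed
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])
```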

In [34]:
my_user_id = 1

# The format of each entry is (my_user_id, movie ID, your rating); movie IDs must be
# values from the ID column of movie_titles_df.
# NOTE: 25468 looks out of range (movie_titles_df holds 17770 titles); check it before use.
my_rated_movies = [
    (my_user_id, 12293, 5),
    (my_user_id, 10947, 5),
    (my_user_id, 2290, 5),
    (my_user_id, 14648, 3),
    (my_user_id, 14185, 4),
    (my_user_id, 11812, 5),
    (my_user_id, 11088, 3),
    (my_user_id, 25468, 5),
    (my_user_id, 5326, 4),
    (my_user_id, 3928, 2),
]

my_ratings_df = sqlContext.createDataFrame(my_rated_movies, ['user_Id', 'movie_Id', 'movie_ratings'])
print('My movie ratings:')
my_ratings_df.limit(10).show()

My movie ratings:

+-------+--------+-------------+
|user_Id|movie_Id|movie_ratings|
+-------+--------+-------------+
|      1|   12293|            5|
|      1|    2290|            5|
|      1|   11812|            5|
|      1|   25468|            5|
|      1|   10947|            5|
|      1|   14648|            3|
|      1|   14185|            4|
|      1|   11088|            3|
|      1|    5326|            4|
|      1|    3928|            2|
+-------+--------+-------------+
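Every movie ID in `my_rated_movies` should exist in `movie_titles_df`; otherwise the new ratings cannot be matched to a title. A plain-Python check (the helper name is hypothetical), assuming for illustration that the valid IDs are 1 to 17770 to match the 17,770 titles loaded earlier. In Spark, the same check could be a left anti join such as `my_ratings_df.join(movie_titles_df, my_ratings_df.movie_Id == movie_titles_df.ID, 'left_anti')`, which keeps only the ratings with no matching title.

```python
def unknown_movie_ids(rated_movies, known_movie_ids):
    """Return the sorted movie IDs from (user_id, movie_id, rating) tuples that are not known."""
    known = set(known_movie_ids)
    return sorted({movie_id for _, movie_id, _ in rated_movies if movie_id not in known})
```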

