In [1]:
dbfs_dir = 's3://projectnetflix/'
training = dbfs_dir + 'TrainingRatings.txt'
testing = dbfs_dir + 'TestingRatings.txt'
movies = dbfs_dir + 'movie_titles.txt'

In [2]:
from pyspark.sql.types import *

df_schema = StructType(
  [StructField('movieID', IntegerType()),
   StructField('userID', IntegerType()),
   StructField('rating', DoubleType())]
)

movie_schema = StructType(
  [StructField('movieID', IntegerType()),
   StructField('releaseYear', IntegerType()),
   StructField('title', StringType())]
)

In [3]:
training_data = sqlContext.read.format('csv').options(header=False, inferSchema=False).schema(df_schema).load(training)
testing_data = sqlContext.read.format('csv').options(header=False, inferSchema=False).schema(df_schema).load(testing)
movie_data = sqlContext.read.format('csv').options(header=False, inferSchema=False).schema(movie_schema).load(movies)

training_data.cache()
testing_data.cache()
movie_data.cache()

DataFrame[movieID: int, releaseYear: int, title: string]

<h3> ALS model </h3>
<p> For prediction </p>

In [4]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row

In [5]:
#Join test and train dataframe with movies 

joined_train = training_data.join(movie_data,on=['movieID'],how='inner')
joined_test = testing_data.join(movie_data,on=['movieID'],how='inner')

joined_train.cache()
joined_test.cache()

DataFrame[movieID: int, userID: int, rating: double, releaseYear: int, title: string]

In [6]:
seed = 124

def testALS(maxIter, regParam, min_error = float('inf')):
    print('For maxIter = %i and regParam = %.2f'%(maxIter, regParam))
    als = ALS(maxIter=maxIter, regParam=regParam, 
              userCol="userID", itemCol="movieID", ratingCol="rating",
              coldStartStrategy="drop")

    mae_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="mae")
    rmse_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

    tolerance = 0.03
    ranks = [4, 8, 12]
    all_mae = [0, 0, 0]
    all_rmse = [0, 0, 0]
    models = [0, 0, 0]
    i = 0
    best_rank = -1

    for rank in ranks:
        # Set the rank here:
        als.setRank(rank)

        # Create the model with these parameters.
        model = als.fit(joined_train)

        # Run the model to create a prediction. Predict against the validation_df.
        predict_df = model.transform(joined_test)

        # Remove NaN values from prediction (due to SPARK-14489)
        #predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

        # Run the previously created MAE and RMSE evaluators, on the predicted_ratings_df DataFrame
        mae = mae_eval.evaluate(predict_df)
        all_mae[i] = mae

        rmse = rmse_eval.evaluate(predict_df)
        all_rmse[i] = rmse

        models[i] = model
        print('For rank %i the MAE is %.4f' % (rank, mae))
        print('For rank %i the RMSE is %.4f' % (rank, rmse))
        if rmse < min_error:
            min_error = rmse
            best_rank = i
        i += 1

    als.setRank(ranks[best_rank])
    print('The best model was trained with rank %i\n\n' % ranks[best_rank])
    my_model = models[best_rank]
    return min_error, my_model

In [7]:
%%time 
#Testing multiple parameter values to find best model

error, best_model = testALS(5, 0.1)
error, best_model = testALS(5, 0.08, error)
error, best_model = testALS(10, 0.01, error)

For maxIter = 5 and regParam = 0.10


                                                                                

For rank 4 the MAE is 0.6914
For rank 4 the RMSE is 0.8708


                                                                                

For rank 8 the MAE is 0.6877
For rank 8 the RMSE is 0.8643


                                                                                

For rank 12 the MAE is 0.6908
For rank 12 the RMSE is 0.8666
The best model was trained with rank 8


For maxIter = 5 and regParam = 0.08


                                                                                

For rank 4 the MAE is 0.6920
For rank 4 the RMSE is 0.8708


                                                                                

For rank 8 the MAE is 0.6848
For rank 8 the RMSE is 0.8604


                                                                                

For rank 12 the MAE is 0.6881
For rank 12 the RMSE is 0.8627
The best model was trained with rank 8


For maxIter = 10 and regParam = 0.01


                                                                                

For rank 4 the MAE is 0.6783
For rank 4 the RMSE is 0.8595


                                                                                

For rank 8 the MAE is 0.6620
For rank 8 the RMSE is 0.8436




For rank 12 the MAE is 0.6621
For rank 12 the RMSE is 0.8480
The best model was trained with rank 8


CPU times: user 591 ms, sys: 105 ms, total: 696 ms
Wall time: 2min 24s


                                                                                

<h3> New user recommendation </h3>

In [8]:
from pyspark.sql.functions import countDistinct

#Checking to see which userID can be used
training_data.select('userID').distinct().orderBy('userID', ascending=True).limit(5).show()
testing_data.select('userID').distinct().orderBy('userID', ascending=True).limit(5).show()

                                                                                

+------+
|userID|
+------+
|     7|
|    79|
|   199|
|   481|
|   769|
+------+

+------+
|userID|
+------+
|     7|
|    79|
|   199|
|   481|
|   769|
+------+



Since userIDs start form 7 in both testing and training data, I can use any number before as my userID. <br>
I choose to select id as 4. 

In [9]:
my_user_id = 4

my_rated_movies = [
    (my_user_id, 10676, 4),
    (my_user_id, 14810, 4),
    (my_user_id, 16162, 2),
    (my_user_id, 11340, 5),
    (my_user_id, 4556, 2),
    (my_user_id, 6250, 4),
    (my_user_id, 13334, 1),
    (my_user_id, 11312, 1),
    (my_user_id, 15731, 5),
    (my_user_id, 10109, 4)]

my_rated_movies = sqlContext.createDataFrame(my_rated_movies, ['userID','movieID','rating'])
my_rated_movies = my_rated_movies.join(movie_data,on=['movieID'],how='inner')

In [10]:
print('My rated movies are:')
my_rated_movies.show(truncate=False)

My rated movies are:
+-------+------+------+-----------+-----------------------------------+
|movieID|userID|rating|releaseYear|title                              |
+-------+------+------+-----------+-----------------------------------+
|10676  |4     |4     |1933       |The Kennel Murder Case / Nancy Drew|
|14810  |4     |4     |2000       |Dolphins: IMAX                     |
|16162  |4     |2     |2002       |Kim Possible: The Secret Files     |
|11340  |4     |5     |1988       |Johnny Be Good                     |
|4556   |4     |2     |2001       |Stealing Time                      |
|6250   |4     |4     |1997       |Female Perversions                 |
|13334  |4     |1     |2000       |Catfish in Black Bean Sauce        |
|11312  |4     |1     |1998       |Mystery Kids                       |
|15731  |4     |5     |2002       |Roxy Music: Live at the Apollo     |
|10109  |4     |4     |1994       |Major League II                    |
+-------+------+------+-----------+--------

In [11]:
joined_train = joined_train.union(my_rated_movies)

als = ALS(maxIter=10, regParam=0.01, 
              userCol="userID", itemCol="movieID", ratingCol="rating",
              coldStartStrategy="drop")

model = als.fit(joined_train)

                                                                                

In [12]:
recs = list(model.recommendForUserSubset(my_rated_movies.select('userID').distinct(), 10).select('recommendations')\
            .toPandas()['recommendations'])[0]

                                                                                

In [13]:
print('Recommended movies for my user are:\n')
for (title, rating) in recs:
    print(movie_data.filter(movie_data.movieID == title).collect()[0][2])

Recommended movies for my user are:

Linkin Park
Maxim: The Real Swimsuit DVD: Vol. 1
Vietnam: We Were Heroes 1st Cavalry Division Airmobile
Kiss: Unauthorized Kiss
The Deviants
Learning HTML: No Brainers
The Cars: Live
Raging Bull: Collector's Edition: Bonus Material
Secrets of War: Nazi Warfare
Dance for Camera


In [14]:
#Manually removing all cached dataframes
training_data.unpersist()
testing_data.unpersist()
movie_data.unpersist()

joined_train.unpersist()
joined_test.unpersist()

DataFrame[movieID: int, userID: int, rating: double, releaseYear: int, title: string]