In [0]:
from pyspark.sql.types import *
 
dbfs_dir = '/databricks-datasets/cs110x/ml-20m/data-001'

movies_filename = dbfs_dir + '/movies.csv'
ratings_filename = dbfs_dir + '/ratings.csv'
    
movies_schema = StructType([
  StructField('movieId', IntegerType()),
  StructField('title', StringType()),
  StructField('genres', StringType()),
])
 
ratings_schema = StructType([
  StructField('userId', IntegerType()),
  StructField('movieId', IntegerType()),
  StructField('rating', FloatType()),
])

In [0]:
movies_df = sqlContext \
    .read \
    .format('com.databricks.spark.csv') \
    .options(header=True, inferSchema=False) \
    .schema(movies_schema) \
    .load(movies_filename)

display(movies_df)

ratings_df = sqlContext \
    .read \
    .format('com.databricks.spark.csv') \
    .options(header=True, inferSchema=False) \
    .schema(ratings_schema) \
    .load(ratings_filename)

display(ratings_df)

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance
5,Father of the Bride Part II (1995),Comedy
6,Heat (1995),Action|Crime|Thriller
7,Sabrina (1995),Comedy|Romance
8,Tom and Huck (1995),Adventure|Children
9,Sudden Death (1995),Action
10,GoldenEye (1995),Action|Adventure|Thriller


userId,movieId,rating
1,2,3.5
1,29,3.5
1,32,3.5
1,47,3.5
1,50,3.5
1,112,3.5
1,151,4.0
1,223,4.0
1,253,4.0
1,260,4.0


In [0]:
# Train and run the model on the 20 Million movie ratings dataset

(df_ratings_70, df_ratings_30) = ratings_df.randomSplit([0.7, 0.3], 50)

df_ratings_train = df_ratings_70.cache()
df_ratings_test = df_ratings_30.cache()

print(df_ratings_train.count())
print(df_ratings_test.count())

14001658
5998605


In [0]:
# Test with various values and find the best model based on the error value

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# ALS learner
als = ALS()

# Set the parameters for the method
als.setMaxIter(6) \
   .setRegParam(0.1) \
   .setUserCol('userId') \
   .setItemCol('movieId') \
   .setRatingCol('rating')

# Create an RMSE evaluator using the label and predicted columns
reg_eval = RegressionEvaluator(predictionCol="prediction", labelCol="rating", metricName="rmse")

ranks = [4, 8, 12] 
errors = [0, 0, 0]
models = [0, 0, 0]
err = 0
min_error = float('inf')
best_rank = -1

for rank in ranks:
    # Set the rank
    als.setRank(rank)
  
    # Create the model with these parameters
    model = als.fit(df_ratings_train)
  
    # Run the model to create a prediction. Predict against the df_ratings_test
    predict_df = model.transform(df_ratings_test)

    # Remove NaN values from prediction
    predicted_ratings_df = predict_df.filter(predict_df.prediction != float('nan'))

    # Run the previously created RMSE evaluator, reg_eval, on the predicted_ratings_df
    error = reg_eval.evaluate(predicted_ratings_df)
    errors[err] = error
    models[err] = model
  
    print('For rank {rank} the RMSE is {error}'.format(rank=rank, error=error))
    
    if error < min_error:
        min_error = error
        best_rank = err
    
    err += 1

als.setRank(ranks[best_rank])

print('The best model was trained with rank {rank}'.format(rank=ranks[best_rank]))

model = models[best_rank]

For rank 4 the RMSE is 0.8209835013079961
For rank 8 the RMSE is 0.8119550102738992
For rank 12 the RMSE is 0.811538089664684
The best model was trained with rank 12


In [0]:
# Prepare a table with at least 10 of my own ratings for the movies

my_user_id = 7

my_rated_movies = [
    (my_user_id, 1367, 4), # 101 Dalmatians (1996)
    (my_user_id, 93297, 4), # Act of Violence (2012)
    (my_user_id, 2525, 5), # Alligator (1980)
    (my_user_id, 130, 5), # Angela (1995)
    (my_user_id, 72209, 4), # Astro Boy (2009)
    (my_user_id, 60259, 4), # Bambi 2 (2006)
    (my_user_id, 127579, 5), # Black Venus (2010)
    (my_user_id, 5961, 5), # Blue Steel (1990)
    (my_user_id, 31431, 4), # Boogeyman (2005)
    (my_user_id, 93939, 5) # Cafe de Flore (2011)
]
 
df_my_ratings = sqlContext.createDataFrame(my_rated_movies, ['userId', 'movieId', 'rating'])
display(df_my_ratings)

userId,movieId,rating
7,1367,4
7,93297,4
7,2525,5
7,130,5
7,72209,4
7,60259,4
7,127579,5
7,5961,5
7,31431,4
7,93939,5


In [0]:
# Run the model with my rated movies data
df_ratings_with_mine = df_ratings_train.unionAll(df_my_ratings)

print(df_ratings_train.count())
print(df_ratings_with_mine.count())

als = ALS().setMaxIter(6).setRegParam(0.1) \
    .setUserCol('userId') \
    .setItemCol('movieId') \
    .setRatingCol('rating') \
    .setRank(8)
 
my_model = als.fit(df_ratings_with_mine)

14001658
14001668


In [0]:
from pyspark.sql.functions import lit

# Filter out the movies I already rated
my_rated_movie_ids = [x[1] for x in my_rated_movies]
df_unrated_movies = movies_df.filter(~movies_df['movieId'].isin(my_rated_movie_ids))

# Add a column with my_user_id as "userId"
df_unrated_movies_w_user = df_unrated_movies.withColumn('userId', lit(my_user_id))

In [0]:
from pyspark.sql.functions import desc

# Use my_model to predict ratings for the movies that I did not manually rate
df_my_predicted_ratings = my_model.transform(df_unrated_movies_w_user)

# Eliminate empty predictions
df_my_predicted_ratings = df_my_predicted_ratings.filter(df_my_predicted_ratings.prediction != float('nan'))
display(df_my_predicted_ratings)

# Show the top 20 movies that the model recommends for me to watch
df_my_predicted_ratings.sort(desc('prediction')).show(20)

movieId,title,genres,userId,prediction
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy,7,3.7041857
2,Jumanji (1995),Adventure|Children|Fantasy,7,3.2105508
3,Grumpier Old Men (1995),Comedy|Romance,7,3.1314464
4,Waiting to Exhale (1995),Comedy|Drama|Romance,7,2.6663945
5,Father of the Bride Part II (1995),Comedy,7,2.9999356
6,Heat (1995),Action|Crime|Thriller,7,3.5415046
7,Sabrina (1995),Comedy|Romance,7,3.1951044
8,Tom and Huck (1995),Adventure|Children,7,2.9642055
9,Sudden Death (1995),Action,7,2.9025524
10,GoldenEye (1995),Action|Adventure|Thriller,7,3.4285057


+-------+--------------------+--------------------+------+----------+
|movieId|               title|              genres|userId|prediction|
+-------+--------------------+--------------------+------+----------+
|  43567|Sweet November (1...|               Drama|     7| 5.3704004|
|  77736|Crazy Stone (Feng...|        Comedy|Crime|     7|   5.12091|
| 128812|Drew: The Man Beh...|         Documentary|     7| 5.0526586|
| 114070|Good Job:  Storie...|         Documentary|     7| 4.8574343|
| 117907|My Brother Tom (2...|               Drama|     7| 4.8552675|
|  98275|Octopus, The (Le ...|Comedy|Crime|Thri...|     7| 4.8495646|
| 121029|No Distance Left ...|         Documentary|     7| 4.8233085|
| 107252|Island at War (2004)|           Drama|War|     7|  4.820666|
| 107434|Diplomatic Immuni...|              Comedy|     7|  4.820666|
|  98328|Chronicle of My M...|               Drama|     7|  4.740797|
|  26978| Kiss or Kill (1997)|Crime|Drama|Thriller|     7| 4.6999984|
| 115699|Turning Tid