## Load Data

In [1]:
from pyspark.sql.types import *
import pyspark.sql.functions as F

In [2]:
# The code was removed by DSX for sharing.

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|     31|   2.5|
|     1|   1029|   3.0|
|     1|   1061|   3.0|
|     1|   1129|   2.0|
|     1|   1172|   4.0|
+------+-------+------+
only showing top 5 rows



In [3]:
small_ratings_df.schema

StructType(List(StructField(userId,StringType,true),StructField(movieId,StringType,true),StructField(rating,StringType,true)))

In [4]:
small_ratings_df.schema.names

['userId', 'movieId', 'rating']

In [8]:
# convert datatype

from pyspark.sql.types import IntegerType
from pyspark.sql.types import DoubleType

small_ratings_df = small_ratings_df.withColumn("userId", small_ratings_df["userId"].cast(IntegerType()))
small_ratings_df = small_ratings_df.withColumn("movieId", small_ratings_df["movieId"].cast(IntegerType()))
small_ratings_df = small_ratings_df.withColumn("rating", small_ratings_df["rating"].cast(DoubleType()))

print(small_ratings_df.schema)

StructType(List(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true)))


In [12]:
small_movies_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('movies.csv', 'moiverecommendationd136f4f785054be6861fc3dba4e8391e')).drop('genres')

small_movies_df = small_movies_df.withColumn("movieId", small_movies_df["movieId"].cast(IntegerType()))

small_movies_df.show(5)
print(small_movies_df.schema)

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows

StructType(List(StructField(movieId,IntegerType,true),StructField(title,StringType,true)))


## Collaborative Filtering

In [14]:
train, validation, test = small_ratings_df.randomSplit([0.6, 0.2, 0.2])
validation_for_predict = validation.select(validation.columns[0:2])
test_for_predict = test.select(test.columns[0:2])

In [23]:
from pyspark.mllib.recommendation import ALS
import math

# set parameters
seed = 0
iterations = 10
regularization_parameter = 0.1
ranks = [4, 8, 12]
errors = [0, 0, 0]
err = 0
tolerance = 0.02

min_error = float('inf')
best_rank = -1
best_iteration = -1

# find the best rank
for rank in ranks:
    model = ALS.train(train, rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)
    predictions = model.predictAll(validation_for_predict.rdd).map(lambda x: ((x[0], x[1]), x[2]))
    rates_and_preds = validation.rdd.map(lambda x: ((x[0], x[1]), x[2])).join(predictions)
    error = math.sqrt(rates_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())
    errors[err] = error
    err += 1
    print('For rank %s the RMSE is %s'%(rank, error))
    if error < min_error:
        min_error = error
        best_rank = rank
    
print('The best model was trained with rank %s' % best_rank)

For rank 4 the RMSE is 0.9352785533511581
For rank 8 the RMSE is 0.9461427592255386
For rank 12 the RMSE is 0.9464846239958269
The best model was trained with rank 4


### Load Full Data

In [49]:
full_ratings_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('ratings_full.csv', 'moiverecommendationd136f4f785054be6861fc3dba4e8391e'))\
  .drop('timestamp')


full_ratings_df = full_ratings_df.withColumn("userId", full_ratings_df["userId"].cast(IntegerType()))
full_ratings_df = full_ratings_df.withColumn("movieId", full_ratings_df["movieId"].cast(IntegerType()))
full_ratings_df = full_ratings_df.withColumn("rating", full_ratings_df["rating"].cast(DoubleType()))

print(full_ratings_df.schema)
full_ratings_df.show(5)

StructType(List(StructField(userId,IntegerType,true),StructField(movieId,IntegerType,true),StructField(rating,DoubleType,true)))
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    110|   1.0|
|     1|    147|   4.5|
|     1|    858|   5.0|
|     1|   1221|   5.0|
|     1|   1246|   5.0|
+------+-------+------+
only showing top 5 rows



In [91]:
full_movies_df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(cos.url('movies_full.csv', 'moiverecommendationd136f4f785054be6861fc3dba4e8391e')).drop('genres')

full_movies_df = full_movies_df.withColumn("movieId", full_movies_df["movieId"].cast(IntegerType()))
full_movies_df.show(5)
full_movies_df.createOrReplaceTempView("full_movies_df_view")
full_movies_titles = full_movies_df.rdd.map(lambda x: (int(x[0]),x[1]))

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|      Jumanji (1995)|
|      3|Grumpier Old Men ...|
|      4|Waiting to Exhale...|
|      5|Father of the Bri...|
+-------+--------------------+
only showing top 5 rows



In [33]:
full_train_df, full_test_df = full_ratings_df.randomSplit([0.7, 0.3])
full_model = ALS.train(full_train_df, best_rank, seed=seed, iterations=iterations, lambda_=regularization_parameter)

X = full_test_df.rdd.map(lambda x: (x[0], x[1]))
y_hat = full_model.predictAll(X).map(lambda x: ((x[0], x[1]), x[2]))
rates_and_preds = full_test_df.rdd.map(lambda x: ((x[0], x[1]), x[2])).join(y_hat)
error = math.sqrt(rates_and_preds.map(lambda x: (x[1][0] - x[1][1])**2).mean())

print('For testing data the RMSE is %s' %error)

For testing data the RMSE is 0.8326700962589142


In [50]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [80]:
full_ratings_df.cache()
full_ratings_df.createOrReplaceTempView("full_ratings_df_view")

# calulate average ratings for each movie id
d_avgRating_df = spark.sql("SELECT movieID, AVG(rating) AS avg_rating FROM full_ratings_df_view GROUP BY movieID")
# calculate number of ratings for each movie id
id_count_df = spark.sql("SELECT movieID, count(*) AS count FROM full_ratings_df_view GROUP BY movieID")

id_avgRating_df.createOrReplaceTempView("id_avgRating_df_view")
id_count_df.createOrReplaceTempView('id_count_df_view')

## Add New User Ratings and Retrain the Model

In [82]:
new_user_ID = 0

# The format of each line is (userID, movieID, rating)
new_user_ratings = [
     (0,260,4), # Star Wars (1977)
     (0,1,3), # Toy Story (1995)
     (0,16,3), # Casino (1995)
     (0,25,4), # Leaving Las Vegas (1995)
     (0,32,4), # Twelve Monkeys (a.k.a. 12 Monkeys) (1995)
     (0,335,1), # Flintstones, The (1994)
     (0,379,1), # Timecop (1994)
     (0,296,3), # Pulp Fiction (1994)
     (0,858,5) , # Godfather, The (1972)
     (0,50,4) # Usual Suspects, The (1995)
    ]

new_user_ratings_rdd = sc.parallelize(new_user_ratings)
print('New user ratings: %s' % new_user_ratings_rdd.take(10))

New user ratings: [(0, 260, 4), (0, 1, 3), (0, 16, 3), (0, 25, 4), (0, 32, 4), (0, 335, 1), (0, 379, 1), (0, 296, 3), (0, 858, 5), (0, 50, 4)]


In [85]:
# add ratings of the new user to our data
new_ratings_rdd = full_ratings_df.rdd.union(new_user_ratings_rdd)

In [107]:
from time import time

t0 = time()
# retrain our model
new_ratings_model = ALS.train(new_ratings_rdd, rank=8, seed=seed, 
                              iterations=iterations, lambda_=regularization_parameter)
tt = time() - t0

print("New model trained in %s seconds" % round(tt,3))

New model trained in 92.379 seconds


In [112]:
new_user_ratings_ids = map(lambda x: x[1], new_user_ratings) # get just movie IDs

# keep just those not on the ID list (thanks Lei Li for spotting the error!)
new_user_unrated_movies_rdd = (full_movies_df.rdd.filter(lambda x: x[0] not in new_user_ratings_ids).map(lambda x: (new_user_ID, x[0])))

# Use the input RDD, new_user_unrated_movies_RDD, with new_ratings_model.predictAll() to predict new ratings for the movies
new_user_recommendations_rdd = new_ratings_model.predictAll(new_user_unrated_movies_RDD)

# Transform new_user_recommendations_RDD into pairs of the form (Movie ID, Predicted Rating)
new_user_recommendations_rating_rdd = new_user_recommendations_rdd.map(lambda x: (x.product, x.rating))
new_user_recommendations_rating_title_and_count_rdd = new_user_recommendations_rating_rdd.join(full_movies_titles).join(id_count_df.rdd)
new_user_recommendations_rating_title_and_count_rdd.take(3)

[(6656, ((1.0458995363624215, 'Attack of the Puppet People (1958)'), 47)),
 (119808, ((2.9503873804552105, 'Arizona Colt Returns (1970)'), 1)),
 (4160,
  ((2.9357647226700676,
    'Widow of St. Pierre, The (Veuve de Saint-Pierre, La) (2000)'),
   366))]

In [113]:
new_user_recommendations_rating_title_and_count_rdd = new_user_recommendations_rating_title_and_count_rdd.map(lambda r: (r[1][0][1], r[1][0][0], r[1][1]))
    
top_movies = new_user_recommendations_rating_title_and_count_rdd.filter(lambda r: r[2]>=25).takeOrdered(25, key=lambda x: -x[1])

print ('TOP recommended movies (with more than 25 reviews):\n%s' % '\n'.join(map(str, top_movies)))

TOP recommended movies (with more than 25 reviews):
("Long Night's Journey Into Day (2000)", 4.067201180445796, 35)
('Heimat - A Chronicle of Germany (Heimat - Eine deutsche Chronik) (1984)', 4.037841937716027, 32)
('Tale of Tales (Skazka skazok) (1979)', 3.967847257853708, 39)
('Godfather, The (1972)', 3.9641593171048726, 57070)
('Human Condition III, The (Ningen no joken III) (1961)', 3.8950607000338264, 82)
('Godfather: Part II, The (1974)', 3.8945976991324662, 36679)
('Frozen Planet (2011)', 3.873168757489889, 322)
('Alone in the Wilderness (2004)', 3.857246927303845, 326)
('Harakiri (Seppuku) (1962)', 3.84499029188689, 610)
('Madagascar (2011)', 3.838933093129907, 26)
('Walking with Monsters (2005)', 3.8358965744744227, 28)
('Civil War, The (1990)', 3.8341200476173682, 400)
("Sundays and Cybele (Les dimanches de Ville d'Avray) (1962)", 3.8196692796561083, 28)
('Seven Samurai (Shichinin no samurai) (1954)', 3.812708140982135, 13994)
('Planet Earth (2006)', 3.808814825168765, 754)
(