In [1]:
import os
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row, functions
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


In [2]:
# initialize SparkSession
sc=SparkContext()
spark=SparkSession(sc)

In [3]:
# Create DataFrames
path = os.getcwd()
df_M = spark.read.format("csv").load(path + "/data/movies.csv")
df_R = spark.read.format("csv").load(path + "/data/ratings.csv")

In [4]:
#rename columns
df2= df_R.withColumnRenamed("_c0", "userId").withColumnRenamed("_c1", "movieId").withColumnRenamed("_c2", "rating").withColumnRenamed("_c3","timestamp")
df1= df_M.withColumnRenamed("_c0", "movieId").withColumnRenamed("_c1", "title").withColumnRenamed("_c2", "genre")

# Join DataFrames
df1=df1.join(df2,on="movieId")

# Create Train/Test split
split = df1.randomSplit([0.8,0.2],seed=1)
train = split[0]
test = split[1]

# casting
train=train.withColumn("userId", train["userId"].cast("int"))\
           .withColumn("movieId", train["movieId"].cast("int"))\
           .withColumn("rating", train["rating"].cast("float"))

test=test.withColumn("userId", test["userId"].cast("int"))\
         .withColumn("movieId", test["movieId"].cast("int"))\
         .withColumn("rating", test["rating"].cast("float"))

# remove columns where movieId, userId, or rating is null
train=train.filter(train.movieId.isNotNull())
train=train.filter(train.userId.isNotNull())
train=train.filter(train.rating.isNotNull())
test=test.filter(test.movieId.isNotNull())
test=test.filter(test.userId.isNotNull())
test=test.filter(test.rating.isNotNull())



In [5]:
%%timeit
# Time the process

#compute biases for movies and users and the average movie rating using the trainset

# first compute overall average rating

#select only column "rating"
#transform to rdd
#each line in the rdd will be of type Row. Transform Row to a dicitonary and then access value of "rating" to convert it to float
ratings = train.select("rating")\
               .rdd\
               .map(lambda x: float(x.asDict()["rating"]))  
               
#sum all ratings in the rdd
sum_ratings = ratings.sum()
#total number of ratings in the rdd
total_ratings = ratings.count()

avg_rating= sum_ratings/total_ratings

#compute user biases and movie biases

#compute number of ratings for each user
train1 = train.groupby("userId")\
              .agg({"rating":"count"})\
              .withColumnRenamed("count(rating)", "count_Rating")

#compute sum of ratings for each user     
train2= train.groupby("userId")\
             .agg({"rating":"sum"})\
             .withColumnRenamed("sum(rating)", "sum_Rating")

#compute average user rating for each user
train2=train2.join(train1, on="userId")
train2=train2.withColumn("avg_User_Rating", train2["sum_Rating"]/train2["count_Rating"])



#compute User biases

rdd = train2.rdd.map(lambda x:funct(x))

def funct(x):
    bias_User=x["avg_User_Rating"]-avg_rating
    r_1=x["userId"]
    return Row(userId=r_1, bias_User=bias_User)
    

#train_rdd.take(20)
user_biases= spark.createDataFrame(rdd)

#user_biases.sort("userId").show(n=20)

#compute movie biases



#compute number of ratings for each movie
train1 = train.groupby("movieId")\
              .agg({"rating":"count"})\
              .withColumnRenamed("count(rating)", "count_Rating")

#compute sum of ratings for each movie     
train2= train.groupby("movieId")\
             .agg({"rating":"sum"})\
             .withColumnRenamed("sum(rating)", "sum_Rating")



#compute average movie rating for each movie
train2=train2.join(train1, on="movieId")
train2=train2.withColumn("avg_Movie_Rating", train2["sum_Rating"]/train2["count_Rating"])






#compute Movie biases

rdd = train2.rdd.map(lambda x:funct(x))

def funct(x):
    bias_Movie=x["avg_Movie_Rating"]-avg_rating
    r_1=x["movieId"]
    return Row(movieId=r_1,bias_Movie=bias_Movie)





movie_biases= spark.createDataFrame(rdd)

#movie_biases.sort("movieId").show(n=20)












3.51 s ± 1.01 s per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
# Now actually compute biases for movies and users and the average movie rating using the trainset

# first compute overall average rating

#select only column "rating"
#transform to rdd
#each line in the rdd will be of type Row. Transform Row to a dicitonary and then access value of "rating" to convert it to float
ratings = train.select("rating")\
               .rdd\
               .map(lambda x: float(x.asDict()["rating"]))  
               
#sum all ratings in the rdd
sum_ratings = ratings.sum()
#total number of ratings in the rdd
total_ratings = ratings.count()

avg_rating= sum_ratings/total_ratings

#compute user biases and movie biases

#compute number of ratings for each user
train1 = train.groupby("userId")\
              .agg({"rating":"count"})\
              .withColumnRenamed("count(rating)", "count_Rating")

#compute sum of ratings for each user     
train2= train.groupby("userId")\
             .agg({"rating":"sum"})\
             .withColumnRenamed("sum(rating)", "sum_Rating")

#compute average user rating for each user
train2=train2.join(train1, on="userId")
train2=train2.withColumn("avg_User_Rating", train2["sum_Rating"]/train2["count_Rating"])



#compute User biases

rdd = train2.rdd.map(lambda x:funct(x))

def funct(x):
    bias_User=x["avg_User_Rating"]-avg_rating
    r_1=x["userId"]
    return Row(userId=r_1, bias_User=bias_User)
    

#train_rdd.take(20)
user_biases= spark.createDataFrame(rdd)

#user_biases.sort("userId").show(n=20)

#compute movie biases



#compute number of ratings for each movie
train1 = train.groupby("movieId")\
              .agg({"rating":"count"})\
              .withColumnRenamed("count(rating)", "count_Rating")

#compute sum of ratings for each movie     
train2= train.groupby("movieId")\
             .agg({"rating":"sum"})\
             .withColumnRenamed("sum(rating)", "sum_Rating")



#compute average movie rating for each movie
train2=train2.join(train1, on="movieId")
train2=train2.withColumn("avg_Movie_Rating", train2["sum_Rating"]/train2["count_Rating"])






#compute Movie biases

rdd = train2.rdd.map(lambda x:funct(x))

def funct(x):
    bias_Movie=x["avg_Movie_Rating"]-avg_rating
    r_1=x["movieId"]
    return Row(movieId=r_1,bias_Movie=bias_Movie)





movie_biases= spark.createDataFrame(rdd)

#movie_biases.sort("movieId").show(n=20)

In [7]:
%%timeit
#predict user ratings for movies and compare these ratings with the ratings included in the testset

#join test set with user_biases and movie_biases
testing= test.join(user_biases, on="userId")\
          .join(movie_biases, on="movieId")

testing=testing.rdd.map(lambda x: predict_funct(x))

def predict_funct(x):
    pred=x["bias_User"]+x["bias_Movie"]+avg_rating
    r_1=x["movieId"]
    r_2=x["userId"]
    r_3=x["title"]
    r_4=x["rating"]
    
    return Row(movieId=r_1, userId=r_2, title=r_3, rating=r_4, prediction=pred)


206 ms ± 23.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [8]:
# Now actually predict user ratings for movies and compare these ratings with the ratings included in the testset

#join test set with user_biases and movie_biases
testing= test.join(user_biases, on="userId")\
          .join(movie_biases, on="movieId")

testing=testing.rdd.map(lambda x: predict_funct(x))

def predict_funct(x):
    pred=x["bias_User"]+x["bias_Movie"]+avg_rating
    r_1=x["movieId"]
    r_2=x["userId"]
    r_3=x["title"]
    r_4=x["rating"]
    
    return Row(movieId=r_1, userId=r_2, title=r_3, rating=r_4, prediction=pred)


In [9]:
# compute the error of our predictions

testing= spark.createDataFrame(testing)

evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",predictionCol="prediction")
rmse = evaluator.evaluate(testing)
print("Root-mean-square error = " + str(rmse))

Root-mean-square error = 0.9057333522741469


In [10]:
# Generate top movie recommendations for users
testing = testing.groupby("userId").agg(functions.max("prediction"),functions.first("title"))
# Show results for 20 users
testing.show()

+------+------------------+--------------------+
|userId|   max(prediction)|        first(title)|
+------+------------------+--------------------+
|    26| 4.002829291433965| Pulp Fiction (1994)|
|    29| 5.019706212895851|    The Alamo (2004)|
|   474| 4.892961603463069|Brief Encounter (...|
|    65| 4.963587808294701|      Memento (2000)|
|   191| 4.383381677216477|Wallace & Gromit:...|
|   418| 4.285603721604014|Harry Potter and ...|
|   541|4.0307919028205355|      Aladdin (1992)|
|   558| 4.634142422747097| Pulp Fiction (1994)|
|   222| 4.623847309145523|Léon: The Profess...|
|   270|3.6058032158699986|Willy Wonka & the...|
|   293| 2.795405137627034|Beverly Hills Cop...|
|   243|  5.01745226818362|      Aladdin (1992)|
|   278| 4.492472586307386|Muppet Movie, The...|
|   367| 4.721353114181202|  Skulls, The (2000)|
|   442| 2.032085914156008|          JFK (1991)|
|    19| 3.423281185248839|In the Line of Fi...|
|    54|3.7667181803228544| Pulp Fiction (1994)|
|   296|4.8730414326