# Lab 3 — recommender systems

In [127]:
import json
import matplotlib.pyplot as plt
import numpy as np
import plotly.express as px


from random import randint

%matplotlib inline
plt.style.use("ggplot")


What does the data look like?

In [128]:
# List the contents of the /ix/ml-20m directory on HDFS.
!hdfs dfs -ls /ix/ml-20m

Found 4 items
-rw-rw-rw-+  3 mkhorasa supergroup  745096004 2024-04-04 16:54 /ix/ml-20m/genome-scores.txt
-rw-rw-rw-+  3 mkhorasa supergroup      40652 2024-04-04 16:54 /ix/ml-20m/genome-tags.txt
-rw-rw-rw-+  3 mkhorasa supergroup    2538070 2024-04-04 16:54 /ix/ml-20m/movies.txt
-rw-rw-rw-+  3 mkhorasa supergroup 1493457002 2024-04-04 16:54 /ix/ml-20m/ratings.txt


In [129]:
# Preview the first 10 lines of the ratings file (one JSON object per line).
# NOTE(review): the trailing "Unable to write to output stream" message presumably
# comes from `head` closing the pipe after 10 lines — confirm.
!hdfs dfs -cat /ix/ml-20m/ratings.txt | head -n 10

{"movieId": 2, "userId": 1, "timestamp": 1112486027, "rating": 3.5}
{"movieId": 29, "userId": 1, "timestamp": 1112484676, "rating": 3.5}
{"movieId": 32, "userId": 1, "timestamp": 1112484819, "rating": 3.5}
{"movieId": 47, "userId": 1, "timestamp": 1112484727, "rating": 3.5}
{"movieId": 50, "userId": 1, "timestamp": 1112484580, "rating": 3.5}
{"movieId": 112, "userId": 1, "timestamp": 1094785740, "rating": 3.5}
{"movieId": 151, "userId": 1, "timestamp": 1094785734, "rating": 4.0}
{"movieId": 223, "userId": 1, "timestamp": 1112485573, "rating": 4.0}
{"movieId": 253, "userId": 1, "timestamp": 1112484940, "rating": 4.0}
{"movieId": 260, "userId": 1, "timestamp": 1112484826, "rating": 4.0}
cat: Unable to write to output stream.


Exercise 3.5: Basic statistics

In [130]:
# Load the ratings file from HDFS as an RDD of text lines (each line is one JSON string).
ratings_rdd = sc.textFile("/ix/ml-20m/ratings.txt")

In [131]:
# Peek at the first 10 elements: at this point each one is still an unparsed JSON string.
ratings_rdd.take(10)

['{"movieId": 2, "userId": 1, "timestamp": 1112486027, "rating": 3.5}',
 '{"movieId": 29, "userId": 1, "timestamp": 1112484676, "rating": 3.5}',
 '{"movieId": 32, "userId": 1, "timestamp": 1112484819, "rating": 3.5}',
 '{"movieId": 47, "userId": 1, "timestamp": 1112484727, "rating": 3.5}',
 '{"movieId": 50, "userId": 1, "timestamp": 1112484580, "rating": 3.5}',
 '{"movieId": 112, "userId": 1, "timestamp": 1094785740, "rating": 3.5}',
 '{"movieId": 151, "userId": 1, "timestamp": 1094785734, "rating": 4.0}',
 '{"movieId": 223, "userId": 1, "timestamp": 1112485573, "rating": 4.0}',
 '{"movieId": 253, "userId": 1, "timestamp": 1112484940, "rating": 4.0}',
 '{"movieId": 260, "userId": 1, "timestamp": 1112484826, "rating": 4.0}']

In [132]:
# For the visualisations in this section we aggregate with Spark DataFrames
# and then plot the (small) aggregated results with plotly.
#
# The JSON lines are parsed directly by the DataFrame reader. This avoids the
# deprecated "RDD of dict" schema inference of `createDataFrame`, and the explicit
# schema avoids an extra pass over the data just to infer the column types.
ratings_dataframe = spark.read.json(
    ratings_rdd,
    schema="movieId LONG, rating DOUBLE, timestamp LONG, userId LONG",
)




Using RDD of dict to inferSchema is deprecated. Use pyspark.sql.Row instead



In [133]:
def visualization(spark_df, length_interval, nb_element, name_element):
    """
    Show a bar chart of the number of ratings for a random window of elements.

    The rows of `spark_df` are sorted by `name_element`, a window of
    `length_interval` consecutive rows is taken at a random position, and
    `nb_rating` is plotted against `name_element` for that window.

    Parameters
    ----------
    spark_df : Spark DataFrame with the columns `name_element` and "nb_rating".
    length_interval : number of consecutive rows shown in the chart.
    nb_element : total number of rows of `spark_df`.
    name_element : name of the id column used for the x-axis
        ("userId" or "movieId" in this notebook).
    """
    # Clamp to 0: when the window is longer than the data, everything is shown
    # instead of letting randint fail on an empty range.
    last_start = max(0, nb_element - length_interval - 1)
    start = randint(0, last_start)

    # The whole (already aggregated) frame is collected, then sliced by position.
    data_to_plot = (
        spark_df.orderBy(name_element, ascending=True)
        .toPandas()
        .iloc[start:start + length_interval]
    )

    fig = px.bar(data_to_plot, x=name_element, y="nb_rating")
    fig.update_layout(
        yaxis_title="Number of rating",
        title=f"Number of rating for each {name_element}",
    )

    fig.show(renderer='notebook')

In [134]:
# Number of ratings per userId, stored in the column "nb_rating".
user_rating_data = (
    ratings_dataframe
    .groupBy("userId")
    .agg({"rating": "count"})
    .withColumnRenamed("count(rating)", "nb_rating")
)
user_rating_data.show(5)




+------+---------+
|userId|nb_rating|
+------+---------+
|    26|       61|
|    29|      177|
|   474|      141|
|   964|      227|
|  1677|      154|
+------+---------+
only showing top 5 rows



In [135]:
# user_rating_data has one row per userId (result of the groupBy),
# so its row count is the number of unique users.
nb_users = user_rating_data.count()
print(nb_users)

138493


In [None]:
# Number of ratings for a random window of 300 consecutive rows, sorted by userId.
visualization(user_rating_data, 300, nb_users, "userId")

In [None]:
# Number of ratings per movieId, stored in the column "nb_rating"
# (same aggregation as for the users, keyed by movieId instead).
movie_rating_data = (
    ratings_dataframe.groupBy("movieId")
    .agg({"rating": "count"})
    .withColumnRenamed("count(rating)", "nb_rating")
)


In [None]:
# movie_rating_data has one row per movieId (result of the groupBy),
# so its row count is the number of movies that received at least one rating.
nb_movies = movie_rating_data.count()
print(nb_movies)

In [None]:
# Number of ratings for a random window of 300 consecutive rows, sorted by movieId.
visualization(movie_rating_data, 300, nb_movies, "movieId")

There is a clear imbalance between the two visualisations:

- In the userId view, almost every id of the window has a bar: nearly all userIds correspond to a user who has rated movies.

- The movieId view shows a lot of ‘holes’, which suggests that many movieIds have no rating at all.

(Later on in this lab, we see that 34 userIds have no rating in the training set, whereas this is the case for 105 403 movieIds.)

Exercise 3.6: Partitioning the dataset


In [None]:
!hdfs dfs -copyFromLocal ./my-ratings.txt # Add the file to my home directory on hdfs

In [None]:
# Print the content of my-ratings.txt as stored on HDFS.
!hdfs dfs -cat my-ratings.txt # Below are my ratings

In [None]:
# Load the personal ratings as an RDD of text lines.
# NOTE(review): presumably the same JSON-lines format as ratings.txt — json.loads is applied to it later; confirm.
my_ratings_rdd = sc.textFile("my-ratings.txt")

In [None]:
# My ratings are appended to the ratings of all the other users.
combined_rdd = ratings_rdd.union(my_ratings_rdd)

In the section below, the data is split into a training set of about 16 million lines and a validation set of about 4 million lines, i.e. a 4/5 – 1/5 split, according to the last digit of the timestamp (0–7 for training, 8–9 for validation). This is a sound way to split: the last digit of a timestamp has no reason to be related to the user, the movie or the rating, so the split introduces no obvious bias while remaining deterministic.

For example, if we had taken the first 4/5 of the lines instead, there could have been a time bias, perhaps because a style of film is more popular during a given period, and therefore in a given timestamp range.

In [None]:
# Training set: records whose timestamp ends with a digit between 0 and 7.
# The parsed RDD is cached because many later cells run an action on it;
# without the cache each of them would re-read and re-parse the JSON lines.
training_set = (
    combined_rdd
    .map(json.loads)
    .filter(lambda row: str(row["timestamp"])[-1] in "01234567")
    .cache()
)

In [None]:
# Validation set: records whose timestamp ends with the digit 8 or 9.
validation_set = (
    combined_rdd
    .map(json.loads)
    .filter(lambda row: str(row["timestamp"])[-1] in ("8", "9"))
)

Exercise 3.7: Baseline model

N_movies and N_users are respectively the number of unique films and unique users in the training set. They will be useful later in the code.

In [None]:
# Number of distinct movieIds that appear in the training set.
N_movies = training_set.map(lambda row: row["movieId"]).distinct().count()
print(f"""The number of unique films in the training set is : {N_movies}""")




In [None]:
# Number of distinct userIds that appear in the training set.
N_users = training_set.map(lambda row: row["userId"]).distinct().count()

print(f"""The number of unique users in the training set is : {N_users}""")






In [None]:
# Single pass over the training set accumulating (sum of the ratings, number of ratings).
rating_sum = (
    training_set
    .map(lambda row: (row["rating"], 1))
    .reduce(lambda acc, pair: (acc[0] + pair[0], acc[1] + pair[1]))
)

In [None]:
# rating_sum is the tuple (sum of all the ratings, number of ratings).
print(rating_sum)

In [None]:
# Unpack the totals: sum of all the ratings and number of ratings.
global_r, N = rating_sum

# Global mean rating over the training set.
mu = global_r / N
print(f"""The average rating for all films and users is : {mu}""")

In [None]:
# Key every training rating by its user: (userId, (1, rating - mu)).
user_data = training_set.map(lambda row: (row["userId"], (1, row["rating"] - mu)))

# For each user: (number of films rated, sum of the deviations of the ratings from mu).
user_data_grouped_sum = user_data.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)

# User bias = sum of the deviations / number of films rated.
user_biases = user_data_grouped_sum.mapValues(lambda acc: acc[1] / acc[0])

In [None]:
# Largest userId present in the training set.
max_userId = training_set.map(lambda row: row["userId"]).max()

# Difference between the largest id and the number of distinct ids actually seen.
print(f"""There are {max_userId - N_users} users who have no ratings.""")

As shown just above, some userIds have no rating in the training set, so we need to give them a default bias. We chose the average bias of all the users in the training_set, which we call avg_user_biases.

In [None]:
# Average of the per-user biases, used as the default bias.
# The sum is taken on the values only: the former reduce built its tuples with `_`,
# an IPython variable holding the last cell output, i.e. hidden notebook state.
user_biases_sum = user_biases.values().sum()
avg_user_biases = user_biases_sum / user_biases.count()

# One default entry for every possible id 1..max_userId.
# The upper bound must be the largest id, not N_users (the number of distinct users):
# otherwise users with an id above N_users get no default entry and are lost
# when the biases are right-outer-joined with this RDD.
default_value_user_bias = [(i, avg_user_biases) for i in range(1, max_userId + 1)]
default_value_user_bias_rdd = sc.parallelize(default_value_user_bias)


In [None]:
# After the right outer join every id of the default RDD is present as
# (userId, (computed bias or None, default bias)).
# The default value is used if the user is not in the user_biases rdd.
user_biases_final = (
    user_biases
    .rightOuterJoin(default_value_user_bias_rdd)
    .mapValues(lambda pair: pair[1] if pair[0] is None else pair[0])
)

In [None]:
# Show 10 (userId, bias) pairs as a quick check.
user_biases_final.take(10)

In [None]:
# Largest movieId present in the training set.
max_movieId = training_set.map(lambda row: row["movieId"]).max()

# Difference between the largest id and the number of distinct ids actually seen.
print(f"""There are {max_movieId - N_movies} movies who have no ratings.""")

In [None]:
# This is an RDD containing all the tuples (movieId, bias associated with the user who rated the movie).
movies_users_biases = (
    training_set
    .map(lambda row: (row["userId"], row["movieId"]))
    .join(user_biases_final)                  # (userId, (movieId, user_bias))
    .map(lambda row: (row[1][0], row[1][1]))  # (movieId, user_bias)
)

# This RDD contains all the tuples (movieId, sum of the biases of all the users who rated the movieId).
# This step is necessary to subtract user bias from movie bias, as specified in the formula.
# reduceByKey adds the values up partition by partition before the shuffle,
# whereas groupByKey would first move every single value across the cluster.
groupUsersByMovie = movies_users_biases.reduceByKey(lambda a, b: a + b)
                      



In [None]:
# Key every training rating by its movie: (movieId, (1, rating - mu)).
movie_data = training_set.map(lambda row: (row["movieId"], (1, row["rating"] - mu)))

# For each movie: (number of ratings, sum of the deviations of the ratings from mu,
# sum of the biases of the users who rated it).
movie_data_grouped_sum = (
    movie_data
    .reduceByKey(lambda left, right: (left[0] + right[0], left[1] + right[1]))
    .join(groupUsersByMovie)
    .mapValues(lambda joined: (joined[0][0], joined[0][1], joined[1]))
)

# Movie bias = (sum of the deviations - sum of the user biases) / number of ratings.
movie_biases = movie_data_grouped_sum.mapValues(
    lambda stats: (stats[1] - stats[2]) / stats[0]
)

In [None]:
# As with what we did with the users,
# there are some movies that have no bias, so we set them to the default value avg_movie_biases.

# The sum is taken on the values only: the former reduce built its tuples with `_`,
# an IPython variable holding the last cell output, i.e. hidden notebook state.
sum_movie_biases = movie_biases.values().sum()
avg_movie_biases = sum_movie_biases / movie_biases.count()

# One default entry for every possible id 1..max_movieId.
# The upper bound must be the largest id, not N_movies (the number of distinct movies):
# otherwise movies with an id above N_movies get no default entry and are lost
# when the biases are right-outer-joined with this RDD.
default_value_movie_bias = [(i, avg_movie_biases) for i in range(1, max_movieId + 1)]
default_value_movie_bias_rdd = sc.parallelize(default_value_movie_bias)

In [None]:
# After the right outer join every id of the default RDD is present as
# (movieId, (computed bias or None, default bias)); the default is used when no bias was computed.
movie_biases_final = (
    movie_biases
    .rightOuterJoin(default_value_movie_bias_rdd)
    .mapValues(lambda pair: pair[1] if pair[0] is None else pair[0])
)

In [None]:
# Show 10 (movieId, bias) pairs as a quick check.
movie_biases_final.take(10)

Here we compute the predicted ratings for the (movieId, userId) pairs of the validation_set with the formula $\hat{r}_{um} = \mu + b_u + b_m$, where $\mu$ is the global mean rating, $b_u$ the user bias and $b_m$ the movie bias.

In [None]:
# Predicted rating for every (movieId, userId) pair of the validation set.
prediction_set = (
    validation_set
    .map(lambda row: (row["userId"], row["movieId"]))
    .join(user_biases_final)                         # (userId, (movieId, user_bias))
    .map(lambda kv: (kv[1][0], (kv[0], kv[1][1])))   # (movieId, (userId, user_bias))
    .join(movie_biases_final)                        # (movieId, ((userId, user_bias), movie_bias))
    .map(lambda kv: ((kv[0], kv[1][0][0]),           # key: (movieId, userId)
                     mu + kv[1][0][1] + kv[1][1]))   # value: mu + user_bias + movie_bias
)
                  
                  
                                

In [None]:
# Show 10 elements of the form ((movieId, userId), predicted rating).
prediction_set.take(10)

Exercise 3.8: Evaluation

In [None]:
from math import sqrt

In [None]:
# Here we calculate the error on the validation_set, 
# using the prediction_set calculated previously and the formula in the statement.


def error(rdd, predictions=None):
    """
    Average, over the users, of the per-user RMSE of the predictions.

    Parameters
    ----------
    rdd : RDD of dicts with the keys "movieId", "userId" and "rating".
    predictions : RDD of ((movieId, userId), predicted rating) tuples.
        Defaults to the notebook-level `prediction_set`.

    Returns
    -------
    float : mean of the per-user root mean squared errors. Only the
        (movieId, userId) pairs present in both RDDs are taken into account.

    Raises
    ------
    ValueError : raised by the final `reduce` if no pair is present in both RDDs
        (PySpark refuses to reduce an empty RDD).
    """
    if predictions is None:
        # Backward-compatible default: the predictions computed earlier in the notebook.
        predictions = prediction_set

    # ((movieId, userId), (actual rating, predicted rating))
    rated_and_predicted = (
        rdd
        .map(lambda row: ((row["movieId"], row["userId"]), row["rating"]))
        .join(predictions)
    )

    n_users, rmse_total = (
        rated_and_predicted
        # (userId, (1, squared error))
        .map(lambda row: (row[0][1], (1, (row[1][0] - row[1][1]) ** 2)))
        # (userId, (number of ratings, sum of the squared errors))
        .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
        # (1, RMSE of the user)
        .map(lambda row: (1, sqrt(row[1][1] / row[1][0])))
        # (number of users, sum of the per-user RMSE)
        .reduce(lambda a, b: (a[0] + b[0], a[1] + b[1]))
    )

    return rmse_total / n_users

In [None]:
error = error(validation_set)

In [None]:
print(error)