In [18]:
import os
import tqdm
from pyspark.sql import SparkSession
from pyspark.mllib.recommendation import ALS
import math

In [19]:
os.environ["PYSPARK_PYTHON"]="/usr/local/bin/python3.11"
os.environ["PYSPARK_DRIVER_PYTHON"]="/usr/local/bin/python3.11"

spark = SparkSession.builder \
    .appName("movie recommendation") \
    .getOrCreate()
sc = spark.sparkContext

## Data Loading

In [20]:
movies = spark.read.load("datasets/movie_lens/movies.csv", format="csv", header=True, inferSchema=True)
movies.toPandas().head()

Unnamed: 0,movieId,title,genres
0,1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
1,2,Jumanji (1995),Adventure|Children|Fantasy
2,3,Grumpier Old Men (1995),Comedy|Romance
3,4,Waiting to Exhale (1995),Comedy|Drama|Romance
4,5,Father of the Bride Part II (1995),Comedy


In [21]:
ratings = spark.read.load("datasets/movie_lens/ratings.csv", format="csv", header=True, inferSchema=True)
ratings.toPandas().head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,1,4.0,964982703
1,1,3,4.0,964981247
2,1,6,4.0,964982224
3,1,47,5.0,964983815
4,1,50,5.0,964982931


In [22]:
links = spark.read.load("datasets/movie_lens/links.csv", format="csv", header=True, inferSchema=True)
links.toPandas().head()

Unnamed: 0,movieId,imdbId,tmdbId
0,1,114709,862.0
1,2,113497,8844.0
2,3,113228,15602.0
3,4,114885,31357.0
4,5,113041,11862.0


In [23]:
tags = spark.read.load("datasets/movie_lens/tags.csv", format="csv", header=True, inferSchema=True)
tags.toPandas().head()

Unnamed: 0,userId,movieId,tag,timestamp
0,2,60756,funny,1445714994
1,2,60756,Highly quotable,1445714996
2,2,60756,will ferrell,1445714992
3,2,89774,Boxing story,1445715207
4,2,89774,MMA,1445715200


## Data Ingestion

In [24]:
# check for missing values
user_groups = ratings.groupBy("userId").count().toPandas()["count"].min()
movie_groups = ratings.groupBy("movieId").count().toPandas()["count"].min()
print("min # of ratings per user is", user_groups)
print("min # of ratings per movie is", movie_groups)

min # of ratings per user is 20
min # of ratings per movie is 1


In [25]:
# check how many movies are rated by only one user
count_single_rating_movies = ratings.groupBy("movieId").count().filter("count = 1").count()
count_movies = ratings.select("movieId").distinct().count()
print(count_single_rating_movies, "out of", count_movies, "movies are rated by only one user")

3446 out of 9724 movies are rated by only one user


In [26]:
# check the number of users and movies rated
count_users_rated = ratings.select("userId").distinct().count()
count_movies_rated = ratings.select("movieId").distinct().count()
print("rotal number of users rated is", count_users_rated)
print("total number of movies rated is", count_movies_rated)

rotal number of users rated is 610
total number of movies rated is 9724


In [27]:
# compare the number of movies in the movies.csv and ratings.csv, i.e. how many movies are not rated
count_movies = movies.select("movieId").distinct().count()
count_movies_rated = ratings.select("movieId").distinct().count()
print("total number of movies is", count_movies)
print("total number of movies rated is", count_movies_rated)

total number of movies is 9742
total number of movies rated is 9724


In [28]:
# all unrated movies
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
print('movies that are not rated yet: ')
spark.sql(
    "SELECT m.movieId, m.title "
    "FROM movies m LEFT JOIN ratings r ON m.movieId=r.movieId "
    "WHERE r.movieId IS NULL"
).show(10)

movies that are not rated yet: 
+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   1076|Innocents, The (1...|
|   2939|      Niagara (1953)|
|   3338|For All Mankind (...|
|   3456|Color of Paradise...|
|   4194|I Know Where I'm ...|
|   5721|  Chosen, The (1981)|
|   6668|Road Home, The (W...|
|   6849|      Scrooge (1970)|
|   7020|        Proof (1991)|
|   7792|Parallax View, Th...|
+-------+--------------------+
only showing top 10 rows



## Spark ALS model

In [29]:
movie_ratings = sc.textFile("datasets/movie_lens/ratings.csv")

header = movie_ratings.first()
ratings_data = movie_ratings \
    .filter(lambda line: line != header) \
    .map(lambda line: line.split(',')) \
    .map(lambda tokens: (int(tokens[0]), int(tokens[1]), float(tokens[2]))) \
    .cache()

print(ratings_data.take(3))

[(1, 1, 4.0), (1, 3, 4.0), (1, 6, 4.0)]


24/05/01 13:25:07 WARN BlockManager: Task 8647 already completed, not releasing lock for rdd_8614_0


In [30]:
train, validation, test = ratings_data.randomSplit([6, 2, 2], seed=42)
train.cache()
validation.cache()
test.cache()

PythonRDD[8618] at RDD at PythonRDD.scala:53

In [31]:
num_iterations = 10
ranks = [8, 10, 12, 14, 16, 18, 20]
reg_params = [0.001, 0.01, 0.05, 0.1, 0.2]
min_error = float('inf')
best_rank = -1
best_regularization = 0
best_model = None
best_rmse = float('inf')

for rank in tqdm.tqdm(ranks, total=len(ranks)):
    for reg in tqdm.tqdm(reg_params, total=len(reg_params)):
        model = ALS.train(
            ratings=train,
            iterations=num_iterations,
            rank=rank,
            lambda_=reg,
            seed=42
        )

        valid_data = validation.map(lambda p: (p[0], p[1]))
        predictions = model.predictAll(valid_data).map(lambda r: ((r[0], r[1]), r[2]))

        rates_and_preds = validation.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

        MSE = rates_and_preds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
        error = math.sqrt(MSE)

        if error < min_error:
            min_error = error
            best_rank = rank
            best_regularization = reg
            best_model = model
            best_rmse = error

print('\nThe best model has {} latent factors and regularization = {}, with RMSE = {}'.format(best_rank, best_regularization, best_rmse))


100%|██████████| 5/5 [00:23<00:00,  4.78s/it]                                   
100%|██████████| 5/5 [00:24<00:00,  4.83s/it]                                   
100%|██████████| 5/5 [00:22<00:00,  4.42s/it]                                   
100%|██████████| 5/5 [00:22<00:00,  4.57s/it]                                   
100%|██████████| 5/5 [00:24<00:00,  4.85s/it]                                   
100%|██████████| 5/5 [00:23<00:00,  4.64s/it]                                   
100%|██████████| 5/5 [00:23<00:00,  4.61s/it]                                   
100%|██████████| 7/7 [02:43<00:00, 23.36s/it]


The best model has 8 latent factors and regularization = 0.2, with RMSE = 0.8906498894233397





In [32]:
final_model = ALS.train(
    ratings=ratings_data,
    iterations=num_iterations,
    rank=best_rank,
    lambda_=best_regularization,
    seed=42
)
test_data = test.map(lambda p: (p[0], p[1]))
predictions = final_model.predictAll(test_data).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
error = math.sqrt(MSE)
print('For testing data the RMSE is {}'.format(error))

24/05/01 13:27:51 WARN BlockManager: Task 16978 already completed, not releasing lock for rdd_8614_0

For testing data the RMSE is 0.7349596832883968


                                                                                