In [None]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=fb6d4b214db7340be8a45622fb807ae9c71a27c99dc79bba5fcecb9cff62c4f6
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


# Step 1: Creating a PySpark Session


In [None]:
# import the required libraries
import time
import pyspark
from pyspark.sql import SparkSession
# start (or reuse) the Spark session named 'recommendation'
spark = (
    SparkSession.builder
    .appName('recommendation')
    .getOrCreate()
)


# Step 2: Loading and Preprocessing Data

In [None]:
# read the four CSV files; header=True takes the column names from the first row
csv_options = {'format': 'csv', 'header': True}
movies = spark.read.load("/content/movies.csv", **csv_options)
ratings = spark.read.load('/content/ratings.csv', **csv_options)
links = spark.read.load("/content/links.csv", **csv_options)
tags = spark.read.load("/content/tags.csv", **csv_options)

# preview the first rows of each frame
for frame in (ratings, movies, links, tags):
    frame.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
| 

In [None]:
# keep only the columns the model is trained on, then inspect their data types
model_columns = ["userId", "movieId", "rating"]
ratings = ratings.select(model_columns)
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)



In [None]:
# the CSV columns were read as strings: cast the ids to int and the rating to float
df = (
    ratings
    .withColumn('userId', ratings['userId'].cast('int'))
    .withColumn('movieId', ratings['movieId'].cast('int'))
    .withColumn('rating', ratings['rating'].cast('float'))
)
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [None]:
# 60/20/20 random split into train, validation and test sets (fixed seed)
split_weights = [0.6, 0.2, 0.2]
train, validation, test = df.randomSplit(split_weights, seed=0)

set_sizes = (train.count(), validation.count(), test.count())
print("The number of ratings in each set: {}, {}, {}".format(*set_sizes))

The number of ratings in each set: 60435, 20052, 20349


# Step 3: Model Training and Validation

In [None]:
from pyspark.sql.functions import col, sqrt
def RMSE(predictions):
    """Return the root-mean-square error between ``rating`` and ``prediction``.

    Args:
        predictions: DataFrame that has numeric ``rating`` and ``prediction``
            columns.

    Returns:
        float: square root of the mean squared difference of the two columns,
        or NaN when the mean is undefined because there is nothing to average.
    """
    error = col("rating") - col("prediction")
    squared_diff = predictions.withColumn("squared_diff", error ** 2)
    mse = squared_diff.selectExpr("mean(squared_diff) as mse").first().mse
    # With no rows to average the aggregate comes back as None; report NaN
    # instead of failing on `None ** 0.5`.
    if mse is None:
        return float('nan')
    return mse ** 0.5

In [None]:
# implement the model using ALS algorithm and find the right hyperparameters using Grid Search
from pyspark.ml.recommendation import ALS

def GridSearch(train, valid, num_iterations, reg_param, n_factors):
    """Fit one ALS model per (rank, regParam) pair and return the best one.

    Every combination of ``n_factors`` x ``reg_param`` is fitted on ``train``
    and scored with ``RMSE`` on ``valid``; the model with the lowest
    validation RMSE wins. Progress and the final summary are printed.

    Args:
        train: DataFrame with ``userId``, ``movieId`` and ``rating`` columns
            used to fit each model.
        valid: DataFrame with the same columns used to score each model.
        num_iterations: value passed to ALS as ``maxIter``.
        reg_param: iterable of values tried as ALS ``regParam``.
        n_factors: iterable of values tried as ALS ``rank``.

    Returns:
        The fitted model with the lowest validation RMSE.

    Raises:
        ValueError: if no candidate produced a comparable validation RMSE
            (for example, one of the parameter ranges is empty).
    """
    min_rmse = float('inf')
    best_n = -1
    best_reg = 0
    best_model = None
    # run Grid Search for all the parameter defined in the range in a loop
    for n in n_factors:
        for reg in reg_param:
            als = ALS(rank = n,
                      maxIter = num_iterations,
                      seed = 0,
                      regParam = reg,
                      userCol="userId",
                      itemCol="movieId",
                      ratingCol="rating",
                      coldStartStrategy="drop")
            model = als.fit(train)
            predictions = model.transform(valid)
            rmse = RMSE(predictions)
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(n, reg, rmse))
            # track the best model using RMSE
            if rmse < min_rmse:
                min_rmse = rmse
                best_n = n
                best_reg = reg
                best_model = model

    # Nothing was selected: fail with a clear message rather than an
    # AttributeError on `None.transform` below.
    if best_model is None:
        raise ValueError(
            'GridSearch found no model: reg_param and n_factors must be non-empty '
            'and at least one candidate must yield a finite validation RMSE'
        )

    pred = best_model.transform(train)
    train_rmse = RMSE(pred)
    # best model and its metrics
    print('\nThe best model has {} latent factors and regularization = {}:'.format(best_n, best_reg))
    print('traning RMSE is {}; validation RMSE is {}'.format(train_rmse, min_rmse))
    return best_model

In [None]:
# build the model using different ranges for Grid Search
from pyspark.sql.functions import col, sqrt
# search space: ALS iterations, candidate ranks and regularization strengths
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

# time the whole search so the cost of the cell is visible
start_time = time.time()
final_model = GridSearch(
    train=train,
    valid=validation,
    num_iterations=num_iterations,
    reg_param=reg_params,
    n_factors=ranks,
)
elapsed_seconds = time.time() - start_time
print('Total Runtime: {:.2f} seconds'.format(elapsed_seconds))

6 latent factors and regularization = 0.05: validation RMSE is 0.9774929358976446
6 latent factors and regularization = 0.1: validation RMSE is 0.9129091203678512
6 latent factors and regularization = 0.2: validation RMSE is 0.8951553355978934
6 latent factors and regularization = 0.4: validation RMSE is 0.9694803162186382
6 latent factors and regularization = 0.8: validation RMSE is 1.1934058842790796
8 latent factors and regularization = 0.05: validation RMSE is 0.9911454471125616
8 latent factors and regularization = 0.1: validation RMSE is 0.9168968729472543
8 latent factors and regularization = 0.2: validation RMSE is 0.8984989562331739
8 latent factors and regularization = 0.4: validation RMSE is 0.9702570878824905
8 latent factors and regularization = 0.8: validation RMSE is 1.1934001733725708
10 latent factors and regularization = 0.05: validation RMSE is 0.9978579823667801
10 latent factors and regularization = 0.1: validation RMSE is 0.9176672164670597
10 latent factors and r

In [None]:
# score the selected model on the held-out test set
pred_test = final_model.transform(test)
test_rmse = RMSE(pred_test)
print('The testing RMSE is ' + str(test_rmse))

The testing RMSE is 0.8959197533497142


# Step 4: Testing the recommendations for a Single User

In [None]:
# pick the test-set rows of user 12, keeping only the movie and user ids
is_user_12 = test['userId'] == 12
single_user = test.where(is_user_12).select('movieId', 'userId')
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|    543|    12|
|   1357|    12|
|   2485|    12|
+-------+------+



In [None]:
# look up the title and genres of each of the user's movies
same_movie = single_user.movieId == movies.movieId
single_user.join(movies, on=same_movie, how='inner').show()

+-------+------+-------+--------------------+--------------------+
|movieId|userId|movieId|               title|              genres|
+-------+------+-------+--------------------+--------------------+
|    543|    12|    543|So I Married an A...|Comedy|Romance|Th...|
|   1357|    12|   1357|        Shine (1996)|       Drama|Romance|
|   2485|    12|   2485|She's All That (1...|      Comedy|Romance|
+-------+------+-------+--------------------+--------------------+



In [None]:
# predict this user's rating for each of those movies, highest prediction first
reccomendations = final_model.transform(single_user)
by_prediction_desc = reccomendations['prediction'].desc()
reccomendations.sort(by_prediction_desc).show()

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1357|    12|  5.015935|
|    543|    12| 3.6550274|
|   2485|    12| 3.4955368|
+-------+------+----------+



In [None]:
# attach the title and genres to each predicted rating
title_match = reccomendations.movieId == movies.movieId
reccomendations.join(movies, on=title_match, how='inner').show()

+-------+------+----------+-------+--------------------+--------------------+
|movieId|userId|prediction|movieId|               title|              genres|
+-------+------+----------+-------+--------------------+--------------------+
|    543|    12| 3.6550274|    543|So I Married an A...|Comedy|Romance|Th...|
|   1357|    12|  5.015935|   1357|        Shine (1996)|       Drama|Romance|
|   2485|    12| 3.4955368|   2485|She's All That (1...|      Comedy|Romance|
+-------+------+----------+-------+--------------------+--------------------+



# Step 5: Providing the recommendations to the user

In [None]:
from pyspark.sql.functions import col, lit

# select a single user from the test set
user_id = 12
single_user_ratings = test.filter(test['userId'] == user_id).select(['movieId', 'userId', 'rating'])

# display the user's ratings that were held out in the test set
print("Movies liked by user with ID", user_id)
single_user_ratings.join(movies, 'movieId').select('movieId', 'title', 'rating').show()

# candidate movies = every rated movie minus everything this user has already rated.
# The user's history is taken from the full ratings frame (df), not only from the
# test split: ratings that landed in train/validation would otherwise stay in the
# candidate set and be recommended back to the user.
all_movies = df.select('movieId').distinct()
user_movies = df.filter(df['userId'] == user_id).select('movieId').distinct()
movies_to_recommend = all_movies.subtract(user_movies)

# predict ratings for movies the user has not rated yet
recommendations = final_model.transform(movies_to_recommend.withColumn('userId', lit(user_id)))

# keep only movies with a positive predicted rating (already-rated movies were
# removed by the subtract above, not by this filter)
recommendations = recommendations.filter(col('prediction') > 0)

# display the recommendations with movie names
print("Recommended movies for user with ID", user_id)
recommended_movies = recommendations.join(movies, 'movieId').select('movieId', 'title', 'prediction')

# sort recommended movies by prediction in descending order
ordered_recommendations = recommended_movies.orderBy(col('prediction').desc())

# display the ordered recommendations
ordered_recommendations.show()

Movies liked by user with ID 12
+-------+--------------------+------+
|movieId|               title|rating|
+-------+--------------------+------+
|    543|So I Married an A...|   3.5|
|   1357|        Shine (1996)|   5.0|
|   2485|She's All That (1...|   5.0|
+-------+--------------------+------+

Recommended movies for user with ID 12
+-------+--------------------+----------+
|movieId|               title|prediction|
+-------+--------------------+----------+
|  67618|Strictly Sexual (...| 6.1667633|
|   3379| On the Beach (1959)|  6.117749|
|   5867|        Thief (1981)| 5.9761686|
|  42730|   Glory Road (2006)| 5.9761686|
|   4535|Man from Snowy Ri...| 5.9761686|
|   7121|   Adam's Rib (1949)| 5.9670253|
|  60943| Frozen River (2008)|  5.941128|
|  33649|  Saving Face (2004)| 5.9352655|
|  25906|Mr. Skeffington (...| 5.9273853|
|  77846| 12 Angry Men (1997)| 5.9273853|
|   3200|Last Detail, The ...|  5.890436|
|   3567|   Bossa Nova (2000)|  5.871084|
|  94070|Best Exotic Marig...|  