In [1]:
# import required libraries
from pyspark.sql import SparkSession
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark import SparkConf

In [2]:
# create a SparkSession
conf = SparkConf().setAppName("ALS Recommendation System") \
                  .set("spark.storage.memoryFraction", "0.8")

spark = SparkSession.builder.config(conf=conf).getOrCreate()


In [3]:
from pyspark.sql.functions import split

# read the data file
data = spark.read.format("csv").load(r"C:\Users\Aryan Yadav\Desktop\ml-100k\u.data")

# split the values in each row by the delimiter
data = data.select(split("_c0", "\t").alias("values"))

# expand the split values into columns
data = data.selectExpr("values[0] as user", "values[1] as item", "values[2] as rating", "values[3] as timestamp")

# print the data
data.show()


+----+----+------+---------+
|user|item|rating|timestamp|
+----+----+------+---------+
| 196| 242|     3|881250949|
| 186| 302|     3|891717742|
|  22| 377|     1|878887116|
| 244|  51|     2|880606923|
| 166| 346|     1|886397596|
| 298| 474|     4|884182806|
| 115| 265|     2|881171488|
| 253| 465|     5|891628467|
| 305| 451|     3|886324817|
|   6|  86|     3|883603013|
|  62| 257|     2|879372434|
| 286|1014|     5|879781125|
| 200| 222|     5|876042340|
| 210|  40|     3|891035994|
| 224|  29|     3|888104457|
| 303| 785|     3|879485318|
| 122| 387|     5|879270459|
| 194| 274|     2|879539794|
| 291|1042|     4|874834944|
| 234|1184|     2|892079237|
+----+----+------+---------+
only showing top 20 rows



In [4]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType

# assuming `data` is the DataFrame with columns to be cast to integer type
data = data.select([col(c).cast(IntegerType()).alias(c) for c in data.columns])
data.show()

+----+----+------+---------+
|user|item|rating|timestamp|
+----+----+------+---------+
| 196| 242|     3|881250949|
| 186| 302|     3|891717742|
|  22| 377|     1|878887116|
| 244|  51|     2|880606923|
| 166| 346|     1|886397596|
| 298| 474|     4|884182806|
| 115| 265|     2|881171488|
| 253| 465|     5|891628467|
| 305| 451|     3|886324817|
|   6|  86|     3|883603013|
|  62| 257|     2|879372434|
| 286|1014|     5|879781125|
| 200| 222|     5|876042340|
| 210|  40|     3|891035994|
| 224|  29|     3|888104457|
| 303| 785|     3|879485318|
| 122| 387|     5|879270459|
| 194| 274|     2|879539794|
| 291|1042|     4|874834944|
| 234|1184|     2|892079237|
+----+----+------+---------+
only showing top 20 rows



In [5]:
# split the data into training and testing datasets
(training_data, test_data) = data.randomSplit([0.8, 0.2])

In [7]:
test_data.show()

+----+----+------+---------+
|user|item|rating|timestamp|
+----+----+------+---------+
|   1|   3|     4|878542960|
|   1|   7|     4|875071561|
|   1|   9|     5|878543541|
|   1|  13|     5|875071805|
|   1|  20|     4|887431883|
|   1|  26|     3|875072442|
|   1|  41|     2|876892818|
|   1|  46|     4|876893230|
|   1|  48|     5|875072520|
|   1|  51|     4|878543275|
|   1|  55|     5|875072688|
|   1|  56|     4|875072716|
|   1|  64|     5|875072404|
|   1|  66|     4|878543030|
|   1|  69|     3|875072262|
|   1|  70|     3|875072895|
|   1|  93|     5|875071484|
|   1|  98|     4|875072404|
|   1|  99|     3|875072547|
|   1| 113|     5|878542738|
+----+----+------+---------+
only showing top 20 rows



In [8]:
# build the ALS model:
als = ALS(maxIter=10, regParam=0.1, userCol="user", itemCol="item", ratingCol="rating", coldStartStrategy="drop")
model = als.fit(training_data)



In [9]:
# generate predictions for the test data
predictions = model.transform(test_data)


In [10]:
# evaluate the model using Root Mean Square Error (RMSE)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


Root-mean-square error = 0.9230259152120834


In [11]:
# use the trained model to generate recommendations for all users
userRecs = model.recommendForAllUsers(10)
userRecs.show()

+----+--------------------+
|user|     recommendations|
+----+--------------------+
|   1|[{1463, 5.0894566...|
|   3|[{1643, 5.085789}...|
|   5|[{793, 4.7600174}...|
|   6|[{1463, 5.1076016...|
|   9|[{1463, 5.3891687...|
|  12|[{113, 5.3132577}...|
|  13|[{1463, 4.976876}...|
|  15|[{1242, 5.438542}...|
|  16|[{1467, 5.586324}...|
|  17|[{1589, 4.6193986...|
|  19|[{793, 5.016589},...|
|  20|[{1278, 4.11146},...|
|  22|[{169, 5.0118}, {...|
|  26|[{1463, 4.3604927...|
|  27|[{313, 3.8850806}...|
|  28|[{1467, 4.878327}...|
|  31|[{1368, 5.3423915...|
|  34|[{1512, 6.616613}...|
|  35|[{776, 3.9237242}...|
|  37|[{1449, 4.760874}...|
+----+--------------------+
only showing top 20 rows



In [12]:
# generate top 10 recommendations for a specific user
user_id = 25
userRecs = model.recommendForAllUsers(10).filter(f"user = {user_id}").select("recommendations").collect()[0][0]

# output all recommendations for the user
for rec in userRecs:
    print(rec)

Row(item=1463, rating=4.89435338973999)
Row(item=113, rating=4.843940734863281)
Row(item=1449, rating=4.747164726257324)
Row(item=169, rating=4.6523118019104)
Row(item=408, rating=4.606603622436523)
Row(item=516, rating=4.595702171325684)
Row(item=1122, rating=4.579644680023193)
Row(item=1642, rating=4.571200847625732)
Row(item=64, rating=4.566840171813965)
Row(item=316, rating=4.562083721160889)


In [13]:
# filter out recommendations already rated/watched by user
movieIds = [rec.item for rec in data.rdd.collect() if rec.user == user_id]

for row in userRecs:
    if row[0] in movieIds:
        userRecs = [row for row in userRecs if row[0] not in movieIds]
        print("Removed row with movieId:", row[0])
    else:
        print("Kept row with movieId:", row[0])

Kept row with movieId: 1463
Kept row with movieId: 113
Kept row with movieId: 1449
Removed row with movieId: 169
Removed row with movieId: 408
Kept row with movieId: 516
Kept row with movieId: 1122
Kept row with movieId: 1642
Kept row with movieId: 64
Kept row with movieId: 316


In [14]:
# print updated recommendations for user_id 
for rec in userRecs:
    print(rec)


Row(item=1463, rating=4.89435338973999)
Row(item=113, rating=4.843940734863281)
Row(item=1449, rating=4.747164726257324)
Row(item=516, rating=4.595702171325684)
Row(item=1122, rating=4.579644680023193)
Row(item=1642, rating=4.571200847625732)
Row(item=64, rating=4.566840171813965)
Row(item=316, rating=4.562083721160889)


In [15]:
# read the movie data file
movieData = spark.read.format("csv").load(r"C:\Users\Aryan Yadav\Desktop\ml-100k\u.item")

# split the values in each row by the delimiter
movieData = movieData.select(split(movieData._c0, "\|").alias("values"))

#create a dictionary with {movieId : movieName} as entries
movie_list = movieData.collect()
movie_dict = {}

for row in movie_list:
    movie_id = row.values[0]
    movie_name = row.values[1]
    movie_dict[int(movie_id)] = movie_name
    

# print the first 20 items in the dictionary
for i, (movie_id, movie_name) in enumerate(movie_dict.items()):
    print(f"{movie_id}: {movie_name}")
    if i == 19:
        break


1: Toy Story (1995)
2: GoldenEye (1995)
3: Four Rooms (1995)
4: Get Shorty (1995)
5: Copycat (1995)
6: Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)
7: Twelve Monkeys (1995)
8: Babe (1995)
9: Dead Man Walking (1995)
10: Richard III (1995)
11: Seven (Se7en) (1995)
12: Usual Suspects
13: Mighty Aphrodite (1995)
14: Postino
15: Mr. Holland's Opus (1995)
16: French Twist (Gazon maudit) (1995)
17: From Dusk Till Dawn (1996)
18: White Balloon
19: Antonia's Line (1995)
20: Angels and Insects (1995)


In [16]:
# generate top 10 recommendations for a specific user along with movie names
from tabulate import tabulate  
table_data = []

for rec in userRecs:
    movieId = rec[0]
    rating = rec[1]
    if movieId in movie_dict:
        movie_name = movie_dict[movieId]
        table_data.append([user_id, movie_name, rating])

print(tabulate(table_data, headers=["User", "Movie Recommended", "Predicted Rating"]))

  User  Movie Recommended                 Predicted Rating
------  ------------------------------  ------------------
    25  Boys                                       4.89435
    25  Horseman on the Roof                       4.84394
    25  Pather Panchali (1955)                     4.74716
    25  Local Hero (1983)                          4.5957
    25  They Made Me a Criminal (1939)             4.57964
    25  Some Mother's Son (1996)                   4.5712
    25  Shawshank Redemption                       4.56684
    25  As Good As It Gets (1997)                  4.56208
