# Today, I will be selecting the movie lens dataset which contains 100k ratings.
You can download the dataset from [here](https://grouplens.org/datasets/movielens/?utm_source=chatgpt.com)

In [0]:
df = spark.read.table("luffy.phase2.ratings")
display(df.head(10))

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815
1,50,5.0,964982931
1,70,3.0,964982400
1,101,5.0,964980868
1,110,4.0,964982176
1,151,5.0,964984041
1,157,5.0,964984100


In [0]:
df.printSchema()
print(f"Shape: ({df.count()},{len(df.columns)})")

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: long (nullable = true)

Shape: (100836,4)


In [0]:
ratings = df.drop("timestamp")
display(ratings.head(10))

userId,movieId,rating
1,1,4.0
1,3,4.0
1,6,4.0
1,47,5.0
1,50,5.0
1,70,3.0
1,101,5.0
1,110,4.0
1,151,5.0
1,157,5.0


In [0]:
display(ratings.select("rating").distinct())

rating
4.0
5.0
3.0
2.0
1.0
4.5
3.5
2.5
0.5
1.5


##Training the ALS(Alternating Least Square) Model:
## Steps to train ALS Model:
- Split the data and train and test sets
- Create ALS Model
- Train the Model
- Make Predictions
- Evaluate

In [0]:
train, test = ratings.randomSplit([0.8, 0.2], seed=42)
train.count(),test.count()

(80481, 20355)

In [0]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    rank=32,              # number of latent factors
    maxIter=10,           # number of iterations
    regParam=0.2,         # regularization
    implicitPrefs=False,  # because these are real ratings
    coldStartStrategy="drop"  # If a user appears in test not in train, this will drop those types of rows to prevent predicitng NaN
)

In [0]:
model = als.fit(train)

In [0]:
predictions = model.transform(test)
display(predictions.head(10))

userId,movieId,rating,prediction
1,151,5.0,4.075538635253906
1,216,5.0,3.757520914077759
1,231,5.0,3.387457847595215
1,362,5.0,3.996443510055542
1,423,3.0,3.7773244380950928
1,441,4.0,4.480555534362793
1,457,5.0,4.537994384765625
1,500,3.0,3.94628381729126
1,552,4.0,3.6999552249908447
1,593,4.0,4.726114273071289


In [0]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print("RMSE =", rmse)

RMSE = 0.8830109920691748


In [0]:
users = ratings.select("userId").distinct()
movies = ratings.select("movieId").distinct()
user_movie = users.crossJoin(movies)
display(user_movie.head(10))

userId,movieId
28,151
29,480
30,1025
33,1219
93,1226
151,1552
171,1920
200,2054
208,2141
221,2273


In [0]:
predictions = model.transform(user_movie)

In [0]:
rated_movies = ratings.select("userId", "movieId")

unrated_predictions = predictions.join(
    rated_movies,
    ["userId", "movieId"],
    "left_anti"
)

In [0]:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, col

window = Window.partitionBy("userId").orderBy(col("prediction").desc())

top5 = unrated_predictions \
    .withColumn("rank", row_number().over(window)) \
    .filter(col("rank") <= 5)

display(top5.head(10))

userId,movieId,prediction,rank
1,170355,5.989782333374023,1
1,5490,5.931138515472412,2
1,132333,5.931138515472412,3
1,5915,5.931138515472412,4
1,33649,5.776803970336914,5
2,170355,4.9758710861206055,1
2,5490,4.864869594573975,2
2,132333,4.864869594573975,3
2,5915,4.864869594573975,4
2,33649,4.816833019256592,5


In [0]:
top5.write.format("delta").mode('overwrite').save("/Volumes/luffy/phase2/gold/top5_recommendations")