In [15]:
from pyspark.sql import SparkSession

# Initialize a Spark session
spark = SparkSession.builder \
    .appName("MovieLens Exploration") \
    .getOrCreate()

# Load data
ratings_df = spark.read.csv("../data/ratings_small.csv", header=True, inferSchema=True)
movies_df = spark.read.csv("../data/movies_metadata.csv", header=True, inferSchema=True)

# Display the first few rows of the dataframe
ratings_df.show()

# Print the schema of the dataframe
ratings_df.printSchema()

from pyspark.sql import functions as F

# Average rating per movie
avg_ratings = ratings_df.groupBy("movieId").agg(F.avg("rating").alias("average_rating"))
avg_ratings.show()

# Number of ratings per user
user_ratings_count = ratings_df.groupBy("userId").agg(F.count("rating").alias("num_ratings"))
user_ratings_count.show()

[Stage 3747:>(6 + 2) / 10][Stage 3808:>(0 + 0) / 10][Stage 3827:> (0 + 0) / 1]

In [14]:
# Data Preprocessing

# 1. Handle Missing Values
for col in ratings_df.columns:
    print(col, "\t", "with null values: ", ratings_df.filter(ratings_df[col].isNull()).count())

# Drop them if necessary (just an example)
# ratings_df = ratings_df.dropna()

# 2. Convert Data Types if necessary
# As an example, converting 'rating' column to float type
# ratings_df = ratings_df.withColumn("rating", ratings_df["rating"].cast("float"))

# Join DataFrames to get movie titles in ratings_df
ratings_df = ratings_df.join(movies_df, ["movieId"], "left").select(ratings_df["*"], movies_df["title"])

# 3. Split the Dataset
train_data, test_data = ratings_df.randomSplit([0.8, 0.2], seed=1234)

# Check the count of each dataset
print("Number of training records: " + str(train_data.count()))
print("Number of testing records : " + str(test_data.count()))


ERROR:root:KeyboardInterrupt while sending command.][Stage 3827:> (0 + 0) / 1]
Traceback (most recent call last):
  File "/home/solaraeth/anaconda3/envs/MLRecommender/lib/python3.11/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/solaraeth/anaconda3/envs/MLRecommender/lib/python3.11/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/home/solaraeth/anaconda3/envs/MLRecommender/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 

In [4]:
from pyspark.ml.recommendation import ALS

# Define the ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating", nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

# Set model parameters
model = als.fit(train_data)

# Predict on the test set
predictions = model.transform(test_data)


23/08/31 05:53:29 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
                                                                                

In [5]:
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluate the model with Root Mean Squared Error
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))


                                                                                

Root-mean-square error = 0.9101083617002579


In [9]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Define the ALS model
als = ALS(userCol="userId", itemCol="movieId", ratingCol="rating" , nonnegative=True, implicitPrefs=False, coldStartStrategy="drop")

# Define the evaluator as RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

# Construct the grid of parameters to search over
ParamGrid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 25]) \
    .addGrid(als.maxIter, [5, 10]) \
    .addGrid(als.regParam, [0.01, 0.1,]) \
    .build()

# Setup cross valudation
crossval = CrossValidator(estimator=als,
                          estimatorParamMaps=ParamGrid,
                          evaluator=evaluator,
                          numFolds=2)

# Run cross-validation
cvModel = crossval.fit(train_data)

# Use test set to measure the accuracy of our model on the new data
predictions = cvModel.transform(test_data)
rmse = evaluator.evaluate(predictions)

print("Best model RMSE: ", rmse)
print("Best model rank: ", cvModel.bestModel.rank)
print("Best model maxIter: ", cvModel.bestModel._java_obj.parent().getMaxIter())
print("Best model regParam: ", cvModel.bestModel._java_obj.parent().getRegParam())

23/08/31 19:45:45 WARN CacheManager: Asked to cache already cached data.
23/08/31 19:45:45 WARN CacheManager: Asked to cache already cached data.

KeyboardInterrupt: 



In [11]:
# Retrain the model using the best hyperparameters
best_als = ALS(
    rank=5,
    maxIter=5,
    regParam=0.1,
    userCol="userId",
    itemCol="movieId",
    ratingCol="rating",
    nonnegative=True,
    implicitPrefs=False,
    coldStartStrategy="drop"
)
final_model = best_als.fit(ratings_df)

# Generate top 10 movie recommendations for each user
user_recommendations = final_model.recommendForAllUsers(10)

# Show recommendations for the first few users
user_recommendations.show()

[Stage 3579:>(8 + 2) / 10][Stage 3594:> (0 + 0) / 1][Stage 3597:> (0 + 0) / 1]0]

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{2563, 4.248676}...|
|     2|[{92494, 5.902215...|
|     3|[{92494, 5.579682...|
|     4|[{83411, 7.021948...|
|     5|[{83411, 5.636155...|
|     6|[{83411, 5.321218...|
|     7|[{83411, 5.557587...|
|     8|[{83411, 5.659895...|
|     9|[{92494, 5.805102...|
|    10|[{92494, 6.19543}...|
|    11|[{92494, 6.099708...|
|    12|[{6219, 5.4512215...|
|    13|[{90061, 5.158160...|
|    14|[{3414, 5.1059365...|
|    15|[{5071, 4.9651093...|
|    16|[{83411, 5.749088...|
|    17|[{83411, 5.813849...|
|    18|[{83411, 5.523983...|
|    19|[{83411, 5.783284...|
|    20|[{121231, 5.60584...|
+------+--------------------+
only showing top 20 rows



                                                                                

[Stage 3613:>(2 + 2) / 10][Stage 3623:> (0 + 0) / 1][Stage 3637:>(0 + 0) / 10]0]

In [13]:
from pyspark.sql.functions import explode, col

# Explode the recommendations column into seperate rows
exploded_recs = user_recommendations.select("userId", explode("recommendations").alias("recommendation"))

# Extract movieId and rating from the recommendation struct column
exploded_recs = exploded_recs.select(
    "userId",
    col("recommendation.movieId").alias("movieId"),
    col("recommendation.rating").alias("prediction")
)

# Join the exploded recomendations with the movies_df to get the titles
final_recommendations = exploded_recs.join(movies_df, on="movieId").select("userId", "title", "prediction")

# Show the recommendations
final_recommendations.show()

[Stage 3615:>              (0 + 2) / 10][Stage 3647:>               (0 + 0) / 1]

AnalysisException: [UNRESOLVED_USING_COLUMN_FOR_JOIN] USING column `movieId` cannot be resolved on the right side of the join. The right-side columns: [`adult`, `belongs_to_collection`, `budget`, `genres`, `homepage`, `id`, `imdb_id`, `original_language`, `original_title`, `overview`, `popularity`, `poster_path`, `production_companies`, `production_countries`, `release_date`, `revenue`, `runtime`, `spoken_languages`, `status`, `tagline`, `title`, `video`, `vote_average`, `vote_count`].

