In [1]:
!pip install pyspark
!pip install pandas
import pandas as pd
user_train_df = pd.read_csv("../MATHM0029_2024_TB-1/user_train_df.csv")
user_test_df = pd.read_csv("../MATHM0029_2024_TB-1/user_test_df.csv")


# Standard Model

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col, collect_list, explode
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize Spark session
spark = SparkSession.builder.appName("RecommenderSystem").getOrCreate()

# Prepare the data
columns = ["User ID", "Item ID", "Rating"]
user_train_spark_df = spark.createDataFrame(user_train_df[columns])

# ALS Model Setup
als = ALS(
    userCol="User ID",
    itemCol="Item ID",
    ratingCol="Rating",
    maxIter=10,  # Maximum number of alternating least-squares iterations
    regParam=0.1,  # Regularization strength; larger values shrink the latent factors to limit overfitting
    rank=10,  # Number of latent factors learned for each user and item
    coldStartStrategy="drop"  # Drop rows whose prediction is NaN (user or item unseen in training) so RMSE stays defined
)

# Train the model
als_model = als.fit(user_train_spark_df)


In [5]:
# Evaluate the model with RMSE on test set
user_test_spark_df = spark.createDataFrame(user_test_df[columns])
predictions = als_model.transform(user_test_spark_df)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Rating",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)
print(f"RMSE: {rmse}")

RMSE: 0.6934949132375731
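Because `coldStartStrategy="drop"` removes test rows whose user or item never appears in the training data, the RMSE above is computed only over the rows that survive. The sketch below, assuming the notebook's `User ID` / `Item ID` / `Rating` columns, shows how one might count those dropped rows with pandas and what RMSE computes on the rest. The helper names `cold_start_mask` and `rmse`, the toy frames, and the toy predictions are all hypothetical.

```python
import numpy as np
import pandas as pd


def cold_start_mask(train_df, test_df, user_col="User ID", item_col="Item ID"):
    """True for test rows whose user or item never appears in train_df.

    ALS cannot score such rows (the prediction is NaN), so with
    coldStartStrategy="drop" they are removed before RMSE is computed.
    """
    unseen_user = ~test_df[user_col].isin(train_df[user_col])
    unseen_item = ~test_df[item_col].isin(train_df[item_col])
    return unseen_user | unseen_item


def rmse(ratings, predictions):
    """Root-mean-square error: sqrt(mean((rating - prediction)^2))."""
    ratings = np.asarray(ratings, dtype=float)
    predictions = np.asarray(predictions, dtype=float)
    return float(np.sqrt(np.mean((ratings - predictions) ** 2)))


# Hypothetical toy data using the notebook's column names.
train = pd.DataFrame({"User ID": [1, 1, 2, 3],
                      "Item ID": [10, 11, 10, 12],
                      "Rating": [4.0, 3.0, 5.0, 2.0]})
test = pd.DataFrame({"User ID": [1, 2, 4],
                     "Item ID": [12, 11, 10],
                     "Rating": [3.0, 4.0, 5.0]})

dropped = cold_start_mask(train, test)
print(int(dropped.sum()))  # user 4 is unseen in training, so 1 row is dropped
kept = test[~dropped]
# Toy predictions for the two kept rows, standing in for ALS output.
print(rmse(kept["Rating"], [2.5, 4.0]))
```

On the real data, running `cold_start_mask(user_train_df, user_test_df).sum()` would indicate how much of the test set the reported RMSE actually covers.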


# Hyperparameter Tuning

Here’s what each parameter in the ALS model means:

1. maxIter
   - Description: the maximum number of iterations of the ALS optimization.
   - Purpose: ALS is an iterative algorithm. Each iteration fixes the item factors and solves a regularized least-squares problem for the user factors, then swaps the roles. maxIter caps how many of these alternating rounds are run.
   - Impact:
     - Increasing maxIter gives the factors more rounds to converge, which can improve the approximation of the observed ratings.
     - Once the factors have converged, extra iterations mainly add computational cost. Overfitting is governed chiefly by regParam and rank rather than by the iteration count.

2. regParam
   - Description: the regularization parameter used to prevent overfitting.
   - Purpose: regularization penalizes the magnitude of the learned latent factors so the model does not fit noise in the training data.
   - Impact:
     - Lower regParam values fit the training data more closely but risk overfitting and poor generalization to the test data.
     - Higher regParam values strengthen the regularization, which can cause underfitting if set too high.

3. rank
   - Description: the number of latent factors (features) in the ALS model.
   - Purpose: the latent factors are the dimensions onto which user and item preferences are mapped. A higher rank gives a more expressive model that can capture more nuanced relationships.
   - Impact:
     - A larger rank increases the model's capacity to represent complex patterns, but also increases computational cost and the risk of overfitting.
     - A smaller rank may underfit if it cannot capture enough detail in the data.
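To make the three parameters concrete, here is a minimal dense NumPy sketch of explicit-feedback ALS in which `rank`, `reg_param`, and `max_iter` play the roles of rank, regParam, and maxIter. It is an illustration under simplifying assumptions, not Spark's implementation: Spark works on distributed, blocked factor matrices with its own initialization. Following the Spark MLlib guide, the regularization for each user (item) is scaled by that user's (item's) number of ratings. The function name `als_factorize` and the toy rating matrix are hypothetical.

```python
import numpy as np


def als_factorize(R, mask, rank=10, reg_param=0.1, max_iter=10, seed=0):
    """Minimal explicit-feedback ALS on a dense rating matrix.

    R    : (n_users, n_items) ratings; entries where mask is False are ignored
    mask : boolean array, True where a rating is observed
    Returns user factors U (n_users x rank) and item factors V (n_items x rank).
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(max_iter):
        # Fix V and solve a ridge regression for each user's factor vector.
        for u in range(n_users):
            idx = mask[u]
            if not idx.any():
                continue
            Vu = V[idx]
            lam = reg_param * idx.sum()  # scaled by the user's rating count
            U[u] = np.linalg.solve(Vu.T @ Vu + lam * eye, Vu.T @ R[u, idx])
        # Fix U and solve a ridge regression for each item's factor vector.
        for i in range(n_items):
            idx = mask[:, i]
            if not idx.any():
                continue
            Ui = U[idx]
            lam = reg_param * idx.sum()  # scaled by the item's rating count
            V[i] = np.linalg.solve(Ui.T @ Ui + lam * eye, Ui.T @ R[idx, i])
    return U, V


# Hypothetical 4-user x 5-item matrix; 0 marks a missing rating.
R = np.array([[5, 3, 0, 1, 4],
              [4, 0, 0, 1, 3],
              [1, 1, 0, 5, 0],
              [0, 1, 5, 4, 0]], dtype=float)
mask = R > 0
U, V = als_factorize(R, mask, rank=2, reg_param=0.1, max_iter=10)
pred = U @ V.T  # predicted rating for every user-item pair
```

Each inner solve is an exact minimization, so the regularized objective cannot increase from one iteration to the next; this is why raising maxIter past convergence mostly costs time rather than accuracy.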

In [7]:
# Note: this cell takes a while to run, because every grid configuration is fitted once per fold

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

# Create a parameter grid for tuning
param_grid = ParamGridBuilder() \
    .addGrid(als.rank, [5, 10, 15]) \
    .addGrid(als.maxIter, [10, 20]) \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .build()

# Set up cross-validation
cv = CrossValidator(
    estimator=als,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3  # Use 3-fold cross-validation
)

# Fit cross-validation to the training data
cv_model = cv.fit(user_train_spark_df)

# Find the best model
best_model = cv_model.bestModel

# Evaluate the best model on the test set
predictions = best_model.transform(user_test_spark_df)
rmse = evaluator.evaluate(predictions)

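# Recover the winning hyperparameters from the grid: avgMetrics holds the mean
# cross-validated RMSE for each entry of param_grid (lower is better).
best_idx = min(range(len(param_grid)), key=lambda i: cv_model.avgMetrics[i])
best_params = param_grid[best_idx]
print(f"Best Model Parameters:\n - Rank: {best_params[als.rank]}\n - MaxIter: {best_params[als.maxIter]}\n - RegParam: {best_params[als.regParam]}")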
print(f"RMSE on Test Data: {rmse}")


Best Model Parameters:
 - Rank: 10
 - MaxIter: 10
 - RegParam: 0.1
RMSE on Test Data: 0.6934949132375731


The parameter grid tries every combination of the listed rank, maxIter, and regParam values. The search is exhaustive over that grid, but it says nothing about values outside it.
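The size of the search explains the long run time. The plain-Python sketch below enumerates the same grid as the `ParamGridBuilder` call above (as ordinary dicts, not Spark ParamMaps) and counts the model fits, assuming 3 folds plus the single refit of the best configuration that `CrossValidator` performs on the full training set.

```python
from itertools import product

# Same values as the ParamGridBuilder in the tuning cell.
ranks = [5, 10, 15]
max_iters = [10, 20]
reg_params = [0.01, 0.1, 1.0]
num_folds = 3

grid = [
    {"rank": r, "maxIter": m, "regParam": p}
    for r, m, p in product(ranks, max_iters, reg_params)
]
# One fit per configuration per fold, plus one final refit of the winner.
n_fits = len(grid) * num_folds + 1
print(len(grid), n_fits)  # 18 55
```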

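Cross-validation scores each configuration by its mean RMSE over 3 held-out folds of the training data, which makes the choice less sensitive to one particular train/validation split than a single hold-out evaluation. The test set is not used during tuning, so the final test RMSE remains a held-out check.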
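The selection procedure can be sketched in plain Python: split the rows into k folds, score every configuration on each held-out fold, and keep the configuration with the lowest mean score. This is not Spark's implementation (`CrossValidator` assigns rows to folds with its own randomized split); `fit_and_score` is a hypothetical callback standing in for "fit ALS, predict, compute RMSE", and the shrunk-mean toy model and ratings are invented for illustration.

```python
import random
from statistics import mean


def k_fold_indices(n, k, seed=0):
    """Shuffle row indices 0..n-1 and split them into k nearly equal folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[f::k] for f in range(k)]


def cross_validate(rows, grid, fit_and_score, k=3, seed=0):
    """Return (best_params, mean_scores), choosing the lowest mean validation score."""
    folds = k_fold_indices(len(rows), k, seed)
    mean_scores = []
    for params in grid:
        scores = []
        for f in range(k):
            val_set = set(folds[f])
            train = [rows[i] for i in range(len(rows)) if i not in val_set]
            val = [rows[i] for i in folds[f]]
            scores.append(fit_and_score(train, val, params))
        mean_scores.append(mean(scores))
    best = min(range(len(grid)), key=mean_scores.__getitem__)
    return grid[best], mean_scores


def shrunk_mean_rmse(train, val, params):
    """Toy model: predict the training mean shrunk toward 0 by params["reg"]."""
    c = sum(train) / (len(train) + params["reg"])
    return (sum((r - c) ** 2 for r in val) / len(val)) ** 0.5


ratings = [4, 5, 3, 4, 5, 2, 4, 3, 5]
best, scores = cross_validate(ratings, [{"reg": 0.0}, {"reg": 100.0}], shrunk_mean_rmse)
```

With heavy shrinkage the toy model predicts values near 0 for ratings between 2 and 5, so the unregularized configuration wins, mirroring how an overly large regParam can underfit.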

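After tuning, the best model is the configuration with the lowest mean cross-validated RMSE, refit on the full training set. Here that is rank 10, maxIter 10, and regParam 0.1, the same settings as the standard model, so the test RMSE (0.6934949132375731) is unchanged.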