# Task 2: Recommender System

## Introduction

Recommendation algorithms are now a crucial component of personalised experiences in digital content delivery. These systems, used by retailers like Amazon and streaming services like Netflix, help customers find relevant items based on their past preferences or behaviour. In this project, we use user interaction data from Steam, a well-known digital distribution platform for video games, to build and evaluate a recommender system based on collaborative filtering.

User interactions, namely purchases and hours played, are included in the dataset. Both are implicit indications of user preference: hours played show the level of engagement, while a purchase suggests interest or intent. Our goal is to train an Alternating Least Squares (ALS) model on this implicit feedback with Apache Spark MLlib, then use it as a scalable recommender system. We also track experiments with MLflow and explore how hyperparameter tuning influences model performance.

In [0]:
dbutils.fs.ls("/FileStore/tables")

Out[1]: [FileInfo(path='dbfs:/FileStore/tables/Clinicaltrial_16012025.csv', name='Clinicaltrial_16012025.csv', size=205522181, modificationTime=1743006583000),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1740588118000),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations.zip', name='activations.zip', size=8411369, modificationTime=1738764668000),
 FileInfo(path='dbfs:/FileStore/tables/emails.csv', name='emails.csv', size=1426122219, modificationTime=1740662146000),
 FileInfo(path='dbfs:/FileStore/tables/flood.csv', name='flood.csv', size=128984, modificationTime=1739369783000),
 FileInfo(pa

## Dataset Overview

The steam_200k.csv dataset comprises roughly 200,000 user-item interactions, with each row representing an event where a user either bought a game or played it for a specific number of hours. The file has no header row, so we name its four columns User_id, Game_name, Behavior, and Value. Because the feedback is implicit, we treat the problem as an implicit collaborative filtering task, where the objective is to learn user-item affinities rather than precise ratings.

In [0]:
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import mlflow
import mlflow.spark

# Load the dataset
df = spark.read.csv("/FileStore/tables/steam_200k.csv", header=False, inferSchema=True)
df = df.withColumnRenamed("_c0", "User_id") \
       .withColumnRenamed("_c1", "Game_name") \
       .withColumnRenamed("_c2", "Behavior") \
       .withColumnRenamed("_c3", "Value")

display(df.limit(10))

User_id,Game_name,Behavior,Value
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


## Exploratory Data Analysis (EDA)

Using PySpark, we loaded and explored the dataset to start the project.

The initial evaluation showed:

- The dataset contains 12,393 unique users and 5,155 unique games.
- The majority of users engage in several interactions.
- There are more "purchase" records (129,511) than "play" records (70,489), consistent with the reasoning that gameplay happens after purchase and that not every purchased game is played.

Prior to modelling, it was crucial to comprehend the distribution of the data. We noticed that certain games were played for a lot longer than others, which can distort the model if feedback signals aren't merged or normalised properly.
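One common way to dampen such heavy-tailed playtimes is a log transform. The sketch below is an illustration only and is not part of the pipeline in this notebook: the function name `log_scale_hours` is hypothetical, and the sample values are the playtimes from the rows displayed earlier.

```python
import math

def log_scale_hours(hours: float) -> float:
    """Compress playtime with log(1 + hours) so extreme values dominate less."""
    if hours < 0:
        raise ValueError("hours must be non-negative")
    return math.log1p(hours)

# Playtimes (hours) taken from the sample rows shown earlier.
sample_hours = [273.0, 87.0, 14.9, 12.1, 8.9]
scaled = [log_scale_hours(h) for h in sample_hours]

# Compare how far apart the largest and smallest values are before and after.
raw_ratio = max(sample_hours) / min(sample_hours)
scaled_ratio = max(scaled) / min(scaled)
print(f"raw max/min = {raw_ratio:.1f}, scaled max/min = {scaled_ratio:.1f}")
```

The transform preserves the ordering of playtimes while shrinking the gap between casual and very heavy play. Whether it helps this particular model would need to be checked experimentally.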

In [0]:
from pyspark.sql.functions import countDistinct, when, avg

# Count unique users and games
df.select(countDistinct("User_id").alias("Unique Users"),
          countDistinct("Game_name").alias("Unique Games")).show()

# Show behavior breakdown
df.groupBy("Behavior").count().show()
          
# Top games by average playtime
df.filter(df.Behavior == "play") \
  .groupBy("Game_name") \
  .agg(avg("Value").alias("Avg_hours")) \
  .orderBy("Avg_hours", ascending=False) \
  .show(10)

+------------+------------+
|Unique Users|Unique Games|
+------------+------------+
|       12393|        5155|
+------------+------------+

+--------+------+
|Behavior| count|
+--------+------+
|purchase|129511|
|    play| 70489|
+--------+------+

+--------------------+------------------+
|           Game_name|         Avg_hours|
+--------------------+------------------+
|Eastside Hockey M...|            1295.0|
|Baldur's Gate II ...| 475.2555555555556|
|     FIFA Manager 09|             411.0|
|           Perpetuum|           400.975|
|Football Manager ...| 391.9846153846154|
|Football Manager ...|390.45316455696195|
|Football Manager ...|375.04857142857145|
|Football Manager ...| 365.7032258064516|
|   Freaking Meatbags|             331.0|
|Out of the Park B...|             330.4|
+--------------------+------------------+
only showing top 10 rows



Each row indicates which user engaged with which game, what kind of interaction took place (play or purchase), and a numerical value that gives either the number of hours played or, for a purchase, the constant 1.

We used the countDistinct function to determine how many unique users and unique games the dataset contains. This is helpful for understanding the dataset's scale and diversity.

The dataset is then grouped by the two values in the Behavior column, "purchase" and "play", and the count() method counts the number of records for each behaviour. This helps assess how balanced the dataset is between purchase and play interactions, and indicates whether we may need to normalise or reweight each category in further analysis.

Finally, the dataset is filtered to rows with the behaviour "play", and the avg() function computes the average playtime (in hours) per game. This measure indicates typical user engagement with each game once it is played.

## Data Preprocessing

This block of code prepares the data for training a collaborative filtering recommendation model with the Alternating Least Squares (ALS) technique in PySpark. It first filters the data to game play behaviours and then converts the identifiers (User_id, Game_name) into the numerical indices needed by the ALS algorithm.

In [0]:
from pyspark.ml.feature import StringIndexer

# Keep only play behaviours and use hours played as the rating
play_df = df.filter(df.Behavior == "play") \
            .withColumn("User_id", col("User_id").cast("int")) \
            .withColumn("Rating", col("Value").cast("float")) \
            .select("User_id", "Game_name", "Rating")

# Generate numeric indices for users and games
user_indexer = StringIndexer(inputCol="User_id", outputCol="userIndex")
game_indexer = StringIndexer(inputCol="Game_name", outputCol="gameIndex")

# Fit the indexers and add the index columns
play_df = user_indexer.fit(play_df).transform(play_df)
play_df = game_indexer.fit(play_df).transform(play_df)

play_df.select("User_id", "Game_name", "gameIndex", "Rating").show(10)

+---------+--------------------+---------+------+
|  User_id|           Game_name|gameIndex|Rating|
+---------+--------------------+---------+------+
|151603712|The Elder Scrolls...|      6.0| 273.0|
|151603712|           Fallout 4|     64.0|  87.0|
|151603712|               Spore|    247.0|  14.9|
|151603712|   Fallout New Vegas|     23.0|  12.1|
|151603712|       Left 4 Dead 2|      4.0|   8.9|
|151603712|            HuniePop|    628.0|   8.5|
|151603712|       Path of Exile|     49.0|   8.1|
|151603712|         Poly Bridge|    914.0|   7.5|
|151603712|         Left 4 Dead|     36.0|   3.3|
|151603712|     Team Fortress 2|      1.0|   2.8|
+---------+--------------------+---------+------+
only showing top 10 rows



The dataset is first filtered to rows with the behaviour "play", since hours played provide a more informative indicator of user engagement than purchase events, whose value is always 1. The code then uses StringIndexer to map the User_id and Game_name columns to numeric values in the new columns userIndex and gameIndex. ALS needs these indices because Spark's implementation requires numeric user and item identifiers. By the end of this step, each user and game is represented by a distinct index and linked to a quantitative feedback score based on gameplay, so the data is ready for training the recommendation model.
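To illustrate how such an index can be assigned, here is a plain-Python sketch of frequency-ordered indexing. It mirrors what StringIndexer does by default (most frequent label gets index 0, with ties broken alphabetically in recent Spark versions), but it is an analogy rather than Spark's implementation. The function name `frequency_index` and the sample list are hypothetical.

```python
from collections import Counter

def frequency_index(labels):
    """Map each distinct label to an index, most frequent label first.

    Ties are broken alphabetically. Indices are floats because
    StringIndexer also emits double-valued indices.
    """
    counts = Counter(labels)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Hypothetical play records (game names only), for illustration.
games = ["Dota 2", "Team Fortress 2", "Dota 2", "Spore", "Dota 2", "Team Fortress 2"]
mapping = frequency_index(games)
print(mapping)
```

This ordering is why widely played titles tend to receive small gameIndex values in the output above.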

## Train/Test Split

In [0]:
(training, test) = play_df.randomSplit([0.8, 0.2], seed=42)

The dataset play_df is divided into two distinct subsets, training and test. The randomSplit([0.8, 0.2], seed=42) call splits the data at random, with approximately 80% of the records going into the training set and 20% into the test set. The seed=42 argument makes the split reproducible: executing the code on the same data with the same seed gives the same split each time. This stage is essential to machine learning because it allows the model to be trained on one subset of the data and then tested on unseen data to gauge its performance.
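The idea of a seeded random split can be sketched in plain Python. This is an analogy under simple assumptions, not Spark's sampling code: each row is assigned independently by a seeded random draw, so the split sizes are close to 80/20 but not exact. The helper `random_split` is hypothetical.

```python
import random

def random_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with an independent seeded draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test

rows = list(range(1000))  # stand-in for interaction records
train_a, test_a = random_split(rows)
train_b, test_b = random_split(rows)

# Same seed and same input order give the same split.
print(len(train_a), len(test_a), train_a == train_b)
```

The sketch also shows why reproducibility depends on the input being the same: the draws are tied to the order in which rows are visited.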

## ALS Model Training

For collaborative filtering, we decided to use the Alternating Least Squares (ALS) method. ALS can effectively manage implicit feedback and is particularly well-suited for sparse, large-scale datasets. In Spark's ALS implementation, the implicitPrefs=True flag guarantees that the model interprets the feedback values as confidence scores as opposed to explicit ratings.
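For reference, the implicit-feedback formulation usually associated with ALS (following Hu, Koren, and Volinsky) can be written as below. The notation is ours and is an assumption for illustration: r_ui is the observed feedback (hours played), p_ui a binary preference, c_ui a confidence weight, x_u and y_i the user and item factor vectors, alpha the confidence parameter, and lambda the regularisation parameter. Spark's implementation may scale the regularisation term differently.

```latex
p_{ui} =
\begin{cases}
  1 & \text{if } r_{ui} > 0 \\
  0 & \text{if } r_{ui} = 0
\end{cases}
\qquad
c_{ui} = 1 + \alpha \, r_{ui}

\min_{x_*,\, y_*} \; \sum_{u,i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
```

Under this formulation the model predicts the preference p_ui, not the raw number of hours, and the hours only determine how strongly each observation is weighted.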

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

# Initialize ALS model
als = ALS(userCol="userIndex", itemCol="gameIndex", ratingCol="Rating", 
          implicitPrefs=True, coldStartStrategy="drop", nonnegative=True)

# Train the ALS model
model = als.fit(training)

# Make predictions on the test set
predictions = model.transform(test)

# Evaluate using RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"Root-mean-square error = {rmse}")

Root-mean-square error = 211.11927143493753


We trained an initial model with default parameters after dividing the dataset into an 80-20 training/test ratio:

- Rank = 10
- Regularization parameter = 0.1
- Alpha = 1.0

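Root Mean Squared Error (RMSE) was used to assess the model on the test set after it had been trained on the training set. The RMSE measures the difference between the actual feedback values and the predicted scores, and a lower RMSE indicates better model performance. Note that with implicitPrefs=True the predictions are preference scores rather than predicted hours, so the RMSE against raw playtime is large (about 211 here) and is best used only to compare configurations relative to one another.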
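As a small worked illustration of the metric, the sketch below computes RMSE by hand. The `hours` values come from the sample rows shown earlier, while the `scores` are hypothetical predictions on a roughly 0-1 scale, chosen only to show how the metric behaves when labels and predictions live on different scales.

```python
import math

def rmse(actual, predicted):
    """Square root of the mean squared difference between paired values."""
    if len(actual) != len(predicted) or not actual:
        raise ValueError("inputs must be non-empty and of equal length")
    squared = [(a - p) ** 2 for a, p in zip(actual, predicted)]
    return math.sqrt(sum(squared) / len(squared))

# Hours from the sample rows; scores are hypothetical, for illustration only.
hours = [273.0, 87.0, 14.9]
scores = [0.9, 0.8, 0.5]
print(f"RMSE = {rmse(hours, scores):.2f}")
```

When the labels are hours and the predictions are small scores, the error is dominated by the size of the labels themselves, which is one plausible reason for an RMSE in the hundreds.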

## Hyperparameter Tuning & Experiment Tracking with MLflow

Three important ALS hyperparameters were varied across several runs to try to improve model performance:

- Rank: The number of latent features to represent users and items.
- RegParam: Regularization parameter to prevent overfitting.
- Alpha: Confidence parameter for weighting implicit feedback.
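To make the role of alpha concrete, here is a small sketch assuming the standard confidence weighting for implicit-feedback ALS, c = 1 + alpha * r. The function name `confidence` is hypothetical, and Spark's internal computation may differ in detail.

```python
def confidence(hours: float, alpha: float) -> float:
    """Confidence weight c = 1 + alpha * r for an observed interaction."""
    return 1.0 + alpha * hours

# Two playtimes from the sample rows, under the two alpha values in the grid.
for alpha in (1.0, 10.0):
    short, long_ = confidence(2.8, alpha), confidence(273.0, alpha)
    print(f"alpha={alpha}: c(2.8h)={short:.1f}, c(273h)={long_:.1f}")
```

A larger alpha widens the gap between observed interactions and unobserved ones, which all keep a confidence of 1.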

In [0]:
import mlflow
import mlflow.spark

ranks = [10, 20]
regParams = [0.1, 0.01]
alphas = [1.0, 10.0]

for rank in ranks:
    for reg in regParams:
        for alpha in alphas:
            with mlflow.start_run():
                als = ALS(userCol="userIndex", itemCol="gameIndex", ratingCol="Rating",
                          rank=rank, regParam=reg, alpha=alpha, implicitPrefs=True,
                          coldStartStrategy="drop", nonnegative=True)
                model = als.fit(training)
                predictions = model.transform(test)
                rmse = evaluator.evaluate(predictions)
                
                mlflow.log_param("rank", rank)
                mlflow.log_param("regParam", reg)
                mlflow.log_param("alpha", alpha)
                mlflow.log_metric("rmse", rmse)
                mlflow.spark.log_model(model, "ALSModel")

                print(f"[MODEL] rank={rank}, regParam={reg}, alpha={alpha}, RMSE={rmse:.4f}")

2025/04/23 17:01:30 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().

[MODEL] rank=10, regParam=0.1, alpha=1.0, RMSE=211.1193
[MODEL] rank=10, regParam=0.1, alpha=10.0, RMSE=211.1133
[MODEL] rank=10, regParam=0.01, alpha=1.0, RMSE=211.1080
[MODEL] rank=10, regParam=0.01, alpha=10.0, RMSE=211.1113
[MODEL] rank=20, regParam=0.1, alpha=1.0, RMSE=211.1356
[MODEL] rank=20, regParam=0.1, alpha=10.0, RMSE=211.1283
[MODEL] rank=20, regParam=0.01, alpha=1.0, RMSE=211.1354
[MODEL] rank=20, regParam=0.01, alpha=10.0, RMSE=211.1284


We performed tests using all combinations of:

- Ranks: 10, 20
- RegParams: 0.1, 0.01
- Alphas: 1.0, 10.0

To manage these trials effectively, we recorded the parameters and outcomes of every run using MLflow. This helped in choosing the top-performing configuration by comparing RMSE values across configurations.

MLflow offers a scalable and lightweight system for tracking experiments, saving models, and documenting metrics.

For every run, the following are recorded:

- Hyperparameters for the model
- RMSE value
- Artifact of the Spark MLlib model

As a result, finding the most promising parameter combinations and reproducing outcomes were made simple.
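The runs can also be compared outside the MLflow UI. The following is a minimal sketch that uses the RMSE values printed by the tuning loop above, transcribed by hand into a hypothetical `runs` list rather than fetched from MLflow.

```python
# (rank, regParam, alpha, rmse) for each run, in the order they were printed.
# The alpha values follow the loop order (1.0 before 10.0).
runs = [
    (10, 0.1, 1.0, 211.1193),
    (10, 0.1, 10.0, 211.1133),
    (10, 0.01, 1.0, 211.1080),
    (10, 0.01, 10.0, 211.1113),
    (20, 0.1, 1.0, 211.1356),
    (20, 0.1, 10.0, 211.1283),
    (20, 0.01, 1.0, 211.1354),
    (20, 0.01, 10.0, 211.1284),
]

# Lower RMSE is better, so take the minimum over the metric column.
best = min(runs, key=lambda run: run[3])
spread = max(run[3] for run in runs) - best[3]

print(f"best run: rank={best[0]}, regParam={best[1]}, alpha={best[2]}, RMSE={best[3]}")
print(f"RMSE spread across the grid: {spread:.4f}")
```

Printing the spread alongside the best run gives a quick sense of whether the differences between configurations are large enough to matter.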

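After training the models, we compared them on the test dataset and found that the lowest RMSE (211.1080) was achieved using:

- Rank = 10
- RegParam = 0.01
- Alpha = 1.0 (the first of the two alpha values in the loop order)

The differences between configurations are very small, however: RMSE ranges only from 211.1080 to 211.1356 across the eight runs, so this grid had little measurable effect on the metric.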

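We then generated top-N recommendations for every user using a trained model. ALS offers built-in functions like recommendForAllUsers(), which, using the learnt preferences, returns a list of the top k most relevant items for each user. Note that the model variable at this point refers to the last model fitted in the tuning loop (rank = 20, regParam = 0.01, alpha = 10.0).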

Personalised game recommendations on websites like Steam can be powered by this list in a production environment.
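Conceptually, a top-k recommendation from a factorisation model scores each item by the dot product of the user's and the item's latent vectors and keeps the highest scores. The sketch below illustrates that idea in plain Python. The factor values are hypothetical, the helper `top_k` is not a Spark API, and Spark's own implementation is distributed and more involved.

```python
import heapq

def top_k(user_vector, item_factors, k):
    """Return the k (item, score) pairs with the highest dot-product score."""
    scored = (
        (item, sum(u * v for u, v in zip(user_vector, vector)))
        for item, vector in item_factors.items()
    )
    return heapq.nlargest(k, scored, key=lambda pair: pair[1])

# Hypothetical 2-dimensional latent factors, for illustration only.
user = [1.0, 0.5]
items = {
    "Portal": [0.9, 0.2],
    "Portal 2": [0.6, 0.4],
    "War Thunder": [0.1, 0.9],
}
result = top_k(user, items, k=2)
print(result)
```

In the notebook, recommendForAllUsers(5) plays this role for every user at once and returns the item index together with its predicted score.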

## Generate Recommendations

In [0]:
game_lookup = play_df.select("Game_name", "gameIndex").distinct()

In [0]:
from pyspark.sql.functions import explode

# Recommend top 5 games for each user
user_recs = model.recommendForAllUsers(5)

# Flatten the recommendation array into rows
user_recs_flat = user_recs.select("userIndex", explode("recommendations").alias("rec")) \
                          .selectExpr("userIndex", "rec.gameIndex as gameIndex", "rec.rating as predicted_rating")

# Join with game names
user_recs_named = user_recs_flat.join(game_lookup, on="gameIndex", how="left")

# Show results
user_recs_named.select("userIndex", "Game_name", "predicted_rating").show(truncate=False)


+---------+-----------------------------------+----------------+
|userIndex|Game_name                          |predicted_rating|
+---------+-----------------------------------+----------------+
|12       |Farming Simulator 2013             |3.1566741       |
|12       |Insurgency Modern Infantry Combat  |2.7749999       |
|12       |Dungeon Defenders II               |2.77229         |
|12       |ARK Survival Evolved               |2.7296934       |
|12       |Sins of a Solar Empire Rebellion   |2.559645        |
|26       |Football Manager 2012              |1.4615393       |
|26       |Construction-Simulator 2015        |1.4250526       |
|26       |Portal                             |1.2474349       |
|26       |Portal 2                           |1.2062197       |
|26       |War Thunder                        |1.1572908       |
|27       |Construction-Simulator 2015        |1.7475168       |
|27       |Sid Meier's Civilization V         |1.68599         |
|27       |Path of Exile 

## Conclusion

In a nutshell, recommender systems based on implicit feedback offer strong personalisation features, particularly in fields with few explicit ratings, such as gaming. This project provides a basis for more sophisticated systems and practical applications in the recommendation of digital content.