## Task 2: Recommender System – Introduction
In this task, I developed a recommender system using user behavior data from Steam, a popular video game distribution platform. The dataset contains 200,000 rows. Each row records either that a user purchased a game or how many hours they played it. By analyzing these implicit feedback signals, I used collaborative filtering to generate personalized game recommendations.

I used Apache Spark's MLlib library, specifically the Alternating Least Squares (ALS) algorithm, to perform large-scale matrix factorization. My workflow covered data loading, cleaning, preprocessing, model training, hyperparameter tuning, and evaluation. I implemented the entire pipeline in Databricks for scalability and used MLflow for experiment tracking.

This notebook is structured to provide not only functional code but also thorough explanations of each step to promote clarity, reproducibility, and good software engineering practice.

## Task 2.1 Data Loading and Initial Inspection

I started by loading the dataset into a Spark DataFrame. The file has no header row, so I manually assigned the column names 'User_ID', 'Game', 'Action', and 'Value'. Each row represents either a 'purchase' or a 'play' event. Purchases have a value of 1.0, and play events record the number of hours played.

To understand user behavior, I explored the distribution of actions. The dataset contains 129,511 purchase rows and 70,489 play rows. I also confirmed that none of the four columns contains null values, so 'User_ID' and 'Game' can be used reliably as identifiers for collaborative filtering.
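The snippet below is a minimal sketch of the same steps that runs without a Spark cluster. It assigns the four column names to a few headerless rows copied from the sample output, then counts actions and missing values. The `SAMPLE` string and the `parse_rows` helper are illustrative only. The notebook itself performs these steps with `spark.read.csv`, `withColumnRenamed`, and `groupBy("Action").count()`.

```python
import csv
import io
from collections import Counter

# A few rows in the same headerless layout as steam_200k.csv (copied from the sample output).
SAMPLE = """151603712,"The Elder Scrolls V Skyrim",purchase,1.0
151603712,"The Elder Scrolls V Skyrim",play,273.0
151603712,"Fallout 4",purchase,1.0
151603712,"Fallout 4",play,87.0
151603712,"Spore",purchase,1.0
"""

COLUMNS = ["User_ID", "Game", "Action", "Value"]

def parse_rows(text):
    """Attach the four column names to each headerless CSV row and cast numeric fields."""
    records = []
    for row in csv.reader(io.StringIO(text)):
        if not row:
            continue  # skip blank lines
        rec = dict(zip(COLUMNS, row))
        rec["User_ID"] = int(rec["User_ID"])
        rec["Value"] = float(rec["Value"])
        records.append(rec)
    return records

records = parse_rows(SAMPLE)
action_counts = Counter(r["Action"] for r in records)
null_counts = {c: sum(1 for r in records if r.get(c) in (None, "")) for c in COLUMNS}
print(action_counts)  # Counter({'purchase': 3, 'play': 2})
print(null_counts)
```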

This initial exploration gave me a better sense of the dataset’s structure and helped me plan the upcoming transformation and modeling steps.

In [0]:
dbutils.fs.ls("/FileStore/tables")

[FileInfo(path='dbfs:/FileStore/tables/BDTT_Assignment_1_Enron.zip', name='BDTT_Assignment_1_Enron.zip', size=375294957, modificationTime=1742223289000),
 FileInfo(path='dbfs:/FileStore/tables/Clinicaltrial_16012025.csv', name='Clinicaltrial_16012025.csv', size=205522181, modificationTime=1742551688000),
 FileInfo(path='dbfs:/FileStore/tables/MUHAMMED FAHIM/', name='MUHAMMED FAHIM/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1740589459000),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts.zip', name='accounts.zip', size=5297592, modificationTime=1738172850000),
 FileInfo(path='dbfs:/FileStore/tables/acount-models/', name='acount-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),


In [0]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AssignmentTask2").getOrCreate()

file_path = "/FileStore/tables/steam_200k.csv"

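# The file has no header row, so Spark names the columns _c0.._c3;
# inferSchema detects the integer user IDs and the numeric Value column.
# quote/escape keep game titles containing commas or quotes in a single field.
df = spark.read.csv(
    file_path,
    header=False,
    inferSchema=True,
    quote='"',
    escape='"',
    multiLine=True,
)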

df.printSchema()
display(df.limit(50))

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)



_c0,_c1,_c2,_c3
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


In [0]:
df = df.withColumnRenamed("_c0", "User_ID") \
       .withColumnRenamed("_c1", "Game") \
       .withColumnRenamed("_c2", "Action") \
       .withColumnRenamed("_c3", "Value")

display(df.limit(20))


User_ID,Game,Action,Value
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


## Exploratory Data Analysis and Behavioral Insights

Before modeling, I performed basic exploratory data analysis. The data covers 12,393 unique users and 5,155 distinct games, which gives 63,885,915 possible user-game pairs. With only 200,000 rows, at most about 0.3% of those pairs appear, so the user-item interaction matrix is highly sparse. I also noticed that while most users played games for less than 100 hours, some outliers played for hundreds or even thousands of hours.

These insights confirmed that the playtime distribution is heavily right-skewed, and I expected this to affect the model. Knowing this early helped me later when tuning the model and evaluating its performance. I considered normalizing the values or clipping extreme playtimes, but I first proceeded with the raw data to establish a baseline.
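To make the skew concrete, the sketch below takes the ten playtimes shown for user 151603712 in the 'df_play' sample and applies two of the options mentioned above: capping values at a quantile and applying a log transform. The 90th-percentile cap and the nearest-rank quantile rule are illustrative assumptions. On the real data, the cap would be computed over all play rows, for example with the DataFrame method `approxQuantile`.

```python
import math
import statistics

# Playtime hours for one user, copied from the df_play sample output in this notebook.
hours = [273.0, 87.0, 14.9, 12.1, 8.9, 8.5, 8.1, 7.5, 3.3, 2.8]

def clip_at_quantile(values, q):
    """Cap values at the q-th quantile (nearest-rank method) to limit outlier influence."""
    ranked = sorted(values)
    cap = ranked[min(len(ranked) - 1, math.ceil(q * len(ranked)) - 1)]
    return [min(v, cap) for v in values], cap

clipped, cap = clip_at_quantile(hours, 0.9)
logged = [math.log1p(v) for v in hours]  # log(1 + hours) compresses the long tail

print("mean:", statistics.mean(hours), "median:", statistics.median(hours))
print("cap at 90th percentile:", cap)
print("log1p range:", round(min(logged), 2), "to", round(max(logged), 2))
```

The mean (about 42.6 hours) is roughly five times the median (8.7 hours), and a single 273-hour entry accounts for most of that gap. This is the pattern that motivates clipping or log-scaling.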

In [0]:
print("Total users:", df.select("User_ID").distinct().count())
print("Total games:", df.select("Game").distinct().count())

Total users: 12393
Total games: 5155


In [0]:
df.groupBy("Action").count().show()


+--------+------+
|  Action| count|
+--------+------+
|purchase|129511|
|    play| 70489|
+--------+------+



In [0]:
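from pyspark.sql.functions import col, sum as spark_sum

# Count nulls in each column; aliasing avoids shadowing Python's built-in sum().
for c in df.columns:
    df.select(spark_sum(col(c).isNull().cast("int")).alias(c)).show()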


+-------+
|User_ID|
+-------+
|      0|
+-------+

+----+
|Game|
+----+
|   0|
+----+

+------+
|Action|
+------+
|     0|
+------+

+-----+
|Value|
+-----+
|    0|
+-----+



In [0]:
print(df.columns)

['User_ID', 'Game', 'Action', 'Value']


## Data Cleaning and Preprocessing

For effective ALS training, the data needed preprocessing. Since ALS requires integer IDs, I used Spark’s 'monotonically_increasing_id' to assign a unique numeric ID to each game. The 'User_ID' values were already numeric, so I retained them as-is.

I decided to focus on 'play' actions rather than 'purchase' events because playtime is a stronger indicator of user interest. I filtered the dataset accordingly and preserved the number of hours played as the implicit rating.

The resulting play-only DataFrame, 'df_play', has three columns: 'User_ID', 'Game_ID', and 'Rating'. This is the structure ALS expects.


## Encoding Game IDs for ALS

Spark’s ALS implementation requires integer values for both users and items. Since my dataset used game names as strings, I generated numeric IDs for each game using 'monotonically_increasing_id()' and joined this mapping back to the main dataset.

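I chose this approach over 'StringIndexer', which by default orders labels by frequency, because I only needed a unique identifier per game. There are caveats. 'monotonically_increasing_id()' guarantees unique, increasing values but not consecutive ones. It encodes the partition ID in the upper bits, so IDs can exceed the 32-bit integer range that ALS accepts when the input spans more than one partition. Spark also documents the function as non-deterministic, so caching the mapping or using 'StringIndexer' is safer if the DataFrame might be recomputed. In this run the sample IDs are small (e.g. 3036 for Skyrim) and ALS trained without errors, so each game received a 'Game_ID' compatible with ALS.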

This step ensured that my final dataset, consisting of 'User_ID', 'Game_ID', and 'Rating', adhered to ALS input requirements.
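'monotonically_increasing_id()' guarantees unique, increasing IDs but not consecutive ones, and ALS only accepts IDs within the 32-bit integer range. The short pure-Python simulation below shows why this matters. It reproduces the bit layout documented by Spark (partition ID in the upper 31 bits, record number in the lower 33 bits) and contrasts it with a dense, consecutive index similar to what `StringIndexer(stringOrderType="alphabetAsc")` would produce. Both helper functions are hypothetical and only mimic the documented behavior.

```python
INT32_MAX = 2**31 - 1  # largest ID value ALS accepts

def simulated_monotonic_id(partition_id, row_in_partition):
    """Hypothetical helper reproducing the documented layout:
    partition ID in the upper 31 bits, record number in the lower 33 bits."""
    return (partition_id << 33) + row_in_partition

def dense_index(names):
    """Hypothetical helper: consecutive IDs 0..n-1 assigned in alphabetical order."""
    return {name: i for i, name in enumerate(sorted(set(names)))}

print(simulated_monotonic_id(0, 3036))  # 3036, fits in a 32-bit int
print(simulated_monotonic_id(1, 0))     # 8589934592, exceeds INT32_MAX
print(dense_index(["Spore", "Fallout 4", "Left 4 Dead 2", "Fallout 4"]))
```

The first row of the second partition already lands far outside the integer range. A dense index always stays within 0 to n-1.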


In [0]:
from pyspark.sql.functions import monotonically_increasing_id

game_ids = df.select("Game").distinct().withColumn("Game_ID", monotonically_increasing_id())

df = df.join(game_ids, "Game")

display(df.limit(10)) 

Game,User_ID,Action,Value,Game_ID
The Elder Scrolls V Skyrim,151603712,purchase,1.0,3036
The Elder Scrolls V Skyrim,151603712,play,273.0,3036
Fallout 4,151603712,purchase,1.0,478
Fallout 4,151603712,play,87.0,478
Spore,151603712,purchase,1.0,4524
Spore,151603712,play,14.9,4524
Fallout New Vegas,151603712,purchase,1.0,4470
Fallout New Vegas,151603712,play,12.1,4470
Left 4 Dead 2,151603712,purchase,1.0,86
Left 4 Dead 2,151603712,play,8.9,86


In [0]:
df_play = df.filter(df.Action == "play").select("User_ID", "Game_ID", "Value")
df_play = df_play.withColumnRenamed("Value", "Rating")  
display(df_play.limit(10))


User_ID,Game_ID,Rating
151603712,3036,273.0
151603712,478,87.0
151603712,4524,14.9
151603712,4470,12.1
151603712,86,8.9
151603712,3893,8.5
151603712,1806,8.1
151603712,4813,7.5
151603712,2317,3.3
151603712,4553,2.8


## Task 2.2 ALS Model Training

With the data preprocessed, I trained an ALS (Alternating Least Squares) model using Spark MLlib. This collaborative filtering algorithm learns a latent factor vector for every user and every game. It predicts a user's rating for a game as the dot product of the two vectors.
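For reference, the explicit-feedback objective that ALS minimizes can be written as follows. This follows the formulation in the Spark MLlib collaborative filtering guide, which scales 'regParam' ($\lambda$) by each user's and each item's number of ratings. Here $\Omega$ is the set of observed (user, game) pairs, $\mathbf{x}_u, \mathbf{y}_i \in \mathbb{R}^k$ are the latent vectors with $k$ = 'rank', and $n_u$, $n_i$ count the ratings of user $u$ and item $i$:

$$
\min_{X, Y} \sum_{(u,i) \in \Omega} \left( r_{ui} - \mathbf{x}_u^\top \mathbf{y}_i \right)^2 + \lambda \left( \sum_u n_u \lVert \mathbf{x}_u \rVert^2 + \sum_i n_i \lVert \mathbf{y}_i \rVert^2 \right)
$$

If the item factors are held fixed, setting the gradient with respect to $\mathbf{x}_u$ to zero gives a closed-form least-squares update for each user. In that update, $Y_u$ stacks the factors of the items user $u$ rated and $\mathbf{r}_u$ holds those ratings:

$$
\mathbf{x}_u = \left( Y_u^\top Y_u + \lambda n_u I_k \right)^{-1} Y_u^\top \mathbf{r}_u
$$

The item update is symmetric. Alternating between the two updates is what gives ALS its name.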

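I trained the model on a random 80% of the data and held out the remaining 20% for evaluation. For this baseline I left 'rank', 'maxIter', and 'regParam' at Spark's defaults (10, 10, and 0.1). I also set 'coldStartStrategy="drop"', which drops test rows whose user or game never appears in the training split; otherwise those rows would receive NaN predictions. Note that this cell splits the full 'df', so it trains on both purchase and play rows with 'Value' as the rating. The play-only 'df_play' prepared above is not used here.

After training, I generated predictions on the test set and evaluated them with RMSE.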
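The effect of 'coldStartStrategy="drop"' can be sketched in plain Python. A random row-level split can leave a user or game only in the test set, and ALS has no factors for it. The triples and the `split` helper below are hypothetical. `random.Random` only stands in for `randomSplit` and does not reproduce Spark's row assignment.

```python
import random

# Hypothetical (User_ID, Game_ID, hours) triples; the IDs are illustrative only.
ratings = [(1, 10, 5.0), (1, 11, 2.0), (2, 10, 8.0), (2, 12, 1.0),
           (3, 11, 4.0), (3, 12, 3.0), (4, 13, 6.0)]

def split(rows, train_frac, seed):
    """Assign each row independently to train or test, similar in spirit to randomSplit."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_frac else test).append(row)
    return train, test

def drop_cold_start(train, test):
    """Keep only test rows whose user AND game appear in training; ALS has no
    factors for the others, so their predictions would be NaN."""
    users = {u for u, _, _ in train}
    games = {g for _, g, _ in train}
    return [row for row in test if row[0] in users and row[1] in games]

train, test = split(ratings, 0.8, seed=42)
print(len(train), len(test), len(drop_cold_start(train, test)))

# Deterministic example: user 4 and game 13 never appear in training, so that row is dropped.
fixed_train = ratings[:6]
fixed_test = [(1, 12, 2.5), (4, 13, 6.0)]
print(drop_cold_start(fixed_train, fixed_test))  # [(1, 12, 2.5)]
```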


In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col

(train_data, test_data) = df.randomSplit([0.8, 0.2], seed=42)

als = ALS(
    userCol="User_ID",   
    itemCol="Game_ID",  
    ratingCol="Value",  
    nonnegative=True,   
    implicitPrefs=False, 
    coldStartStrategy="drop" 
)

als_model = als.fit(train_data)


In [0]:
predictions = als_model.transform(test_data)

predictions = predictions.na.drop()

evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="Value",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")


Root Mean Squared Error (RMSE): 191.73180018408405


## Model Evaluation

I evaluated the baseline ALS model with RMSE (Root Mean Squared Error). It returned an RMSE of about 191.7, which is a large deviation from the true values.

This high error was expected, given the skewed distribution of playtime in the dataset. However, it gave me a benchmark to improve upon. I knew that further tuning and a better understanding of user interaction patterns could help reduce this error.
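RMSE squares each error before averaging, so a handful of heavy players can dominate it. A minimal sketch with illustrative values:

```python
import math

def rmse(labels, preds):
    """Root mean squared error, the metric RegressionEvaluator computes for metricName='rmse'."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))

# Illustrative values: three perfect predictions and one missed heavy-play outlier.
labels = [1.0, 2.0, 3.0, 1000.0]
preds = [1.0, 2.0, 3.0, 0.0]
print(rmse(labels, preds))          # 500.0
print(rmse(labels[:3], preds[:3]))  # 0.0
```

Three perfect predictions cannot offset one missed 1,000-hour player. This is consistent with the skew noted during exploration.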

This pointed to two areas to explore next: the strength of regularization, and treating playtime as implicit rather than explicit feedback.


### Hyperparameter Tuning

To enhance model performance, I experimented with several combinations of ALS hyperparameters. I varied 'rank', 'regParam', and 'maxIter' across multiple runs and evaluated the resulting models using RMSE.

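After testing different values, I settled on 'rank=10', 'regParam=0.1', and 'maxIter=15' with 'implicitPrefs=True'. This lowered the RMSE from about 191.7 to about 145.8, but the drop should be read with caution. With 'implicitPrefs=True', ALS predicts a preference score (roughly between 0 and 1) rather than hours. RMSE against raw playtime therefore mainly reflects the size of the labels: about 145.8 is roughly what predicting values near zero for every test row would give. The grid search below is consistent with this, since every combination shown lands between about 145.76 and 145.77. A ranking metric such as precision@k would be a better yardstick for an implicit-feedback model.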
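A small pure-Python sketch shows why the RMSE barely moves. Spark's 'implicitPrefs=True' mode follows the Hu, Koren and Volinsky implicit-feedback formulation. In that formulation, each playtime r becomes a binary preference p (1 if r > 0) and a confidence c = 1 + alpha * r, and Spark's 'alpha' defaults to 1.0. The model is fit to p, so even a perfect preference prediction stays far from the raw hours. The playtimes below are the sample values for user 151603712; everything else is illustrative.

```python
import math

def implicit_targets(hours, alpha=1.0):
    """Return (preference, confidence) for one playtime value:
    p = 1 if hours > 0 else 0, and c = 1 + alpha * hours."""
    p = 1.0 if hours > 0 else 0.0
    c = 1.0 + alpha * hours
    return p, c

def rmse(labels, preds):
    """Root mean squared error between two equal-length lists."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))

# Playtimes for user 151603712, copied from the df_play sample output.
hours = [273.0, 87.0, 14.9, 12.1, 8.9, 8.5, 8.1, 7.5, 3.3, 2.8]

perfect_preferences = [implicit_targets(h)[0] for h in hours]  # ideal implicit output: all 1.0
all_zeros = [0.0] * len(hours)                                  # a model that predicts nothing

print(round(rmse(hours, perfect_preferences), 2))
print(round(rmse(hours, all_zeros), 2))
```

The two RMSE values differ by less than 1%. Against raw hours, a perfect implicit model and a model that predicts nothing are almost indistinguishable.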

This process showed me that hyperparameter tuning is only as useful as the metric it optimizes. In the grid search below, changing 'rank', 'regParam', and 'maxIter' moved the RMSE by roughly 0.01, far too little to meaningfully separate the configurations.
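The grid-search cell below picks the configuration with the lowest RMSE on 'test_data', which is the same split used to report the final score. This makes the reported score optimistic. The sketch below shows the safer pattern, assuming a separate validation split. `fake_validation_rmse` is a hypothetical stand-in for "fit ALS on the training split and score it on validation". Spark's `pyspark.ml.tuning` module provides `ParamGridBuilder` together with `TrainValidationSplit` or `CrossValidator` for the same purpose.

```python
import itertools

ranks = [5, 10, 15]
reg_params = [0.01, 0.1, 0.2]
iterations = [10, 15]

def select_params(score_fn):
    """Return (best_score, best_params) over the full grid; score_fn should be
    computed on a validation split, never on the final test split."""
    best_score, best_params = float("inf"), None
    for rank, reg, max_iter in itertools.product(ranks, reg_params, iterations):
        params = {"rank": rank, "regParam": reg, "maxIter": max_iter}
        score = score_fn(params)
        if score < best_score:
            best_score, best_params = score, params
    return best_score, best_params

# Hypothetical scoring function standing in for "train ALS, evaluate on validation".
def fake_validation_rmse(params):
    return 1.0 / params["rank"] + params["regParam"] + 0.001 * params["maxIter"]

best_score, best_params = select_params(fake_validation_rmse)
print(best_params)
```

Only the configuration chosen this way would then be scored once on the untouched test split.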

In [0]:
als = ALS(
    userCol="User_ID",
    itemCol="Game_ID",
    ratingCol="Value",
    nonnegative=True,
    implicitPrefs=True, 
    coldStartStrategy="drop",
    rank=10,            
    maxIter=15,        
    regParam=0.1       
)

als_model = als.fit(train_data)

predictions = als_model.transform(test_data).na.drop()
rmse = evaluator.evaluate(predictions)
print(f"Tuned RMSE: {rmse}")


Tuned RMSE: 145.76628440892677


In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

ranks = [5, 10, 15]
regParams = [0.01, 0.1, 0.2]
iterations = [10, 15]

best_rmse = float("inf")
best_model = None
best_params = {}

for rank in ranks:
    for reg in regParams:
        for maxIter in iterations:
            als = ALS(
                userCol="User_ID",
                itemCol="Game_ID",
                ratingCol="Value",
                rank=rank,
                maxIter=maxIter,
                regParam=reg,
                nonnegative=True,
                implicitPrefs=True,
                coldStartStrategy="drop"
            )
            model = als.fit(train_data)
            predictions = model.transform(test_data).na.drop()
            rmse = evaluator.evaluate(predictions)

            print(f"RMSE (rank={rank}, regParam={reg}, maxIter={maxIter}): {rmse}")

            if rmse < best_rmse:
                best_rmse = rmse
                best_model = model
                best_params = {
                    "rank": rank,
                    "regParam": reg,
                    "maxIter": maxIter
                }

print(f"\n Best Model --> RMSE: {best_rmse}, Params: {best_params}")

RMSE (rank=5, regParam=0.01, maxIter=10): 145.76815768596688
RMSE (rank=5, regParam=0.01, maxIter=15): 145.7671922826825
RMSE (rank=5, regParam=0.1, maxIter=10): 145.77218825767412
RMSE (rank=5, regParam=0.1, maxIter=15): 145.7714109075554
RMSE (rank=5, regParam=0.2, maxIter=10): 145.77195900698192
RMSE (rank=5, regParam=0.2, maxIter=15): 145.77050719598998
RMSE (rank=10, regParam=0.01, maxIter=10): 145.76410823305486
RMSE (rank=10, regParam=0.01, maxIter=15): 145.7626723272945
RMSE (rank=10, regParam=0.1, maxIter=10): 145.76711979507564
RMSE (rank=10, regParam=0.1, maxIter=15): 145.76628440877388
RMSE (rank=10, regParam=0.2, maxIter=10): 145.768889521988
RMSE (rank=10, regParam=0.2, maxIter=15): 145.76768156830275
RMSE (rank=15, regParam=0.01, maxIter=10): 145.76177157095813
RMSE (rank=15, regParam=0.01, maxIter=15): 145.7602005748532
RMSE (rank=15, regParam=0.1, maxIter=10): 145.7653977114824
RMSE (rank=15, regParam=0.1, maxIter=15): 145.7641069248583
RMSE (rank=15, regParam=0.2, max

## Experiment Tracking with MLflow

To keep a record of the tuning results, I integrated MLflow into my workflow. After the grid search, I logged the winning parameters ('rank', 'regParam', and 'maxIter') and the corresponding RMSE to an MLflow run.

I also logged the trained model itself with 'mlflow.spark.log_model', so it can be reloaded later for comparison or deployment.

Using MLflow gave me better visibility into my experiments and aligned my workflow with modern MLOps practices.

In [0]:
import mlflow
import mlflow.spark

with mlflow.start_run():
    mlflow.log_params(best_params)
    mlflow.log_metric("rmse", best_rmse)
    mlflow.spark.log_model(best_model, "als-model")


2025/04/23 13:08:27 INFO mlflow.spark: Inferring pip requirements by reloading the logged model from the databricks artifact repository, which can be time-consuming. To speed up, explicitly specify the conda_env or pip_requirements when calling log_model().



2025/04/23 13:08:29 INFO mlflow.store.artifact.artifact_repo: The progress bar can be disabled by setting the environment variable MLFLOW_ENABLE_ARTIFACTS_PROGRESS_BAR to false



## Final Game Recommendations for a Specific User

After training and tuning, I generated game recommendations with 'recommendForAllUsers(10)'. This returns the top 10 games for every user in the training data. The cell below uses 'als_model', the hand-tuned implicit model (rank=10, maxIter=15, regParam=0.1), rather than the grid-search 'best_model'. Each user's result is an array of ('Game_ID', 'rating') pairs. Here 'rating' is the predicted preference score used for ranking, not a playtime.
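Conceptually, 'recommendForAllUsers' scores every game for a user by the dot product of the user's and the game's latent vectors, then keeps the k highest. The sketch below shows that step in plain Python with hypothetical rank-2 factors. In the notebook, the real factors are available as `als_model.userFactors` and `als_model.itemFactors`. As far as I know, Spark does not remove games the user has already played from these lists, so the optional `exclude` argument shows one way to do that.

```python
import heapq

def recommend_top_k(user_vec, item_factors, k, exclude=()):
    """Score every item by the dot product x_u . y_i and return the k best
    (item_id, score) pairs, optionally skipping items the user already played."""
    scores = (
        (item_id, sum(a * b for a, b in zip(user_vec, vec)))
        for item_id, vec in item_factors.items()
        if item_id not in exclude
    )
    return heapq.nlargest(k, scores, key=lambda pair: pair[1])

# Hypothetical rank-2 factors for four games and one user.
item_factors = {86: [0.9, 0.1], 478: [0.2, 0.8], 3036: [0.7, 0.7], 4524: [0.1, 0.2]}
user_vec = [1.0, 0.5]

print(recommend_top_k(user_vec, item_factors, k=2))
print(recommend_top_k(user_vec, item_factors, k=2, exclude={3036}))
```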

To explore individual user preferences more closely, I selected a specific user (user ID: 144736) and filtered the recommendations to display only their personalized list. This allowed me to evaluate the model’s effectiveness for a concrete use case and observe the quality of predicted game matches.
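The recommendations contain only numeric 'Game_ID' values, so judging their quality requires mapping them back to titles. In Spark, this would mean exploding the 'recommendations' array and joining it with the 'game_ids' DataFrame. The sketch below does the same lookup in plain Python. It uses only the five ID-to-title pairs visible in this notebook's output and falls back to the raw ID for any other game.

```python
# Game_ID -> title pairs visible in this notebook's output; all other IDs are unknown here.
GAME_NAMES = {3036: "The Elder Scrolls V Skyrim", 478: "Fallout 4", 4524: "Spore",
              4470: "Fallout New Vegas", 86: "Left 4 Dead 2"}

def decode(recommendations, names):
    """Turn [(game_id, score), ...] into [(title, score), ...], keeping the ID when no title is known."""
    return [(names.get(gid, f"Game_ID {gid}"), round(score, 3)) for gid, score in recommendations]

# Three entries from user 76767's list in the recommendForAllUsers output.
recs_76767 = [(2698, 1.088009), (2355, 1.0682054), (4470, 1.0177672)]
print(decode(recs_76767, GAME_NAMES))
```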

Comparing the full output with one user's list shows both the reach and the limits of the model. Several users (144736, 835015, 1268792, and 3450426) receive the same ten games in the same order, differing only in score. This suggests that for these users, the recommendations are driven by a shared, popularity-like component rather than individual taste. Still, the lists confirm that the pipeline produces per-user rankings of the kind a platform like Steam could serve.

In [0]:
user_recommendations = als_model.recommendForAllUsers(10)
display(user_recommendations)


User_ID,recommendations
76767,"List(List(2698, 1.088009), List(2355, 1.0682054), List(2292, 1.0463887), List(1895, 1.0439525), List(4470, 1.0177672), List(315, 1.0138277), List(4503, 0.9896123), List(24, 0.94962287), List(4084, 0.9444698), List(3587, 0.92860234))"
144736,"List(List(4315, 0.4794949), List(4211, 0.43175635), List(408, 0.36450392), List(516, 0.34135327), List(2375, 0.33916238), List(2321, 0.33089447), List(4686, 0.32187015), List(1873, 0.30560964), List(1177, 0.3054879), List(5031, 0.29609084))"
229911,"List(List(4315, 1.0798972), List(4211, 0.9736465), List(55, 0.92145264), List(86, 0.9088483), List(408, 0.8710248), List(24, 0.85985065), List(4503, 0.8217643), List(1808, 0.8062978), List(516, 0.8049519), List(1177, 0.78290313))"
835015,"List(List(4315, 0.4288638), List(4211, 0.38616607), List(408, 0.326015), List(516, 0.30530888), List(2375, 0.30334935), List(2321, 0.29595447), List(4686, 0.28788304), List(1873, 0.27333954), List(1177, 0.27323064), List(5031, 0.26482585))"
948368,"List(List(4315, 0.9744824), List(55, 0.9194612), List(4211, 0.87746286), List(408, 0.7519003), List(516, 0.74365366), List(1873, 0.70086944), List(4686, 0.69314384), List(2375, 0.6892832), List(2321, 0.6724802), List(1177, 0.655858))"
975449,"List(List(4502, 1.1297665), List(2440, 1.1036294), List(5031, 1.101384), List(3188, 1.0892375), List(4932, 1.0887299), List(3723, 1.0842478), List(3066, 1.0815394), List(315, 1.0776682), List(620, 1.0771718), List(3871, 1.0642183))"
1268792,"List(List(4315, 0.45386887), List(4211, 0.40868163), List(408, 0.34502342), List(516, 0.32311), List(2375, 0.32103625), List(2321, 0.3132102), List(4686, 0.30466816), List(1873, 0.2892767), List(1177, 0.28916144), List(5031, 0.2802666))"
2531540,"List(List(4315, 0.93809134), List(4211, 0.844647), List(2698, 0.7942907), List(408, 0.7691196), List(2292, 0.76679075), List(1808, 0.7277081), List(516, 0.70647323), List(1152, 0.68015176), List(2375, 0.6755224), List(1177, 0.6752027))"
2753525,"List(List(55, 1.1914524), List(1808, 1.1051337), List(1177, 1.0847028), List(1496, 1.0743893), List(86, 1.0742031), List(4553, 1.0534204), List(1109, 1.0160261), List(3109, 1.0037587), List(2589, 0.9770841), List(590, 0.9613151))"
3450426,"List(List(4315, 0.53206825), List(4211, 0.47909546), List(408, 0.40446928), List(516, 0.37878028), List(2375, 0.3763492), List(2321, 0.36717477), List(4686, 0.35716102), List(1873, 0.33911765), List(1177, 0.33898255), List(5031, 0.3285552))"


In [0]:
user_id = 144736
user_rec = user_recommendations.filter(col("User_ID") == user_id)
display(user_rec)


User_ID,recommendations
144736,"List(List(4315, 0.4794949), List(4211, 0.43175635), List(408, 0.36450392), List(516, 0.34135327), List(2375, 0.33916238), List(2321, 0.33089447), List(4686, 0.32187015), List(1873, 0.30560964), List(1177, 0.3054879), List(5031, 0.29609084))"


## Critical Reflection on the Model

While the ALS model produced working recommendations, I noticed several challenges. The biggest issue was the wide range of playtimes, which introduced high variance and inflated the RMSE. I also realized that not all playtime is positive feedback: some users may try a game briefly and dislike it.
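One way to soften both problems, sketched below under stated assumptions, combines two ideas. The first is the log-scaled confidence from Hu, Koren and Volinsky, c = 1 + alpha * log(1 + r / epsilon). The second is a minimum-playtime rule for counting a game as liked. The `min_hours`, `alpha`, and `epsilon` values are hypothetical. Spark computes c = 1 + alpha * rating internally, so passing log1p(hours / epsilon) as the rating column would give the same confidence curve.

```python
import math

def log_confidence(hours, alpha=1.0, epsilon=1.0):
    """Confidence c = 1 + alpha * log(1 + hours / epsilon); grows slowly, so heavy
    players do not dominate the way they would with c = 1 + alpha * hours."""
    return 1.0 + alpha * math.log1p(hours / epsilon)

def preference(hours, min_hours=1.0):
    """Hypothetical rule: count a game as liked only at or above min_hours of play."""
    return 1.0 if hours >= min_hours else 0.0

for h in [0.5, 2.8, 273.0]:
    print(h, preference(h), round(log_confidence(h), 2))
```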

Another issue was the cold-start problem, where new users or games had no history and thus couldn't be recommended accurately. I considered possible improvements like incorporating game metadata or user demographics, though these weren’t available in the current dataset.
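A common mitigation for cold-start users is to fall back to a non-personalized popularity ranking. This is a swapped-in technique, not the metadata-based approach mentioned above. The sketch below ranks games by their number of distinct players and uses that list whenever a user has no ALS recommendations. The rows and the `personalized` dictionary are hypothetical.

```python
from collections import defaultdict

def popularity_ranking(play_rows):
    """Rank games by number of distinct players (ties broken by Game_ID)."""
    players = defaultdict(set)
    for user_id, game_id, _hours in play_rows:
        players[game_id].add(user_id)
    return sorted(players, key=lambda g: (-len(players[g]), g))

def recommend(user_id, personalized, fallback, k):
    """Use ALS output when the user has it; otherwise fall back to popular games."""
    recs = personalized.get(user_id)
    return recs[:k] if recs else fallback[:k]

# Hypothetical (User_ID, Game_ID, hours) rows and ALS output.
rows = [(1, 86, 8.9), (2, 86, 3.0), (3, 86, 1.0), (1, 478, 87.0), (2, 478, 5.0), (1, 4524, 14.9)]
fallback = popularity_ranking(rows)
personalized = {1: [3036, 4470]}

print(recommend(1, personalized, fallback, k=2))    # [3036, 4470]
print(recommend(999, personalized, fallback, k=2))  # [86, 478]
```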

Overall, while ALS served as a solid starting point, there’s potential to explore hybrid models or normalize the data for better results.
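If the cluster runs Spark 3.0 or later, `pyspark.ml.evaluation.RankingEvaluator` offers ranking metrics such as `precisionAtK` and `meanAveragePrecision`. These suit implicit-feedback recommenders better than RMSE. The sketch below shows the idea behind precision@k in plain Python. The recommendation lists come from this notebook's output, while the held-out plays are hypothetical.

```python
def precision_at_k(recommended, relevant, k):
    """Share of the top-k recommended games that appear in the user's held-out plays."""
    hits = len(set(recommended[:k]) & set(relevant))
    return hits / k

def mean_precision_at_k(recs_by_user, held_out_by_user, k):
    """Average precision@k over users that have at least one held-out play."""
    users = [u for u, items in held_out_by_user.items() if items]
    if not users:
        return 0.0
    total = sum(precision_at_k(recs_by_user.get(u, []), held_out_by_user[u], k) for u in users)
    return total / len(users)

# Top-4 lists taken from the recommendForAllUsers output.
recs = {144736: [4315, 4211, 408, 516], 76767: [2698, 2355, 2292, 1895]}
# Hypothetical held-out plays for the same users.
held_out = {144736: [408, 9999], 76767: [2698, 2292]}
print(mean_precision_at_k(recs, held_out, k=4))  # 0.375
```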

## Conclusion and Future Work

In this project, I successfully built a collaborative filtering recommender system using Spark’s ALS algorithm on user behavior data from Steam. By focusing on playtime as a proxy for user preference, I was able to make personalized game recommendations.

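After preprocessing and initial training, I conducted hyperparameter tuning and used MLflow to track experiments. The tuned implicit-feedback models reached an RMSE of about 145.8, down from 191.7 for the explicit baseline. Implicit ALS predicts preference scores rather than hours, so this comparison should be read with caution. Evaluating the recommendations with a ranking metric such as precision@k is a natural next step.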