In [0]:
# Dependencies and configuration
# Requires ML runtime

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, trim, lower, when, floor, explode, collect_set, size, udf, sum as spark_sum, round as spark_round, max as spark_max, format_number
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType, StructType, StructField, DoubleType, StringType, FloatType
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
# Install and set up MLflow library
import mlflow
import mlflow.spark
from mlflow.tracking import MlflowClient
# Confirm the destination for experiment runs
print("Current tracking URI:", mlflow.get_tracking_uri())
# Confirm MLflow version
print("MLflow version:" + mlflow.__version__)
# Import Python's logging module
import logging
# Set MLflow's logging level to ERROR (To only display ERROR-level and critical log messages)
logging.getLogger("mlflow").setLevel(logging.ERROR)
# Set the path for the MLflow experiment
experiment_path = "/Users/j.l.wong@edu.salford.ac.uk/game_recommender_system_v3.0"
# Set the MLflow experiment
mlflow.set_experiment(experiment_path)
# Print runtime version
print("Databricks Runtime Version:", spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion"))

Current tracking URI: databricks
MLflow version:2.1.1
Databricks Runtime Version: 12.2.x-cpu-ml-scala2.12


##1. Introduction##

This Databricks notebook addresses the task of building a collaborative filtering recommender system using a dataset extracted from Steam, a video game platform. The dataset contains user/game interaction data based on two types of implicit feedback, game purchases and hours played, and each row captures a unique interaction between a member and a game, including the member’s ID, the game’s name, the type of behaviour ("play" or "purchase"), and a column indicating either the number of hours played for rows with "play" behaviour or a value of 1 for rows with "purchase" behaviour. Because games must be purchased before being played, some member/game pairs appear twice in the dataset, each time with different behaviours. The dataset does not include game IDs.

The goal is to process this data and use it to train ALS models in Spark MLlib, evaluate their performance and explore the recommendations they generate. Additionally, it is expected that the solution implements an MLflow experiment involving multiple runs for instance using different hyperparameter combinations.

The proposed solution implements two modelling approaches, one using only playtime and another combining playtime with a weighted purchase flag as implicit feedback. Several combinations of vector dimensions and regularisation strengths are tested as part of the hyperparameter tuning stage, beginning from baseline models. All the models are evaluated using Root Mean Squared Error (RMSE), Precision@10 and Recall@10 metrics. To demonstrate the use of the trained models, the best Recall and Precision performing models are then used to generate ten personalised recommendations for each member and another ten for a single member. All implementation steps, including the transformation of member and game identifiers into numeric indices, are carried out entirely within Spark and experiment tracking with multiple runs is handled programmatically with MLflow.

## 2. Data Loading and Initial Exploration##

We started by verifying the dataset has been correctly uploaded to the Databricks FileStore using dbutils.fs.ls().  Then,  we carried out an initial exploration to understand the structure and contents of the dataset, for which we loaded the raw data into a Resilient Distributed Dataset (RDD) using sparkContext.textFile(),  which treats each line of the CSV file as a raw string and displayed a sample with the take() function.

After confirming the structure of the dataset (comma-separated values consistent with member ID, game name, behaviour type, and a numerical value representing purchase or playtime), we loaded the dataset into a Spark DataFrame using spark.read.csv(). This keeps all values in string format for better control during early-stage preprocessing.

We then performed several basic exploratory checks to understand the dataset’s composition and identify potential data quality issues, including counting total rows, distinct members, and distinct games. Similarly, we checked the unique behaviours recorded (purchase vs play), since these will influence how we treat user feedback, and inspected the presence of nulls in each column to ensure there are no missing values that could affect transformations or model training.

Additionally, we used a regular expression to confirm that the hours played column contains valid numeric values in string form (integers or floats), which is what is expected to be used for raitings in the ALS algorithm.

In [0]:
%python
# check that the file has been uploaded.
dbutils.fs.ls("/FileStore/tables/")

Out[2]: [FileInfo(path='dbfs:/FileStore/tables/BDTT_Assignment_1_Enron/', name='BDTT_Assignment_1_Enron/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/BDTT_Assignment_1_Enron-1.zip', name='BDTT_Assignment_1_Enron-1.zip', size=375294957, modificationTime=1742269195000),
 FileInfo(path='dbfs:/FileStore/tables/BDTT_Assignment_1_Enron.zip', name='BDTT_Assignment_1_Enron.zip', size=375294957, modificationTime=1739618921000),
 FileInfo(path='dbfs:/FileStore/tables/Clinicaltrial_16012025.csv', name='Clinicaltrial_16012025.csv', size=205522181, modificationTime=1742570281000),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1740575789000),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/a

In [0]:
# Load raw data into and RDD
steamRDD = sc.textFile("dbfs:/FileStore/tables/steam_200k.csv")

In [0]:
# Display a sample of the data to inspect
steamRDD.take(10)

Out[4]: ['151603712,The Elder Scrolls V Skyrim,purchase,1',
 '151603712,The Elder Scrolls V Skyrim,play,273',
 '151603712,Fallout 4,purchase,1',
 '151603712,Fallout 4,play,87',
 '151603712,Spore,purchase,1',
 '151603712,Spore,play,14.9',
 '151603712,Fallout New Vegas,purchase,1',
 '151603712,Fallout New Vegas,play,12.1',
 '151603712,Left 4 Dead 2,purchase,1',
 '151603712,Left 4 Dead 2,play,8.9']

In [0]:
# Loads the data as strings into a DataFrame
steamDF_raw = spark.read.csv("dbfs:/FileStore/tables/steam_200k.csv", header=False, inferSchema=False)

In [0]:
# Display a sample of the DataFrame
steamDF_raw.limit(10).show(10, truncate=False)

+---------+--------------------------+--------+----+
|_c0      |_c1                       |_c2     |_c3 |
+---------+--------------------------+--------+----+
|151603712|The Elder Scrolls V Skyrim|purchase|1   |
|151603712|The Elder Scrolls V Skyrim|play    |273 |
|151603712|Fallout 4                 |purchase|1   |
|151603712|Fallout 4                 |play    |87  |
|151603712|Spore                     |purchase|1   |
|151603712|Spore                     |play    |14.9|
|151603712|Fallout New Vegas         |purchase|1   |
|151603712|Fallout New Vegas         |play    |12.1|
|151603712|Left 4 Dead 2             |purchase|1   |
|151603712|Left 4 Dead 2             |play    |8.9 |
+---------+--------------------------+--------+----+



In [0]:
# Count rows
row_count = steamDF_raw.count()
print(f"Row count:{row_count}") 

Row count:200000


In [0]:
# Count of members 
members_count = steamDF_raw.select((col("_c0"))).distinct().count()
print(f"Members count: {members_count}")

Members count: 12393


In [0]:
# Count of games
games_count = steamDF_raw.select((col("_c1"))).distinct().count()
print(f"Games count: {games_count}")

Games count: 5155


In [0]:
# Show all unique behaviours
steamDF_raw.select((col("_c2"))).alias("behaviour").distinct().show(truncate=False)

+--------+
|_c2     |
+--------+
|purchase|
|play    |
+--------+



In [0]:
# Count rows where _c2 == "purchase"
purchase_count = steamDF_raw.filter("_c2 = 'purchase'").count()
print(f"Number of purchase records: {purchase_count}")

Number of purchase records: 129511


In [0]:
# Count rows where _c2 == "play"
purchase_count = steamDF_raw.filter("_c2 = 'play'").count()
print(f"Number of play records: {purchase_count}")

Number of play records: 70489


In [0]:
# Check for null values

# Dictionary mapping column names to labels for output 
columns_to_check = {
    "_c0": "memberID",
    "_c1": "game Name",
    "_c2": "behaviour",
    "_c3": "hours Played"
}


# Iterate through each column to check for null values
for col_name, label in columns_to_check.items():
    # Count rows where this specific column contains null values
    null_count = steamDF_raw.filter(col(col_name).isNull()).count()
    # Print the results
    print(f"Number of rows with null {label}: {null_count}")

Number of rows with null memberID: 0
Number of rows with null game Name: 0
Number of rows with null behaviour: 0
Number of rows with null hours Played: 0


In [0]:
# Check hours played column contains Strings consistent with float  

# Defines a regular expresion matching one or more digits followed by an optional group consisting of a dot and a sequence of one or more digits 
numeric_regex = "^[0-9]+(\\.[0-9]+)?$"

# Count of numeric values (including decimals)
numeric_value = steamDF_raw.filter(col("_c3").rlike(numeric_regex)).count()
# Print the results
print(f"Number of rows with numeric value: {numeric_value}")

# Count other values not null or not matched by the regex
dirty_value = steamDF_raw.filter(~col("_c3").rlike(numeric_regex) & col("_c3").isNotNull()).count()
# Print the results
print(f"Number of rows with dirty value (non-numeric, non-null): {dirty_value}")

Number of rows with numeric value: 200000
Number of rows with dirty value (non-numeric, non-null): 0


The outputs obtained in the Data Loading and Initial Exploration stage confirmed that the dataset was successfully loaded and contains 200,000 member/game interaction records, involving 12,393 unique members and 5,155 unique games. No missing values were found in any column, 

Although the dataset did not have headers, the four columns were consistent with the description of the task's expected values. One of the columns contained the String distinct values "purchase" and "play", consistent with the behaviours to be used for implicit feedback. There was a column consistent with a numeric value to be used as a user ID. Another column  showed valid numeric Strings that can be cast to a numeric data type for ratings. Except for the lack of an item ID for games, expected by the ALS algorithm, and the presence of instances where the same member had two rows due to having purchased and played a game, the data was found to be suitable to allow for the development of the collaborative filtering solution with implicit feedback.

##3. Data Pre-Processing##
We began this stage by giving the columns meaningful names: memberID, gameName, behaviour, and hoursPlayed for clarity, using the `withColumnRenamed()` method of the DataFrame class. 

Next, we casted the hoursPlayed column to float as this column holds the values for hours played or one purchase which needed to be of numeric data type to be used as rating in the ALS algorithm.

Additionally, we ensured that whenever the behaviour column indicated a "purchase", the corresponding hoursPlayed value was overwritten as 1.0 in `steamDF = steamDF.withColumn("hoursPlayed", when(lower(trim(col("behaviour"))) == "purchase", 1.0).otherwise(col("hoursPlayed")))`, to  correct for any inconsistent values that may have been mistakenly recorded to indicate purchase in the hoursPlayed column.

Given that the Spark interface to access the ALS algorithm also requires both user (member) and item (game) integer IDs, we addressed the fact that, in the dataset, memberID was of String data type and there was no numeric identifier for games by transforming memberID and gameName both using PySpark's StringIndexer to produce the memberIndex and gameIndex columns, mapping each unique String value to a numeric index. These columns were then casted to Integer type to satisfy the input requirements of Spark’s ALS implementation.

Considering that, in the dataset, the same member can have two entries for the same game, one with the behaviour value "purchase", and one with "play", we  pivoted the data to unify the information so that instead of keeping separate rows for each behaviour, we generated a single row per member, with one column, `played_hours`, indicating hours played and another, `purchase_flag`, indicating whether the game was purchased. Missing values in either column were filled with zero. 

Subsequently,  we created two DataFrames that would allow us to model two variants of implicit feedback. The first DataFrame, steamDF_playOnly,  intended for training ALS on playtime alone, was prepared by selecting only the `played_hours` column as the rating and filtering out all zero-hour rows. The second DataFrame, steamDF_playAndPurchase, was constructed by computing a new rating as the sum of `played_hours` and `purchased_flag` multiplied by a weight set to 5.0, indicating the relative importance of the purchase implicit feedback for the model, a hyperparameter that can be adjusted based on domain knowledge or for model tuning. This way, we combined both types of relationships between a member and a game into a single implicit feedback rating. In both DataFrames, we kept only the relevant columns (memberIndex, gameIndex, and rating) as expected by ALS.

Given that ALS with implicit feedback treats higher implicit feedback values as stronger signals of user preference, the model could have been at risk of overfitting due to the presence of outliers with extremely high rating values — for instance, prolonged game sessions accumulating to thousands of hours. To understand the distribution of ratings in both DataFrames, we used .describe() to calculate summary statistics such as count, mean, standard deviation, minimum, and maximum. We then visualised the distributions by binning the ratings into 100 intervals using `(floor(col("rating") / 100) * 100)` and registering the resulting DataFrames as temporary views. These were then queried via `SELECT * FROM SQL` statements and visualised as bar charts using the Databricks built-in visualisation functionality.

The resulting charts confirmed a heavy skew in both DataFrames, with most feedback in the 0-100 bin, and a long tail of high values. To mitigate this skew and reduce the effect of extreme outliers, we computed the 95th percentile of the rating values in both datasets using `approxQuantile("rating", [0.95], 0.01)[0]`. For Play Only, the threshold was 152.0, and for Play and Purchase (considering only play values), it was 72.0. Ratings above these thresholds were capped to their respective 95th percentile values. This helped stabilise the data distributions while preserving relative differences in implicit feedback strength.

In [0]:
# Rename columns for clarity
steamDF = steamDF_raw.withColumnRenamed("_c0", "memberID") \
                     .withColumnRenamed("_c1", "gameName") \
                     .withColumnRenamed("_c2", "behaviour") \
                     .withColumnRenamed("_c3", "hoursPlayed")

In [0]:
# Cast "hoursPlayed" column from String to float
steamDF = steamDF.withColumn("hoursPlayed", col("hoursPlayed").cast("float"))

In [0]:
# Overwrites "hoursPlayed" column to ensure that when the behaviour column value is "purchase", the "hoursPlayed" column value is always 1, correcting inconsistent values
steamDF = steamDF.withColumn("hoursPlayed", when(lower(trim(col("behaviour"))) == "purchase", 1.0).otherwise(col("hoursPlayed")))

In [0]:
# Pivot and combine play/purchase
# convert each member/game pair into a single row with both playtime and purchase info.

# Pivot the behaviour column to get separate columns for 'play' and 'purchase'
pivotedDF = steamDF.groupBy("memberID", "gameName").pivot("behaviour").agg(spark_max("hoursPlayed"))

# Rename the new columns to make them clearer
pivotedDF = pivotedDF.withColumnRenamed("play", "played_hours").withColumnRenamed("purchase", "purchased_flag")

# Replace nulls resulting from the pivoting with 0
pivotedDF = pivotedDF.fillna(0, subset=["played_hours", "purchased_flag"])

In [0]:
pivotedDF.printSchema()

root
 |-- memberID: string (nullable = true)
 |-- gameName: string (nullable = true)
 |-- played_hours: double (nullable = false)
 |-- purchased_flag: double (nullable = false)



In [0]:
# Show sample of the pivoted DataFrame
pivotedDF.withColumn("played_hours", spark_round(col("played_hours"), 2)).limit(20).display()

memberID,gameName,played_hours,purchased_flag
60296891,Batman Arkham Asylum GOTY Edition,0.0,1.0
86055705,Destination Sol,0.0,1.0
30246419,Thief Deadly Shadows,0.0,1.0
64479113,Team Fortress Classic,0.0,1.0
80164199,FTL Faster Than Light,11.8,1.0
158944454,Warface,1.4,1.0
75328197,Call of Duty Black Ops - Multiplayer,5.8,1.0
105782521,Terraria,0.2,1.0
105384518,Lunar Flight,5.5,1.0
33706322,Saints Row The Third,2.0,1.0


In [0]:
# Create numeric indexes for members and games using the memberID and gameName Strings
# Required for ALS. Spark MLlib needs user and item IDs as integers or floats

# Instantiates and object of the Class StringIndexer to map each distinct memberID (string) to a numeric index
user_indexer = StringIndexer(
    inputCol="memberID",     # The source column with user identifiers (Strings)
    outputCol="memberIndex", # The destination column for numeric user IDs (the generated indices)
    handleInvalid="skip"     # Skip rows with unexpected/null values
)

# The user_indexer StringIndexer scans the memberID column to learn the mapping by identifying all unique values and assigning a numeric index to each (for instance memberID '151603712' generates memberIndex 0.0). 
# Returns a StringIndexModel
member_indexer_model = user_indexer.fit(pivotedDF)

# Applies the memberIndexerModel with the mapping to the DataFrame. 
# Returns the new DataFrame with the 'memberIndex' column.
pivotedDF =  member_indexer_model.transform(pivotedDF)




# Instantiates and object of the Class StringIndexer to map each distinct gameName (String) to a numeric index
game_indexer = StringIndexer(
    inputCol="gameName",          # The source column with game names
    outputCol="gameIndex",      # The destination column for numeric game IDs (the generated indices)
    handleInvalid="skip"      # Skip rows with unexpected/null values
)

# The game_indexer StringIndexer scans the gameName column to learn the mapping by identifying all unique values and assigning a numeric index to each (for instance gameName 'The Elder Scrolls V Skyrim' generates gameIndex 0.0). 
# Returns a StringIndexModel
game_indexer_model = game_indexer.fit(pivotedDF)

# Applies the game_Indexer_Model with the mapping to the DataFrame. 
# Returns the new DataFrame with the 'gameIndex' column.
pivotedDF = game_indexer_model.transform(pivotedDF)

In [0]:
# Cast memberIndex and gameIndex to IntegerType for ALS compatibility
pivotedDF = pivotedDF.withColumn("memberIndex", col("memberIndex").cast(IntegerType()))
pivotedDF = pivotedDF.withColumn("gameIndex", col("gameIndex").cast(IntegerType()))

In [0]:
# Show sample of the pivoted DataFrame with the gameIndex and memberIndex column
pivotedDF.withColumn("played_hours", spark_round(col("played_hours"), 2)).limit(20).display()

memberID,gameName,played_hours,purchased_flag,memberIndex,gameIndex
60296891,Batman Arkham Asylum GOTY Edition,0.0,1.0,229,123
86055705,Destination Sol,0.0,1.0,50,1352
30246419,Thief Deadly Shadows,0.0,1.0,2,559
64479113,Team Fortress Classic,0.0,1.0,4321,37
80164199,FTL Faster Than Light,11.8,1.0,2170,150
158944454,Warface,1.4,1.0,2046,48
75328197,Call of Duty Black Ops - Multiplayer,5.8,1.0,6545,70
105782521,Terraria,0.2,1.0,413,24
105384518,Lunar Flight,5.5,1.0,303,1832
33706322,Saints Row The Third,2.0,1.0,292,64


In [0]:
# Show the pre-processed DataFrame schema to confirm memberIndex and gameIndex casting
pivotedDF.printSchema()

root
 |-- memberID: string (nullable = true)
 |-- gameName: string (nullable = true)
 |-- played_hours: double (nullable = false)
 |-- purchased_flag: double (nullable = false)
 |-- memberIndex: integer (nullable = true)
 |-- gameIndex: integer (nullable = true)



In [0]:
# Prepares a DataFrame ready for a model that uses hours played as implicit feedback

# Rename the column to be used as rating (played_hours) as "rating" for consitency with the ALS paramaters name
steamDF_playOnly = pivotedDF.withColumnRenamed("played_hours", "rating")


# Keep only the relevant columns and where rating (hours played) is not zero 
# This represents the list of member/game iteration which will be inputed into the ALS algorithm
steamDF_playOnly = steamDF_playOnly.select("memberIndex", "gameIndex", "rating").filter("rating != 0")

# Show a preview
steamDF_playOnly.withColumn("rating", spark_round(col("rating"), 2)).limit(10).display()

memberIndex,gameIndex,rating
2170,150,11.8
2046,48,1.4
6545,70,5.8
413,24,0.2
303,1832,5.5
292,64,2.0
2000,83,93.0
183,191,25.0
1254,3752,5.3
115,1109,1.2


In [0]:
# Describe min/max/avg values of the play only rating
print("Ratings summary - play only:")
steamDF_playOnly.describe("rating").withColumn("rating",format_number(col("rating").cast("double"), 2)).display()

Ratings summary - play only:


summary,rating
count,70477.0
mean,48.89
stddev,229.35
min,0.1
max,11754.0


In [0]:
# Bins playtime into 100 intervals to create a frequency distribution

# Bin the rating values (group into 100-hour buckets)
playOnly_binned = steamDF_playOnly.withColumn("rating_bin", (floor(col("rating") / 100) * 100))

# Create a new DataFrame with the count of user-game interactions in each 100-hour rating bin representing a frequency distribution of playtime across buckets
rating_distribution_playOnly = playOnly_binned.groupBy("rating_bin").count().orderBy("rating_bin")

# Register the new DataFrame as a temp view
rating_distribution_playOnly.createOrReplaceTempView("rating_distribution_playonly")

In [0]:
%sql
--Query the binned playtime distribution from the temporary view to show how many user-game interactions fall into each 100-hour rating bucket.
-- The result table is used to produce a bar chart for visualisation
SELECT * FROM rating_distribution_playonly

rating_bin,count
0,64696
100,2399
200,1006
300,536
400,392
500,267
600,176
700,155
800,134
900,90


Databricks visualization. Run in Databricks to view.

In [0]:
# Prepares a DataFrame ready for a model that combines "play" and "purchase" behaviours as feedback

# Scale the weight of "purchase" in the rating 
# Different weights for purchase can be assigned based on the domain knowledge and the relative importance of purchase vs hours played or for experimentation and tuning 
purchase_weight = 5.0

# Combines playtime and purchase info into a single rating to serve as unified implicit feedback
# This also renames the "hours_played" column to "rating" for consistency with the ALS parameter name
steamDF_playAndPurchase = pivotedDF.withColumn("rating", col("played_hours") + (col("purchased_flag") * purchase_weight))

# Select ALS-required columns only
steamDF_playAndPurchase = steamDF_playAndPurchase.select("memberIndex", "gameIndex", "rating")

# Show a preview
steamDF_playAndPurchase.withColumn("rating", spark_round(col("rating"), 2)).limit(10).display()

memberIndex,gameIndex,rating
229,123,5.0
50,1352,5.0
2,559,5.0
4321,37,5.0
2170,150,16.8
2046,48,6.4
6545,70,10.8
413,24,5.2
303,1832,10.5
292,64,7.0


In [0]:
# Describe min/max/avg values of the play and purchase rating
print("Ratings summary - purchase + play:")
steamDF_playAndPurchase.describe("rating").withColumn("rating",format_number(col("rating").cast("double"), 2)).display()

Ratings summary - purchase + play:


summary,rating
count,128804.0
mean,31.75
stddev,171.39
min,5.0
max,11759.0


In [0]:
# Bins rating values into intervals of 100 to create a frequency distribution

# Bin the rating values in steamDF_playAndPurchase)
playAndPurchase_binned = steamDF_playAndPurchase.withColumn("rating_bin", (floor(col("rating") / 100) * 100))

# Create a new DataFrame with the count of user-game interactions in each 100-unit rating bin, representing a frequency distribution across rating ranges
rating_distribution_playAndPurchase = playAndPurchase_binned.groupBy("rating_bin").count().orderBy("rating_bin")

# Register the new DataFrame as a temp view
rating_distribution_playAndPurchase.createOrReplaceTempView("rating_distribution_playAndPurchase")

In [0]:
%sql
-- Query the binned playtime distribution from the temporary view to show how many user-game interactions fall into each 100-hour rating bucket.
-- The result table is used to produce a bar chart for visualisation
SELECT * FROM rating_distribution_playAndPurchase

rating_bin,count
0,122809
100,2548
200,1032
300,557
400,393
500,276
600,178
700,148
800,140
900,92


Databricks visualization. Run in Databricks to view.

In [0]:
# Get the 95th percentile of playtime (ratings > 1) in each model

# For Play Only rating dataframe
percentile_95_play = steamDF_playOnly.approxQuantile("rating", [0.95], 0.01)[0]

# For Play and Purchase rating dataframe (only playtime rows)
percentile_95_playPurchase = steamDF_playAndPurchase.filter("rating > 1.0").approxQuantile("rating", [0.95], 0.01)[0]

print(f"95th percentile - Play Only: {percentile_95_play}")
print(f"95th percentile - Play and Purchase (only play values): {percentile_95_playPurchase}")

95th percentile - Play Only: 152.0
95th percentile - Play and Purchase (only play values): 72.0


In [0]:
# Capping steamDF_playOnly to 95% percentile

steamDF_playOnly_capped = steamDF_playOnly.withColumn("rating",when(col("rating") > percentile_95_play, percentile_95_play).otherwise(col("rating")))

In [0]:
# Display summary statistics to confirm the effect of the capping
print("Ratings summary - play only / After capping:")
steamDF_playOnly_capped.describe("rating").withColumn("rating",format_number(col("rating").cast("double"), 2)).display()

Ratings summary - play only / After capping:


summary,rating
count,70477.0
mean,22.26
stddev,40.49
min,0.1
max,152.0


In [0]:
# Capping steamDF_playAndPurchase to 95% percentile

# Cap played_hours before computing the combined rating
steamDF_playAndPurchase_capped = pivotedDF.withColumn("played_hours", when(col("played_hours") > percentile_95_playPurchase, percentile_95_playPurchase).otherwise(col("played_hours")))

# Recalculate final rating after capping playtime
steamDF_playAndPurchase_capped = steamDF_playAndPurchase_capped.withColumn("rating", col("played_hours") + (col("purchased_flag") * purchase_weight)).select("memberIndex", "gameIndex", "rating")

In [0]:
# Display summary statistics to confirm the effect of the capping
print("Ratings summary - play and purchase / After capping:")
steamDF_playAndPurchase_capped.describe("rating").withColumn("rating",format_number(col("rating").cast("double"), 2)).display()

Ratings summary - play and purchase / After capping:


summary,rating
count,128804.0
mean,13.8
stddev,18.97
min,5.0
max,77.0


During the pre processing stage, the inspection of the preprocessed data confirmed that the pivoting of behaviour values  produced the intended structure with one row per member/game pair and separate columns for played hours and  purchase. 

Similarly, the step to ensure purchases were always assigned a value of 1.0 was also confirmed in the preview, where interactions involving purchase behaviour appeared with played_hours = 0 and purchased_flag = 1.

The transformation of memberID and gameName into the numerical memberIndex and gameIndex columns using  the StringIndexer was also successful, as shown in the the schema output confirming both columns were properly casted to integer to conform with the ALS algorithm requirements.

The visualisation of samples from the first prepared DataFrame, steamDF_playOnly,  included only hours played interactions with non-zero durations, while the second DataFrame, steamDF_playAndPurchase, sample visualisations confirmed the combination of both playtime and purchase information into a single rating, consequently confirming that the two DataFrames could be used as input for model training under two different feedback assumptions.

The steamDF_playOnly,  statistics summary showed 70,477 interactions with non-zero playtime, and a highly skewed distribution of rating values with an average of 48.89 hours and a maximum of 11,754 hours. The corresponding bar chart confirmed a typical long-tail distribution, where most user-game interactions were in the 0–100 hours range, while a small number of member/game interactions involved very high accumulated play hours.

The statistics summary of the steamDF_playAndPurchase DataFrame included 128,804 member-game pairs, combining playtime and purchase indicators into a single implicit feedback score, the inclusion of the purchase interaction, in this case with a weight value of five,  as expected, caused the mean to drop to 31.75, but the data was still right-skewed again due to long-duration play interactions, as confirmed by the bar chart of 100-binned rating distribution.

To address the risk of overfitting from high outlier rating values, the 95th percentile of playtime ratings was calculated, 152.0 for steamDF_playOnly, and 72.0 for the play component of steamDF_playAndPurchase and these values were then used to cap ratings in both DataFrames, reducing the influence of outliers above these thresholds.

The effect of capping was confirmed by generating a summary of statistics for each DataFrame, showing that the mean and standard deviation were considerably reduced. For example, the average rating in the steamDF_playAndPurchase dataset dropped from 31.75 to 13.80, and its standard deviation went from 171.39 to 18.97,  showing a substantial reduction in the effect of extreme values.

##4. Model Training##

We began this stage by splitting the preprocessed and capped datasets into training and testing sets using an 80/20 ratio and setting a fixed seed (seed=42) to ensure reproducibility. This is performed independently for the two datasets prepared.

To train the models, we define a reusable function `train_baseline_model()` that logs parameters and performance metrics using MLflow and returns the trained ALS model along with its predictions. The function signature includes three parameters intended to receive as arguments the corresponding training and testing sets as well as a model label String to identify the training dataset the model was trained on. Inside the function, we configured a baseline ALS (Alternating Least Squares) model using the following default hyperparameters:

- rank=10: Number of latent features.

- regParam=0.1: Regularisation to prevent overfitting.
 
- maxIter=10: Maximum number of alternating optimisation steps.
 
- implicitPrefs=True: Indicates the use of implicit feedback.
 
- coldStartStrategy="drop": Ensures that predictions are evaluated only for games and users seen during training.

In the function, the fit() method takes the training data set as input to train the ALS model. Upon receiving the input DataFrame containing the list of known member/games interaction in the testing set, the ALS algorithm implementation in Spark MLlib internally carries out initial verification of data types and formats. Then, the DataFrame is converted into an indexed sparse matrix format, mapping each member to the games they have actually played along with the corresponding rating (representing either the number of hours played or the number of hours played and the purchase behaviour, depending on the feature engineering approach for the input dataset) and each game to each of the members that have played that game and the corresponding rating, effectively compactifying the input DataFrame by not including the instances of null member/game interactions in the input DataFrame.

Next, a vector is initialised for each member and game. These vectors are lists of, initialy random, n values, where n is given by the Rank hyperparameter. Each value in a given position within a game vector represents the level of association of the game with a latent game feature, while the corresponding value within a member vector represents the member's level of preference or how much the member cares about the corresponding latent game feature in the same position. These game features are in a hidden state as their association with real game features such as "action genre" or "made by Activision" is not explicit. Consequently a game vector indicates the relative importance of game features associated to a game and, likewise, a member vector indicates the relative importance of the member's preferences for those game features. Therefore, the dot product between a member vector and a game vector, the degree of alignment between vectors, tends to be higher when the relative importance across features is similar, indicating a high affinity and vice versa. Altogether, the set of game and member vectors can be thought of as occupying an n-dimensional feature space. Each of the values within a vector indicates the vector position in an n-axis in the n-dimensional feature space and together represent the location or coordinates of the game or member in the same space. 

Then, each possible game/member combination is given a preference value. A preference value of 1 denotes that there is a corresponding rating within the sparse matrix, indicating an interaction between a user and a game, and a value of zero denotes that there is no rating associated with a particular user/member combination in the sparse matrix, indicating the absence of interaction.

The goal of ALS is to find values for user and game vectors in the training set such that the dot product between them approximates an affinity score, a real number value, that is closer to 1 for member/game pairs with preference value of 1 and closer to zero for member/game pairs with preference value of zero. This is achieved by adjusting the vector values via computing a loss function accross all user-game combinations as the squared difference between the predicted affinity score (the dot product of the user and game vectors) and the preference value associated to the member/game pair, multiplied by a confidence value for that interaction that allows the model to give more emphasis to stronger interactions (larger rating).  Finally, a regularisation term is added to the loss function to prevent overfitting by discouraging excessively large values in the vectors.

The confidence value is used in the loss function because the presence or absence of interaction alone does not capture the strength of the member's preference to purchase or purchase and play a game, hence a confidence value is associated to each member/game combination to assign them weights, indicating the strength of a member/game interaction to indicate a preference. The value is calculated by adding the preference value to the rating value scaled by multiplying it by a fixed alpha value with the purpose of emphasising and magnifying the magnitude of the rating. ​Consequently, the confidence value encapsulates the member's preference for a game, indicating the presence or absence of purchase or purchase and play behaviour, depending on the feature engineering approach in our models, and the strength of that preference. The confidence value expresses how strongly the algorithm should trust an observed interaction. 

The regularisation term is added to the loss function to discourage the vector values from growing too large and to help prevent overfitting. This term has a penalisation effect whose extent is the result of multiplying the regParam hyperparameter by the result of adding the sum of the squared values in the member vector and the sum of the squared values in the game vector. The value of the regParam determines the magnitude of the penalisation effect of the regularisation term in the loss function. 

To minimise the loss function mentioned above, the ALS algorithm factorises the user-game interaction matrix into two sets of vectors: one for users (members) and one for games. It then uses an alternating least squares approach to learn the values of these vectors. On each iteration, it fixes the values in one set of vectors (either member or game) and solves for the other. This optimisation is repeated over several iterations, as specified by the maxIter hyperparameter, progressively improving the estimates of both sets of vectors. When solving for a specific vector (for example, a game vector), the algorithm considers all users who interacted with that game and calculates the dot product between each of those fixed user vectors and the unknown game vector. Ideally, this dot product would exactly match the preference value for each of those user-game interactions. However, since there is no single vector that can produce a perfect dot product for every case, the algorithm instead looks for a compromise by finding values for the unknown vector that minimises the total confidence-weighted squared error across all of those users, which is exactly what the loss function represents. 

To compute the values for each vector during optimisation, ALS solves for one unknown vector at a time. For each, a system of linear equations is derived by taking the partial derivatives of the loss function with respect to each unknown element, setting them to zero, and solving the resulting system using matrix operations. Once all vectors have been updated for the current side (either games or members), the roles are reversed and the process continues on the other side. This cycle repeats until the algorithm converges or reaches the maximum number of iterations.

After the trainig data set is fitted to the model inside the function, the trained model is used to generated predicitons (ratings) on the testing set with the `transform()` method. These predictions were later used for the evaluation of the model.

The function returns the trained model and its label and predictions on the testing set.

In [0]:
# Splitting data into training and test sets
# Splits 80% training, 20% testing
# seed=42 Ensures the same split each time for reproducibility

# For model using only 'play' behaviour
train_playOnly, test_playOnly = steamDF_playOnly_capped.randomSplit([0.8, 0.2], seed=42)

print(f"Play Only Model:")
print(f"Training set size: {train_playOnly.count()}")
print(f"Test set size: {test_playOnly.count()}")

# For model using both 'play' and 'purchase' behaviours
train_playAndPurchase, test_playAndPurchase = steamDF_playAndPurchase_capped.randomSplit([0.8, 0.2], seed=42)

print(f"\nPlay and Purchase Model:")
print(f"Training set size: {train_playAndPurchase.count()}")
print(f"Test set size: {test_playAndPurchase.count()}")

Play Only Model:
Training set size: 56613
Test set size: 13864

Play and Purchase Model:
Training set size: 103168
Test set size: 25636


In [0]:
# Function to train and log to MLflow a baseline model, returning the model, its label and predictions
# model_label allows to recognise whether a model was trained on play only or play and purchase data
def train_baseline_model(train_df, test_df, model_label):
     #Starts and names a run in the MLflow experiment
    with mlflow.start_run(run_name=f"Training_{model_label}_Baseline"):
        # Log the size of the training and test datasets to MLflow
        mlflow.log_param("train_size", train_df.count())
        mlflow.log_param("test_size", test_df.count())
        
        # Print a message to indicate which model is being trained with default ALS hyperparameters
        print(f"\nTraining {model_label} with default ALS hyperparameters")

        # Define the ALS model
        als = ALS(
            userCol="memberIndex", # Column representing the user (member) id
            itemCol="gameIndex", # Column representing the item (game) id
            ratingCol="rating", # Column representing the user/item  (member/game) interaction (implicit feedback)
            implicitPrefs=True, # True = Implicit feedback
            coldStartStrategy="drop", # Drops NaNs during evaluation on testing set (due to unseen users/items during testing)
            rank=10, # Number of latent factors (dimensionality of user/item feature vectors)
            regParam=0.1, # Regularisation to prevent overfitting
            maxIter=10 # Number of training iterations
        )

        # Train the model (fit the model on the training set)
        model = als.fit(train_df)

        # Make predictions (of ratings) on the test set
        predictions = model.transform(test_df)

        # Logs the trained model to MLflow tracking (Allow to reload the model if needed later)
        mlflow.spark.log_model(model, f"{model_label}_baseline_model")

        # Log the trained model's configuration and label to MLflow
        mlflow.log_param("rank", 10)
        mlflow.log_param("regParam", 0.1)
        mlflow.log_param("maxIter", 10)
        mlflow.log_param("implicitPrefs", True)
        mlflow.log_param("model", model_label)
        

    # Returns a dictionary with the model, its label and predictions
    return {"model_label": model_label, "model": model, "predictions": predictions}

In [0]:
# Baseline model using Play only
# model_label allows to recognise whether a model was trained on play only or play and purchase data
baseline_model_playOnly = train_baseline_model(train_playOnly, test_playOnly, model_label="Model_PlayOnly")


Training Model_PlayOnly with default ALS hyperparameters




In [0]:
# Baseline model using Play and Purchase
# model_label allows to recognise whether a model was trained on play only or play and purchase data
baseline_model_playAndPurchase = train_baseline_model(train_playAndPurchase, test_playAndPurchase, model_label="Model_PlayAndPurchase")


Training Model_PlayAndPurchase with default ALS hyperparameters


The data split yielded 56,613 training member/game interactions and 13,864 test member/game interactions from the dataset for the training using hours played as implicit feedback and  103,168 training member/game interactions and 25,636 test member/game interactions from the dataset for the training using hours played and purchase behaviour as implicit feedback, confirming that the full datasets were correctly split without loss and that we had a substantial volume of data for training both models.

Subsequently, the training process of both baseline models was completed successfully using default ALS hyperparameters, and predictions were generated on their respective test sets. 

At this stage, the models have learned the latent vectors for each member and game, with embedded preferences influenced by either the hours played or the combined purchase behaviour and hours played and predictions of the trained models on their corresponding test data sets are ready to be used for model evaluation.

##5. Hyperparameter Tunning##

In this stage, we developed the 'train_als_with_gridsearch' function which replicates the  'train_baseline_model' but additionally implements a hyperparameter grid intending to improve the baseline performance of the ALS models by tuning their hyperparameters through a grid search approach. ALS performance can be sensitive to the configuration of certain hyperparameters, particularly rank, which determines the number of latent features, and regParam, which controls the degree of regularisation to prevent overfitting. Therefore, instead of relying on default values, the function uses a Python nested loop control flow structure to iterate over all the possible combinations of a set of rank values and regParam values. maxIter is fixed at 10 to limit training time while allowing the models to converge adequately. All the model objects generated and their, configurations, predictions on their test data sets and labels are saved in the `all_models_info = []` list and the list is returned for further evaluation and comparison of models. The label saved to the list for each model is a constructed an unique label so that the model can be identified not only by the data it was trained on but also the hyperparameters used for hypertuning.

In [0]:
# Function to train and log a model with hyperparameter tunning, returning the models, their labels and predictions
# Loops through combinations of rank and regParam. Tracks all model using MLflow, logging all relevant hyperparameters
def train_als_with_gridsearch(train_df, test_df, model_label):
    # Define parameter grid for gridsearch (hyperparameter tuning)
    ranks = [5, 15]
    regParams = [0.001, 0.01, 0.05]
    maxIter = 10

    # List to store all the models and their predictions
    all_models_info = []

    # Grid Search Loop iterates over each combination of hyperparameters to train the model
    for rank in ranks:
        for reg in regParams:
            # Uses the model label and hyperparameter values to construct a name for the run
            run_name = f"Training_{model_label}_rank{rank}_reg{reg}_iter{maxIter}"
            # Starts and names (using the constructed name) a run in the MLflow experiment for this loop iteration
            with mlflow.start_run(run_name=run_name):
                
                # Log the size of the training and test datasets to MLflow
                mlflow.log_param("train_size", train_df.count())
                mlflow.log_param("test_size", test_df.count())

                # Display current iteration model and hyperparameters
                print(f"\nTraining {model_label} with rank={rank}, regParam={reg}, maxIter={maxIter}")

                # Define the ALS model
                als = ALS(
                    userCol="memberIndex", # Column representing the user (member) id
                    itemCol="gameIndex", # Column representing the item (game) id
                    ratingCol="rating", # Column representing the user-item interaction (implicit feedback)
                    implicitPrefs=True, # True = Implicit feedback
                    coldStartStrategy="drop", # Drops NaNs during evaluation on testing set (due to users/games only present in the testing set so not seen during training)
                    rank=rank, # Number of latent factors (dimensionality of user/item feature vectors)
                    regParam=reg, # Regularisation to prevent overfitting
                    maxIter=maxIter # Number of training iterations
                )

                # Train the model (fit the model on the training set)
                model = als.fit(train_df)

                # Make predictions (ratings) on the test set 
                predictions = model.transform(test_df)

                # Logs the trained model to MLflow trackin (Allow to reload the model if needed later)
                mlflow.spark.log_model(model, f"{model_label}_rank{rank}_reg{reg}_model")

                # Appends the trained model's metadata to all_models_info list
                all_models_info.append({
                    # This constructs a unique label so that the model can be identified not only by the data it was trained on but also the hyperparameters used for hypertuning.
                    "model_label": f"{model_label}_rank{rank}_reg{reg}",
                    "rank": rank,
                    "regParam": reg,
                    "model": model,
                    "predictions": predictions
                })

                # Log current iteration's model configuration and label to MLflow
                mlflow.log_param("model", model_label)
                mlflow.log_param("rank", rank)
                mlflow.log_param("regParam", reg)
                mlflow.log_param("maxIter", maxIter)
                mlflow.log_param("implicitPrefs", True)

    # Return the list with all the models and their predictions
    return all_models_info

In [0]:
# Play only models with hyperparameter tuning (returns list of models dictionary)
# model_label allows to recognise whether a model was trained on play only or play and purchase data
tuned_models_playOnly = train_als_with_gridsearch(train_playOnly, test_playOnly, model_label="Model_PlayOnly")


Training Model_PlayOnly with rank=5, regParam=0.001, maxIter=10

Training Model_PlayOnly with rank=5, regParam=0.01, maxIter=10

Training Model_PlayOnly with rank=5, regParam=0.05, maxIter=10

Training Model_PlayOnly with rank=15, regParam=0.001, maxIter=10

Training Model_PlayOnly with rank=15, regParam=0.01, maxIter=10

Training Model_PlayOnly with rank=15, regParam=0.05, maxIter=10


In [0]:
# Play and Purchase models with hyperparameter tuning (returns list of models dictionary)
# model_label allows to recognise whether a model was trained on play only or play and purchase data
tuned_models_playAndPurchase = train_als_with_gridsearch(train_playAndPurchase, test_playAndPurchase, model_label="Model_PlayAndPurchase")


Training Model_PlayAndPurchase with rank=5, regParam=0.001, maxIter=10

Training Model_PlayAndPurchase with rank=5, regParam=0.01, maxIter=10

Training Model_PlayAndPurchase with rank=5, regParam=0.05, maxIter=10

Training Model_PlayAndPurchase with rank=15, regParam=0.001, maxIter=10

Training Model_PlayAndPurchase with rank=15, regParam=0.01, maxIter=10

Training Model_PlayAndPurchase with rank=15, regParam=0.05, maxIter=10


The tuning procedure was completed successfully for both model variations, one using hours played and the other using purchase and hours played as implicit feedback, producing twelve tuned models each (6 for each model variation) representing different combinations of latent space feature dimensions via the values in the rank hyperparameter with different levels of regularisation to prevent overfitting via the regParam hyperparameter. At this point all these models and their, configurations, predictions on their test data sets and labels saved in a list were available for further evaluation and comparison.

##6. Model Evaluation##

After the training stage, we proceeded to evaluate all the trained ALS models, both baseline and hyperparameter-tuned, using three complementary metrics: Root Mean Squared Error (RMSE), Precision@10, and Recall@10. 

RMSE evaluates the prediction errors between actual and predicted ratings using the predictions made by the model on the test set. 

The RMSE metric is useful for tracking relative model improvement across different hyperparameters configurations, therefore we implemented the function `evaluate_rmse(model, model_label, predictions)` that uses an instance of the Spark's RegressionEvaluator object to calculate this metric. Internally, the metric is calculated 
as the square root of the mean squared differences between the model's predicted rating and actual rating across the member/game interactions in the test set. The
differences are squared to penalise larger errors more heavily, since squaring amplifies the impact of larger differences more than smaller ones. Taking the square root then brings the error back to the same scale as the target variable, making it easier to interpret. Given that RMSE evaluates rating prediction accuracy but does not capture the quality of recommendations, we also produced Recall@N and Precision@N metrics to evaluate the quality of ten recommendations made by each model.

Precision@N assesses the relevance of the recommendations made by the model to members on the testing set. It meassures the proportion of games that were actually played by the user with respect to the number of recommendations, reflecting the model's ability to prioritise meaningful recommendations. In the code implementation, the function `evaluate_precision(model, test_df, model_label, n):` is used to calculate the Precision@10 metric for each of the models generated. This function, takes for input a model object, its corresponding training set and a label that allows to identify within the function whether the model is a play or play and purchase infered feedback model. The function then defines an inner function `precision_udf(actual, predicted)` that takes N predicted ratings made by the model for a member and the corresponding actual ratings recorded in the testing set and compares them calculating precision as ratio of relevant recommendations to total recommendations. The `precision_udf(actual, predicted)` function is wrapped up as a Spark User Defined Function (UDF) in ` precision_udf_spark = udf(precision_udf, DoubleType())` so it can be used with Spark Dataframes and executed in a distributed manner, returning the precision metric as a double data type value. An if-else conditional flow control structure was used within the function to select the correct testing set of the model being evaluated, extract the members included in the training set and intersect them with the corresponding testing set so that the evaluation of precision is carried out using predicitions made by the model on the testing set involving only members seen during the model's training, given the impossibility of an ALS trained model to generate correctly recommendations for users (members) not seen during its training. This was done out of an excess of caution given the size of the datasets involved. A helper function `get_top_n_recommendations(model, users_df, n)`, which implements Spark's `recommendForUserSubset` method, was used to generate the N recommendations for the members. 

Recall@10 measures the coverage of the recommendations made by the model to members on the testing set. It meassures the proportion of games played by the member that are in the generated recommendations with respect to the total number of games played by the member. A process similar to the calculation of Precision@N implemented in the function `evaluate_precision(model, test_df, model_label, n):` was implemented in the function `evaluate_recall(model, test_df, model_label, n):` to evaluate Recall@10 for each model.
       
Then the `evaluate_rmse`, `evaluate_precision` and `evaluate_recall` functions are invoked passing all the baseline and hypertuned models. The `evaluation_results` list saves all the model metrics in the process and a function `log_evaluation_to_mlflow(model_label, rmse, precision, recall)` starts experiment runs in MLflow and logs all the metrics for the models evaluated. Subsequently, the `evaluation_results` list is used to produce a DataFrame which is then displayed showing all the models' metrics ordered by ascending RMSE. Lastly, this DataFrame is ordered by presicion descending order and then by recall descending order to retrieve the first row of each resulting DataFrame and extract the model labels corresponding to the model's with best precision and best recall, respectively and then use these labels to retrieve the corresponding model objects and store them in the best_precision_model and best_recall_model variables in preparaction for generating recommendations and comparison. 


In [0]:
# Evaluate Root Mean Square Error (RMSE)
# RMSE is the square root of the mean squared differences between model's predicted rating and actual rating across the member/game interactions in the test set
#  Differences are squared to penalise larger errors more heavily, as squaring amplifies the impact of larger differences more than smaller ones. The square root then returns the mean of the squared differences to the original scale of the target variable, making it easier to interpret.
# Function to compute RMSE
def evaluate_rmse(model, model_label, predictions):
    
    print(f"Evaluating RMSE for {model_label}")
    
    # Initialise evaluator to evaluate the model using RMSE
    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating", # Actual rating values in the training set
        predictionCol="prediction" # Model predicted ratings in the training set
    )
    # Compare predictions vs actual ratings in the test set to compute the RMSE
    # RMSE measures how close the predicted rating (hours played or 1 for purchase) was to the actual rating in the testing set
    rmse = evaluator.evaluate(predictions)
    return rmse

In [0]:
# Custom function to, given a model, compute Top_N recomendations for all users
def get_top_n_recommendations(model, users_df, n):
    # Uses spark recommendForUserSubset function to get the n recommendations
    recommendations = model.recommendForUserSubset(users_df, n)
    # Flattens the nested list containing the top-N recommendations per user so that each row 
    # contains a single (user, game) recommendation along with its predicted rating.
    return recommendations.withColumn("rec_exp", explode("recommendations")).select(
        col("memberIndex"),
        col("rec_exp.gameIndex").alias("gameIndex"),
        col("rec_exp.rating").alias("predicted_rating")
    )

In [0]:
# Evaluates Precision@N for a given model and test dataset, returning the average precision across all users.
# Precision@N measures how many of the top n games recommended to a user were actually played or purchased by the user in the test set (Relevance of recommendations)
def evaluate_precision(model, test_df, model_label, n):

    print(f"Evaluating Precision@{n} for {model_label}")

    # Defines a function to calculate precision for a single user by comparing actual vs. predicted game sets
    def precision_udf(actual, predicted):
        #Convert actual interactions to a set
        actual_set = set(actual)
        # Convert predicted interactions to a set
        predicted_set = set(predicted)
        # Count intersection between actual and predicted sets
        true_positives = len(actual_set & predicted_set)
        # Calculates precision as ratio of relevant recommendations to total recommendations
        precision = float(true_positives) / len(predicted_set) if predicted_set else 0.0
        # Return precision
        return precision

    # Converts the precision_udf function to a Spark User Defined Function so it can be used in DataFrame transformations
    precision_udf_spark = udf(precision_udf, DoubleType())


    # Set of users in the training set that have been seen during training (Avoids making suggestions on members not seen during trainig)
    # Uses the model_label to identify the training set used to train the model
    #If an instance of the substring "PlayOnly" is found in the label
    if "PlayOnly" in model_label: 
        # Distinct members in the train_playOnly set
        training_users_df = train_playOnly.select("memberIndex").distinct()
    else:
        # Distinct members in the train_playAndPurchase set
        training_users_df = train_playAndPurchase.select("memberIndex").distinct()
    # Distinct members in the test
    test_users_df = test_df.select("memberIndex").distinct()
    # Distinct members in the test set who are also in the training set
    users_df = training_users_df.intersect(test_users_df)
    

    # Get top-N recommendations for each distinct member in the test set who is also in the trainig set (Uses the custom function)
    recommendations = get_top_n_recommendations(model, users_df, n)

    # Get the actual games the members interacted with
    actual_df = test_df.select("memberIndex", "gameIndex").groupBy("memberIndex").agg(collect_set("gameIndex").alias("actual_games"))

    # Aggregates the Top-10 recommendations generated for each user into a set for comparison
    predicted_df = recommendations.select("memberIndex", "gameIndex").groupBy("memberIndex").agg(collect_set("gameIndex").alias("predicted_games"))

    # Join both actual and predicted for comparison
    comparison_df = actual_df.join(predicted_df, on="memberIndex", how="inner")

    # Applies the precision UDF to each row in the comparison Dataframe
    result_df = comparison_df.withColumn("precision", precision_udf_spark("actual_games", "predicted_games"))

    # Computes the average precision across all users
    avg_precision = result_df.agg({"precision": "avg"}).collect()[0]["avg(precision)"]

    # Returns average precision
    return avg_precision

In [0]:
# Evaluates Recall@N for a given model and test dataset, returning the average recall across all users.
# Recall@N measures how many of the games actually played by a user in the test set were found in the top n recommendations made by the model (Coverage of recommendations)
def evaluate_recall(model, test_df, model_label, n):
   
    print(f"Evaluating Recall@{n} for {model_label}")

    # Defines a function to calculate Recall@N for a single user based on actual and predicted game sets
    def recall_udf(actual, predicted):
        #Convert actual interactions to a set
        actual_set = set(actual)
        # Convert predicted interactions to a set
        predicted_set = set(predicted)
        # Count intersection between actual and predicted sets
        true_positives = len(actual_set & predicted_set)
        # Calculates recall as the ratio of relevant items retrieved (true positives) to all relevant items (actual set)
        recall = float(true_positives) / len(actual_set) if actual_set else 0.0
        # Return recall
        return recall

    # Converts the recall_udf function to a Spark User Defined Functio so it can be used in DataFrame transformations
    recall_udf_spark = udf(recall_udf, DoubleType())

    # Set of users in the training set that have been seen during training (Avoids making suggestions on members not seen during trainig)
    # Uses the model_label to identify the training set used to train the model
    #If an instance of the substring "PlayOnly" is found in the label
    if "PlayOnly" in model_label:
        # Distinct members in the train_playOnly set
        training_users_df = train_playOnly.select("memberIndex").distinct()
    else:
        # Distinct members in the train_playAndPurchase set
        training_users_df = train_playAndPurchase.select("memberIndex").distinct()
    # Distinct members in the test
    test_users_df = test_df.select("memberIndex").distinct()
    # Distinct members in the test set who are also in the training set
    users_df = training_users_df.intersect(test_users_df)
    
    
    # Get top-N recommendations for each distinct member in the test set who is also in the trainig set (Uses the custom function)
    recommendations = get_top_n_recommendations(model, users_df, n)

    # Get the actual games the members interacted with
    actual_df = test_df.select("memberIndex", "gameIndex").groupBy("memberIndex").agg(collect_set("gameIndex").alias("actual_games"))

    # Aggregates the Top-10 recommendations generated for each user into a set for comparison
    predicted_df = recommendations.select("memberIndex", "gameIndex").groupBy("memberIndex").agg(collect_set("gameIndex").alias("predicted_games"))

    # Join both actual and predicted for comparison
    comparison_df = actual_df.join(predicted_df, on="memberIndex", how="inner")

    # Applies the recall UDF to each row in the comparison Dataframe
    result_df = comparison_df.withColumn("recall", recall_udf_spark("actual_games", "predicted_games"))

    # Computes the average recall across all users
    avg_recall = result_df.agg({"recall": "avg"}).collect()[0]["avg(recall)"]

    # returns average recall
    return avg_recall

In [0]:
# Function to log RMSE, Precision@N, and Recall@N to MLflow for a given model label.
def log_evaluation_to_mlflow(model_label, rmse, precision, recall):
   # Starts and names (using the model label) a run in the MLflow experiment
    with mlflow.start_run(run_name=f"Evaluation_{model_label}"):
        # Log the model label and metrics in MLflow
        mlflow.log_param("model", model_label)
        mlflow.log_metric("RMSE", rmse)
        mlflow.log_metric("Precision@10", precision)
        mlflow.log_metric("Recall@10", recall)
        # Display confirmation
        print(f"Evaluation metrics logged to MLflow for {model_label}")
        print()

In [0]:
# Evaluate RMSE, Precision@10 and Recall@10 in all baseline and hypertuning models, stores results in a list and log metrics in MLflow

# List to store all the metrics
evaluation_results = []


# Baseline - Play Only model evaluation
rmse_base_play = evaluate_rmse(baseline_model_playOnly["model"], baseline_model_playOnly["model_label"], baseline_model_playOnly["predictions"])
precision_base_play = evaluate_precision(baseline_model_playOnly["model"], test_playOnly, baseline_model_playOnly["model_label"], n=10)
recall_base_play = evaluate_recall(baseline_model_playOnly["model"], test_playOnly, baseline_model_playOnly["model_label"], n=10)
# Logs the model's label and metrics to MLflow
log_evaluation_to_mlflow(baseline_model_playOnly["model_label"], rmse_base_play, precision_base_play, recall_base_play)
# Creates a dictionary containing the model's label and metrics and appends the dictionary to evaluation_results list
evaluation_results.append({"model_label": baseline_model_playOnly["model_label"], "rmse": rmse_base_play, "precision@10": precision_base_play, "recall@10": recall_base_play})


# Baseline Play and Purchase model evaluation
rmse_base_playPurchase = evaluate_rmse(baseline_model_playAndPurchase["model"], baseline_model_playAndPurchase["model_label"], baseline_model_playAndPurchase["predictions"])
precision_base_playPurchase = evaluate_precision(baseline_model_playAndPurchase["model"], test_playAndPurchase, baseline_model_playAndPurchase["model_label"], n=10)
recall_base_playPurchase = evaluate_recall(baseline_model_playAndPurchase["model"], test_playAndPurchase, baseline_model_playAndPurchase["model_label"], n=10)
# Logs the model's label and metrics to MLflow
log_evaluation_to_mlflow(baseline_model_playAndPurchase["model_label"], rmse_base_playPurchase, precision_base_playPurchase, recall_base_playPurchase)
# Creates a dictionary containing the model's label and metrics and appends the dictionary to evaluation_results list
evaluation_results.append({"model_label": baseline_model_playAndPurchase["model_label"], "rmse": rmse_base_playPurchase, "precision@10": precision_base_playPurchase, "recall@10": recall_base_playPurchase})


# Loop through Play Only hypertuning models stored in tuned_models_playOnly to evaluate each
for model_info in tuned_models_playOnly:
    # Gets the model object
    model = model_info["model"]
    # Gets the model's label   
    model_label = model_info["model_label"]
    # Gets the model's predictions
    predictions = model_info["predictions"]
    # Computes RMSE
    rmse = evaluate_rmse(model, model_label, predictions)
    # Compures precision@10
    precision = evaluate_precision(model, test_playOnly, model_label, n=10)
    # Computes recall@10
    recall = evaluate_recall(model, test_playOnly, model_label, n=10)
    # Logs model label and resulting metrics to MLflow
    log_evaluation_to_mlflow(model_label, rmse, precision, recall)
    # Creates a dictionary containing the model's label and metrics and appends the dictionary to evaluation_results list
    evaluation_results.append({"model_label": model_label,"rmse": rmse,"precision@10": precision,"recall@10": recall})

# Loop through Play and Purchase hypertuned models stored in tuned_models_playAndPurchase list to evaluate each
for model_info in tuned_models_playAndPurchase:
    # Gets the model object
    model = model_info["model"]
    # Gets the model's label
    model_label = model_info["model_label"]
    # Gets the model's predictions
    predictions = model_info["predictions"]
    # Computes RMSE
    rmse = evaluate_rmse(model, model_label, predictions)
    # Compures precision@10
    precision = evaluate_precision(model, test_playAndPurchase, model_label, n=10)
    # Computes recall@10
    recall = evaluate_recall(model, test_playAndPurchase, model_label, n=10)
    # Logs model label and resulting metrics to MLflow
    log_evaluation_to_mlflow(model_label, rmse, precision, recall)
    # Creates a dictionary containing the model's label and metrics and appends the dictionary to evaluation_results list
    evaluation_results.append({"model_label": model_label,"rmse": rmse,"precision@10": precision,"recall@10": recall})

Evaluating RMSE for Model_PlayOnly
Evaluating Precision@10 for Model_PlayOnly
Evaluating Recall@10 for Model_PlayOnly
Evaluation metrics logged to MLflow for Model_PlayOnly

Evaluating RMSE for Model_PlayAndPurchase
Evaluating Precision@10 for Model_PlayAndPurchase
Evaluating Recall@10 for Model_PlayAndPurchase
Evaluation metrics logged to MLflow for Model_PlayAndPurchase

Evaluating RMSE for Model_PlayOnly_rank5_reg0.001
Evaluating Precision@10 for Model_PlayOnly_rank5_reg0.001
Evaluating Recall@10 for Model_PlayOnly_rank5_reg0.001
Evaluation metrics logged to MLflow for Model_PlayOnly_rank5_reg0.001

Evaluating RMSE for Model_PlayOnly_rank5_reg0.01
Evaluating Precision@10 for Model_PlayOnly_rank5_reg0.01
Evaluating Recall@10 for Model_PlayOnly_rank5_reg0.01
Evaluation metrics logged to MLflow for Model_PlayOnly_rank5_reg0.01

Evaluating RMSE for Model_PlayOnly_rank5_reg0.05
Evaluating Precision@10 for Model_PlayOnly_rank5_reg0.05
Evaluating Recall@10 for Model_PlayOnly_rank5_reg0.05


In [0]:
# Display Evaluation Summary

# Convert list of metrics to Spark DataFrame
evaluation_df = spark.createDataFrame([Row(**row) for row in evaluation_results])

# Display the summary table ordered by RMSE
print("Summary of Evaluation Metrics Ordered by Ascending RMSE")
evaluation_df.orderBy("rmse").display()

Summary of Evaluation Metrics Ordered by Ascending RMSE


model_label,rmse,precision@10,recall@10
Model_PlayAndPurchase_rank15_reg0.001,22.81787103077415,0.0741886299281219,0.315048316783646
Model_PlayAndPurchase_rank5_reg0.001,22.819261244925396,0.0653234589414074,0.2642329453172939
Model_PlayAndPurchase_rank15_reg0.01,22.822413126887025,0.0739054672184726,0.3151000533982104
Model_PlayAndPurchase_rank15_reg0.05,22.82861544937509,0.0725114354171219,0.3104944818374553
Model_PlayAndPurchase,22.830322090709014,0.0711609671095635,0.3024937037572767
Model_PlayAndPurchase_rank5_reg0.01,22.830924360370908,0.0642343715966022,0.2613090806328523
Model_PlayAndPurchase_rank5_reg0.05,22.836230134906145,0.0636462644304073,0.2597848600726621
Model_PlayOnly_rank5_reg0.001,45.09940210765486,0.0538215542053073,0.242081473162181
Model_PlayOnly_rank5_reg0.01,45.10322397203067,0.0534058202750227,0.2387107795256216
Model_PlayOnly_rank5_reg0.05,45.10899115281792,0.0530860249440346,0.238827933844001


In [0]:
# Select best models by Precision@10 and Recall@10

# Get best model by Precision@10 in the evaluation_df DataFrame
# Orders the DataFrame by precision@10 descending order and gets the first row
best_precision_row = evaluation_df.orderBy(col("precision@10").desc()).first()
# Get best model by Recall@10 in the evaluation_df DataFrame
# Orders the DataFrame by recall@10 descending order and gets the first row
best_recall_row = evaluation_df.orderBy(col("recall@10").desc()).first()

# Extract model labels
# Extract the model_label value from the row in the DataFrame with the best Precision@10 
best_precision_label = best_precision_row["model_label"]
# Extract the model_label value from the row in the DataFrame with the best Recall@10 
best_recall_label = best_recall_row["model_label"]

# Gather all models into one list
# This includes the two baseline models and all models trained with hyperparameter tuning
# All models were created by passing the train/test sets to train_baseline_model and train_als_with_gridsearch
all_models = [baseline_model_playOnly, baseline_model_playAndPurchase] + tuned_models_playOnly + tuned_models_playAndPurchase

# Find in the all_models list, the model dictionary that matches the best_precision_label
# next() gives the first match found in the list
best_precision_model_info = next(m for m in all_models if m["model_label"] == best_precision_label)
# Find in the all_models list, the model dictionary that matches the best_recall_label
# next() gives the first match found in the list
best_recall_model_info = next(m for m in all_models if m["model_label"] == best_recall_label)

# Get the model object from the dictionary that matches the best_precission_label
best_precision_model = best_precision_model_info["model"]
# Get the model object from the dictionary that matches the best_recall_label
best_recall_model = best_recall_model_info["model"]

# Display best models' labels
print(f"Best Precision Model: {best_precision_label}")
print(f"Best Recall Model: {best_recall_label}")

Best Precision Model: Model_PlayAndPurchase_rank15_reg0.001
Best Recall Model: Model_PlayAndPurchase_rank15_reg0.01


The evaluation of all ALS models using RMSE, Precision@10, and Recall@10 confirmed that the Play and Purchase variants consistently outperformed the Play Only models across all metrics.

Similarly, models with a higher rank value tended to perform better in terms of RMSE, likely due to the increase in the dimensionality of vectors, enabling them to capture more nuances and hidden patterns in the data.

The best performing model in terms of Precision@10 was Model_PlayAndPurchase_rank15_reg0.001, showing a RMSE of 22.82, Precision@10 of 0.074, and Recall@10 of 0.315

The best performing model in terms of Recall@10 was Model_PlayAndPurchase_rank15_reg0.01, showing a RMSE of 22.82, Precision@10 of 0.074, and Recall@10 of 0.315

These models were able to retrieve over 31% of the games  the members actually interacted with, using only 10 recommendations, and mantained an average of approximately 0.74 relevant items in the top-10 recommendation list. These results reflect good performance considering the nature of the data and the lack of explicit ratings.

The Play Only models showed significantly higher RMSE but lower values for both Precision and Recall, which indicates the added value of incorporating purchase behaviour into the rating to improve both accuracy and relevance when generating recommendations.


##7. Generating Recommendations##

In the final stage, we focused on demonstrating the use of the best-performing models to generate game recommendations. Given that models can only generate recommendations for members who have been included in their training sets, we developed a cold start fallback function, `generate_fallback_recommendations(n=10)`  that returns the top-n most played games by aggregating the total playtime across all users for each game using the pivoted dataset from the pre-processing stage and maps each gameIndex back to its original gameName using the fitted StringIndexer, so that members who were not seen by a model during its training can still receive recommendations.

Then, we used the `recommendForAllUsers`  Spark method to generate a DataFrame of Top 10 recommendations for all users using the ALS models with the highest Precision and Recall. The generated DataFrames are stored in the top10_precision and top10_recall variables. If the same model achieved the best result in the two metrics, recommendations were produced using only that model. Otherwise, two separate recommendation DataFrames were created using both models to allow for comparison.

To retrieve personalised recommendations for a given user, we implemented the  `generate_one_user_recommendations()` function. This function takes a DataFrame with the ten best recommendations for all users generated by `recommendForAllUsers` Spark method, the model's label and the original member ID (string) as input and maps the original memberID to its corresponding memberIndex using the indexed DataFrame. If the user is not found or was not seen during model training, it calls the cold start fallback  `generate_fallback_recommendations(n=10)` function to return the most played games. If the member ID is not a new member and it was part of the model's training set, the function filters the recommendation DataFrame for that specific user, explodes the nested recommendation list into individual game entries, and joins with the mapping DataFrame to retrieve the game names alongside predicted ratings. Consequently, we applied this function to the first user in the dataset and displayed the resulting top-10 recommendations, showing both predicted ratings and game names.

 To contextualise the model’s outputs, we also called the fallback function to display the 10 most played games across the entire dataset, allowing for a side-by-side comparison with personalised results.

This stage confirms that the trained ALS models can produce individualised, data-driven recommendations for users while also supporting fallback options for new or unseen members.

In [0]:
# Cold start function
# Function to genereate recommendations using the 10 most played games accross all users
# Use as cold start function to generate recommedations for members not in the original dataset (new members) 
# Also, use for members not on the training set in a trained model (ALS can not generate recommendations for users not seen during training) 
def generate_fallback_recommendations(n=10):

    # Groups the dataset by gameIndex to aggregate total play time across all users
    top_games_df = (pivotedDF.groupBy("gameIndex").agg(spark_sum("played_hours").alias("totalHours")).orderBy(col("totalHours"). desc()).limit(n))

    # Retrieves the game names from the StringIndexer transformed SteamDF DataFrame 
    game_labels = game_indexer_model.labels

    # Creates a mapping DataFrame to convert the numeric gameIndex back to game names
    game_mapping_df = spark.createDataFrame([Row(gameIndex=i, gameName=name) for i, name in enumerate(game_labels)])

    # Joins the aggregated top games with their names for display purposes
    top_games_named = top_games_df.join(game_mapping_df, on="gameIndex", how="left").withColumn("totalHours", spark_round(col("totalHours"), 2))

    # Returns a DataFrame of the top n most played games with their names and total hours.
    return top_games_named

In [0]:
# Generate a DataFrame with 10 recommendations for all users in the training set
# If the same model achieved the best precision and the best recall, it uses this model, otherwise uses both models and generates two sets of recommendaions for comparison
# It returns Spark DataFrame(s) where each row corresponds to a user, and the recommendations column contains a nested list of recommended items and predicted ratings
if best_precision_label == best_recall_label:
    print(f"\nSame model achieved both best Precision@10 and Recall@10: {best_precision_label}")
    # Generates 10 recommendations using the model that achieved best precission
    top10_precision = best_precision_model.recommendForAllUsers(10)
else:
    print(f"\nRecommendations with the Best Precision Model: {best_precision_label}")
    top10_precision = best_precision_model.recommendForAllUsers(10)
    print(f"Recommendations with the Best Recall Model: {best_recall_label}")
    top10_recall = best_recall_model.recommendForAllUsers(10)


Recommendations with the Best Precision Model: Model_PlayAndPurchase_rank15_reg0.001
Recommendations with the Best Recall Model: Model_PlayAndPurchase_rank15_reg0.01


In [0]:
# Function to get 10 recommendations for one user from a recommendation DataFrame
def generate_one_user_recommendations(recommendation_df, member_id, model_label):
    
    # Use the DataFrame transformed with the StringIndexer to find the row where the corresponding numeric memberIndex for the member_id string passed as argument is
    member_index_row = pivotedDF.select("memberID", "memberIndex").filter(col("memberID") == member_id).first()

    
    # If the member_id is not found in the StringIndexer transformed SteamDF DataFrame return the fallback ten most played games (cold start)
    # If member_index_row is assigned a null value it means the member_id is wrong or a new member not in the original raw data
    if not member_index_row:
        print(f"\nMember ID {member_id} not found in pivotedDF. Returning fallback recommendations.")
        # Uses the custom function to generate fallback recommendations
        fallback_df = generate_fallback_recommendations()
        # Display the fallback recommendations
        print(f"\nTop 10 Popular Games (Fallback for Member ID {member_id}):")
        fallback_df.select("gameIndex", "gameName", "totalHours").show()
        # Returns the fallback recommendations
        return fallback_df
    else:
        # If the member_id is found in the StringIndexer transformed SteamDF DataFrame, extracts the corresponding numberIndex
        member_index = member_index_row["memberIndex"]



    # Check if the member was not seen during the model's training
    # Assumes that the recommendation_df DataFrame would containg recommendations for the member_id member if it has been part of the model's trainig data
    if recommendation_df.filter(col("memberIndex") == member_index).count() == 0:
        print(f"\nCold-start: Member ID {member_id} was not seen during training.")
        # Generate fallback recommendations using the custom function if the recommendations do not included the member_id
        fallback_df = generate_fallback_recommendations()
        # Display the fallback recommendations
        print(f"\nTop 10 Popular Games (Fallback for Member ID {member_id}):")
        fallback_df.select("gameIndex", "gameName", "totalHours").show()
        # Return the fallback recommendations
        return fallback_df      

    # Continue if member was seen by the model during training and it is not a new member

    # Filter just the recommendations for the specific user using their memberIndex
    user_recs = recommendation_df.filter(col("memberIndex") == member_index)

    # Flatten the nested recommendations list (which contains gameIndex and predicted_rating)
    # This gives one row per recommended game for the user
    user_recs_flat = user_recs.withColumn("rec", explode("recommendations")).select(
        "memberIndex",
        col("rec.gameIndex").alias("gameIndex"),
        col("rec.rating").alias("predicted_rating")
    )

    # Get the list of game names used in the StringIndexer 
    game_labels = game_indexer_model.labels

    # Convert the list of games into a DataFrame
    game_mapping_df = spark.createDataFrame([Row(gameIndex=i, gameName=name) for i, name in enumerate(game_labels)])

    # Join the flattened recommendations with the game names
    user_recs_named = user_recs_flat.join(game_mapping_df, on="gameIndex", how="left").withColumn("predicted_rating", spark_round(col("predicted_rating"), 2))


    # Show the top-10 recommended games with predicted ratings for that user
    label_info = f" ({model_label})" if model_label else ""
    print(f"\nTop 10 Recommendations for Member ID {member_id}{label_info}:")
    user_recs_named.select("gameIndex", "gameName", "predicted_rating").show(truncate=False)

    # Returns the 10 recommendations
    return user_recs_named

In [0]:
# Get the first member ID from the dataset (original string ID before indexing)
first_member_id = steamDF.select("memberID").first()["memberID"]

# Only generate one set of recommendations if both metrics picked the same model
if best_precision_label == best_recall_label:
    print(f"\nGenerating recommendations for best model (Precision & Recall): {best_precision_label}")
    # Generate 10 recommendations for the first member using the model with best precision
    generate_one_user_recommendations(top10_precision, first_member_id, best_precision_label)
else:
    print(f"\nGenerating recommendations for best Precision model: {best_precision_label}")
    # Generate 10 recommendations for the first member using the model with best precision
    generate_one_user_recommendations(top10_precision, first_member_id, best_precision_label)

    print(f"\nGenerating recommendations for best Recall model: {best_recall_label}")
    # Generate 10 recommendations for the first member using the model with best recall
    generate_one_user_recommendations(top10_recall, first_member_id, best_recall_label)


Generating recommendations for best Precision model: Model_PlayAndPurchase_rank15_reg0.001

Top 10 Recommendations for Member ID 151603712 (Model_PlayAndPurchase_rank15_reg0.001):
+---------+-------------------------------+----------------+
|gameIndex|gameName                       |predicted_rating|
+---------+-------------------------------+----------------+
|2        |Unturned                       |1.45            |
|11       |The Elder Scrolls V Skyrim     |1.0             |
|3        |Counter-Strike Global Offensive|1.0             |
|6        |Left 4 Dead 2                  |0.95            |
|17       |Sid Meier's Civilization V     |0.93            |
|24       |Terraria                       |0.87            |
|26       |Borderlands 2                  |0.86            |
|8        |Warframe                       |0.86            |
|0        |Dota 2                         |0.85            |
|135      |Fallout 4                      |0.81            |
+---------+---------------

In [0]:
# Call the cold start function to compare the 10 games with most hours played accross all users with the 10 games recommended to the user in the output above
fallback_df = generate_fallback_recommendations(10)

# Show the fallback recommendations 
fallback_df.select("gameIndex", "gameName", "totalHours").show(truncate=False)

+---------+-------------------------------------------+----------+
|gameIndex|gameName                                   |totalHours|
+---------+-------------------------------------------+----------+
|0        |Dota 2                                     |981684.6  |
|3        |Counter-Strike Global Offensive            |322771.6  |
|1        |Team Fortress 2                            |173673.3  |
|7        |Counter-Strike                             |134261.1  |
|17       |Sid Meier's Civilization V                 |99821.3   |
|5        |Counter-Strike Source                      |96075.5   |
|11       |The Elder Scrolls V Skyrim                 |70889.3   |
|10       |Garry's Mod                                |49725.3   |
|45       |Call of Duty Modern Warfare 2 - Multiplayer|42009.9   |
|6        |Left 4 Dead 2                              |33596.7   |
+---------+-------------------------------------------+----------+



The output confirmed that the ALS models trained earlier were able to generate personalised top-10 recommendations for each member seen during training and personalised top-10 recommendations for a single member, each including a predicted rating, with higher values representing stronger indications of the member's likely preference for the game. 

 Given the implicit feedback nature of the training data, the predicted rating shown for each game in the top-10 recommendations does not represent an estimate of how many hours the member will play the game or the probability that the member would buy the game. Instead, the rating indicates how confident the model is about the strength of the member’s potential interest in the game. Therefore, these values should be interpreted in relative terms considering the ratings of the other games in the list of recommendations.

The top-10 lists generated by the models with the best Precision (Model_PlayAndPurchase_rank15_reg0.001) and the best Recall (Model_PlayAndPurchase_rank15_reg0.01) had some overlap, with games like Unturned, Skyrim, and Counter-Strike: Global Offensive appearing in both, suggesting that both models agreed on some of the most relevant games.

However, there were also a few differences between the two lists, such as Robocraft appearing in the best recall model’s list but not in the precision one, showing how the two models prioritise slightly different things with the precision model focusing more on returning highly relevant games, while the recall model aiming to include more of the games the user interacted with overall, even if their predicted ratings were slightly lower.

Lastly, the fallback recommendations also worked as expected, returning the ten most played games across all users some of which also appeared in the personalised lists but most were unique to the fallback list showing how personalised recommendations can go beyond the popularity of the games to recommend games based on individual preferences.

##8. Conclusion##

In this notebook, we implemented a collaborative filtering recommender system using matrix factorization via the ALS algorithm in Spark MLlib, to train models entirely on implicit feedback from user-game interactions, using different value combinations of rank (dimensionality of latent factors vectors) and regularisation regParams for model hyperparameter tuning and programmatically tracking models with MLflow. Two approaches were tested, one using hours played only, and another combining hours played with a weighted purchase flag as implicit feedback.

The system follows a collaborative filtering approach by learning patterns from user/game interactions only, without needing game metadata such as genre or release year or relying on explicit user ratings.  

The model trained on both play and purchase behaviours consistently outperformed the play-only model. The best model in terms of Precision@10 was Model_PlayAndPurchase_rank15_reg0.001 (Precision@10: 0.074). The best Recall@10 was achieved by Model_PlayAndPurchase_rank15_reg0.01 (Recall@10: 0.315).

Companies using this system could improve customer engagement by helping their users discover games they are more likely to enjoy and potentially increase time spent on the platform while generating more sales or playtime. The choice between precision and recall depends on the business goal. If their priority is offering highly relevant games at the top of the list, the model with the best precision should be chosen. If the goal is to maximise the exposure to relevant games, recall becomes more important.

Some of the limitations of the system include the cold start problem due to the inherent impossibility of ALS models to generate personalised recommendations for members that were not included in the models' training sets in addition to the reduced explainability of the predicted ratings in the recommendations due to the nature of the implicit feedback approached used.

Future work could include functionality improvements such as filtering out games already owned or played by a member before generating recommendations. It could also seek model performance improvement by using more implicit feedback elements such as clickstream and time since the last interaction, expanding the quantity and range of hyperparameters used during hyperparameter tuning and including techniques such as early stopping, trying different weights for the purchase feedback, or exploring hybrid models combining collaborative and content-based filtering for better cold start handling and more personalised recommendations. Moreover, frontier research could be explored to carry out posthoc analysis aiming to interpret what the models have captured and patterns in their latent features that may suggest correlations with the implicit feedback elements the model was trained on such as patterns in playtime and purchase behaviour.

##Appendix 1: MLflow Logs##

In [0]:
# Display the MLflow logs
# Allows review to the experiment logs without direct MLflow server access.

# Initialise MLflow client
client = MlflowClient()

# Get the experiment
experiment = client.get_experiment_by_name(experiment_path)

# 
if experiment is None:
    print(f"Experiment not found at path: {experiment_path}")
else:
    print("Using MLflow Experiment ID:", experiment.experiment_id)

    # Get all runs in ascending start time
    runs = client.search_runs(experiment.experiment_id, order_by=["start_time ASC"])

    # Format MLflow runs as Spark DataFrame rows
    rows = []
    for run in runs:
        data = run.data

        '''
        rows.append({
            "run_id": run.info.run_id,
            "model": data.params.get("model", "N/A"),
            "rank": data.params.get("rank", "N/A"),
            "regParam": data.params.get("regParam", "N/A"),
            "maxIter": data.params.get("maxIter", "N/A"),
            "implicitPrefs": data.params.get("implicitPrefs", "N/A"),
            "rmse": data.metrics.get("rmse", None),
            "precision": data.metrics.get("precision", None),
            "recall": data.metrics.get("recall", None)
        })
'''

        rows.append({
            "run_id": run.info.run_id,
            "model": data.params.get("model"),             
            "rank": data.params.get("rank"),
            "regParam": data.params.get("regParam"),
            "maxIter": data.params.get("maxIter"),
            "implicitPrefs": data.params.get("implicitPrefs"),
            "rmse": data.metrics.get("rmse"),
            "precision": data.metrics.get("Precision@10"),
            "recall": data.metrics.get("Recall@10")
        })


    # Define the DataFrame schema
    schema = StructType([
        StructField("run_id", StringType(), True),
        StructField("model", StringType(), True),
        StructField("rank", StringType(), True),
        StructField("regParam", StringType(), True),
        StructField("maxIter", StringType(), True),
        StructField("implicitPrefs", StringType(), True),
        StructField("rmse", FloatType(), True),
        StructField("precision", FloatType(), True),
        StructField("recall", FloatType(), True)
    ])

    # Convert rows to Spark DataFrame using the eschema
    mlflow_log_df = spark.createDataFrame([Row(**row) for row in rows], schema)
    # Display the DataFrame
    mlflow_log_df.display()

Using MLflow Experiment ID: 1210934385966680


run_id,model,rank,regParam,maxIter,implicitPrefs,rmse,precision,recall
d88273e7315a49758eb8e8ff092d610c,,,,,,,,
311c01517f144e1d9d5d5e071a287cff,,,,,,,,
782dc91def454e05951215e67cc92a9d,Model_PlayOnly,10.0,0.1,10.0,True,,,
d642568531a14e07bab145abb7b81bfa,Model_PlayAndPurchase,10.0,0.1,10.0,True,,,
b50c8d0ec57f42dba16d1dc96619a625,Model_PlayOnly,5.0,0.001,10.0,True,,,
0698c3f2589745f190894486e1fba0ca,Model_PlayOnly,5.0,0.01,10.0,True,,,
1d8a0338693a4dcea2feaf42abf110a2,Model_PlayOnly,5.0,0.05,10.0,True,,,
2bec427e85f147e9affe164fd117d06f,Model_PlayOnly,15.0,0.001,10.0,True,,,
f36f0699eebd4667a9f9510b3a7494e0,Model_PlayOnly,15.0,0.01,10.0,True,,,
6c2603a4dc8d4ef6a90c88d09646f2ae,Model_PlayOnly,15.0,0.05,10.0,True,,,


In [0]:
'''
# Delete all MLflow logs

from mlflow.tracking import MlflowClient

# Set experiment path 
experiment_path = "/Users/j.l.wong@edu.salford.ac.uk/game_recommender_system"
client = MlflowClient()
experiment = client.get_experiment_by_name(experiment_path)

if experiment is not None:
    experiment_id = experiment.experiment_id
    runs = client.search_runs(experiment_id)
    for run in runs:
        print(f"Deleting run: {run.info.run_id}")
        client.delete_run(run.info.run_id)
    print("All runs deleted.")
else:
    print(" Experiment not found.")
'''