In [0]:
###Part1. Description of any set up required to complete this task.
#Create a ML compute and notebook with language as python
#Upload csv file to the DBFS
#Confirm that the csv file in this case, steam_200k was successfully uploaded.
dbutils.fs.ls("/FileStore/tables/")

[FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1709140371000),
 FileInfo(path='dbfs:/FileStore/tables/TS021_2021_2.csv', name='TS021_2021_2.csv', size=497239, modificationTime=1710957989000),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts.zip', name='accounts.zip', size=5297592, modificationTime=1706721471000),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations.zip', name='activations.zip', size=8411369, modificationTime=1706718922000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.zip', name='clinicaltrial_2020.zip', size=10599182, modificationTime=1714560565000),
 FileInfo(path

In [0]:
#View contents of the uploaded file
csv_file = "steam_200k.csv"
dbutils.fs.head(f"/FileStore/tables/{csv_file}")

[Truncated to first 65536 bytes]


'151603712,The Elder Scrolls V Skyrim,purchase,1\r\n151603712,The Elder Scrolls V Skyrim,play,273\r\n151603712,Fallout 4,purchase,1\r\n151603712,Fallout 4,play,87\r\n151603712,Spore,purchase,1\r\n151603712,Spore,play,14.9\r\n151603712,Fallout New Vegas,purchase,1\r\n151603712,Fallout New Vegas,play,12.1\r\n151603712,Left 4 Dead 2,purchase,1\r\n151603712,Left 4 Dead 2,play,8.9\r\n151603712,HuniePop,purchase,1\r\n151603712,HuniePop,play,8.5\r\n151603712,Path of Exile,purchase,1\r\n151603712,Path of Exile,play,8.1\r\n151603712,Poly Bridge,purchase,1\r\n151603712,Poly Bridge,play,7.5\r\n151603712,Left 4 Dead,purchase,1\r\n151603712,Left 4 Dead,play,3.3\r\n151603712,Team Fortress 2,purchase,1\r\n151603712,Team Fortress 2,play,2.8\r\n151603712,Tomb Raider,purchase,1\r\n151603712,Tomb Raider,play,2.5\r\n151603712,The Banner Saga,purchase,1\r\n151603712,The Banner Saga,play,2\r\n151603712,Dead Island Epidemic,purchase,1\r\n151603712,Dead Island Epidemic,play,1.4\r\n151603712,BioShock Infinite,

In [0]:
#import mlflow and autolog machine learning runs
import mlflow
mlflow.pyspark.ml.autolog

<function mlflow.pyspark.ml.autolog(log_models=True, log_datasets=True, disable=False, exclusive=False, disable_for_unsupported_versions=False, silent=False, log_post_training_metrics=True, registered_model_name=None, log_input_examples=False, log_model_signatures=True, log_model_allowlist=None, extra_tags=None)>

In [0]:
###Part2. Loading data into Spark DataFrame and any exploratory analysis or visualisation carried out prior to training.
#Read data into spark DataFrame using spark.read.csv(), and set inferSchema to true so the schema can be inferred.
fileDF = spark.read.csv(f"/FileStore/tables/{csv_file}",inferSchema = "true")
#Display the dataframe
fileDF.display()

_c0,_c1,_c2,_c3
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


In [0]:
#Create a function for renaming columns for readability and change the Playtime datatype to float for memory optimization
def change_schema(fileDF):
    fileDF = fileDF.withColumn("_c3", fileDF["_c3"].cast("float"))\
                   .withColumnRenamed("_c0", "userID")\
                   .withColumnRenamed("_c1", "gameName")\
                   .withColumnRenamed("_c2", "memberBehaviour")\
                   .withColumnRenamed("_c3", "playTime")
    return fileDF
#Execute the function on the fileDF
steamDF = change_schema(fileDF)
steamDF.show()

+---------+--------------------+---------------+--------+
|   userID|            gameName|memberBehaviour|playTime|
+---------+--------------------+---------------+--------+
|151603712|The Elder Scrolls...|       purchase|     1.0|
|151603712|The Elder Scrolls...|           play|   273.0|
|151603712|           Fallout 4|       purchase|     1.0|
|151603712|           Fallout 4|           play|    87.0|
|151603712|               Spore|       purchase|     1.0|
|151603712|               Spore|           play|    14.9|
|151603712|   Fallout New Vegas|       purchase|     1.0|
|151603712|   Fallout New Vegas|           play|    12.1|
|151603712|       Left 4 Dead 2|       purchase|     1.0|
|151603712|       Left 4 Dead 2|           play|     8.9|
|151603712|            HuniePop|       purchase|     1.0|
|151603712|            HuniePop|           play|     8.5|
|151603712|       Path of Exile|       purchase|     1.0|
|151603712|       Path of Exile|           play|     8.1|
|151603712|   

In [0]:
#Analysis 1: Number of total records, distinct records, users and games
TotalRecord = steamDF.count()
DistinctRecord = steamDF.distinct().count()
DistinctUsers = steamDF.select("userID").distinct().count()
DistinctGames = steamDF.select("gameName").distinct().count()
print(f"The number of records in the file is {TotalRecord}.")
print(f"The number of distinct records in the file is {DistinctRecord}.")
print(f"The number of distinct users in the file is {DistinctUsers}.")
print(f"The number of distinct games in the file is {DistinctGames}.")

The number of records in the file is 200000.
The number of distinct records in the file is 199293.
The number of distinct users in the file is 12393.
The number of distinct games in the file is 5155.


In [0]:
#Analysis 2: Number of times the games were played and purchased
played_games = steamDF.filter(steamDF["memberBehaviour"] == "play").distinct().count()
purchased_games = steamDF.filter(steamDF["memberBehaviour"] == "purchase").distinct().count()
#print results
print("The number of times the games were played is", played_games)
print("The number of times the games were purchased is", purchased_games)

The number of times the games were played is 70489
The number of times the games were purchased is 128804


In [0]:
#Analysis 3: Top 10 most played games
steamDF.filter(steamDF["memberBehaviour"] == "play").\
        groupBy("gameName").count().\
        orderBy('count', ascending=False).\
        limit(10).\
        display()
#Barplot shows the number of times each game was played for the top 10 games.

gameName,count
Dota 2,4841
Team Fortress 2,2323
Counter-Strike Global Offensive,1377
Unturned,1069
Left 4 Dead 2,801
Counter-Strike Source,715
The Elder Scrolls V Skyrim,677
Garry's Mod,666
Counter-Strike,568
Sid Meier's Civilization V,554


Databricks visualization. Run in Databricks to view.

In [0]:
#Analysis 4: Top 10 most purchased games
steamDF.filter(steamDF["memberBehaviour"] == "purchase").\
        groupBy("gameName").count().\
        orderBy('count', ascending=False).\
        limit(10).\
        display()
#Barplot shows the number of times each game was purchased for the top 10 games.
#Good to note that the highest purchased/played game (Dota 2) has equal values. Game must be really good.

gameName,count
Dota 2,4841
Team Fortress 2,2323
Unturned,1563
Counter-Strike Global Offensive,1412
Half-Life 2 Lost Coast,981
Counter-Strike Source,978
Left 4 Dead 2,951
Counter-Strike,856
Warframe,847
Half-Life 2 Deathmatch,823


Databricks visualization. Run in Databricks to view.

In [0]:
#Analysis 5: Least purchased games
#This shows list of games that could possible be removed from the catalog.
steamDF.filter(steamDF["memberBehaviour"] == "purchase").\
        groupBy("gameName").count().\
        orderBy('count', ascending=True).\
        show(truncate=False)

+-----------------------------+-----+
|gameName                     |count|
+-----------------------------+-----+
|Crayon Chronicles            |1    |
|Diehard Dungeon              |1    |
|WARSHIFT                     |1    |
|HassleHeart                  |1    |
|Foreign Legion Multi Massacre|1    |
|A Druid's Duel               |1    |
|Puzzle Chronicles            |1    |
|In Between - Soundtrack      |1    |
|Spandex Force Champion Rising|1    |
|Roguelands                   |1    |
|Haunted House                |1    |
|The Political Machine 2016   |1    |
|Cabela's African Adventures  |1    |
|Hotel Collectors Edition     |1    |
|MotoGP13 Moto2 and Moto3     |1    |
|AstroPop Deluxe              |1    |
|Armikrog                     |1    |
|Watchmen The End Is Nigh     |1    |
|Chris Sawyer's Locomotion    |1    |
|Talismania Deluxe            |1    |
+-----------------------------+-----+
only showing top 20 rows



In [0]:
#Analysis 6: Games with highest play time, limit to top 10
#Consider using SQL
steamDF.filter(steamDF["memberBehaviour"] == "play").\
        groupBy("gameName").\
        sum("playTime").\
        orderBy('sum(playTime)', ascending=False).\
        limit(10).\
        display()
##2nd top played has the 3rd highest playtime.

gameName,sum(playTime)
Dota 2,981684.6000046805
Counter-Strike Global Offensive,322771.60000587255
Team Fortress 2,173673.30000534654
Counter-Strike,134261.1000032574
Sid Meier's Civilization V,99821.30000032485
Counter-Strike Source,96075.4999980852
The Elder Scrolls V Skyrim,70889.30000342429
Garry's Mod,49725.300001084805
Call of Duty Modern Warfare 2 - Multiplayer,42009.8999973014
Left 4 Dead 2,33596.70000024885


Databricks visualization. Run in Databricks to view.

In [0]:
#Create a temporary view so we can query the data using SQL and perform other indepth analysis
steamDF.createOrReplaceTempView("SteamView")

In [0]:
%sql
--#Analysis 7
--Games that were purchased but played
--Using Count in the Select statement gives 11309.
SELECT *
FROM SteamView
WHERE gameName NOT IN (
    SELECT gameName
    FROM SteamView
    WHERE memberBehaviour = 'play'
);

userID,gameName,memberBehaviour,playTime
151603712,Fallout New Vegas Courier's Stash,purchase,1.0
151603712,Fallout New Vegas Dead Money,purchase,1.0
151603712,Fallout New Vegas Honest Hearts,purchase,1.0
151603712,HuniePop Official Digital Art Collection,purchase,1.0
151603712,HuniePop Original Soundtrack,purchase,1.0
151603712,The Banner Saga - Mod Content,purchase,1.0
151603712,The Elder Scrolls V Skyrim - Dawnguard,purchase,1.0
151603712,The Elder Scrolls V Skyrim - Dragonborn,purchase,1.0
151603712,The Elder Scrolls V Skyrim - Hearthfire,purchase,1.0
59945701,Fallout New Vegas Courier's Stash,purchase,1.0


In [0]:
%sql
--#Analysis 8
--Count the number of games played per user starting with the highest
SELECT userID, Count(*) AS Count_of_Plays
FROM SteamView
WHERE memberBehaviour == "play"
GROUP BY userID
ORDER BY Count_of_Plays DESC
LIMIT 10;

userID,Count_of_Plays
62990992,498
11403772,314
138941587,299
47457723,298
49893565,297
24469287,284
48798067,254
36546868,235
51557405,210
17530772,209


Databricks visualization. Run in Databricks to view.

In [0]:
####Part 3: Data preparation and pre-processing carried out prior to training the model.

# Create unique integer ID for each game
from pyspark.sql.functions import monotonically_increasing_id
#Drop other columns in steamDF except the gameName column
nameDF = steamDF.drop("userID","memberBehaviour", "playTime").dropna().distinct()
#Add the column gameID (populated by monotonically_increasing_id()) and rename the game name column.
nameDF = nameDF.withColumn("gameID", monotonically_increasing_id()).withColumnRenamed("gameName","game_name")
#Change gameID data type from long to int
nameDF = nameDF.withColumn("gameID", nameDF["gameID"].cast("int"))
nameDF.display()

game_name,gameID
Dota 2,0
METAL GEAR SOLID V THE PHANTOM PAIN,1
LEGO Batman The Videogame,2
RIFT,3
Anodyne,4
Legend of Grimrock,5
Divinity Original Sin,6
Meltdown,7
SanctuaryRPG Black Edition,8
Snuggle Truck,9


In [0]:
#Add the generated game ids to the steam DF and drop the extra game name column from the join
steamDF_combined = nameDF.join(steamDF, nameDF["game_name"] == steamDF["gameName"]).drop("game_name").dropna()
steamDF_combined.display()

gameID,userID,gameName,memberBehaviour,playTime
2609,151603712,The Elder Scrolls V Skyrim,purchase,1.0
2609,151603712,The Elder Scrolls V Skyrim,play,273.0
410,151603712,Fallout 4,purchase,1.0
410,151603712,Fallout 4,play,87.0
3868,151603712,Spore,purchase,1.0
3868,151603712,Spore,play,14.9
3820,151603712,Fallout New Vegas,purchase,1.0
3820,151603712,Fallout New Vegas,play,12.1
69,151603712,Left 4 Dead 2,purchase,1.0
69,151603712,Left 4 Dead 2,play,8.9


In [0]:
#Streamlining the Dataframe for play only
steamDF_combined_play = steamDF_combined.filter(steamDF_combined.memberBehaviour == "play")
steamDF_combined_play.na.drop()
#Splitting the dataset into training and test using a ratio of 7:3 and setting seed to 100 to ensure reproducibility.
trainingDF, testDF = steamDF_combined_play.randomSplit([0.7, 0.3], seed=100)
trainingDF.display()

gameID,userID,gameName,memberBehaviour,playTime
0,5250,Dota 2,play,0.2
0,298950,Dota 2,play,0.5
0,975449,Dota 2,play,54.0
0,1950243,Dota 2,play,1.1
0,2083767,Dota 2,play,3.1
0,2259650,Dota 2,play,0.9
0,2428602,Dota 2,play,473.0
0,3527485,Dota 2,play,455.0
0,4824107,Dota 2,play,91.0
0,6928806,Dota 2,play,702.0


In [0]:
####Part 4: Selection of hyperparameters and model training and evaluation and MLflow experiment tracking.
#Import the Alternating Least Squares (ALS) matrix factorization using ML
##RUN 1
import mlflow.spark
from pyspark.ml.recommendation import ALS
als = ALS(userCol="userID", itemCol="gameID", ratingCol="playTime",implicitPrefs = True, coldStartStrategy="drop",nonnegative=True, seed=100)
model = als.fit(trainingDF)

In [0]:
#Evaluating the model by generating predictions on the test dataset
predictions = model.transform(testDF).dropna()
predictions.show()

+------+--------+--------+---------------+--------+----------+
|gameID|  userID|gameName|memberBehaviour|playTime|prediction|
+------+--------+--------+---------------+--------+----------+
|     0| 1612666|  Dota 2|           play|     7.4| 0.4613096|
|     0| 4834220|  Dota 2|           play|   474.0| 1.1903085|
|     0| 6717871|  Dota 2|           play|     0.7|0.53581655|
|     0| 7431946|  Dota 2|           play|    17.8|0.80222476|
|     0| 8542204|  Dota 2|           play|   129.0|0.67634106|
|     0| 9946133|  Dota 2|           play|     4.4| 0.7767874|
|     0|11161178|  Dota 2|           play|   256.0| 0.7879887|
|     0|11813637|  Dota 2|           play|  2341.0|0.75000525|
|     0|14544587|  Dota 2|           play|    14.6|0.63956404|
|     0|20200395|  Dota 2|           play|    13.0|0.61899906|
|     0|20207081|  Dota 2|           play|  4845.0|0.63202286|
|     0|21061921|  Dota 2|           play|    41.0|0.83198833|
|     0|22605909|  Dota 2|           play|     1.9| 0.5

In [0]:
#Use RMSE to evaluate how effective the model was on predicting the playTime
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="playTime", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root Mean Square Error before log transformation: %g" % rmse)

Root Mean Square Error before log transformation: 241.284


In [0]:
# Testing the recommendation system on a single user with id 62990992
user = steamDF_combined_play.filter(steamDF_combined_play["userID"] == 62990992)
user_recommendation = model.transform(user)
user_recommendation.show()

+------+--------+--------------------+---------------+--------+-----------+
|gameID|  userID|            gameName|memberBehaviour|playTime| prediction|
+------+--------+--------------------+---------------+--------+-----------+
|  2353|62990992|Counter-Strike Gl...|           play|   663.0| 0.55739254|
|   962|62990992|Sid Meier's Civil...|           play|   550.0| 0.91411114|
|  2160|62990992|  Total War SHOGUN 2|           play|   212.0|  0.9404156|
|  3501|62990992|Total War ROME II...|           play|   198.0|  0.9049943|
|  2686|62990992|   Dungeon Defenders|           play|   195.0|  0.9530826|
|  3855|62990992|Age of Empires On...|           play|   168.0|  0.6922543|
|  3016|62990992|  XCOM Enemy Unknown|           play|   126.0|  0.9205298|
|  1180|62990992|    Empire Total War|           play|   125.0| 0.86021185|
|  1450|62990992|Might & Magic Her...|           play|   118.0|  0.9767247|
|   865|62990992|Assassin's Creed ...|           play|    94.0|  1.0506451|
|  3369|6299

In [0]:
##RUN 2
from pyspark.ml.recommendation import ALS
als2 = ALS(maxIter=5, regParam=0.005, userCol="userID", itemCol="gameID", ratingCol="playTime",coldStartStrategy='drop',nonnegative=True, seed=100)
#fit the model to the training dataset
model2 = als2.fit(trainingDF)

In [0]:
#Evaluating the model by generating predictions on the test dataset
predictions2 = model2.transform(testDF).dropna()
predictions2.show()

+------+--------+--------+---------------+--------+----------+
|gameID|  userID|gameName|memberBehaviour|playTime|prediction|
+------+--------+--------+---------------+--------+----------+
|     0|  994489|  Dota 2|           play|    18.7| 18.780268|
|     0| 2643609|  Dota 2|           play|     0.4|  38.19415|
|     0| 2753525|  Dota 2|           play|   132.0| 34.132797|
|     0| 4834220|  Dota 2|           play|   474.0|  9.149226|
|     0| 5860071|  Dota 2|           play|     3.2| 2.3348384|
|     0| 6928806|  Dota 2|           play|   702.0|  661.3389|
|     0| 7249363|  Dota 2|           play|     0.2|0.88906544|
|     0| 8865447|  Dota 2|           play|     1.3| 1.2999835|
|     0| 9946133|  Dota 2|           play|     4.4| 12.326636|
|     0|11161178|  Dota 2|           play|   256.0| 0.5779641|
|     0|11373749|  Dota 2|           play|     2.2|  5.185115|
|     0|11731710|  Dota 2|           play|  1243.0|  1246.893|
|     0|11794760|  Dota 2|           play|   839.0|  75

In [0]:
#Use RMSE to evaluate how effective the model was on predicting the playTime
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="playTime", predictionCol="prediction")
rmse2 = evaluator.evaluate(predictions2)
print("Root Mean Square Error before log transformation: %g" % rmse2)

Root Mean Square Error before log transformation: 238.57


In [0]:
# Testing the recommendation system on a single user with id 62990992
user = steamDF_combined_play.filter(steamDF_combined_play["userID"] == 62990992)
user_recommendation = model2.transform(user)
user_recommendation.show()

+------+--------+--------------------+---------------+--------+----------+
|gameID|  userID|            gameName|memberBehaviour|playTime|prediction|
+------+--------+--------------------+---------------+--------+----------+
|     0|62990992|              Dota 2|           play|     8.7| 6.0219436|
|     9|62990992|       Snuggle Truck|           play|     0.7| 0.8398284|
|    10|62990992|        Lunar Flight|           play|     2.0|  5.308462|
|    16|62990992|Dust An Elysian Tail|           play|     2.4| 3.6291876|
|    23|62990992|             Robotex|           play|     9.1|  11.45041|
|    35|62990992| Hero of the Kingdom|           play|     3.2|  4.156986|
|    44|62990992|Counter-Strike So...|           play|     2.7| 1.2841368|
|    58|62990992|  Legend of Mysteria|           play|     1.6| 2.0619597|
|    68|62990992|Flower Shop Winte...|           play|     2.9|  4.293655|
|    69|62990992|       Left 4 Dead 2|           play|    20.0| 78.730995|
|    74|62990992|Two Worl

In [0]:
##RUN 3
from pyspark.ml.recommendation import ALS
als3 = ALS(maxIter=5, regParam=0.1, userCol="userID", itemCol="gameID", ratingCol="playTime",coldStartStrategy='drop',nonnegative=True, seed=100)
#fit the model to the training dataset
model3 = als3.fit(trainingDF)

In [0]:
#Evaluating the model by generating predictions on the test dataset
predictions3 = model3.transform(testDF).dropna()
predictions3.show()

+------+--------+--------+---------------+--------+----------+
|gameID|  userID|gameName|memberBehaviour|playTime|prediction|
+------+--------+--------+---------------+--------+----------+
|     0|  994489|  Dota 2|           play|    18.7| 18.673138|
|     0| 2643609|  Dota 2|           play|     0.4| 36.253548|
|     0| 2753525|  Dota 2|           play|   132.0|  27.10894|
|     0| 4834220|  Dota 2|           play|   474.0| 10.187996|
|     0| 5860071|  Dota 2|           play|     3.2| 34.191353|
|     0| 6928806|  Dota 2|           play|   702.0|  685.8426|
|     0| 7249363|  Dota 2|           play|     0.2|0.33730036|
|     0| 8865447|  Dota 2|           play|     1.3| 1.3062185|
|     0| 9946133|  Dota 2|           play|     4.4|  9.110248|
|     0|11161178|  Dota 2|           play|   256.0| 1.6857101|
|     0|11373749|  Dota 2|           play|     2.2| 3.2084813|
|     0|11731710|  Dota 2|           play|  1243.0| 1225.7383|
|     0|11794760|  Dota 2|           play|   839.0| 762

In [0]:
#Use RMSE to evaluate how effective the model was on predicting the playTime
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="playTime", predictionCol="prediction")
rmse3 = evaluator.evaluate(predictions3)
print("Root Mean Square Error before log transformation: %g" % rmse3)

Root Mean Square Error before log transformation: 208.113


In [0]:
# Testing the recommendation system on a single user with id 62990992
user = steamDF_combined_play.filter(steamDF_combined_play["userID"] == 62990992)
user_recommendation = model3.transform(user)
user_recommendation.show()

+------+--------+--------------------+---------------+--------+----------+
|gameID|  userID|            gameName|memberBehaviour|playTime|prediction|
+------+--------+--------------------+---------------+--------+----------+
|     0|62990992|              Dota 2|           play|     8.7|  6.959526|
|     9|62990992|       Snuggle Truck|           play|     0.7|0.73588157|
|    10|62990992|        Lunar Flight|           play|     2.0| 1.9271502|
|    16|62990992|Dust An Elysian Tail|           play|     2.4|  5.506467|
|    23|62990992|             Robotex|           play|     9.1| 10.638147|
|    35|62990992| Hero of the Kingdom|           play|     3.2| 3.6376696|
|    44|62990992|Counter-Strike So...|           play|     2.7| 4.7307243|
|    58|62990992|  Legend of Mysteria|           play|     1.6|  1.840572|
|    68|62990992|Flower Shop Winte...|           play|     2.9| 3.7935133|
|    69|62990992|       Left 4 Dead 2|           play|    20.0|  88.91418|
|    74|62990992|Two Worl

In [0]:
##RUN #4
###Hyper Parameter Tuning #1
#Create parameter grid for hyperparameter tuning
from pyspark.ml.tuning import ParamGridBuilder
als4 = ALS(userCol="userID", itemCol="gameID", ratingCol="playTime", coldStartStrategy="drop", seed=100)
parameters = ParamGridBuilder()\
.addGrid(als4.rank, [10, 15, 20])\
.addGrid(als4.regParam, [0.001, 0.005, 0.01, 0.05, 0.1])\
.build()

In [0]:
# Build cross validation using CrossValidator
from pyspark.ml.tuning import CrossValidator
cv = CrossValidator(estimator=als4, estimatorParamMaps=parameters, evaluator=evaluator, numFolds=3)
#Use cv to fit trainingDF
Modelcv = cv.fit(trainingDF)

In [0]:
#Use RMSE to evaluate how effective the model was on predicting the playTime
rmse_best_cv = evaluator.evaluate(Modelcv.transform(testDF))
print("Root Mean Square Error is %g" %rmse_best_cv)

Root Mean Square Error is 153.349


In [0]:
# Testing the recommendation system on a single user with id 62990992
user = steamDF_combined_play.filter(steamDF_combined_play["userID"] == 62990992)
user_recommendation = Modelcv.transform(user)
user_recommendation.show()

+------+--------+--------------------+---------------+--------+----------+
|gameID|  userID|            gameName|memberBehaviour|playTime|prediction|
+------+--------+--------------------+---------------+--------+----------+
|     0|62990992|              Dota 2|           play|     8.7|  6.371176|
|     9|62990992|       Snuggle Truck|           play|     0.7|0.69846165|
|    10|62990992|        Lunar Flight|           play|     2.0| 1.7337968|
|    16|62990992|Dust An Elysian Tail|           play|     2.4| 9.8262615|
|    23|62990992|             Robotex|           play|     9.1|   9.47024|
|    35|62990992| Hero of the Kingdom|           play|     3.2|  3.369008|
|    44|62990992|Counter-Strike So...|           play|     2.7| -7.328499|
|    58|62990992|  Legend of Mysteria|           play|     1.6| 1.6791337|
|    68|62990992|Flower Shop Winte...|           play|     2.9| 2.9564352|
|    69|62990992|       Left 4 Dead 2|           play|    20.0| 40.762444|
|    74|62990992|Two Worl

In [0]:
##RUN #5
###Hyper Parameter Tuning #2
#Create parameter grid for hyperparameter tuning
from pyspark.ml.tuning import ParamGridBuilder
als5 = ALS(userCol="userID", itemCol="gameID", ratingCol="playTime", coldStartStrategy="drop", seed=100)
parameters2 = ParamGridBuilder()\
.addGrid(als5.rank, [10, 15, 20])\
.addGrid(als5.regParam, [0.001, 0.005, 0.01, 0.05, 0.1])\
.build()

In [0]:
#instantiate a TrainValidationSplit object
from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit()\
.setSeed(100)\
.setTrainRatio(0.75)\
.setEstimatorParamMaps(parameters2)\
.setEstimator(als5)\
.setEvaluator(evaluator)

In [0]:
#Train model using gridsearch
gridsearchModel = tvs.fit(trainingDF)

In [0]:
# the best model and identify parameters
bestModel = gridsearchModel.bestModel
print("Parameters for the best model:")
print("Rank Parameter:%g" %bestModel.rank)
print("RegParam Parameter: %g" %bestModel._java_obj.parent().getRegParam())

Parameters for the best model:
Rank Parameter:20
RegParam Parameter: 0.1


In [0]:
#Use RMSE to evaluate how effective the model was on predicting the playTime
rmse_best = evaluator.evaluate(bestModel.transform(testDF))
print("Root Mean Square Error is %g" %rmse_best)

Root Mean Square Error is 153.349


In [0]:
# Testing the recommendation system on a single user with id 62990992
user = steamDF_combined_play.filter(steamDF_combined_play["userID"] == 62990992)
user_recommendation = bestModel.transform(user)
user_recommendation.show()

+------+--------+--------------------+---------------+--------+-----------+
|gameID|  userID|            gameName|memberBehaviour|playTime| prediction|
+------+--------+--------------------+---------------+--------+-----------+
|  2353|62990992|Counter-Strike Gl...|           play|   663.0|   56.73542|
|   962|62990992|Sid Meier's Civil...|           play|   550.0|  554.65857|
|  2160|62990992|  Total War SHOGUN 2|           play|   212.0|  136.00049|
|  3501|62990992|Total War ROME II...|           play|   198.0|  200.91872|
|  2686|62990992|   Dungeon Defenders|           play|   195.0|  145.89589|
|  3855|62990992|Age of Empires On...|           play|   168.0|  169.11205|
|  3016|62990992|  XCOM Enemy Unknown|           play|   126.0|   71.90108|
|  1180|62990992|    Empire Total War|           play|   125.0|  132.25299|
|  1450|62990992|Might & Magic Her...|           play|   118.0|  115.97923|
|   865|62990992|Assassin's Creed ...|           play|    94.0|  70.224075|
|  3369|6299

In [0]:
#Create a temporary view so we can query the recommendation using SQL and perform other indepth analysis
user_recommendation.createOrReplaceTempView("user_recommendationView")

In [0]:
%sql
--Recommender Analysis--List of top 10 game recommendation for the user 
SELECT userID, gameName, prediction
FROM user_recommendationView
ORDER BY prediction DESC
LIMIT 10;

userID,gameName,prediction
62990992,Sid Meier's Civilization V,554.65857
62990992,Total War ROME II - Emperor Edition,200.91872
62990992,Warframe,171.29044
62990992,Age of Empires Online,169.11205
62990992,Dungeon Defenders,145.89589
62990992,Total War SHOGUN 2,136.00049
62990992,Empire Total War,132.25299
62990992,Supreme Commander 2,121.23713
62990992,Crusader Kings II,119.78403
62990992,Might & Magic Heroes VI,115.97923


Databricks visualization. Run in Databricks to view.

In [0]:
#Recommend 10 games for all users
userRecs = bestModel.recommendForAllUsers(10)
userRecs.show(truncate=False)

+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userID  |recommendations                                                                                                                                                                                            |
+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|76767   |[{232, 1123.2002}, {3202, 767.37427}, {2202, 760.9206}, {225, 702.67566}, {351, 687.00433}, {4124, 651.2712}, {2105, 593.5946}, {946, 577.27356}, {3772, 515.0831}, {962, 453.3786}]                       |
|144736  |[{3689, 0.09999975}, {232, 0.08696092}, {2202, 0.08029256}, {351, 0.037363745}, {4124, 0.032614104}, {4921, 0.02826492}, {225, 0.0