In [0]:
import mlflow
mlflow.pyspark.ml.autolog()

In [0]:
dbutils.fs.ls("/FileStore/tables/")

[FileInfo(path='dbfs:/FileStore/tables/BDTT_Lab_2023_24_Week7__1_.pdf', name='BDTT_Lab_2023_24_Week7__1_.pdf', size=1179007, modificationTime=1714128978000),
 FileInfo(path='dbfs:/FileStore/tables/Occupancy_Detection_Data.csv', name='Occupancy_Detection_Data.csv', size=50968, modificationTime=1709137348000),
 FileInfo(path='dbfs:/FileStore/tables/TS021_2021_2.csv', name='TS021_2021_2.csv', size=497239, modificationTime=1710953978000),
 FileInfo(path='dbfs:/FileStore/tables/account-models/', name='account-models/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/accounts/', name='accounts/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations/', name='activations/', size=0, modificationTime=0),
 FileInfo(path='dbfs:/FileStore/tables/activations.zip', name='activations.zip', size=8411369, modificationTime=1706717421000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificatio

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
mySchema = StructType ([
    StructField("UserId", IntegerType()),
    StructField("GameName", StringType()) ,
    StructField("Behaviour", StringType()),
    StructField("PurchaseorPlay", FloatType()),])
SteamDf= spark.read.csv("/FileStore/tables/steam_200k.csv",header = False, schema= mySchema)

SteamDf.show(100)
 

+---------+--------------------+---------+--------------+
|   UserId|            GameName|Behaviour|PurchaseorPlay|
+---------+--------------------+---------+--------------+
|151603712|The Elder Scrolls...| purchase|           1.0|
|151603712|The Elder Scrolls...|     play|         273.0|
|151603712|           Fallout 4| purchase|           1.0|
|151603712|           Fallout 4|     play|          87.0|
|151603712|               Spore| purchase|           1.0|
|151603712|               Spore|     play|          14.9|
|151603712|   Fallout New Vegas| purchase|           1.0|
|151603712|   Fallout New Vegas|     play|          12.1|
|151603712|       Left 4 Dead 2| purchase|           1.0|
|151603712|       Left 4 Dead 2|     play|           8.9|
|151603712|            HuniePop| purchase|           1.0|
|151603712|            HuniePop|     play|           8.5|
|151603712|       Path of Exile| purchase|           1.0|
|151603712|       Path of Exile|     play|           8.1|
|151603712|   

In [0]:
print("Number of rows:", SteamDf.count())
print("Number of columns:", len(SteamDf.columns))

Number of rows: 200000
Number of columns: 4


In [0]:
SteamDf.summary().display()

summary,UserId,GameName,Behaviour,PurchaseorPlay
count,200000.0,200000,200000,200000.0
mean,103655865.94664,140.0,,17.874384000420385
stddev,72080735.12913968,0.0,,138.05695165082415
min,5250.0,007 Legends,play,0.1
25%,47384202.0,140.0,,1.0
50%,86912006.0,140.0,,1.0
75%,154230933.0,140.0,,1.3
max,309903146.0,theHunter Primal,purchase,11754.0


Databricks visualization. Run in Databricks to view.

In [0]:
from pyspark.sql.functions import col

columns_with_null = [col_name for col_name in SteamDf.columns if SteamDf.filter(col(col_name).isNull()).count() > 0]

print("Columns with null values:", columns_with_null)


Columns with null values: []


In [0]:
SteamDf= SteamDf.dropna()
SteamDf.show(100)

+---------+--------------------+---------+--------------+
|   UserId|            GameName|Behaviour|PurchaseorPlay|
+---------+--------------------+---------+--------------+
|151603712|The Elder Scrolls...| purchase|           1.0|
|151603712|The Elder Scrolls...|     play|         273.0|
|151603712|           Fallout 4| purchase|           1.0|
|151603712|           Fallout 4|     play|          87.0|
|151603712|               Spore| purchase|           1.0|
|151603712|               Spore|     play|          14.9|
|151603712|   Fallout New Vegas| purchase|           1.0|
|151603712|   Fallout New Vegas|     play|          12.1|
|151603712|       Left 4 Dead 2| purchase|           1.0|
|151603712|       Left 4 Dead 2|     play|           8.9|
|151603712|            HuniePop| purchase|           1.0|
|151603712|            HuniePop|     play|           8.5|
|151603712|       Path of Exile| purchase|           1.0|
|151603712|       Path of Exile|     play|           8.1|
|151603712|   

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
unique_games = SteamDf.select("GameName").distinct()
game_id_mapping = unique_games.withColumn("GameID", monotonically_increasing_id())
NewSteamDF = SteamDf.join(game_id_mapping, on="GameName", how="left")

In [0]:
NewSteamDF.show()

+--------------------+---------+---------+--------------+------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|
+--------------------+---------+---------+--------------+------+
|The Elder Scrolls...|151603712| purchase|           1.0|  2609|
|The Elder Scrolls...|151603712|     play|         273.0|  2609|
|           Fallout 4|151603712| purchase|           1.0|   410|
|           Fallout 4|151603712|     play|          87.0|   410|
|               Spore|151603712| purchase|           1.0|  3868|
|               Spore|151603712|     play|          14.9|  3868|
|   Fallout New Vegas|151603712| purchase|           1.0|  3820|
|   Fallout New Vegas|151603712|     play|          12.1|  3820|
|       Left 4 Dead 2|151603712| purchase|           1.0|    69|
|       Left 4 Dead 2|151603712|     play|           8.9|    69|
|            HuniePop|151603712| purchase|           1.0|  3340|
|            HuniePop|151603712|     play|           8.5|  3340|
|       Path of Exile|151

In [0]:
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.types import IntegerType
unique_games = SteamDf.select("GameName").distinct()
game_id_mapping = unique_games.withColumn("GameID", monotonically_increasing_id())
NewSteamDF = SteamDf.join(game_id_mapping, on="GameName", how="left")
NewSteamDF = NewSteamDF.withColumn("GameID", NewSteamDF["GameID"].cast(IntegerType()))

NewSteamDF.show()


+--------------------+---------+---------+--------------+------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|
+--------------------+---------+---------+--------------+------+
|The Elder Scrolls...|151603712| purchase|           1.0|  2609|
|The Elder Scrolls...|151603712|     play|         273.0|  2609|
|           Fallout 4|151603712| purchase|           1.0|   410|
|           Fallout 4|151603712|     play|          87.0|   410|
|               Spore|151603712| purchase|           1.0|  3868|
|               Spore|151603712|     play|          14.9|  3868|
|   Fallout New Vegas|151603712| purchase|           1.0|  3820|
|   Fallout New Vegas|151603712|     play|          12.1|  3820|
|       Left 4 Dead 2|151603712| purchase|           1.0|    69|
|       Left 4 Dead 2|151603712|     play|           8.9|    69|
|            HuniePop|151603712| purchase|           1.0|  3340|
|            HuniePop|151603712|     play|           8.5|  3340|
|       Path of Exile|151

In [0]:
from pyspark.ml.feature import RFormula
formula = "Faulle_detected ~ ."
preprocess = RFormula(formula=formula)
preprocessed_data = preprocess.fit(NewSteamDF).transform(NewSteamDF)
preprocessed_data.show()


2024/05/01 18:59:16 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'b5a9e9f2622c48279ef248937c3a1233', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+--------------------+---------+---------+--------------+------+--------------------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|            features|
+--------------------+---------+---------+--------------+------+--------------------+
|The Elder Scrolls...|151603712| purchase|           1.0|  2609|(5158,[8,5154,515...|
|The Elder Scrolls...|151603712|     play|         273.0|  2609|(5158,[8,5154,515...|
|           Fallout 4|151603712| purchase|           1.0|   410|(5158,[100,5154,5...|
|           Fallout 4|151603712|     play|          87.0|   410|(5158,[100,5154,5...|
|               Spore|151603712| purchase|           1.0|  3868|(5158,[332,5154,5...|
|               Spore|151603712|     play|          14.9|  3868|(5158,[332,5154,5...|
|   Fallout New Vegas|151603712| purchase|           1.0|  3820|(5158,[29,5154,51...|
|   Fallout New Vegas|151603712|     play|          12.1|  3820|(5158,[29,5154,51...|
|       Left 4 Dead 2|151603712| purchase|           1

In [0]:
NewSteamDFPlay = preprocessed_data.filter(col('Behaviour') == 'play')
NewSteamDFPlay.show(100)

+--------------------+---------+---------+--------------+------+--------------------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|            features|
+--------------------+---------+---------+--------------+------+--------------------+
|The Elder Scrolls...|151603712|     play|         273.0|  2609|(5158,[8,5154,515...|
|           Fallout 4|151603712|     play|          87.0|   410|(5158,[100,5154,5...|
|               Spore|151603712|     play|          14.9|  3868|(5158,[332,5154,5...|
|   Fallout New Vegas|151603712|     play|          12.1|  3820|(5158,[29,5154,51...|
|       Left 4 Dead 2|151603712|     play|           8.9|    69|(5158,[4,5154,515...|
|            HuniePop|151603712|     play|           8.5|  3340|(5158,[867,5154,5...|
|       Path of Exile|151603712|     play|           8.1|  1562|(5158,[39,5154,51...|
|         Poly Bridge|151603712|     play|           7.5|  4116|(5158,[1347,5154,...|
|         Left 4 Dead|151603712|     play|           3

In [0]:
NewSteamDFpurchase = NewSteamDF.filter(col('Behaviour') == 'purchase')
NewSteamDFpurchase.show(100)

+--------------------+---------+---------+--------------+------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|
+--------------------+---------+---------+--------------+------+
|The Elder Scrolls...|151603712| purchase|           1.0|  2609|
|           Fallout 4|151603712| purchase|           1.0|   410|
|               Spore|151603712| purchase|           1.0|  3868|
|   Fallout New Vegas|151603712| purchase|           1.0|  3820|
|       Left 4 Dead 2|151603712| purchase|           1.0|    69|
|            HuniePop|151603712| purchase|           1.0|  3340|
|       Path of Exile|151603712| purchase|           1.0|  1562|
|         Poly Bridge|151603712| purchase|           1.0|  4116|
|         Left 4 Dead|151603712| purchase|           1.0|  1992|
|     Team Fortress 2|151603712| purchase|           1.0|  3893|
|         Tomb Raider|151603712| purchase|           1.0|  1754|
|     The Banner Saga|151603712| purchase|           1.0|  3341|
|Dead Island Epidemic|151

In [0]:
print(NewSteamDFPlay.count())
print(len(NewSteamDFPlay.columns))

70489
5


In [0]:
(training, test)=NewSteamDFPlay.randomSplit([0.7,0.3], seed=100)

In [0]:
(training, test)=NewSteamDF1.randomSplit([0.8,0.2], seed=100)

In [0]:
training.show()

+--------------------+---------+---------+--------------+------+--------------------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|            features|
+--------------------+---------+---------+--------------+------+--------------------+
|         007 Legends| 46055854|     play|           0.7|   403|(5158,[3737,5154,...|
|           0RBITALIS| 86055705|     play|           0.3|  2726|(5158,[2613,5154,...|
|1... 2... 3... KI...| 49893565|     play|           2.4|   657|(5158,[1937,5154,...|
|1... 2... 3... KI...| 78560022|     play|           0.2|   657|(5158,[1937,5154,...|
|          10,000,000| 33865373|     play|           3.6|  3112|(5158,[3738,5154,...|
|   100% Orange Juice| 48028873|     play|           1.8|   275|(5158,[1518,5154,...|
|   100% Orange Juice| 88525821|     play|           0.9|   275|(5158,[1518,5154,...|
|   100% Orange Juice| 93644606|     play|           3.1|   275|(5158,[1518,5154,...|
|   100% Orange Juice|125017535|     play|           0

In [0]:
test.show()

+--------------------+---------+---------+--------------+------+--------------------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|            features|
+--------------------+---------+---------+--------------+------+--------------------+
|1... 2... 3... KI...| 65117175|     play|           1.2|   657|(5158,[1937,5154,...|
|   100% Orange Juice|197328486|     play|          35.0|   275|(5158,[1518,5154,...|
|           1000 Amps| 55426012|     play|           0.1|  1349|(5158,[3360,5154,...|
|12 Labours of Her...| 73835640|     play|           6.0|   390|(5158,[1622,5154,...|
|12 Labours of Her...|123974970|     play|           5.2|   390|(5158,[1622,5154,...|
|12 Labours of Her...|146999982|     play|           0.2|  1434|(5158,[1466,5154,...|
|12 Labours of Her...|147602462|     play|          14.7|  1434|(5158,[1466,5154,...|
|                 140| 30246419|     play|           1.2|  1353|(5158,[1565,5154,...|
|        16bit Trader| 92914917|     play|           4

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
als = ALS(maxIter=10, regParam=0.01, userCol="UserId", itemCol="GameID", ratingCol="PurchaseorPlay", seed=100) 
model = als.fit(training)


predictions = model.transform(test).dropna()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="PurchaseorPlay", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) =", rmse)


2024/05/01 19:10:29 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'f1385c80616242a59eb49430954f2e9b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


Root Mean Squared Error (RMSE) = 491.7520050932785


In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
als = ALS(maxIter=30, regParam=0.06, userCol="UserId", itemCol="GameID", ratingCol="PurchaseorPlay", seed=100)
model = als.fit(training)

predictions = model.transform(test).dropna()

evaluator = RegressionEvaluator(metricName="rmse", labelCol="PurchaseorPlay", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

print("Root Mean Squared Error (RMSE) =", rmse)


2024/05/01 16:53:13 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '85382122ab864ded9871926a64ef86d0', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


Root Mean Squared Error (RMSE) = 374.5584719449089


In [0]:
from pyspark.ml.recommendation import ALS
 
als = ALS(maxIter=50, regParam=0.8, userCol="UserId", itemCol="GameID", ratingCol="PurchaseorPlay",seed=100)
 
model = als.fit(training)


2024/05/01 19:16:50 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '8a40bd830307479fb81c22c9530ab100', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
predictions = model.transform(test).dropna()

predictions.show()

+--------------------+---------+---------+--------------+------+--------------------+-----------+
|            GameName|   UserId|Behaviour|PurchaseorPlay|GameID|            features| prediction|
+--------------------+---------+---------+--------------+------+--------------------+-----------+
|1... 2... 3... KI...| 65117175|     play|           1.2|   657|(5158,[1937,5154,...|  3.9245467|
|   100% Orange Juice|197328486|     play|          35.0|   275|(5158,[1518,5154,...|    70.7795|
|12 Labours of Her...| 73835640|     play|           6.0|   390|(5158,[1622,5154,...|   8.059372|
|12 Labours of Her...|123974970|     play|           5.2|   390|(5158,[1622,5154,...|  4.0123687|
|12 Labours of Her...|146999982|     play|           0.2|  1434|(5158,[1466,5154,...| 0.67995524|
|12 Labours of Her...|147602462|     play|          14.7|  1434|(5158,[1466,5154,...| -1.6875671|
|                 140| 30246419|     play|           1.2|  1353|(5158,[1565,5154,...|0.095413126|
|        16bit Trade

In [0]:
from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName="rmse", labelCol="PurchaseorPlay", predictionCol="prediction")
 
rmse = evaluator.evaluate(predictions)
 
print('Root Mean Square Error is %g' %rmse)

Root Mean Square Error is 223.595


In [0]:
from pyspark.ml.tuning import ParamGridBuilder
 
als = ALS(maxIter=5, userCol="UserId", itemCol="GameID", ratingCol="PurchaseorPlay", coldStartStrategy="drop", seed=100)
 
# Create a parameter grid
 
parameters = ParamGridBuilder()\
.addGrid(als.rank, [5, 10, 15, 20, 25])\
.addGrid(als.regParam, [0.001, 0.005, 0.01, 0.05, 0.1, 0.8, 0.6])\
.build()

In [0]:
from pyspark.ml.tuning import TrainValidationSplit
 
# Define TrainValidationSplit
 
tvs = TrainValidationSplit()\
.setSeed(100)\
.setTrainRatio(0.75)\
.setEstimatorParamMaps(parameters)\
.setEstimator(als)\
.setEvaluator(evaluator)

In [0]:
gridsearchModel = tvs.fit(training)

2024/05/01 19:46:09 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'ddb50dea575246528574402b5260924c', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:

bestModel = gridsearchModel.bestModel
 
print("Parameters for the best model:")
print("Rank Parameter: %g" %bestModel.rank)
print("RegParam Parameter: %g" %bestModel._java_obj.parent().getRegParam())

Parameters for the best model:
Rank Parameter: 25
RegParam Parameter: 0.8


In [0]:
evaluator.evaluate(bestModel.transform(test))

216.55362171983433

In [0]:
def recommendationsByUser(UserId):
    user_predictions = predictions.filter(predictions.UserId == UserId)\
                                   .orderBy("prediction", ascending=False)\
                                   .limit(4)
    
    user_predictions.show(truncate=False)
    return None


In [0]:

recommendationsByUser(94851051)


+-------------------------------+--------+---------+--------------+------+-----------------------------------------------------------------------+-----------+
|GameName                       |UserId  |Behaviour|PurchaseorPlay|GameID|features                                                               |prediction |
+-------------------------------+--------+---------+--------------+------+-----------------------------------------------------------------------+-----------+
|Mount & Blade Warband          |94851051|play     |9.4           |225   |(5158,[153,5154,5156,5157],[1.0,9.4851051E7,9.399999618530273,225.0])  |11763.4375 |
|Counter-Strike Global Offensive|94851051|play     |36.0          |2353  |(5158,[2,5154,5156,5157],[1.0,9.4851051E7,36.0,2353.0])                |976.2047   |
|Rocketbirds Hardboiled Chicken |94851051|play     |0.6           |3518  |(5158,[901,5154,5156,5157],[1.0,9.4851051E7,0.6000000238418579,3518.0])|-0.21311611|
|PAYDAY 2                       |94851051|play

In [0]:
userRecs=bestModel.recommendForAllUsers(6)

In [0]:
userRecs.show(truncate= False)

+--------+---------------------------------------------------------------------------------------------------------------------------+
|UserId  |recommendations                                                                                                            |
+--------+---------------------------------------------------------------------------------------------------------------------------+
|76767   |[{232, 600.7385}, {2294, 501.44836}, {3772, 464.95563}, {351, 434.5953}, {3689, 361.40973}, {2202, 354.3112}]              |
|144736  |[{4921, 0.119978756}, {3689, 0.099962875}, {3772, 0.08034055}, {232, 0.069678985}, {2294, 0.064843826}, {351, 0.061335757}]|
|229911  |[{4677, 533.5479}, {2803, 428.9477}, {232, 331.46594}, {3697, 327.3155}, {351, 313.9445}, {3202, 311.48126}]               |
|948368  |[{4921, 5.2478485}, {1223, 2.030438}, {1251, 1.3153516}, {1506, 1.0775161}, {953, 1.0424814}, {225, 0.9495074}]            |
|975449  |[{2084, 352.73962}, {3893, 339.43716}, {3086,

In [0]:
from pyspark.sql.functions import col, explode, max
exploded_userRecs = userRecs.select("UserId", explode("recommendations").alias("recommendation"))
best_game_for_each_user = exploded_userRecs \
    .groupBy("UserId") \
    .agg(max(col("recommendation.rating")).alias("max_rating")) \
    .join(exploded_userRecs, ["UserId"]) \
    .filter(col("recommendation.rating") == col("max_rating")) \
    .select("UserId", "recommendation.GameID", "max_rating")
best_game_for_each_user.show(truncate=False)

+--------+------+-----------+
|UserId  |GameID|max_rating |
+--------+------+-----------+
|76767   |232   |600.7385   |
|144736  |4921  |0.119978756|
|229911  |4677  |533.5479   |
|948368  |4921  |5.2478485  |
|975449  |2084  |352.73962  |
|1268792 |4921  |0.59989375 |
|2531540 |2803  |68.948166  |
|2753525 |3772  |614.67755  |
|3450426 |225   |16.138563  |
|7923954 |4921  |0.35993633 |
|8259307 |4921  |72.04369   |
|8567888 |3697  |1.7248974  |
|8585433 |4921  |5068.9673  |
|8784496 |4921  |248.79178  |
|8795607 |3176  |223.29654  |
|10595342|4921  |740.62885  |
|10599862|3772  |1300.928   |
|11807754|4921  |2865.6409  |
|11973378|3697  |1.0614755  |
|15095770|3697  |10.738093  |
+--------+------+-----------+
only showing top 20 rows

