In [0]:
dbutils.fs.ls('/FileStore/tables/')

[FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.csv', name='clinicaltrial_2020.csv', size=46318151, modificationTime=1712651175000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.zip', name='clinicaltrial_2020.zip', size=10599182, modificationTime=1712269265000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1712651179000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.zip', name='clinicaltrial_2021.zip', size=11508457, modificationTime=1712269249000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2023.csv', name='clinicaltrial_2023.csv', size=292436366, modificationTime=1712651196000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2023.zip', name='clinicaltrial_2023.zip', size=57166668, modificationTime=1712269213000),
 FileInfo(path='dbfs:/FileStore/tables/pharma.csv', name='pharma.csv', size=678999, modificationTime=1712651199000),
 FileInfo(path='dbfs:/Fi

In [0]:
# Step 1: import libraries
# import MLflow and autologML runs

import mlflow
mlflow.pyspark.ml.autolog()


In [0]:
# load and read the data into spark DataFrame :observe the schema

steam_df = spark.read.csv('/FileStore/tables/steam_200k_.csv',
                             header = 'False',
                             inferSchema = 'True')
display(steam_df)

_c0,_c1,_c2,_c3
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


In [0]:
# Add column names
columns = ["member_id", "game_names", "behavior", "hours_played"]
steam_df = steam_df.toDF(*columns)

In [0]:
# print the schema- data structure with new col names
steam_df.printSchema()

root
 |-- member_id: integer (nullable = true)
 |-- game_names: string (nullable = true)
 |-- behavior: string (nullable = true)
 |-- hours_played: double (nullable = true)



In [0]:
# Show first few rows
steam_df.show(10)

+---------+--------------------+--------+------------+
|member_id|          game_names|behavior|hours_played|
+---------+--------------------+--------+------------+
|151603712|The Elder Scrolls...|purchase|         1.0|
|151603712|The Elder Scrolls...|    play|       273.0|
|151603712|           Fallout 4|purchase|         1.0|
|151603712|           Fallout 4|    play|        87.0|
|151603712|               Spore|purchase|         1.0|
|151603712|               Spore|    play|        14.9|
|151603712|   Fallout New Vegas|purchase|         1.0|
|151603712|   Fallout New Vegas|    play|        12.1|
|151603712|       Left 4 Dead 2|purchase|         1.0|
|151603712|       Left 4 Dead 2|    play|         8.9|
+---------+--------------------+--------+------------+
only showing top 10 rows



In [0]:
# Check unique behaviors list
steam_df.select("behavior").distinct().show()

+--------+
|behavior|
+--------+
|purchase|
|    play|
+--------+



In [0]:
# Check unique games name list

steam_df.select("game_names").distinct().show(truncate=False)

+-------------------------------------+
|game_names                           |
+-------------------------------------+
|Left 4 Dead 2                        |
|Fallout 4                            |
|Dragon Age Origins - Ultimate Edition|
|Path of Exile                        |
|Tomb Raider                          |
|Left 4 Dead                          |
|Realm of the Mad God                 |
|BioShock Infinite                    |
|The Elder Scrolls V Skyrim           |
|SEGA Genesis & Mega Drive Classics   |
|HuniePop                             |
|The Banner Saga                      |
|Grand Theft Auto IV                  |
|Dead Island Epidemic                 |
|Marvel Heroes 2015                   |
|Eldevin                              |
|Fallout New Vegas                    |
|Spore                                |
|Team Fortress 2                      |
|Fallout 3 - Game of the Year Edition |
+-------------------------------------+
only showing top 20 rows



In [0]:
# Check missing values

print("Missing values count:")
for col in steam_df.columns:
    print(col, steam_df.filter(steam_df[col].isNull()).count())

Missing values count:
member_id 0
game_names 0
behavior 0
hours_played 0


In [0]:
# remove duplicates row
steam_df.dropDuplicates(['member_id', 'game_names','behavior']).show(truncate=False)


+---------+------------------------------------------------------------+--------+------------+
|member_id|game_names                                                  |behavior|hours_played|
+---------+------------------------------------------------------------+--------+------------+
|53875128 |Sid Meier's Civilization III Complete                       |play    |9.8         |
|53875128 |Sir, You Are Being Hunted                                   |play    |0.8         |
|53875128 |Hammerfight                                                 |play    |0.4         |
|297811211|Team Fortress 2                                             |play    |10.1        |
|97298878 |Commando Jack                                               |purchase|1.0         |
|65117175 |Machinarium                                                 |purchase|1.0         |
|65117175 |Alan Wake's American Nightmare                              |purchase|1.0         |
|236557903|Counter-Strike Nexon Zombies           

In [0]:
# count the distinct rows after the duplicates

steam_df.dropDuplicates(['member_id', 'game_names','behavior']).count()

199281

In [0]:
# Data Analysis :1
# Unique user counts and unique games counts in the steam dataset

print("Unique-users: ", steam_df.select("member_id").distinct().count())
print('Unique - Number of Games:', steam_df.select("game_names").distinct().count())
print('Unique -behaviour :',steam_df.select('behavior').distinct().count())

print("Unique Rows :" , steam_df.distinct().count())

Unique-users:  12393
Unique - Number of Games: 5155
Unique -behaviour : 2
Unique Rows : 199293


In [0]:
# Data Analysis 2: Total number of purchased games which has been played

steam_df.select("game_names", "behavior", "hours_played").filter(steam_df["behavior"] == "play").count()

70489

In [0]:
# #Analysis 3: Most popular- playing games

steam_df.select("game_names", "behavior", "hours_played").filter(steam_df["behavior"] == "play").groupBy("game_names").count().orderBy('count', ascending=False).show(truncate=False)

+-------------------------------+-----+
|game_names                     |count|
+-------------------------------+-----+
|Dota 2                         |4841 |
|Team Fortress 2                |2323 |
|Counter-Strike Global Offensive|1377 |
|Unturned                       |1069 |
|Left 4 Dead 2                  |801  |
|Counter-Strike Source          |715  |
|The Elder Scrolls V Skyrim     |677  |
|Garry's Mod                    |666  |
|Counter-Strike                 |568  |
|Sid Meier's Civilization V     |554  |
|Terraria                       |460  |
|Portal 2                       |453  |
|Warframe                       |424  |
|Portal                         |417  |
|Robocraft                      |407  |
|PAYDAY 2                       |390  |
|Borderlands 2                  |386  |
|Half-Life 2                    |356  |
|Heroes & Generals              |335  |
|War Thunder                    |303  |
+-------------------------------+-----+
only showing top 20 rows



In [0]:
 # Analysis 4: popular- purchase games

steam_df.select("game_names", "behavior", "hours_played").filter(steam_df["behavior"] == "purchase").groupBy("game_names").count().orderBy('count', ascending=False).show(truncate=False)

+--------------------------------------------+-----+
|game_names                                  |count|
+--------------------------------------------+-----+
|Dota 2                                      |4841 |
|Team Fortress 2                             |2323 |
|Unturned                                    |1563 |
|Counter-Strike Global Offensive             |1412 |
|Half-Life 2 Lost Coast                      |981  |
|Counter-Strike Source                       |978  |
|Left 4 Dead 2                               |951  |
|Counter-Strike                              |856  |
|Warframe                                    |847  |
|Half-Life 2 Deathmatch                      |823  |
|Garry's Mod                                 |731  |
|The Elder Scrolls V Skyrim                  |717  |
|Robocraft                                   |689  |
|Counter-Strike Condition Zero Deleted Scenes|679  |
|Counter-Strike Condition Zero               |679  |
|Heroes & Generals                           |

In [0]:
# Data Analysis 5 : groupby member_id wise and take the counts
steam_df .groupBy('member_id').count().display(20)

member_id,count
16167221,57
166705920,11
244878837,2
99992274,2
174415183,10
156156544,2
152861732,34
171911285,2
128412180,2
74557142,4


In [0]:
# create SQL view in the name:steamView from steam dataset

steam_df.createOrReplaceTempView('steamView')

In [0]:
%sql
--SQL view
SELECT *
FROM steamView

member_id,game_names,behavior,hours_played
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


In [0]:
%sql

SELECT behavior,count(*)
FROM steamView
GROUP BY behavior
ORDER BY count(*) DESC

behavior,count(1)
purchase,129511
play,70489


Databricks visualization. Run in Databricks to view.

In [0]:
# Data Visualization :1 - Top 10 games with most number of users

steam_df.groupBy('game_names').count().orderBy('count',ascending =False).limit(10).display()

game_names,count
Dota 2,9682
Team Fortress 2,4646
Counter-Strike Global Offensive,2789
Unturned,2632
Left 4 Dead 2,1752
Counter-Strike Source,1693
Counter-Strike,1424
Garry's Mod,1397
The Elder Scrolls V Skyrim,1394
Warframe,1271


Databricks visualization. Run in Databricks to view.

In [0]:
# Data visualization : 2 - Most popular games by behavior play wise
steam_df.select("game_names", "behavior", "hours_played").filter(steam_df["behavior"] == "play").groupBy("game_names").count().orderBy('count', ascending=False).limit(5).display()

game_names,count
Dota 2,4841
Team Fortress 2,2323
Counter-Strike Global Offensive,1377
Unturned,1069
Left 4 Dead 2,801


Databricks visualization. Run in Databricks to view.

In [0]:
# Data visualization : 3 - Most popular games by behavior purchase wise
steam_df.select("game_names", "behavior", "hours_played").filter(steam_df["behavior"] == "purchase").groupBy("game_names").count().orderBy('count', ascending=False).limit(5).display()

game_names,count
Dota 2,4841
Team Fortress 2,2323
Unturned,1563
Counter-Strike Global Offensive,1412
Half-Life 2 Lost Coast,981


Databricks visualization. Run in Databricks to view.

In [0]:
# Step 2: Pre-process the data and generate unique integer IDs for games
# Generate unique integer IDs for games
# steam_df = steam_df.withColumn("game_id", monotonically_increasing_id())

from pyspark.sql.functions import dense_rank
from pyspark.sql.window import Window

# Generate unique integer IDs for games
windowSpec = Window.orderBy("game_names")
steam_df = steam_df.withColumn("game_id", dense_rank().over(windowSpec))

# Check the updated DataFrame
steam_df.show(truncate = False)


+---------+----------------------------------------------------------+--------+------------+-------+
|member_id|game_names                                                |behavior|hours_played|game_id|
+---------+----------------------------------------------------------+--------+------------+-------+
|46055854 |007 Legends                                               |purchase|1.0         |1      |
|46055854 |007 Legends                                               |play    |0.7         |1      |
|86055705 |0RBITALIS                                                 |purchase|1.0         |2      |
|86055705 |0RBITALIS                                                 |play    |0.3         |2      |
|11940338 |0RBITALIS                                                 |play    |0.6         |2      |
|11940338 |0RBITALIS                                                 |purchase|1.0         |2      |
|93030550 |0RBITALIS                                                 |purchase|1.0         

In [0]:
#steam_df = steam_df.filter(steam_df["behavior"] == "play")
#steam_df.show(50)

In [0]:
# Step 3: Train a collaborative filtering recommender system using MLlib
# Import MLlib necessarylibraries

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

In [0]:

trainingDF,testDF = steam_df.randomSplit([0.75, 0.25])
trainingDF.show()

+---------+--------------------+--------+------------+-------+
|member_id|          game_names|behavior|hours_played|game_id|
+---------+--------------------+--------+------------+-------+
|     5250|         Alien Swarm|    play|         4.9|    228|
|     5250|         Alien Swarm|purchase|         1.0|    228|
|     5250|     Cities Skylines|    play|       144.0|    853|
|     5250|     Cities Skylines|purchase|         1.0|    853|
|     5250|      Counter-Strike|purchase|         1.0|    979|
|     5250|Counter-Strike So...|purchase|         1.0|    985|
|     5250|  Deathmatch Classic|purchase|         1.0|   1180|
|     5250|Deus Ex Human Rev...|    play|        62.0|   1249|
|     5250|Deus Ex Human Rev...|purchase|         1.0|   1249|
|     5250|              Dota 2|    play|         0.2|   1337|
|     5250|              Dota 2|purchase|         1.0|   1337|
|     5250|           Half-Life|purchase|         1.0|   2070|
|     5250|Half-Life 2 Episo...|purchase|         1.0| 

In [0]:
print("Data length :",steam_df.count())
print("trainingDF length :", trainingDF.count())
print("testDF length :", testDF.count())

Data length : 200000
trainingDF length : 150163
testDF length : 49837


In [0]:
# step 4: Training the Model use ALS Algorithm-create usermatrix 

als = ALS(maxIter=10, regParam=0.1,userCol="member_id", itemCol="game_id", ratingCol="hours_played",coldStartStrategy= "drop",
          seed =100)
model = als.fit(trainingDF)

2024/04/10 10:14:03 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '2d6f2b3cdce94d28935c380c4d8aa570', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
 #step 5: Evaluating the Model :prediction columns based on collabrative filtering

 predictions = model.transform(testDF).dropna()
 predictions.show()

+---------+--------------------+--------+------------+-------+-----------+
|member_id|          game_names|behavior|hours_played|game_id| prediction|
+---------+--------------------+--------+------------+-------+-----------+
|     5250|       Day of Defeat|purchase|         1.0|   1132|  5.0936112|
|     5250|         Half-Life 2|purchase|         1.0|   2071|  11.969165|
|     5250|Half-Life 2 Death...|purchase|         1.0|   2072|   6.352893|
|     5250|Half-Life 2 Lost ...|purchase|         1.0|   2075|  0.8713377|
|     5250|     Team Fortress 2|purchase|         1.0|   4258|  0.6987543|
|    76767|Age of Empires II...|purchase|         1.0|    175|   13.97617|
|    76767|         Alien Swarm|purchase|         1.0|    228|   4.124664|
|    76767|Arma 2 Operation ...|purchase|         1.0|    328| 0.57216555|
|    76767|Call of Duty Blac...|purchase|         1.0|    728|  10.282965|
|    76767|Call of Duty Mode...|purchase|         1.0|    741|  22.840004|
|    76767|      Counter-

In [0]:
# step 6 : Regression method for OLS -Ordinary Least Square

from pyspark.ml.evaluation import RegressionEvaluator
evaluator = RegressionEvaluator(metricName = 'rmse',labelCol ='hours_played',predictionCol = 'prediction')
rmse = evaluator.evaluate(predictions)
print('Root Mean Square error is %g' % rmse)

Root Mean Square error is 205.771


In [0]:
# Step 7: Explore some of the resulting recommendations

# Generate top recommendations for all users
userRecs = model.recommendForAllUsers(5)
userRecs.show(truncate=False)

+---------+-----------------------------------------------------------------------------------------------+
|member_id|recommendations                                                                                |
+---------+-----------------------------------------------------------------------------------------------+
|76767    |[{2020, 1556.9143}, {1762, 1374.5972}, {1766, 1297.9213}, {5042, 1289.0312}, {1560, 1152.7506}]|
|144736   |[{3272, 58.916893}, {2863, 52.217358}, {1761, 44.939846}, {1285, 42.636524}, {1560, 37.75588}] |
|229911   |[{1621, 2713.6855}, {1763, 1817.1494}, {2808, 1786.8007}, {3251, 902.6165}, {1485, 656.949}]   |
|835015   |[{3272, 77.9229}, {1285, 59.776512}, {2863, 55.678192}, {1560, 50.974148}, {1031, 41.767445}]  |
|948368   |[{1761, 135.0363}, {3272, 120.135666}, {330, 102.129425}, {985, 82.95882}, {1031, 60.417202}]  |
|975449   |[{1560, 839.66437}, {1031, 575.05035}, {3272, 493.31848}, {2658, 346.7212}, {1762, 345.79587}] |
|1268792  |[{2863, 85.94329}

In [0]:
userRecs.printSchema()

root
 |-- member_id: integer (nullable = false)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- game_id: integer (nullable = true)
 |    |    |-- rating: float (nullable = true)



In [0]:
# step 8: challenges for ParamGridBuilder

from pyspark.ml.tuning import ParamGridBuilder
 
als = ALS( userCol="member_id", itemCol="game_id", ratingCol="hours_played", coldStartStrategy="drop", seed=100)
 
# Create a parameter grid
 
parameters = ParamGridBuilder() \
            .addGrid(als.rank, [10, 20, 30]) \
            .addGrid(als.regParam, [.05, .1, .15]) \
            .addGrid(als.maxIter, [10,20])  \
            .build()

In [0]:
from pyspark.ml.tuning import TrainValidationSplit
 
# Define TrainValidationSplit
 
tvs = TrainValidationSplit()\
.setSeed(100)\
.setTrainRatio(0.75)\
.setEstimatorParamMaps(parameters)\
.setEstimator(als)\
.setEvaluator(evaluator)

In [0]:
gridsearchModel = tvs.fit(trainingDF)

2024/04/10 10:15:02 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '36f08513a1e24598b76ebe753d2dc21a', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


In [0]:
# step 9: Hyperparameter Tuning


# Select best model and identify the parameters
 
bestModel = gridsearchModel.bestModel
 
print("Parameters for the best model:")
print("Rank Parameter: %g" %bestModel.rank)
print("RegParam Parameter: %g" %bestModel._java_obj.parent().getRegParam())


Parameters for the best model:
Rank Parameter: 30
RegParam Parameter: 0.15


In [0]:
evaluator.evaluate(bestModel.transform(testDF))

188.72269838812213

In [0]:
# generate top 10 games recommendation for specific user 

user = steam_df.select('member_id').distinct().limit(5)
user.show()

+---------+
|member_id|
+---------+
| 59945701|
|151603712|
|187131847|
| 53875128|
|234941318|
+---------+



In [0]:
# Testing the recommendation system on a single user
single_user = steam_df.filter(steam_df["member_id"] == 53875128)
single_recommendation = bestModel.transform(single_user)
single_recommendation.show(truncate =False)

+---------+--------------------------------------------+--------+------------+-------+----------+
|member_id|game_names                                  |behavior|hours_played|game_id|prediction|
+---------+--------------------------------------------+--------+------------+-------+----------+
|53875128 |140                                         |purchase|1.0         |11     |0.8952969 |
|53875128 |404Sight                                    |purchase|1.0         |36     |0.7531686 |
|53875128 |404Sight                                    |play    |0.5         |36     |0.7531686 |
|53875128 |A Story About My Uncle                      |purchase|1.0         |61     |0.96282905|
|53875128 |AaaaaAAaaaAAAaaAAAAaAAAAA!!! for the Awesome|purchase|1.0         |91     |0.6716336 |
|53875128 |AaaaaAAaaaAAAaaAAAAaAAAAA!!! for the Awesome|play    |0.4         |91     |0.6716336 |
|53875128 |Afterlife Empire                            |purchase|1.0         |124    |0.5940967 |
|53875128 |Afterlife