## Game Recommendation using ML - Collaborative filtering - Implicit Feedback system

In [0]:
from pyspark.sql.types import *
import re
from pyspark.sql.functions import *
from pyspark.sql.window import *
import pandas as pd
import matplotlib.pyplot as plt
pd.options.plotting.backend = "plotly"
import plotly.express as px
import mlflow
mlflow.pyspark.ml.autolog()
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
dbutils.fs.ls("FileStore/tables")

Out[52]: [FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.csv', name='clinicaltrial_2020.csv', size=46318151, modificationTime=1709758602000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2020.zip', name='clinicaltrial_2020.zip', size=10599182, modificationTime=1709758582000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.csv', name='clinicaltrial_2021.csv', size=50359696, modificationTime=1712329235000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2021.zip', name='clinicaltrial_2021.zip', size=11508457, modificationTime=1709757022000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2023-1.zip', name='clinicaltrial_2023-1.zip', size=57166668, modificationTime=1710259254000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2023.csv', name='clinicaltrial_2023.csv', size=292436366, modificationTime=1712319782000),
 FileInfo(path='dbfs:/FileStore/tables/clinicaltrial_2023.zip', name='clinicaltrial_2023.zip', size=57166668, modificationTime=1

## Data Acquisition

In [0]:
df_stream = spark.read.csv("/FileStore/tables/steam_200k.csv", sep=',',header=False, inferSchema=True)
rdd_stream = df_stream.rdd
rdd_stream1 = rdd_stream.map(lambda x : x).collect()
RDD_Games_Unique_ID = rdd_stream.map(lambda x : x[1])\
                           .distinct()\
                           .zipWithIndex()\
                           .collect()

In [0]:
df_stream.head(5)

Out[54]: [Row(_c0=151603712, _c1='The Elder Scrolls V Skyrim', _c2='purchase', _c3=1.0),
 Row(_c0=151603712, _c1='The Elder Scrolls V Skyrim', _c2='play', _c3=273.0),
 Row(_c0=151603712, _c1='Fallout 4', _c2='purchase', _c3=1.0),
 Row(_c0=151603712, _c1='Fallout 4', _c2='play', _c3=87.0),
 Row(_c0=151603712, _c1='Spore', _c2='purchase', _c3=1.0)]

In [0]:
df_stream.tail(5)

Out[55]: [Row(_c0=128470551, _c1='Titan Souls', _c2='play', _c3=1.5),
 Row(_c0=128470551, _c1='Grand Theft Auto Vice City', _c2='purchase', _c3=1.0),
 Row(_c0=128470551, _c1='Grand Theft Auto Vice City', _c2='play', _c3=1.5),
 Row(_c0=128470551, _c1='RUSH', _c2='purchase', _c3=1.0),
 Row(_c0=128470551, _c1='RUSH', _c2='play', _c3=1.4)]

#Creating DataFrame and assigning unique Game ID

In [0]:
Schema = StructType([
        StructField("UserID", IntegerType()),
        StructField("GameName", StringType()),
        StructField("Behaviour", StringType()),
        StructField("Hours", FloatType())
])
RDD_Stream = rdd_stream.map(lambda x : x)
#display(RDD_Stream)
Stream_DF = spark.createDataFrame(RDD_Stream,Schema)
# Added a unique ID in dataframe using row_number window function. It ranges from 1 to 200000 i.e., total number of records
window_function = Window.orderBy("GameName")
Stream_DF = Stream_DF.withColumn("Game_ID", dense_rank().over(window_function))
window_function1 = Window.orderBy("UserID")
Stream_DF = Stream_DF.withColumn("UserID_Unique", dense_rank().over(window_function1))

##Exploratory Data Analysis

In [0]:
Stream_DF.describe().display()

summary,UserID,GameName,Behaviour,Hours,Game_ID,UserID_Unique
count,200000.0,200000,200000,200000.0,200000.0,200000.0
mean,103655865.94664,140.0,,17.874384000420385,2490.26983,3663.387275
stddev,72080735.12913981,0.0,,138.05695165082625,1473.1175655862046,3127.408514708799
min,5250.0,007 Legends,play,0.1,1.0,1.0
max,309903146.0,theHunter Primal,purchase,11754.0,5155.0,12393.0


In [0]:
Stream_DF.head(5)

Out[58]: [Row(UserID=5250, GameName='Cities Skylines', Behaviour='purchase', Hours=1.0, Game_ID=853, UserID_Unique=1),
 Row(UserID=5250, GameName='Day of Defeat', Behaviour='purchase', Hours=1.0, Game_ID=1132, UserID_Unique=1),
 Row(UserID=5250, GameName='Cities Skylines', Behaviour='play', Hours=144.0, Game_ID=853, UserID_Unique=1),
 Row(UserID=5250, GameName='Alien Swarm', Behaviour='play', Hours=4.900000095367432, Game_ID=228, UserID_Unique=1),
 Row(UserID=5250, GameName='Counter-Strike', Behaviour='purchase', Hours=1.0, Game_ID=979, UserID_Unique=1)]

In [0]:
Stream_DF.tail(5)

Out[59]: [Row(UserID=309812026, GameName='Robocraft', Behaviour='purchase', Hours=1.0, Game_ID=3563, UserID_Unique=12391),
 Row(UserID=309824202, GameName='Dota 2', Behaviour='purchase', Hours=1.0, Game_ID=1337, UserID_Unique=12392),
 Row(UserID=309824202, GameName='Dota 2', Behaviour='play', Hours=0.699999988079071, Game_ID=1337, UserID_Unique=12392),
 Row(UserID=309903146, GameName='Dota 2', Behaviour='purchase', Hours=1.0, Game_ID=1337, UserID_Unique=12393),
 Row(UserID=309903146, GameName='Dota 2', Behaviour='play', Hours=0.20000000298023224, Game_ID=1337, UserID_Unique=12393)]

In [0]:
Stream_Record_Count = rdd_stream.count()
RDD_Distinct_Members = rdd_stream.map(lambda x : x[0])\
                           .distinct()\
                           .count()
RDD_Distinct_Games = rdd_stream.map(lambda x : x[1])\
                           .distinct()\
                           .zipWithIndex()\
                           .count()
RDD_Play = rdd_stream.map(lambda x : (x[0] , x[2]))\
                     .filter(lambda x : x[1].lower().strip()=='play')\
                     .count()
RDD_Purchase = rdd_stream.map(lambda x : (x[0] , x[2]))\
                     .filter(lambda x : x[1].lower().strip()=='purchase')\
                     .count()
RDD_Play_Members = rdd_stream.map(lambda x : (x[0] , x[2]))\
                     .filter(lambda x : x[1].lower().strip()=='play')\
                     .map(lambda x : x[0])\
                     .distinct()\
                     .collect()
RDD_Purchase_Members = rdd_stream.map(lambda x : (x[0] , x[2]))\
                     .filter(lambda x : x[1].lower().strip()=='purchase')\
                     .map(lambda x : x[0])\
                     .distinct()\
                     .collect()
RDD_play_Mem = sc.parallelize(RDD_Play_Members)
RDD_purchase_Mem = sc.parallelize(RDD_Purchase_Members)
common_members = RDD_play_Mem.union(RDD_purchase_Mem).count()
Only_purchase = RDD_purchase_Mem.subtract(RDD_play_Mem).count()
print("No.of Record count is:", Stream_Record_Count)
print("Distinct Members in file is:", RDD_Distinct_Members)
print("Distinct games in file is: ", RDD_Distinct_Games)
print("Total no.of play is :", RDD_Play)
print("Total no.of purchase is :", RDD_Purchase)

No.of Record count is: 200000
Distinct Members in file is: 12393
Distinct games in file is:  5155
Total no.of play is : 70489
Total no.of purchase is : 129511


## Analysis using Raw file - without Normalization

In [0]:
# Most played game based on users
Most_Played_Game = Stream_DF.filter(col("Behaviour")=="play").select("UserID_Unique","UserID","Game_ID","GameName","Hours").groupBy("Game_ID","GameName").agg(count("Hours").alias("Played_Count"),round(sum("Hours")).cast(FloatType()).cast(IntegerType()).alias("Sum_Hours_Played"))

In [0]:
print("Most played game based on users")
Most_Played_Game.sort('Played_Count',ascending = False).limit(10).show(truncate=False)

Most played game based on users
+-------+-------------------------------+------------+----------------+
|Game_ID|GameName                       |Played_Count|Sum_Hours_Played|
+-------+-------------------------------+------------+----------------+
|1337   |Dota 2                         |4841        |981685          |
|4258   |Team Fortress 2                |2323        |173673          |
|982    |Counter-Strike Global Offensive|1377        |322772          |
|4789   |Unturned                       |1069        |16096           |
|2476   |Left 4 Dead 2                  |801         |33597           |
|985    |Counter-Strike Source          |715         |96075           |
|4365   |The Elder Scrolls V Skyrim     |677         |70889           |
|1895   |Garry's Mod                    |666         |49725           |
|979    |Counter-Strike                 |568         |134261          |
|3826   |Sid Meier's Civilization V     |554         |99821           |
+-------+-----------------------

In [0]:
# Maximum number of hours played based on hours
print("Maximum number of hours played based on hours")
Most_Played_Game.sort('Sum_Hours_Played',ascending = False).limit(10).show(truncate=False)

Maximum number of hours played based on hours
+-------+-------------------------------------------+------------+----------------+
|Game_ID|GameName                                   |Played_Count|Sum_Hours_Played|
+-------+-------------------------------------------+------------+----------------+
|1337   |Dota 2                                     |4841        |981685          |
|982    |Counter-Strike Global Offensive            |1377        |322772          |
|4258   |Team Fortress 2                            |2323        |173673          |
|979    |Counter-Strike                             |568         |134261          |
|3826   |Sid Meier's Civilization V                 |554         |99821           |
|985    |Counter-Strike Source                      |715         |96075           |
|4365   |The Elder Scrolls V Skyrim                 |677         |70889           |
|1895   |Garry's Mod                                |666         |49725           |
|738    |Call of Duty Modern W

In [0]:
# Most purchased game based on users
Most_Played_Game = Stream_DF.filter(col("Behaviour")=="purchase").select("UserID_Unique","UserID","Game_ID","GameName","Hours").groupBy("Game_ID","GameName").agg(count("Hours").alias("Purchased_Count"),round(sum("Hours")).cast(FloatType()).cast(IntegerType()).alias("Sum_Hours_Played"))
print("Most purchased game based on users")
Most_Played_Game.sort('Purchased_Count',ascending = False).limit(10).show(truncate=False)

Most purchased game based on users
+-------+-------------------------------+---------------+----------------+
|Game_ID|GameName                       |Purchased_Count|Sum_Hours_Played|
+-------+-------------------------------+---------------+----------------+
|1337   |Dota 2                         |4841           |4841            |
|4258   |Team Fortress 2                |2323           |2323            |
|4789   |Unturned                       |1563           |1563            |
|982    |Counter-Strike Global Offensive|1412           |1412            |
|2075   |Half-Life 2 Lost Coast         |981            |981             |
|985    |Counter-Strike Source          |978            |978             |
|2476   |Left 4 Dead 2                  |951            |951             |
|979    |Counter-Strike                 |856            |856             |
|4900   |Warframe                       |847            |847             |
|2072   |Half-Life 2 Deathmatch         |823            |823     

In [0]:
# User played maximum hours based on individual users
User_Played_Max_Hours = Stream_DF.filter(col("Behaviour")=="play").select("UserID_Unique","UserID","Game_ID","GameName","Hours").groupBy("UserID_Unique","UserID","Game_ID","GameName").agg(sum("Hours").cast(FloatType()).alias("Sum_Hours_Played_User"))
print("User played maximum hours based on individual users")
User_Played_Max_Hours.sort('Sum_Hours_Played_User',ascending = False).limit(10).show(truncate=False)

User played maximum hours based on individual users
+-------------+---------+-------+--------------------------+---------------------+
|UserID_Unique|UserID   |Game_ID|GameName                  |Sum_Hours_Played_User|
+-------------+---------+-------+--------------------------+---------------------+
|2086         |73017395 |3826   |Sid Meier's Civilization V|11754.0              |
|3237         |100630947|1337   |Dota 2                    |10442.0              |
|5564         |153382649|4258   |Team Fortress 2           |9640.0               |
|4564         |130882834|1337   |Dota 2                    |7765.0               |
|1356         |52567955 |1337   |Dota 2                    |6964.0               |
|4126         |121199670|1337   |Dota 2                    |6753.0               |
|2603         |86256882 |1337   |Dota 2                    |6015.0               |
|1985         |70487610 |3826   |Sid Meier's Civilization V|6013.0               |
|3270         |101414179|1337   |Do

In [0]:
Stream_DF.show(truncate=False)

+------+------------------------+---------+-----+-------+-------------+
|UserID|GameName                |Behaviour|Hours|Game_ID|UserID_Unique|
+------+------------------------+---------+-----+-------+-------------+
|5250  |Counter-Strike Source   |purchase |1.0  |985    |1            |
|5250  |Team Fortress 2         |play     |0.8  |4258   |1            |
|5250  |Day of Defeat           |purchase |1.0  |1132   |1            |
|5250  |Alien Swarm             |play     |4.9  |228    |1            |
|5250  |Deathmatch Classic      |purchase |1.0  |1180   |1            |
|5250  |Cities Skylines         |play     |144.0|853    |1            |
|5250  |Deus Ex Human Revolution|purchase |1.0  |1249   |1            |
|5250  |Half-Life 2 Episode One |purchase |1.0  |2073   |1            |
|5250  |Half-Life 2 Episode Two |purchase |1.0  |2074   |1            |
|5250  |Alien Swarm             |purchase |1.0  |228    |1            |
|5250  |Half-Life 2 Lost Coast  |purchase |1.0  |2075   |1      

In [0]:
# Before Normalization
Record_Count = Stream_DF.count()
print(f"No. of Records in file is: {Record_Count}")
unique_Games = Stream_DF.select("GameName").distinct().count()
print(f"No. of unique games is: {unique_Games}")
unique_Users = Stream_DF.select("UserID").distinct().count()
print(f"No. of unique games is: {unique_Users}")
duplicate_purchases = Stream_DF.select("UserID","GameName","Behaviour").groupBy("UserID","GameName","Behaviour").agg(count("Behaviour").alias("Count")).filter("Count > 1").count()
print(f"No. of duplicate purchases is: {duplicate_purchases}")
Stream_DF = Stream_DF.select("UserID","UserID_Unique","Game_ID","GameName","Behaviour","Hours").dropDuplicates()
Record_Count = Stream_DF.count()
print(f"No. of Records in file after dropping duplicates is: {Record_Count}")
unique_Games = Stream_DF.select("GameName").distinct().count()
print(f"No. of unique games after deleting duplicates is: {unique_Games}")
unique_Users = Stream_DF.select("UserID").distinct().count()
print(f"No. of unique games after deleting duplicates is: {unique_Users}")

No. of Records in file is: 200000
No. of unique games is: 5155
No. of unique games is: 12393
No. of duplicate purchases is: 719
No. of Records in file after dropping duplicates is: 199293
No. of unique games after deleting duplicates is: 5155
No. of unique games after deleting duplicates is: 12393


## Normalization - Converting Behaviour to Purchase and Play: Assigning Play Hours to corresponding Games

In [0]:
Stream_DF_Normalized = Stream_DF.alias("Stream_DF_Normalized")

In [0]:
Stream_DF_Normalized = Stream_DF_Normalized.withColumn("Play", expr("""case when Behaviour = 'play' then 1 else 0 end""")).withColumn("Purchase", expr("""case when Behaviour = 'purchase' then 1 else 0 end""")).groupBy("UserID_Unique","UserID","Game_ID","GameName").agg(sum("Purchase").alias("Purchase"),sum("Play").alias("Play"),(sum("Hours")-sum("Purchase")).cast(FloatType()).alias("Hours_Played"))
Stream_DF_Normalized.show(truncate=False)

+-------------+------+-------+------------------------+--------+----+------------+
|UserID_Unique|UserID|Game_ID|GameName                |Purchase|Play|Hours_Played|
+-------------+------+-------+------------------------+--------+----+------------+
|1            |5250  |228    |Alien Swarm             |1       |1   |4.9         |
|1            |5250  |853    |Cities Skylines         |1       |1   |144.0       |
|1            |5250  |979    |Counter-Strike          |1       |0   |0.0         |
|1            |5250  |985    |Counter-Strike Source   |1       |0   |0.0         |
|1            |5250  |1132   |Day of Defeat           |1       |0   |0.0         |
|1            |5250  |1180   |Deathmatch Classic      |1       |0   |0.0         |
|1            |5250  |1249   |Deus Ex Human Revolution|1       |1   |62.0        |
|1            |5250  |1337   |Dota 2                  |1       |1   |0.2         |
|1            |5250  |2070   |Half-Life               |1       |0   |0.0         |
|1  

In [0]:
Stream_DF_Normalized.filter("UserID_Unique == 597 and Game_ID == 1976").show()

+-------------+--------+-------+--------------------+--------+----+------------+
|UserID_Unique|  UserID|Game_ID|            GameName|Purchase|Play|Hours_Played|
+-------------+--------+-------+--------------------+--------+----+------------+
|          597|28472068|   1976|Grand Theft Auto III|       1|   2|         0.5|
+-------------+--------+-------+--------------------+--------+----+------------+



In [0]:
Stream_DF_Normalized.describe().display()

summary,UserID_Unique,UserID,Game_ID,GameName,Purchase,Play,Hours_Played
count,128804.0,128804.0,128804.0,128804,128804.0,128804.0,128804.0
mean,3622.684109189156,102534319.2784696,2499.307242011117,140.0,1.0,0.54725784913512,26.74890376159137
stddev,3140.48790736742,72428748.89271711,1472.4906381852552,0.0,0.0,0.4979507509468264,171.39018874814366
min,1.0,5250.0,1.0,007 Legends,1.0,0.0,0.0
max,12393.0,309903146.0,5155.0,theHunter Primal,1.0,2.0,11754.0


In [0]:
Stream_DF_Normalized.filter("Play > 1").show(truncate=False)

+-------------+---------+-------+--------------------------------------------+--------+----+------------+
|UserID_Unique|UserID   |Game_ID|GameName                                    |Purchase|Play|Hours_Played|
+-------------+---------+-------+--------------------------------------------+--------+----+------------+
|597          |28472068 |1976   |Grand Theft Auto III                        |1       |2   |0.5         |
|597          |28472068 |1978   |Grand Theft Auto San Andreas                |1       |2   |0.9         |
|597          |28472068 |1980   |Grand Theft Auto Vice City                  |1       |2   |5.7000003   |
|785          |33865373 |3822   |Sid Meier's Civilization IV                 |1       |2   |137.0       |
|1302         |50769696 |1978   |Grand Theft Auto San Andreas                |1       |2   |14.0        |
|1626         |59925638 |4633   |Tom Clancy's H.A.W.X. 2                     |1       |2   |7.4         |
|2022         |71411882 |1976   |Grand Theft A

## EDA After Normalization

In [0]:
Record_Count = Stream_DF_Normalized.count()
print(f"No. of Records in file is: {Record_Count}")
unique_Games = Stream_DF_Normalized.select("GameName").distinct().count()
print(f"No. of unique games is: {unique_Games}")
unique_Users = Stream_DF_Normalized.select("UserID").distinct().count()
print(f"No. of unique games is: {unique_Users}")
duplicate_purchases = Stream_DF_Normalized.select("UserID","GameName","Hours_Played").groupBy("UserID","GameName","Hours_Played").agg(count("Hours_Played").alias("Count")).filter("Count > 1").count()
print(f"No. of duplicate purchases is: {duplicate_purchases}")
Stream_DF = Stream_DF_Normalized.select("Game_ID","UserID","GameName","Purchase","Play","Hours_Played").dropDuplicates()
Record_Count = Stream_DF_Normalized.count()
print(f"No. of Records in file after dropping duplicates is: {Record_Count}")
unique_Games = Stream_DF_Normalized.select("GameName").distinct().count()
print(f"No. of unique games after deleting duplicates is: {unique_Games}")
unique_Users = Stream_DF_Normalized.select("UserID").distinct().count()
print(f"No. of unique users after deleting duplicates is: {unique_Users}")

No. of Records in file is: 128804
No. of unique games is: 5155
No. of unique games is: 12393
No. of duplicate purchases is: 0
No. of Records in file after dropping duplicates is: 128804
No. of unique games after deleting duplicates is: 5155
No. of unique users after deleting duplicates is: 12393


In [0]:
# Most played game based on users
print("Most played game based on users")
Most_Played_Game = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").groupBy("Game_ID","GameName").agg(count("Play").alias("Played_Count"),round(sum("Hours_Played")).cast(FloatType()).cast(IntegerType()).alias("Sum_Hours_Played"))
Most_Played_Game.sort('Played_Count',ascending = False).limit(10).show(truncate=False)

Most played game based on users
+-------+-------------------------------+------------+----------------+
|Game_ID|GameName                       |Played_Count|Sum_Hours_Played|
+-------+-------------------------------+------------+----------------+
|1337   |Dota 2                         |4841        |981685          |
|4258   |Team Fortress 2                |2323        |173673          |
|4789   |Unturned                       |1563        |16096           |
|982    |Counter-Strike Global Offensive|1412        |322772          |
|2075   |Half-Life 2 Lost Coast         |981         |184             |
|985    |Counter-Strike Source          |978         |96075           |
|2476   |Left 4 Dead 2                  |951         |33597           |
|979    |Counter-Strike                 |856         |134261          |
|4900   |Warframe                       |847         |27075           |
|2072   |Half-Life 2 Deathmatch         |823         |3713            |
+-------+-----------------------

In [0]:
print("Maximum number of hours played based on hours")
# Maximum number of hours played based on hours
Most_Played_Hours = Most_Played_Game.sort('Sum_Hours_Played',ascending = False).limit(10).show(truncate=False)

Maximum number of hours played based on hours
+-------+-------------------------------------------+------------+----------------+
|Game_ID|GameName                                   |Played_Count|Sum_Hours_Played|
+-------+-------------------------------------------+------------+----------------+
|1337   |Dota 2                                     |4841        |981685          |
|982    |Counter-Strike Global Offensive            |1412        |322772          |
|4258   |Team Fortress 2                            |2323        |173673          |
|979    |Counter-Strike                             |856         |134261          |
|3826   |Sid Meier's Civilization V                 |596         |99821           |
|985    |Counter-Strike Source                      |978         |96075           |
|4365   |The Elder Scrolls V Skyrim                 |717         |70889           |
|1895   |Garry's Mod                                |731         |49725           |
|738    |Call of Duty Modern W

In [0]:
print("Most purchased game based on users")
# Most purchased game based on users
Most_Played_Game = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").groupBy("Game_ID","GameName").agg(count("Purchase").alias("Purchased_Count"),round(sum("Hours_Played")).cast(FloatType()).cast(IntegerType()).alias("Sum_Hours_Played"))
Most_Played_Game.sort('Purchased_Count',ascending = False).limit(10).show(truncate=False)

Most purchased game based on users
+-------+-------------------------------+---------------+----------------+
|Game_ID|GameName                       |Purchased_Count|Sum_Hours_Played|
+-------+-------------------------------+---------------+----------------+
|1337   |Dota 2                         |4841           |981685          |
|4258   |Team Fortress 2                |2323           |173673          |
|4789   |Unturned                       |1563           |16096           |
|982    |Counter-Strike Global Offensive|1412           |322772          |
|2075   |Half-Life 2 Lost Coast         |981            |184             |
|985    |Counter-Strike Source          |978            |96075           |
|2476   |Left 4 Dead 2                  |951            |33597           |
|979    |Counter-Strike                 |856            |134261          |
|4900   |Warframe                       |847            |27075           |
|2072   |Half-Life 2 Deathmatch         |823            |3713    

In [0]:
print("User played maximum hours based on individual users")
# User played maximum hours based on individual users
User_Played_Max_Hours = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").groupBy("UserID_Unique","Game_ID","GameName").agg(sum("Hours_Played").cast(FloatType()).alias("Sum_Hours_Played_User"))
User_Played_Max_Hours.sort('Sum_Hours_Played_User',ascending = False).limit(10).show(truncate=False)

User played maximum hours based on individual users
+-------------+-------+--------------------------+---------------------+
|UserID_Unique|Game_ID|GameName                  |Sum_Hours_Played_User|
+-------------+-------+--------------------------+---------------------+
|2086         |3826   |Sid Meier's Civilization V|11754.0              |
|3237         |1337   |Dota 2                    |10442.0              |
|5564         |4258   |Team Fortress 2           |9640.0               |
|4564         |1337   |Dota 2                    |7765.0               |
|1356         |1337   |Dota 2                    |6964.0               |
|4126         |1337   |Dota 2                    |6753.0               |
|2603         |1337   |Dota 2                    |6015.0               |
|1985         |3826   |Sid Meier's Civilization V|6013.0               |
|3270         |1337   |Dota 2                    |5982.0               |
|242          |1337   |Dota 2                    |5970.0               |

In [0]:
# Total No.of Purchases
No_Game_Purchases = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").filter("Purchase != 0").count()
print("No. of games purchased", No_Game_Purchases)

# Total No.of Plays
No_Game_Plays = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").filter("Play != 0").count()
print("No. of games played", No_Game_Plays)

# Users Purchased but not played
User_Purchased_not_Played_Count = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").filter("Play == 0").groupBy("UserID").agg(count("UserID")).count()
print("No. of users purchased games but not played:", User_Purchased_not_Played_Count)

# Games Played more than once
Games_Played_multiple_times = Stream_DF_Normalized.select("UserID_Unique","UserID","Game_ID","GameName","Hours_Played","Purchase","Play").filter("Play > 1").count()
print("No. of games played more than one time:", Games_Played_multiple_times)

No. of games purchased 128804
No. of games played 70477
No. of users purchased games but not played: 5953
No. of games played more than one time: 12


In [0]:
Stream_DF_Final = Stream_DF_Normalized.dropna()

In [0]:
Stream_DF_Final.show()

+-------------+------+-------+--------------------+--------+----+------------+
|UserID_Unique|UserID|Game_ID|            GameName|Purchase|Play|Hours_Played|
+-------------+------+-------+--------------------+--------+----+------------+
|            1|  5250|    228|         Alien Swarm|       1|   1|         4.9|
|            1|  5250|    853|     Cities Skylines|       1|   1|       144.0|
|            1|  5250|    979|      Counter-Strike|       1|   0|         0.0|
|            1|  5250|    985|Counter-Strike So...|       1|   0|         0.0|
|            1|  5250|   1132|       Day of Defeat|       1|   0|         0.0|
|            1|  5250|   1180|  Deathmatch Classic|       1|   0|         0.0|
|            1|  5250|   1249|Deus Ex Human Rev...|       1|   1|        62.0|
|            1|  5250|   1337|              Dota 2|       1|   1|         0.2|
|            1|  5250|   2070|           Half-Life|       1|   0|         0.0|
|            1|  5250|   2071|         Half-Life 2| 

##ALS Implementation

## Implementation with train 70% and test 30% data based on Hours_Played

In [0]:
(training, test) = Stream_DF_Final.randomSplit([0.7 , 0.3])
als_7 = ALS(nonnegative = True, implicitPrefs=True,coldStartStrategy="drop",userCol="UserID_Unique", itemCol="Game_ID",ratingCol="Hours_Played",seed=120, rank=5, regParam=0.01, maxIter=10, alpha = 1)

Model = als_7.fit(training)
predictions = Model.transform(test)
predictions.show()

2024/04/07 09:03:48 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '6da18e9c4e9c4af5a3466e81504fd5cc', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


+-------------+------+-------+--------------------+--------+----+------------+-----------+
|UserID_Unique|UserID|Game_ID|            GameName|Purchase|Play|Hours_Played| prediction|
+-------------+------+-------+--------------------+--------+----+------------+-----------+
|            1|  5250|    228|         Alien Swarm|       1|   1|         4.9| 0.18845913|
|            1|  5250|    853|     Cities Skylines|       1|   1|       144.0| 0.20564412|
|            1|  5250|    985|Counter-Strike So...|       1|   0|         0.0|  0.3498589|
|            1|  5250|   2073|Half-Life 2 Episo...|       1|   0|         0.0| 0.15252747|
|            1|  5250|   2075|Half-Life 2 Lost ...|       1|   0|         0.0|0.071707085|
|            1|  5250|   3224|            Portal 2|       1|   1|        13.6| 0.34568468|
|            2| 76767|    328|Arma 2 Operation ...|       1|   0|         0.0| 0.09356316|
|            2| 76767|    728|Call of Duty Blac...|       1|   1|        12.5|  0.8545084|

In [0]:
Evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Hours_Played", 
           predictionCol="prediction") 
RMSE = Evaluator.evaluate(predictions)
print(RMSE)

161.61268351509776


In [0]:
userRecs = Model.recommendForAllUsers(6)
userRecs.sort("UserID_Unique",ascending=True).show(truncate=False)

gameRecommendation = Model.recommendForAllItems(6)
gameRecommendation.sort("Game_ID", ascending=True).show(truncate = False)

+-------------+---------------------------------------------------------------------------------------------------------------------+
|UserID_Unique|recommendations                                                                                                      |
+-------------+---------------------------------------------------------------------------------------------------------------------+
|1            |[{3826, 0.35017368}, {985, 0.3498589}, {3224, 0.34568468}, {979, 0.34320742}, {2374, 0.32889587}, {2071, 0.3252013}] |
|2            |[{985, 1.115745}, {2476, 1.0677307}, {979, 1.0597292}, {3224, 1.0411886}, {2374, 0.97893834}, {2071, 0.95103014}]    |
|3            |[{4365, 0.8946045}, {620, 0.71338856}, {1979, 0.7117016}, {4277, 0.67196167}, {3066, 0.6652721}, {1895, 0.6601146}]  |
|4            |[{117, 0.0}, {97, 0.0}, {72, 0.0}, {69, 0.0}, {67, 0.0}, {65, 0.0}]                                                  |
|5            |[{979, 0.07032441}, {985, 0.06904019}, {980, 0.

In [0]:
from pyspark.sql.functions import explode
# Top 6 recommendation games - user specific
DF = userRecs.where(userRecs.UserID_Unique==1985)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.Game_ID","recommendations.rating")\
        .join(Stream_DF_Final,["Game_ID"])\
        .select("Game_ID","GameName","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)

+-------+--------------------------------------------+----------+
|Game_ID|GameName                                    |rating    |
+-------+--------------------------------------------+----------+
|3826   |Sid Meier's Civilization V                  |0.9961773 |
|3823   |Sid Meier's Civilization IV Beyond the Sword|0.76147985|
|1105   |Dark Souls Prepare to Die Edition           |0.7296764 |
|853    |Cities Skylines                             |0.72735703|
|4914   |Warhammer 40,000 Dawn of War II             |0.7258136 |
|3819   |Sid Meier's Civilization Beyond Earth       |0.7209965 |
+-------+--------------------------------------------+----------+



In [0]:
# Top 6 recommendation users - Games specific
gameRecommendation.where(gameRecommendation.Game_ID==4258)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.UserID_Unique","recommendations.rating")\
        .join(Stream_DF_Final,["UserID_Unique"])\
        .select("UserID_Unique","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)

+-------------+---------+
|UserID_Unique|rating   |
+-------------+---------+
|4378         |3.5802853|
|5479         |3.534132 |
|5213         |3.4708564|
|6364         |3.0945754|
|3158         |2.8809268|
|11384        |2.8043904|
+-------------+---------+



In [0]:
# Top 5 games recommendations
games_sel = gameRecommendation.select(als.getItemCol()).distinct().limit(5)
gameSubSetRecs = Model.recommendForItemSubset(games_sel, 10)
gameSubSetRecs.show(truncate = False)

+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Game_ID|recommendations                                                                                                                                                                                                        |
+-------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|31     |[{100, 0.0}, {99, 0.0}, {98, 0.0}, {97, 0.0}, {96, 0.0}, {95, 0.0}, {94, 0.0}, {93, 0.0}, {92, 0.0}, {91, 0.0}]                                                                                                        |
|12     |[{2156, 0.0047997776}, {1049, 0.004553267}, {3446, 0.0045199897}, {1361, 0.0044624037},

In [0]:
predictions.sort("prediction", ascending=False).show(truncate=False)

+-------------+---------+-------+-------------------------------+--------+----+------------+----------+
|UserID_Unique|UserID   |Game_ID|GameName                       |Purchase|Play|Hours_Played|prediction|
+-------------+---------+-------+-------------------------------+--------+----+------------+----------+
|5479         |151229648|4789   |Unturned                       |1       |0   |0.0         |2.71891   |
|1896         |67713900 |1762   |Football Manager 2011          |1       |1   |407.0       |2.0560536 |
|7443         |187463594|979    |Counter-Strike                 |1       |0   |0.0         |2.0373032 |
|6708         |174287424|1337   |Dota 2                         |1       |1   |9.7         |2.0208125 |
|3805         |113546110|4258   |Team Fortress 2                |1       |1   |8.1         |1.9330019 |
|8271         |201173206|982    |Counter-Strike Global Offensive|1       |1   |2.3         |1.9002986 |
|2823         |91690667 |2476   |Left 4 Dead 2                  

## Implementation with Purchase history as rating col

In [0]:
(training, test) = Stream_DF_Final.randomSplit([0.7 , 0.3])
als_purchase = ALS(nonnegative = True, implicitPrefs=True,coldStartStrategy="drop",userCol="UserID_Unique", itemCol="Game_ID",ratingCol="Purchase",seed=100, rank=5, regParam=1, maxIter=10, alpha=1)

Model = als_purchase.fit(training)
predictions = Model.transform(test)
Evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Purchase", 
           predictionCol="prediction") 
RMSE = Evaluator.evaluate(predictions)
print(RMSE)

2024/04/07 09:08:49 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID '531dec398d9340f8b168403a5539e47a', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


0.9595079260092532


In [0]:
userRecs = Model.recommendForAllUsers(6)
userRecs.sort("UserID_Unique",ascending=True).show(truncate=False)

gameRecommendation = Model.recommendForAllItems(6)
gameRecommendation.sort("Game_ID", ascending=True).show(truncate = False)

+-------------+--------------------------------------------------------------------------------------------------------------------------+
|UserID_Unique|recommendations                                                                                                           |
+-------------+--------------------------------------------------------------------------------------------------------------------------+
|1            |[{1337, 0.13280135}, {979, 0.108620346}, {3532, 0.09760255}, {980, 0.09741668}, {981, 0.096139036}, {1180, 0.09489461}]   |
|2            |[{1337, 0.06302964}, {979, 0.058574084}, {2075, 0.054551307}, {985, 0.054111194}, {3532, 0.053472728}, {980, 0.053257562}]|
|3            |[{455, 0.07312896}, {453, 0.07289122}, {521, 0.07228543}, {1153, 0.07126716}, {522, 0.07043339}, {448, 0.07025576}]       |
|4            |[{1337, 0.15392794}, {979, 0.114698}, {980, 0.102167495}, {981, 0.10101768}, {3532, 0.10083738}, {1180, 0.098701306}]     |
|5            |[{1337, 0.14

In [0]:
from pyspark.sql.functions import explode
# Top 6 recommendation games - user specific
DF = userRecs.where(userRecs.UserID_Unique==1985)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.Game_ID","recommendations.rating")\
        .join(Stream_DF_Final,["Game_ID"])\
        .select("Game_ID","GameName","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)

+-------+-------------------------------+-----------+
|Game_ID|GameName                       |rating     |
+-------+-------------------------------+-----------+
|521    |BioShock                       |0.015768904|
|523    |BioShock Infinite              |0.015734158|
|455    |Batman Arkham City GOTY        |0.015556798|
|1683   |Fallout New Vegas Honest Hearts|0.015542489|
|1289   |Dishonored                     |0.015362216|
|1682   |Fallout New Vegas Dead Money   |0.015255743|
+-------+-------------------------------+-----------+



In [0]:
# Top 6 recommendation users - Games specific
gameRecommendation.where(gameRecommendation.Game_ID==4258)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.UserID_Unique","recommendations.rating")\
        .join(Stream_DF_Final,["UserID_Unique"])\
        .select("UserID_Unique","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)

+-------------+-----------+
|UserID_Unique|rating     |
+-------------+-----------+
|3726         |0.109250344|
|4033         |0.10872434 |
|4815         |0.10426469 |
|10209        |0.10399198 |
|1579         |0.10354516 |
|594          |0.101460464|
+-------------+-----------+



In [0]:
# Top 5 games recommendations
games_sel = gameRecommendation.select(als.getItemCol()).distinct().limit(5)
gameSubSetRecs = Model.recommendForItemSubset(games_sel, 10)
gameSubSetRecs.show(truncate = False)

+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Game_ID|recommendations                                                                                                                                                                                                    |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|12     |[{1622, 0.0067344173}, {481, 0.0065687634}, {26, 0.006389876}, {1192, 0.0063706655}, {743, 0.0063048187}, {860, 0.0061800187}, {2422, 0.0061668796}, {658, 0.006158406}, {785, 0.0060499003}, {5606, 0.0060081007}]|
|22     |[{1622, 0.017553205}, {481, 0.017121429}, {26, 0.01665516}, {1192, 0.016605088}, {743, 0.016433459}, {8

In [0]:
predictions.sort("prediction", ascending=False).show(truncate=False)

+-------------+---------+-------+--------------+--------+----+------------+-----------+
|UserID_Unique|UserID   |Game_ID|GameName      |Purchase|Play|Hours_Played|prediction |
+-------------+---------+-------+--------------+--------+----+------------+-----------+
|5175         |144578181|1337   |Dota 2        |1       |1   |28.0        |0.16507299 |
|8248         |200759485|1337   |Dota 2        |1       |1   |0.4         |0.16392577 |
|2199         |75689918 |1337   |Dota 2        |1       |1   |9.5         |0.16330025 |
|238          |12529679 |1337   |Dota 2        |1       |1   |0.5         |0.1587666  |
|4490         |129371925|1337   |Dota 2        |1       |1   |355.0       |0.15662536 |
|3905         |115655987|1337   |Dota 2        |1       |1   |601.0       |0.14804275 |
|6821         |176449171|1337   |Dota 2        |1       |1   |1310.0      |0.14206569 |
|1093         |44153929 |1337   |Dota 2        |1       |1   |1.8         |0.14086671 |
|7353         |186069406|1337   

## Implementation with 80% train and 20% test based on Hours_Played with crossvalidator

In [0]:
(training, test) = Stream_DF_Final.randomSplit([0.8 , 0.2])

In [0]:
als = ALS(nonnegative = True, implicitPrefs=True,coldStartStrategy="drop",userCol="UserID_Unique", itemCol="Game_ID",ratingCol="Hours_Played",seed=120, alpha=1)

Evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Hours_Played", 
           predictionCol="prediction") 

** Hyperparameter Setting **

In [0]:
HyperParamter = ParamGridBuilder() \
            .addGrid(als.rank, [5, 10]) \
            .addGrid(als.regParam, [.01, 0.05, 1]) \
            .addGrid(als.maxIter, [5, 10])  \
            .build()

print ("No. of models to be tested: ", len(HyperParamter))

No. of models to be tested:  12


** Defining Evaluator and metrics **

In [0]:
Evaluator = RegressionEvaluator(
           metricName="rmse", 
           labelCol="Hours_Played", 
           predictionCol="prediction") 

** Building Cross Validator with numfolds 3 to split Train and Test Data **

In [0]:
cv = CrossValidator(estimator=als, estimatorParamMaps=HyperParamter, evaluator=Evaluator, numFolds=3)

** Fit the Model to train data**

In [0]:
Model = cv.fit(training)

2024/04/07 09:14:21 INFO mlflow.utils.autologging_utils: Created MLflow autologging run with ID 'd4633006edf44ed3996bf2da0e5e3d7b', which will track hyperparameters, performance metrics, model artifacts, and lineage information for the current pyspark.ml workflow


** Extracting the Best Model **

In [0]:
#Extract best model from the cv model above
Best_model = Model.bestModel

** View the Predictions **

In [0]:
Pred_Test = Best_model.transform(test)
RMSE = Evaluator.evaluate(Pred_Test)
print(RMSE)

155.92720084575782


** Best Model **

In [0]:
print("Parameters for best Model:")

print("Rank Parameter: %g" %Best_model.rank)
print("RegParam Parameter: %g" %Best_model._java_obj.parent().getRegParam())
print("maxIter Parameter: %g" %Best_model._java_obj.parent().getMaxIter())

Parameters for best Model:
Rank Parameter: 5
RegParam Parameter: 0.01
maxIter Parameter: 10


In [0]:
userRecs = Best_model.recommendForAllUsers(6)
userRecs.sort("UserID_Unique",ascending=True).show(truncate=False)

+-------------+-------------------------------------------------------------------------------------------------------------------------+
|UserID_Unique|recommendations                                                                                                          |
+-------------+-------------------------------------------------------------------------------------------------------------------------+
|1            |[{979, 1.2103281}, {738, 1.0707752}, {1680, 1.0440834}, {728, 1.017122}, {737, 1.0026753}, {1553, 0.97379357}]           |
|2            |[{1337, 1.2675005}, {4789, 1.2617447}, {1979, 1.1538719}, {874, 1.135001}, {3066, 1.1240717}, {1895, 1.101735}]          |
|3            |[{1337, 0.54110134}, {982, 0.4820156}, {4365, 0.46802574}, {4258, 0.45557258}, {985, 0.44163045}, {2476, 0.4197257}]     |
|4            |[{70, 0.0}, {69, 0.0}, {66, 0.0}, {60, 0.0}, {59, 0.0}, {58, 0.0}]                                                       |
|5            |[{979, 0.0615364}, 

In [0]:
gameRecommendation = Best_model.recommendForAllItems(6)
gameRecommendation.sort("Game_ID", ascending=True).show(truncate = False)

+-------+--------------------------------------------------------------------------------------------------------------------------------+
|Game_ID|recommendations                                                                                                                 |
+-------+--------------------------------------------------------------------------------------------------------------------------------+
|1      |[{729, 0.0077798194}, {1108, 0.006240012}, {864, 0.0058206413}, {1307, 0.005650445}, {1150, 0.005537675}, {1423, 0.0054618404}] |
|2      |[{729, 0.008158024}, {278, 0.005920442}, {864, 0.0053600217}, {4390, 0.0053321174}, {1188, 0.005241002}, {533, 0.005205419}]    |
|3      |[{1361, 0.08236805}, {1192, 0.07548216}, {3683, 0.07454783}, {3446, 0.07408528}, {3170, 0.07407507}, {1626, 0.07293965}]        |
|4      |[{729, 0.021614678}, {1750, 0.014334912}, {3265, 0.014196817}, {1135, 0.01415117}, {1108, 0.013928233}, {2643, 0.013379663}]    |
|5      |[{1361, 0.02082718

In [0]:
from pyspark.sql.functions import explode
# Top 6 recommendation games - user specific
DF = userRecs.where(userRecs.UserID_Unique==1985)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.Game_ID","recommendations.rating")\
        .join(Stream_DF_Final,["Game_ID"])\
        .select("Game_ID","GameName","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)

+-------+-----------------------------------+---------+
|Game_ID|GameName                           |rating   |
+-------+-----------------------------------+---------+
|1979   |Grand Theft Auto V                 |1.1562202|
|1500   |Empire Total War                   |1.1442862|
|4668   |Total War ROME II - Emperor Edition|1.1092033|
|85     |ARK Survival Evolved               |1.0695336|
|4669   |Total War SHOGUN 2                 |1.0640445|
|985    |Counter-Strike Source              |1.0470443|
+-------+-----------------------------------+---------+



In [0]:
# Top 6 recommendation users - Games specific
gameRecommendation.where(gameRecommendation.Game_ID==4258)\
        .withColumn("recommendations",explode("recommendations"))\
        .select("recommendations.UserID_Unique","recommendations.rating")\
        .join(Stream_DF_Final,["UserID_Unique"])\
        .select("UserID_Unique","rating")\
        .distinct()\
        .sort("rating", ascending=False)\
        .show(truncate=False)


+-------------+---------+
|UserID_Unique|rating   |
+-------------+---------+
|4069         |2.8553896|
|4062         |2.103475 |
|8661         |2.079812 |
|818          |1.9049056|
|3376         |1.7585299|
|3265         |1.56088  |
+-------------+---------+



In [0]:
predictions.sort("prediction", ascending=False).show(truncate=False)

+-------------+---------+-------+--------------+--------+----+------------+-----------+
|UserID_Unique|UserID   |Game_ID|GameName      |Purchase|Play|Hours_Played|prediction |
+-------------+---------+-------+--------------+--------+----+------------+-----------+
|5175         |144578181|1337   |Dota 2        |1       |1   |28.0        |0.16507299 |
|8248         |200759485|1337   |Dota 2        |1       |1   |0.4         |0.16392577 |
|2199         |75689918 |1337   |Dota 2        |1       |1   |9.5         |0.16330025 |
|238          |12529679 |1337   |Dota 2        |1       |1   |0.5         |0.1587666  |
|4490         |129371925|1337   |Dota 2        |1       |1   |355.0       |0.15662536 |
|3905         |115655987|1337   |Dota 2        |1       |1   |601.0       |0.14804275 |
|6821         |176449171|1337   |Dota 2        |1       |1   |1310.0      |0.14206569 |
|1093         |44153929 |1337   |Dota 2        |1       |1   |1.8         |0.14086671 |
|7353         |186069406|1337   