# Recommender Systems for Steam Video Games

data source: https://www.kaggle.com/datasets/tamber/steam-video-games/data

In [3]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, countDistinct, sum, round, max, min, explode, array_contains
from pyspark.ml.feature import StringIndexer
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import tensorflow as tf

In [2]:
spark = SparkSession.builder.appName('Steam Recommender System').getOrCreate()
spark

24/10/18 21:05:32 WARN Utils: Your hostname, Asenas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.1.106 instead (on interface en0)
24/10/18 21:05:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/10/18 21:05:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read.csv('steam-200k.csv', inferSchema = True)
df

                                                                                

DataFrame[_c0: int, _c1: string, _c2: string, _c3: double, _c4: int]

In [4]:
df.show(5)

+---------+--------------------+--------+-----+---+
|      _c0|                 _c1|     _c2|  _c3|_c4|
+---------+--------------------+--------+-----+---+
|151603712|The Elder Scrolls...|purchase|  1.0|  0|
|151603712|The Elder Scrolls...|    play|273.0|  0|
|151603712|           Fallout 4|purchase|  1.0|  0|
|151603712|           Fallout 4|    play| 87.0|  0|
|151603712|               Spore|purchase|  1.0|  0|
+---------+--------------------+--------+-----+---+
only showing top 5 rows



In [5]:
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)
 |-- _c3: double (nullable = true)
 |-- _c4: integer (nullable = true)



In [6]:
df.columns

['_c0', '_c1', '_c2', '_c3', '_c4']

In [7]:
df = df.withColumnRenamed('_c0', 'user_id') \
       .withColumnRenamed('_c1', 'name') \
       .withColumnRenamed('_c2', 'action') \
       .withColumnRenamed('_c3', 'hours') \
       .withColumnRenamed('_c4', 'zero')

In [8]:
df.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- action: string (nullable = true)
 |-- hours: double (nullable = true)
 |-- zero: integer (nullable = true)



shape

In [9]:
df.count(), len(df.columns) ## shape

(200000, 5)

null or na values

In [10]:
df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns]).show()



+-------+----+------+-----+----+
|user_id|name|action|hours|zero|
+-------+----+------+-----+----+
|      0|   0|     0|    0|   0|
+-------+----+------+-----+----+



                                                                                

In [11]:
columns = ['user_id', 'name', 'action', 'hours', 'zero']

df.select([count(when(isnan(col(c)), c)).alias(c) for c in columns]).show()



+-------+----+------+-----+----+
|user_id|name|action|hours|zero|
+-------+----+------+-----+----+
|      0|   0|     0|    0|   0|
+-------+----+------+-----+----+



                                                                                

duplicate

In [12]:
df.groupBy(columns).count().filter('count > 1').show()



+---------+--------------------+--------+-----+----+-----+
|  user_id|                name|  action|hours|zero|count|
+---------+--------------------+--------+-----+----+-----+
| 86338111|Grand Theft Auto ...|purchase|  1.0|   0|    2|
|189858084|Grand Theft Auto ...|purchase|  1.0|   0|    2|
|150882304|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|189858084|Grand Theft Auto ...|purchase|  1.0|   0|    2|
|116617462|Grand Theft Auto ...|purchase|  1.0|   0|    2|
| 37422528|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|147859903|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|138941587|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|145825155|Grand Theft Auto ...|purchase|  1.0|   0|    2|
|105782521|Sid Meier's Civil...|purchase|  1.0|   0|    2|
| 46301758|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|179936723|Grand Theft Auto ...|purchase|  1.0|   0|    2|
| 64455019|Sid Meier's Civil...|purchase|  1.0|   0|    2|
|142650116|Grand Theft Auto ...|purchase|  1.0|   0|    

                                                                                

In [13]:
df_new = df.dropDuplicates()

In [14]:
df_new.count()

                                                                                

199293

unique

In [15]:
df_new.select([countDistinct(c).alias(c) for c in columns]).show()



+-------+----+------+-----+----+
|user_id|name|action|hours|zero|
+-------+----+------+-----+----+
|  12393|5155|     2| 1593|   1|
+-------+----+------+-----+----+



                                                                                

remove column zero

In [16]:
df_new = df_new.drop('zero')

data analysis

In [17]:
df.groupBy('name')\
    .agg(count('user_id').alias('users_count'))\
    .orderBy('users_count', ascending = False)\
    .limit(20)\
    .show()

+--------------------+-----------+
|                name|users_count|
+--------------------+-----------+
|              Dota 2|       9682|
|     Team Fortress 2|       4646|
|Counter-Strike Gl...|       2789|
|            Unturned|       2632|
|       Left 4 Dead 2|       1752|
|Counter-Strike So...|       1693|
|      Counter-Strike|       1424|
|         Garry's Mod|       1397|
|The Elder Scrolls...|       1394|
|            Warframe|       1271|
|Half-Life 2 Lost ...|       1158|
|Sid Meier's Civil...|       1150|
|           Robocraft|       1096|
|Half-Life 2 Death...|       1021|
|              Portal|       1005|
|            Portal 2|        997|
|         Half-Life 2|        995|
|   Heroes & Generals|        993|
|            Terraria|        956|
|Counter-Strike Co...|        904|
+--------------------+-----------+



In [18]:
df.filter(col('action') == 'purchase') \
  .groupBy('name') \
  .agg(count('user_id').alias('count_user_purchase')) \
  .orderBy('count_user_purchase', ascending=False) \
  .limit(20) \
  .show()

+--------------------+-------------------+
|                name|count_user_purchase|
+--------------------+-------------------+
|              Dota 2|               4841|
|     Team Fortress 2|               2323|
|            Unturned|               1563|
|Counter-Strike Gl...|               1412|
|Half-Life 2 Lost ...|                981|
|Counter-Strike So...|                978|
|       Left 4 Dead 2|                951|
|      Counter-Strike|                856|
|            Warframe|                847|
|Half-Life 2 Death...|                823|
|         Garry's Mod|                731|
|The Elder Scrolls...|                717|
|           Robocraft|                689|
|Counter-Strike Co...|                679|
|Counter-Strike Co...|                679|
|   Heroes & Generals|                658|
|         Half-Life 2|                639|
|Sid Meier's Civil...|                596|
|         War Thunder|                590|
|              Portal|                588|
+----------

In [19]:
df.filter(col('action') == 'play') \
  .groupBy('name') \
  .agg(count('user_id').alias('count_user_play')) \
  .orderBy('count_user_play', ascending=False) \
  .limit(20) \
  .show()

+--------------------+---------------+
|                name|count_user_play|
+--------------------+---------------+
|              Dota 2|           4841|
|     Team Fortress 2|           2323|
|Counter-Strike Gl...|           1377|
|            Unturned|           1069|
|       Left 4 Dead 2|            801|
|Counter-Strike So...|            715|
|The Elder Scrolls...|            677|
|         Garry's Mod|            666|
|      Counter-Strike|            568|
|Sid Meier's Civil...|            554|
|            Terraria|            460|
|            Portal 2|            453|
|            Warframe|            424|
|              Portal|            417|
|           Robocraft|            407|
|            PAYDAY 2|            390|
|       Borderlands 2|            386|
|         Half-Life 2|            356|
|   Heroes & Generals|            335|
|         War Thunder|            303|
+--------------------+---------------+



In [20]:
df.filter(col('action') == 'play') \
  .groupBy('name') \
  .agg(round(sum('hours'), 2).alias('total_play')) \
  .orderBy('total_play', ascending = False) \
  .limit(50) \
  .show()

+--------------------+----------+
|                name|total_play|
+--------------------+----------+
|              Dota 2|  981684.6|
|Counter-Strike Gl...|  322771.6|
|     Team Fortress 2|  173673.3|
|      Counter-Strike|  134261.1|
|Sid Meier's Civil...|   99821.3|
|Counter-Strike So...|   96075.5|
|The Elder Scrolls...|   70889.3|
|         Garry's Mod|   49725.3|
|Call of Duty Mode...|   42009.9|
|       Left 4 Dead 2|   33596.7|
|Football Manager ...|   32308.6|
|Football Manager ...|   30845.8|
|Football Manager ...|   30574.8|
|            Terraria|   29951.8|
|            Warframe|   27074.6|
|Football Manager ...|   24283.1|
|              Arma 3|   24055.7|
|  Grand Theft Auto V|   22956.7|
|       Borderlands 2|   22667.9|
|    Empire Total War|   21030.3|
+--------------------+----------+
only showing top 20 rows



data prep for recommedation system

In [21]:
df_purchase = df_new.filter(col('action') == 'purchase')
df_purchase.count()

128804

In [22]:
df_purchase.select(countDistinct('user_id')).show()
df_purchase = df_purchase.withColumnRenamed('action', 'action_pur') \
                         .withColumnRenamed('hours', 'purchase')

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                  12393|
+-----------------------+



In [23]:
df_play = df_new.filter(col('action') == 'play')
df_play.count()

70489

In [24]:
df_play.select(countDistinct('user_id')).show()

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                  11350|
+-----------------------+



In [25]:
full_df = df_purchase.join(df_play, ["user_id", "name"], "outer")

In [26]:
full_df.count()

                                                                                

128816

In [27]:
new_columns = full_df.columns

In [28]:
full_df_unique = full_df.select([countDistinct(c).alias(c) for c in new_columns]).show()

[Stage 90:>                                                         (0 + 6) / 6]

+-------+----+----------+--------+------+-----+
|user_id|name|action_pur|purchase|action|hours|
+-------+----+----------+--------+------+-----+
|  12393|5155|         1|       1|     1| 1593|
+-------+----+----------+--------+------+-----+



                                                                                

In [29]:
full_df.select([count(when(col(c).isNull(), c)).alias(c) for c in new_columns]).show()

+-------+----+----------+--------+------+-----+
|user_id|name|action_pur|purchase|action|hours|
+-------+----+----------+--------+------+-----+
|      0|   0|         0|       0| 58327|58327|
+-------+----+----------+--------+------+-----+



In [30]:
data = full_df.fillna(0, subset=['hours'])

In [31]:
data = data.drop("action_pur", "purchase", "action")

In [32]:
data.describe("hours").show()

+-------+------------------+
|summary|             hours|
+-------+------------------+
|  count|            128816|
|   mean|26.746411936405334|
| stddev|171.38236775962963|
|    min|               0.0|
|    max|           11754.0|
+-------+------------------+



In [33]:
min_hours = data.select(min("hours")).collect()[0][0]
max_hours = data.select(max("hours")).collect()[0][0]

data = data.withColumn("normalized_hours", (col("hours") - min_hours) / (max_hours - min_hours))

                                                                                

## Recommender Systems

### 1) ALS - Pyspark

In [34]:
indexer = StringIndexer(inputCol="name", outputCol="game_id")
data_indexed = indexer.fit(data).transform(data)

                                                                                

*Train - Test Split*

In [35]:
(train_set, test_set) = data_indexed.randomSplit([0.8, 0.2])

*Model Selection - Hyperparameter Tunning*

In [36]:
# the als model 
als = ALS(userCol="user_id", itemCol="game_id", ratingCol="normalized_hours", implicitPrefs=True, coldStartStrategy='drop') #Note we set cold start strategy to 'drop' 
#to ensure we don't get NaN evaluation metrics

# the parameter grid
paramGrid = ParamGridBuilder() \
  .addGrid(als.maxIter, [5, 10, 15]) \
  .addGrid(als.regParam, [0.01, 0.05, 0.1]) \
  .build()

# cross-validator
crossval = CrossValidator(estimator=als, estimatorParamMaps=paramGrid, evaluator=RegressionEvaluator(metricName="rmse", labelCol="normalized_hours", predictionCol="prediction"), numFolds=5, seed=42)

# Fit the cross-validator to the training data
cvModel = crossval.fit(train_set)

24/10/18 21:06:27 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/18 21:06:28 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                ]]

In [37]:
# best model
bestModel = cvModel.bestModel

# Parameter map
paramMap = bestModel.extractParamMap()

# best params
best_maxIter = bestModel._java_obj.parent().getMaxIter()
best_regParam = bestModel._java_obj.parent().getRegParam()

print("Best maxIter:", best_maxIter)
print("Best regParam:", best_regParam)


Best maxIter: 5
Best regParam: 0.1


*Model Training*

In [38]:
als_best = ALS(maxIter=best_maxIter, regParam=best_regParam, userCol="user_id", itemCol="game_id",
                ratingCol="normalized_hours", implicitPrefs=True, coldStartStrategy="drop")
als_best = als_best.fit(train_set)

                                                                                

*Making predictions*

In [39]:
predictions_als = als_best.transform(test_set)

*Model Evaluation*

In [40]:
evaluator_als = RegressionEvaluator(metricName="rmse", labelCol="normalized_hours", predictionCol="prediction")
rmse_als = evaluator_als.evaluate(predictions_als)
print('RMSE of ALS:', rmse_als)

RMSE of ALS: 0.09570352962587655


*Model Training by using all data to get recommendations*

In [41]:
als_best = ALS(maxIter=best_maxIter, regParam=best_regParam, userCol="user_id", itemCol="game_id",
                ratingCol="normalized_hours", implicitPrefs=True, coldStartStrategy="drop")
als_full_model = als_best.fit(data_indexed)

*Recommender System*

In [42]:
user_recommendations = als_full_model.recommendForAllUsers(5)
user_recommendations.show()



+--------+--------------------+
| user_id|     recommendations|
+--------+--------------------+
|   76767|[{44, 0.81006765}...|
|  144736|[{7, 0.62927514},...|
|  229911|[{7, 0.82156044},...|
|  835015|[{49, 0.0}, {48, ...|
|  948368|[{5, 0.6563678}, ...|
|  975449|[{20, 0.6852299},...|
| 1268792|[{7, 0.62928313},...|
| 2531540|[{5, 0.5487922}, ...|
| 2753525|[{3, 0.64218736},...|
| 3450426|[{7, 0.7289074}, ...|
| 7923954|[{7, 0.62927914},...|
| 7987640|[{49, 0.0}, {48, ...|
| 8259307|[{11, 0.33154145}...|
| 8567888|[{5, 0.59169555},...|
| 8585433|[{20, 0.57494766}...|
| 8784496|[{1, 0.4558313}, ...|
| 8795607|[{5, 0.68889225},...|
|10144413|[{49, 0.0}, {48, ...|
|10595342|[{5, 0.8738419}, ...|
|10599862|[{11, 0.7632819},...|
+--------+--------------------+
only showing top 20 rows



                                                                                

In [46]:
# Convert DataFrame to a list of dictionaries
user_recommendations_list = list(user_recommendations.toLocalIterator())

In [48]:
# replace game IDs with their corresponding names
recommendation_df = spark.createDataFrame(user_recommendations_list, ["user_id", "recommendations"])
 

In [51]:
recommendation_df.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- recommendations: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- game_id: long (nullable = true)
 |    |    |-- rating: double (nullable = true)



add game names to recommendations

### 2) Tensorflow Recommender (TFRS)