In [None]:
!pip install pyspark



In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row, SparkSession
from pyspark.sql.types import LongType, DoubleType
import pandas as pd

from pyspark.sql import functions as fn
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from pyspark.sql.functions import col, asc,desc

Loading the necessary texts:
* Recommender data
* Mapping between item_id and item_name
* Mapping between item_name and item_id

In [None]:
recommender_data = pd.read_csv("/content/drive/My Drive/datasets/recommenders/games_played.csv")
id_by_name = pd.read_csv("/content/drive/My Drive/datasets/recommenders/name2_item_id_map.csv", names=["item_name","item_id"])
name_by_id = pd.read_csv("/content/drive/My Drive/datasets/recommenders/item_id2item_map.csv", names=["item_id","item_name"])
recommender_data.head()

Unnamed: 0.1,Unnamed: 0,Name,item,recent_play_time,total_play_time
0,0,76561197970982479,10,0,6
1,1,76561197970982479,20,0,0
2,2,76561197970982479,30,0,7
3,3,76561197970982479,40,0,0
4,4,76561197970982479,50,0,0


Data processing of the recommender data:
* recode the user key
* remove duplicates
* remove unnecessary columns

In [None]:
recommender_data.drop(columns=['Unnamed: 0'], inplace=True)
recommender_data.head()

Unnamed: 0,Name,item,recent_play_time,total_play_time
0,76561197970982479,10,0,6
1,76561197970982479,20,0,0
2,76561197970982479,30,0,7
3,76561197970982479,40,0,0
4,76561197970982479,50,0,0


In [None]:
recommender_data.drop_duplicates(inplace=True)

In [None]:
recommender_data.duplicated().sum()

0

In [None]:
df_unique_users = pd.DataFrame(recommender_data["Name"].unique(),columns=["user_id"])

In [None]:
df_unique_users["uid"] = df_unique_users.index
df_unique_users.head()

Unnamed: 0,user_id,uid
0,76561197970982479,0
1,76561198035864385,1
2,76561198007712555,2
3,76561197963445855,3
4,76561198002099482,4


Storying the mapping between recoded key and the real key in a csv:

In [None]:
df_unique_users.to_csv("/content/drive/My Drive/datasets/recommenders/user_id2_uid_map.csv")

Experiments with smallar dataset:

In [None]:
small_key_recommender_data=recommender_data.join(df_unique_users.set_index("user_id"), on="Name")
small_key_recommender_data.drop(columns=['Name'], inplace=True)
small_key_recommender_data.head()

Unnamed: 0,item,recent_play_time,total_play_time,uid
0,10,0,6,0
1,20,0,0,0
2,30,0,7,0
3,40,0,0,0
4,50,0,0,0


In [None]:
small_key_recommender_data.count()

item                5094105
recent_play_time    5094105
total_play_time     5094105
uid                 5094105
dtype: int64

In [None]:
recommender_data_small=recommender_data[recommender_data['total_play_time']>0]

In [None]:
recommender_data_small.count()

Name                3246375
item                3246375
recent_play_time    3246375
total_play_time     3246375
dtype: int64

Initializing the Spark processing app:

In [None]:
appName = "Game blog cold start solution"
spark = SparkSession.builder.appName(appName).getOrCreate()
sc = spark.sparkContext

In [None]:
rdd_data = spark.createDataFrame(small_key_recommender_data).rdd

In [None]:
gamesRDD = rdd_data.map(lambda p: Row(user_id=int(p[3]), item_id=int(p[0]), recent_play_time=int(p[2]), total_play_time=int(p[3])))
games_dataframe = spark.createDataFrame(gamesRDD)


Check all time most played played games:

In [None]:
top_games_all_time = games_dataframe.groupBy("item_id").sum("total_play_time")
#top_games_all_time.sort("sum(total_play_time)").orderBy(desc("sum(total_play_time)")).show()
pandas_results = top_games_all_time.toPandas()
named_all_time=pandas_results.join(name_by_id.set_index("item_id"), on="item_id")
#named_all_time.drop(columns=['item_id'], inplace=True)
named_all_time.sort_values(by=['sum(total_play_time)'], ascending=False, inplace=True)
named_all_time.head(10)


Unnamed: 0,item_id,sum(total_play_time),item_name
7990,205790,1665083234,Dota 2 Test
4701,4000,1324917931,Garry's Mod
101,730,1296820188,Counter-Strike: Global Offensive
5379,304930,1219528785,Unturned
2463,223530,1098909289,Left 4 Dead 2 Beta
4131,550,1098909289,Left 4 Dead 2
5222,105600,831366050,Terraria
4,230410,752350569,Warframe
10126,240,697872925,Counter-Strike: Source
8632,620,684899423,Portal 2


In [None]:
top_games_recent = games_dataframe.groupBy("item_id").sum("recent_play_time")
top_games_recent.sort("sum(recent_play_time)").orderBy(desc("sum(recent_play_time)")).show()
pandas_results = top_games_recent.toPandas()
named_recent=pandas_results.join(name_by_id.set_index("item_id"), on="item_id")
#named_recent.drop(columns=['item_id'], inplace=True)
named_recent.sort_values(by=['sum(recent_play_time)'], ascending=False, inplace=True)
named_recent.head(10)

+-------+---------------------+
|item_id|sum(recent_play_time)|
+-------+---------------------+
|    730|            775918724|
|   4000|            441871026|
| 105600|            152997644|
|  72850|            134851450|
| 230410|            122726853|
|    240|            111314706|
|    550|            101191416|
| 218620|             98405781|
|   8930|             81187409|
| 252490|             80110135|
|  49520|             79422022|
| 107410|             66163784|
| 271590|             58886281|
| 304930|             50333977|
| 377160|             44782893|
| 236390|             42971942|
| 218230|             40713146|
|  33930|             39911210|
|  48700|             38905649|
| 202990|             38209064|
+-------+---------------------+
only showing top 20 rows



Unnamed: 0,item_id,sum(recent_play_time),item_name
101,730,775918724,Counter-Strike: Global Offensive
4701,4000,441871026,Garry's Mod
5222,105600,152997644,Terraria
2846,72850,134851450,The Elder Scrolls V: Skyrim
4,230410,122726853,Warframe
10126,240,111314706,Counter-Strike: Source
4131,550,101191416,Left 4 Dead 2
4762,218620,98405781,PAYDAY 2
6507,8930,81187409,Sid Meier's Civilization V
4074,252490,80110135,Rust


In [None]:
named_recent.head(10)

Unnamed: 0,item_id,sum(recent_play_time),item_name
101,730,775918724,Counter-Strike: Global Offensive
4701,4000,441871026,Garry's Mod
5222,105600,152997644,Terraria
2846,72850,134851450,The Elder Scrolls V: Skyrim
4,230410,122726853,Warframe
10126,240,111314706,Counter-Strike: Source
4131,550,101191416,Left 4 Dead 2
4762,218620,98405781,PAYDAY 2
6507,8930,81187409,Sid Meier's Civilization V
4074,252490,80110135,Rust


Here is how one can recode implicit variable into explicit binary indicator. This might be used for future modelling:

In [None]:


def get_binary_data(data_frame_used, column_used="recent_play_time"):
    data_frame_mod = data_frame_used.select("user_id", "item_id", column_used)
    ratings = data_frame_mod.withColumn('binary', fn.lit(1))
    userIds = data_frame_mod.select("user_id").distinct()
    itemIds = data_frame_mod.select("item_id").distinct()

    user_game = userIds.crossJoin(itemIds).join(ratings, ['user_id', 'item_id'], "left")
    user_game = user_game.select(['user_id', 'item_id', 'binary']).fillna(0)
    return user_game

#user_games = get_binary_data(games_dataframe)

In [None]:
#user_games.sort("binary").orderBy(desc("binary")).show(5)

Calculating data sparsity - useful for making a decision what type of similarity measure and model to use:

In [None]:
numerator = games_dataframe.select("recent_play_time").count()
# Count the number of distinct Id’s
num_users = games_dataframe.select("user_id").distinct().count()
num_items = games_dataframe.select("item_id").distinct().count()

# Set the denominator equal to the number of users multiplied by the number of items
denominator = num_users * num_items
# Divide the numerator by the denominator
sparsity = (1.0 - (numerator * 1.0)/ denominator) * 100
print("The games dataframe is ", "%.2f" % sparsity + "% empty.")

The games dataframe is  99.35% empty.


Creating a model using the Alternating Least Squares with the implicit feedback of recent play time:

In [None]:
# dataset split into training and testing set
(training, test) = games_dataframe.randomSplit([0.8, 0.2], seed=2021)
# training the model
als = ALS(maxIter=5, implicitPrefs=True,userCol="user_id", itemCol="item_id",nonnegative = True, ratingCol="recent_play_time",coldStartStrategy="drop")
model = als.fit(training)
# predict using the testing datatset
predictions = model.transform(test)
predictions.show()

In [None]:
gamesRecs = model.recommendForAllItems(10)
gamesRecs.show()



+-------+--------------------+
|item_id|     recommendations|
+-------+--------------------+
|     20|[{41418, 3.208560...|
|     40|[{47498, 1.677432...|
|    280|[{41418, 1.916484...|
|    300|[{41030, 0.0}, {4...|
|    340|[{51247, 1.623201...|
|    360|[{51247, 1.821780...|
|    500|[{41030, 0.0}, {4...|
|    620|[{41030, 0.0}, {4...|
|   1002|[{19678, 0.145275...|
|   1210|[{41030, 0.0}, {4...|
|   1250|[{41030, 0.0}, {4...|
|   1280|[{4221, 1.3800358...|
|   1300|[{39815, 1.127766...|
|   1500|[{37522, 1.530484...|
|   1510|[{38131, 1.980491...|
|   1520|[{37364, 1.403493...|
|   1530|[{38131, 1.609884...|
|   1600|[{37072, 0.811117...|
|   1670|[{45510, 0.296791...|
|   1700|[{37522, 1.116034...|
+-------+--------------------+
only showing top 20 rows



Helper functions for getting the id of a particular game by its name and also for getting top similary results.

In [None]:


def get_id_by_name(name):
  return id_by_name[id_by_name["item_name"]==name]


def get_topn_similarity_byid(id_searched, n=5):
  item_vectors=model.itemFactors.select("id", "features")
  specific_item = item_vectors.where(col("id")==id_searched).select('features').collect()[0][0]
  result = item_vectors.rdd.map(lambda x: (x['id'],
                                 float(
                                     cosine_similarity(
                                         [x['features']],
                                         [specific_item]
                                     )[0,0]
                                 ))).toDF(schema=['item_id', 'cosine_similarity']).sort('cosine_similarity')
  result.orderBy(desc("cosine_similarity")).show(50)
  return result



Note that the top row is in the item itself!

In [None]:


#testing the function with one favorite game
game_names=["Air Control", "Sid Meier's Civilization IV", "Joint Task Force", "Need for Speed: Undercover", "Call of Duty: Modern Warfare 2 - Multiplayer"]
item_df = get_id_by_name(game_names[3])

top_items = get_topn_similarity_byid(item_df["item_id"].item())




+-------+------------------+
|item_id| cosine_similarity|
+-------+------------------+
|  17430|               1.0|
| 348710| 0.962107635412054|
| 265020|0.9546876685147764|
| 234310|0.9545148858766462|
|  37270|0.9534481270243044|
| 365430|0.9503236496559985|
| 404820|0.9492013509696193|
|  51020|0.9490395831899194|
| 404230|0.9426783274742662|
| 297390|0.9399477430356383|
| 435480|0.9395470365237925|
| 207210|0.9385371420870992|
|  94200|0.9319658788684612|
|  91100|0.9304007206559509|
| 236890|0.9302083121675719|
| 422270| 0.929139189146392|
|  29180|0.9268271621495552|
| 400110|0.9262097628041264|
|  20820|0.9260848677937064|
| 284910|0.9235669253545319|
|  23200|0.9232317896993855|
|  44100|0.9229919541555978|
|  40210|0.9229510140839214|
|  50000|0.9228942166236237|
|  99700|0.9227705066310021|
| 500790|0.9224835277459271|
| 253690|0.9224815944461271|
| 410570| 0.922338196820518|
| 222140|0.9216983793939294|
|  46600|0.9213647630876701|
| 353540|0.9210502805280252|
| 334120|0.920

Top items with their names:

In [None]:
pandas_results = top_items.toPandas()

named_top_results=pandas_results.join(name_by_id.set_index("item_id"), on="item_id")
named_top_results.drop(columns=['item_id'], inplace=True)
named_top_results.sort_values(by=['cosine_similarity'], ascending=False, inplace=True)
named_top_results.head(11)
#print(df[df["item_id"]==302010])

Unnamed: 0,cosine_similarity,item_name
10805,1.0,Need for Speed: Undercover
10804,0.962108,Saint Seiya: Soldiers' Soul
10803,0.954688,Ashes Cricket 2013
10802,0.954515,March of War
10801,0.953448,Wandering Willows
10800,0.950324,Everything is Black and White
10799,0.949201,Ragdoll Runners
10798,0.94904,Unwell Mel
10797,0.942678,MAGIX Music Maker 2016
10796,0.939948,Collateral


Testing with a new model using total play time as an implicit feedback:

In [None]:
# dataset split into training and testing set
(training, test) = games_dataframe.randomSplit([0.8, 0.2], seed=2021)
# training the model
als = ALS(maxIter=5, implicitPrefs=True,userCol="user_id", itemCol="item_id",nonnegative = True, ratingCol="total_play_time",coldStartStrategy="drop")
model = als.fit(training)
# predict using the testing datatset
predictions_all = model.transform(test)
predictions_all.sort('prediction').orderBy(desc("prediction")).show()

+-------+-------+----------------+---------------+----------+
|user_id|item_id|recent_play_time|total_play_time|prediction|
+-------+-------+----------------+---------------+----------+
|  12191|  29640|               6|          12191| 1.9097133|
|  17401| 356180|               0|          17401| 1.7229642|
|  13223| 384190|             148|          13223| 1.7118622|
|  28469| 428870|               0|          28469| 1.6572301|
|  25195| 279840|               0|          25195| 1.6251016|
|  26512| 488310|             301|          26512| 1.5934759|
|  23764| 521340|               3|          23764| 1.5493846|
|  27782| 292180|              61|          27782| 1.5488398|
|  10458| 493810|               1|          10458|   1.54358|
|   3055| 211780|              50|           3055| 1.5323814|
|  20585| 336010|             226|          20585| 1.5278997|
|   5868| 316110|              86|           5868| 1.5219426|
|  23930| 378180|               0|          23930| 1.5212606|
|  60635

In [None]:

#testing the function with one favorite game
game_names=["Air Control", "Sid Meier's Civilization IV", "Joint Task Force", "Call of Duty: Modern Warfare 2 - Multiplayer"]
item_df = get_id_by_name(game_names[2])

top_items_all_time = get_topn_similarity_byid(item_df["item_id"].item())

pandas_results_at = top_items_all_time.toPandas()

named_top_results_at=pandas_results_at.join(name_by_id.set_index("item_id"), on="item_id")
named_top_results_at.drop(columns=['item_id'], inplace=True)
named_top_results_at.sort_values(by=['cosine_similarity'], ascending=False, inplace=True)
named_top_results_at.head()

In [None]:
named_all_time.head(10)


Unnamed: 0,sum(total_play_time),item_name
7990,1665083234,Dota 2 Test
4701,1324917931,Garry's Mod
101,1296820188,Counter-Strike: Global Offensive
5379,1219528785,Unturned
2463,1098909289,Left 4 Dead 2 Beta
4131,1098909289,Left 4 Dead 2
5222,831366050,Terraria
4,752350569,Warframe
10126,697872925,Counter-Strike: Source
8632,684899423,Portal 2


In [None]:
named_all_time.head(10)

Unnamed: 0,sum(total_play_time),item_name
7990,1665083234,Dota 2 Test
4701,1324917931,Garry's Mod
101,1296820188,Counter-Strike: Global Offensive
5379,1219528785,Unturned
2463,1098909289,Left 4 Dead 2 Beta
4131,1098909289,Left 4 Dead 2
5222,831366050,Terraria
4,752350569,Warframe
10126,697872925,Counter-Strike: Source
8632,684899423,Portal 2
