In [1]:
import csv
import os
import sys
# Spark imports
from pyspark.rdd import RDD
from pyspark.sql import DataFrame
from pyspark.sql.types import IntegerType
from pyspark.sql import SparkSession
import operator
# Dask imports
import dask.bag as db
import dask.dataframe as df  # you can use Dask bags or dataframes
from csv import reader
import numpy as np
import pandas as pd
import datetime
from pyspark.sql.functions import *
import sklearn
from sklearn import preprocessing
from sklearn.datasets import make_blobs
from sklearn.preprocessing import StandardScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import re
# from scipy.sparse import csr_matrix
# import scipy as sp
import heapq
# from surprise import CoClustering
# from surprise import Dataset, Reader, SVD, accuracy
# from surprise import KNNBaseline


# Packages for model evaluation
from sklearn.metrics import mean_squared_error
from sklearn.metrics import mean_absolute_error
from time import time

# from surprise.model_selection import train_test_split
# from sklearn.model_selection import train_test_split

# Package to suppress warnings
import warnings
warnings.filterwarnings("ignore")
from pyspark.sql.functions import monotonically_increasing_id
# Packages for saving models
import pickle

from re import split
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, DoubleType, IntegerType, StringType, DateType
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.regression import RandomForestRegressor

In [2]:
def init_spark():
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL basic example") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
    return spark

In [3]:
spark=init_spark()

22/03/30 19:58:01 WARN Utils: Your hostname, Sams-MacBook-Pro-2.local resolves to a loopback address: 127.0.0.1; using 172.30.120.246 instead (on interface en0)
22/03/30 19:58:01 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/03/30 19:58:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/30 19:58:02 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
filename="../data/datasets/games.csv"
filename2="../data/datasets/steam_id_games.csv"

In [5]:
from pyspark.sql.types import *
games_schema = StructType() \
    .add("appid", IntegerType(), True) \
    .add("name", StringType(), True) \
    .add("price", DoubleType(), True) \
    .add("release_date", StringType(), True) \
    .add("required_age", IntegerType(), True) \
    .add("publishers", StringType(), True) \
    .add("developers", StringType(), True) \
    .add("categories", StringType(), True) \
    .add("genres", StringType(), True) \
    .add("ratings", DoubleType(), True) \
    .add("totalRatings", IntegerType(), True) \
    .add("average_playtime", DoubleType(), True) \
    .add("median_playtime", IntegerType(), True) \
    .add("num_owners", StringType(), True)
games = spark.read.schema(games_schema).csv(filename, header=True).dropna()


In [6]:
games.take(5)

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

[Row(appid=10, name='Counter-Strike', price=7.19, release_date='2000-11-01', required_age=0, publishers='Valve', developers='Valve', categories='Multi-player;Online Multi-Player;Local Multi-Player;Valve Anti-Cheat enabled', genres='Action', ratings=97.39, totalRatings=127873, average_playtime=17612.0, median_playtime=317, num_owners='10000000-20000000'),
 Row(appid=20, name='Team Fortress Classic', price=3.99, release_date='1999-04-01', required_age=0, publishers='Valve', developers='Valve', categories='Multi-player;Online Multi-Player;Local Multi-Player;Valve Anti-Cheat enabled', genres='Action', ratings=83.98, totalRatings=3951, average_playtime=277.0, median_playtime=62, num_owners='5000000-10000000'),
 Row(appid=30, name='Day of Defeat', price=3.99, release_date='2003-05-01', required_age=0, publishers='Valve', developers='Valve', categories='Multi-player;Valve Anti-Cheat enabled', genres='Action', ratings=89.56, totalRatings=3814, average_playtime=187.0, median_playtime=34, num_ow

Data Preprocessing

In [7]:
# get the yaer of release_date
games = games.withColumn('release_date', col('release_date')[0:4])
games = games.withColumn("release_date", games["release_date"].cast(IntegerType()))

In [8]:
#create columns for genres
games = games.withColumn('genres', split(col('genres'), ';'))
uniq_genres = games.select('genres').distinct().collect()
unic_genres_set = set()
for u in uniq_genres:
    for i in u.genres:
        unic_genres_set.add(i)
unic_genres_set

for i in unic_genres_set:
    games = games.withColumn(i,when(array_contains(games.genres,i), 1).otherwise(0))

                                                                                

In [9]:
# create columns for the categories
games = games.withColumn('categories', split(col('categories'), ';'))
uniq_cat = games.select('categories').distinct().collect()
unic_cat_set = set()
for u in uniq_cat:
    for i in u.categories:
        # if not i in games2.columns:
        #     games2 = games2.withColumn(i, 0)
        unic_cat_set.add(i)
unic_cat_set

for i in unic_cat_set:
    games = games.withColumn(i,when(array_contains(games.categories,i), 1).otherwise(0))

22/03/30 19:58:10 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [10]:
# add the column to store the index of the row
games = games.select("*").withColumn("RowNumber", monotonically_increasing_id())
games.take(1)

[Row(appid=10, name='Counter-Strike', price=7.19, release_date=2000, required_age=0, publishers='Valve', developers='Valve', categories=['Multi-player', 'Online Multi-Player', 'Local Multi-Player', 'Valve Anti-Cheat enabled'], genres=['Action'], ratings=97.39, totalRatings=127873, average_playtime=17612.0, median_playtime=317, num_owners='10000000-20000000', Game Development=0, Gore=0, Massively Multiplayer=0, Action=1, Web Publishing=0, Violent=0, Nudity=0, Adventure=0, Education=0, Simulation=0, Accounting=0, Documentary=0, Sports=0, Racing=0, Software Training=0, Free to Play=0, Sexual Content=0, Early Access=0, Video Production=0, Casual=0, Indie=0, Photo Editing=0, Design & Illustration=0, Audio Production=0, RPG=0, Strategy=0, Animation & Modeling=0, Tutorial=0, Utilities=0, Cross-Platform Multiplayer=0, Co-op=0, In-App Purchases=0, Includes Source SDK=0, Steam Leaderboards=0, Local Co-op=0, Valve Anti-Cheat enabled=1, Full controller support=0, Partial Controller Support=0, Capt

In [11]:
# create a dataset with features that we need for the model
myGames=games.drop('appid',"developers","genres","name","publishers","categories","num_owners",)

In [12]:
myGames.head(1)

[Row(price=7.19, release_date=2000, required_age=0, ratings=97.39, totalRatings=127873, average_playtime=17612.0, median_playtime=317, Game Development=0, Gore=0, Massively Multiplayer=0, Action=1, Web Publishing=0, Violent=0, Nudity=0, Adventure=0, Education=0, Simulation=0, Accounting=0, Documentary=0, Sports=0, Racing=0, Software Training=0, Free to Play=0, Sexual Content=0, Early Access=0, Video Production=0, Casual=0, Indie=0, Photo Editing=0, Design & Illustration=0, Audio Production=0, RPG=0, Strategy=0, Animation & Modeling=0, Tutorial=0, Utilities=0, Cross-Platform Multiplayer=0, Co-op=0, In-App Purchases=0, Includes Source SDK=0, Steam Leaderboards=0, Local Co-op=0, Valve Anti-Cheat enabled=1, Full controller support=0, Partial Controller Support=0, Captions available=0, Shared/Split Screen=0, Steam Turn Notifications=0, Single-player=0, Steam Trading Cards=0, Local Multi-Player=1, MMO=0, SteamVR Collectibles=0, Stats=0, VR Support=0, Mods (require HL2)=0, Steam Workshop=0, I

Model1 : Cosine Similarity

In [13]:
from sklearn.metrics.pairwise import cosine_similarity
myGames2 = myGames.take(27000)

cos_sim_data = cosine_similarity(myGames2)
cos_sim_data

                                                                                

array([[1.00000000e+00, 8.97414146e-01, 8.89050930e-01, ...,
        3.76547723e-09, 3.99609567e-09, 6.07002769e-09],
       [8.97414146e-01, 1.00000000e+00, 9.99682562e-01, ...,
        2.25443188e-04, 2.25443395e-04, 2.25445218e-04],
       [8.89050930e-01, 9.99682562e-01, 1.00000000e+00, ...,
        4.63813819e-04, 4.63814024e-04, 4.63815831e-04],
       ...,
       [3.76547723e-09, 2.25443188e-04, 4.63813819e-04, ...,
        1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
       [3.99609567e-09, 2.25443395e-04, 4.63814024e-04, ...,
        1.00000000e+00, 1.00000000e+00, 1.00000000e+00],
       [6.07002769e-09, 2.25445218e-04, 4.63815831e-04, ...,
        1.00000000e+00, 1.00000000e+00, 1.00000000e+00]])

In [14]:
type(myGames2)

list

In [15]:
# temp = pd.DataFrame(cos_sim_data)
# cos_sim_df = spark.createDataFrame(temp)
# cos_sim_df.take(5)

In [16]:
# // I need to create a function that get the appId and find the N top similar apps!!!
def generate_top_N_recommendationsX(appId, N=10):
    # appId = 0
    row = GetValueFromDataframe(games.filter(games.appid == appId),"RowNumber")
    sim_scores = list(enumerate(cos_sim_data[row]))
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    sim_scores = sim_scores[1:N]
    game_indices = [i[0] for i in sim_scores]
    names = games.where(col("RowNumber").isin(game_indices)).select("name")
    return names


def GetValueFromDataframe(_df,columnName):
    for row in _df.rdd.collect():
        return row[columnName]

In [17]:
equalnames = generate_top_N_recommendationsX(str(10))
equalnames.show()

+--------------------+
|                name|
+--------------------+
|Counter-Strike: S...|
|Mount & Blade: Wa...|
|              Arma 3|
|        APB Reloaded|
|Realm of the Mad God|
|Total War™: ROME ...|
|Kerbal Space Program|
|       Path of Exile|
|ARK: Survival Evo...|
+--------------------+



Model2 :  : Random Forest

In [18]:
# The users dataframe is not currently used.
#users_df = spark.read.csv(users_csv, header=True, inferSchema=True)
# Init the users/games relation dataframe.
users_games_df = spark.read.csv(filename2, header=True, inferSchema=True).drop("_c0")

# DO CALCULATION HERE
# Pick the user ID to test.
user_id = 76561197960360459 # 76561197982716241

# Feature assembler: make a vector of all features.
inputColsList = ["ratings", "price", "release_date", "totalRatings"]
inputColsList.extend(unic_cat_set)
inputColsList.extend(unic_genres_set)
vector_assembler = VectorAssembler().setInputCols(inputColsList).setOutputCol("features")
vectorized_games = vector_assembler.transform(games)
#vectorized_games.select("name", "ratings", "Action", "Nudity", "Simulation", "features").show()

# Join the vectorized games with the user/games relation table.
users_games_joined_df = users_games_df.join(vectorized_games, "appid", "inner")

# Get the data for the games the player has played.
owned_games_df = users_games_joined_df.filter(users_games_joined_df.steam_id == user_id)
user_games_count = owned_games_df.count()
print("Owned Games:", user_games_count)
if user_games_count > 0:
    # Print the games this user has played the most (for comparison to final predictions).
    print("User's top games:")
    owned_games_df.sort(owned_games_df.time_played_in_minutes.desc()).select("name", "genres", "time_played_in_minutes").show()

    # Extract the list of unowned games by removing the already owned ones.
    unowned_games_df = vectorized_games.join(owned_games_df, "appid", "leftanti")

    # Build Random Forest Regression based on time the user played each game in their library.
    randForest = RandomForestRegressor()
    model = randForest.fit(owned_games_df.withColumnRenamed("time_played_in_minutes", "label"))

    # Run the model on unowned game data.
    predictions = model.transform(unowned_games_df).sort(col("prediction").desc())
    print("The user's top predicted games (with predicted minutes played if they owned it):")
    predictions.select('appid', 'features', 'genres', 'name', 'prediction').show(50)

    # Model evaluation (probably not really possible)
    # We'd need both labels and predictions to determine how far off we were.
    # But the labels don't exist for unknown games. :'(
    #evaluator = RegressionEvaluator()
    #print(evaluator.evaluate(predictions, {evaluator.metricName: "rmse"}))

                                                                                

Owned Games: 216
User's top games:
+--------------------+--------------------+----------------------+
|                name|              genres|time_played_in_minutes|
+--------------------+--------------------+----------------------+
|                DOOM|            [Action]|                  6566|
|  Two Point Hospital| [Indie, Simulation]|                  5606|
|          Anno 2070™|          [Strategy]|                  3850|
|           Tropico 5|[RPG, Simulation,...|                  3836|
|LEGO® Batman™ 3: ...| [Action, Adventure]|                  3699|
|Call of Duty®: Bl...|            [Action]|                  2774|
|     Game Dev Tycoon|[Casual, Indie, S...|                  2627|
|  Duke Nukem Forever|            [Action]|                  2432|
|              Dota 2|[Action, Free to ...|                  2324|
|    Castle Crashers®|[Action, Adventur...|                  2198|
|Jurassic World Ev...|[Simulation, Stra...|                  2025|
|       Left 4 Dead 2|     

                                                                                

The user's top predicted games (with predicted minutes played if they owned it):




+------+--------------------+--------------------+--------------------+------------------+
| appid|            features|              genres|                name|        prediction|
+------+--------------------+--------------------+--------------------+------------------+
|413150|(62,[0,1,2,3,5,12...|[Indie, RPG, Simu...|      Stardew Valley| 4016.346992481203|
|582010|(62,[0,1,2,3,5,8,...|            [Action]|MONSTER HUNTER: W...| 3462.496462178173|
|381210|(62,[0,1,2,3,5,11...|            [Action]|    Dead by Daylight|3332.9427580314427|
|374320|(62,[0,1,2,3,5,11...|            [Action]|     DARK SOULS™ III|3012.2023034859876|
|242760|(62,[0,1,2,3,5,12...|[Action, Adventur...|          The Forest|2948.4413377192986|
|291550|(62,[0,2,3,4,5,9,...|[Action, Free to ...|          Brawlhalla| 2817.536537935749|
|322330|(62,[0,1,2,3,4,5,...|[Adventure, Indie...|Don't Starve Toge...|2808.8010851940458|
|211820|(62,[0,1,2,3,4,5,...|[Action, Adventur...|           Starbound|2772.0281409257814|

