In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import regexp_replace, split, when, col, array, concat, concat_ws
from pyspark.ml.feature import BucketedRandomProjectionLSH, VectorAssembler, MinMaxScaler, Tokenizer, StopWordsRemover, CountVectorizer

# Start (or reuse) the Spark session with 8 GB for both driver and executors.
spark = (
    SparkSession.builder
    .appName("SteamRecommender")
    .config('spark.driver.memory', '8g')
    .config('spark.executor.memory', '8g')
    .getOrCreate()
)

# Load the raw catalogue; the first row is the header and column types are inferred.
df = spark.read.csv(
    "steam.csv",
    header=True,
    inferSchema=True,
)

25/05/19 18:38:03 WARN Utils: Your hostname, javi-Z790-PG-SONIC resolves to a loopback address: 127.0.1.1; using 192.168.100.10 instead (on interface enp3s0)
25/05/19 18:38:03 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/19 18:38:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Keep only rows flagged english == 1, then discard the columns that are not
# used as recommendation features further down.
df1 = (
    df
    .filter(df['english'] == 1)
    .drop(
        'release_date',
        'english',
        'platforms',
        'required_age',
        'average_playtime',
        'median_playtime',
        'genres',
        'achievements',
    )
)

In [3]:
# Target Spark SQL type for each remaining column.
columns_dict = {
    'appid': 'int',
    'name': 'string',
    'developer': 'string',
    'publisher': 'string',
    'categories': 'string',
    'steamspy_tags': 'string',
    'positive_ratings': 'int',
    'negative_ratings': 'int',
    'owners': 'string',
    'price': 'double',
}

# The loop variable is `dtype`, not `type`: notebook cells share one global
# namespace, so rebinding `type` here would break every later `type(...)` call.
for column, dtype in columns_dict.items():
    df1 = df1.withColumn(column, col(column).cast(dtype))

# developer / publisher: strip spaces so each name becomes a single token,
# then split the ';'-separated list into an array.
for column in ('developer', 'publisher'):
    df1 = df1.withColumn(column, split(regexp_replace(column, ' ', ''), ';'))

# owners: a "low-high" range is replaced by its integer midpoint; any other
# value is cast to int directly.
owner_bounds = split(col("owners"), "-")
df1 = df1.withColumn(
    "owners",
    when(
        col("owners").contains("-"),
        ((owner_bounds.getItem(0).cast("int") + owner_bounds.getItem(1).cast("int")) / 2).cast("int"),
    ).otherwise(col("owners").cast("int")),
)

# categories / steamspy_tags: drop hyphens and spaces, then split on ';'.
for column in ('categories', 'steamspy_tags'):
    df1 = df1.withColumn(
        column,
        split(regexp_replace(regexp_replace(column, '-', ''), ' ', ''), ';'),
    )

# Wrap the title in a one-element array so it can be combined with the other
# array columns when the tags are built.
df1 = df1.withColumn('name_list', array(col('name')))


In [4]:
# Numeric inputs that get assembled and scaled in the following cells.
numerical_columns = [
    'owners',
    'price',
]

# Side view holding just the two rating counts.
ratings_df = df1.select(
    'positive_ratings',
    'negative_ratings',
)

In [5]:
# Pack the numeric columns into a single vector column; rows with an invalid
# value in any of them are skipped (handleInvalid='skip').
assembler = VectorAssembler(
    inputCols=numerical_columns,
    outputCol="num_features",
    handleInvalid='skip',
)
assembled = assembler.transform(df1)

In [6]:
# Fit a min-max scaler on the numeric vector and append the rescaled vector
# as "scaled_features".
scaler = MinMaxScaler(
    inputCol="num_features",
    outputCol="scaled_features",
)
scaler_model = scaler.fit(assembled)
scaled_df = scaler_model.transform(assembled)

In [7]:
# prepare the tags
# Join name, developer, publisher, categories and steamspy tags into one
# space-separated string. The array columns are handed to concat_ws directly:
# concat() on arrays yields NULL as soon as one input array is NULL, which
# left games with a missing developer/publisher with an empty tag string,
# whereas concat_ws simply skips NULL inputs.
scaled_df = scaled_df.withColumn(
    'tags',
    concat_ws(
        ' ',
        col('name_list'),
        col('developer'),
        col('publisher'),
        col('categories'),
        col('steamspy_tags'),
    ),
)

In [8]:

# 1) Split the tag string into word tokens.
tokenizer = Tokenizer(
    inputCol="tags",
    outputCol="words",
)
words_df = tokenizer.transform(scaled_df)

# 2) Drop stop words from the token list.
remover = StopWordsRemover(
    inputCol="words",
    outputCol="filtered",
)
filtered_df = remover.transform(words_df)

# 3) Turn the remaining tokens into count vectors: vocabulary capped at 3000
#    terms, each of which must occur in at least 5 rows.
cv = CountVectorizer(
    inputCol="filtered",
    outputCol="features",
    vocabSize=3000,
    minDF=5,
)
model = cv.fit(filtered_df)
vectorized_df = model.transform(filtered_df)

                                                                                

In [9]:
# Combine the scaled numeric vector and the tag count vector into one feature vector.
final_feature_inputs = ['scaled_features', 'features']
final_assembler = VectorAssembler(inputCols=final_feature_inputs, outputCol='final_features')

In [10]:
# Append the combined "final_features" vector column to every row.
final_df = final_assembler.transform(vectorized_df)

In [11]:
# Fit the random-projection LSH model on the combined feature vectors.
# The seed is pinned so the random projections, and therefore the
# recommendations, are the same on every Restart & Run All.
lsh = BucketedRandomProjectionLSH(
    inputCol="final_features",
    outputCol="hashes",
    bucketLength=2.0,
    numHashTables=5,
    seed=42,
)
lsh_model = lsh.fit(final_df)

In [12]:
def recommend(game_name, top_n=5):
    """Return the names of the games closest to ``game_name``.

    The game's feature vector is joined against the whole catalogue with the
    fitted LSH model and the nearest rows (by the join's distance column) are
    returned, excluding rows that carry the queried name itself.

    Args:
        game_name: Exact value of the ``name`` column to look up.
        top_n: Maximum number of recommendations to return.

    Returns:
        A list of up to ``top_n`` game names ordered by increasing distance,
        or an explanatory string when ``game_name`` is not in the dataset.
    """
    # A title can occur more than once in the catalogue; keep a single query
    # row so the join does not emit every neighbour once per duplicate.
    query_df = (
        final_df
        .filter(final_df['name'] == game_name)
        .select("final_features")
        .limit(1)
    )

    # take(1) can stop at the first match instead of counting all of them.
    if not query_df.take(1):
        return f'Game "{game_name}" not found in dataset'

    # Only the name and the feature vector are needed on the catalogue side;
    # trimming the other columns keeps the join payload small.
    candidates = final_df.select("name", "final_features")

    similar = lsh_model.approxSimilarityJoin(
        datasetA=query_df,
        datasetB=candidates,
        threshold=float('inf'),
        distCol='distance'
    )

    # Alias the nested field so the result is read through an explicit column
    # name rather than an auto-generated label.
    results = (
        similar
        .filter(col("datasetB.name") != game_name)
        .orderBy("distance")
        .select(col("datasetB.name").alias("name"), "distance")
        .limit(top_n)
        .collect()
    )

    return [row["name"] for row in results]

In [13]:
# Example query: list the default five recommendations for 'Counter-Strike'.
recommend('Counter-Strike')

25/05/19 18:38:39 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

['Ricochet',
 'Deathmatch Classic',
 'Team Fortress Classic',
 'Counter-Strike: Condition Zero',
 'Half-Life Deathmatch: Source']

In [15]:
# Example query: list the default five recommendations for 'Ricochet'.
recommend('Ricochet')

['Counter-Strike',
 'Deathmatch Classic',
 'Half-Life Deathmatch: Source',
 'Counter-Strike: Condition Zero',
 'Day of Defeat']

In [116]:
# Next steps:
# - clean up the notebook
# - guard against unknown or misspelled game names (exact lookup, or a fuzzy LIKE match?)
# - publish to GitHub
# - create a GUI