In [3]:
from pyspark.sql import SparkSession
from pyspark import Row
from pyspark.sql import SQLContext
import sys
import json
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import lit
from pyspark.ml.feature import MinMaxScaler
from pyspark.mllib.util import MLUtils
from pyspark.ml.feature import StandardScaler
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

In [4]:
# Create the SparkSession used by every cell below, or reuse one that already exists.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)


In [5]:
# Read the Steam play-time CSV; the first row is a header and column types are inferred.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('steam_data_w_game_id.csv')
)

In [6]:
# Replace the CSV's spaced / auto-generated headers with identifier-style column names.
df = (
    df.withColumnRenamed("User ID", "userId")
    .withColumnRenamed("_c0", "num")
    .withColumnRenamed("Game Title", "gameTitle")
    .withColumnRenamed("Hours Played", "hoursPlayed")
    .withColumnRenamed("Game ID", "gameId")
)

In [23]:
# Shape check: (number of rows, number of columns).
df_shape = (df.count(), len(df.columns))
print(df_shape)

(70489, 5)


In [6]:
# Preview the first 20 rows, with long cell values truncated.
df.show(n=20, truncate=True)

+---+---------+--------------------+-----------+------+
|num|   userId|           gameTitle|hoursPlayed|gameId|
+---+---------+--------------------+-----------+------+
|  0|151603712|The Elder Scrolls...|      273.0|  3067|
|  1|151603712|           Fallout 4|       87.0|  1162|
|  2|151603712|               Spore|       14.9|  2813|
|  3|151603712|   Fallout New Vegas|       12.1|  1163|
|  4|151603712|       Left 4 Dead 2|        8.9|  1733|
|  5|151603712|            HuniePop|        8.5|  1535|
|  6|151603712|       Path of Exile|        8.1|  2197|
|  7|151603712|         Poly Bridge|        7.5|  2251|
|  8|151603712|         Left 4 Dead|        3.3|  1732|
|  9|151603712|     Team Fortress 2|        2.8|  2994|
| 10|151603712|         Tomb Raider|        2.5|  3247|
| 11|151603712|     The Banner Saga|        2.0|  3024|
| 12|151603712|Dead Island Epidemic|        1.4|   783|
| 13|151603712|   BioShock Infinite|        1.3|   348|
| 14|151603712|Dragon Age Origin...|        1.3|

# Normalization

In [7]:
# Aggregate the whole frame down to one row holding the mean and the standard
# deviation of hoursPlayed, and bring that row back to the driver.
df_norm = df.select(
    _mean('hoursPlayed').alias('mean'),
    _stddev('hoursPlayed').alias('std')
).collect()
mean, std = df_norm[0]['mean'], df_norm[0]['std']

print(mean)
print(std)

48.878063243911484
229.33523599681345


In [8]:
# z-score of hours played, using the global mean / std computed above.
df = df.withColumn("normalizedH", (col('hoursPlayed') - mean) / std)

# Baseline - Basic ALS

In [25]:
# Rebuild the input frame from the CSV so this section does not depend on
# whatever earlier cells left in `df`.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('steam_data_w_game_id.csv')
)

# Identifier-style column names.
df = (
    df.withColumnRenamed("User ID", "userId")
    .withColumnRenamed("_c0", "num")
    .withColumnRenamed("Game Title", "gameTitle")
    .withColumnRenamed("Hours Played", "hoursPlayed")
    .withColumnRenamed("Game ID", "gameId")
)

# Mean and standard deviation of hoursPlayed over the full frame (i.e. before
# the train/test split below).
df_norm = df.select(
    _mean('hoursPlayed').alias('mean'),
    _stddev('hoursPlayed').alias('std')
).collect()
mean, std = df_norm[0]['mean'], df_norm[0]['std']

# z-scored play time is the rating column used by the models.
df = df.withColumn("normalizedH", (col('hoursPlayed') - mean) / std)

# Seeded 80/20 split.
training, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

In [26]:
# Baseline: ALS fitted directly on the z-scored hours.
# NOTE(review): implicitPrefs=True selects the implicit-feedback variant of ALS;
# confirm that scoring its output with RMSE against normalizedH is intended.
als = ALS(
    userCol="userId",
    itemCol="gameId",
    ratingCol="normalizedH",
    implicitPrefs=True,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=123
)
model = als.fit(training)

# Score the held-out rows and measure RMSE against their normalized hours.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="normalizedH",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)

# Display the result.
rmse

0.8612171660889554

In [11]:
# df2.show()

# ALS Global Average

In [12]:
# Start the global-average experiment from a freshly loaded frame.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('steam_data_w_game_id.csv')
)

# Identifier-style column names.
df = (
    df.withColumnRenamed("User ID", "userId")
    .withColumnRenamed("_c0", "num")
    .withColumnRenamed("Game Title", "gameTitle")
    .withColumnRenamed("Hours Played", "hoursPlayed")
    .withColumnRenamed("Game ID", "gameId")
)

# Mean and standard deviation of hoursPlayed over the full frame.
df_norm = df.select(
    _mean('hoursPlayed').alias('mean'),
    _stddev('hoursPlayed').alias('std')
).collect()
mean, std = df_norm[0]['mean'], df_norm[0]['std']

# z-scored play time.
df = df.withColumn("normalizedH", (col('hoursPlayed') - mean) / std)

# Same seeded 80/20 split as the baseline section.
training, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

In [13]:
# Average normalized play time over the training rows only.
global_mean = training.agg({"normalizedH": "avg"}).first()['avg(normalizedH)']
print(global_mean)

# Every training row gets that same constant as its rating for this experiment.
training = training.withColumn('normalizedHAVG', lit(global_mean))

0.0012531904932816308


In [14]:
# ALS fitted on the constant global-average column instead of the real ratings.
# NOTE(review): implicitPrefs=True selects the implicit-feedback variant of ALS;
# confirm that scoring its output with RMSE against normalizedH is intended.
als = ALS(
    userCol="userId",
    itemCol="gameId",
    ratingCol="normalizedHAVG",
    implicitPrefs=True,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=123
)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="normalizedH",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)

# Display the result.
rmse

0.8753411684638525

# ALS with Bias

In [15]:
# Start the bias experiment from a freshly loaded frame.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('steam_data_w_game_id.csv')
)

# Identifier-style column names.
df = (
    df.withColumnRenamed("User ID", "userId")
    .withColumnRenamed("_c0", "num")
    .withColumnRenamed("Game Title", "gameTitle")
    .withColumnRenamed("Hours Played", "hoursPlayed")
    .withColumnRenamed("Game ID", "gameId")
)

# Mean and standard deviation of hoursPlayed over the full frame.
df_norm = df.select(
    _mean('hoursPlayed').alias('mean'),
    _stddev('hoursPlayed').alias('std')
).collect()
mean, std = df_norm[0]['mean'], df_norm[0]['std']

# z-scored play time.
df = df.withColumn("normalizedH", (col('hoursPlayed') - mean) / std)

# Same seeded 80/20 split as the earlier sections.
training, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

In [16]:
#################
# Bias-adjusted ALS: subtract a baseline (user mean + item mean - global mean)
# from each training rating, fit ALS on the remainder, then add the same
# baseline back onto the predictions before scoring.

# Global average of normalizedH over the training rows.
global_mean = training.groupBy().avg("normalizedH").collect()[0]['avg(normalizedH)']

# Per-user average of normalizedH, computed from the training rows.
user_mean = training.groupBy("userId").agg({"normalizedH": "avg"})
user_mean = user_mean.withColumnRenamed('avg(normalizedH)', 'user_mean')

# Per-game average of normalizedH, computed from the training rows.
item_mean = training.groupBy('gameId').agg({"normalizedH": "avg"})
item_mean = item_mean.withColumnRenamed('avg(normalizedH)', 'item_mean')

# Alias the two lookup frames before joining them onto the training rows.
# mainDF = training.alias('mainDF')
userDF = user_mean.alias('userDF')
itemDF = item_mean.alias('itemDF')

# Attach user_mean to every training row; only the columns needed later are kept.
# NOTE(review): both sides of this join derive from `training`; confirm the
# equality condition resolves as intended on the Spark version in use.
training = training.join(userDF, training.userId == userDF.userId, 'outer')\
    .select(training.userId, training.gameId, training.normalizedH , userDF.user_mean)

# Attach item_mean the same way.
training = training.join(itemDF, training.gameId == itemDF.gameId, 'outer')\
    .select(training.userId, training.gameId, training.normalizedH ,training.user_mean, itemDF.item_mean)

# Remainder of the rating after removing the baseline; this is what ALS is fitted on.
training = training.withColumn('user_item_interaction',training.normalizedH\
                               - (training.user_mean + training.item_mean - global_mean))


#################
# Test set: attach the means computed from the training rows. The joins are
# inner, so test rows whose userId or gameId has no training mean are dropped.

test_user_mean = user_mean.alias('test_user_mean')
test = test.join(test_user_mean, test.userId == test_user_mean.userId, 'inner') \
    .select(test.userId, test.gameId, test.normalizedH, test_user_mean.user_mean)

test_item_mean = item_mean.alias('test_item_mean')
test = test.join(test_item_mean, test.gameId == test_item_mean.gameId, 'inner') \
    .select(test.userId, test.gameId, test.normalizedH, test.user_mean, test_item_mean.item_mean)

# Same ALS settings as the other experiments, with the remainder column as the rating.
als = ALS( maxIter=5, regParam=0.01,implicitPrefs = True,userCol="userId", itemCol="gameId", ratingCol="user_item_interaction",
          coldStartStrategy="drop").setSeed(123)


model = als.fit(training)
predictions = model.transform(test)

# Add the baseline back so the prediction is on the same scale as normalizedH.
predictions = predictions.withColumn('prediction_calculated',
                                     predictions.prediction + predictions.user_mean + predictions.item_mean - global_mean)

# RMSE of the reconstructed prediction against the held-out normalizedH.
evaluator = RegressionEvaluator(metricName="rmse", labelCol="normalizedH", predictionCol="prediction_calculated")
rmse = evaluator.evaluate(predictions)

# Display the result.
rmse


0.9186947958927599

# Normalization based on Game ID

In [134]:
# Start the per-game normalization experiment from a freshly loaded frame.
df = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('steam_data_w_game_id.csv')
)

# Identifier-style column names.
df = (
    df.withColumnRenamed("User ID", "userId")
    .withColumnRenamed("_c0", "num")
    .withColumnRenamed("Game Title", "gameTitle")
    .withColumnRenamed("Hours Played", "hoursPlayed")
    .withColumnRenamed("Game ID", "gameId")
)

# Mean and standard deviation of hoursPlayed over the full frame.
df_norm = df.select(
    _mean('hoursPlayed').alias('mean'),
    _stddev('hoursPlayed').alias('std')
).collect()
mean, std = df_norm[0]['mean'], df_norm[0]['std']

# Globally z-scored play time, kept alongside the per-game version built below.
df = df.withColumn("normalizedH", (col('hoursPlayed') - mean) / std)


In [60]:
# df.show()

In [135]:
# Per-game statistics of hoursPlayed, joined back onto every row of `df`.
#
# The import sits above its first use so the cell runs on a fresh kernel:
# stddev_pop was previously referenced before anything had imported it.
# The wildcard form is kept because a later cell calls abs() on a Column and
# relies on this import having replaced the builtin.
# NOTE(review): the wildcard also shadows other Python builtins and appears to
# rebind the notebook's `mean` variable to the Spark function of that name.
from pyspark.sql.functions import *  # for desc or asc

# Per-game mean of hours played.
item_mean = df.groupBy('gameId').agg({"hoursPlayed": "avg"})
item_mean = item_mean.withColumnRenamed('avg(hoursPlayed)', 'item_mean')

# Per-game standard deviation (stddev_pop) of hours played.
item_stddev = df.groupBy('gameId').agg(stddev_pop("hoursPlayed"))
item_stddev = item_stddev.withColumnRenamed('stddev_pop(hoursPlayed)', 'item_stddev')

# Alias the two lookup frames before joining.
itemDF = item_mean.alias('itemDF')
itemsdDF = item_stddev.alias('itemsdDF')

# Attach item_mean to every row, keeping the original columns.
df = df.join(itemDF, df.gameId == itemDF.gameId, 'outer')\
    .select( df.num, df.userId, df.gameTitle, df.hoursPlayed ,df.gameId, df.normalizedH ,itemDF.item_mean)#.orderBy(asc("num"))

# Attach item_stddev the same way.
df = df.join(itemsdDF, df.gameId == itemsdDF.gameId, 'outer')\
    .select( df.num, df.userId, df.gameTitle, df.hoursPlayed ,df.gameId, df.normalizedH ,df.item_mean , itemsdDF.item_stddev)


In [136]:
# Per-game z-score: hours played relative to that game's own mean and spread.
df = df.withColumn(
    "item_normalizedH",
    (col('hoursPlayed') - col('item_mean')) / col('item_stddev')
)

In [137]:
# Replace null values with 0.
df = df.fillna(0)

In [106]:
# Preview the first 20 rows, with long cell values truncated.
df.show(n=20, truncate=True)

+-----+---------+--------------------+-----------+------+--------------------+------------------+------------------+--------------------+
|  num|   userId|           gameTitle|hoursPlayed|gameId|         normalizedH|         item_mean|       item_stddev|    item_normalizedH|
+-----+---------+--------------------+-----------+------+--------------------+------------------+------------------+--------------------+
|  976| 57103808|    America's Army 3|       25.0|   148| -0.1041185979996692|          10.61875|23.253957156524997|  0.6184431279028422|
|14122|206014489|    America's Army 3|        0.3|   148|-0.21182119281742848|          10.61875|23.253957156524997|-0.44374167934271724|
|15206|197406178|    America's Army 3|        0.7|   148|-0.21007702124143235|          10.61875|23.253957156524997|-0.42654030594602804|
|25522| 43955374|    America's Army 3|        0.2|   148| -0.2122572357114275|          10.61875|23.253957156524997| -0.4480420226918896|
|26334|132014951|    America's Arm

In [83]:
# Shape check after the joins: (number of rows, number of columns).
df_shape = (df.count(), len(df.columns))
print(df_shape)

(70489, 9)


In [107]:
from pyspark.sql.functions import col, count, isnan, when

# One output column per input column, holding how many of its entries are NaN or NULL.
df.select(
    [
        count(when(isnan(c) | col(c).isNull(), c)).alias(c)
        for c in df.columns
    ]
).show()

+---+------+---------+-----------+------+-----------+---------+-----------+----------------+
|num|userId|gameTitle|hoursPlayed|gameId|normalizedH|item_mean|item_stddev|item_normalizedH|
+---+------+---------+-----------+------+-----------+---------+-----------+----------------+
|  0|     0|        0|          0|     0|          0|        0|          0|               0|
+---+------+---------+-----------+------+-----------+---------+-----------+----------------+



# Basic ALS with new normalization

In [138]:
# Seeded 80/20 split of the frame that now carries item_normalizedH.
training, test = df.randomSplit(weights=[0.8, 0.2], seed=123)

In [109]:
# Basic ALS again, this time fitted on the per-game normalized hours.
# NOTE(review): implicitPrefs=True selects the implicit-feedback variant of ALS;
# confirm that scoring its output with RMSE against item_normalizedH is intended.
als = ALS(
    userCol="userId",
    itemCol="gameId",
    ratingCol="item_normalizedH",
    implicitPrefs=True,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=123
)
model = als.fit(training)

# Score the held-out rows and measure RMSE against their per-game normalized hours.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="item_normalizedH",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)

# Display the result.
rmse

0.9938771458503272

# Global average ALS with new normalization

In [139]:
# Spark's abs is imported explicitly, under an alias in the same style as the
# file's _mean / _stddev. The bare name `abs` used here before only resolved to
# the Spark function because an earlier cell's wildcard import had shadowed the
# Python builtin, which is not expected to accept a Column.
from pyspark.sql.functions import abs as _abs

# Average of item_normalizedH over the training rows.
global_item_mean = training.groupBy().avg("item_normalizedH").collect()[0]['avg(item_normalizedH)']

# Constant rating column holding the absolute value of that average.
training = training.withColumn('item_normalizedHAVG', _abs(lit(global_item_mean)))

# The signed value is printed; the stored column is its magnitude.
print(global_item_mean)

-0.0004050004770339087


In [None]:
# Note: I should have used absolute value for global average to be able to use 

In [119]:
# training.show()

In [120]:
# ALS fitted on the constant global-average column of the per-game normalized hours.
# NOTE(review): implicitPrefs=True selects the implicit-feedback variant of ALS;
# confirm that scoring its output with RMSE against item_normalizedH is intended.
als = ALS(
    userCol="userId",
    itemCol="gameId",
    ratingCol="item_normalizedHAVG",
    implicitPrefs=True,
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
    seed=123
)
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data.
predictions = model.transform(test)
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="item_normalizedH",
    predictionCol="prediction"
)
rmse = evaluator.evaluate(predictions)

# Display the result.
rmse

1.0074705380442321

In [129]:
# Preview the first 20 training rows, including the constant item_normalizedHAVG column.
training.show(n=20, truncate=True)

+----+---------+--------------------+-----------+------+--------------------+-------------------+-------------------+--------------------+--------------------+
| num|   userId|           gameTitle|hoursPlayed|gameId|         normalizedH|          item_mean|        item_stddev|    item_normalizedH| item_normalizedHAVG|
+----+---------+--------------------+-----------+------+--------------------+-------------------+-------------------+--------------------+--------------------+
| 298| 97298878|            Mafia II|       15.9|  1829|-0.14379850135358052|  24.94310344827587|  30.57027298485587|-0.29581363086799095|4.050004770339087E-4|
|1430| 69857045|            Mafia II|       21.0|  1829|-0.12156031375963021|  24.94310344827587|  30.57027298485587| -0.1289848949085026|4.050004770339087E-4|
|1702|162649407|            Mafia II|       23.0|  1829| -0.1128394558796497|  24.94310344827587|  30.57027298485587|-0.06356186119889932|4.050004770339087E-4|
|1864| 42681063|            Mafia II|   

# Bias ALS with new normalization

In [141]:
# #################

# # #global average 
# # global_mean = training.groupBy().avg("normalizedH").collect()[0]['avg(normalizedH)']

# #User_mean
# user_mean = training.groupBy("userId").agg({"item_normalizedH": "avg"})
# user_mean = user_mean.withColumnRenamed('avg(item_normalizedH)', 'user_mean')

# #item_mean
# item_mean = training.groupBy('gameId').agg({"item_normalizedH": "avg"})
# item_mean = item_mean.withColumnRenamed('avg(item_normalizedH)', 'item_mean')

# #joining DFs
# # mainDF = training.alias('mainDF')
# userDF = user_mean.alias('userDF')
# itemDF = item_mean.alias('itemDF')

# training = training.join(userDF, training.userId == userDF.userId, 'outer')\
#     .select(training.userId, training.gameId, training.item_normalizedH , userDF.user_mean)

# training = training.join(itemDF, training.gameId == itemDF.gameId, 'outer')\
#     .select(training.userId, training.gameId, training.item_normalizedH ,training.user_mean, itemDF.item_mean)

# #user_item_interaction  
# training = training.withColumn('user_item_interaction',training.item_normalizedH\
#                                - (training.user_mean + training.item_mean - training.item_normalizedHAVG))


# #################
# #####test#######

# test_user_mean = user_mean.alias('test_user_mean')
# test = test.join(test_user_mean, test.userId == test_user_mean.userId, 'inner') \
#     .select(test.userId, test.gameId, test.normalizedH, test_user_mean.user_mean)

# test_item_mean = item_mean.alias('test_item_mean')
# test = test.join(test_item_mean, test.gameId == test_item_mean.gameId, 'inner') \
#     .select(test.userId, test.gameId, test.normalizedH, test.user_mean, test_item_mean.item_mean)

# als = ALS( maxIter=5, regParam=0.01,implicitPrefs = True,userCol="userId", itemCol="gameId", ratingCol="user_item_interaction",
#           coldStartStrategy="drop").setSeed(123)


# model = als.fit(training)
# predictions = model.transform(test)

# predictions = predictions.withColumn('prediction_calculated',
#                                      predictions.prediction + predictions.user_mean + predictions.item_mean - global_mean)

# evaluator = RegressionEvaluator(metricName="rmse", labelCol="item_normalizedH", predictionCol="prediction_calculated")
# rmse = evaluator.evaluate(predictions)

# rmse

AttributeError: 'DataFrame' object has no attribute 'item_normalizedHAVG'