# Initial Model

### From playtime_distribution, we found 2-5, 6-25, 26+ as good initial cutoffs
### Comparing to min / max normalization

In [194]:
import math
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

import os, sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath('__file__'))))
from src import EDA
from src import ModelEvaluation
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [2]:
# TODO Maybe show playtime distribution picture here

In [3]:
steam_df = EDA.load_without_cold_start(5)
steam_df = steam_df[steam_df['purchase_action'] == 'play']
steam_df.head(2)

  return filtered_users[steam_df['game_name'].isin(usable_games['game_name'].values)]


Unnamed: 0,uid,game_name,purchase_action,playtime
1,151603712,The Elder Scrolls V Skyrim,play,273.0
3,151603712,Fallout 4,play,87.0


## Some normalization needs to be done for played time

In [4]:
steam_df["playtime_rank"] = steam_df['playtime'].map(lambda value: EDA.rank_playtime(value))
steam_df.head(2)

Unnamed: 0,uid,game_name,purchase_action,playtime,playtime_rank
1,151603712,The Elder Scrolls V Skyrim,play,273.0,3
3,151603712,Fallout 4,play,87.0,3


### Game names need to be changed to IDs for Spark ML model

In [5]:
# fitting ALS must have numbers for itemCol and userCol
steam_df = EDA.get_uids(steam_df, from_column='game_name', to_column='game_uid')
steam_df['game_uid'].value_counts().size == steam_df['game_name'].value_counts().size

True

### User-User vs Item-Item

In [6]:
print('Number of users: ', steam_df['uid'].value_counts().size)
print('Number of games: ', steam_df['game_name'].value_counts().size)

Number of users:  2436
Number of games:  3544


##### We will use Co-clustering instead of relying only on user-user or item-item similarity

### Spark ALS Model Building

In [15]:
# Setup a SparkSession
spark = SparkSession.builder.getOrCreate()
# Convert a Pandas DF to a Spark DF
spark_df = spark.createDataFrame(steam_df)
spark_df.count()
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)
# can broadcast these

print('Training size: ', train.count())
print('Test size: ', test.count())

Training size:  46128
Test size:  11661


In [16]:
als_model = ALS(
    itemCol='game_uid',
    userCol='uid',
    ratingCol='playtime_rank',
    nonnegative=True,    
    regParam=0.1,
    coldStartStrategy="drop", # Drops if user or item in test was not in train
    rank=10) 

In [17]:
fitted_als_model = als_model.fit(train)

In [18]:
one_row_pandas_df = pd.DataFrame({'uid': [151603712], 'game_uid': [1]})
one_row_spark_df = spark.createDataFrame(one_row_pandas_df)
fitted_als_model.transform(one_row_spark_df).show()

+--------+---------+----------+
|game_uid|      uid|prediction|
+--------+---------+----------+
|       1|151603712| 1.3429985|
+--------+---------+----------+



In [19]:
predictions = fitted_als_model.transform(test)
evaluator = RegressionEvaluator() \
    .setMetricName("rmse") \
    .setLabelCol("playtime_rank") \
    .setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)

In [20]:
rmse
# was 1.046 without restricting to 5+
# was 1.005 with 5+, without min_max hours played
# was 1.015 with 5+, 2+ users, without normalizing hours played
# was 1.03 using 1-4 instead of 0-3

1.0051786784416223

In [28]:
with_summaries_df = EDA.add_summaries(steam_df)

  return super(DataFrameGroupBy, self).aggregate(arg, *args, **kwargs)


In [29]:
with_summaries_df.head(4)
# can drop some columns here

Unnamed: 0,uid,game_name,purchase_action,playtime,playtime_rank,game_uid,playtime_mean,playtime_min,playtime_max,game_counts,min_max
1,151603712,The Elder Scrolls V Skyrim,play,273.0,3,0,105.72153,0.1,1986.0,562,0.412256
3,151603712,Fallout 4,play,87.0,3,1,65.274172,0.2,629.0,151,0.414122
5,151603712,Spore,play,14.9,2,2,26.016667,0.1,417.0,54,0.1065
7,151603712,Fallout New Vegas,play,12.1,2,3,52.247843,0.1,417.0,255,0.086352


In [30]:
spark_df = spark.createDataFrame(with_summaries_df)
spark_df.count()
train, test = spark_df.randomSplit([0.8, 0.2], seed=427471138)
print('Training size: ', train.count())
print('Test size: ', test.count())

Training size:  46128
Test size:  11661


In [31]:
als_model = ALS(
    itemCol='game_uid',
    userCol='uid',
    ratingCol='min_max',
    nonnegative=True,    
    regParam=0.1,
    coldStartStrategy="drop", # Drops if user or item in test was not in train
    rank=10) 

fitted_als_model = als_model.fit(train)

predictions = fitted_als_model.transform(test)
evaluator = RegressionEvaluator() \
    .setMetricName("rmse") \
    .setLabelCol("min_max") \
    .setPredictionCol("prediction")
rmse = evaluator.evaluate(predictions)
rmse

0.7685464618302161

# NDCG

In [39]:
als_model = ALS(
    itemCol='game_uid',
    userCol='uid',
    ratingCol='min_max',
    nonnegative=True,    
    regParam=0.1,
    coldStartStrategy="drop", # Drops if user or item in test was not in train
    rank=10) 

fitted_als_model = als_model.fit(train)

predictions = fitted_als_model.transform(test)
# evaluator = RegressionEvaluator() \
#     .setMetricName("rmse") \
#     .setLabelCol("min_max") \
#     .setPredictionCol("prediction")
# rmse = evaluator.evaluate(predictions)
# rmse

In [58]:
prediction_count = predictions.count()

In [312]:
# Verify RMSE with rdd math
predictions_rdd = predictions.rdd
SSE = predictions_rdd.map(lambda r: (r['min_max'] - r['prediction'])**2) \
    .reduce(lambda total, x: total + x)
math.sqrt(SSE / prediction_count)

0.768546461830216

In [311]:
# For NDCG Calculation
# fitted_als_model.recommendForAllUsers(n) may be an interesting alternative on train
# predictions.groupBy('user_id') may be more efficient - df is more efficient than rdd

test = (309404240,
  [(0.0006331785563528914, 0.000634816475212574),
   (0.42567567567567566, 0.008724773302674294)])

def do_sort(arr):
    actual_and_pred = np.array(arr)
    indeces = np.argsort(actual_and_pred[:, 1])
    return actual_and_pred[indeces[::-1]].tolist()
    
def sort_predictions_slice_relevance(arr, n):
    actual_and_pred = np.array(arr)
    indeces = np.argsort(actual_and_pred[:, 1])
    return actual_and_pred[indeces[::-1]][:n].tolist()

# lambda functions in rdds cant import modules
def dcg_at_k(scores, k):
    """
    Discounted cumulative gain
    See http://fastml.com/evaluating-recommender-systems/
    Args:
        r: List - Relevance scores in rank order
        k: Number of results to consider
    Returns:
        Float
    """
    r = np.asfarray(scores)[:k]
    if r.size:
        # item 1 and 2 have same weights
        return r[0] + np.sum(r[1:] / np.log2(np.arange(2, r.size + 1)))
        # use below for more emphasis on first rank
        # return np.sum(r / np.log2(np.arange(2, r.size + 2)))
    return 0.

def ndcg_at_k(scores, k, method=0):
    """
    Normalized Discounted cumulative gain
    See http://fastml.com/evaluating-recommender-systems/
    Args:
        r: List - Relevance scores in rank order
        k: Number of results to consider
    Returns:
        Float from 0 to 1
    """
    dcg_max = dcg_at_k(sorted(scores, reverse=True), k)
    if not dcg_max:
        return 0.
    return dcg_at_k(scores, k) / dcg_max

do_sort(test[1])
x = sort_predictions_slice_relevance(test[1], 3)
print('x: ', x)

x:  [[0.42567567567567566, 0.008724773302674294], [0.0006331785563528914, 0.000634816475212574]]


In [313]:
# use actual values for gain
sampled = predictions_rdd.sample(False, 1, 1)
formatted = sampled.map(lambda row: (row['uid'], [(row['min_max'], row['prediction'])])) \
    .reduceByKey(lambda total, val: total + val) \
    .map(lambda kv: (kv[0], sort_predictions_slice_relevance(kv[1], 10))) \
    .map(lambda kv: ndcg_at_k(np.array(kv[1])[:, 0], 10)) \
    .reduce(lambda total, gain: total + gain) \

In [314]:
formatted / prediction_count

0.16524551374952542