In [1]:
__author__ = 'MegEllis'

import json
import collections
import numpy as np
import math

import os
import sys

# Path for spark source folder (kept in one place so the two uses below cannot drift apart)
SPARK_HOME = "/Users/MegEllis/Desktop/spark-1.6.0-bin-hadoop2.6_2"
os.environ['SPARK_HOME'] = SPARK_HOME

# The bundled pyspark package must be on sys.path before it can be imported.
sys.path.append(SPARK_HOME + "/python/")

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SQLContext, Row

# Build the SparkContext first: SQLContext wraps an existing context, so creating
# it before `sc` exists raises NameError on a fresh kernel.
conf = (SparkConf().setMaster("local").setAppName("My app").set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
sqlContext = SQLContext(sc)

For each user, we create a default dictionary that maps the user to each game they have played and how long they have played it over their lifetime.

In [3]:
# The data file is only read, so open it read-only; 'r+' would also demand write
# permission and would allow the raw data to be modified by accident.
users_open = open('/Users/MegEllis/Desktop/aml_proj/userData.txt', 'r')

def nest_dict(filename):
    """Build one ``{user: {game name: playtime}}`` record per user.

    Parameters
    ----------
    filename : iterable of str
        Despite the name, this is an open file (or any iterable of lines), not
        a path. Every non-blank line must be a JSON object holding a ``'user'``
        entry and an ``'ownedGames'`` object with a ``'response'`` entry.

    Returns
    -------
    list of collections.defaultdict
        One single-key mapping ``{user: {name: playtime_forever}}`` for each
        user whose response lists at least one game. Users whose response has
        no ``'games'`` entry, or an empty one, are left out.

    Raises
    ------
    ValueError
        If a non-blank line is not valid JSON.
    KeyError
        If a record lacks ``'user'``, ``'ownedGames'``, ``'response'``, or a
        game lacks ``'name'`` / ``'playtime_forever'``.
    """
    listed_dict = []
    for line in filename:
        # A blank line (e.g. a stray newline at the end of the file) is not a
        # record; json.loads would raise on it.
        if not line.strip():
            continue
        record = json.loads(line)
        user = record['user']
        in_response = record['ownedGames']['response']
        games = in_response['games'] if 'games' in in_response else None
        # An empty games list would otherwise yield an empty record, on which
        # the later `.values()[0]` lookups fail.
        if not games:
            continue
        rec_dict = collections.defaultdict(dict)
        for game in games:
            rec_dict[user][game['name']] = game['playtime_forever']
        listed_dict.append(rec_dict)
    return listed_dict


final_list = nest_dict(users_open)
# The file has been read in full; release the handle instead of leaving it
# open for the rest of the kernel session.
users_open.close()

We then transform this list of nested dictionaries into an RDD so that we can format the data and find relevant information more quickly and efficiently. For example, we need to standardize the playtime for a given game, since some games can only be played for a certain amount of time.

In [4]:
# Distribute the list of per-user records as a Spark RDD.
# NOTE(review): final_RDD is not referenced again in the cells shown here.
final_RDD = sc.parallelize(final_list)

In order to run statistical analysis on the data, it must be placed in a list of (game, playtime) tuples, which is then transformed into an RDD.

In [5]:
# Flatten every user's {game: playtime} mapping into one list of
# (game, playtime) pairs.
tup_list = []
for user_record in final_list:
    # Each record maps a single user to their games. Iterating over the values
    # (instead of `.values()[0]`) works on both Python 2 and Python 3 and does
    # not raise on an empty record.
    for games in user_record.values():
        tup_list.extend(games.items())

In [6]:
# RDD of (game, playtime) pairs; the game name is the key for the
# per-game aggregation that follows.
time_count_RDD = sc.parallelize(tup_list)

From these tuples we can calculate the average number of hours played per game.

In [7]:
# Per game: (summed playtime, number of entries). Games whose summed playtime
# is zero are dropped.
tottime_per_game = (
    time_count_RDD
    .mapValues(lambda playtime: (playtime, 1))
    .reduceByKey(lambda acc, nxt: (acc[0] + nxt[0], acc[1] + nxt[1]))
    .filter(lambda game_totals: game_totals[1][0] != 0)
)

In [8]:
# (game, mean playtime): the summed playtime divided by the number of entries.
avg_per_game = tottime_per_game.map(
    lambda game_totals: (
        game_totals[0],
        float(game_totals[1][0]) / float(game_totals[1][1]),
    )
)

In [9]:
# Build (game, user, playtime) triples for every game in every user's record.
# Iterating over items() works on Python 2 and 3 (no `.keys()[0]` indexing) and
# avoids overwriting `tup_list`, which an earlier cell feeds into
# time_count_RDD.
full_list = []
for user_record in final_list:
    for user, games in user_record.items():
        full_list.extend((game, user, playtime) for game, playtime in games.items())

# Keep only the entries with a non-zero playtime.
full_list_RDD = sc.parallelize(full_list).filter(lambda x: x[2] != 0)

Standardizing: Now that we have the average playtime for each game, we compute, for each player and game, the ratio of the number of hours that player has played the game to the game's average. To do this, we use Spark SQL to join the table that contains the users and their playtimes to the table that contains each game and its average playtime.

In [12]:
# One Row per (game, user, playtime) triple, ready to become a DataFrame.
users = full_list_RDD.map(
    lambda triple: Row(game=triple[0], user=triple[1], playtime=triple[2])
)

In [13]:
# Turn the RDD of Rows into a DataFrame and register it under the table name
# "users" so the SQL query can refer to it.
Users = sqlContext.createDataFrame(users)
Users.registerTempTable("users")

In [14]:
# One Row per game carrying its mean playtime.
avg_times = avg_per_game.map(
    lambda game_avg: Row(game=game_avg[0], avg_time=game_avg[1])
)

In [15]:
# DataFrame of per-game averages, registered under the table name "AvgTimes".
AvgTimes = sqlContext.createDataFrame(avg_times)
AvgTimes.registerTempTable("AvgTimes")

In [16]:
# Attach each game's average playtime to every (game, user, playtime) row.
# It is a LEFT JOIN, so a users row whose game has no entry in AvgTimes is kept
# with a null avg_time.
jointest=sqlContext.sql('''SELECT users.game, users.user, users.playtime, AvgTimes.avg_time
                    FROM users LEFT JOIN AvgTimes
                    ON users.game = AvgTimes.game
                    ''')

Convert the DataFrame that resulted from the join into an RDD so that the information can be extracted easily.

In [17]:
# RDD view of the joined DataFrame (one Row per game/user pair).
full_rdd = jointest.rdd

In [27]:
# Plain (game, user, playtime, avg_time) tuples, in the column order of the
# SELECT that produced the rows.
to_get_prop = full_rdd.map(
    lambda row: (row[0], row[1], row[2], row[3])
)

In [34]:
# Re-key by game: (game, (user, (playtime + 1) / (avg_time + 1))).
get_prop = to_get_prop.map(
    lambda rec: (rec[0], (rec[1], float(rec[2] + 1) / float(rec[3] + 1)))
)

In [44]:
# Pull every (game, (user, proportion)) element back to the driver.
# Despite the name, collect() returns a list, not a dict.
exit_dict = get_prop.collect()

Recreate the default dictionaries (one keyed by user, one keyed by game) so that we can run the similarity, rating, and overall RMSE and mean error functions.

In [59]:
# Two views of the same proportions:
#   user_key[user][game] and game_key[game][user].
user_key = collections.defaultdict(dict)
game_key = collections.defaultdict(dict)
for game, (user, proportion) in exit_dict:
    user_key[user][game] = proportion
    game_key[game][user] = proportion

In [48]:
# Largest playtime proportion over all (game, user) pairs.
max_time_prop = get_prop.map(lambda x: x[1][1]).max()

In [49]:
# Smallest playtime proportion over all (game, user) pairs.
min_time_prop = get_prop.map(lambda x: x[1][1]).min()

In [50]:
# Range of the proportions; displayed as the cell's output.
max_time_prop - min_time_prop

1384.6459279818016

About 138.5 per bucket: the range between the largest and the smallest playtime proportion (1384.6), divided by 10, the number of buckets.

We are going to test both splitting into evenly sized bins and using the ratios on their own.
First, make sure we can obtain the similarity and rating for a certain number of users, which are treated as the test set.

In [56]:
def similarity(i, j, dicto):
    """Smoothed Pearson-style similarity between the ratings of two users.

    Parameters
    ----------
    i, j : hashable
        Keys of the two users in ``dicto``.
    dicto : mapping
        ``{user: {item: rating}}``.

    Returns
    -------
    number
        ``1`` when the two users have no item in common. Otherwise
        ``(num + 1) / (sqrt(denom_i * denom_j) + 1)``, where ``num`` is the sum
        over the common items of the product of both users' deviations from
        their own mean rating, and ``denom_i`` / ``denom_j`` are the sums of
        the squared deviations. Each mean is taken over ALL of that user's
        items, not only the common ones. The ``+ 1`` terms keep the denominator
        non-zero.
    """
    ratings_i = dicto[i]
    ratings_j = dicto[j]
    common = set(ratings_i) & set(ratings_j)
    if not common:
        # NOTE(review): users with nothing in common receive 1, the largest
        # value the formula below can produce. Kept as in the original; confirm
        # that this is intended.
        return 1

    # list(...) so np.mean receives a real sequence on Python 3 as well, where
    # dict.values() is a view object.
    i_rating_avg = np.mean(list(ratings_i.values()))
    j_rating_avg = np.mean(list(ratings_j.values()))

    num = 0
    denom1 = 0
    denom2 = 0
    for item in common:
        dev_i = ratings_i[item] - i_rating_avg
        dev_j = ratings_j[item] - j_rating_avg
        num += dev_i * dev_j
        denom1 += dev_i ** 2
        denom2 += dev_j ** 2
    return (num + 1) / (math.sqrt(denom1 * denom2) + 1)

In [63]:
def get_rating(user, game, game_key, user_key):
    """Predict ``user``'s rating of ``game`` from the other users who rated it.

    The prediction is::

        mean(user) + sum_m sim(user, m) * (r[m][game] - mean(m)) / sum_m |sim(user, m)|

    where ``m`` runs over every user other than ``user`` that has a rating for
    ``game`` and ``sim`` is ``similarity``.

    Parameters
    ----------
    user : hashable
        User to predict for; must be a key of ``user_key``.
    game : hashable
        Game to predict the rating of.
    game_key : mapping
        ``{game: {user: rating}}``.
    user_key : mapping
        ``{user: {game: rating}}``.

    Returns
    -------
    number
        The predicted rating. When no other user contributes (nobody else
        rated the game, or all similarities are zero) the user's own mean
        rating is returned.
    """
    user_avg_rating = np.mean(list(user_key[user].values()))
    # Floats from the start: `similarity` can return the int 1, and under
    # Python 2 an all-int weight sum would make the final division truncate.
    term_1 = 0.0
    term_2 = 0.0
    # .get() instead of [] so an unknown game does not insert an empty entry
    # when game_key is a defaultdict.
    for m, m_rating in game_key.get(game, {}).items():
        if m == user:
            # The target user's own rating is the value being predicted; using
            # it as a neighbour would leak the answer into the prediction.
            continue
        sim = similarity(user, m, user_key)
        term_1 += abs(sim)
        term_2 += sim * (m_rating - np.mean(list(user_key[m].values())))
    if term_1 == 0:
        # No usable neighbour: fall back to the user's mean instead of
        # dividing by zero.
        return user_avg_rating
    return user_avg_rating + term_2 / term_1

In [87]:
# Evaluate on at most 1000 users. list(...) before slicing because dict.keys()
# is a view on Python 3 and cannot be sliced; on Python 2 it is a no-op copy.
user_list = list(user_key.keys())[:1000]
game_list = list(game_key.keys())

In [None]:
# For every (user, game) pair of the selected users, compare the predicted
# rating with the stored one and keep the absolute and squared errors.
diffies = []
diffies2 = []
for u in user_list:
    for g, actual in user_key[u].items():
        new = get_rating(u, g, game_key, user_key)
        error = new - actual
        diffies.append(abs(error))
        diffies2.append(error ** 2)

In [None]:
# Mean absolute error over all evaluated (user, game) pairs.
# Raises ZeroDivisionError if diffies is empty.
mean_abs_error = sum(diffies)/len(diffies)
mean_abs_error

In [None]:
# Root of the mean squared error over all evaluated (user, game) pairs.
# Raises ZeroDivisionError if diffies2 is empty.
RMSD = math.sqrt(sum(diffies2)/len(diffies2))
RMSD