In [270]:
# to use pandas dataframes
import pandas as pd

import numpy as np

# import MongoDB modules
from pymongo import MongoClient

import matplotlib.pyplot as plt
%matplotlib inline

# we can always use more time
import time

from collections import OrderedDict

In [271]:
import pyspark
from pyspark.sql.types import *
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

In [272]:
# Build our Spark Session and Context
spark = pyspark.sql.SparkSession.builder.getOrCreate()
sc = spark.sparkContext
spark, sc

(<pyspark.sql.session.SparkSession at 0x7fea49fe08d0>,
 <pyspark.context.SparkContext at 0x7fea4a0e4610>)

In [176]:
#
# Pipeline for model/system
#
# Scrape Data         -> Process data        -> Fit Model        -> Eval Model
#
# have game reviews     read in data         Prob easy           hmm
# for sparse crappy     into dataframe
# first model

def load_game_reviews_into_table(collection, user_list):
    '''
    Spark seems to ingest data via a big list and goes from there
    so make a dataframe that looks like
    
    user | app_id | rating (positive)
    '''
    start_time = time.time()
    data = []
    
    for game in collection.find():
        
        # keep track of users with reviews because the rest of 
        # the users we have to go back and give 0's to
        #temp_user_list = []
        
        for ix, review in enumerate(game["positive_reviews"]):
            
            index = user_list.index(review["user"])

                        
            _t = time.time() - start_time
            _ts = "{:2.2f}".format(_t)[:6]
            
            print "{}s ### Game:{} #######{}##############\r".format(_ts, game["app_id"], ix),

            
#             # frame length in time
#             _tw = 0.1
#             next_checkpoint = 1.0
#             frame_fired = False
            
#             if _t >= (ix * _tw) and frame_fired == False:
                            
#                 idx = ix // 10
            
#                 if idx % 4 == 0:
#                     star = " - "
#                 elif idx % 4 == 1:
#                     star = ' \ '
#                 elif idx % 4 == 2:
#                     star = " | "
#                 elif idx % 4 == 3:
#                     star = " / "
                    
#                 print "{}######{}####################\r".format(star, _ts),
#                 frame_fired = True
#                 next_checkpoint = (ix + 1) * _tw
                
#             elif _t >= next_checkpoint:
#                 frame_fired = False
            #temp_user_list.append(review["user"])
    
            data.append({"app_id":int(game["app_id"]),
                         "user":index,
                         "rating":1})
        
        # now add in zero scores
#         for iz, user in enumerate(user_list):
#             if user not in temp_user_list:
#                 z_index = user_list.index(review["user"])
#                 data.append({"app_id":int(game["app_id"]),
#                          "user":z_index,
#                          "rating":0})
#                 _t = time.time() - start_time
#                 _ts = "{:2.2f}".format(_t)[:6]
            
#                 print "{}s ### Game:{} #######{}##############\r".format(_ts, game["app_id"], iz),

#         del temp_user_list
            
    df = pd.DataFrame(data)
    
    print
    print "Completed."
    
    return df

def get_unique_users(collection):
    '''
    return a set that contains all of the user_ids in a unique
    manner
    
    returns:
    set of user_ids
    '''
    
    # store the users as a set so they're unique
    users = set()

    non_unique_users = 0

    # step through each doc (which is a game's worth of reviews)
    for doc in collection.find():

        # step through the positive reviews, adding the user to the users set
        [users.add(user["user"])for user in doc["positive_reviews"]]

        # find the non-unique users
        non_unique_users += len(doc["positive_reviews"])

        # step through the negative reviews, adding the user to the users set
        [users.add(user["user"])for user in doc["negative_reviews"]]

        # find the non-unique users
        non_unique_users += len(doc["negative_reviews"])

    print "Unique users:", len(users)
    print "non-Unique users:", non_unique_users
    
    return list(users)

def df_to_spark(data):
    '''
    clean up the columns a bit and convert to a spark df
    
    returns the spark dataframe
    '''
    data = data[["app_id", "user", "rating"]]
    
    # convert to Spark DataFrame
    game_ratings_df = spark.createDataFrame(data)
    
    return game_ratings_df
    
def main():
    ''' like __name__ == "__main__"'''
    
    # connect to the hosted MongoDB instance
    db = MongoClient('mongodb://localhost:27017/')["capstone"]

    source_collection = db.selenium_game_review_scrape
    
    user_list = get_unique_users(source_collection)
    
    # data = load_game_reviews_into_table(source_collection, user_list)
    data = load_game_reviews_into_table(source_collection, user_list)
    
    spark_game_ratings = df_to_spark(data)
    
    return spark_game_ratings




In [171]:
data = main()

Unique users: 98297
non-Unique users: 111292
173.74s ### Game:212680 #######917###############
Completed.


In [189]:
# write the dataframe to disk to avoid having to rebuild constantly (~6min for 100 games)
data.write.parquet("game_reviews.parquet", mode="overwrite", compression="gzip")

In [273]:
# read it in to make sure that it's working
red_data = spark.read.parquet("game_reviews.parquet")

red_data.count()

74517

In [276]:
# avoid fitting to final eval
# set seed so we keep these out of the pool
# (prob won't help as more data is added in the future and
# the pool changes but this is paranoia anyways)
train_test, final_eval = red_data.randomSplit([0.9, 0.1], seed=1337)

# break the non-held back into train/test split
train, test = train_test.randomSplit([0.8, 0.2])

In [277]:
#look at test set to see what the data looks like
test.show(10)

+------+----+------+
|app_id|user|rating|
+------+----+------+
| 22000|  17|     1|
| 22000| 613|     1|
| 22000| 634|     1|
| 22000|1546|     1|
| 22000|1628|     1|
| 22000|1650|     1|
| 22000|1860|     1|
| 22000|2260|     1|
| 22000|4270|     1|
| 22000|4642|     1|
+------+----+------+
only showing top 10 rows



In [278]:
# check to make sure the train test split worked
train.count(), test.count()

(53657, 13287)

In [279]:
als_model = ALS(userCol="user",
               itemCol="app_id",
               ratingCol="rating",
               nonnegative=True,
               regParam=0.05,
               rank=10,
               implicitPrefs=False,
               maxIter=20)

In [280]:
recommender = als_model.fit(train)

In [281]:
# make a single row DataFrame
temp = [(1, 413150)]
columns = ('user', 'app_id')
one_row_spark_df = spark.createDataFrame(temp, columns)

In [282]:
one_row_spark_df.show()

+----+------+
|user|app_id|
+----+------+
|   1|413150|
+----+------+



In [283]:
user_factor_df = recommender.userFactors.filter('id = 1')
item_factor_df = recommender.itemFactors.filter('id = 413150')

In [284]:
user_factors = user_factor_df.collect()[0]['features']
item_factors = item_factor_df.collect()[0]['features']

In [285]:
np.dot(user_factors, item_factors)

0.92894982753833344

In [286]:
recommender.transform(one_row_spark_df).show()

+----+------+----------+
|user|app_id|prediction|
+----+------+----------+
|   1|413150|0.92894983|
+----+------+----------+



In [287]:
recommender.userFactors.show()

+---+--------------------+
| id|            features|
+---+--------------------+
| 10|[0.23667388, 0.25...|
| 20|[0.23873065, 0.38...|
| 40|[0.32111812, 0.30...|
| 50|[0.33541638, 0.36...|
| 60|[0.42238978, 0.28...|
| 70|[0.3364444, 0.339...|
| 80|[0.37046024, 0.34...|
| 90|[0.32789287, 0.31...|
|110|[0.34830502, 0.34...|
|140|[0.32665813, 0.32...|
|160|[0.32932562, 0.34...|
|170|[0.23998879, 0.37...|
|180|[0.24348529, 0.57...|
|200|[0.3254567, 0.334...|
|220|[0.32665813, 0.32...|
|230|[0.32949206, 0.36...|
|250|[0.3831413, 0.309...|
|260|[0.38767418, 0.31...|
|280|[0.32831472, 0.33...|
|290|[0.36092678, 0.32...|
+---+--------------------+
only showing top 20 rows



In [288]:
# make predictions on the whole test set
predictions = recommender.transform(test)

In [289]:
# dump the predictions to Pandas so the final calculations are easier to do
predictions_df = predictions.toPandas()
train_df = train.toPandas()

In [290]:
predictions_df.head()

Unnamed: 0,app_id,user,rating,prediction
0,65300,18911,1,0.94433
1,65300,35694,1,
2,65300,36355,1,
3,65300,2563,1,
4,65300,32832,1,


In [297]:
# Fill any missing values with the mean rating
# probably room for improvement here

print predictions.count()

# print the mean rating (1.0, uh... that's not good)
print "Mean rating:", train_df['rating'].mean()/predictions.count()

#
#predictions_df = predictions.toPandas().fillna(train_df['rating'].mean()/predictions.count())
predictions_df = predictions.toPandas().fillna(0.5)

predictions_df.head(20)

13287
Mean rating: 7.52615338301e-05


Unnamed: 0,app_id,user,rating,prediction
0,65300,18911,1,0.94433
1,65300,35694,1,0.5
2,65300,36355,1,0.5
3,65300,2563,1,0.5
4,65300,32832,1,0.5
5,65300,71808,1,0.945329
6,65300,61492,1,0.5
7,65300,36629,1,0.938016
8,65300,39169,1,0.5
9,65300,49937,1,0.9421


In [292]:
# df.loc[df['column_name'] == some_value]
# test_user = predictions_df.loc[predictions["user"] == 47217]
# test_user.head(20)

In [298]:
predictions_df['squared_error'] = (predictions_df['rating'] - predictions_df['prediction'])**2

In [299]:
predictions_df.describe()

Unnamed: 0,app_id,user,rating,prediction,squared_error
count,13287.0,13287.0,13287.0,13287.0,13287.0
mean,282121.986904,48901.467901,1.0,0.561148,0.215454
std,108750.719281,28538.511505,0.0,0.151213,0.085305
min,4000.0,4.0,1.0,0.5,0.001428
25%,230190.0,24201.0,1.0,0.5,0.25
50%,265000.0,48688.0,1.0,0.5,0.25
75%,365450.0,73562.5,1.0,0.5,0.25
max,595140.0,98295.0,1.0,0.962206,0.25


In [300]:
# Calculate RMSE
np.sqrt(sum(predictions_df['squared_error']) / len(predictions_df))

# run  val
# 1    0.078012435752783327
# 2    0.079067974734729068


0.46417058112762605

In [161]:
boink = np.zeros(shape=(3,3))

In [166]:
boink[2,1] = 19

In [167]:
boink

array([[  0.,   0.,   0.],
       [  0.,   9.,   0.],
       [  0.,  19.,   0.]])