# Steam Recommender System (RS) - Data Cleanup

In [None]:
# Presumably the IPython `pwd` automagic: shows the working directory that the
# relative './rawdata/...' paths used later are resolved against.
pwd

In [None]:
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

from pyspark.mllib.recommendation import ALS
from pyspark.sql.functions import desc, split

In [None]:
# Reuse the active SparkSession if one exists; otherwise start a new one
# registered under the application name "spark-recommender".
spark = (
    SparkSession.builder
    .appName("spark-recommender")
    .getOrCreate()
)

 #### Import games data

In [None]:
# Read the raw app-detail JSON into a DataFrame and expose it to Spark SQL
# under the view name "games".
df_games = spark.read.json('./rawdata/apps_detail.json')
df_games.createOrReplaceTempView("games")

# Inspect what was loaded: the inferred schema, then the row count.
df_games.printSchema()
df_games.count()


#### Play with games data

In [None]:
# Peek at the nested `data.recommendations` field for the first ten rows.
recommendations_query = 'SELECT data.recommendations FROM games'
tmp = spark.sql(recommendations_query)
tmp.take(10)

In [None]:
# Group the games by their `data.categories.id` value and count each group.
categories = spark.sql(
    "SELECT data.categories.id, COUNT(data.categories.id) AS count "
    "FROM games GROUP BY data.categories.id"
)
categories.take(10)

In [None]:
# Project each game down to its app id and display name, then preview it.
game_name_query = "SELECT appid, data.name FROM games"
sqlGame = spark.sql(game_name_query)
sqlGame.show()

#### Import ratings data

In [None]:
# Read the raw per-user ratings JSON, then inspect its inferred schema and
# its row count.
ratings_json_path = './rawdata/ratings_detail.json'
df_ratings = spark.read.json(ratings_json_path)

df_ratings.printSchema()
df_ratings.count()

#### Remove duplicate rows

In [None]:
# Drop exact duplicate rows, then count what is left.
df_ratings = df_ratings.distinct()
df_ratings.count()

In [None]:
# Make the de-duplicated ratings queryable from Spark SQL as "ratings".
df_ratings.createOrReplaceTempView("ratings")

# Preview the whole frame, then only the nested `apps` column.
df_ratings.show()
# NOTE: a single column must be previewed with df.select('col').show();
# df.col.show() does not work.
df_ratings.select("apps").show()

#### Mangle data for ALS input

In [None]:
# How to select fields nested inside JSON structs:
# 1. DataFrame API (Java-style example taken from
#    https://stackoverflow.com/questions/29948789/how-to-parse-nested-json-objects-in-spark-sql):
#      DataFrame app = df.select("app");
#      app.printSchema();
#      DataFrame appName = app.select("element.appName");
#      appName.printSchema();
# 2. Spark SQL: address the nested field with dot notation, as done below.
tmp = spark.sql('SELECT apps.lastPlayed FROM ratings')
# tmp.take(10)

#### Remove users without apps and add an index

In [None]:
# Keep only users whose `apps` list is not empty, reduce each row to a
# (steamID, apps) pair, and attach a running position with zipWithIndex.
# Each resulting element has the shape ((steamID, apps), index).
ratings_filtered_rdd = (
    df_ratings.rdd
    .filter(lambda row: row.apps != [])
    .map(lambda row: (row.steamID, row.apps))
    .zipWithIndex()
)
# Uncomment to inspect one element:
# ratings_filtered_rdd.take(1)

#### check information of a given user

In [None]:
# Sanity check: fetch the record whose zipWithIndex position is 0.
# Records have the shape ((steamID, apps), index), so record[1] is the index.
user0 = ratings_filtered_rdd.filter(lambda record: record[1] == 0)
user0.collect()
# Example output:
# [((76561198096934288,
#   [Row(appID=u'570', lastPlayed=u'1497026829', name=u'Dota 2', totalTime=u'')]), 0)]

#### Build the training RDD (user index, app ID, play time)

In [None]:
# Build the (user index, app id, play time) triples that are fed to ALS.
# The zipWithIndex position stands in for the user, so every extracted row can
# be mapped back to its steamID afterwards.


def _play_time(app):
    """Return ``app.totalTime`` as a float, ignoring thousands separators."""
    return float(app.totalTime.replace(",", ""))


# ((steamID, apps), index) -> (index, apps)
training_rdd = ratings_filtered_rdd.map(lambda record: (record[1], record[0][1]))

# flatMapValues keeps the key and emits one (index, app) pair per app in the
# list, i.e. a flatMap applied to the values only.
# https://stackoverflow.com/questions/37302264/spark-flatmapvalues-query
training_rdd = training_rdd.flatMapValues(lambda apps: apps)

# Drop apps with no recorded play time (empty string) ...
training_rdd = training_rdd.filter(lambda pair: len(pair[1].totalTime) > 0)
# ... and apps whose play time is not positive. The comparison has to be made
# on the parsed float: with the closing parenthesis misplaced the test was
# float(<str> > 0), which in Python 2 is always true, so nothing was removed.
training_rdd = training_rdd.filter(lambda pair: _play_time(pair[1]) > 0)
training_rdd = training_rdd.map(
    lambda pair: (pair[0], pair[1].appID, _play_time(pair[1])))
training_rdd.take(10)

In [None]:
def isFloat(text):
    """Return 0 when ``text`` parses as a float, else return ``text`` unchanged.

    Despite its name this is not a boolean predicate: the sentinel 0 marks a
    parseable value, and anything else that comes back is the offending input.
    Only ``ValueError`` is handled; other exceptions raised by ``float``
    propagate to the caller.
    """
    try:
        float(text)
    except ValueError:
        return text
    else:
        return 0

# Diagnostic: list the totalTime strings that do NOT parse as floats once the
# commas are stripped. isFloat yields 0 for parseable text, so keeping only
# the non-zero results leaves (index, offending text) pairs.
tmp = (
    ratings_filtered_rdd
    .map(lambda record: (record[1], record[0][1]))
    .flatMapValues(lambda apps: apps)
    .map(lambda pair: (pair[0], isFloat(pair[1].totalTime.replace(",", ""))))
    .filter(lambda pair: pair[1] != 0)
)

In [None]:
#tmp = training_rdd.toDF()
#tmp.show()



#### Model training - ALS

In [None]:
# Train an implicit-feedback ALS model (as opposed to an explicit model built
# on real 'rating' data). Per the MLlib documentation linked below, the third
# field of each training triple is related to the level of confidence in an
# observed user preference rather than being an explicit rating; the model
# then looks for latent factors that predict a user's expected preference for
# an item.
# https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS
model = ALS.trainImplicit(training_rdd, rank=10)
print(model)

#### Sample result display

In [None]:
# recommendProducts(user, num): ask the model for 5 products for user index 3,
# then show both the result and its Python type.
try_result_rating = model.recommendProducts(3, 5)
print(try_result_rating)
print(type(try_result_rating))

In [None]:
# Turn the sample recommendations into a DataFrame and display them with the
# highest rating first.
try_result_rating_df = spark.createDataFrame(try_result_rating)
ranked_sample = try_result_rating_df.sort(desc("rating"))
ranked_sample.show()

In [None]:
# Left-join the sample recommendations (the left side) to the games table,
# matching the recommended product to the game's appid.
# A projection such as the following was tried and left disabled:
#.select("user",df_games.data.name)
product_matches_app = try_result_rating_df.product == df_games.appid
try_final_result = try_result_rating_df.join(
    df_games, product_matches_app, "left")
try_final_result.show()

# Row counts of both inputs and of the join result, plus the result's type.
print("%s %s %s %s" % (
    df_games.count(),
    try_result_rating_df.count(),
    try_final_result.count(),
    type(try_final_result),
))
try_final_result.select('data').show()

In [None]:
#data = try_final_result.select('data')
from pyspark.sql.functions import split


#### Generate recommended game list for all users

In [None]:
# Swap each ((steamID, apps), index) record around into an (index, steamID)
# pair ...
tmpID = ratings_filtered_rdd.map(lambda record: (record[1], record[0][0]))
# tmpID.take(10)

# ... and pull the pairs to the driver as a plain dict (index -> steamID);
# collectAsMap() converts the key/value tuples into a dictionary.
tmpID_dict = tmpID.collectAsMap()

In [None]:
# Not every user reaches the recommender: users without valid game data
# (totalTime = 0) are absent from the training set. Restrict the
# index -> steamID mapping to the indices that do occur in training_rdd.
# (This should really be done before the training step.)
index_to_user = ratings_filtered_rdd.map(
    lambda record: (record[1], record[0][0]))

# Joining on the index keeps only indices present on both sides; the joined
# value is a (gameID, steamID) pair, of which only the steamID is kept.
# http://spark.apache.org/docs/2.1.0/api/python/pyspark.html?highlight=join#pyspark.RDD.join
user_rdd = (
    training_rdd
    .join(index_to_user)
    .map(lambda joined: (joined[0], joined[1][1]))
    .distinct()
)
user_rdd.take(10)
user_rdd_dict = user_rdd.collectAsMap()

In [None]:
# For every user index known to the model, ask for 5 products and record them
# as (steamID, product) pairs. This runs on the driver, one call per user.
user_rec_list = []

for index, steam_id in user_rdd_dict.items():
    recommendations = model.recommendProducts(index, 5)
    user_rec_list.extend((steam_id, rec.product) for rec in recommendations)

print(user_rec_list[0:10])

In [None]:
# Cannot apply SparkContext methods to recommendProducts results
# Exception: It appears that you are attempting to reference SparkContext 
# from a broadcast variable, action, or transformation. 
# SparkContext can only be used on the driver, not in code that it run on workers. 
# For more information, see SPARK-5063.

#def get_rec_list(x):
#    fullList = model.recommendProducts(index,5)
    
#user_rec_rdd = user_rdd.flatMap(lambda (index, userID) : (userID, [get_rec_list(index)]))
#user_rec_rdd.take(10)

#### Organize and save data

In [None]:
from pyspark.sql.types import StructType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import StructField
from pyspark.sql.types import StringType
from pyspark.sql.types import ArrayType

# Schema of the saved table: the user's Steam id as text plus one recommended
# app id per row.
schema = StructType([
    StructField("id", StringType(), True),
    StructField("game", IntegerType(), True),
])

# createDataFrame checks every value against the declared schema, and "id" is
# a StringType column. The sample record shown earlier in this notebook has an
# integer steamID, which a StringType field rejects, so coerce the ids to str
# before building the frame (a no-op for ids that are already text).
rec_rows = [(str(user_id), game) for user_id, game in user_rec_list]
result_df = spark.createDataFrame(rec_rows, schema)
result_df.take(3)
# EQUIVALENT TO ALL THE LINES BELOW
#
# all_result_rdd = result_df.rdd.groupByKey().mapValues(list).flatMapValues(lambda x: x)
#
# .rdd.groupByKey(): 
# generate (key, <pyspark.resultiterable.ResultIterable at 0x7f8f5e9905d0>)
# .rdd.groupByKey().mapValues(list): 
# (key, [val1, val2, ..])
# .rdd.groupByKey().mapValues(list).flatMapValues(lambda x: x)
# (key, val1)(key, val2)

# dif_all_result=all_result_rdd.toDF(schema)
# dif_all_result.take(3)

In [None]:
import sqlalchemy
import pandas
# Save the recommendations into the SQLite database addressed by
# 'sqlite:///game.sqlite3', replacing any existing `recommended_game` table.
# toPandas() collects the whole Spark DataFrame onto the driver first.
engine = sqlalchemy.create_engine('sqlite:///game.sqlite3')
recommended_pdf = result_df.toPandas()
recommended_pdf.to_sql('recommended_game', engine, if_exists='replace')

#### Load from jdbc

In [None]:
# Load the saved recommendations back from SQLite through Spark's JDBC reader.
# The sqlite-jdbc URL form is "jdbc:sqlite:<path>". With the former
# "jdbc:sqlite://game.sqlite3" the path part was "//game.sqlite3", i.e. a file
# in the filesystem root rather than the relative game.sqlite3 that the
# 'sqlite:///game.sqlite3' SQLAlchemy URL writes to.
# NOTE(review): the relative path is resolved by the JVM; this assumes Spark
# runs with the same working directory as the notebook - confirm.
jdbcDF = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:sqlite:game.sqlite3") \
    .option("dbtable", "recommended_game") \
    .option("user", "") \
    .option("password", "") \
    .load()

