In [1]:
import os
# need to copy the driver class to $SPARK_HOME/jars
# os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /Users/comestime/Downloads/dump/mysql-connector-java-5.1.42-bin.jar \
#                                      pyspark-shell'

from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext, Row

from pyspark.mllib.recommendation import ALS

import json

In [2]:
# game detail data
game_detail = 'full_data/game_detail.txt'

# sample data for popularity recommendation
sample_user_owned_games = 'sample_data/user_owned_games_sample.json'
sample_user_friend_list = 'sample_data/user_friend_list_sample.json'

# sample data for collaborative filtering recommendation
sample_user_recent_games = 'sample_data/user_recently_played_games_sample.json'
sample_user_idx = 'sample_data/user_idx_sample.json'

# output files using sample data
sample_recommended = 'sample_result/sample_recommended.json'
sample_final_recommended = 'sample_result/sample_final_recommended'

# full data for popularity recommendation
full_user_owned_games = 'full_data/user_owned_games_full.json'
full_user_friend_list = 'full_data/user_friend_list_full.json'

# full data for collaborative filtering recommendation
full_user_recent_games = 'full_data/user_recently_played_games_full.json'
full_user_idx = 'full_data/user_idx_full.json'

# output files using full data
full_recommended = 'full_result/full_recommended.json'
full_final_recommended = 'full_result/full_final_recommended'

In [3]:
sc = SparkSession \
    .builder \
    .appName("spark-recommender") \
    .getOrCreate()

hiveCtx = HiveContext(sc)

## Popularity Based Recommendation System

For the sake of simplicity, we implement a popularity based recommendation system, by summarizing play time in user owned games table. Note The recommendation results will be globally the same, i.e., same for all users.

First we load game_detail table from json file.

In [4]:
df_game = hiveCtx.read.json(game_detail)
df_game.printSchema()
df_game.registerTempTable("temp_game_detail")
df_valid_game = hiveCtx.sql("SELECT * FROM temp_game_detail where _corrupt_record is null")
df_valid_game.registerTempTable("game_detail")
df_valid_game.show(1)

root
 |-- _corrupt_record: string (nullable = true)
 |-- about_the_game: string (nullable = true)
 |-- achievements: struct (nullable = true)
 |    |-- highlighted: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- path: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- alternate_appid: string (nullable = true)
 |-- background: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- controller_support: string (nullable = true)
 |-- demos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |-- detailed_description: string (nullable = true)
 |-- developers: array (nullable = true)
 |    |-- element:

Load user_owned_games from json file as well

In [23]:
# df_user_owned_games = hiveCtx.read.json(sample_user_owned_games)
df_user_owned_games = hiveCtx.read.json(full_user_owned_games)
df_user_owned_games.printSchema()
df_user_owned_games.registerTempTable("user_owned_games")

df_global_popular_games = \
hiveCtx.sql("SELECT b.game_id, SUM(b.playtime_forever) AS play_time FROM \
                (SELECT played_games['appid'] AS game_id, played_games['playtime_forever'] AS playtime_forever \
                FROM (SELECT EXPLODE(games) AS played_games FROM user_owned_games) a) b \
            GROUP BY game_id ORDER BY play_time DESC LIMIT 10")
df_global_popular_games.registerTempTable('popular_games')
df_global_popular_games = hiveCtx.sql("SELECT b.name AS name, a.play_time AS rank, b.steam_appid, b.header_image FROM \
                                    popular_games a, game_detail b WHERE a.game_id = b.steam_appid ORDER BY rank DESC")
df_global_popular_games.show()

root
 |-- game_count: long (nullable = true)
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |-- steamid: string (nullable = true)

+--------------------+--------+-----------+--------------------+
|                name|    rank|steam_appid|        header_image|
+--------------------+--------+-----------+--------------------+
|Counter-Strike: G...|69648077|        730|http://cdn.akamai...|
|         Garry's Mod|54063544|       4000|http://cdn.akamai...|
|Counter-Strike: S...|17866760|        240|http://cdn.akamai...|
|  Grand Theft Auto V|14605372|     271590|http://cdn.akamai...|
|       Left 4 Dead 2|14252479|        550|http://cdn.akamai...|
|            Warframe|13113788|     230410|http://cdn.akamai...|
|            PAYDAY 2|12112242|     218620|http://cdn.akamai...|
|The Elder Scrolls...|117

We can also recommend games to users based on the popularity of games which their friends play. We may call it local popularity. First we load the friend_list table.

For the sake of simplicity, we only calculate the recommendation results for one user.

In [6]:
sample_user = '76561197972495328'

# df_user_friend_list = hiveCtx.read.json(sample_user_friend_list)
df_user_friend_list = hiveCtx.read.json(full_user_friend_list)
df_user_friend_list.printSchema()
df_user_friend_list.registerTempTable('friend_list')

hiveCtx.sql("SELECT friends['steamid'] AS steamid FROM \
            (SELECT EXPLODE(friends) AS friends FROM friend_list WHERE steamid = %s) a"%sample_user)\
.registerTempTable('user_friend_list')

hiveCtx.sql("SELECT game_id, SUM(playtime_forever) AS play_time FROM \
            (SELECT games['appid'] AS game_id, games['playtime_forever'] AS playtime_forever FROM \
            (SELECT a.steamid, EXPLODE(b.games) AS games \
            FROM user_friend_list a, user_owned_games b WHERE a.steamid = b.steamid) c) d \
            GROUP BY game_id ORDER BY play_time DESC LIMIT 10")\
.registerTempTable('temp_local_popular_games')

df_global_popular_games = hiveCtx.sql("SELECT b.name AS game_name, a.play_time FROM \
                                        temp_local_popular_games a, game_detail b WHERE a.game_id = b.steam_appid")
df_global_popular_games.show()

root
 |-- friends: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- friend_since: long (nullable = true)
 |    |    |-- relationship: string (nullable = true)
 |    |    |-- steamid: string (nullable = true)
 |-- steamid: string (nullable = true)

+--------------------+---------+
|           game_name|play_time|
+--------------------+---------+
|       Borderlands 2|    52652|
|       Killing Floor|    41458|
|       Left 4 Dead 2|    33578|
|      Spiral Knights|    29602|
|      Clicker Heroes|    26640|
|      Clicker Heroes|    26640|
|    Cities in Motion|    24866|
|Sid Meier's Civil...|    22163|
|Sid Meier's Civil...|    22163|
|              SMITE®|    21553|
|              SMITE®|    21553|
|The Witcher® 3: W...|    20292|
+--------------------+---------+



## Collaborative Filtering Recommendation System

Filter users without any recently played games

In [7]:
# df_user_recent_games = hiveCtx.read.json(sample_user_recent_games)
df_user_recent_games = hiveCtx.read.json(full_user_recent_games)
df_user_recent_games.printSchema()
df_user_recent_games.registerTempTable("user_recent_games")
df_valid_user_recent_games = hiveCtx.sql("SELECT * FROM user_recent_games where total_count != 0")
df_valid_user_recent_games.show(1)

root
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- img_icon_url: string (nullable = true)
 |    |    |-- img_logo_url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |-- steamid: string (nullable = true)
 |-- total_count: long (nullable = true)

+--------------------+-----------------+-----------+
|               games|          steamid|total_count|
+--------------------+-----------------+-----------+
|[[578080,93d896e7...|76561198097278802|          2|
+--------------------+-----------------+-----------+
only showing top 1 row



Convert the Steam ID to index to avoid overflow in the recommendation algorithm. This is achieved by joining tables.

In [8]:
# df_user_idx = hiveCtx.read.json(sample_user_idx)
df_user_idx = hiveCtx.read.json(full_user_idx)
# df_user_idx.printSchema()
df_user_idx.registerTempTable('user_idx')
df_valid_user_recent_games = hiveCtx.sql("SELECT b.user_idx, a.games FROM user_recent_games a \
                                            JOIN user_idx b ON b.user_id = a.steamid WHERE a.total_count != 0")
# df_valid_user_recent_games.printSchema()
df_valid_user_recent_games.show(10)

+--------+--------------------+
|user_idx|               games|
+--------+--------------------+
|    1459|[[578080,93d896e7...|
|    1455|[[578080,93d896e7...|
|       0|[[39210,30461ee9c...|
|       1|[[501300,f71d2d0c...|
|    1461|[[377160,779c4356...|
|    1457|[[377160,779c4356...|
|    1462|[[570,0bbb630d632...|
|    1458|[[570,0bbb630d632...|
|       2|[[578080,93d896e7...|
|    1463|[[252950,9ad6dd3d...|
+--------+--------------------+
only showing top 10 rows



In [None]:
training_rdd = df_valid_user_recent_games.rdd.flatMapValues(lambda x : x) \
                .map(lambda (x, y) : (x, y.appid, y.playtime_forever)) \
                .filter(lambda (x, y, z) : z > 0)
training_rdd.collect()

Using [Alternative Least Square](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering) algorithm to perform the collaborative filtering. API document can be found [here](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS).

In [10]:
als = ALS.trainImplicit(training_rdd, 10)

In [11]:
result_rating = als.recommendProducts(0,10)
print result_rating

[Rating(user=0, product=72850, rating=1.080533744968419), Rating(user=0, product=440, rating=0.9969255430731465), Rating(user=0, product=570, rating=0.9809604429602001), Rating(user=0, product=39210, rating=0.9202178437694855), Rating(user=0, product=4000, rating=0.9199807235230111), Rating(user=0, product=252950, rating=0.8478899813371471), Rating(user=0, product=1840, rating=0.8338965062848274), Rating(user=0, product=211820, rating=0.7592396294615982), Rating(user=0, product=431960, rating=0.7048469966050396), Rating(user=0, product=238960, rating=0.6679573871110611)]


Write out the intermediate recommendation results to json files

In [12]:
# with open(sample_recommended, 'w') as output_file:
with open(full_recommended, 'w') as output_file:
    for user_idx in range(0, df_user_idx.count()):
        try:
            lst_recommended = [i.product for i in als.recommendProducts(user_idx, 10)]
            rank = 1
            for app_id in lst_recommended:
                dict_recommended = {'user_idx': user_idx, 'game_id': app_id, 'rank': rank}
                json.dump(dict_recommended, output_file)
                output_file.write('\n')
                rank += 1
        # some user index may not in the recommendation result since it's been filtered out
        except:
            pass

Join the Steam user ID table and game_detail table to form the final results

In [17]:
# df_recommend_result = hiveCtx.read.json(sample_recommended)
df_recommend_result = hiveCtx.read.json(full_recommended)
# df_recommend_result.show()
df_recommend_result.registerTempTable('recommend_result')
df_final_recommend_result = hiveCtx.sql("SELECT b.user_id, a.rank, c.name, c.header_image, c.steam_appid \
                                        FROM recommend_result a, user_idx b, game_detail c \
                                        WHERE a.user_idx = b.user_idx AND a.game_id = c.steam_appid \
                                        ORDER BY b.user_id, a.rank")
df_final_recommend_result.show()

+-----------------+----+--------------------+--------------------+-----------+
|          user_id|rank|                name|        header_image|steam_appid|
+-----------------+----+--------------------+--------------------+-----------+
|76561197960268841|   1|     Team Fortress 2|http://cdn.akamai...|        440|
|76561197960268841|   2|         Garry's Mod|http://cdn.akamai...|       4000|
|76561197960268841|   3|            Terraria|http://cdn.akamai...|     105600|
|76561197960268841|   4|The Elder Scrolls...|http://cdn.akamai...|      72850|
|76561197960268841|   5|              Arma 3|http://cdn.akamai...|     107410|
|76561197960268841|   6|      Clicker Heroes|http://cdn.akamai...|     363970|
|76561197960268841|   6|      Clicker Heroes|http://cdn.akamai...|     363970|
|76561197960268841|   7|                Rust|http://cdn.akamai...|     252490|
|76561197960268841|   8|           Starbound|http://cdn.akamai...|     211820|
|76561197960268841|   9|      Rocket League®|http://

In [18]:
# store the final results
# df_final_recommend_result.write.save(sample_final_recommended, format="json")
df_final_recommend_result.write.save(full_final_recommended, format="json")

## Store the Recommendation Results to AWS RDS

Download MySQL JDBC [connector](https://dev.mysql.com/downloads/connector/j/) class first, and copy it to $SPARK_HOME/jars, e.g., /Library/spark-2.1.1-bin-hadoop2.7/jars

A good reference for connecting to AWS MySQL DB through JDBC can be found [here](https://medium.com/modernnerd-code/connecting-to-mysql-db-on-aws-ec2-with-jdbc-for-java-91dba3003abb) and [here](https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#writing-data-to-jdbc).

First we upload the popularity-based recommendation results to database. We specify the database name to be "test1", and the table name to be "global_recommend".

In [24]:
# define jdbc properties
url = 'jdbc:mysql://test1.czrwqe1jiypg.us-west-2.rds.amazonaws.com:3306'
mode = 'overwrite'
properties = {
    "user": "comestime",
    "password": "xxxxxxxxxxx",
    "driver": 'com.mysql.jdbc.Driver'
}

In [25]:
df_global_popular_games.write.jdbc(url=url, table="test1.global_recommend", mode=mode, properties=properties)

Read the results back to verify the data has been uploaded successfully.

In [26]:
df_read_back = sc.read.jdbc(url=url, table="test1.global_recommend", properties=properties)
df_read_back.show()

+--------------------+--------+-----------+--------------------+
|                name|    rank|steam_appid|        header_image|
+--------------------+--------+-----------+--------------------+
|Counter-Strike: G...|69648077|        730|http://cdn.akamai...|
|Counter-Strike: S...|17866760|        240|http://cdn.akamai...|
|         Garry's Mod|54063544|       4000|http://cdn.akamai...|
|  Grand Theft Auto V|14605372|     271590|http://cdn.akamai...|
|       Left 4 Dead 2|14252479|        550|http://cdn.akamai...|
|            Warframe|13113788|     230410|http://cdn.akamai...|
|            PAYDAY 2|12112242|     218620|http://cdn.akamai...|
|The Elder Scrolls...|11718038|      72850|http://cdn.akamai...|
|            Terraria|10835681|     105600|http://cdn.akamai...|
|      Counter-Strike|10455387|         10|http://cdn.akamai...|
+--------------------+--------+-----------+--------------------+



Next we upload the collaborative filtering recommendation results to database. The database name is "test1" and the table name is "final_recommend"

In [19]:

df_final_recommend_result.write.jdbc(url=url, table="test1.final_recommend", mode=mode, properties=properties)

Read the dataframe back from AWS, to check if the data is stored successfully

In [20]:
df_read_back = sc.read.jdbc(url=url, table="test1.final_recommend", properties=properties)
df_read_back.show()

+-----------------+----+--------------------+--------------------+-----------+
|          user_id|rank|                name|        header_image|steam_appid|
+-----------------+----+--------------------+--------------------+-----------+
|76561197960268841|   1|     Team Fortress 2|http://cdn.akamai...|        440|
|76561197961017729|   3|            PAYDAY 2|http://cdn.akamai...|     218620|
|76561197960268841|   2|         Garry's Mod|http://cdn.akamai...|       4000|
|76561197961017729|   4|         War Thunder|http://cdn.akamai...|     236390|
|76561197960558092|   3|       Borderlands 2|http://cdn.akamai...|      49520|
|76561197960354636|   5|              Dota 2|http://cdn.akamai...|        570|
|76561197960268841|   3|            Terraria|http://cdn.akamai...|     105600|
|76561197960558092|   4|  Grand Theft Auto V|http://cdn.akamai...|     271590|
|76561197961017729|   5|            NBA 2K17|http://cdn.akamai...|     385760|
|76561197960354636|   6|         Garry's Mod|http://