In [51]:
import os
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, HiveContext, Row
from pyspark.sql.functions import desc

from pyspark.mllib.recommendation import ALS

import json

## Define Input and Output File Name

In [52]:
# game detail data
game_detail = 'sample_data/game_detail.txt'

# sample data for popularity recommendation
sample_user_owned_games = 'sample_data/user_owned_games_sample.json'
sample_user_friend_list = 'sample_data/user_friend_list_sample.json'

# sample data for collaborative filtering recommendation
sample_user_recent_games = 'sample_data/user_recently_played_games_sample.json'
sample_user_idx = 'sample_data/user_idx_sample.json'

# output files using sample data
sample_recommended = 'sample_result/sample_recommended.json'
sample_final_recommended = 'sample_result/sample_final_recommended'


## Set Up Spark Context

In [53]:
sc = SparkSession \
    .builder \
    .appName("spark-recommender") \
    .getOrCreate()

hiveCtx = HiveContext(sc)

## Popularity Based Recommendation System

For the sake of simplicity, we implement a popularity based recommendation system, by summarizing play time in user owned games table. Note The recommendation results will be globally the same, i.e., same for all users.

First we load game_detail table from json file.

In [54]:
df_game = hiveCtx.read.json(game_detail)
df_game.printSchema()

root
 |-- _corrupt_record: string (nullable = true)
 |-- about_the_game: string (nullable = true)
 |-- achievements: struct (nullable = true)
 |    |-- highlighted: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- path: string (nullable = true)
 |    |-- total: long (nullable = true)
 |-- alternate_appid: string (nullable = true)
 |-- background: string (nullable = true)
 |-- categories: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- description: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- controller_support: string (nullable = true)
 |-- demos: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- description: string (nullable = true)
 |-- detailed_description: string (nullable = true)
 |-- developers: array (nullable = true)
 |    |-- element:

### Clean Corrupt_record in The File "game_detail.txt"


In [55]:
df_game.registerTempTable("temp_game_detail")
df_valid_game = hiveCtx.sql("SELECT * FROM temp_game_detail where _corrupt_record is null")
df_valid_game.registerTempTable("game_detail")
df_valid_game.show(1)

+---------------+--------------------+------------+---------------+--------------------+--------------------+------------------+-----+--------------------+----------+----+----------+-----------------------+--------+------------+--------------------+-------+------------+--------------------+--------------------+--------------------+------+--------------+--------------------+--------+--------------------+----------------+--------------------+----------+---------------+-------------------+------------+-------+--------------------+-----------------+-----------+--------------------+--------------------+----+-------+
|_corrupt_record|      about_the_game|achievements|alternate_appid|          background|          categories|controller_support|demos|detailed_description|developers| dlc|drm_notice|ext_user_account_notice|fullgame|      genres|        header_image|is_free|legal_notice|  linux_requirements|    mac_requirements|          metacritic|movies|          name|      package_groups|packa

### Load User Owned Games

In [70]:
df_user_owned_games = hiveCtx.read.json(sample_user_owned_games)
df_user_owned_games.printSchema()
df_user_owned_games.registerTempTable("user_owned_games")

# find the top 10 games which have longest total played hours
df_global_popular_games = \
hiveCtx.sql("SELECT b.game_id, SUM(b.playtime_forever) AS play_time FROM \
                (SELECT played_games['appid'] AS game_id, played_games['playtime_forever'] AS playtime_forever \
                FROM (SELECT EXPLODE(games) AS played_games FROM user_owned_games) a) b \
                GROUP BY game_id ORDER BY play_time DESC LIMIT 10")
df_global_popular_games.registerTempTable('popular_games')

# find same app id in popular_games and game_detail
# total played_hours is defined as rank
df_global_popular_games = hiveCtx.sql("SELECT b.name AS name, a.play_time AS rank, b.steam_appid, b.header_image FROM \
                                    popular_games a, game_detail b WHERE a.game_id = b.steam_appid ORDER BY rank DESC")
df_global_popular_games.show()

root
 |-- game_count: long (nullable = true)
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |-- steamid: string (nullable = true)

+--------------------+--------+-----------+--------------------+
|                name|    rank|steam_appid|        header_image|
+--------------------+--------+-----------+--------------------+
|Counter-Strike: G...|14355867|        730|http://cdn.akamai...|
|         Garry's Mod| 4485082|       4000|http://cdn.akamai...|
|      Counter-Strike| 4178037|         10|http://cdn.akamai...|
|  Grand Theft Auto V| 3904596|     271590|http://cdn.akamai...|
|       Left 4 Dead 2| 3677466|        550|http://cdn.akamai...|
|Counter-Strike: S...| 3616174|        240|http://cdn.akamai...|
|The Elder Scrolls...| 2900266|      72850|http://cdn.akamai...|
|            Warframe| 25

### Local Popularity
We can also recommend games to users based on the popularity of games which their friends play. We may call it **local popularity**. First we load the friend_list table.

For the sake of simplicity, we only calculate the recommendation results for one user.

In [57]:
sample_user = '76561197972495328'

df_user_friend_list = hiveCtx.read.json(sample_user_friend_list)
# df_user_friend_list = hiveCtx.read.json(full_user_friend_list)
df_user_friend_list.printSchema()
df_user_friend_list.registerTempTable('friend_list')

# find his/her friends
df_friend_list = hiveCtx.sql("SELECT friends['steamid'] AS steamid FROM \
            (SELECT EXPLODE(friends) AS friends FROM friend_list WHERE steamid = %s) a"%sample_user)
df_friend_list.show(10)
df_friend_list.registerTempTable('user_friend_list')

root
 |-- friends: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- friend_since: long (nullable = true)
 |    |    |-- relationship: string (nullable = true)
 |    |    |-- steamid: string (nullable = true)
 |-- steamid: string (nullable = true)

+-----------------+
|          steamid|
+-----------------+
|76561197960265730|
|76561197960265749|
|76561197960268093|
|76561197960430077|
|76561197960434622|
|76561197960435530|
|76561197960443954|
|76561197960482790|
|76561197960538703|
|76561197960780025|
+-----------------+
only showing top 10 rows



In [58]:
# find out the total playtime of all friends for each game
hiveCtx.sql("SELECT game_id, SUM(playtime_forever) AS play_time FROM \
            (SELECT games['appid'] AS game_id, games['playtime_forever'] AS playtime_forever FROM \
            (SELECT a.steamid, EXPLODE(b.games) AS games \
            FROM user_friend_list a, user_owned_games b WHERE a.steamid = b.steamid) c) d \
            GROUP BY game_id ORDER BY play_time DESC LIMIT 10")\
.registerTempTable('temp_local_popular_games')

df_global_popular_games = hiveCtx.sql("SELECT DISTINCT b.name AS game_name, a.play_time FROM \
                                        temp_local_popular_games a, game_detail b WHERE a.game_id = b.steam_appid")
df_global_popular_games.show()

+--------------------+---------+
|           game_name|play_time|
+--------------------+---------+
|  Grand Theft Auto V|    39755|
|           Fallout 4|    19722|
|The Elder Scrolls...|    14743|
|Tom Clancy’s The ...|    13161|
|       Left 4 Dead 2|    11665|
|Plants vs. Zombie...|    10054|
| Grand Theft Auto IV|     9662|
|           Fallout 3|     8794|
|  Fallout: New Vegas|     8784|
+--------------------+---------+



## Collaborative Filtering Recommendation System

**Collaborative filtering** methods are based on collecting and analyzing a large amount of information on users’ behaviors, activities or preferences and predicting what users will like based on their similarity to other users. A key advantage of the collaborative filtering approach is that it does not rely on machine analyzable content and therefore it is capable of accurately recommending complex items such as movies without requiring an "understanding" of the item itself. Collaborative filtering is based on the assumption that people who agreed in the past will agree in the future, and that they will like similar kinds of items as they liked in the past.

In [59]:
df_user_recent_games = hiveCtx.read.json(sample_user_recent_games)
# df_user_recent_games = hiveCtx.read.json(full_user_recent_games)
df_user_recent_games.printSchema()
df_user_recent_games.registerTempTable("user_recent_games")
df_valid_user_recent_games = hiveCtx.sql("SELECT * FROM user_recent_games where total_count != 0")
df_valid_user_recent_games.show(1)

root
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- img_icon_url: string (nullable = true)
 |    |    |-- img_logo_url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)
 |-- steamid: string (nullable = true)
 |-- total_count: long (nullable = true)

+--------------------+-----------------+-----------+
|               games|          steamid|total_count|
+--------------------+-----------------+-----------+
|[[24740,eca9b0f29...|76561197970565175|          5|
+--------------------+-----------------+-----------+
only showing top 1 row



Convert the Steam ID to index to avoid overflow in the recommendation algorithm. This is achieved by joining tables.

For example:
```json
{"user_idx": 0, "user_id": "76561197970565175"}
```
We map 76561197970565175 to 0

In [60]:
df_user_idx = hiveCtx.read.json(sample_user_idx)
# df_user_idx = hiveCtx.read.json(full_user_idx)
# df_user_idx.printSchema()
df_user_idx.registerTempTable('user_idx')
df_valid_user_recent_games = hiveCtx.sql("SELECT b.user_idx, a.games FROM user_recent_games a \
                                            JOIN user_idx b ON b.user_id = a.steamid WHERE a.total_count != 0")
df_valid_user_recent_games.printSchema()
df_valid_user_recent_games.show(10)

root
 |-- user_idx: long (nullable = true)
 |-- games: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- appid: long (nullable = true)
 |    |    |-- img_icon_url: string (nullable = true)
 |    |    |-- img_logo_url: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- playtime_2weeks: long (nullable = true)
 |    |    |-- playtime_forever: long (nullable = true)

+--------+--------------------+
|user_idx|               games|
+--------+--------------------+
|       0|[[24740,eca9b0f29...|
|       1|[[39210,30461ee9c...|
|       2|[[578080,93d896e7...|
|       3|[[493340,1b2f29e7...|
|       4|[[218620,a6abc0d0...|
|       5|[[578080,93d896e7...|
|       6|[[361420,58b2dfa7...|
|       7|[[275850,fccb20f3...|
|       8|[[238960,1110764a...|
|       9|[[220,fcfb3660517...|
+--------+--------------------+
only showing top 10 rows



## Alternative Least Square

The **collaborive filtering** problem can be formulated as a learning problem in which we are given the ratings that users have given certain items and are tasked with predicting their ratings for the rest of the items. Formally, if there are n users and m items, we are given an n × m matrix R in which the (u, i)th entry is r_ui – the rating for item i by user u. Matrix R has many missing entries indicating unobserved ratings, and our task is to estimate these unobserved ratings.

A popular approach for this is **matrix factorization**, where **Alternative Least Square (ALS)** algorithm renders its power. ALS can not only be implemented in single machine, but also in distributed clusters, or even in streaming. For details, refer to the references.

Using [Alternative Least Square](https://spark.apache.org/docs/latest/mllib-collaborative-filtering.html#collaborative-filtering) algorithm to perform the collaborative filtering. API document can be found [here](https://spark.apache.org/docs/latest/api/python/pyspark.mllib.html#pyspark.mllib.recommendation.ALS).

### Initialize Training Data

In [61]:
# map and filter out the games whose playtime is 0
training_rdd = df_valid_user_recent_games.rdd.flatMapValues(lambda x : x) \
                .map(lambda (x, y) : (x, y.appid, y.reviw_games)) \
                .filter(lambda (x, y, z) : z > 0)
training_rdd.collect()

[(0, 24740, 216),
 (0, 223100, 99),
 (0, 403640, 9),
 (0, 590780, 1),
 (0, 363970, 510),
 (1, 39210, 10521),
 (1, 570, 53685),
 (1, 440, 123990),
 (2, 578080, 468),
 (2, 440, 29658),
 (3, 493340, 68),
 (3, 2700, 8),
 (4, 218620, 39853),
 (4, 359550, 3301),
 (4, 218, 476),
 (4, 221040, 239),
 (5, 578080, 3972),
 (5, 418460, 554),
 (6, 361420, 1377),
 (6, 227300, 6365),
 (6, 270880, 2671),
 (7, 275850, 23295),
 (7, 327030, 1103),
 (7, 340070, 2423),
 (7, 440, 241243),
 (7, 487710, 1),
 (8, 238960, 131143),
 (8, 225540, 1696),
 (8, 388050, 248),
 (8, 460810, 114),
 (8, 418610, 81),
 (8, 403640, 1402),
 (8, 588650, 578),
 (8, 449760, 44),
 (8, 533860, 12),
 (8, 575630, 9),
 (8, 443580, 37),
 (8, 416610, 5),
 (8, 265890, 196),
 (9, 220, 1260),
 (12, 560730, 578),
 (12, 228200, 489),
 (12, 4560, 415),
 (12, 236150, 124),
 (12, 581100, 91),
 (13, 252950, 792),
 (13, 374040, 101),
 (13, 204300, 96253),
 (13, 286160, 222),
 (14, 563560, 61243),
 (14, 244630, 1206252),
 (14, 200210, 1183726),
 (

### ALS recommender engine

In [62]:
als_model = ALS.trainImplicit(training_rdd, 10)

In [63]:
# print out 10 recommendeds product for user of index 0
result_rating = als_model.recommendProducts(0,10)
print result_rating
try_df_result=sc.createDataFrame(result_rating)
print try_df_result.sort(desc("rating")).show()

[Rating(user=0, product=363970, rating=0.3046938568409334), Rating(user=0, product=433850, rating=0.15175814718740938), Rating(user=0, product=72850, rating=0.1421794704660013), Rating(user=0, product=753, rating=0.13219302752311712), Rating(user=0, product=402840, rating=0.12326413470293149), Rating(user=0, product=21690, rating=0.12156766375401792), Rating(user=0, product=306130, rating=0.1198095384178326), Rating(user=0, product=221680, rating=0.10631534097162214), Rating(user=0, product=234330, rating=0.10348192421626112), Rating(user=0, product=230410, rating=0.10201294175900974)]
+----+-------+-------------------+
|user|product|             rating|
+----+-------+-------------------+
|   0| 363970| 0.3046938568409334|
|   0| 433850|0.15175814718740938|
|   0|  72850| 0.1421794704660013|
|   0|    753|0.13219302752311712|
|   0| 402840|0.12326413470293149|
|   0|  21690|0.12156766375401792|
|   0| 306130| 0.1198095384178326|
|   0| 221680|0.10631534097162214|
|   0| 234330|0.103481

### Write out the intermediate recommendation results to json file

In [64]:

with open(sample_recommended, 'w') as output_file:
    for user_idx in range(0, df_user_idx.count()):
        try:
            lst_recommended = [i.product for i in als_model.recommendProducts(user_idx, 10)]
            rank = 1
            for app_id in lst_recommended:
                dict_recommended = {'user_idx': user_idx, 'game_id': app_id, 'rank': rank}
                json.dump(dict_recommended, output_file)
                output_file.write('\n')
                rank += 1
        # some user index may not in the recommendation result since it's been filtered out
        except:
            pass

### Join the Steam user ID table and game_detail table to form the final results

In [65]:
df_recommend_result = hiveCtx.read.json(sample_recommended)
df_recommend_result.show(20)

+-------+----+--------+
|game_id|rank|user_idx|
+-------+----+--------+
| 363970|   1|       0|
| 433850|   2|       0|
|  72850|   3|       0|
|    753|   4|       0|
| 402840|   5|       0|
|  21690|   6|       0|
| 306130|   7|       0|
| 221680|   8|       0|
| 234330|   9|       0|
| 230410|  10|       0|
|    440|   1|       1|
|    570|   2|       1|
|  39210|   3|       1|
| 281990|   4|       1|
| 519860|   5|       1|
| 230410|   6|       1|
| 218230|   7|       1|
| 346900|   8|       1|
| 251060|   9|       1|
| 304050|  10|       1|
+-------+----+--------+
only showing top 20 rows



In [66]:
df_recommend_result.registerTempTable('recommend_result')
df_final_recommend_result = hiveCtx.sql("SELECT DISTINCT b.user_id, a.rank, c.name, c.header_image, c.steam_appid \
                                        FROM recommend_result a, user_idx b, game_detail c \
                                        WHERE a.user_idx = b.user_idx AND a.game_id = c.steam_appid \
                                        ORDER BY b.user_id, a.rank") 
df_final_recommend_result.show(20)

+-----------------+----+--------------------+--------------------+-----------+
|          user_id|rank|                name|        header_image|steam_appid|
+-----------------+----+--------------------+--------------------+-----------+
|76561197960292467|   1|     Team Fortress 2|http://cdn.akamai...|        440|
|76561197960292467|   2|PLAYERUNKNOWN'S B...|http://cdn.akamai...|     578080|
|76561197960292467|   3|Sid Meier's Civil...|http://cdn.akamai...|       8930|
|76561197960292467|   4|            Warframe|http://cdn.akamai...|     230410|
|76561197960292467|   5|    Dead by Daylight|http://cdn.akamai...|     381210|
|76561197960292467|   6|           Stellaris|http://cdn.akamai...|     281990|
|76561197960292467|   7|           Fallout 4|http://cdn.akamai...|     377160|
|76561197960292467|   8|       Assetto Corsa|http://cdn.akamai...|     244210|
|76561197960292467|   9|The Elder Scrolls...|http://cdn.akamai...|     306130|
|76561197960292467|  10|             XCOM® 2|http://

### Store the final results

In [None]:
# store the final results
df_final_recommend_result.write.save(sample_final_recommended, format="json")
