## Simple Recommenders on Steam Games Data

In this notebook we try out two different recommenders on the [Steam dataset on Kaggle](https://www.kaggle.com/tamber/steam-video-games). We will use:
- ```pyspark``` for preprocessing
- ```turicreate``` for building an item-item similarity recommender
- ```pyspark.ml``` for building an ALS-based recommender

The dataset has no header but comes in 5 columns:
- User ID: integer
- Game title: string
- Activity: string (purchase/play)
- Status: float (1.0 if activity is purchase, total number of hours in game if activity is play)
- A column full of 0's, which we will discard

In [2]:
import pandas as pd
import turicreate as tc
import pyspark.sql.functions as F
from pyspark.sql.window import Window

### First load the data and gather some basic stats

In [4]:
# PySpark's way of loading a CSV file into a dataframe
input_df = sqlContext.read.format("csv") \
  .option("header", False) \
  .option("inferSchema", True) \
  .load("dbfs:/FileStore/tables/steam_200k-30fe6.csv") \
  .drop(F.col("_c4"))

# Rename the columns to something meaningful
steam_df = input_df.selectExpr("_c0 as user", "_c1 as game", "_c2 as activity", "_c3 as hours_played")

# display() shows the first 1,000 rows;
# it is a function provided by Databricks,
# so use .show() instead in plain Spark
display(steam_df)

user,game,activity,hours_played
151603712,The Elder Scrolls V Skyrim,purchase,1.0
151603712,The Elder Scrolls V Skyrim,play,273.0
151603712,Fallout 4,purchase,1.0
151603712,Fallout 4,play,87.0
151603712,Spore,purchase,1.0
151603712,Spore,play,14.9
151603712,Fallout New Vegas,purchase,1.0
151603712,Fallout New Vegas,play,12.1
151603712,Left 4 Dead 2,purchase,1.0
151603712,Left 4 Dead 2,play,8.9


The ```activity``` column has only 2 distinct values: purchase and play. We can separate the two activities into their own dataframes and examine their values.

In [6]:
# Spark uses lazy evaluation: transformations on a dataframe are
# only executed when an action (one that returns results) is called
# on it. cache() marks a dataframe to be kept in memory once it is
# first computed, so that later calculations on it are fast.

steam_df.cache()

# Keep only the records with "purchase"
# drop unneeded column (original activity column)
# and remove duplicated rows

purchase_df = steam_df.filter(F.col("activity") == "purchase") \
  .withColumnRenamed("hours_played", "purchase") \
  .drop(F.col("activity")) \
  .dropDuplicates()

# same as above for activity "play"

play_df = steam_df.filter(F.col("activity") == "play") \
  .withColumnRenamed("hours_played", "play") \
  .drop(F.col("activity")) \
  .dropDuplicates()

Spark offers a convenient method on dataframe called ```summary()```, which returns a dataframe on simple summarizing stats on numerical columns.

In [8]:
purchase_df.cache()
play_df.cache()

purchase_summary = purchase_df.select("purchase").summary()
play_summary = play_df.select("play").summary()

summary_stats = purchase_summary.join(play_summary, ["summary"], how="left")

display(summary_stats)

summary,purchase,play
count,128804.0,70489.0
mean,1.0,48.87806324391041
stddev,0.0,229.3352359968129
min,1.0,0.1
25%,1.0,1.0
50%,1.0,4.5
75%,1.0,19.1
max,1.0,11754.0


#### Some observations

The summary above shows that ```purchase``` has only a single value, 1.0. We will **not** include purchase status in building our recommender, for two reasons:
1. the number of hours played is more interesting to look at: it is implicit feedback on the user's preference among the games he/she has purchased
2. if we use [Jaccard similarity](https://apple.github.io/turicreate/docs/api/generated/turicreate.recommender.item_similarity_recommender.ItemSimilarityRecommender.html), which measures the overlap between two sets, the number of hours played is effectively treated as a binary purchase status (1/0) anyway
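To see why the hours drop out under Jaccard similarity, here is a minimal pure-Python sketch on toy data (the users `u1` to `u4` and the two games are hypothetical). Jaccard similarity between two games is |A ∩ B| / |A ∪ B| over the sets of users who own them, so only ownership enters the calculation and any number of hours collapses to "owned".

```python
def jaccard(users_a, users_b):
    """Jaccard similarity between two sets of user IDs."""
    union = users_a | users_b
    if not union:
        return 0.0
    return len(users_a & users_b) / len(union)

# Toy data (hypothetical): hours each user played each game.
hours = {
    "Game A": {"u1": 273.0, "u2": 0.5, "u3": 12.0},
    "Game B": {"u1": 1.0, "u2": 80.0, "u4": 3.0},
}

# Only the keys (who owns the game) are used; the hour values are ignored.
owners_a = set(hours["Game A"])
owners_b = set(hours["Game B"])
print(jaccard(owners_a, owners_b))  # 2 shared users / 4 users in total -> 0.5
```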

The ```play``` column is heavily skewed and spans a large range. There are different ways to normalize it. Since we are going to use it to represent each user's ranking of preference within his / her own purchases, let's convert it to the percentage of hours each user spends on each game he / she owns.

But before doing that, notice that there are 128804 purchase records and only 70489 play records, meaning that some users bought certain games but never played them. We need a score for those less-favoured games too. To differentiate them from the games a user has never bought, we can impute the missing play values with a small number below the minimum of 0.1, say 0.05. Then we can move on to calculate the percentage as planned.
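The two steps described above (a left join that fills missing hours with 0.05, then a per-user percentage) can be sketched in plain Python on toy data. This is an illustration of the logic only, not the Spark code used below; the function name and the sample purchases are hypothetical.

```python
def impute_and_normalize(purchases, plays, fill_value=0.05):
    """Left-join purchases with play hours, impute missing hours, and convert
    each user's hours to a percentage of that user's total playtime.

    purchases: iterable of (user, game) pairs
    plays: dict mapping (user, game) -> hours played
    Returns a dict mapping (user, game) -> percent of the user's total hours.
    """
    # Left join: every purchase is kept; unplayed games get the small fill value.
    hours = {key: plays.get(key, fill_value) for key in purchases}

    # Total hours per user (what a sum over a window partitioned by user computes).
    totals = {}
    for (user, _game), h in hours.items():
        totals[user] = totals.get(user, 0.0) + h

    return {(user, game): h / totals[user] * 100 for (user, game), h in hours.items()}

purchases = [(1, "Skyrim"), (1, "Spore"), (1, "Portal"), (2, "Portal")]
plays = {(1, "Skyrim"): 30.0, (1, "Spore"): 9.95, (2, "Portal"): 5.0}
# user 1 bought Portal but never played it, so it gets 0.05 hours
print(impute_and_normalize(purchases, plays))
```

Each user's percentages sum to 100, so a game's score only reflects its share within that user's own library.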

In [10]:
imputed_df = purchase_df.join(play_df, ["user", "game"], how="left") \
  .na.fill(0.05, ["play"]) \
  .drop(F.col("purchase")) \
  .dropDuplicates()

In [11]:
print(imputed_df.distinct().count())
print(purchase_df.distinct().count())

So there is a bit of a problem here: while we expect 128804 rows in the new dataframe after the left join, we ended up with 128816. Those extra 12 rows could be duplicates with minor discrepancies in the records. They can be located by assigning an index to each row.
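One plausible mechanism for the extra rows: `dropDuplicates()` only removes rows that are identical in every column, so two play records for the same user and game with different hours both survive, and a left join then emits one output row per matching record. A toy pure-Python left join (hypothetical data) reproduces the effect:

```python
def left_join(purchases, plays):
    """Naive left join: for each (user, game) purchase, emit one row per
    matching (user, game, hours) play record, or hours=None if there is none."""
    joined = []
    for user, game in purchases:
        matches = [hours for (u, g, hours) in plays if (u, g) == (user, game)]
        if matches:
            joined.extend((user, game, hours) for hours in matches)
        else:
            joined.append((user, game, None))
    return joined

purchases = [(1, "Game A"), (1, "Game B"), (1, "Game C")]
plays = [
    (1, "Game A", 0.4),
    (1, "Game A", 0.1),  # same key, different hours: dropDuplicates() keeps both
    (1, "Game B", 5.3),
]
joined = left_join(purchases, plays)
print(len(purchases), len(joined))  # prints: 3 4
```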

In [13]:
imputed_df.cache()

# monotonically_increasing_id() assigns a unique id to each row.
# The ids are guaranteed to be monotonically increasing, but not
# consecutive: there are gaps between them because under the hood a
# dataframe is divided into multiple partitions distributed across
# the workers in the cluster, and each partition gets its own id range.

indexed_df = imputed_df.withColumn("index", F.monotonically_increasing_id())

duplicates = indexed_df.groupBy(["user", "game"]) \
  .agg(F.collect_list("index").alias("indices")) \
  .where(F.size("indices") > 1)

display(duplicates)

user,game,indices
118664413,Grand Theft Auto San Andreas,"List(360777253398, 1142461301257)"
28472068,Grand Theft Auto III,"List(1331439862120, 1640677507394)"
28472068,Grand Theft Auto San Andreas,"List(455266533540, 618475290777)"
148362155,Grand Theft Auto San Andreas,"List(790273982968, 1417339208185)"
176261926,Sid Meier's Civilization IV Beyond the Sword,"List(541165879545, 1649267441890)"
71411882,Grand Theft Auto III,"List(1116691497487, 42949673537)"
50769696,Grand Theft Auto San Andreas,"List(1022202216569, 171798691961)"
59925638,Tom Clancy's H.A.W.X. 2,"List(335007449120, 1073741824044)"
71510748,Grand Theft Auto San Andreas,"List(515396076160, 1443109012100)"
28472068,Grand Theft Auto Vice City,"List(1090921693502, 1228360646945)"


In [14]:
duplicates.count()

So we do have 12 pairs of rows with the same user-game key! Let's get those rows.

In [16]:
# first get a list of the row ids
# remember that our data are on workers in a distributed manner,
# therefore we cannot directly access them. Instead they need to
# be brought back to the driver of the cluster, which is our gateway
# to all the workers. collect() is the method to do that.

dupe_pairs = duplicates.select("indices").collect()

dupe_indices = []
for pair in dupe_pairs:
  dupe_indices.extend(pair[0])

In [17]:
# notice the elements in dupe_pairs after collect() brings them back to
# the driver: they are pyspark.sql.Row objects, which represent a record
# in a dataframe and are similar to Python's named tuples.

dupe_pairs

In [18]:
# now we have a flattened list of indices which
# can be used for filtering

dupe_indices

In [19]:
display(indexed_df.where(F.col("index").isin(dupe_indices)).orderBy(F.col("user"), F.col("game")))

user,game,play,index
28472068,Grand Theft Auto III,0.4,1331439862120
28472068,Grand Theft Auto III,0.1,1640677507394
28472068,Grand Theft Auto San Andreas,0.2,455266533540
28472068,Grand Theft Auto San Andreas,0.7,618475290777
28472068,Grand Theft Auto Vice City,0.4,1090921693502
28472068,Grand Theft Auto Vice City,5.3,1228360646945
33865373,Sid Meier's Civilization IV,2.0,962072674771
33865373,Sid Meier's Civilization IV,135.0,309237645767
50769696,Grand Theft Auto San Andreas,3.1,1022202216569
50769696,Grand Theft Auto San Andreas,10.9,171798691961


So for those user-game pairs, more than one play record got mixed in. Let's merge each pair into one row by adding up the hours.
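In plain Python, the merge performed by `groupBy(["user", "game"]).agg(F.sum("play"))` amounts to accumulating hours per key. A minimal sketch on toy rows (the function name is hypothetical):

```python
from collections import defaultdict

def merge_duplicate_plays(rows):
    """Collapse rows sharing a (user, game) key by summing their hours."""
    totals = defaultdict(float)
    for user, game, hours in rows:
        totals[(user, game)] += hours
    return [(user, game, hours) for (user, game), hours in totals.items()]

rows = [(1, "Game A", 0.4), (1, "Game A", 0.1), (1, "Game B", 5.3)]
print(merge_duplicate_plays(rows))
```

Summing treats the two records as separate play sessions; taking the max would be the alternative if they were cumulative snapshots of the same counter.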

In [21]:
fixed_df = imputed_df.groupBy(["user", "game"]) \
  .agg(F.sum("play").alias("play"))

fixed_df.count() # now we have the expected number of records

In [22]:
fixed_df.printSchema()  # check if you have all the columns you need

#### Replace actual hours played with percentage of hours on a game for each user

In [24]:
user_window = Window.partitionBy("user")
steam_play_pcnt = fixed_df \
  .withColumn("total_user_time", F.sum("play").over(user_window)) \
  .withColumn("percent_time", F.col("play") / F.col("total_user_time") * 100) \
  .drop("total_user_time")

In [25]:
display(steam_play_pcnt)

user,game,play,percent_time
16167221,Half-Life 2 Episode One,7.0,0.688062122180174
16167221,ShareX,0.05,0.0049147294441441
16167221,Half-Life 2 Lost Coast,0.6,0.0589767533297292
16167221,Blender 2.76b,0.05,0.0049147294441441
16167221,Anno 1404,0.05,0.0049147294441441
16167221,Counter-Strike,35.0,3.44031061090087
16167221,Portal,1.9,0.1867597188774758
16167221,Little Big Adventure 2,0.1,0.0098294588882882
16167221,Dark Messiah of Might & Magic Multi-Player,1.3,0.1277829655477466
16167221,Fuse,0.05,0.0049147294441441


In [26]:
steam_play_pcnt.select("percent_time").summary().show()

We keep the original ```play``` column: maybe we can build one recommender on ```play``` and another on ```percent_time```, compare their performance, and see whether the normalization makes a difference.

In [28]:
# write the above data to csv for future use
#steam_play_pcnt.write.mode("overwrite").csv("dbfs:/FileStore/tables/steam_200k_hours_percent_20190305.csv", header=True)

### Before building the recommender, here is an optional step - indexing the game names with integers

According to the [documentation](https://apple.github.io/turicreate/docs/api/generated/turicreate.recommender.create.html#turicreate.recommender.create) of ```turicreate.recommender.create```, the user ID and item ID columns can be either ```int``` or ```str```, so keeping the game titles as they are should be fine. Here let's do an extra step anyway and index those strings with integers, which also lets us reuse the data with ```spark.ml```'s recommender later on.

Indexing is, however, essential when you build models with Spark's machine learning libraries: Spark's ALS, for example, expects numeric user and item ID columns.
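With `stringOrderType="frequencyDesc"`, `StringIndexer` gives index 0 to the most frequent label, 1 to the next, and so on. Since our dataframe has one row per user-game pair, that presumably means the game owned by the most users gets index 0. A pure-Python sketch of the idea (not Spark's implementation; breaking ties alphabetically is an assumption of this sketch):

```python
from collections import Counter

def frequency_desc_index(values):
    """Map each distinct string to an integer index, most frequent first.
    Ties are broken alphabetically (an assumption of this sketch)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {value: i for i, value in enumerate(ordered)}

games = ["Dota 2", "Portal", "Dota 2", "Fuse", "Portal", "Dota 2"]
print(frequency_desc_index(games))  # {'Dota 2': 0, 'Portal': 1, 'Fuse': 2}
```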

In [30]:
from pyspark.ml.feature import StringIndexer
from pyspark.sql.types import IntegerType

indexer = StringIndexer(inputCol="game", outputCol="game_index", stringOrderType="frequencyDesc")
steam_play_games_indexed = indexer.fit(steam_play_pcnt).transform(steam_play_pcnt) \
  .select("user", "game", F.col("game_index").cast(IntegerType()), "percent_time")

display(steam_play_games_indexed)

user,game,game_index,percent_time
16167221,Half-Life 2 Episode One,30,0.688062122180174
16167221,ShareX,1969,0.0049147294441441
16167221,Half-Life 2 Lost Coast,4,0.0589767533297292
16167221,Blender 2.76b,475,0.0049147294441441
16167221,Anno 1404,1568,0.0049147294441441
16167221,Counter-Strike,7,3.44031061090087
16167221,Portal,19,0.1867597188774758
16167221,Little Big Adventure 2,4780,0.0098294588882882
16167221,Dark Messiah of Might & Magic Multi-Player,528,0.1277829655477466
16167221,Fuse,1324,0.0049147294441441


In [31]:
#steam_play_games_indexed.write.mode("overwrite").csv("dbfs:/FileStore/tables/steam_200k_hours_percent_indexed_20190305.csv", header=True)

In [32]:
# keep a lookup table for games

game_lookup = steam_play_games_indexed \
  .select("game", "game_index").distinct() \
  .orderBy("game_index")

display(game_lookup)

game,game_index
Dota 2,0
Team Fortress 2,1
Unturned,2
Counter-Strike Global Offensive,3
Half-Life 2 Lost Coast,4
Counter-Strike Source,5
Left 4 Dead 2,6
Counter-Strike,7
Warframe,8
Half-Life 2 Deathmatch,9


In [33]:
#game_lookup.write.mode("overwrite").csv("dbfs:/FileStore/tables/steam_games_lookup_20190305.csv", header=True)

### Item-Item Similarity Recommender in ```turicreate```

In [35]:
steam_play_pd = steam_play_games_indexed \
  .select("user", "game_index", "percent_time") \
  .toPandas()

steam_sframe = tc.SFrame(steam_play_pd)
steam_sframe

In [36]:
# You can do a train-test split with the below function
# but for now let's skip this and construct the matrix with all data

#train_sframe, test_sframe = tc.recommender.util.random_split_by_user(steam_sframe, user_id="user", item_id="game_index", max_num_users=500)

Below we create a recommender that uses item-item similarities based on users in common ([doc](https://apple.github.io/turicreate/docs/api/generated/turicreate.recommender.item_similarity_recommender.create.html#turicreate.recommender.item_similarity_recommender.create)). You can choose from 3 similarity measures for ```similarity_type```: ```jaccard```, ```cosine``` and ```pearson```. More details can be found [here](https://apple.github.io/turicreate/docs/api/generated/turicreate.recommender.item_similarity_recommender.ItemSimilarityRecommender.html).
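To make the difference between `cosine` and `pearson` concrete, here is a sketch of their textbook definitions on sparse item vectors (dicts of user to rating). turicreate's exact formulas (for example, which users enter the means and norms) may differ, so treat this as an illustration rather than its implementation.

```python
import math

def cosine(a, b):
    """Cosine similarity of two sparse item vectors (user -> rating);
    users missing from a vector count as 0."""
    common = a.keys() & b.keys()
    dot = sum(a[u] * b[u] for u in common)
    norm_a = math.sqrt(sum(r * r for r in a.values()))
    norm_b = math.sqrt(sum(r * r for r in b.values()))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0

def pearson(a, b):
    """Pearson correlation between two items over the users who rated both."""
    common = a.keys() & b.keys()
    if len(common) < 2:
        return 0.0
    mean_a = sum(a[u] for u in common) / len(common)
    mean_b = sum(b[u] for u in common) / len(common)
    cov = sum((a[u] - mean_a) * (b[u] - mean_b) for u in common)
    sd_a = math.sqrt(sum((a[u] - mean_a) ** 2 for u in common))
    sd_b = math.sqrt(sum((b[u] - mean_b) ** 2 for u in common))
    return cov / (sd_a * sd_b) if sd_a and sd_b else 0.0

# Hypothetical percent_time values for two games owned by the same three users.
game_x = {"u1": 50.0, "u2": 40.0, "u3": 45.0}
game_y = {"u1": 1.0, "u2": 9.0, "u3": 5.0}
print(cosine(game_x, game_y), pearson(game_x, game_y))
```

Here cosine comes out around 0.78 while Pearson is -1: because Pearson centres each item's values first, it looks at whether users play the two games relatively more or less than average, not at their absolute levels.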

In [38]:
item_item_reco = tc.recommender.item_similarity_recommender.create(
    steam_sframe, user_id="user", item_id="game_index",
    target="percent_time", similarity_type="pearson")

In [39]:
# for each game, get its k most similar games (k=10 by default);
# pass e.g. k=20 to change the number of neighbours returned

similar_games = item_item_reco.get_similar_items()

In [40]:
# attach game titles to the results so they are easier to check
game_lookup_sframe = tc.SFrame(game_lookup.toPandas())

similar_games_named = similar_games.join(game_lookup_sframe, on="game_index", how="left") \
  .join(game_lookup_sframe, on={"similar":"game_index"}, how="left") \
  .rename({"game.1":"similar_game"}, True) \
  .select_columns(["game_index", "game", "similar", "similar_game", "score", "rank"])

In [41]:
similar_games_named.print_rows(num_rows=10)

Whoa, the results above actually look quite nice! Let's check another one:

In [43]:
similar_games_named.filter_by("Grand Theft Auto III", "game").print_rows(num_rows=10)

Let's pick a user and see what recommendations he / she gets:

In [45]:
# actual purchases

display(steam_play_games_indexed.filter(F.col("user")==194670621))

user,game,game_index,percent_time
194670621,Patch testing for Chivalry,92,0.0057833555028627
194670621,BioShock Infinite,53,0.9831704354866696
194670621,Left 4 Dead 2,6,3.238679081603147
194670621,Team Fortress 2,1,61.18790122028802
194670621,School of Dragons How to Train Your Dragon,455,0.5205019952576485
194670621,Wild Warfare,1601,0.3007344861488636
194670621,Chivalry Medieval Warfare,93,0.4279683072118444
194670621,DCS World,303,0.0057833555028627
194670621,Minecraft Story Mode - A Telltale Games Series,2171,0.5667688392805508
194670621,Robocraft,12,0.0578335550286276


In [46]:
recs = item_item_reco.recommend()

In [47]:
recs_named = recs.join(game_lookup_sframe, on="game_index", how="left")

In [48]:
recs_named.filter_by(194670621, "user").print_rows(num_rows=50)

**Something is not right here!** It looks like everyone is getting pretty much the same recommendations. (TODO: come back and figure this out.)
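One way to quantify the problem is to measure how much users' recommendation lists overlap. Below is a hypothetical pure-Python helper that averages the Jaccard overlap over every pair of users; values near 1.0 mean almost everyone gets the same items. Building `recs_by_user` from the `recs` SFrame (for example via `recs.to_dataframe()` and grouping by user) is an assumed step not shown here. The pairwise loop is quadratic in the number of users, so run it on a sample.

```python
from itertools import combinations

def mean_pairwise_overlap(recs_by_user):
    """Average Jaccard overlap between every pair of users' recommendation lists."""
    sets = [set(items) for items in recs_by_user.values()]
    pairs = list(combinations(sets, 2))
    if not pairs:
        return 0.0
    return sum(len(a & b) / max(len(a | b), 1) for a, b in pairs) / len(pairs)

# Hypothetical top-3 lists of game indices for three users.
recs_by_user = {
    "u1": [0, 1, 2],
    "u2": [0, 1, 2],
    "u3": [0, 1, 5],
}
print(round(mean_pairwise_overlap(recs_by_user), 3))  # 0.667
```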

### Spark's ALS-based recommender in ```pyspark.ml.recommendation```

(The paragraph below is an excerpt from [Spark's doc](https://spark.apache.org/docs/2.2.0/ml-collaborative-filtering.html#explicit-vs-implicit-feedback).)

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. ```spark.ml``` currently supports model-based collaborative filtering, in which users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the **Alternating Least Squares (ALS)** algorithm to learn these latent factors. 

For the list of hyperparameters to tune, see the linked Spark documentation.
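Since we set `implicitPrefs=True` below, it helps to know how implicit-feedback ALS (Hu, Koren and Volinsky, 2008, the approach the Spark doc refers to) reads `percent_time`: each observed value r becomes a binary preference p = 1 if r > 0 (else 0) with confidence c = 1 + alpha * |r| (Spark's `alpha` defaults to 1.0), and a user-item score is the dot product of their latent factors. A small sketch of that mapping, with hypothetical function names:

```python
def implicit_targets(rating, alpha=1.0):
    """Turn one observed implicit-feedback value into the
    (preference, confidence) pair used by implicit ALS."""
    preference = 1.0 if rating > 0 else 0.0
    confidence = 1.0 + alpha * abs(rating)
    return preference, confidence

def predicted_score(user_factors, item_factors):
    """ALS scores a user-item pair as the dot product of their latent factors."""
    return sum(x * y for x, y in zip(user_factors, item_factors))

print(implicit_targets(3.0))    # (1.0, 4.0)
print(implicit_targets(3.0, alpha=10.0))  # (1.0, 31.0)
print(predicted_score([1.0, 2.0], [3.0, 4.0]))  # 11.0
```

So every game a user owns gets preference 1, and `percent_time` only scales how much the model trusts that observation. This is also why the scores in the recommendations below are predicted preferences, not percentages.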

In [51]:
# back to our original indexed dataset again
steam_play_games_indexed.printSchema()

In [52]:
from pyspark.ml.recommendation import ALS

# init the model
als = ALS(maxIter=6, regParam=0.01, implicitPrefs=True,
          userCol="user", itemCol="game_index", ratingCol="percent_time")

# fit the model to our data
model = als.fit(steam_play_games_indexed)

In [53]:
# get recommendations for all users (50 per user)
user_recs = model.recommendForAllUsers(50)

In [54]:
display(user_recs)

user,recommendations
46014950,"List(List(62, 0.4097474), List(47, 0.36577514), List(44, 0.3651413), List(119, 0.35992107), List(108, 0.35621178), List(70, 0.33824965), List(157, 0.33312264), List(69, 0.33048403), List(45, 0.32422546), List(217, 0.30283773), List(143, 0.30271062), List(139, 0.2663434), List(528, 0.2614322), List(165, 0.24875428), List(524, 0.24753352), List(415, 0.24231088), List(1272, 0.24076268), List(148, 0.23916933), List(26, 0.23000166), List(91, 0.22805157), List(11, 0.22324729), List(138, 0.21799096), List(3, 0.21700536), List(469, 0.21482116), List(144, 0.21153821), List(305, 0.20984766), List(142, 0.2002224), List(767, 0.19996227), List(249, 0.19602063), List(337, 0.19270203), List(24, 0.19106393), List(276, 0.1894375), List(223, 0.18591079), List(141, 0.18308331), List(1, 0.18279698), List(369, 0.1761222), List(437, 0.17552227), List(434, 0.1733752), List(1531, 0.17231895), List(59, 0.17184648), List(568, 0.17058797), List(131, 0.17052831), List(438, 0.1690799), List(327, 0.16868503), List(509, 0.16575377), List(206, 0.1642923), List(15, 0.16329977), List(17, 0.16086766), List(397, 0.15810794), List(54, 0.15732178))"
95059220,"List(List(81, 0.8573685), List(44, 0.8435464), List(3, 0.82290286), List(82, 0.81529534), List(45, 0.76698965), List(69, 0.71641), List(70, 0.70006645), List(134, 0.5366995), List(6, 0.5203797), List(17, 0.4679038), List(498, 0.4380998), List(7, 0.41699174), List(288, 0.37876046), List(0, 0.37658644), List(75, 0.36454055), List(136, 0.35558406), List(119, 0.35207695), List(285, 0.3473496), List(162, 0.34590295), List(27, 0.33899525), List(51, 0.3378387), List(71, 0.32256603), List(292, 0.30925202), List(499, 0.3088986), List(26, 0.2995578), List(1148, 0.2919853), List(95, 0.28908107), List(66, 0.28671357), List(13, 0.27579844), List(941, 0.26085967), List(261, 0.25468442), List(206, 0.25277108), List(142, 0.25239122), List(820, 0.2503833), List(58, 0.247715), List(91, 0.24329776), List(451, 0.24235009), List(77, 0.24121067), List(20, 0.23909907), List(337, 0.23804386), List(114, 0.23642543), List(148, 0.22996494), List(14, 0.2287651), List(132, 0.22744563), List(54, 0.22522977), List(294, 0.22388673), List(1239, 0.22338107), List(399, 0.22185202), List(434, 0.21892922), List(5, 0.215964))"
108219790,"List(List(81, 0.8784238), List(82, 0.84762806), List(44, 0.81645846), List(3, 0.8088436), List(45, 0.75058454), List(69, 0.6867235), List(70, 0.67643946), List(134, 0.50367105), List(17, 0.49404353), List(498, 0.46468657), List(6, 0.3919897), List(75, 0.35690814), List(0, 0.35172808), List(119, 0.35128728), List(136, 0.33885458), List(499, 0.33671093), List(288, 0.32799202), List(51, 0.32789418), List(285, 0.31578875), List(27, 0.30899683), List(95, 0.3070007), List(91, 0.30580086), List(292, 0.30570883), List(7, 0.30517316), List(162, 0.29236874), List(71, 0.28887516), List(1239, 0.2768095), List(618, 0.26738197), List(206, 0.2587881), List(1148, 0.25389767), List(261, 0.25178257), List(451, 0.2463771), List(941, 0.23928693), List(820, 0.23646748), List(128, 0.23176436), List(148, 0.22970505), List(66, 0.22782525), List(337, 0.22579432), List(58, 0.2233685), List(872, 0.22304533), List(26, 0.22228669), List(114, 0.22202477), List(78, 0.21400607), List(99, 0.21360742), List(132, 0.21266124), List(142, 0.21172033), List(276, 0.20603572), List(178, 0.20356327), List(1915, 0.20126681), List(279, 0.20113876))"
128412180,"List(List(1, 1.2694958), List(217, 0.86428523), List(157, 0.47661114), List(143, 0.38315538), List(62, 0.3593001), List(148, 0.34873068), List(108, 0.34543788), List(86, 0.34247148), List(528, 0.33649987), List(48, 0.33224446), List(233, 0.32993615), List(524, 0.30731243), List(369, 0.30344233), List(509, 0.30159515), List(397, 0.2986431), List(708, 0.2940144), List(881, 0.29353988), List(2734, 0.29297474), List(215, 0.2856612), List(536, 0.28136006), List(845, 0.25556812), List(107, 0.2546572), List(330, 0.2517628), List(0, 0.24622855), List(287, 0.243879), List(249, 0.24025716), List(734, 0.23897639), List(141, 0.23826495), List(1631, 0.22762164), List(365, 0.2249507), List(1782, 0.21723644), List(50, 0.21461602), List(119, 0.21245208), List(196, 0.21166782), List(437, 0.2027821), List(305, 0.19635883), List(164, 0.19342521), List(76, 0.19323161), List(269, 0.18768075), List(429, 0.18748209), List(52, 0.18667366), List(236, 0.18512225), List(15, 0.18107486), List(67, 0.17936395), List(1617, 0.17870818), List(434, 0.17790738), List(144, 0.17673694), List(1272, 0.17616239), List(963, 0.17581984), List(282, 0.17490178))"
141774640,"List(List(2, 1.114949), List(12, 0.8218074), List(76, 0.6849172), List(131, 0.68314743), List(33, 0.6748294), List(102, 0.67314917), List(15, 0.6033949), List(5, 0.5671831), List(29, 0.54996186), List(198, 0.5325083), List(155, 0.49881402), List(24, 0.46449587), List(95, 0.4558154), List(245, 0.45358104), List(47, 0.44034475), List(143, 0.43005252), List(48, 0.42718127), List(44, 0.42421067), List(165, 0.4222782), List(415, 0.4082482), List(282, 0.40695783), List(11, 0.40580165), List(70, 0.39383686), List(383, 0.3927977), List(42, 0.38878536), List(45, 0.38747045), List(144, 0.3873529), List(1653, 0.38402414), List(69, 0.38158208), List(281, 0.37297297), List(941, 0.3660963), List(236, 0.36485812), List(152, 0.3625239), List(253, 0.35732546), List(261, 0.35573545), List(88, 0.34748036), List(62, 0.34666696), List(55, 0.34436738), List(129, 0.3424289), List(52, 0.3327095), List(312, 0.3288923), List(203, 0.32779214), List(276, 0.32738447), List(31, 0.31966394), List(28, 0.31089467), List(9, 0.30684328), List(208, 0.30665538), List(8, 0.30582133), List(157, 0.3025443), List(26, 0.297753))"
166705920,"List(List(0, 0.9879885), List(2, 0.49176097), List(8, 0.3422222), List(1, 0.3321223), List(3, 0.3165116), List(12, 0.31644845), List(18, 0.30506733), List(29, 0.2964858), List(15, 0.25781494), List(67, 0.2460263), List(25, 0.24168983), List(76, 0.1978578), List(10, 0.19257212), List(48, 0.19205889), List(33, 0.18116215), List(50, 0.17813161), List(95, 0.17483062), List(41, 0.16373499), List(55, 0.16069338), List(52, 0.15834253), List(74, 0.15811428), List(63, 0.15598296), List(31, 0.14943764), List(102, 0.14494066), List(128, 0.14222716), List(75, 0.13724959), List(27, 0.13390663), List(176, 0.13357602), List(112, 0.12854716), List(43, 0.12753648), List(94, 0.12538959), List(46, 0.11843378), List(84, 0.118267626), List(49, 0.10953257), List(129, 0.10548211), List(187, 0.10117941), List(161, 0.1000565), List(42, 0.099858925), List(155, 0.0995909), List(59, 0.09885859), List(274, 0.09863862), List(159, 0.0948668), List(91, 0.09444436), List(167, 0.094442084), List(88, 0.092177436), List(296, 0.09073861), List(201, 0.08997005), List(583, 0.08907714), List(312, 0.08861698), List(96, 0.08835702))"
167193350,"List(List(0, 0.9918989), List(3, 0.2750367), List(1, 0.21402939), List(29, 0.16932815), List(2, 0.14517523), List(67, 0.12541406), List(296, 0.107608244), List(18, 0.10345845), List(1769, 0.094071686), List(583, 0.093329646), List(6, 0.085428774), List(1072, 0.080684975), List(112, 0.07824934), List(2140, 0.07601664), List(25, 0.07442826), List(7, 0.074210376), List(1149, 0.07302532), List(1484, 0.07162302), List(8, 0.07147201), List(187, 0.068674035), List(478, 0.06731703), List(59, 0.067034595), List(95, 0.066904806), List(15, 0.061592173), List(346, 0.06143938), List(292, 0.05946295), List(342, 0.0578387), List(734, 0.057379104), List(76, 0.056802835), List(1272, 0.056493867), List(102, 0.055955365), List(176, 0.055891506), List(232, 0.054999553), List(469, 0.053455446), List(51, 0.051255822), List(330, 0.050194703), List(1134, 0.04907455), List(10, 0.046921358), List(58, 0.046585094), List(138, 0.046345145), List(1239, 0.046009243), List(383, 0.0455609), List(43, 0.044691104), List(224, 0.044184584), List(236, 0.04378348), List(319, 0.043647893), List(664, 0.043631732), List(239, 0.04264938), List(12, 0.04228155), List(1402, 0.04214102))"
208061820,"List(List(1, 0.82668746), List(0, 0.6258901), List(51, 0.40243715), List(162, 0.38003325), List(148, 0.3639916), List(6, 0.34325308), List(188, 0.3304912), List(478, 0.31452185), List(78, 0.31021118), List(86, 0.30911398), List(154, 0.29821274), List(112, 0.2925547), List(344, 0.28943384), List(571, 0.28406918), List(108, 0.28188547), List(83, 0.2795917), List(292, 0.2689625), List(1239, 0.26446122), List(276, 0.26358894), List(239, 0.259696), List(44, 0.25577277), List(17, 0.25240904), List(66, 0.25060317), List(47, 0.2420341), List(53, 0.24180523), List(45, 0.23913433), List(216, 0.23493202), List(114, 0.23358873), List(20, 0.23229301), List(528, 0.23081857), List(406, 0.23076512), List(138, 0.22722837), List(103, 0.22505297), List(106, 0.21771885), List(612, 0.21692388), List(224, 0.21542677), List(264, 0.21349503), List(139, 0.21337429), List(153, 0.20817637), List(1148, 0.2055855), List(64, 0.2040176), List(11, 0.20400842), List(158, 0.20346305), List(257, 0.20272359), List(82, 0.2003138), List(26, 0.20005521), List(242, 0.19889297), List(791, 0.19853984), List(330, 0.19797504), List(81, 0.19744578))"
275521700,"List(List(0, 0.9918989), List(3, 0.2750367), List(1, 0.21402939), List(29, 0.16932815), List(2, 0.14517523), List(67, 0.12541406), List(296, 0.107608244), List(18, 0.10345845), List(1769, 0.094071686), List(583, 0.093329646), List(6, 0.085428774), List(1072, 0.080684975), List(112, 0.07824934), List(2140, 0.07601664), List(25, 0.07442826), List(7, 0.074210376), List(1149, 0.07302532), List(1484, 0.07162302), List(8, 0.07147201), List(187, 0.068674035), List(478, 0.06731703), List(59, 0.067034595), List(95, 0.066904806), List(15, 0.061592173), List(346, 0.06143938), List(292, 0.05946295), List(342, 0.0578387), List(734, 0.057379104), List(76, 0.056802835), List(1272, 0.056493867), List(102, 0.055955365), List(176, 0.055891506), List(232, 0.054999553), List(469, 0.053455446), List(51, 0.051255822), List(330, 0.050194703), List(1134, 0.04907455), List(10, 0.046921358), List(58, 0.046585094), List(138, 0.046345145), List(1239, 0.046009243), List(383, 0.0455609), List(43, 0.044691104), List(224, 0.044184584), List(236, 0.04378348), List(319, 0.043647893), List(664, 0.043631732), List(239, 0.04264938), List(12, 0.04228155), List(1402, 0.04214102))"
299523550,"List(List(0, 0.8468679), List(17, 0.279388), List(19, 0.2790128), List(10, 0.23106699), List(490, 0.22165546), List(874, 0.20388632), List(296, 0.18601081), List(646, 0.17556585), List(30, 0.17383167), List(16, 0.1652714), List(371, 0.16479939), List(1769, 0.16078939), List(1484, 0.15905525), List(80, 0.15892826), List(20, 0.14963062), List(407, 0.14532392), List(583, 0.14458162), List(83, 0.14351445), List(1149, 0.14254436), List(170, 0.14253676), List(38, 0.14150952), List(325, 0.13900474), List(319, 0.13543509), List(25, 0.13343334), List(127, 0.13184762), List(2, 0.13180661), List(195, 0.12941286), List(74, 0.12779208), List(459, 0.12654023), List(24, 0.12620729), List(274, 0.12081277), List(2140, 0.12006595), List(178, 0.11831308), List(333, 0.11447172), List(1137, 0.11266397), List(1088, 0.11263079), List(137, 0.10855762), List(224, 0.105326906), List(4, 0.103041396), List(860, 0.1014145), List(302, 0.10140802), List(1620, 0.101293445), List(87, 0.10081394), List(65, 0.09944442), List(1072, 0.09874311), List(2522, 0.09845971), List(158, 0.09528353), List(687, 0.09521013), List(116, 0.09459305), List(366, 0.091662556))"


In [55]:
user_recs.cache()

The recommendations for each user are currently stored as an array of (```game_index```, ```rating```) structs. To join the game indices back to their titles, we explode the array into one row per recommendation and then split each struct into two columns, ```game_reco``` and ```game_rating```.
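Conceptually, exploding turns each user's nested list into flat rows. The pure-Python sketch below (hypothetical function name) also keeps each item's position as a 1-based rank, which the Spark code below does not; in Spark, `F.posexplode` returns a 0-based position column alongside the exploded value, which would give the same information.

```python
def explode_recommendations(user_recs):
    """Flatten {user: [(game_index, rating), ...]} into one row per
    recommendation, keeping the list position as a 1-based rank."""
    rows = []
    for user, recs in user_recs.items():
        for rank, (game_index, rating) in enumerate(recs, start=1):
            rows.append((user, game_index, rating, rank))
    return rows

# First three recommendations for one user, taken from the output above.
user_recs = {46014950: [(62, 0.4097474), (47, 0.36577514), (44, 0.3651413)]}
for row in explode_recommendations(user_recs):
    print(row)
```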

In [57]:
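# udf stands for user-defined function: once a Python function
# is wrapped as a udf, Spark can apply it to dataframe columns
# (the function runs on the workers).
# Struct fields can also be read without a udf, e.g.
# F.col("reco_and_rank.game_index"), which avoids Python overhead.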

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, FloatType

# second parameter specifies the return type
get_index = udf(lambda x: x["game_index"], IntegerType())
get_rating = udf(lambda x: x["rating"], FloatType())

In [58]:
user_recs_with_title = user_recs \
  .withColumn("reco_and_rank", F.explode("recommendations")) \
  .withColumn("game_reco", get_index(F.col("reco_and_rank"))) \
  .withColumn("game_rating", get_rating(F.col("reco_and_rank"))) \
  .drop("recommendations") \
  .drop("reco_and_rank")

In [59]:
display(user_recs_with_title)

user,game_reco,game_rating
46014950,62,0.4097474
46014950,47,0.36577514
46014950,44,0.3651413
46014950,119,0.35992107
46014950,108,0.35621178
46014950,70,0.33824965
46014950,157,0.33312264
46014950,69,0.33048403
46014950,45,0.32422546
46014950,217,0.30283773


In [60]:
reco_results = user_recs_with_title.join(game_lookup, user_recs_with_title.game_reco == game_lookup.game_index, how="left") \
  .select("user", F.col("game_reco").alias("game_index"), F.col("game").alias("title"), "game_rating")

In [61]:
display(reco_results)

user,game_index,title,game_rating
46014950,62,Empire Total War,0.4097474
46014950,47,Fallout New Vegas,0.36577514
46014950,44,Call of Duty Modern Warfare 2 - Multiplayer,0.3651413
46014950,119,Euro Truck Simulator 2,0.35992107
46014950,108,Total War SHOGUN 2,0.35621178
46014950,70,Call of Duty Black Ops,0.33824965
46014950,157,Saints Row 2,0.33312264
46014950,69,Call of Duty Black Ops - Multiplayer,0.33048403
46014950,45,Call of Duty Modern Warfare 2,0.32422546
46014950,217,Serious Sam HD The Second Encounter,0.30283773


In [62]:
reco_results.cache()

Pick a user and see if the results make sense.

In [64]:
u = 194670621

# get a list of purchased games
u_purchases = steam_play_games_indexed.filter(F.col("user") == u).collect()

In [65]:
u_purchases

In [66]:
# display the user's recommendations
display(reco_results.filter(F.col("user") == u))

user,game_index,title,game_rating
194670621,1,Team Fortress 2,0.9918953
194670621,10,Garry's Mod,0.7747938
194670621,8,Warframe,0.63580334
194670621,2,Unturned,0.60818815
194670621,6,Left 4 Dead 2,0.55379975
194670621,11,The Elder Scrolls V Skyrim,0.48942426
194670621,12,Robocraft,0.4792027
194670621,25,No More Room in Hell,0.45624518
194670621,27,PAYDAY 2,0.45290953
194670621,18,War Thunder,0.45177215


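It seems that games the user has already purchased are being recommended, so let's get rid of those first. As far as we can tell, ```recommendForAllUsers()``` has no option to exclude them (unlike ```turicreate```'s ```recommend()```, which excludes known items by default), so we filter them out ourselves.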

In [68]:
purchase_hist = [row.game for row in u_purchases]  # titles the user already owns
u_reco = reco_results.filter((F.col("user") == u) & ~F.col("title").isin(purchase_hist))  # exclude them

In [69]:
display(u_reco)

user,game_index,title,game_rating
194670621,8,Warframe,0.63580334
194670621,11,The Elder Scrolls V Skyrim,0.48942426
194670621,27,PAYDAY 2,0.45290953
194670621,0,Dota 2,0.434652
194670621,98,Rust,0.4249744
194670621,19,Portal,0.4114416
194670621,16,Half-Life 2,0.40492496
194670621,50,Tom Clancy's Ghost Recon Phantoms - NA,0.40452582
194670621,41,Loadout,0.3867079
194670621,24,Terraria,0.37339702


This was only for one user. **Let's remove all the purchased games from recommendations for all users.**

There are many ways of doing this. One way is to gather each user's purchases and recommendations into two lists and convert them to Python ```set```s, which make it fast to find the difference between two collections (we want the elements that are in the recommendation set but not in the purchase history set).
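One caveat: a plain set difference such as `set(reco) - set(purchased)` loses the ranking order. A minimal sketch (hypothetical function name) that uses a set only for fast membership checks, so the remaining recommendations stay in rank order:

```python
def remove_purchased(reco_index, purchased_index):
    """Drop already-owned games from a user's recommendations,
    preserving the recommendation order."""
    owned = set(purchased_index)  # O(1) membership checks
    return [game for game in reco_index if game not in owned]

print(remove_purchased([62, 47, 1223, 44], [1223]))  # [62, 47, 44]
```

To apply it to every row of a dataframe with ```reco_index``` and ```purchased_index``` columns, it could be wrapped as a udf returning ```ArrayType(IntegerType())```; on Spark 2.4 or later, the built-in ```F.array_except(col1, col2)``` is another option.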

In [71]:
# get each user's list of historical purchase

all_purchase_hist = steam_play_games_indexed \
  .select("user", "game_index").groupBy("user") \
  .agg(F.collect_list("game_index").alias("purchased_index"))

display(all_purchase_hist)

user,purchased_index
16167221,"List(30, 1969, 4, 475, 1568, 7, 19, 4780, 528, 1324, 1082, 341, 3573, 14, 3084, 16, 708, 406, 1081, 336, 1, 13, 139, 186, 2286, 408, 55, 8, 38, 471, 743, 524)"
119310413,"List(1451, 454, 116, 10, 64, 60, 936, 1413, 117, 185, 852, 143, 507, 723, 161, 441, 611, 11, 6, 1406, 1, 1485, 1466, 303, 1362, 1395, 1006, 58, 358, 31, 1480, 56, 142, 1357, 1284)"
141774640,"List(131, 585)"
192170147,"List(1070, 224, 1201, 186, 873, 504, 609, 16, 59, 1147, 29, 1151, 4, 678, 98, 705, 2589, 14, 2, 32, 197, 28, 22, 37, 73, 210, 2642, 1247, 278, 20, 837, 92, 21, 13, 5, 115, 40, 595, 141, 6, 1256, 1136, 637, 2575, 1267, 62, 2578, 573, 816, 3, 9, 38, 2573, 2545, 61, 2645, 93, 280, 30, 36, 518, 23, 792, 2647, 7, 19, 57, 629)"
174276289,"List(708, 254, 67, 167, 1157, 1, 41, 560)"
261857176,"List(198, 12)"
219822826,"List(42, 186, 0, 27, 3, 2879)"
109780632,"List(1, 1557, 462, 20, 10, 4474, 406, 24, 794, 3975, 3446, 228)"
187877855,"List(2, 8, 155, 12, 15, 1)"
84471496,"List(41, 187, 48, 434, 432, 106, 68, 146, 88, 1557, 307, 5, 107, 716, 1505, 1622, 228, 152, 3, 20, 145, 213, 426, 0, 135)"


In [72]:
all_purchase_hist.cache()
reco_results.cache()

In [73]:
# get each user's list of recommended items

all_reco_indices = reco_results \
  .select("user", "game_index").groupBy("user") \
  .agg(F.collect_list("game_index").alias("reco_index"))

In [74]:
# check whether the number of users in the two datasets above matches

print(all_purchase_hist.count())
print(all_reco_indices.count())

In [75]:
all_reco_indices.cache()

In [76]:
# join purchase history with recommendations on user id

purchase_and_reco = all_reco_indices.join(all_purchase_hist, ["user"], how="left")

# check whether either list column is null for any user; the result is 0, so we are good to go!
purchase_and_reco.filter((F.col("reco_index").isNull()) | (F.col("purchased_index").isNull())).count()

In [77]:
display(purchase_and_reco)

user,reco_index,purchased_index
46014950,"List(62, 47, 44, 119, 108, 70, 157, 69, 45, 217, 143, 139, 528, 165, 524, 415, 1272, 148, 26, 91, 11, 138, 3, 469, 144, 305, 142, 767, 249, 337, 24, 276, 223, 141, 1, 369, 437, 434, 1531, 59, 568, 131, 438, 327, 509, 206, 15, 17, 397, 54)",List(1223)
95059220,"List(81, 44, 3, 82, 45, 69, 70, 134, 6, 17, 498, 7, 288, 0, 75, 136, 119, 285, 162, 27, 51, 71, 292, 499, 26, 1148, 95, 66, 13, 941, 261, 206, 142, 820, 58, 91, 451, 77, 20, 337, 114, 148, 14, 132, 54, 294, 1239, 399, 434, 5)","List(346, 342, 109, 666, 285, 337, 81, 288, 191, 82)"
108219790,"List(81, 82, 44, 3, 45, 69, 70, 134, 17, 498, 6, 75, 0, 119, 136, 499, 288, 51, 285, 27, 95, 91, 292, 7, 162, 71, 1239, 618, 206, 1148, 261, 451, 941, 820, 128, 148, 66, 337, 58, 872, 26, 114, 78, 99, 132, 142, 276, 178, 1915, 279)","List(81, 82)"
128412180,"List(1, 217, 157, 143, 62, 148, 108, 86, 528, 48, 233, 524, 369, 509, 397, 708, 881, 2734, 215, 536, 845, 107, 330, 0, 287, 249, 734, 141, 1631, 365, 1782, 50, 119, 196, 437, 305, 164, 76, 269, 429, 52, 236, 15, 67, 1617, 434, 144, 1272, 963, 282)",List(217)
141774640,"List(2, 12, 76, 131, 33, 102, 15, 5, 29, 198, 155, 24, 95, 245, 47, 143, 48, 44, 165, 415, 282, 11, 70, 383, 42, 45, 144, 1653, 69, 281, 941, 236, 152, 253, 261, 88, 62, 55, 129, 52, 312, 203, 276, 31, 28, 9, 208, 8, 157, 26)","List(131, 585)"
166705920,"List(0, 2, 8, 1, 3, 12, 18, 29, 15, 67, 25, 76, 10, 48, 33, 50, 95, 41, 55, 52, 74, 63, 31, 102, 128, 75, 27, 176, 112, 43, 94, 46, 84, 49, 129, 187, 161, 42, 155, 59, 274, 159, 91, 167, 88, 296, 201, 583, 312, 96)","List(583, 8, 287, 25, 176, 12, 67, 2, 0, 18)"
167193350,"List(0, 3, 1, 29, 2, 67, 296, 18, 1769, 583, 6, 1072, 112, 2140, 25, 7, 1149, 1484, 8, 187, 478, 59, 95, 15, 346, 292, 342, 734, 76, 1272, 102, 176, 232, 469, 51, 330, 1134, 10, 58, 138, 1239, 383, 43, 224, 236, 319, 664, 239, 12, 1402)",List(0)
208061820,"List(1, 0, 51, 162, 148, 6, 188, 478, 78, 86, 154, 112, 344, 571, 108, 83, 292, 1239, 276, 239, 44, 17, 66, 47, 53, 45, 216, 114, 20, 528, 406, 138, 103, 106, 612, 224, 264, 139, 153, 1148, 64, 11, 158, 257, 82, 26, 242, 791, 330, 81)","List(1432, 1647, 1, 1384, 1343, 1105)"
275521700,"List(0, 3, 1, 29, 2, 67, 296, 18, 1769, 583, 6, 1072, 112, 2140, 25, 7, 1149, 1484, 8, 187, 478, 59, 95, 15, 346, 292, 342, 734, 76, 1272, 102, 176, 232, 469, 51, 330, 1134, 10, 58, 138, 1239, 383, 43, 224, 236, 319, 664, 239, 12, 1402)",List(0)
299523550,"List(0, 17, 19, 10, 490, 874, 296, 646, 30, 16, 371, 1769, 1484, 80, 20, 407, 583, 83, 1149, 170, 38, 325, 319, 25, 127, 2, 195, 74, 459, 24, 274, 2140, 178, 333, 1137, 1088, 137, 224, 4, 860, 302, 1620, 87, 65, 1072, 2522, 158, 687, 116, 366)",List(2299)


In [78]:
from pyspark.sql.types import ArrayType, IntegerType

# a lesson learnt here is that if the return type is not specified,
# the udf will return a string
# (p or []) guards against a null purchase list coming from the left join
diff_sets = F.udf(lambda r, p: list(set(r).difference(set(p or []))), ArrayType(IntegerType()))

actual_recommendations = purchase_and_reco \
  .withColumn("recos_only", diff_sets(F.col("reco_index"), F.col("purchased_index"))) \
  .drop("reco_index").drop("purchased_index") \
  .withColumn("game_index", F.explode("recos_only")) \
  .drop("recos_only")

display(actual_recommendations)

user,game_index
46014950,1
46014950,3
46014950,131
46014950,138
46014950,139
46014950,524
46014950,11
46014950,142
46014950,143
46014950,528


Join the above dataframe back with the original recommendations once more to get the game titles and predicted ratings.

**Ta-dahhhhhh now we have the final list of recommended games for each user.**

In [80]:
actual_recommendations_ranked = actual_recommendations \
  .join(reco_results, ["user", "game_index"], how="left")

display(actual_recommendations_ranked)

user,game_index,title,game_rating
547685,856,Farming Simulator 15,0.2379146
861238,54,Alien Swarm,0.54803336
994489,146,Arma 3,0.29537022
1024319,16,Half-Life 2,0.43946555
1423371,498,Train Simulator,0.25150725
1603625,54,Alien Swarm,0.37187812
1603625,56,Grand Theft Auto IV,0.24084282
1834453,106,Rising Storm/Red Orchestra 2 Multiplayer,0.27828267
2039434,233,South Park The Stick of Truth,0.26575232
2531540,82,Call of Duty Modern Warfare 3,0.40107903
