In [0]:
import pyspark as ps
import json
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.mllib.fpm import FPGrowth
from pyspark.sql.functions import col, explode
from pyspark.sql import functions as F

In [0]:
master_url = 'spark://spark-master:7077'
SparkContext.setSystemProperty('spark.executor.memory', '24g')
spark = SparkSession.builder.master(master_url).appName("data-miner").getOrCreate()

In [0]:
sc = spark.sparkContext

In [0]:
dataset_path = 'dbfs:/matches-1k.json'
league_df = spark.read.option("inferTimestamp", "false").option("mode", "DROPMALFORMED").json(dataset_path)

In [0]:
league_df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- gameCreation: long (nullable = true)
 |-- gameDuration: long (nullable = true)
 |-- gameId: long (nullable = true)
 |-- gameMode: string (nullable = true)
 |-- gameType: string (nullable = true)
 |-- gameVersion: string (nullable = true)
 |-- mapId: long (nullable = true)
 |-- participantIdentities: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- participantId: long (nullable = true)
 |    |    |-- player: struct (nullable = true)
 |    |    |    |-- accountId: string (nullable = true)
 |    |    |    |-- currentAccountId: string (nullable = true)
 |    |    |    |-- currentPlatformId: string (nullable = true)
 |    |    |    |-- matchHistoryUri: string (nullable = true)
 |    |    |    |-- platformId: string (nullable = true)
 |    |    |    |-- profileIcon: long (nullable = true)
 |    |    |    |-- summonerId: string (nullable = true)
 |    |    |    |-- summo

In [0]:
cleansed_league_df = league_df
cleansed_league_df = cleansed_league_df.drop('_id', 'gameCreation', 'gameDuration', 'gameId', 'gameMode', 'gameType', 'gameVersion', 'mapId', 'participantIdentities', 'platformId', 'queueId', 'seasonId', 'teams') # can use teams
cleansed_league_df = cleansed_league_df.withColumn("participants",
    F.transform(
        cleansed_league_df["participants"],
        lambda x: x.withField("stats", 
                                x["stats"].dropFields('assists', 'champLevel', 'combatPlayerScore',
                                    'damageDealtToObjectives', 'damageDealtToTurrets', 'damageSelfMitigated', 'deaths', 'doubleKills',
                                    'firstBloodAssist', 'firstBloodKill', 'firstInhibitorAssist', 'firstInhibitorKill', 
                                    'firstTowerAssist', 'firstTowerKill', 'goldEarned', 'goldSpent', 'inhibitorKills',
                                    'killingSprees', 'kills', 'largestCriticalStrike', 'largestKillingSpree',
                                    'largestMultiKill', 'longestTimeSpentLiving', 'magicDamageDealt', 
                                    'magicDamageDealtToChampions', 'magicalDamageTaken', 'neutralMinionsKilled', 
                                    'neutralMinionsKilledEnemyJungle', 'neutralMinionsKilledTeamJungle', 
                                    'objectivePlayerScore', 'participantId', 'pentaKills', 
                                    'perk0Var1', 'perk0Var2', 'perk0Var3', 'perk1Var1', 'perk1Var2', 
                                    'perk1Var3', 'perk2Var1', 'perk2Var2', 'perk2Var3', 
                                    'perk3Var1', 'perk3Var2', 'perk3Var3', 'perk4Var1', 'perk4Var2', 
                                    'perk4Var3', 'perk5Var1', 'perk5Var2', 'perk5Var3', 'perkPrimaryStyle', 
                                    'perkSubStyle', 'physicalDamageDealt', 'physicalDamageDealtToChampions', 
                                    'physicalDamageTaken', 'playerScore0', 'playerScore1', 'playerScore2', 
                                    'playerScore3', 'playerScore4', 'playerScore5', 'playerScore6', 'playerScore7', 
                                    'playerScore8', 'playerScore9', 'quadraKills', 'sightWardsBoughtInGame', 
                                    'statPerk0', 'statPerk1', 'statPerk2', 'timeCCingOthers', 'totalDamageDealt', 
                                    'totalDamageDealtToChampions', 'totalDamageTaken', 'totalHeal', 'totalMinionsKilled', 
                                    'totalPlayerScore', 'totalScoreRank', 'totalTimeCrowdControlDealt', 'totalUnitsHealed', 
                                    'tripleKills', 'trueDamageDealt', 'trueDamageDealtToChampions', 'trueDamageTaken', 
                                    'turretKills', 'unrealKills', 'visionScore', 'visionWardsBoughtInGame', 'wardsKilled', 
                                    'wardsPlaced')
                            )
    )
)
cleansed_league_df = cleansed_league_df.withColumn("participants", 
    F.transform(
        cleansed_league_df["participants"],
        lambda x: x.dropFields('timeline', 'participantId', 'killingSprees', 'kills', 'largestCriticalStrike', 'largestKillingSpree', 'largestMultiKill', 'longestTimeSpentLiving', 'magicDamageDealt', 'magicDamageDealtToChampions', 'magicalDamageTaken', 'neutralMinionsKilled', 'neutralMinionsKilledEnemyJungle', 'neutralMinionsKilledTeamJungle', 'objectivePlayerScore', 'participantId', 'pentaKills', 'perk0', 'perk0Var1', 'perk0Var2', 'perk0Var3', 'perk1', 'perk1Var1', 'perk1Var2', 'perk1Var3', 'perk2', 'perk2Var1', 'perk2Var2', 'perk2Var3', 'perk3', 'perk3Var1', 'perk3Var2', 'perk3Var3', 'perk4', 'perk4Var1', 'perk4Var2', 'perk4Var3', 'perk5', 'perk5Var1', 'perk5Var2', 'perk5Var3', 'perkPrimaryStyle', 'perkSubStyle', 'physicalDamageDealt', 'physicalDamageDealtToChampions', 'physicalDamageTaken', 'playerScore0', 'playerScore1', 'playerScore2', 'playerScore3', 'playerScore4', 'playerScore5', 'playerScore6', 'playerScore7', 'playerScore8', 'playerScore9', 'quadraKills', 'sightWardsBoughtInGame', 'statPerk0', 'statPerk1', 'statPerk2', 'timeCCingOthers', 'totalDamageDealt', 'totalDamageDealtToChampions', 'totalDamageTaken', 'totalHeal', 'totalMinionsKilled', 'totalPlayerScore', 'totalScoreRank', 'totalTimeCrowdControlDealt', 'totalUnitsHealed', 'tripleKills', 'trueDamageDealt', 'trueDamageDealtToChampions', 'trueDamageTaken', 'turretKills', 'unrealKills', 'visionScore', 'visionWardsBoughtInGame', 'wardsKilled', 'wardsPlaced', 'teamId') # can use timeline later
    )
)
for i in range(7):
    item = "item" + str(i)
    cleansed_league_df = cleansed_league_df.withColumn("participants", 
        F.transform(
            cleansed_league_df["participants"],
            lambda x: x.withField("stats" , 
                x["stats"].withField(item, 
                    x["stats"][item].dropFields('from', 'gold', 'into', 'stats', 'tags')
                )
            )
        )
    )

cleansed_league_df.printSchema()

root
 |-- participants: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- championId: string (nullable = true)
 |    |    |-- spell1Id: string (nullable = true)
 |    |    |-- spell2Id: string (nullable = true)
 |    |    |-- stats: struct (nullable = true)
 |    |    |    |-- item0: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item1: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item2: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item3: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item4: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item5: struct (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- item6: struct (nullable = true)
 |   

In [0]:
def explode_df(nested_df):
    new_df = nested_df
    for column in nested_df.columns:
        if cleansed_league_df.schema[column].dataType.typeName() == 'array':
            new_df = nested_df.selectExpr("*", f"explode({column}) as {column}_exploded").drop(column)
    return new_df

exploded_league_df = explode_df(cleansed_league_df)
exploded_league_df.printSchema()

root
 |-- participants_exploded: struct (nullable = true)
 |    |-- championId: string (nullable = true)
 |    |-- spell1Id: string (nullable = true)
 |    |-- spell2Id: string (nullable = true)
 |    |-- stats: struct (nullable = true)
 |    |    |-- item0: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item1: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item2: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item3: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item4: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item5: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- item6: struct (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |-- perk0: string (nullable = true)
 |    |    |-- perk1: string (

In [0]:
league_rdd = exploded_league_df.rdd

In [0]:
def itemize(record):
    items = []
    participant = record.participants_exploded
    items.append("Champion = "+ participant.championId)
    items.append("Spell = "+ participant.spell1Id)
    items.append("Spell = "+ participant.spell2Id)
    stats = participant['stats']
    items.append("Win = " + str(stats.win))
    for i in range(7):
        item = stats['item' + str(i)]
        item_name = item['name'] if item else None
        if item_name: items.append("Item = " + item_name)
    for i in range(6):
        perk = stats['perk' + str(i)]
        perk_name = item['name'] if perk else None
        if perk_name: items.append("Perk = " + perk_name)
    return list(set(items))

In [0]:
items_rdd = league_rdd.map(itemize)

In [0]:
minSupport = 0.025

In [0]:
model = FPGrowth.train(items_rdd, minSupport=minSupport)
result = model.freqItemsets()
result_sorted = result.sortBy(lambda x : (-len(x.items), -x.freq))
fi = result_sorted.collect()
for i in fi:
    print(i)

FreqItemset(items=['Item = Control Ward', 'Spell = Ignite', 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = False', 'Spell = Flash'], freq=419)
FreqItemset(items=['Item = Ninja Tabi', 'Spell = Teleport', 'Item = Warding Totem (Trinket)', 'Perk = Warding Totem (Trinket)', 'Win = False', 'Spell = Flash'], freq=418)
FreqItemset(items=['Item = Enchantment: Runic Echoes', "Item = Sorcerer's Shoes", 'Spell = Smite', 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Spell = Flash'], freq=405)
FreqItemset(items=['Item = Infinity Edge', "Item = Berserker's Greaves", 'Spell = Heal', 'Item = Farsight Alteration', 'Perk = Farsight Alteration', 'Spell = Flash'], freq=392)
FreqItemset(items=['Item = Ninja Tabi', 'Spell = Teleport', 'Item = Warding Totem (Trinket)', 'Perk = Warding Totem (Trinket)', 'Win = True', 'Spell = Flash'], freq=377)
FreqItemset(items=["Item = Berserker's Greaves", 'Spell = Heal', 'Item = Farsight Alteration', 'Perk = Farsight Alteration', 'Win = False', 'Spell = Flash'], freq=3

In [0]:
from itertools import combinations
data = items_rdd
min_support = minSupport

item_counts = data.flatMap(lambda transaction: [(item, 1) for item in transaction]).reduceByKey(lambda a, b: a + b)
total_count = data.count()

sup = int(min_support * total_count)
freq_items = item_counts.filter(lambda x: x[1] >= sup)
rules = []
k = 2
while freq_items.count() > 0:
    rules.append(freq_items.collect())
    candidate_counts = data.flatMap(lambda transaction: [(pair, 1) for pair in combinations(transaction, k)]).reduceByKey(lambda a, b: a + b)
    freq_items = candidate_counts.filter(lambda x: x[1] >= sup).map(lambda x: (x[0], x[1]))
    k += 1

In [0]:
min_support = minSupport
total_count = items_rdd.count()
sup = int(total_count * min_support)

broadcasted_items = sc.broadcast(items_rdd.map(lambda x: set(x)).collect())
def sup_filter(x):
    x_sup = len([1 for t in broadcasted_items.value if x.issubset(t)])
    if x_sup >= sup:
        return x, x_sup
    return None

rules = []

k = 1
ck = items_rdd.flatMap(lambda x: set(x)).distinct().collect()
ck = [{x} for x in ck]

while len(ck) > 0:
    fk = sc.parallelize(ck).map(sup_filter).filter(lambda x: x is not None).collect()
    if len(fk): rules.append(fk)
    k += 1
    f_k_items = [item for item in map(lambda x: x[0], fk)]
    ck = [i1 | i2 for i, i1 in enumerate(f_k_items) for i2 in f_k_items[i + 1:] if list(i1)[:k - 2] == list(i2)[:k - 2]]

In [0]:
print(rules[5])

[({'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = True', 'Item = Boots of Mobility', 'Spell = Flash', 'Spell = Ignite'}, 287), ({'Item = Control Ward', 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = True', 'Spell = Flash', 'Spell = Ignite'}, 300), ({'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = False', 'Item = Enchantment: Runic Echoes', 'Spell = Flash', 'Spell = Smite'}, 263), ({"Item = Sorcerer's Shoes", 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Item = Enchantment: Runic Echoes', 'Spell = Flash', 'Spell = Smite'}, 405), ({'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = False', 'Item = Boots of Mobility', 'Spell = Flash', 'Spell = Ignite'}, 286), ({'Item = Control Ward', 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Win = False', 'Spell = Flash', 'Spell = Ignite'}, 419), ({'Item = Control Ward', 'Perk = Oracle Lens', 'Item = Oracle Lens', 'Item = Boots of Mobility', 'Spell = Flash', 'Spell = Ignite'}, 282), ({'Spell = Teleport', 'Win = True', 'Item = Warding Totem (T