# setup spark

In [None]:
#@title spark install { form-width: "20%" }
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz

!tar xf spark-2.4.4-bin-hadoop2.7.tgz
!pip install -q findspark

!pip install pyspark


In [None]:
#@title spark setup { form-width: "20%" }

import os

# Point Java and Spark at the locations produced by the install cell.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-2.4.4-bin-hadoop2.7",
})

# findspark is initialised before pyspark is imported (it uses SPARK_HOME set above).
import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession

# One session for the whole notebook, with the driver memory set to 10g.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "10g")
    .getOrCreate()
)
sc = spark.sparkContext

# read and view the data

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import os
# Google Drive folders: where result CSVs are written and where the input CSVs are read from.
# Both live under /content/drive, so Drive must be mounted before they are used.
outputs_folder = '/content/drive/MyDrive/MP2-bigdata/analysis_outputs'
data_path ='/content/drive/MyDrive/MP2-bigdata/data/complete_data'
data_path  # echo the input folder as the cell output

'/content/drive/MyDrive/MP2-bigdata/data/complete_data'

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the data and output paths resolve.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# explore the champion_bans.csv dataset
# ('_c0' is dropped; presumably the index column written when the CSV was saved)
bans_csv_path = data_path + '/champion_bans.csv'
bans_df = spark.read.csv(bans_csv_path, header=True).drop('_c0')
bans_df.show()

+----------+---------+
|championId|totalBans|
+----------+---------+
|        26|     2062|
|        29|     2199|
|       222|     1125|
|        19|     3063|
|        54|     4898|
|       112|     1678|
|       113|      132|
|        22|     2828|
|       427|      527|
|         7|    13101|
|        77|      659|
|        34|     3619|
|       126|      998|
|       202|     2869|
|        50|     1946|
|       421|     1867|
|       876|     1441|
|       110|      434|
|        57|      261|
|       136|      194|
+----------+---------+
only showing top 20 rows



In [None]:
# explore the all_participants.csv dataset
# The frame is cached because every requirement below is computed from it.
participants_csv_path = data_path + '/all_participants.csv'
participants_df = (
    spark.read.csv(participants_csv_path, header=True)
    .drop('_c0')
    .cache()
)
participants_df.show()

+----------+------------+--------------------+---------------+-------------+-------+-----+------+-------+----------+-----------+---------------+----------+-----------------+--------------------+----------------------+-----------------------+--------------------+-------------------+------+-------------------+-----------+-----------+----------------+--------------+----------------+--------------+-------------------------+--------------------+----------+----------+---------+------------------+--------------+------------------+--------------+-----+-----+-----+-----+-----+-----+-----+--------------+-------------+-----+---------------------+-------------------+----------------+----------------------+----------------+---------------------------+----------------+--------------------+----------+---------+--------------+----------------+-----------------------+-------------+----------+--------------------+-------------------+------------------------------+-------------------+-----------+--------

# REQUIREMENT 1: Champion win, pick, and ban rates
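The rates are computed per champion as:

- `pick_rate = num_champion_picks / total_num_of_participants`
- `win_rate = num_champion_wins / num_champion_picks`
- `ban_rate = num_champion_bans / total_num_of_matches`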



In [None]:
# Denominators for the rates: distinct games (used for the ban rate) and
# participant rows (used for the pick rate).
num_matches = participants_df.select('gameId').distinct().count()
num_participants = participants_df.count()

In [None]:
# count the number of occurrences of each champion in the dataset (ascending by count)
req1_df = (
    participants_df
    .select('championId', 'championName')
    .groupBy('championId', 'championName')
    .count()
    .orderBy('count')
)
req1_df.show()

+----------+------------+-----+
|championId|championName|count|
+----------+------------+-----+
|       136| AurelionSol|  795|
|        72|     Skarner| 1083|
|       268|        Azir| 1261|
|        33|      Rammus| 1407|
|        44|       Taric| 1529|
|       133|       Quinn| 1546|
|        77|        Udyr| 1562|
|       113|     Sejuani| 1565|
|       429|     Kalista| 1612|
|       420|      Illaoi| 1878|
|       518|       Neeko| 1952|
|        42|       Corki| 1969|
|       427|       Ivern| 1985|
|        83|      Yorick| 2054|
|       102|     Shyvana| 2173|
|         2|        Olaf| 2204|
|       150|        Gnar| 2275|
|        96|      KogMaw| 2392|
|        74|Heimerdinger| 2409|
|        60|       Elise| 2475|
+----------+------------+-----+
only showing top 20 rows



In [None]:
req1_df.count()  # number of distinct (championId, championName) groups

157

In [None]:
# calculate the pickrate: champion picks / all participant rows, as a percentage with 2 decimals
pick_rate_pct = F.round((req1_df['count'] / num_participants) * 100, 2)
req1_df = req1_df.withColumn('pickRate%', pick_rate_pct)
req1_df.show()

+----------+------------+-----+---------+
|championId|championName|count|pickRate%|
+----------+------------+-----+---------+
|       136| AurelionSol|  795|     0.09|
|        72|     Skarner| 1083|     0.12|
|       268|        Azir| 1261|     0.14|
|        33|      Rammus| 1407|     0.16|
|        44|       Taric| 1529|     0.17|
|       133|       Quinn| 1546|     0.17|
|        77|        Udyr| 1562|     0.17|
|       113|     Sejuani| 1565|     0.17|
|       429|     Kalista| 1612|     0.18|
|       420|      Illaoi| 1878|     0.21|
|       518|       Neeko| 1952|     0.22|
|        42|       Corki| 1969|     0.22|
|       427|       Ivern| 1985|     0.22|
|        83|      Yorick| 2054|     0.23|
|       102|     Shyvana| 2173|     0.24|
|         2|        Olaf| 2204|     0.25|
|       150|        Gnar| 2275|     0.25|
|        96|      KogMaw| 2392|     0.27|
|        74|Heimerdinger| 2409|     0.27|
|        60|       Elise| 2475|     0.28|
+----------+------------+-----+---

In [None]:
## aggregate the number of wins per champion
def u(column):
    """Build a 0/1 integer Column flagging where a text column equals 'True'.

    Uses a native Spark expression instead of a Python UDF, so rows are not
    shipped to a Python worker for a simple comparison.

    Args:
        column: column name (str) or Column holding 'True'/'False' text.

    Returns:
        Column that is 1 where the value equals 'True' and 0 otherwise
        (null values count as 0, like the previous ``int(b == 'True')``).
    """
    column = F.col(column) if isinstance(column, str) else column
    return F.when(column == 'True', 1).otherwise(0)

win_df = (
    participants_df
    .select('championId', 'win')
    .withColumn('win', u('win'))
    .groupBy('championId')
    .agg(F.sum('win'))  # result column is named 'sum(win)'
)
win_df.show()

+----------+--------+
|championId|sum(win)|
+----------+--------+
|        51|    6851|
|         7|    3384|
|       711|    3174|
|       234|    4964|
|        15|    1715|
|        54|    2656|
|       154|    2323|
|       101|    2822|
|        11|    3075|
|        69|    1636|
|        29|    3224|
|       112|    2493|
|        42|    1009|
|        64|    5336|
|         3|    2613|
|       113|     748|
|       432|    2781|
|        30|    2228|
|        34|    1601|
|       133|     774|
+----------+--------+
only showing top 20 rows



In [None]:
# calculate the winrate: wins / picks per champion, as a percentage with 2 decimals
req1_df = req1_df.join(win_df, 'championId', 'inner')
win_rate_pct = F.round(req1_df['sum(win)'] * 100 / req1_df['count'], 2)
req1_df = req1_df.withColumn('winRate%', win_rate_pct).drop('sum(win)')
req1_df.show()


+----------+------------+-----+---------+--------+
|championId|championName|count|pickRate%|winRate%|
+----------+------------+-----+---------+--------+
|        51|     Caitlyn|13604|     1.51|   50.36|
|         7|     Leblanc| 6857|     0.76|   49.35|
|       711|         Vex| 6565|     0.73|   48.35|
|        15|       Sivir| 3394|     0.38|   50.53|
|       234|       Viego|10048|     1.12|    49.4|
|        54|    Malphite| 5435|     0.61|   48.87|
|       154|         Zac| 4570|     0.51|   50.83|
|       101|      Xerath| 5614|     0.62|   50.27|
|        11|    MasterYi| 5868|     0.65|    52.4|
|        29|      Twitch| 6501|     0.72|   49.59|
|        69|  Cassiopeia| 3323|     0.37|   49.23|
|       112|      Viktor| 4891|     0.54|   50.97|
|        42|       Corki| 1969|     0.22|   51.24|
|        64|      LeeSin|11156|     1.24|   47.83|
|         3|       Galio| 5180|     0.58|   50.44|
|       113|     Sejuani| 1565|     0.17|    47.8|
|        30|     Karthus| 4381|

In [None]:
# ban rate = number of times banned / total number of games
# NOTE(review): the inner join drops any champion that has no row in bans_df -- confirm this is intended.
bans_per_champion = bans_df.select('championId', 'totalBans')
r = req1_df.join(bans_per_champion, 'championId', 'inner')
ban_rate_pct = F.round(r['totalBans'] * 100 / num_matches, 2)
req1_df = r.withColumn('banRate%', ban_rate_pct).drop('totalBans')
req1_df.show()

+----------+------------+-----+---------+--------+--------+
|championId|championName|count|pickRate%|winRate%|banRate%|
+----------+------------+-----+---------+--------+--------+
|        51|     Caitlyn|13604|     1.51|   50.36|     6.9|
|         7|     Leblanc| 6857|     0.76|   49.35|   14.77|
|       711|         Vex| 6565|     0.73|   48.35|   20.08|
|        15|       Sivir| 3394|     0.38|   50.53|    0.76|
|       234|       Viego|10048|     1.12|    49.4|   15.08|
|        54|    Malphite| 5435|     0.61|   48.87|    5.52|
|       154|         Zac| 4570|     0.51|   50.83|     2.2|
|       101|      Xerath| 5614|     0.62|   50.27|    2.85|
|        11|    MasterYi| 5868|     0.65|    52.4|   15.01|
|        29|      Twitch| 6501|     0.72|   49.59|    2.48|
|        69|  Cassiopeia| 3323|     0.37|   49.23|    1.74|
|       112|      Viktor| 4891|     0.54|   50.97|    1.89|
|        42|       Corki| 1969|     0.22|   51.24|    0.35|
|        64|      LeeSin|11156|     1.24

In [None]:
# save output to csv (the raw 'count' column is dropped before writing)
req1_df.drop('count').toPandas().to_csv(os.path.join(outputs_folder,'Req1_champ_rates.csv'))

# REQUIREMENT2: Champion Synergies or duos
A synergy score was calculated, with reference to this [paper](https://courses.cs.washington.edu/courses/cse547/21sp/old_projects/kim_etal.pdf), as:

**synergy_score(champ1, champ2) =
 (total number of winning duos)/(total number of champion duos winning or losing)** 


In [None]:
# Work on a narrow projection of the participants dataframe that holds only
# the columns needed for the duo synergies.
columns = ['championId', 'championName', 'gameAndTeam', 'role', 'win']
req2_df = participants_df.select(*columns)
req2_df.show()

+----------+------------+--------------------+-------+-----+
|championId|championName|         gameAndTeam|   role|  win|
+----------+------------+--------------------+-------+-----+
|       516|        Ornn|Row(_1=3956502423...|   SOLO| True|
|       120|     Hecarim|Row(_1=3956502423...|   NONE| True|
|        91|       Talon|Row(_1=3956502423...|   SOLO| True|
|       202|        Jhin|Row(_1=3956502423...|  CARRY| True|
|       235|       Senna|Row(_1=3956502423...|SUPPORT| True|
|        62|  MonkeyKing|Row(_1=3956502423...|   SOLO|False|
|       106|    Volibear|Row(_1=3956502423...|   NONE|False|
|       238|         Zed|Row(_1=3956502423...|   SOLO|False|
|        51|     Caitlyn|Row(_1=3956502423...|  CARRY|False|
|        99|         Lux|Row(_1=3956502423...|SUPPORT|False|
|        58|    Renekton|Row(_1=4046232109...|   SOLO|False|
|        20|        Nunu|Row(_1=4046232109...|   NONE|False|
|        61|     Orianna|Row(_1=4046232109...|   SOLO|False|
|       145|       Kaisa

In [None]:
# Keep only the rows whose role is 'DUO', then group them by the composite
# game-and-team id so that each group holds the DUO participants of one team
# in one game.
# Per group, the champion ids and names are collected into lists and the win
# statuses into a set (expected to hold one element, since every champion in a
# team gets the same win status).
duo_rows = (
    req2_df
    .filter(req2_df.role == 'DUO')
    .select('gameAndTeam', 'championId', 'championName', 'win')
)
req2_df = duo_rows.groupby('gameAndTeam').agg(
    F.collect_list('championId'),
    F.collect_list('championName'),
    F.collect_set('win'),
)
req2_df.show()

+--------------------+------------------------+--------------------------+----------------+
|         gameAndTeam|collect_list(championId)|collect_list(championName)|collect_set(win)|
+--------------------+------------------------+--------------------------+----------------+
|Row(_1=3953932960...|              [887, 517]|             [Gwen, Sylas]|         [False]|
|Row(_1=3954864646...|                [41, 92]|        [Gangplank, Riven]|         [False]|
|Row(_1=3955811265...|                [90, 23]|      [Malzahar, Trynda...|         [False]|
|Row(_1=3957155134...|                    [10]|                   [Kayle]|         [False]|
|Row(_1=3960319186...|              [420, 150]|            [Illaoi, Gnar]|          [True]|
|Row(_1=3962529217...|                    [62]|              [MonkeyKing]|         [False]|
|Row(_1=3963407417...|                 [51, 7]|        [Caitlyn, Leblanc]|         [False]|
|Row(_1=3966406586...|                [90, 23]|      [Malzahar, Trynda...|      

In [None]:
# Remove every group whose duo-champions list holds a single player: no pair can be built from it.
@F.udf("boolean")
def valid(l):
    """Return True when the collected list has more than one element.

    Args:
        l: list taken from a collect_list column (None is tolerated).

    Returns:
        bool: True for lists with two or more elements, otherwise an explicit
        False (previously the function fell through and returned None).
    """
    return l is not None and len(l) > 1
req2_df = req2_df.filter(valid(req2_df['collect_list(championName)']))
req2_df.show()

+--------------------+------------------------+--------------------------+----------------+
|         gameAndTeam|collect_list(championId)|collect_list(championName)|collect_set(win)|
+--------------------+------------------------+--------------------------+----------------+
|Row(_1=3953932960...|              [887, 517]|             [Gwen, Sylas]|         [False]|
|Row(_1=3954864646...|                [41, 92]|        [Gangplank, Riven]|         [False]|
|Row(_1=3955811265...|                [90, 23]|      [Malzahar, Trynda...|         [False]|
|Row(_1=3960319186...|              [420, 150]|            [Illaoi, Gnar]|          [True]|
|Row(_1=3963407417...|                 [51, 7]|        [Caitlyn, Leblanc]|         [False]|
|Row(_1=3966406586...|                [90, 23]|      [Malzahar, Trynda...|         [False]|
|Row(_1=3972629104...|                 [4, 27]|      [TwistedFate, Sin...|         [False]|
|Row(_1=3974659038...|               [92, 157]|            [Riven, Yasuo]|      

In [None]:
# First rename the columns produced by the aggregation (strip the
# collect_set(...) / collect_list(...) wrappers from their names).
# The following cells then generate every possible pairing of two champions in each list, e.g.
#   [champ1, champ2, champ3] --> [(champ1, champ2), (champ1, champ3), (champ2, champ3)]
# and explode the champions column so each row holds a single pair:
#   |championPair|   |win|
#   |(ch1, ch2)  |   |True|
from pyspark.sql.functions import collect_set, collect_list,col

# renaming columns: one pass per aggregate wrapper
for wrapper in ('collect_set(', 'collect_list('):
    replacements = {c: c.replace(wrapper, '').replace(')', '') for c in req2_df.columns if wrapper in c}
    req2_df = req2_df.select([col(c).alias(replacements.get(c, c)) for c in req2_df.columns])

req2_df.show()

+--------------------+--------------------+--------------------+-------+
|         gameAndTeam|          championId|        championName|    win|
+--------------------+--------------------+--------------------+-------+
|Row(_1=3953932960...|          [887, 517]|       [Gwen, Sylas]|[False]|
|Row(_1=3954864646...|            [41, 92]|  [Gangplank, Riven]|[False]|
|Row(_1=3955811265...|            [90, 23]|[Malzahar, Trynda...|[False]|
|Row(_1=3960319186...|          [420, 150]|      [Illaoi, Gnar]| [True]|
|Row(_1=3963407417...|             [51, 7]|  [Caitlyn, Leblanc]|[False]|
|Row(_1=3966406586...|            [90, 23]|[Malzahar, Trynda...|[False]|
|Row(_1=3972629104...|             [4, 27]|[TwistedFate, Sin...|[False]|
|Row(_1=3974659038...|           [92, 157]|      [Riven, Yasuo]|[False]|
|Row(_1=3976408301...|           [61, 266]|   [Orianna, Aatrox]|[False]|
|Row(_1=3981277424...|           [103, 82]| [Ahri, Mordekaiser]| [True]|
|Row(_1=3990610844...|            [1, 106]|   [Anni

In [None]:
req2_df.printSchema()  # inspect the column types after the rename

root
 |-- gameAndTeam: string (nullable = true)
 |-- championId: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- championName: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- win: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [None]:
# pairing champions
from itertools import combinations

def all_pairings(lst):
    """Return every 2-element combination of ``lst`` as a list of tuples, in itertools order."""
    return list(combinations(lst, 2))

# Wrap all_pairings as a UDF returning array<array<string>> and apply it to both list columns.
pair = F.udf(all_pairings, ArrayType(ArrayType(StringType())))
for list_column in ('championId', 'championName'):
    req2_df = req2_df.withColumn(list_column, pair(list_column))



In [None]:
req2_df.printSchema()  # inspect the column types after the pairing UDF was applied

root
 |-- gameAndTeam: string (nullable = true)
 |-- championId: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- championName: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- win: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [None]:
# explode

def sort_pair(pair):
    """Return the elements of ``pair`` as a new list in ascending order."""
    ordered = list(pair)
    ordered.sort()
    return ordered

# Declare the UDF's return type. Without it F.udf defaults to StringType, so each
# sorted pair would be stored as text such as "[Gwen, Sylas]" and any index-based
# check on the pair would compare characters instead of champion names.
# NOTE: championPairs is therefore a real array<string>; after toPandas() the
# column holds Python lists rather than strings.
reformat = F.udf(sort_pair, ArrayType(StringType()))
req2_df = (
    req2_df.select('championName', 'win')
    .withColumnRenamed("championName", "championPairs")
    .withColumn('championPairs', F.explode('championPairs'))  # one pair per row
    .withColumn('championPairs', reformat('championPairs'))   # sorted, so (A, B) and (B, A) group together
)
req2_df.show()

+--------------------+-------+
|       championPairs|    win|
+--------------------+-------+
|       [Gwen, Sylas]|[False]|
|  [Gangplank, Riven]|[False]|
|[Malzahar, Trynda...|[False]|
|      [Gnar, Illaoi]| [True]|
|  [Caitlyn, Leblanc]|[False]|
|[Malzahar, Trynda...|[False]|
|[Singed, TwistedF...|[False]|
|      [Riven, Yasuo]|[False]|
|   [Aatrox, Orianna]|[False]|
| [Ahri, Mordekaiser]| [True]|
|   [Annie, Volibear]| [True]|
|[Cassiopeia, Miss...|[False]|
|        [Ekko, Sett]| [True]|
|       [Kayle, Yone]|[False]|
|      [Urgot, Ziggs]|[False]|
|    [Sett, Vladimir]| [True]|
|  [Mordekaiser, Zed]| [True]|
|        [Fiora, Lux]| [True]|
|     [Caitlyn, Sona]|[False]|
|      [Anivia, Sona]|[False]|
+--------------------+-------+
only showing top 20 rows



In [None]:
# count the inputs of the synergy score: games played and games won per champion pair
def un_set(column):
    """Build a 0/1 integer Column from an array column of win statuses.

    Uses a native Spark expression instead of a Python UDF.

    Args:
        column: column name (str) or Column holding an array of 'True'/'False' text.

    Returns:
        Column that is 1 when the first array element equals 'True', else 0.
    """
    column = F.col(column) if isinstance(column, str) else column
    return F.when(column[0] == 'True', 1).otherwise(0)

# Number of team-games in which each pair appeared (won or lost); the win
# column is not needed for a plain row count.
total_pairings = req2_df.groupBy("championPairs").count()
# Number of those team-games that the pair won.
winning_pairings = (
    req2_df.withColumn('win', un_set('win'))
    .groupby('championPairs')
    .agg(F.sum('win'))
    .withColumnRenamed('sum(win)', 'win')
)


In [None]:
# both aggregations group by championPairs, so the two row counts are expected to match
total_pairings.count(), winning_pairings.count()

(9918, 9918)

In [None]:
@F.udf("boolean")
def filt(v):
    """Return True when the two champions of a pair are different.

    Accepts the pair either as a real array or as the text form "[A, B]" that
    results when the pair column was produced by a UDF without a declared
    return type. On that text form, indexing yields characters ('[' and the
    first letter), which made the original comparison always True.

    Args:
        v: champion pair (list/tuple of two names, its "[A, B]" text form, or None).

    Returns:
        bool: True for a pair of two different names, otherwise False.
    """
    if v is None:
        return False
    if isinstance(v, str):
        # Recover the two names from the stringified pair.
        v = [name.strip() for name in v.strip('[]').split(',')]
    return len(v) == 2 and v[0] != v[1]

# synergy score (%) = games won by the pair / games played by the pair
synergy_df = winning_pairings.join(total_pairings, 'championPairs', 'inner').filter(filt('championPairs'))
synergy_df = synergy_df.withColumn('SynergyScore%', F.round(synergy_df['win']*100/synergy_df['count'], 2)).\
                        drop('win', 'count')
synergy_df.show()

+--------------------+-------------+
|       championPairs|SynergyScore%|
+--------------------+-------------+
|  [Amumu, Seraphine]|        57.14|
|        [Sett, Sona]|        66.67|
|[Kassadin, TahmKe...|        55.56|
|    [Hecarim, Yasuo]|        33.33|
|      [Fizz, Lucian]|         44.0|
|       [Ekko, Poppy]|        63.64|
|[Katarina, Tristana]|         50.0|
|        [Ashe, Ekko]|        70.59|
|    [Seraphine, Vex]|        66.67|
|    [Orianna, Shaco]|        100.0|
| [Kindred, Malzahar]|          0.0|
|  [MasterYi, Samira]|          0.0|
|  [Caitlyn, Hecarim]|         40.0|
|    [Karthus, Yuumi]|        33.33|
|   [MasterYi, Vayne]|        33.33|
|          [Ekko, Vi]|          0.0|
|    [Sivir, Trundle]|          0.0|
|   [Katarina, Teemo]|        57.89|
| [Chogath, JarvanIV]|          0.0|
|  [Lucian, Malphite]|        45.45|
+--------------------+-------------+
only showing top 20 rows



In [None]:
# save the champion-pair synergy scores to csv
synergy_df.toPandas().to_csv(os.path.join(outputs_folder,'req2_champ_synergies.csv'))

# REQUIREMENT 3: Item win, pick rates

In [None]:
# Select the item* columns except 'itemsPurchased' and 'item6', plus the win flag.
excluded_item_columns = ['itemsPurchased', 'item6']
item_cols = [
    name for name in participants_df.columns
    if name.startswith('item') and name not in excluded_item_columns
]
select_columns = item_cols + ['win']
items_df = participants_df.select(select_columns)
items_df.show()

+-----+-----+-----+-----+-----+-----+-----+
|item0|item1|item2|item3|item4|item5|  win|
+-----+-----+-----+-----+-----+-----+-----+
| 8001| 3047| 1057| 7005| 3076| 1011| True|
| 6632| 1037| 3053| 1031| 3158| 1036| True|
| 1055| 3071| 7015| 3053| 3134| 3047| True|
| 1055| 6671| 3009| 3094| 1038| 1018| True|
| 2055| 3864| 3133| 6632| 3067| 3047| True|
| 2031| 6632| 3047| 1054| 3067| 1036|False|
| 6662| 3047| 3075|    0|    0|    0|False|
| 6692| 3179| 3123| 2031| 1036| 3158|False|
| 3031| 6671| 3006| 3094|    0|    0|False|
| 3020| 3853| 6655| 4628| 2031|    0|False|
| 6630| 2031| 1055| 3047| 3053| 3211|False|
| 3068| 2031| 3047| 3076| 2055| 1011|False|
| 6655| 3157| 1058| 3916| 3070| 3020|False|
| 1055| 3134| 3123| 6672| 3006| 1018|False|
| 3067| 3857|    0|    0| 3117| 3190|False|
| 6630| 3071| 3133| 1031| 3047|    0| True|
| 4636| 3165| 2421| 1082| 3020| 3191| True|
| 6694| 3158| 6692|    0|    0|    0| True|
| 3006| 6672| 1055| 3508| 3133|    0| True|
| 3158| 3011| 3853| 2055| 3114| 

In [None]:
# melt all the item columns into one
def melt_columns(df, by, subset=None):
    """Unpivot several columns of ``df`` into (itemColumn, item) rows.

    Args:
        df: Spark DataFrame to reshape.
        by: name, or list/tuple of names, of the identifier column(s) that are
            repeated on every output row.
        subset: columns to unpivot; defaults to every column of ``df`` that is
            not an identifier column.

    Returns:
        DataFrame holding the ``by`` column(s) followed by ``itemColumn`` (the
        source column name) and ``item`` (that column's value).

    Side effects:
        (Re)registers the temp view ``features_to_check`` on the notebook's
        global ``spark`` session.
    """
    id_columns = [by] if isinstance(by, str) else list(by)
    # Leaving the id columns out of the default avoids selecting them twice,
    # which made the unqualified reference in the SQL below ambiguous.
    relevant_columns = list(subset) if subset else [c for c in df.columns if c not in id_columns]
    # stack(n, 'name1', col1, 'name2', col2, ...) emits one (name, value) row per listed column.
    cols_to_stack = ", ".join("'{c}', `{c}`".format(c=c) for c in relevant_columns)
    stack_expression = "stack({}, {}) as (itemColumn, item)".format(len(relevant_columns), cols_to_stack)

    df.select(*(id_columns + relevant_columns)).createOrReplaceTempView("features_to_check")

    id_list = ", ".join("`{}`".format(c) for c in id_columns)
    sql = f"select {id_list}, {stack_expression} from features_to_check"
    return spark.sql(sql)

req3_df = melt_columns(items_df,'win', item_cols)
req3_df.show()

+----+----------+----+
| win|itemColumn|item|
+----+----------+----+
|True|     item0|8001|
|True|     item1|3047|
|True|     item2|1057|
|True|     item3|7005|
|True|     item4|3076|
|True|     item5|1011|
|True|     item0|6632|
|True|     item1|1037|
|True|     item2|3053|
|True|     item3|1031|
|True|     item4|3158|
|True|     item5|1036|
|True|     item0|1055|
|True|     item1|3071|
|True|     item2|7015|
|True|     item3|3053|
|True|     item4|3134|
|True|     item5|3047|
|True|     item0|1055|
|True|     item1|6671|
+----+----------+----+
only showing top 20 rows



In [None]:
# Number of melted (participant, item column) rows; used as the denominator of the item pick rate.
# NOTE(review): rows whose item is 0 (presumably empty slots) are counted too -- confirm this is intended.
total_item_picks = req3_df.count()


In [None]:
# count the occurrences and wins of each item in the dataset, then derive the item pick and win rates
def makeInt(column):
    """Build a 0/1 integer Column flagging where a text column equals 'True'.

    Uses a native Spark expression instead of a Python UDF.

    Args:
        column: column name (str) or Column holding 'True'/'False' text.

    Returns:
        Column that is 1 where the value equals 'True' and 0 otherwise (nulls count as 0).
    """
    column = F.col(column) if isinstance(column, str) else column
    return F.when(column == 'True', 1).otherwise(0)

# Rows per item (how often the item appears in any slot).
item_picks = req3_df.groupBy('item').count().withColumnRenamed('count', 'itemCounts')
# Wins per item, joined with the pick counts.
item_wins = (
    req3_df.select('item', 'win')
    .withColumn('win', makeInt('win'))
    .groupBy('item')
    .agg(F.sum('win'))
    .withColumnRenamed('sum(win)', 'winCount')
    .join(item_picks, 'item', 'inner')
)
# pick rate (%) = item rows / all item rows; win rate (%) = item wins / item rows
req3_df = (
    item_wins
    .withColumn('itemPickRate%', F.round(item_wins['itemCounts'] * 100 / total_item_picks, 2))
    .withColumn('itemWinRate%', F.round(item_wins['winCount'] * 100 / item_wins['itemCounts'], 2))
    .drop('itemCounts', 'winCount')
)
req3_df.show()
                          

+----+-------------+------------+
|item|itemPickRate%|itemWinRate%|
+----+-------------+------------+
|3858|         0.02|       33.55|
|3121|         0.03|       52.52|
|3057|         0.18|       38.95|
|7013|         0.01|       58.38|
|7014|          0.0|       62.24|
|1043|         0.12|       41.85|
|3179|         0.11|       52.22|
|7015|          0.0|       63.98|
|3089|         0.57|       60.13|
|6333|         0.45|       59.98|
|3145|         0.17|       43.19|
|3863|         0.01|       24.59|
|6609|         0.17|       55.28|
|3140|         0.08|       57.09|
|7012|          0.0|       60.43|
|2138|          0.0|        47.3|
|6632|         1.27|       50.81|
|3742|         0.25|       58.02|
|6662|         0.39|       51.58|
|2139|          0.0|        54.4|
+----+-------------+------------+
only showing top 20 rows



In [None]:
# read item names from data dragon; 'image' and '_c0' are dropped and 'name' becomes 'itemName'
items_ddragon_df = (
    spark.read.csv(data_path + '/item_names_from_ddragon.csv', header=True)
    .drop('image', '_c0')
    .withColumnRenamed('name', 'itemName')
)
items_ddragon_df.show()

+---------+-----------------+
|itemIndex|         itemName|
+---------+-----------------+
|     1001|            Boots|
|     1004|     Faerie Charm|
|     1006|Rejuvenation Bead|
|     1011|     Giant's Belt|
|     1018| Cloak of Agility|
|     1026|    Blasting Wand|
|     1027| Sapphire Crystal|
|     1028|     Ruby Crystal|
|     1029|      Cloth Armor|
|     1031|       Chain Vest|
|     1033|Null-Magic Mantle|
|     1035|       Emberknife|
|     1036|       Long Sword|
|     1037|          Pickaxe|
|     1038|      B. F. Sword|
|     1039|        Hailblade|
|     1040|    Obsidian Edge|
|     1042|           Dagger|
|     1043|      Recurve Bow|
|     1052|  Amplifying Tome|
+---------+-----------------+
only showing top 20 rows



In [None]:
# add the item names to the dataframe (inner join on itemIndex)
req3_named = req3_df.withColumnRenamed('item', 'itemIndex').join(items_ddragon_df, 'itemIndex', 'inner')
req3_df = req3_named.select('itemIndex', 'itemName', 'itemPickRate%', 'itemWinRate%')
req3_df.show()

+---------+--------------------+-------------+------------+
|itemIndex|            itemName|itemPickRate%|itemWinRate%|
+---------+--------------------+-------------+------------+
|     3858|        Relic Shield|         0.02|       33.55|
|     3121|        Fimbulwinter|         0.03|       52.52|
|     3057|               Sheen|         0.18|       38.95|
|     1043|         Recurve Bow|         0.12|       41.85|
|     3179|       Umbral Glaive|         0.11|       52.22|
|     3089|  Rabadon's Deathcap|         0.57|       60.13|
|     6333|       Death's Dance|         0.45|       59.98|
|     3145|  Hextech Alternator|         0.17|       43.19|
|     3863|  Harrowing Crescent|         0.01|       24.59|
|     6609| Chempunk Chainsword|         0.17|       55.28|
|     3140|    Quicksilver Sash|         0.08|       57.09|
|     2138|      Elixir of Iron|          0.0|        47.3|
|     6632|     Divine Sunderer|         1.27|       50.81|
|     3742|    Dead Man's Plate|        

In [None]:
# save the item pick/win rates to csv
req3_df.toPandas().to_csv(outputs_folder+'/req3_item_rates.csv')

# REQUIREMENT 4: Item Synergies

In [None]:

# Item columns (all item* columns except 'itemsPurchased' and 'item6') plus the
# identifier columns needed for the item synergies.
item_cols = [
    name for name in participants_df.columns
    if name.startswith('item') and name not in ['itemsPurchased', 'item6']
]
select_columns = item_cols + ['win'] + ['championName'] + ['gameAndTeam']
items_df = participants_df.select(select_columns)

# melt all the item columns into one
def melt_columns(df, by, subset=None):
    """Unpivot several columns of ``df`` into (itemColumn, item) rows.

    Args:
        df: Spark DataFrame to reshape.
        by: name, or list/tuple of names (any length), of the identifier
            column(s) that are repeated on every output row.
        subset: columns to unpivot; defaults to every column of ``df`` that is
            not an identifier column.

    Returns:
        DataFrame holding the ``by`` column(s) in the given order, followed by
        ``itemColumn`` (the source column name) and ``item`` (that column's value).

    Side effects:
        (Re)registers the temp view ``features_to_check`` on the notebook's
        global ``spark`` session.
    """
    # A single name is accepted as well as a list, and the list is no longer
    # required to hold exactly three columns.
    id_columns = [by] if isinstance(by, str) else list(by)
    relevant_columns = list(subset) if subset else [c for c in df.columns if c not in id_columns]
    # stack(n, 'name1', col1, 'name2', col2, ...) emits one (name, value) row per listed column.
    cols_to_stack = ", ".join("'{c}', `{c}`".format(c=c) for c in relevant_columns)
    stack_expression = "stack({}, {}) as (itemColumn, item)".format(len(relevant_columns), cols_to_stack)
    df.select(*(id_columns + relevant_columns)).createOrReplaceTempView("features_to_check")
    id_list = ", ".join("`{}`".format(c) for c in id_columns)
    sql = f"select {id_list}, {stack_expression} from features_to_check"
    return spark.sql(sql)

req4_df = melt_columns(items_df,['win','gameAndTeam','championName'] , item_cols)
req4_df.show()

+----+--------------------+------------+----------+----+
| win|         gameAndTeam|championName|itemColumn|item|
+----+--------------------+------------+----------+----+
|True|Row(_1=3956502423...|        Ornn|     item0|8001|
|True|Row(_1=3956502423...|        Ornn|     item1|3047|
|True|Row(_1=3956502423...|        Ornn|     item2|1057|
|True|Row(_1=3956502423...|        Ornn|     item3|7005|
|True|Row(_1=3956502423...|        Ornn|     item4|3076|
|True|Row(_1=3956502423...|        Ornn|     item5|1011|
|True|Row(_1=3956502423...|     Hecarim|     item0|6632|
|True|Row(_1=3956502423...|     Hecarim|     item1|1037|
|True|Row(_1=3956502423...|     Hecarim|     item2|3053|
|True|Row(_1=3956502423...|     Hecarim|     item3|1031|
|True|Row(_1=3956502423...|     Hecarim|     item4|3158|
|True|Row(_1=3956502423...|     Hecarim|     item5|1036|
|True|Row(_1=3956502423...|       Talon|     item0|1055|
|True|Row(_1=3956502423...|       Talon|     item1|3071|
|True|Row(_1=3956502423...|    

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.functions import collect_set, collect_list,col
from pyspark.sql import Window

# Per (champion, item): how many rows contain the item and how many of those are wins.
# Both counts come from a single aggregation so that combinations with zero wins are
# kept (as 0%) instead of being dropped by an inner join against a wins-only table,
# which also shrank the per-champion totalPick denominator.
win_flag = F.when(F.col('win') == 'True', 1).otherwise(0)
req41 = req4_df.groupBy("championName", "item").agg(
    F.count(F.lit(1)).alias("totalCount"),
    F.sum(win_flag).alias("winCount"),
)
# totalPick = all item rows of the champion, repeated on each of its rows.
req41 = req41.withColumn("totalPick", F.sum("totalCount") \
  .over(Window.partitionBy("championName")))
# chSynergy (%) = wins with the item / rows with the item (rounded to a whole number);
# chItemPick (%) = rows with the item / all item rows of the champion (2 decimals).
req41 = req41.withColumn('chSynergy', F.round(req41['winCount']*100/req41['totalCount'])) \
             .withColumn('chItemPick', F.round(req41['totalCount']*100/req41['totalPick'], 2))
req41.show()

+------------+----+----------+--------+---------+---------+----------+
|championName|item|totalCount|winCount|totalPick|chSynergy|chItemPick|
+------------+----+----------+--------+---------+---------+----------+
|      RekSai|3075|        41|      23|    19602|     56.0|      0.21|
|      RekSai|6632|         6|       4|    19602|     67.0|      0.03|
|      RekSai|7000|         7|       4|    19602|     57.0|      0.04|
|      RekSai|6693|      2765|    1503|    19602|     54.0|     14.11|
|      RekSai|3123|       214|     105|    19602|     49.0|      1.09|
|      RekSai|1042|         5|       2|    19602|     40.0|      0.03|
|      RekSai|6029|         6|       2|    19602|     33.0|      0.03|
|      RekSai|3067|       117|      63|    19602|     54.0|       0.6|
|      RekSai|1029|       219|     116|    19602|     53.0|      1.12|
|      RekSai|1001|       220|      98|    19602|     45.0|      1.12|
|      RekSai|3748|        21|      12|    19602|     57.0|      0.11|
|     

In [None]:
# Attach the item names from the data dragon lookup (inner join on itemIndex).
items_ddragon_df.show()
req41_named = req41.withColumnRenamed('item', 'itemIndex').join(items_ddragon_df, 'itemIndex', 'inner')
req41 = req41_named.select('itemIndex', 'championName', 'itemName', 'chSynergy', 'chItemPick')
req41.show()

+---------+-----------------+
|itemIndex|         itemName|
+---------+-----------------+
|     1001|            Boots|
|     1004|     Faerie Charm|
|     1006|Rejuvenation Bead|
|     1011|     Giant's Belt|
|     1018| Cloak of Agility|
|     1026|    Blasting Wand|
|     1027| Sapphire Crystal|
|     1028|     Ruby Crystal|
|     1029|      Cloth Armor|
|     1031|       Chain Vest|
|     1033|Null-Magic Mantle|
|     1035|       Emberknife|
|     1036|       Long Sword|
|     1037|          Pickaxe|
|     1038|      B. F. Sword|
|     1039|        Hailblade|
|     1040|    Obsidian Edge|
|     1042|           Dagger|
|     1043|      Recurve Bow|
|     1052|  Amplifying Tome|
+---------+-----------------+
only showing top 20 rows

+---------+------------+--------------------+---------+----------+
|itemIndex|championName|            itemName|chSynergy|chItemPick|
+---------+------------+--------------------+---------+----------+
|     3075|      RekSai|           Thornmail|     56.

In [None]:
# save the champion-item synergy table to csv
req41.toPandas().to_csv(outputs_folder+'/req41_Item_champ_synergy.csv')

In [None]:
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import  StringType,  ArrayType
import ast

# Load the champion -> classes table and rename 'class' to 'chclass'.
chClasses = (
    spark.read.csv(data_path+'/champion_classes_from_ddragon.csv', header=True)
    .withColumnRenamed('class', 'chclass')
)

# 'chclass' is read as text holding a Python list literal; parse it into a real
# array<string> column.
udf_func = udf(lambda raw: list(ast.literal_eval(raw)), ArrayType(StringType()))
chClasses = chClasses.withColumn('chclass', udf_func(col("chclass")))
chClasses.show()

# Attach the class list to every (champion, item, win) record.
req42 = (
    req4_df
    .select('championName', 'item', 'win')
    .join(chClasses.select('championName', 'chclass'), on=['championName'])
)
req42.show()

# One output row per class the champion belongs to.
req42 = req42.select('win', 'item', explode('chclass').alias('chclass'))
req42.show()

+---+------------+--------------------+---------------+
|_c0|championName|             chclass|          image|
+---+------------+--------------------+---------------+
|  0|      Aatrox|     [Fighter, Tank]|     Aatrox.png|
|  1|        Ahri|    [Mage, Assassin]|       Ahri.png|
|  2|       Akali|          [Assassin]|      Akali.png|
|  3|      Akshan|[Marksman, Assassin]|     Akshan.png|
|  4|     Alistar|     [Tank, Support]|    Alistar.png|
|  5|       Amumu|        [Tank, Mage]|      Amumu.png|
|  6|      Anivia|     [Mage, Support]|     Anivia.png|
|  7|       Annie|              [Mage]|      Annie.png|
|  8|    Aphelios|          [Marksman]|   Aphelios.png|
|  9|        Ashe| [Marksman, Support]|       Ashe.png|
| 10| AurelionSol|              [Mage]|AurelionSol.png|
| 11|        Azir|    [Mage, Marksman]|       Azir.png|
| 12|        Bard|     [Support, Mage]|       Bard.png|
| 13|  Blitzcrank|     [Tank, Fighter]| Blitzcrank.png|
| 14|       Brand|              [Mage]|      Bra

In [None]:

# Wins per (class, item): count rows per (class, item, win) and keep the groups
# whose 'win' flag is the string 'True'.
a = req42.groupBy("chclass", "item", "win").agg(F.count(F.lit(1)).alias("winCount")).where(col('win')=='True')

# Total number of records per (class, item).
total = req42.groupBy("chclass", "item").agg(F.count(F.lit(1)).alias("totalCount"))

# Left join so that (class, item) pairs that never won are kept with winCount = 0
# (an inner join would silently drop them and hide every 0% synergy).
req42_f = (
    total
    .join(a.select('chclass', 'item', 'winCount'), ['chclass', 'item'], 'left')
    .fillna(0, subset=['winCount'])
)

# Class/item synergy = share of records that were wins, as a rounded percentage.
req42_f = req42_f.withColumn('clSynergy', F.round(F.col('winCount') * 100 / F.col('totalCount')))
req42_f.show()

+--------+----+----------+--------+---------+
| chclass|item|totalCount|winCount|clSynergy|
+--------+----+----------+--------+---------+
|    Mage|3862|         6|       1|     17.0|
|    Mage|4628|     16047|    8872|     55.0|
|Marksman|2010|      1023|     373|     36.0|
|    Tank|2015|        26|      15|     58.0|
|    Tank|3862|         7|       3|     43.0|
|Assassin|2140|       106|      44|     42.0|
|Assassin|4630|      3945|    1952|     49.0|
| Fighter|2421|     11168|    6329|     57.0|
| Fighter|3068|     13524|    7043|     52.0|
| Fighter|3851|       142|      38|     27.0|
| Fighter|6630|     64524|   32886|     51.0|
|    Mage|3142|       645|     329|     51.0|
|Marksman|3053|        74|      40|     54.0|
|Marksman|3085|     13542|    7468|     55.0|
|Marksman|6693|       376|     206|     55.0|
| Support|3004|       139|      80|     58.0|
|    Tank|1039|       317|     147|     46.0|
|    Tank|3145|       611|     248|     41.0|
|Assassin|3142|     28609|   15395

In [None]:
# Disabled: rebuilds the item lookup table from the pandas frame `items_ddragon_pd`.
#items_ddragon_df = spark.createDataFrame(items_ddragon_pd).drop('image').withColumnRenamed('name','itemName')

# Preview the item lookup table (itemIndex -> itemName).
items_ddragon_df.show()

# Rename 'item' to 'itemIndex' to match the lookup key, attach the item name and
# keep the class, item and synergy columns.
req42_f = (
    req42_f
    .withColumnRenamed('item', 'itemIndex')
    .join(items_ddragon_df, on='itemIndex', how='inner')
    .select('itemIndex', 'chClass', 'itemName', 'clSynergy')
)
req42_f.show()

+---------+-----------------+
|itemIndex|         itemName|
+---------+-----------------+
|     1001|            Boots|
|     1004|     Faerie Charm|
|     1006|Rejuvenation Bead|
|     1011|     Giant's Belt|
|     1018| Cloak of Agility|
|     1026|    Blasting Wand|
|     1027| Sapphire Crystal|
|     1028|     Ruby Crystal|
|     1029|      Cloth Armor|
|     1031|       Chain Vest|
|     1033|Null-Magic Mantle|
|     1035|       Emberknife|
|     1036|       Long Sword|
|     1037|          Pickaxe|
|     1038|      B. F. Sword|
|     1039|        Hailblade|
|     1040|    Obsidian Edge|
|     1042|           Dagger|
|     1043|      Recurve Bow|
|     1052|  Amplifying Tome|
+---------+-----------------+
only showing top 20 rows

+---------+--------+--------------------+---------+
|itemIndex| chClass|            itemName|clSynergy|
+---------+--------+--------------------+---------+
|     3862|    Mage|     Spectral Sickle|     17.0|
|     4628|    Mage|       Horizon Focus|     

In [None]:
# Save the class/item synergy table: collect it to the driver as a pandas
# DataFrame and write it as CSV into the analysis outputs folder.
req42_f.toPandas().to_csv(outputs_folder+'/req42_Item_class_synergy.csv')

# REQUIREMENT 5: item suggestion

In [None]:
# Combine the per-champion item synergy (req41) with the global item statistics
# (req3_df). Both frames carry an 'itemName' column; drop it from req3_df before
# joining so the result has a single, unambiguous 'itemName'.
req5 = req41.join(req3_df.drop('itemName'), 'itemIndex')
req5.show()

+---------+------------+------------+---------+----------+------------+-------------+------------+
|itemIndex|championName|    itemName|chSynergy|chItemPick|    itemName|itemPickRate%|itemWinRate%|
+---------+------------+------------+---------+----------+------------+-------------+------------+
|     3858|        Rell|Relic Shield|     35.0|      0.16|Relic Shield|         0.02|       33.55|
|     3858|        Bard|Relic Shield|     33.0|      0.06|Relic Shield|         0.02|       33.55|
|     3858|    Pantheon|Relic Shield|     67.0|      0.01|Relic Shield|         0.02|       33.55|
|     3858|       Poppy|Relic Shield|    100.0|       0.0|Relic Shield|         0.02|       33.55|
|     3858|    Nautilus|Relic Shield|     30.0|      0.28|Relic Shield|         0.02|       33.55|
|     3858|         Lux|Relic Shield|    100.0|       0.0|Relic Shield|         0.02|       33.55|
|     3858|         Vex|Relic Shield|    100.0|       0.0|Relic Shield|         0.02|       33.55|
|     3858

In [None]:
# Load the item names/tags table. 'tags' is read as text holding a Python list
# literal, so parse it into an array<string> column; the CSV index column '_c0'
# is dropped.
udf_func = udf(lambda raw: list(ast.literal_eval(raw)), ArrayType(StringType()))
iclasses = (
    spark.read.csv(data_path+'/item_namesandtags_from_ddragon.csv', header=True)
    .withColumn('tags', udf_func(col("tags")))
    .drop('_c0')
)
#iclasses.show()

In [None]:
# Number of tags of an item. The return type is declared explicitly: without it
# F.udf produces a string column, and 'n_tags' is a count.
func = F.udf(lambda x: len(x), IntegerType())

# Attach the item tags to every (champion, item) row and count them.
req5 = req5.join(iclasses, 'itemIndex')
req5 = req5.withColumn('n_tags', func(req5.tags))


In [None]:
from pyspark.sql.types import IntegerType

def heuristic(chSynergy,  ch_item_pic, n, win, pick):
    """Score an item for a champion.

    Computes ``0.75 * chSynergy / ch_item_pic + 0.25 * win / pick - 5 * n``
    after converting every argument with ``float()``.

    Args:
        chSynergy: champion/item synergy value.
        ch_item_pic: champion/item pick value (divisor of the first term).
        n: number of tags of the item; each one costs 5 points.
        win: item win value.
        pick: item pick value (divisor of the second term).

    Returns:
        The score as a float, or ``0.0`` when an argument is missing, is not
        numeric, or a divisor is zero.
    """
    try:
        synergy_term = 0.75 * float(chSynergy) / float(ch_item_pic)
        popularity_term = 0.25 * float(win) / float(pick)
        return synergy_term + popularity_term - 5 * float(n)
    except (TypeError, ValueError, ZeroDivisionError):
        # Only the expected data problems fall back to a neutral score; anything
        # else (e.g. an interrupt) is allowed to propagate.
        return 0.0

# F.udf is called without a return type, so it yields a string column. Cast the
# score to double so that ordering and comparisons on 'heuristic' are numeric
# rather than lexicographic.
func2 = F.udf(heuristic)
req5 = req5.withColumn(
    'heuristic',
    func2("chSynergy", "chItemPick", "n_tags", "itemWinRate%", "itemPickRate%").cast('double')
)

req5.show()

+---------+------------+------------+---------+----------+------------+-------------+------------+------------+--------------------+--------+------+-----------------+
|itemIndex|championName|    itemName|chSynergy|chItemPick|    itemName|itemPickRate%|itemWinRate%|        name|                tags|   image|n_tags|        heuristic|
+---------+------------+------------+---------+----------+------------+-------------+------------+------------+--------------------+--------+------+-----------------+
|     3858|        Rell|Relic Shield|     35.0|      0.16|Relic Shield|         0.02|       33.55|Relic Shield|[Health, HealthRe...|3858.png|     6|         553.4375|
|     3858|        Bard|Relic Shield|     33.0|      0.06|Relic Shield|         0.02|       33.55|Relic Shield|[Health, HealthRe...|3858.png|     6|          801.875|
|     3858|    Pantheon|Relic Shield|     67.0|      0.01|Relic Shield|         0.02|       33.55|Relic Shield|[Health, HealthRe...|3858.png|     6|         5414.375

In [None]:
def is_valid(item, lclasses):
    """Check whether `item` shares no tag with the tags already taken.

    Args:
        item: item name, matched against ``iclasses.name``.
        lclasses: collection of tags that are already covered.

    Returns:
        ``(True, item_classes)`` when none of the item's tags is in `lclasses`,
        where ``item_classes`` is a list holding the tag list of each matching
        row (callers read ``item_classes[0]``); ``(False, [])`` when a tag
        clashes or the item is not found in ``iclasses``.
    """
    tag_rows = iclasses.where(iclasses.name == item).select('tags').collect()
    item_classes = [row['tags'] for row in tag_rows]
    if not item_classes:
        # Unknown item: nothing to check against, so do not suggest it.
        return False, []

    # NOTE(review): the original had an unfinished `if c == 'mythetic' and ''`
    # condition here (a syntax error). Its intent cannot be recovered from the
    # code, so any clashing tag rejects the item.
    for c in item_classes[0]:
        if c in lclasses:
            return False, []

    return True, item_classes

In [None]:
def suggest(ch, n_items=6):
    """Suggest up to `n_items` items for champion `ch`.

    Items are visited from the highest to the lowest 'heuristic' score and an
    item is accepted only when `is_valid` reports that none of its tags is
    already covered by a previously accepted item.

    Args:
        ch: champion name, matched against ``req5.championName``.
        n_items: maximum number of items to return (default 6).

    Returns:
        List of item names; shorter than `n_items` when the champion does not
        have enough compatible items.
    """
    # Collect the ranked rows once (the query would otherwise be re-run on every
    # loop iteration). The cast keeps the ordering numeric even when the
    # 'heuristic' column is stored as a string.
    ranked_rows = (
        req5.where(req5.championName == ch)
        .orderBy(col('heuristic').cast('double').desc())
        .collect()
    )

    rest_list = []  # tags already covered by the accepted items
    items = []
    for row in ranked_rows:
        if len(items) >= n_items:
            break
        current_item = row['itemName']
        b, new_rest = is_valid(current_item, rest_list)
        if b:
            items.append(current_item)
            rest_list.extend(new_rest[0])

    return items

In [None]:
# Print the suggested item list for a couple of example champions.
champs = ['Alistar', 'Anivia']
for ch in champs: 
  print(suggest(ch))


['Sunfire Aegis', 'Faerie Charm', 'Boots', 'Sapphire Crystal', 'Rejuvenation Bead', 'Tiamat']
['Bulwark of the Mountain', 'Glacial Buckler', 'Stopwatch', 'Total Biscuit of Everlasting Will', 'Boots of Swiftness', 'Sheen']


In [None]:
#c.show()


# EXTRA REQUIREMENTS

## 1- match win prediction
FEATURES (for the two teams):
* Kills
* Assists
* Deaths
* Damage to Objectives
* Damage to Turrets
* Damage that is Self-Mitigated
* Neutral minions killed
* Physical Damage Dealt
* Magical Damage Dealt
* Time ccing Others
* WardsPlaced

In [None]:
# read dataset and cast types
# The first four columns identify the game/team and the outcome; every other
# column is a per-player statistic.
features =[ 'gameAndTeam',
'gameId',
'teamId',
'win',
'assists',
'damageDealtToObjectives',
'damageDealtToTurrets',
'damageSelfMitigated',
'deaths',
'kills',
'magicDamageDealt',
'neutralMinionsKilled',
'physicalDamageDealt',
'timeCCingOthers', 'wardsPlaced',

'doubleKills',
'tripleKills',
'pentaKills',
'dragonKills',
'goldEarned',
'goldSpent',
'turretKills',
'turretTakedowns',
'turretsLost',
'unrealKills',
'visionScore'
]

# Target type of every statistic column (everything after the four id/label columns).
mapping = {stat_name: 'float' for stat_name in features[4:]}

dataset = participants_df.select(features)

# Keep the id/label columns as read and cast the statistic columns to float,
# preserving the column order of `features`.
rest_cols = [F.col(cl) for cl in dataset.columns if cl not in mapping.keys()]
conv_cols = [F.col(cl_name).cast(cl_type).alias(cl_name) for cl_name, cl_type in mapping.items()  if cl_name in dataset.columns]

# Cache the final, cast frame: this is the object bound to `dataset`, so a later
# dataset.unpersist() actually releases it.
dataset = dataset.select(*rest_cols, *conv_cols).cache()

In [None]:
# Preview the first rows of the selected and cast dataset.
dataset.show()

+--------------------+----------+------+-----+-------+-----------------------+--------------------+-------------------+------+-----+----------------+--------------------+-------------------+---------------+-----------+-----------+-----------+----------+-----------+----------+---------+-----------+---------------+-----------+-----------+-----------+
|         gameAndTeam|    gameId|teamId|  win|assists|damageDealtToObjectives|damageDealtToTurrets|damageSelfMitigated|deaths|kills|magicDamageDealt|neutralMinionsKilled|physicalDamageDealt|timeCCingOthers|wardsPlaced|doubleKills|tripleKills|pentaKills|dragonKills|goldEarned|goldSpent|turretKills|turretTakedowns|turretsLost|unrealKills|visionScore|
+--------------------+----------+------+-----+-------+-----------------------+--------------------+-------------------+------+-----+----------------+--------------------+-------------------+---------------+-----------+-----------+-----------+----------+-----------+----------+---------+-----------+

In [None]:
# Scratch helper: build the text of one `SUM(<col>).alias(<col>)` expression per
# statistic column so it can be inspected or pasted into an aggregation.
lll = [
    f"SUM('{stat_name}').alias('{stat_name}')"
    for stat_name in (
        'doubleKills', 'tripleKills', 'pentaKills', 'dragonKills',
        'goldEarned', 'goldSpent', 'turretKills', 'turretTakedowns',
        'turretsLost', 'unrealKills', 'visionScore',
    )
]
lll

["SUM('doubleKills').alias('doubleKills')",
 "SUM('tripleKills').alias('tripleKills')",
 "SUM('pentaKills').alias('pentaKills')",
 "SUM('dragonKills').alias('dragonKills')",
 "SUM('goldEarned').alias('goldEarned')",
 "SUM('goldSpent').alias('goldSpent')",
 "SUM('turretKills').alias('turretKills')",
 "SUM('turretTakedowns').alias('turretTakedowns')",
 "SUM('turretsLost').alias('turretsLost')",
 "SUM('unrealKills').alias('unrealKills')",
 "SUM('visionScore').alias('visionScore')"]

In [None]:
# Check that the statistic columns were cast to float and the id/label columns stayed strings.
dataset.printSchema()

root
 |-- gameAndTeam: string (nullable = true)
 |-- gameId: string (nullable = true)
 |-- teamId: string (nullable = true)
 |-- win: string (nullable = true)
 |-- assists: float (nullable = true)
 |-- damageDealtToObjectives: float (nullable = true)
 |-- damageDealtToTurrets: float (nullable = true)
 |-- damageSelfMitigated: float (nullable = true)
 |-- deaths: float (nullable = true)
 |-- kills: float (nullable = true)
 |-- magicDamageDealt: float (nullable = true)
 |-- neutralMinionsKilled: float (nullable = true)
 |-- physicalDamageDealt: float (nullable = true)
 |-- timeCCingOthers: float (nullable = true)
 |-- wardsPlaced: float (nullable = true)
 |-- doubleKills: float (nullable = true)
 |-- tripleKills: float (nullable = true)
 |-- pentaKills: float (nullable = true)
 |-- dragonKills: float (nullable = true)
 |-- goldEarned: float (nullable = true)
 |-- goldSpent: float (nullable = true)
 |-- turretKills: float (nullable = true)
 |-- turretTakedowns: float (nullable = true)
 |-

In [None]:
# Construct a dataset with one row per (game, team):
from pyspark.sql.functions import sum as SUM
import pyspark.sql.functions as F

# Per-player statistics that are summed over the players of a team. The order is
# significant: a later cell slices the aggregated row positionally (x[4:]).
summed_stats = [
    'assists',
    'damageDealtToObjectives',
    'damageDealtToTurrets',
    'damageSelfMitigated',
    'deaths',
    'kills',
    'magicDamageDealt',
    'neutralMinionsKilled',
    'physicalDamageDealt',
    'timeCCingOthers',
    'wardsPlaced',
    'doubleKills',
    'tripleKills',
    'pentaKills',
    'dragonKills',
    'goldEarned',
    'goldSpent',
    'turretKills',
    'turretTakedowns',
    'turretsLost',
    'unrealKills',
    'visionScore',
]

# gameId, teamId and win are collected as sets; every statistic is summed and
# keeps its own name (the physicalDamageDealt sum was previously aliased
# 'neutralMinionsKilled', producing a duplicate column name).
by_team_and_game = dataset.groupBy('gameAndTeam').agg(
    F.collect_set('gameId').alias('gameId'),
    F.collect_set('teamId').alias('teamId'),
    F.collect_set('win').alias('win'),
    *[SUM(stat_name).alias(stat_name) for stat_name in summed_stats]
)

by_team_and_game.show()

+--------------------+------------+------+-------+-------+-----------------------+--------------------+-------------------+------+-----+----------------+--------------------+--------------------+---------------+-----------+-----------+-----------+----------+-----------+----------+---------+-----------+---------------+-----------+-----------+-----------+
|         gameAndTeam|      gameId|teamId|    win|assists|damageDealtToObjectives|damageDealtToTurrets|damageSelfMitigated|deaths|kills|magicDamageDealt|neutralMinionsKilled|neutralMinionsKilled|timeCCingOthers|wardsPlaced|doubleKills|tripleKills|pentaKills|dragonKills|goldEarned|goldSpent|turretKills|turretTakedowns|turretsLost|unrealKills|visionScore|
+--------------------+------------+------+-------+-------+-----------------------+--------------------+-------------------+------+-----+----------------+--------------------+--------------------+---------------+-----------+-----------+-----------+----------+-----------+----------+-------

In [None]:
# Keep the per-team aggregation cached for the following cells and ask Spark to
# drop the cached copy of the per-player dataset.
by_team_and_game=by_team_and_game.cache()
dataset=dataset.unpersist()


In [None]:
# combine all features into one column
def _team_row_to_record(row):
    """Turn one aggregated (game, team) row into (gameId, [teamId, win flag], stats)."""
    game_id = row['gameId'][0]
    team_id = row['teamId'][0]
    won = int(row['win'][0].lower()=='true')
    # Positions 0-3 hold gameAndTeam, gameId, teamId and win; the rest are the summed stats.
    return (game_id, [team_id, won], list(row[4:]))

teams_records = by_team_and_game.rdd.map(_team_row_to_record).toDF(['gameId','teamWin', 'featuresList'])
teams_records.show()

+----------+--------+--------------------+
|    gameId| teamWin|        featuresList|
+----------+--------+--------------------+
|1052559995|[100, 0]|[29.0, 28790.0, 3...|
|3953932960|[200, 0]|[22.0, 21370.0, 4...|
|3954864646|[100, 0]|[22.0, 39465.0, 6...|
|3955095594|[100, 0]|[38.0, 19931.0, 8...|
|3955726176|[200, 1]|[41.0, 71303.0, 2...|
|3955811265|[200, 0]|[45.0, 65401.0, 2...|
|3955857870|[100, 1]|[65.0, 63860.0, 1...|
|3956029213|[200, 0]|[56.0, 67830.0, 2...|
|3956393182|[100, 0]|[28.0, 14389.0, 3...|
|3956908003|[100, 0]|[18.0, 31649.0, 5...|
|3957155134|[200, 0]|[20.0, 8309.0, 19...|
|3959062267|[200, 0]|[35.0, 28087.0, 8...|
|3959197488|[100, 1]|[70.0, 70998.0, 1...|
|3959626691|[200, 0]|[27.0, 35234.0, 1...|
|3959697619|[200, 0]|[31.0, 26491.0, 9...|
|3960319186|[200, 1]|[31.0, 63769.0, 1...|
|3960474990|[200, 1]|[45.0, 53152.0, 2...|
|3960772674|[200, 0]|[50.0, 60735.0, 1...|
|3962185793|[100, 0]|[15.0, 21411.0, 5...|
|3962445825|[100, 0]|[37.0, 21504.0, 2...|
+----------

In [None]:
# Inspect the inferred schema of the per-team records.
teams_records.printSchema()

root
 |-- gameId: string (nullable = true)
 |-- teamWin: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- featuresList: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [None]:
@F.udf("integer")
def result(lst):
    """Encode a [teamId, win flag] pair as the game outcome.

    Returns 1 when the blue team won, i.e. the pair is ['100', '1'] or
    ['200', '0'], and 0 otherwise (red team won).
    """
    blue_win_pairs = (['100','1'], ['200','0'])
    return int(list(lst) in blue_win_pairs)

# Replace the [teamId, win] pair with the single game-level outcome.
teams_records = teams_records.withColumn('teamWin', result('teamWin'))
teams_records.show()

+----------+-------+--------------------+
|    gameId|teamWin|        featuresList|
+----------+-------+--------------------+
|1052559995|      0|[29.0, 28790.0, 3...|
|3953932960|      1|[22.0, 21370.0, 4...|
|3954864646|      0|[22.0, 39465.0, 6...|
|3955095594|      0|[38.0, 19931.0, 8...|
|3955726176|      0|[41.0, 71303.0, 2...|
|3955811265|      1|[45.0, 65401.0, 2...|
|3955857870|      1|[65.0, 63860.0, 1...|
|3956029213|      1|[56.0, 67830.0, 2...|
|3956393182|      0|[28.0, 14389.0, 3...|
|3956908003|      0|[18.0, 31649.0, 5...|
|3957155134|      1|[20.0, 8309.0, 19...|
|3959062267|      1|[35.0, 28087.0, 8...|
|3959197488|      1|[70.0, 70998.0, 1...|
|3959626691|      1|[27.0, 35234.0, 1...|
|3959697619|      1|[31.0, 26491.0, 9...|
|3960319186|      0|[31.0, 63769.0, 1...|
|3960474990|      0|[45.0, 53152.0, 2...|
|3960772674|      1|[50.0, 60735.0, 1...|
|3962185793|      0|[15.0, 21411.0, 5...|
|3962445825|      0|[37.0, 21504.0, 2...|
+----------+-------+--------------

In [None]:
# Confirm that 'teamWin' is now a single integer per row.
teams_records.printSchema()

root
 |-- gameId: string (nullable = true)
 |-- teamWin: integer (nullable = true)
 |-- featuresList: array (nullable = true)
 |    |-- element: double (containsNull = true)



In [None]:
# now aggregate the two teams info in one list by grouping on game id
# `u` takes the first element of the collected outcome set.
u =F.udf(lambda outcomes: outcomes[0], IntegerType())

# NOTE(review): collect_list gives no ordering guarantee, so which team's stats
# come first in X presumably varies from game to game - confirm whether the
# model needs a fixed team order.
game_records = (
    teams_records
    .groupBy('gameId')
    .agg(
        F.collect_set('teamWin').alias('y'),
        F.collect_list('featuresList').alias('X'),
    )
    .withColumn('y', u('y'))
    .drop('gameId')
)
game_records.printSchema()

root
 |-- y: integer (nullable = true)
 |-- X: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: double (containsNull = true)



In [None]:
# Preview the per-game label (y) and the nested per-team feature lists (X).
game_records.show()

+---+--------------------+
|  y|                   X|
+---+--------------------+
|  0|[[41.0, 41862.0, ...|
|  0|[[19.0, 28151.0, ...|
|  1|[[41.0, 80798.0, ...|
|  1|[[27.0, 7891.0, 3...|
|  0|[[62.0, 79704.0, ...|
|  0|[[51.0, 20528.0, ...|
|  1|[[17.0, 20909.0, ...|
|  1|[[69.0, 50040.0, ...|
|  1|[[22.0, 13206.0, ...|
|  0|[[35.0, 18780.0, ...|
|  1|[[26.0, 19464.0, ...|
|  1|[[17.0, 1769.0, 0...|
|  1|[[16.0, 11016.0, ...|
|  0|[[43.0, 50138.0, ...|
|  1|[[57.0, 38833.0, ...|
|  0|[[24.0, 89334.0, ...|
|  1|[[25.0, 26706.0, ...|
|  0|[[22.0, 52804.0, ...|
|  1|[[34.0, 12909.0, ...|
|  0|[[20.0, 30628.0, ...|
+---+--------------------+
only showing top 20 rows



In [None]:
import itertools
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors

# Flatten the per-team feature lists of each game into one flat list.
df = game_records.rdd.map(
    lambda rec: (rec[0], list(itertools.chain.from_iterable(rec[1])))
).toDF(['y','X'])

# Split labels and features, turn each feature list into a dense ML vector and
# zip the two back together into the (label, features) frame used for training.
y = df.rdd.map(lambda rec: rec[0])
X = df.rdd.map(lambda rec: rec[1])
dense_X = X.map(lambda values: Vectors.dense(values))
new_df = y.zip(dense_X).toDF(schema=['label','features'])
new_df.printSchema()


root
 |-- label: long (nullable = true)
 |-- features: vector (nullable = true)



In [None]:
# Preview the training frame (label + dense feature vector).
new_df.show()

+-----+--------------------+
|label|            features|
+-----+--------------------+
|    0|[41.0,41862.0,156...|
|    0|[19.0,28151.0,740...|
|    1|[41.0,80798.0,188...|
|    1|[27.0,7891.0,3695...|
|    0|[62.0,79704.0,246...|
|    0|[51.0,20528.0,956...|
|    1|[17.0,20909.0,356...|
|    1|[69.0,50040.0,185...|
|    1|[22.0,13206.0,722...|
|    0|[35.0,18780.0,146...|
|    1|[26.0,19464.0,576...|
|    1|[17.0,1769.0,0.0,...|
|    1|[16.0,11016.0,618...|
|    0|[43.0,50138.0,224...|
|    1|[57.0,38833.0,133...|
|    0|[24.0,89334.0,250...|
|    1|[25.0,26706.0,127...|
|    0|[22.0,52804.0,980...|
|    1|[34.0,12909.0,282...|
|    0|[20.0,30628.0,105...|
+-----+--------------------+
only showing top 20 rows



In [None]:
# Look at the first 20 labels.
new_df.select('label').take(20)

[Row(label=0),
 Row(label=0),
 Row(label=1),
 Row(label=1),
 Row(label=0),
 Row(label=0),
 Row(label=1),
 Row(label=1),
 Row(label=1),
 Row(label=0),
 Row(label=1),
 Row(label=1),
 Row(label=1),
 Row(label=0),
 Row(label=1),
 Row(label=0),
 Row(label=1),
 Row(label=0),
 Row(label=1),
 Row(label=0)]

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

# 70/30 train/test split with a fixed seed.
train, test = new_df.randomSplit([0.7, 0.3], seed = 2000)
# Optional sanity check of the split sizes (disabled; each count triggers a Spark job).
#print("Training Dataset Count: " + str(train.count()))
#print("Test Dataset Count: " + str(test.count()))

In [None]:
from pyspark.ml.classification import DecisionTreeClassifier

# Fit a depth-2 decision tree on the training split and score the test split.
dt = DecisionTreeClassifier(featuresCol = 'features', labelCol = 'label', maxDepth = 2)
dtModel = dt.fit(train)
predictions = dtModel.transform(test)
# Show the first 10 scored rows.
predictions.show(10)

+-----+--------------------+-----------------+--------------------+----------+
|label|            features|    rawPrediction|         probability|prediction|
+-----+--------------------+-----------------+--------------------+----------+
|    0|[5.0,2141.0,1482....|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[11.0,6875.0,0.0,...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[12.0,24913.0,565...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[13.0,7800.0,0.0,...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[14.0,35719.0,808...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[17.0,37035.0,107...|  [4169.0,3438.0]|[0.54804785066386...|       0.0|
|    0|[18.0,37067.0,212...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[19.0,27236.0,714...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[19.0,28151.0,740...|[21909.0,23207.0]|[0.48561485947335...|       1.0|
|    0|[22.0,52804.0,980...|[21909.0,23207.0]|[0.485

In [None]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator
# Evaluate with the evaluator's default settings (no metric or column names are
# passed). NOTE(review): the printed label assumes the default metric is area
# under ROC - confirm against the PySpark docs.
evaluator = BinaryClassificationEvaluator()
print('Test Area Under ROC', evaluator.evaluate(predictions))

Test Area Under ROC 0.5208128896684295


## 2- BEST LANE FOR EACH CHAMPION

In [None]:
# select relevant columns
# `int_win` turns the textual win flag into 1 (exactly 'True') or 0 (anything else).
int_win = F.udf(lambda flag: 1 if flag=='True' else 0, IntegerType())
df = (
    participants_df
    .select('championName', 'lane', 'win')
    .withColumn('win', int_win('win'))
    .cache()
)

In [None]:
# Count the number of occurrences (rows) of each champion, over all lanes.
count_df = df.groupby('championName').count()

In [None]:
# group by champion and lane and aggregate the sum of wins
# ('win' is 0/1 here, so the sum is the number of wins in that lane)
wins_per_lane = F.sum('win').alias('win')
max_df = df.groupBy('championName', 'lane').agg(wins_per_lane)
max_df.show()

+------------+------+----+
|championName|  lane| win|
+------------+------+----+
|        Jhin|  NONE| 942|
|       Annie|  NONE| 230|
|       Amumu|MIDDLE| 179|
|  Cassiopeia|JUNGLE|  29|
|FiddleSticks|   TOP|  24|
|    JarvanIV|   TOP| 164|
|      Viktor|MIDDLE|1865|
|     Kindred|  NONE| 504|
|      Thresh|JUNGLE|  69|
|      Syndra|MIDDLE|1217|
|        Olaf|BOTTOM|   5|
|        Fizz|JUNGLE|  21|
|        Ornn|MIDDLE|  77|
|       Jayce|JUNGLE| 153|
| MissFortune|JUNGLE|  84|
|      Zilean|BOTTOM|1416|
|       Sivir|MIDDLE| 106|
|       Varus|   TOP|  17|
|      Veigar|JUNGLE|  15|
|      Irelia|   TOP|2222|
+------------+------+----+
only showing top 20 rows



In [None]:
# Keep, for each champion, the lane row(s) whose win total equals the champion's
# maximum; lanes that tie for the maximum are all kept.
from pyspark.sql import Window
w = Window.partitionBy('championName')
champion_max_win = F.max('win').over(w)
best_lanes_df = (
    max_df
    .withColumn('maxWin', champion_max_win)
    .filter(F.col('win') == F.col('maxWin'))
    .drop('maxWin')
)
best_lanes_df.show()

+------------+------+----+
|championName|  lane| win|
+------------+------+----+
|      RekSai|JUNGLE|1464|
|   Gangplank|   TOP|1035|
|     Kalista|BOTTOM| 537|
|        Rell|BOTTOM|1086|
|        Ahri|MIDDLE|1875|
|        Bard|BOTTOM|2105|
|    Pantheon|MIDDLE| 718|
|       Poppy|JUNGLE| 846|
|    Katarina|MIDDLE|2177|
|       Elise|JUNGLE|1059|
|      Lucian|BOTTOM|4602|
|       Urgot|   TOP|1577|
| MissFortune|BOTTOM|6626|
|    Nautilus|BOTTOM|2615|
|          Vi|JUNGLE|1639|
|        Azir|MIDDLE| 457|
|     Camille|   TOP|4471|
|      Illaoi|   TOP| 694|
|       Jayce|   TOP|1412|
|        Pyke|BOTTOM|2420|
+------------+------+----+
only showing top 20 rows



In [None]:
# Number of best-lane rows; a value above the number of champions would mean
# some champion has lanes tied for the maximum.
best_lanes_df.count()

157

In [None]:
# Express the best lane's wins as a percentage of the champion's row count.
# NOTE(review): 'count' comes from count_df, which counts all of the champion's
# rows regardless of lane, so 'winRate%' is lane wins over all games rather than
# over the games played in that lane - confirm this is intended.
best_lanes_df = (
    best_lanes_df
    .join(count_df, on='championName', how='inner')
    .withColumn('winRate%', F.round(F.col('win') * 100 / F.col('count'), 2))
    .drop('win', 'count')
)
best_lanes_df.show()

+------------+------+--------+
|championName|  lane|winRate%|
+------------+------+--------+
|      RekSai|JUNGLE|   44.73|
|   Gangplank|   TOP|   31.03|
|     Kalista|BOTTOM|   33.31|
|        Rell|BOTTOM|   41.26|
|        Ahri|MIDDLE|    38.6|
|        Bard|BOTTOM|   39.96|
|    Pantheon|MIDDLE|   18.26|
|       Poppy|JUNGLE|   21.34|
|    Katarina|MIDDLE|   37.85|
|       Elise|JUNGLE|   42.79|
|      Lucian|BOTTOM|   33.67|
|       Urgot|   TOP|   37.57|
| MissFortune|BOTTOM|   41.32|
|    Nautilus|BOTTOM|   37.23|
|          Vi|JUNGLE|   43.74|
|        Azir|MIDDLE|   36.24|
|     Camille|   TOP|   39.68|
|      Illaoi|   TOP|   36.95|
|       Jayce|   TOP|   27.86|
|        Pyke|BOTTOM|   31.85|
+------------+------+--------+
only showing top 20 rows



In [None]:
# Save the best lane per champion: collect to the driver as a pandas DataFrame
# and write it as CSV into the analysis outputs folder.
best_lanes_df.toPandas().to_csv(outputs_folder+'/extra_req_best_lane_per_champ.csv')

In [None]:
# We can also keep the win rate for every lane (without selecting the maximum),
# which is useful for visualization.
# NOTE(review): 'count' comes from count_df, which counts all of the champion's
# rows regardless of lane, so each 'winRate%' is lane wins over all games of the
# champion - confirm this is intended.
dist_df = (
    max_df
    .join(count_df, on='championName', how='inner')
    .withColumn('winRate%', F.round(F.col('win') * 100 / F.col('count'), 2))
    .drop('win', 'count')
)
dist_df.show()

+------------+------+--------+
|championName|  lane|winRate%|
+------------+------+--------+
|      RekSai|BOTTOM|    0.03|
|      RekSai|  NONE|    7.27|
|      RekSai|MIDDLE|     0.4|
|      RekSai|   TOP|    1.04|
|      RekSai|JUNGLE|   44.73|
|   Gangplank|MIDDLE|     6.8|
|   Gangplank|BOTTOM|    0.24|
|   Gangplank|   TOP|   31.03|
|   Gangplank|JUNGLE|    2.88|
|   Gangplank|  NONE|    5.19|
|     Kalista|JUNGLE|    1.74|
|     Kalista|MIDDLE|    3.04|
|     Kalista|  NONE|    5.89|
|     Kalista|BOTTOM|   33.31|
|     Kalista|   TOP|    1.74|
|        Rell|BOTTOM|   41.26|
|        Rell|MIDDLE|    3.31|
|        Rell|  NONE|    5.51|
|        Rell|JUNGLE|    0.68|
|        Rell|   TOP|    0.68|
+------------+------+--------+
only showing top 20 rows



In [None]:
# Save the per-lane distribution (dist_df). This previously wrote best_lanes_df,
# duplicating the best-lane export under the distribution file name.
# The file name is kept as-is so existing consumers of the output still find it.
dist_df.toPandas().to_csv(outputs_folder+'/champ_winrate_over_lanes_disribution.csv')