# DOTA 2 ML in Spark

Read in data

In [3]:
from pyspark.sql import SQLContext
# `sc` is not defined anywhere in this file; presumably it is the SparkContext
# injected by the notebook runtime -- TODO confirm.
sqlContext = SQLContext(sc)

# main data
# matchData = sqlContext.sql("Select * from matchdata3_csv") 

# lookups 
#hero_names = sqlContext.sql("Select * from hero_names_csv")
#item_ids = sqlContext.sql("Select * from item_ids_csv")
#patch_dates = sqlContext.sql("Select * from dota_patch_dates_csv")

In [4]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

# Rebuild matchData from the raw tables instead of the pre-joined export.
matches = sqlContext.sql("Select * from match_csv")
players = sqlContext.sql("Select * from players_csv")
time = sqlContext.sql("Select * from player_time_csv")

from pyspark.sql.functions import *

# Attach every player row to its match, then keep only the first 51 columns
# of the joined result.
match_players = matches.join(players, on='match_id')
leading_columns = match_players.columns[:51]
joined = match_players.select(leading_columns)
# Optional filters, currently disabled:
#joined = joined.where('account_id > 0')
#joined = joined.withColumn('side', when(joined['player_slot'] < 100, "Radiant").otherwise("Dire"))

joined.count()

In [5]:
# Spread hero_id by player_slot: one row per match, one column per slot.
slot_ids = ['0', '1', '2', '3', '4', '128', '129', '130', '131', '132']

temp = joined.select('match_id', 'duration', 'first_blood_time', 'hero_id', 'player_slot', 'radiant_win')
heroes_by_slot = temp.groupBy('match_id').pivot('player_slot', slot_ids).agg(first('hero_id'))

# Prefix each pivoted slot column with 't' (e.g. '128' -> 't128').
pivot = heroes_by_slot
for slot_id in slot_ids:
    pivot = pivot.withColumnRenamed(slot_id, 't' + slot_id)

In [6]:
# build dummy columns per side for each hero_id
import functools
import operator

hero_ids = range(114)  # hero ids 0..113, the same range the per-side loops used
radiant_slots = ['t0', 't1', 't2', 't3', 't4']
dire_slots = ['t128', 't129', 't130', 't131', 't132']


def _hero_flag(prefix, slots, hero_id):
    """Return a 0/1 column named ``prefix + str(hero_id)``.

    The value is 1 when any of the `slots` columns equals `hero_id`, else 0
    (including when every slot is null).
    """
    picked = functools.reduce(operator.or_, [col(slot) == hero_id for slot in slots])
    return when(picked, 1).otherwise(0).alias(prefix + str(hero_id))


hero_flags = ([_hero_flag("radiant", radiant_slots, hero_id) for hero_id in hero_ids] +
              [_hero_flag("dire", dire_slots, hero_id) for hero_id in hero_ids])

# Add all 228 flags in one projection. Chaining withColumn in a loop nests one
# projection per call and produces a very large query plan.
pivot = pivot.select(['*'] + hero_flags)

pivot.count()

In [7]:
# Match-level columns that travel with the hero flags.
data = matches.select('match_id', 'duration', 'first_blood_time', 'radiant_win')

# The raw per-slot hero ids are dropped now that the per-hero flags exist.
slot_columns = ['t' + str(slot) for slot in (0, 1, 2, 3, 4, 128, 129, 130, 131, 132)]
pivot = pivot.drop(*slot_columns)

# Attach the match-level columns to the pivoted/spread data.
matchData = data.join(pivot, on='match_id')

matchData.count()

In [8]:
# add in data from player_time
# The join key is match_id only, so each match row is repeated once per
# player_time row that shares its match_id.
matchData = matchData.join(time, ['match_id'])

matchData.count()

In [9]:
# Number of rows per match after the player_time join.
matchData.groupBy('match_id').count().show()
# since we get data every 60 seconds, the count here represents roughly how long each match lasted

In [10]:
# create sum columns for team gold, xp, lh
def _team_total(frame, stat, slots):
    """Return the column expression adding `<stat>_t_<slot>` of `frame` over `slots`, left to right."""
    total = frame[stat + "_t_" + str(slots[0])]
    for slot in slots[1:]:
        total = total + frame[stat + "_t_" + str(slot)]
    return total


# Resulting column order: radiant_gold, radiant_xp, radiant_lh, dire_gold, dire_xp, dire_lh.
modelData = matchData
for side, slots in (("radiant", (0, 1, 2, 3, 4)), ("dire", (128, 129, 130, 131, 132))):
    for stat in ("gold", "xp", "lh"):
        modelData = modelData.withColumn(side + "_" + stat, _team_total(matchData, stat, slots))

In [11]:
# Differential columns (radiant minus dire) for gold, xp and last hits.
for stat in ("gold", "xp", "lh"):
    radiant_total = modelData["radiant_" + stat]
    dire_total = modelData["dire_" + stat]
    modelData = modelData.withColumn(stat + "diff", radiant_total - dire_total)

In [12]:
# recode 'times' to float
# (replaces the existing 'times' column with its float cast)
modelData = modelData.withColumn('times', modelData.times.cast('float'))
#modelData.dtypes

In [13]:
# Drop match_id and every per-slot gold/xp/lh column now that the team totals
# and differentials have been derived from them.
per_slot_columns = []
for slots in ((0, 1, 2, 3, 4), (128, 129, 130, 131, 132)):
    for stat in ("gold", "xp", "lh"):
        for slot in slots:
            per_slot_columns.append(stat + "_t_" + str(slot))

modelData = modelData.drop("match_id", *per_slot_columns)

In [14]:
# Show the current modelData column names.
print(modelData.columns)

### Research Question: What wins?  
* Do heroes matter?  
* Do items matter?  
* Does KDA matter?  
* Pattern Frequency - draft predictions --> https://spark.apache.org/docs/2.2.0/ml-frequent-pattern-mining.html

## Inputs
* Heroes (dummy variables)
* per-minute gold (team sum)
* per-minute xp (team sum)
* per-minute last-hits (team sum)
* final result (coded as 'radiant win')

Logistic regression predicting (radiant-side) win probability given a timestamp and gold, xp, lh
* this means we may have to subset by timestamp(?)

__TASK 1:__ Read the dataset and encode the class column as 0 and 1

In [18]:
# define dependent variable
# (passed as labelCol to the classifier and excluded from the numeric input list below)
target: str = 'radiant_win'

In [19]:
# recode 'radiant_win' into 0/1
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf # UDF - user defined function
from pyspark.sql.functions import *

# 1 when radiant_win is true, 0 otherwise (null also maps to 0).
# The comparison is done on the lower-cased string form so it gives the same
# answer whether the column was loaded as the strings 'True'/'False' or was
# inferred as a boolean (Spark casts booleans to 'true'/'false'). A Python UDF
# comparing against the exact string 'True' would label every row 0 in the
# boolean case, and a native expression avoids per-row Python calls.
is_radiant_win = lower(modelData["radiant_win"].cast("string")) == "true"
modelData = modelData.withColumn("radiant_win", when(is_radiant_win, 1).otherwise(0))



In [20]:
# Show the modelData column names again.
print(modelData.columns)

In [21]:
# premade onehot encoded heroes
# Collect every column whose name mentions a side. This still includes the
# team totals and the label; a later cell removes those.
rhero_input = []
dhero_input = []
for name in modelData.columns:
    if 'radiant' in name:
        rhero_input.append(name)
    if 'dire' in name:
        dhero_input.append(name)

hero_input = rhero_input + dhero_input
hero_input

In [22]:
# Side columns with an underscore after the side name are not hero dummies.
rnonhero_input = [name for name in modelData.columns if 'radiant_' in name]
dnonhero_input = [name for name in modelData.columns if 'dire_' in name]

nonhero_input = rnonhero_input + dnonhero_input
nonhero_input

In [23]:
# Keep only the premade one-hot hero columns: everything gathered into
# hero_input that is not also in nonhero_input.
hero_names = set(hero_input)
nonhero_names = set(nonhero_input)
hero_input = list(hero_names - nonhero_names)
hero_input

In [24]:
# categorical variables: every string-typed column except the dependent variable
dtypes = modelData.dtypes  # list of (column name, type name) pairs
# Compare against `target` as a whole name. set(target) would be the set of its
# individual characters, which never removes 'radiant_win' and could wrongly
# remove single-letter column names.
cat_input = [name for name, type_name in dtypes if type_name == 'string' and name != target]
cat_input

In [25]:
# numerical variables: everything that is not the target, categorical, or a hero dummy
excluded = set([target]) | set(cat_input) | set(hero_input)
# Follow the schema order rather than list(set(...)), so the order of the
# derived "_standardized" feature columns is the same on every run.
num_input = [name for name in modelData.columns if name not in excluded]
num_input

__TASK 2:__ Do some visualizations to summarize your dataset

In [27]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns  #ImportError: cannot import name '_to_unmasked_float_array'


In [28]:
# Plotting runs on the driver in pandas, so work from a 25% sample drawn
# without replacement using a fixed seed.
sampled = modelData.sample(withReplacement=False, fraction=.25, seed=12345)
modelData_pd = sampled.toPandas()

In [29]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# Pairwise correlations between the columns of the sampled data.
corr = modelData_pd.corr()

# Large square canvas for the heatmap.
f, ax = plt.subplots(figsize=(20, 20))

# Font sizes applied through seaborn before drawing.
rc = {
    'font.size': 18,
    'axes.labelsize': 18,
    'legend.fontsize': 18.0,
    'axes.titlesize': 18,
    'xtick.labelsize': 12,
    'ytick.labelsize': 12,
}
sns.set(rc=rc)

# Label both axes with the column names of the correlation matrix.
tick_labels = corr.columns.values
sns.heatmap(corr, xticklabels=tick_labels, yticklabels=tick_labels)


display(f)

In [30]:
# Split the sampled rows by outcome.
radiant_won = modelData_pd['radiant_win'] == 1
radiant_lost = modelData_pd['radiant_win'] == 0
wins = modelData_pd.loc[radiant_won]
loses = modelData_pd.loc[radiant_lost]

In [31]:
# Gold differential density, radiant wins vs. radiant losses, on one axis.
f, ax = plt.subplots(figsize=(5, 4))

rc = {'font.size': 18}
sns.set(font_scale=1, rc=rc)

# Wins are drawn first, then losses.
for outcome_rows in (wins, loses):
    sns.kdeplot(outcome_rows['golddiff'], ax=ax)

# replace labels
#new_labels = ['radiant_win', 'radiant_lose']
#for t, l in zip(g._legend.texts, new_labels): t.set_text(l)

display(f)

In [32]:
# XP differential density, radiant wins vs. radiant losses, on one axis.
f, ax = plt.subplots(figsize=(6, 3))

rc = {'font.size': 18}
sns.set(font_scale=1, rc=rc)

# density plots -- both curves must use 'xpdiff'; plotting the losers'
# 'golddiff' here would compare two different quantities on the same axis.
sns.kdeplot(wins['xpdiff'], ax=ax)
sns.kdeplot(loses['xpdiff'], ax=ax)

# replace labels
#new_labels = ['radiant_win', 'radiant_lose']
#for t, l in zip(g._legend.texts, new_labels): t.set_text(l)

display(f)

In [33]:
# Last-hit differential density, radiant wins vs. radiant losses, on one axis.
f, ax = plt.subplots(figsize=(5, 4))

rc = {'font.size': 18}
sns.set(font_scale=1, rc=rc)

# Wins are drawn first, then losses.
lhdiff_by_outcome = [wins['lhdiff'], loses['lhdiff']]
for lhdiff_values in lhdiff_by_outcome:
    sns.kdeplot(lhdiff_values, ax=ax)

# replace labels
#new_labels = ['radiant_win', 'radiant_lose']
#for t, l in zip(g._legend.texts, new_labels): t.set_text(l)

display(f)

__TASK 3:__ Write a transformation class and its functions for replacing numerical missing values with the median. (You will write a transformer class.) Create the transformers for numerical variables using the transformation class you have just written

In [35]:
# impute missing values
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col

# class ClassName (SuperClass, SuperClass, ...)
# inherits functions from SuperClasses
class NumericImputer(Transformer, HasInputCol, HasOutputCol):
    """Transformer that replaces nulls in a numeric column with the column's median.

    The median is computed from the dataset passed to ``transform`` each time
    it is called; nothing is stored between calls.

    Params:
        inputCol: name of the column to read.
        outputCol: name of the column to write (may equal ``inputCol``).
    """

    def __init__(self, inputCol=None, outputCol=None):
        super(NumericImputer, self).__init__()
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        """Set the input/output column params and return ``self``."""
        return self._set(inputCol=inputCol, outputCol=outputCol)

    def _transform(self, dataset):
        """Return `dataset` with ``outputCol`` holding ``inputCol`` with nulls replaced by the median."""
        from pyspark.sql.functions import col, when
        in_col = self.getInputCol()
        out_col = self.getOutputCol()
        # relativeError=0 asks approxQuantile for the exact median.
        quantiles = dataset.approxQuantile(in_col, [0.5], 0)
        if not quantiles:
            # No non-null values (empty or all-null column): there is no median
            # to impute with, so pass the column through instead of failing on
            # an empty result.
            return dataset.withColumn(out_col, col(in_col))
        median_v = quantiles[0]
        # Replace null with the median, otherwise bring the input value through.
        return dataset.withColumn(out_col, when(col(in_col).isNull(), median_v).otherwise(col(in_col)))

In [36]:
# One NumericImputer per column in num_input, each writing back to the same column name.
# The transformers are only constructed here; nothing is transformed in this cell.
numericimputers = [NumericImputer( inputCol = column, outputCol = column) for column in num_input]

__TASK 4:__ Write a transformation class and its functions for replacing categorical missing values with the mode. (You will write a transformer class.) Create the transformers for categorical variables using the transformation class you have just written

In [38]:
# impute missing values
from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import col

# class ClassName (SuperClass, SuperClass, ...)
# inherits functions from SuperClasses
class CategoricalImputer(Transformer, HasInputCol, HasOutputCol):
    """Transformer that replaces nulls in a column with the column's most frequent non-null value.

    The mode is computed from the dataset passed to ``transform`` each time it
    is called; nothing is stored between calls.

    Params:
        inputCol: name of the column to read.
        outputCol: name of the column to write (may equal ``inputCol``).
    """

    def __init__(self, inputCol=None, outputCol=None):
        super(CategoricalImputer, self).__init__()
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        """Set the input/output column params and return ``self``."""
        return self._set(inputCol=inputCol, outputCol=outputCol)

    def _transform(self, dataset):
        """Return `dataset` with ``outputCol`` holding ``inputCol`` with nulls replaced by the mode."""
        from pyspark.sql.functions import col, when
        in_col = self.getInputCol()
        out_col = self.getOutputCol()

        # Count each non-null value and fetch only the most frequent row.
        # Nulls are filtered out first: otherwise null itself can be the most
        # frequent group and "imputing" with it would change nothing.
        most_frequent = (dataset
                         .where(col(in_col).isNotNull())
                         .groupBy(col(in_col))
                         .count()
                         .orderBy(col('count').desc())
                         .first())
        if most_frequent is None:
            # Empty or all-null column: there is no mode, so pass the column through.
            return dataset.withColumn(out_col, col(in_col))
        mode_v = most_frequent[0]
        # Replace null with the mode, otherwise bring the input value through.
        return dataset.withColumn(out_col, when(col(in_col).isNull(), mode_v).otherwise(col(in_col)))
         # replace null with mode, otherwise bring through input

In [39]:
# One CategoricalImputer per column in cat_input, each writing back to the same column name.
# The transformers are only constructed here; nothing is transformed in this cell.
categoricalimputers = [CategoricalImputer( inputCol = column, outputCol = column) for column in cat_input]

__TASK 5:__ Write a transformation class and its functions for normalizing numerical data. The _transform function should normalize your column using the feature scaling method: (x-xmin)/(xmax-xmin)

Create the transformers for numerical variables using the transformation class you have just written

In [41]:
# min-max scale a dataset

from pyspark import keyword_only  ## < 2.0 -> pyspark.ml.util.keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType


class MinMaxStandardizer(Transformer, HasInputCol, HasOutputCol):
    """Transformer that rescales a numeric column to [0, 1] via (x - min) / (max - min).

    Params:
        inputCol: name of the column to read.
        outputCol: name of the column to write.

    NOTE(review): min and max are taken from whatever dataset is passed to
    ``transform``, so different datasets are scaled with different bounds.
    """

    def __init__(self, inputCol=None, outputCol=None):
        super(MinMaxStandardizer, self).__init__()
        self.setParams(inputCol=inputCol, outputCol=outputCol)

    def setParams(self, inputCol=None, outputCol=None):
        """Set the input/output column params and return ``self``."""
        return self._set(inputCol=inputCol, outputCol=outputCol)

    def _transform(self, dataset):
        """Return `dataset` with ``outputCol`` holding the min-max scaled ``inputCol``."""
        in_col = dataset[self.getInputCol()]
        out_col = self.getOutputCol()
        minimum, maximum = dataset.select(F.min(in_col), F.max(in_col)).first()
        if minimum is None or maximum == minimum:
            # Empty/all-null or constant column: the range is zero or undefined,
            # so (x - min) / (max - min) cannot be evaluated. Emit 0.0 for every
            # non-null value (nulls stay null) instead of dividing by zero.
            return dataset.withColumn(out_col, F.when(in_col.isNotNull(), F.lit(0.0)))
        return dataset.withColumn(out_col, (in_col - minimum) / (maximum - minimum))
      
      
# replicate transformation class; tweak to use (x-min)/range (or use minmax scaler)
# create transformer for each numerical variable

In [42]:
# One MinMaxStandardizer per column in num_input; each writes its result to "<column>_standardized".
# The scalers are only constructed here; nothing is transformed in this cell.
minmax = [MinMaxStandardizer( inputCol = column, outputCol = column+"_standardized") for column in num_input]

#from pyspark.ml.feature import MinMaxScaler
#minmax = [MinMaxScaler( inputCol = column, outputCol = column+"_standardized") for column in num_input]


__TASK 6:__ Create the transformers for encoding dummy variables

In [44]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoder

### for categorical
# Each categorical column gets two stages:
#   StringIndexer  <column>       -> <column>_index  (category codes)
#   OneHotEncoder  <column>_index -> <column>_dummy  (binary vector)
indexers = []
encoders = []
for column in cat_input:
    index_name = column + "_index"
    indexers.append(StringIndexer(inputCol=column, outputCol=index_name))
    encoders.append(OneHotEncoder(inputCol=index_name, outputCol=column + "_dummy"))


In [45]:
# Feature columns for the assembler. These use the *output* names of the
# earlier stages; the raw column names would bypass the encoding and scaling.
input_cols = [name + "_dummy" for name in cat_input]
input_cols += [name + "_standardized" for name in num_input]
# NOTE(review): the columns in hero_input are not added here, so they never
# reach the features vector -- confirm whether that is intended.
input_cols

In [46]:
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

# Combine every column listed in input_cols into one vector column named "features".
assembler = VectorAssembler(inputCols= input_cols, outputCol="features")

__TASK 7:__ Create the transformer for logistic regression (or decision tree or random forest)

In [48]:
from pyspark.ml.classification import LogisticRegression

# Logistic regression that reads the 'features' column, uses `target` as the
# label column, and is capped at 10 iterations.
lr = LogisticRegression(featuresCol = 'features', 
                                    labelCol = target , 
                                    maxIter=10) # use grid search to optimize params


__Task 8:__ Combine the transformers, create the pipeline, and fit the pipeline.

In [50]:
### staging area
# Plain Python list handling -- no Spark work happens in this cell.
import functools 
import operator

# Full stage order: numeric imputers -> scalers -> categorical imputers
# -> indexers -> encoders -> assembler -> classifier.
# Concatenation builds a new list, so the source lists are left untouched.
stages = (numericimputers
          + minmax
          + categoricalimputers
          + indexers
          + encoders
          + [assembler, lr])
stages

In [51]:
### staging area
# Plain Python list handling -- no Spark work happens in this cell.
import functools 
import operator

# Same pipeline without the imputers (because we don't need imputers for our data):
# scalers -> indexers -> encoders -> assembler -> classifier.
stages = list(minmax)       # copy, so minmax itself is not extended
stages.extend(indexers)
stages.extend(encoders)
stages.extend([assembler, lr])
stages
# if you look, we have a ton of MinMaxStandardizers - I think this is because we're standardizing the dummy variables we created for the heroes - 
# shouldn't change the data, but might add to the runtime

In [52]:
### simplified data
from pyspark.ml import Pipeline

# Split the DataFrame directly. Going through modelData.rdd and back with
# toDF() pushes every row through Python and re-infers the schema, and an
# unseeded split gives different train/test sets on every run.
# NOTE(review): the weights put 25% of rows in train and 75% in test --
# confirm this is the intended direction.
traindf, testdf = modelData.randomSplit([.25, .75], seed=12345)

# Build the pipeline from the staged transformers and fit it on the training data.
pipeline = Pipeline(stages=stages)
model = pipeline.fit(traindf)

In [53]:
# use our model object to predict win probabilities for training and testing sets
# (the evaluation cells below read the 'prediction' column from these outputs)
trainOut = model.transform(traindf)
testOut = model.transform(testdf)

__Task 9:__ Performance checks
* Write code to calculate accuracy rate, false positive rate, false negative rate
* Write code to visualize and display the ROC curve (can use matplotlib or seaborn in Python to visualize)

In [55]:
# classification evaluation metrics (training set)
def _ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN when the denominator is 0."""
    return float(numerator) / denominator if denominator else float('nan')


# NOTE: `eval` shadows the Python builtin; the name is kept because other
# cells of this notebook use the same variable.
eval = trainOut.select('radiant_win','prediction')

# Count every (label, prediction) pair in a single aggregation instead of
# running a separate count() job per confusion-matrix cell.
confusion = {(row['radiant_win'], row['prediction']): row['count']
             for row in eval.groupBy('radiant_win', 'prediction').count().collect()}

TP = confusion.get((1, 1.0), 0)
FP = confusion.get((0, 1.0), 0)
TN = confusion.get((0, 0.0), 0)
FN = confusion.get((1, 0.0), 0)

print("True Positives:", TP)
print("False Positives:", FP)
print("True Negatives:", TN)
print("False Negatives:", FN)
print("Total:", sum(confusion.values()))

# Rates use _ratio so that an empty class (e.g. the model never predicts a
# win) reports NaN instead of raising ZeroDivisionError.
A = _ratio(TP + TN, TP + TN + FP + FN)
print("accuracy:", A)

P = _ratio(TP, TP + FP)
print("precision:", P)

R = _ratio(TP, TP + FN)
print("recall(TPR):", R)

S = _ratio(TN, TN + FP)
print("specificity(TNR):", S)

N = _ratio(FN, TP + FN)
print("FNR:", N)

F = _ratio(FP, FP + TN)
print("FPR:", F)

In [56]:
# classification evaluation metrics (test set)
def _ratio(numerator, denominator):
    """Return numerator / denominator as a float, or NaN when the denominator is 0."""
    return float(numerator) / denominator if denominator else float('nan')


# NOTE: `eval` shadows the Python builtin; the name is kept because the ROC
# cell that follows reads this variable.
eval = testOut.select('radiant_win','prediction')

# Count every (label, prediction) pair in a single aggregation instead of
# running a separate count() job per confusion-matrix cell.
confusion = {(row['radiant_win'], row['prediction']): row['count']
             for row in eval.groupBy('radiant_win', 'prediction').count().collect()}

TP = confusion.get((1, 1.0), 0)
FP = confusion.get((0, 1.0), 0)
TN = confusion.get((0, 0.0), 0)
FN = confusion.get((1, 0.0), 0)

print("True Positives:", TP)
print("False Positives:", FP)
print("True Negatives:", TN)
print("False Negatives:", FN)
print("Total:", sum(confusion.values()))

# Rates use _ratio so that an empty class (e.g. the model never predicts a
# win) reports NaN instead of raising ZeroDivisionError.
A = _ratio(TP + TN, TP + TN + FP + FN)
print("accuracy:", A)

P = _ratio(TP, TP + FP)
print("precision:", P)

R = _ratio(TP, TP + FN)
print("recall(TPR):", R)

S = _ratio(TN, TN + FP)
print("specificity(TNR):", S)

N = _ratio(FN, TP + FN)
print("FNR:", N)

F = _ratio(FP, FP + TN)
print("FPR:", F)

In [57]:
# calculate the fpr and tpr for all thresholds of the classification
import numpy as np
import pandas as pd
from sklearn import metrics

# An ROC curve needs a continuous score. Feeding it the hard 0/1 'prediction'
# column yields a single operating point, not a curve. Use the model's
# probability for the positive class (index 1 of the 'probability' vector).
rocauc = testOut.select('radiant_win', 'probability').toPandas()
win_probability = rocauc['probability'].apply(lambda vector: float(vector[1]))

fpr, tpr, thresholds = metrics.roc_curve(rocauc['radiant_win'], win_probability)
roc_auc = metrics.auc(fpr, tpr)

# ROC
import matplotlib.pyplot as plt

f, ax = plt.subplots(figsize=(5, 5))

plt.title('Receiver Operating Characteristic')
plt.plot(fpr, tpr, 'b', label = 'AUC = %0.2f' % roc_auc)
plt.legend(loc = 'lower right')
plt.plot([0, 1], [0, 1],'r--')  # chance diagonal for reference
plt.xlim([0, 1])
plt.ylim([0, 1])
plt.ylabel('True Positive Rate')
plt.xlabel('False Positive Rate')
#plt.show()
display(f)

In [58]:
# Print the first rows of the predicted labels for the training set.
trainOut.select('prediction').show()