# Ejemplo de transformación, limpieza y procesado de conjuntos de datos

In [2]:
games_df = spark.read.csv('../data/games.csv', header=True, sep=',', inferSchema=True)
games_red_df = games_df.selectExpr('winner',
                                   'gameDuration as duration',
                                   'firstBlood',
                                   'firstTower',
                                   'firstInhibitor',
                                   'firstBaron',
                                   'firstDragon',
                                   'firstRiftHerald',
                                   't1_champ1id',
                                   't1_champ2id',
                                   't1_champ3id',
                                   't1_champ4id',
                                   't1_champ5id',
                                   't1_towerKills',
                                   't1_inhibitorKills',
                                   't1_baronKills',
                                   't1_dragonKills',
                                   't1_riftHeraldKills',
                                   't2_champ1id',
                                   't2_champ2id',
                                   't2_champ3id',
                                   't2_champ4id',
                                   't2_champ5id',
                                   't2_towerKills',
                                   't2_inhibitorKills',
                                   't2_baronKills',
                                   't2_dragonKills',
                                   't2_riftHeraldKills')

In [3]:
from pyspark.sql.functions import expr
from pyspark.ml.feature import CountVectorizer
new_column_expression = expr('split( concat_ws( "," ,t1_champ1id, t1_champ2id, t1_champ3id, t1_champ4id, t1_champ5id), "," )' )
games_red_cv_df = games_red_df.withColumn('t1_members_str',new_column_expression).drop('t1_champ1id', 't1_champ2id', 't1_champ3id', 't1_champ4id', 't1_champ5id')
cv = CountVectorizer(inputCol='t1_members_str', outputCol='t1_members')
model = cv.fit(games_red_cv_df)
games_red_cv_df=model.transform(games_red_cv_df).drop('t1_members_str')

In [4]:
from pyspark.sql.functions import expr
new_column_expression = expr('split( concat_ws( "," ,t2_champ1id, t2_champ2id, t2_champ3id, t2_champ4id, t2_champ5id), "," )' )
games_red_cv_df = games_red_cv_df.withColumn('t2_members_str',new_column_expression).drop('t2_champ1id', 't2_champ2id', 't2_champ3id', 't2_champ4id', 't2_champ5id')
cv = CountVectorizer(inputCol='t2_members_str', outputCol='t2_members')
model = cv.fit(games_red_cv_df)
games_red_cv_df=model.transform(games_red_cv_df).drop('t2_members_str')

In [5]:
from pyspark.ml.feature import Binarizer
from pyspark.sql.types import DoubleType
games_red_cv_df = games_red_cv_df.withColumn('winner', games_red_cv_df.winner.cast(DoubleType()))
transformer=Binarizer(inputCol='winner', outputCol='winner_b', threshold=1)
games_red_cv_df=transformer.transform(games_red_cv_df).drop('winner')

In [6]:
from pyspark.ml.feature import OneHotEncoderEstimator
columns = ['firstBlood', 'firstTower', 'firstInhibitor', 'firstBaron', 'firstDragon', 'firstRiftHerald']
new_columns = [ 'b_firstBlood', 'b_firstTower', 'b_firstInhibitor', 'b_firstBaron', 'b_firstDragon', 'b_firstRiftHerald' ]
model = OneHotEncoderEstimator(inputCols=columns, outputCols=new_columns,dropLast=False)
transformer=model.fit(games_red_cv_df)
games_red_cv_df=transformer.transform(games_red_cv_df)
for column in columns :
    games_red_cv_df = games_red_cv_df.drop(column)

In [7]:
from pyspark.ml.feature import StandardScaler, VectorAssembler
columns = ["duration", "t1_towerKills", "t1_inhibitorKills", "t1_baronKills", "t1_dragonKills", "t1_riftHeraldKills",
          "t2_towerKills", "t2_inhibitorKills", "t2_baronKills", "t2_dragonKills", "t2_riftHeraldKills"]
assembler = VectorAssembler(inputCols=columns, outputCol="assembledColumns")
games_red_cv_df=assembler.transform(games_red_cv_df)
model = StandardScaler(inputCol="assembledColumns", outputCol="standardColumns", withStd=True, withMean=True)
transformer = model.fit(games_red_cv_df)
games_red_cv_df=transformer.transform(games_red_cv_df)

In [8]:
columns = ['t1_members', 't2_members', 'b_firstBlood', 'b_firstBaron', 'b_firstDragon', 'b_firstInhibitor', 'b_firstRiftHerald', 'b_firstTower', 'standardColumns']
assembler = VectorAssembler(inputCols=columns, outputCol='features')
games_red_cv_df = assembler.transform(games_red_cv_df)
dataset = games_red_cv_df.selectExpr('winner_b as label', 'features')

In [9]:
from pyspark.ml.feature import PCA
model = PCA(inputCol='features', outputCol='red_features',k=50)
transformer = model.fit(dataset)
red_dataset=transformer.transform(dataset).drop('features')

# Ejemplo de entrenamiento con Pipelines

In [10]:
from pyspark.ml.classification import LogisticRegression
[train_df, test_df]=games_red_df.randomSplit([0.7, 0.3])
train_df = train_df.withColumn('winner', train_df.winner.cast(DoubleType()))
test_df = test_df.withColumn('winner', test_df.winner.cast(DoubleType()))
new_column_expression = expr('split( concat_ws( "," ,t1_champ1id, t1_champ2id, t1_champ3id, t1_champ4id, t1_champ5id), "," )' )
train_df = train_df.withColumn('t1_members_str',new_column_expression)
test_df = test_df.withColumn('t1_members_str', new_column_expression)
new_column_expression = expr('split( concat_ws( "," ,t2_champ1id, t2_champ2id, t2_champ3id, t2_champ4id, t2_champ5id), "," )' )
train_df = train_df.withColumn('t2_members_str',new_column_expression)
test_df = test_df.withColumn('t2_members_str', new_column_expression)
cv1 = CountVectorizer(inputCol='t1_members_str', outputCol='t1_members')
cv2 = CountVectorizer(inputCol='t2_members_str', outputCol='t2_members')
binarizer=Binarizer(inputCol='winner', outputCol='winner_b', threshold=1)
columns = ['firstBlood', 'firstTower', 'firstInhibitor', 'firstBaron', 'firstDragon', 'firstRiftHerald']
new_columns = [ 'b_firstBlood', 'b_firstTower', 'b_firstInhibitor', 'b_firstBaron', 'b_firstDragon', 'b_firstRiftHerald' ]
ohe = OneHotEncoderEstimator(inputCols=columns, outputCols=new_columns,dropLast=False)
columns = ["duration", "t1_towerKills", "t1_inhibitorKills", "t1_baronKills", "t1_dragonKills", "t1_riftHeraldKills",
          "t2_towerKills", "t2_inhibitorKills", "t2_baronKills", "t2_dragonKills", "t2_riftHeraldKills"]
assembler1 = VectorAssembler(inputCols=columns, outputCol="assembledColumns")
scaler = StandardScaler(inputCol="assembledColumns", outputCol="standardColumns", withStd=True, withMean=True)
columns = ['t1_members', 't2_members', 'b_firstBlood', 'b_firstBaron', 'b_firstDragon', 'b_firstInhibitor', 'b_firstRiftHerald', 'b_firstTower', 'standardColumns']
assembler2 = VectorAssembler(inputCols=columns, outputCol='features')
pca = PCA(inputCol='features', outputCol='red_features')
logistic = LogisticRegression(labelCol='winner_b',featuresCol='red_features')

In [11]:
import datetime 
from pyspark.ml.tuning import ParamGridBuilder


params = ParamGridBuilder().addGrid(logistic.elasticNetParam, [0, 0.33]).\
    addGrid(pca.k, [50, 100]).addGrid(logistic.regParam, [0.1, 0.33]).build()
from pyspark.ml.tuning import CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline
evaluator = MulticlassClassificationEvaluator(metricName='accuracy', predictionCol='prediction', labelCol='winner_b')
pipeline = Pipeline().setStages([cv1, cv2, binarizer, ohe, assembler1, scaler, assembler2, pca, logistic])
#Cambiamos el numero del folds a 5
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(params).setNumFolds(5)
#Creamos una variable que guarda la hora en la que empieza la ejecución
now=datetime.datetime.now()
model = cv.fit(train_df)
#Guardamos otra variable con la hora a la que acaba
after=datetime.datetime.now()
for (result, config) in zip(model.avgMetrics, params) :
    print(result, config[pca.k], config[logistic.regParam], config[logistic.elasticNetParam])
evaluator.evaluate(model.transform(test_df))
#Mostramos la diferencia de tiemnpo entre que empezo y acabo
print('Tiempo de ejecucion----->',after-now)

0.9601451366940987 50 0.1 0.0
0.867753558224236 50 0.33 0.0
0.9600618424306799 100 0.1 0.0
0.9587861216949627 100 0.33 0.0
0.8592288152762125 50 0.1 0.33
0.8433579171399568 50 0.33 0.33
0.9482952576647744 100 0.1 0.33
0.8433579171399568 100 0.33 0.33
Tiempo de ejecucion-----> 0:01:28.729545


In [12]:
#Hacemos lo mismo pero cambiamos los parametros pedidos
params = ParamGridBuilder().addGrid(logistic.elasticNetParam, [0, 0.33,0.66,1]).\
    addGrid(pca.k, [50, 100]).addGrid(logistic.regParam, [0.1, 0.33,1,5,9]).addGrid(logistic.maxIter,[30,50,80]).build()

evaluator = MulticlassClassificationEvaluator(metricName='accuracy', predictionCol='prediction', labelCol='winner_b')
pipeline = Pipeline().setStages([cv1, cv2, binarizer, ohe, assembler1, scaler, assembler2, pca, logistic])
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(params).setNumFolds(3)
now=datetime.datetime.now()
model = cv.fit(train_df)
after=datetime.datetime.now()
for (result, config) in zip(model.avgMetrics, params) :
    print(result, config[pca.k], config[logistic.regParam], config[logistic.elasticNetParam])
evaluator.evaluate(model.transform(test_df))
print('Tiempo de ejecucion----->',after-now)

0.9600593200732261 50 0.1 0.0
0.9600593200732261 50 0.1 0.0
0.9600593200732261 50 0.1 0.0
0.9589229406334621 50 0.33 0.0
0.9589229406334621 50 0.33 0.0
0.9589229406334621 50 0.33 0.0
0.9578712505151477 50 1.0 0.0
0.9578712505151477 50 1.0 0.0
0.9578712505151477 50 1.0 0.0
0.9423054288385706 50 5.0 0.0
0.9423054288385706 50 5.0 0.0
0.9423054288385706 50 5.0 0.0
0.9105710962821523 50 9.0 0.0
0.9105710962821523 50 9.0 0.0
0.9105710962821523 50 9.0 0.0
0.9595047744156031 100 0.1 0.0
0.9595047744156031 100 0.1 0.0
0.9595047744156031 100 0.1 0.0
0.958421654712782 100 0.33 0.0
0.958421654712782 100 0.33 0.0
0.958421654712782 100 0.33 0.0
0.9581732884458649 100 1.0 0.0
0.9581732884458649 100 1.0 0.0
0.9581732884458649 100 1.0 0.0
0.9436092729568658 100 5.0 0.0
0.9436092729568658 100 5.0 0.0
0.9436092729568658 100 5.0 0.0
0.9133710893127598 100 9.0 0.0
0.9133710893127598 100 9.0 0.0
0.9133710893127598 100 9.0 0.0
0.9480431586926086 50 0.1 0.33
0.9480431586926086 50 0.1 0.33
0.9480431586926086 5

In [13]:
#LLamamos las librerias que necesitamos
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Llamamos al arbol de regresion
rf = RandomForestClassifier(labelCol="winner_b", featuresCol="red_features")
#Tocamos los parametros y procedemos de la misma manera que antes
params = ParamGridBuilder().addGrid(rf.numTrees, [5,20,70]).\
    addGrid(pca.k, [50, 100]).addGrid(rf.maxDepth, [2, 5,8]).build()
evaluator = MulticlassClassificationEvaluator(
    labelCol="winner_b", predictionCol="prediction", metricName="accuracy")
pipeline = Pipeline().setStages([cv1, cv2, binarizer, ohe, assembler1, scaler, assembler2, pca, rf])
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(params).setNumFolds(3)
now=datetime.datetime.now()
model = cv.fit(train_df)
after=datetime.datetime.now()
for (result, config) in zip(model.avgMetrics, params) :
    print(result, config[pca.k], config[rf.numTrees], config[rf.maxDepth])
evaluator.evaluate(model.transform(test_df))
print('Tiempo de ejecucion----->',after-now)

0.929752356104805 50 5 2
0.9363003608001947 50 5 5
0.943877454259827 50 5 8
0.9370229800245051 100 5 2
0.9358810495948189 100 5 5
0.9377393172529309 100 5 8
0.9293899936336554 50 20 2
0.9358604957588034 50 20 5
0.9474313453917418 50 20 8
0.9306932519056448 100 20 2
0.9364962158301287 100 20 5
0.9409948076497694 100 20 8
0.9306092527141347 50 70 2
0.9379646414791272 50 70 5
0.9473176798725833 50 70 8
0.931886347960254 100 70 2
0.9379943426246831 100 70 5
0.9438843913976569 100 70 8
Tiempo de ejecucion-----> 0:03:05.715760


0.929752356104805 50 5 2
0.9363003608001947 50 5 5
0.94393271509095 50 5 8
0.9370229800245051 100 5 2
0.9358810495948189 100 5 5
0.9376280763607789 100 5 8
0.9293899936336554 50 20 2
0.9358604957588034 50 20 5
0.9467082795927542 50 20 8
0.9306932519056448 100 20 2
0.9365796464992426 100 20 5
0.9414632660619782 100 20 8
0.9305539918830119 50 70 2
0.9379646414791272 50 70 5
0.9473176798725833 50 70 8
0.931886347960254 100 70 2
0.9376904080535071 100 70 5
0.9438843913976569 100 70 8
Tiempo de ejecucion-----> 0:03:07.951911


In [21]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Llamamos al modelo que vamos a usar. Cambiamos el nombre de la columna features porque quitaremos PCA
GBT = GBTClassifier(labelCol="winner_b", featuresCol="features")
#Usamos los parametros pedidos y quitamos PCA. Procedemos igual que en los casos anteriores
params = ParamGridBuilder().addGrid(GBT.maxIter, [30,70]).\
    addGrid(GBT.maxDepth, [3,9]).build()
evaluator = MulticlassClassificationEvaluator(
    labelCol="winner_b", predictionCol="prediction", metricName="accuracy")
pipeline = Pipeline().setStages([cv1, cv2, binarizer, ohe, assembler1, scaler, assembler2, GBT])
cv = CrossValidator().setEstimator(pipeline).setEvaluator(evaluator).setEstimatorParamMaps(params).setNumFolds(3)
now=datetime.datetime.now()
model = cv.fit(train_df)
after=datetime.datetime.now()
for (result, config) in zip(model.avgMetrics, params) :
    print(result, config[GBT.maxIter], config[GBT.maxDepth])
evaluator.evaluate(model.transform(test_df))
print('Tiempo de ejecucion----->',after-now)

0.9651919556670581 30 3
0.9638086948440654 30 9
0.9677759633645426 70 3
0.9650017735747647 70 9
Tiempo de ejecucion-----> 0:07:58.075424


In [13]:
evaluator.evaluate(model.transform(test_df))

0.9630370274803252

In [15]:
from datetime 
delta = timedelta(seconds=10)
print(delta)

0:00:10


In [None]:
#TIEMPOS MEDIOS
#Regresion logistica 1: 11s
#Regresion logistica 2: 5s
#Árbol: 10s
#GBT: 25s
#La configuracion más extosa es la del modelo GBT con 70 iteraciones maximas y 3 de profundidad
#No parece merecer mucho la pena invertir casi el doble de tiempo para obtener solo unas decimas más de precision
#de lo que se obtiene con regresiones como la logistica que son ucho más rapidas