## Machine Learning sobre el dataset del fútbol Europeo

Para poder ejecutar este notebook se requiere haber procesado la información y generado la base de datos y la tabla correspondiente con el primer notebook.

In [2]:
%sql USE EURO_LEAGUE_DB
-- Make EURO_LEAGUE_DB the current database so the cells below can refer to GAME_EVENTS without a qualifier.

In [3]:
%sql DESCRIBE GAME_EVENTS
-- List the column names and data types of GAME_EVENTS.

col_name,data_type,comment
id_odsp,string,
id_event,string,
sort_order,int,
time,int,
event_type,int,
event_type_str,string,
event_type2,int,
event_type2_str,string,
side,int,
side_str,string,


In [4]:
%sql SELECT * FROM GAME_EVENTS limit 30
-- Preview at most 30 rows of GAME_EVENTS with every column.

id_odsp,id_event,sort_order,time,event_type,event_type_str,event_type2,event_type2_str,side,side_str,event_team,opponent,player,player2,player_in,player_out,shot_place,shot_place_str,shot_outcome,shot_outcome_str,is_goal,location,location_str,bodypart,bodypart_str,assist_method,assist_method_str,situation,situation_str,country_code,time_bin
UFot0hit/,UFot0hit1,1,2,1,Attempt,12,Key Pass,2,Away,Hamburg SV,Borussia Dortmund,mladen petric,gokhan tore,,,6,High and wide,2,Off target,0,9,Left side of the box,2,Left foot,1,Pass,1,Open play,DEU,0.0
UFot0hit/,UFot0hit2,2,4,2,Corner,99,,1,Home,Borussia Dortmund,Hamburg SV,dennis diekmeier,dennis diekmeier,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit3,3,4,2,Corner,99,,1,Home,Borussia Dortmund,Hamburg SV,heiko westermann,heiko westermann,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit4,4,7,3,Foul,99,,1,Home,Borussia Dortmund,Hamburg SV,sven bender,,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit5,5,7,8,Free kick won,99,,2,Away,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,99,,0,2,Defensive half,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit6,6,9,10,Hand ball,99,,2,Away,Hamburg SV,Borussia Dortmund,jose paolo guerrero,,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit7,7,10,2,Corner,99,,2,Away,Hamburg SV,Borussia Dortmund,lukasz piszczek,lukasz piszczek,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit8,8,11,8,Free kick won,99,,1,Home,Borussia Dortmund,Hamburg SV,chris lowe,,,,99,,99,,0,2,Defensive half,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit9,9,11,3,Foul,99,,2,Away,Hamburg SV,Borussia Dortmund,gojko kacar,,,,99,,99,,0,99,,99,,0,,99,,DEU,0.0
UFot0hit/,UFot0hit10,10,13,3,Foul,99,,2,Away,Hamburg SV,Borussia Dortmund,gokhan tore,,,,99,,99,,0,99,,99,,0,,99,,DEU,1.0


## Ejercicio:

Clusterizar los eventos utilizando KMeans usando al menos 3 features basadas en las columnas de la tabla GAME_EVENTS.
Evaluar la clusterización utilizando la métrica de Silhouette y elegir el mejor K entre 2 y 12.
Graficar la clusterización resultante utilizando distintos gráficos que muestren la combinación de features. (2 pto)

https://spark.apache.org/docs/latest/ml-clustering.html

https://stackoverflow.com/questions/47585723/kmeans-clustering-in-pyspark

In [6]:
# Find the best K for KMeans on GAME_EVENTS using the Silhouette score.
# Three columns of the table are used as features and every K from 2 to 12
# (inclusive) is fitted and scored.

df = sqlContext.table("GAME_EVENTS")

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator

features = ('time','event_type','side')
vecAssembler = VectorAssembler(inputCols=list(features), outputCol="features")
new_df = vecAssembler.transform(df)

# Keep only the raw columns plus the assembled vector. The loop below fits and
# scores eleven models on exactly these rows, so cache them instead of
# re-reading and re-assembling the table on every pass.
new_df = new_df.select(*features, 'features').cache()

# The evaluator does not depend on K, so it is created once. With its default
# settings it reads the "features" and "prediction" columns and reports the
# Silhouette score.
evaluator = ClusteringEvaluator()

# Silhouette score obtained for each K, so the best one can be picked in code
# rather than by reading the printed table.
silhouette_by_k = {}

for k in range(2, 13):
  kmeans = KMeans().setK(k).setSeed(1)
  model = kmeans.fit(new_df)

  # Assign each event to its nearest centroid.
  predictions = model.transform(new_df)

  silhouette = evaluator.evaluate(predictions)
  silhouette_by_k[k] = silhouette
  print(str(k) + "       |", silhouette)

# Higher Silhouette means better separated clusters.
best_k = max(silhouette_by_k, key=silhouette_by_k.get)
print("Best K by Silhouette:", best_k)

In [7]:
# The scores printed by the previous cell get smaller as K grows.
# How to read a Silhouette score:
#   - negative: the data cannot be separated at all;
#   - close to 0: the data can barely be separated;
#   - close to 1: maximum separation.
# K = 2 therefore looks like the best value and is used for the final model.

kmeans = KMeans(k=2, seed=1)  # 2 clusters
model = kmeans.fit(new_df.select('features'))

# Assign every event to a cluster and keep the raw feature columns next to the
# cluster id so the result can be inspected and plotted.
transformed = model.transform(new_df)
df_final = transformed.select("time","event_type","side","prediction")

# Show roughly 5% of the rows; the seed makes the displayed sample repeatable.
display(df_final.sample(fraction=0.05, seed=1))


time,event_type,side,prediction
11,8,1,0
17,1,1,0
19,1,1,0
36,8,2,0
49,1,1,1
1,3,2,0
3,3,2,0
6,8,1,0
7,8,2,0
19,8,1,0


## Ejercicio: GBT Classifier
A continuación vamos a utilizar [Gradient-boosted tree](https://spark.apache.org/docs/2.3.0/ml-classification-regression.html#gradient-boosted-tree-classifier) para ajustar un modelo y predecir qué combinación de condiciones de un evento puede llevar a un gol.

In [9]:
# Columns used by the goal classifier: seven categorical descriptors of the
# event plus the is_goal label.
goalFeaturesQuery = "select event_type_str, event_team, shot_place_str, location_str, assist_method_str, situation_str, country_code, is_goal from game_events"
gameEventsDf = spark.sql(goalFeaturesQuery)

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics

In [11]:
# Categorical input columns; each one is indexed and one-hot encoded below.
categFeatures = [
  "event_type_str",
  "event_team",
  "shot_place_str",
  "location_str",
  "assist_method_str",
  "situation_str",
  "country_code",
]

In [12]:
# One StringIndexer per categorical column: "<column>" -> "<column>_idx".
stringIndexers = [
  StringIndexer(inputCol=columnName, outputCol=columnName + "_idx")
  for columnName in categFeatures
]

In [13]:
# One OneHotEncoder per indexed column: "<column>_idx" -> "<column>_vec".
encoders = [
  OneHotEncoder(inputCol=columnName + "_idx", outputCol=columnName + "_vec")
  for columnName in categFeatures
]

In [14]:
# Concatenate every one-hot encoded "<column>_vec" column into a single
# "features" vector, which is the column the classifiers read.
encodedColumns = [columnName + "_vec" for columnName in categFeatures]
featureAssembler = VectorAssembler(inputCols=encodedColumns, outputCol="features")

In [15]:
# Gradient-boosted trees predicting is_goal from the assembled feature vector.
gbtClassifier = GBTClassifier(
  maxIter=20,
  maxDepth=5,
  featuresCol="features",
  labelCol="is_goal",
)

# Full pipeline: index the strings, one-hot encode them, assemble the vector,
# then train the classifier.
pipelineStages = [*stringIndexers, *encoders, featureAssembler, gbtClassifier]
pipelineGBT = Pipeline().setStages(pipelineStages)

Siguiendo la siguiente documentación, completar los párrafos vacíos
http://spark.apache.org/docs/latest/ml-classification-regression.html#gradient-boosted-tree-regression

In [17]:
# Split the dataset randomly into 75% for training and 25% for testing.
# The fixed seed keeps the split, and every metric computed from it, the same
# when the notebook is re-run.
(trainingData, testData) = gameEventsDf.randomSplit([0.75, 0.25], seed=42)
print("We have %d training examples and %d test examples." % (trainingData.count(), testData.count()))

In [18]:
# Fit the whole pipeline on the training split; the indexers and encoders are
# fitted as part of this call, before the classifier.
modelGBT = pipelineGBT.fit(trainingData)

# Score the held-out split.
predictionsGBT = modelGBT.transform(testData)

# Peek at a few scored rows.
previewGBT = predictionsGBT.select("prediction", "is_goal", "features")
previewGBT.show(5)

In [19]:
# Evaluate the model with BinaryClassificationEvaluator (area under the ROC curve).
#
# The ROC curve is built by sweeping a threshold over a continuous score, so the
# evaluator must read the model's "rawPrediction" column. Passing the hard 0/1
# "prediction" column instead reduces the curve to a single operating point and
# understates the AUC.
evaluatorAucGBT = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_goal', metricName='areaUnderROC')
aucGBT = evaluatorAucGBT.evaluate(predictionsGBT)
print(aucGBT)

# Rule-of-thumb bands for reading an AUC value:
#   0.5         : same as tossing a coin.
#   [0.5, 0.6)  : bad test.
#   [0.6, 0.75) : fair test.
#   [0.75, 0.9) : good test.
#   [0.9, 0.97) : very good test.
#   [0.97, 1)   : excellent test.
#
# An earlier run that scored the hard predictions reported 0.77, which falls in
# the "good" band. Re-read the printed value against the bands above, since
# scoring on rawPrediction gives a different number.

## Ejercicios:

Basándose en el siguiente notebook https://docs.databricks.com/_static/notebooks/gbt-regression.html implementar algunos de los siguientes puntos:

1. Evaluar el GBT classifier utilizando alguna otra métrica además del área bajo la curva ROC (1 pto)
2. Entrenar un clasificador con las mismas features que el de GBT pero con RF. ¿Qué diferencias nota en las métricas de evaluación? ¿Qué ventajas tiene RF sobre GBT? (1.5 puntos)
3. Realizar un ML Pipeline para tuneo de hiperparámetros de cualquiera de los dos clasificadores usando crossvalidation como se observa en el notebook de referencia. (3 puntos)
4. Realizar analytics sobre las predicciones de los modelos, tratar de encontrar patrones para los casos donde las conclusiones fueron erróneas. (2 puntos)
5. Utilizando el siguiente notebook como referencia, escribir el dataset de GAME_EVENTS particionado en muchos archivos y luego evaluar el modelo entrenado en streaming leyendo de a un archivo por trigger. https://docs.databricks.com/_static/notebooks/using-mllib-with-structured-streaming.html (2.5 puntos)

In [21]:
# EXERCISE 1: evaluate the GBT classifier with metrics other than ROC AUC.

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.mllib.evaluation import MulticlassMetrics
from sklearn.metrics import classification_report, confusion_matrix

# RMSE measures how far the predicted values are from the observed ones; the
# smaller it is, the closer predictions and labels are.
evaluatorGBT = RegressionEvaluator(metricName="rmse", labelCol=gbtClassifier.getLabelCol(), predictionCol=gbtClassifier.getPredictionCol())
rmseGBT = evaluatorGBT.evaluate(predictionsGBT)
print("RMSE on our test set: %g" % rmseGBT)

# Confusion matrix and per-class report.
# Labels and predictions are fetched together in ONE collect. Collecting each
# column separately would score the test set twice and would depend on both
# jobs returning rows in the same order.
labelAndPrediction = predictionsGBT.select("is_goal", "prediction").collect()
y_true = [int(row["is_goal"]) for row in labelAndPrediction]
y_pred = [int(row["prediction"]) for row in labelAndPrediction]

print(classification_report(y_true, y_pred))
print(confusion_matrix(y_true, y_pred))

# How to read the report:
#
# precision
#   Ratio of true positives to the sum of true and false positives: "of all
#   instances classified positive, what percent was correct?"
#
# recall
#   Ability of the classifier to find all positive instances. Per class, the
#   ratio of true positives to the sum of true positives and false negatives:
#   "of all instances that were actually positive, what percent was classified
#   correctly?"
#
# f1 score
#   Weighted harmonic mean of precision and recall; best is 1.0, worst is 0.0.
#   F1 scores are generally lower than accuracy because they embed precision
#   and recall. As a rule of thumb, compare classifiers by the weighted average
#   of F1 rather than by global accuracy.
#
# support
#   Number of actual occurrences of each class in the evaluated data.
#   Imbalanced support may indicate structural weaknesses in the reported
#   scores and a need for stratified sampling or rebalancing. Support does not
#   change between models; it diagnoses the evaluation process itself.


In [22]:
# EXERCISE 2: train a Random Forest on the same features as the GBT model and
# compare the evaluation metrics of both.

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from sklearn.metrics import classification_report, confusion_matrix

# Same preprocessing stages as the GBT pipeline, with a Random Forest as the
# final stage. The seed makes the trained forest repeatable.
rfClassifier = RandomForestClassifier(labelCol="is_goal", featuresCol="features", featureSubsetStrategy="auto", maxDepth=20, maxBins=32, seed=42)
pipelineStagesRF = stringIndexers + encoders + [featureAssembler, rfClassifier]
pipelineRF = Pipeline(stages=pipelineStagesRF)

# Deliberately reuse trainingData / testData from the GBT section instead of
# drawing a new random split: both models must be trained and scored on the
# same rows for the comparison below to mean anything.

# Train model. This also runs the indexers.
modelRF = pipelineRF.fit(trainingData)

# Make predictions.
predictionsRF = modelRF.transform(testData)

# Select example rows to display.
predictionsRF.select("prediction", "is_goal", "features").show(5)

# Area under the ROC curve. The evaluator reads the continuous "rawPrediction"
# score; the hard 0/1 "prediction" column would collapse the curve to a single
# point. The GBT value is recomputed here with the very same evaluator so the
# two numbers printed below are directly comparable.
evaluatorAuc = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='is_goal', metricName='areaUnderROC')
aucRF = evaluatorAuc.evaluate(predictionsRF)
aucGBT = evaluatorAuc.evaluate(predictionsGBT)

# RMSE between label and predicted class (rmseGBT comes from Exercise 1).
evaluatorRF = RegressionEvaluator(metricName="rmse", labelCol=rfClassifier.getLabelCol(), predictionCol=rfClassifier.getPredictionCol())
rmseRF = evaluatorRF.evaluate(predictionsRF)

# Confusion matrix for Random Forest. Labels and predictions are fetched in a
# single collect so they are guaranteed to line up row by row.
labelAndPredictionRF = predictionsRF.select("is_goal", "prediction").collect()
y_trueRF = [int(row["is_goal"]) for row in labelAndPredictionRF]
y_predRF = [int(row["prediction"]) for row in labelAndPredictionRF]

print(classification_report(y_trueRF, y_predRF))
print(confusion_matrix(y_trueRF, y_predRF))

# Side-by-side comparison of the evaluation metrics.
print("aucRF =" + str(aucRF))
print("aucGBT =" + str(aucGBT))

print("rmseRF =" + str(rmseRF))
print("rmseGBT =" + str(rmseGBT))

# Conclusions (check them against the numbers printed above):
# - In the author's run the Random Forest predicted worse than GBT; this may
#   improve with better hyperparameters.
# - Both GBT and Random Forest learn ensembles of trees, but they train
#   differently: GBT trains one tree at a time, so it can take longer to train,
#   while Random Forest can train several trees in parallel.
# - Random Forest is less prone to overfitting, since it averages over a larger
#   number of trees. GBT, on the other hand, can overfit when trained with many
#   trees.
# - Random Forest can be easier to tune, since its performance improves
#   steadily as trees are added. With GBT, performance can suffer if the number
#   of trees is too large.







In [23]:
# Install MLflow (with its "extras" option) through the Databricks library
# utility, then restart the Python process.
# NOTE(review): restarting Python presumably discards every variable defined by
# the cells above (stringIndexers, encoders, featureAssembler, trainingData,
# testData, ...), which the next cell relies on — confirm, and consider moving
# this cell to the very top of the notebook.
dbutils.library.installPyPI("mlflow", extras="extras")
dbutils.library.restartPython()


In [24]:
# EXERCISE 3: ML Pipeline that tunes hyperparameters with cross-validation,
# following the reference notebook. (3 points)
#
# The classifier being tuned is the GBT classifier used in the previous
# sections. This cell relies on stringIndexers, encoders, featureAssembler,
# trainingData and testData defined in earlier cells.

from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
import mlflow

# Takes the "features" column and learns to predict "is_goal".
gbt = GBTClassifier(labelCol="is_goal")

# Grid of hyperparameters to test:
#  - maxDepth: max depth of each decision tree in the GBT ensemble
#  - maxIter: iterations, i.e. number of trees in each GBT ensemble
# Each parameter needs at least two candidates, otherwise there is nothing for
# CrossValidator to choose between. Values are kept small so the notebook runs
# in reasonable time; for the highest accuracy try deeper trees (10 or more)
# and more trees (>100).
paramGrid = ParamGridBuilder()\
  .addGrid(gbt.maxDepth, [2, 5])\
  .addGrid(gbt.maxIter, [10, 20])\
  .build()

# Metric used to rank the candidates. is_goal is a binary label, so the
# candidates are compared by area under the ROC curve (higher is better),
# computed from the model's rawPrediction column.
evaluator = BinaryClassificationEvaluator(labelCol=gbt.getLabelCol(), metricName="areaUnderROC")

# CrossValidator fits every grid point on each fold and keeps the best one.
# The seed fixes the fold assignment so reruns pick the same winner.
cv = CrossValidator(estimator=gbt, evaluator=evaluator, estimatorParamMaps=paramGrid, seed=42)

# Same preprocessing stages as before, with the cross-validated GBT as the
# final stage.
pipelineStages = stringIndexers + encoders + [featureAssembler, cv]
pipeline = Pipeline(stages=pipelineStages)
pipelineModel = pipeline.fit(trainingData)
predictions = pipelineModel.transform(testData)

# The fitted pipeline's last stage is the CrossValidatorModel; the winning GBT
# model is its bestModel attribute (PipelineModel itself has no bestModel).
best_Model = pipelineModel.stages[-1].bestModel
print("Max Depth        : ", best_Model._java_obj.getMaxDepth())
print("Num Trees        : ", best_Model._java_obj.getNumTrees())
print("Subsampling Rate : ", best_Model._java_obj.getSubsamplingRate())
    







