# Gradient Boosted Tree
### Classification model

A Gradient Boosted Trees model is an ensemble of individual decision trees trained sequentially. Each new tree uses information from the trees that precede it to learn from their errors, so the ensemble improves with every iteration. Within each individual tree, observations are routed through successive splits (nodes), which define the structure of the tree, until they reach a terminal node. The prediction for a new observation is obtained by aggregating the predictions of all the individual trees that make up the model.

[Source](https://www.cienciadedatos.net/documentos/py09_gradient_boosting_python.html)
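
To make the sequential idea concrete, here is a minimal plain-Python sketch of boosting for a squared-error loss, where each new tree is a one-split "stump" fitted to the residuals left by the trees before it. This is an illustration only, not what Spark's `GBTClassifier` does internally: the function names, the toy data, the learning rate of 0.5 and the single-feature stumps are all assumptions made for the example, and the input is assumed to contain at least two distinct feature values.

```python
# Toy gradient boosting with squared-error loss and depth-1 trees (stumps).
# Illustrative assumptions: one feature, hypothetical helper names, toy data.

def fit_stump(xs, residuals):
    """Pick the threshold on the single feature that minimises squared error."""
    best = None
    for threshold in sorted(set(xs))[:-1]:
        left = [r for x, r in zip(xs, residuals) if x <= threshold]
        right = [r for x, r in zip(xs, residuals) if x > threshold]
        left_mean = sum(left) / len(left)
        right_mean = sum(right) / len(right)
        error = (sum((r - left_mean) ** 2 for r in left)
                 + sum((r - right_mean) ** 2 for r in right))
        if best is None or error < best[0]:
            best = (error, threshold, left_mean, right_mean)
    _, threshold, left_mean, right_mean = best
    return threshold, left_mean, right_mean


def predict_stump(stump, x):
    threshold, left_mean, right_mean = stump
    return left_mean if x <= threshold else right_mean


def fit_boosting(xs, ys, n_trees=10, learning_rate=0.5):
    """Fit stumps one after another, each on the current residuals."""
    base = sum(ys) / len(ys)            # initial prediction: the mean target
    predictions = [base] * len(ys)
    stumps = []
    for _ in range(n_trees):
        residuals = [y - p for y, p in zip(ys, predictions)]
        stump = fit_stump(xs, residuals)
        stumps.append(stump)
        predictions = [p + learning_rate * predict_stump(stump, x)
                       for p, x in zip(predictions, xs)]
    return base, stumps, learning_rate


def predict_boosting(model, x):
    """Aggregate the base value and the scaled contribution of every stump."""
    base, stumps, learning_rate = model
    return base + learning_rate * sum(predict_stump(s, x) for s in stumps)


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]
ys = [0.0, 0.0, 0.0, 1.0, 1.0, 1.0]
model = fit_boosting(xs, ys)
print(predict_boosting(model, 2.0), predict_boosting(model, 5.0))
```

With ten stumps the two printed predictions are already very close to 0 and 1: each stump removes half of the remaining residual, which is the "learning from the errors of the previous trees" described above.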

### Application:

In this exercise we apply this machine learning model to two categories of subatomic particles, each described by four of its main features. The goal is to train the algorithm to separate the two categories using only those features.

# 1. Import the libraries

In [1]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql import SparkSession

spark = SparkSession\
        .builder\
        .appName("GradientBoostedTreeClassifierExample")\
        .getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
21/12/02 22:50:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


# 2. Load the data

In [2]:
data = spark.read.format("csv").load("data.csv", header = True)

In [3]:
data

DataFrame[_c0: string, Label: string, TrackP: string, TrackPt: string, MuonFlag: string, SpdE: string]

In [4]:
# Drop the unnamed first column (read by Spark as "_c0"); it is not a feature
data = data.drop("_c0")

In [5]:
data.show()

+-----+-------------+-------------+--------+-------------+
|Label|       TrackP|      TrackPt|MuonFlag|         SpdE|
+-----+-------------+-------------+--------+-------------+
|Other|74791.1562629|3141.93067698|     1.0|3.20000004768|
|Ghost|2738.48998933|199.573653278|     0.0|3.20000004768|
|Ghost|2161.40990765|94.8294175338|     0.0|          0.0|
|Other|15277.7304903|808.631063989|     0.0|3.20000004768|
|Other|7563.70019502|1422.56921358|     0.0|3.20000004768|
|Other|62641.6210901|3195.36230097|     0.0|3.20000004768|
|Other|18872.8105703|1428.89675193|     0.0|3.20000004768|
|Ghost|1993.55004844|469.429473483|     0.0|          0.0|
|Other|90635.2968712|4560.59667592|     0.0|3.20000004768|
|Other|11633.6699412|286.894581101|     0.0|          0.0|
|Other|3432.92993402|447.756746761|     0.0|          0.0|
|Other|21985.5390787|1616.39540194|     0.0|3.20000004768|
|Other|  5666.160153|334.631435959|     0.0|          0.0|
|Other|72726.6015693|3350.76196972|     1.0|3.2000000476

# 3. Data processing

- Cast the numeric variables to the correct type
- Encode the labels
- Build the feature vectors
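
As a rough illustration of the label-encoding step, the plain-Python sketch below mimics the mapping that `StringIndexer` builds with its default `frequencyDesc` ordering, where the most frequent label receives index 0. The helper name `build_label_index`, the sample labels and the alphabetical tie-breaking are assumptions made for this example.

```python
from collections import Counter


def build_label_index(labels):
    """Map each distinct label to a float index, most frequent label first."""
    counts = Counter(labels)
    # Descending frequency; ties broken alphabetically (an assumption here).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(index) for index, label in enumerate(ordered)}


# Hypothetical sample: "Other" is the majority class, as in the rows shown above.
sample_labels = ["Other", "Ghost", "Ghost", "Other", "Other", "Other"]
label_index = build_label_index(sample_labels)
print(label_index)  # {'Other': 0.0, 'Ghost': 1.0}
```

If the full dataset follows the same pattern, `indexedLabel` 0.0 would correspond to `Other` and 1.0 to `Ghost`; the fitted indexer's `labels` attribute lists the actual order.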

In [6]:
from pyspark.sql.types import DoubleType

# The CSV was read without schema inference, so every column is a string;
# cast the four feature columns to double.
for name in ["TrackP", "TrackPt", "MuonFlag", "SpdE"]:
    data = data.withColumn(name, data[name].cast(DoubleType()))

In [7]:
labelIndexer = StringIndexer(inputCol="Label", outputCol="indexedLabel").fit(data)

In [8]:
data.show()

+-----+-------------+-------------+--------+-------------+
|Label|       TrackP|      TrackPt|MuonFlag|         SpdE|
+-----+-------------+-------------+--------+-------------+
|Other|74791.1562629|3141.93067698|     1.0|3.20000004768|
|Ghost|2738.48998933|199.573653278|     0.0|3.20000004768|
|Ghost|2161.40990765|94.8294175338|     0.0|          0.0|
|Other|15277.7304903|808.631063989|     0.0|3.20000004768|
|Other|7563.70019502|1422.56921358|     0.0|3.20000004768|
|Other|62641.6210901|3195.36230097|     0.0|3.20000004768|
|Other|18872.8105703|1428.89675193|     0.0|3.20000004768|
|Ghost|1993.55004844|469.429473483|     0.0|          0.0|
|Other|90635.2968712|4560.59667592|     0.0|3.20000004768|
|Other|11633.6699412|286.894581101|     0.0|          0.0|
|Other|3432.92993402|447.756746761|     0.0|          0.0|
|Other|21985.5390787|1616.39540194|     0.0|3.20000004768|
|Other|  5666.160153|334.631435959|     0.0|          0.0|
|Other|72726.6015693|3350.76196972|     1.0|3.2000000476

In [9]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(
    inputCols=['TrackP', "TrackPt", "MuonFlag", 'SpdE'],
    outputCol="features")

In [10]:
data = assembler.transform(data)
data.show()

+-----+-------------+-------------+--------+-------------+--------------------+
|Label|       TrackP|      TrackPt|MuonFlag|         SpdE|            features|
+-----+-------------+-------------+--------+-------------+--------------------+
|Other|74791.1562629|3141.93067698|     1.0|3.20000004768|[74791.1562629,31...|
|Ghost|2738.48998933|199.573653278|     0.0|3.20000004768|[2738.48998933,19...|
|Ghost|2161.40990765|94.8294175338|     0.0|          0.0|[2161.40990765,94...|
|Other|15277.7304903|808.631063989|     0.0|3.20000004768|[15277.7304903,80...|
|Other|7563.70019502|1422.56921358|     0.0|3.20000004768|[7563.70019502,14...|
|Other|62641.6210901|3195.36230097|     0.0|3.20000004768|[62641.6210901,31...|
|Other|18872.8105703|1428.89675193|     0.0|3.20000004768|[18872.8105703,14...|
|Ghost|1993.55004844|469.429473483|     0.0|          0.0|[1993.55004844,46...|
|Other|90635.2968712|4560.59667592|     0.0|3.20000004768|[90635.2968712,45...|
|Other|11633.6699412|286.894581101|     

In [11]:
# Features with at most 4 distinct values are treated as categorical and indexed
featureIndexer = VectorIndexer(
    inputCol="features", outputCol="indexedFeatures", maxCategories=4
).fit(data)
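
The effect of `maxCategories` can be sketched in plain Python: a feature is declared categorical when it has at most `maxCategories` distinct values, and continuous otherwise. The helper `categorical_features` is hypothetical, and the rows are the first five rows of the table above rounded to two decimals, so the result on the full dataset may differ (for example if `SpdE` takes more than four distinct values).

```python
def categorical_features(rows, max_categories):
    """Return {feature position: sorted distinct values} for low-cardinality features."""
    n_features = len(rows[0])
    result = {}
    for j in range(n_features):
        distinct = sorted({row[j] for row in rows})
        if len(distinct) <= max_categories:
            result[j] = distinct
    return result


# Columns: TrackP, TrackPt, MuonFlag, SpdE (rounded sample rows, for illustration)
rows = [
    (74791.16, 3141.93, 1.0, 3.2),
    (2738.49, 199.57, 0.0, 3.2),
    (2161.41, 94.83, 0.0, 0.0),
    (15277.73, 808.63, 0.0, 3.2),
    (7563.70, 1422.57, 0.0, 3.2),
]
print(categorical_features(rows, max_categories=4))  # {2: [0.0, 1.0], 3: [0.0, 3.2]}
```

In this sample only `MuonFlag` and `SpdE` fall under the threshold, while `TrackP` and `TrackPt` stay continuous.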

                                                                                

# 4. Split the data into training and test sets

In [12]:
(trainingData, testData) = data.randomSplit([0.7, 0.3])
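
Because the split above is unseeded, every run produces a different partition and therefore a slightly different test error. In PySpark, passing a seed, as in `data.randomSplit([0.7, 0.3], seed=42)`, makes the split repeatable. The plain-Python sketch below shows the general idea of a seeded, weight-based random split; the function `random_split` and the seed value are illustrative assumptions and do not reproduce Spark's sampler.

```python
import random


def random_split(rows, weights, seed):
    """Assign each row independently to a split, with probability given by the weights."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative, normalised boundaries, e.g. [0.7, 1.0] for weights [0.7, 0.3].
    cumulative = []
    running = 0.0
    for weight in weights:
        running += weight / total
        cumulative.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        for i, bound in enumerate(cumulative):
            # The last split also catches any floating-point rounding leftovers.
            if u < bound or i == len(cumulative) - 1:
                splits[i].append(row)
                break
    return splits


rows = list(range(1000))
train, test = random_split(rows, [0.7, 0.3], seed=42)
print(len(train), len(test))
```

Since each row is assigned independently, the split sizes are only approximately 70% and 30%, but the same seed always yields the same partition.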

# 5. Train the model

In [13]:
%%time

gbt = GBTClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", maxIter=10)
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, gbt])
model = pipeline.fit(trainingData)

                                                                                

CPU times: user 24.7 ms, sys: 7.23 ms, total: 31.9 ms
Wall time: 13.8 s


# 6. Use the model to make predictions

In [14]:
predictions = model.transform(testData)
predictions.select("prediction", "indexedLabel", "features").show(5)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       0.0|         1.0|[1150.7199885,162...|
|       0.0|         1.0|[1216.72999479,36...|
|       0.0|         1.0|[1272.82994686,22...|
|       0.0|         1.0|[1293.06006353,55...|
|       0.0|         1.0|[1307.17004305,46...|
+----------+------------+--------------------+
only showing top 5 rows



# 7. Evaluate the model

In [15]:
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g" % (1.0 - accuracy))

Test Error = 0.15003
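
The test error printed above is simply one minus the accuracy, that is, the fraction of test rows whose prediction differs from the label. The plain-Python sketch below reproduces that arithmetic on a handful of made-up `(prediction, label)` pairs and also counts the errors per class, which a single accuracy figure hides when one class dominates. The pairs and helper names are hypothetical, not taken from the notebook's data.

```python
from collections import Counter


def accuracy(pairs):
    """Fraction of (prediction, label) pairs where the two values agree."""
    correct = sum(1 for prediction, label in pairs if prediction == label)
    return correct / len(pairs)


def confusion_counts(pairs):
    """Count occurrences of each (label, prediction) combination."""
    return Counter((label, prediction) for prediction, label in pairs)


# Made-up (prediction, label) pairs for illustration
pairs = [(0.0, 0.0), (0.0, 1.0), (1.0, 1.0), (0.0, 0.0),
         (0.0, 1.0), (0.0, 0.0), (1.0, 1.0), (0.0, 0.0)]

acc = accuracy(pairs)
print("Test Error = %g" % (1.0 - acc))  # Test Error = 0.25
for (label, prediction), count in sorted(confusion_counts(pairs).items()):
    print("label=%g prediction=%g count=%d" % (label, prediction, count))
```

In this toy sample every mistake is a class-1 row predicted as class 0, the same pattern seen in the five prediction rows displayed in step 6, so a per-class breakdown is worth checking alongside the overall error.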




### The test error obtained with this model and data is 0.15003, i.e. an accuracy of about 85%

In [16]:
gbtModel = model.stages[2]
print(gbtModel)  # summary only

GBTClassificationModel: uid = GBTClassifier_ab9012c9ca65, numTrees=10, numClasses=2, numFeatures=4


In [17]:
spark.stop()