# Árboles de Clasificación

In [25]:
from pyspark import SparkContext
from pyspark.sql import SQLContext

In [26]:
#Leer el contenido de una carpeta 
#Para leer de HDFS usar
# hdfs:///tmp/dcd/OnTimeDB
#Para leer de local usar
# file:/home/cloudera/dcd/OnTimeDB/

## Descargar archivo zip y subir al cluster
!wget https://github.com/omarmendoza564/datos/raw/main/datos/OnTimeDB.zip -O /home/sergio_ibarra1795/OnTimeDB.zip
!unzip -o /home/sergio_ibarra1795/OnTimeDB.zip -d /home/sergio_ibarra1795/
!ls -la /home/sergio_ibarra1795/OnTimeDB
!hdfs dfs -mkdir /tmp/dcd/OnTimeDB
!hdfs dfs -put /home/sergio_ibarra1795/OnTimeDB/ /tmp/dcd/

--2023-09-23 18:39:25--  https://github.com/omarmendoza564/datos/raw/main/datos/OnTimeDB.zip
Resolving github.com (github.com)... 140.82.112.4
Connecting to github.com (github.com)|140.82.112.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://raw.githubusercontent.com/omarmendoza564/datos/main/datos/OnTimeDB.zip [following]
--2023-09-23 18:39:25--  https://raw.githubusercontent.com/omarmendoza564/datos/main/datos/OnTimeDB.zip
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.109.133, 185.199.110.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 484951 (474K) [application/zip]
Saving to: ‘/home/sergio_ibarra1795/OnTimeDB.zip’


2023-09-23 18:39:25 (12.7 MB/s) - ‘/home/sergio_ibarra1795/OnTimeDB.zip’ saved [484951/484951]

Archive:  /home/sergio_ibarra1795/OnTimeDB.zip
  inflating: /home/sergio

In [27]:
bd = sqlContext.read.csv("hdfs:///tmp/dcd/OnTimeDB/", inferSchema=True, header=True)
sqlContext.registerDataFrameAsTable(bd, "bd")
bd.count()

                                                                                

30466

In [28]:
bd.show(10)

+----+-----+----------+---------+----------+-------------+-------+--------+--------+------+----+--------+---------+--------+------------+------------+--------+-------------+-----------------+------------------+-------+-----------+-------+
|Year|Month|DayofMonth|DayOfWeek|CRSDepTime|UniqueCarrier|TailNum|ArrDelay|DepDelay|Origin|Dest|Distance|Cancelled|Diverted|CarrierDelay|WeatherDelay|NASDelay|SecurityDelay|LateAircraftDelay|              LogD|Retraso|RetrasoNeto|Horario|
+----+-----+----------+---------+----------+-------------+-------+--------+--------+------+----+--------+---------+--------+------------+------------+--------+-------------+-----------------+------------------+-------+-----------+-------+
|2016|   12|         1|        4|       845|           AA| N8ARAA|    -7.0|    -5.0|   LAX| DFW|  1235.0|      0.0|     0.0|         0.0|         0.0|     0.0|          0.0|              0.0|3.0916669575956846|      0|       -2.0|      2|
|2016|   12|         2|        5|       845|

In [29]:
# Ver las variables disponibles
bd.dtypes

[('Year', 'int'),
 ('Month', 'int'),
 ('DayofMonth', 'int'),
 ('DayOfWeek', 'int'),
 ('CRSDepTime', 'int'),
 ('UniqueCarrier', 'string'),
 ('TailNum', 'string'),
 ('ArrDelay', 'double'),
 ('DepDelay', 'double'),
 ('Origin', 'string'),
 ('Dest', 'string'),
 ('Distance', 'double'),
 ('Cancelled', 'double'),
 ('Diverted', 'double'),
 ('CarrierDelay', 'double'),
 ('WeatherDelay', 'double'),
 ('NASDelay', 'double'),
 ('SecurityDelay', 'double'),
 ('LateAircraftDelay', 'double'),
 ('LogD', 'double'),
 ('Retraso', 'int'),
 ('RetrasoNeto', 'double'),
 ('Horario', 'int')]

In [30]:
spark.sql("SELECT year, Retraso, RetrasoNeto, Horario from bd LIMIT 10").show()

+----+-------+-----------+-------+
|year|Retraso|RetrasoNeto|Horario|
+----+-------+-----------+-------+
|2016|      0|       -2.0|      2|
|2016|      0|       -8.0|      2|
|2016|      0|        0.0|      2|
|2016|      0|        5.0|      2|
|2016|      0|        4.0|      2|
|2016|      0|        1.0|      2|
|2016|      0|       -6.0|      2|
|2016|      0|        7.0|      2|
|2016|      0|       -8.0|      2|
|2016|      0|       -1.0|      2|
+----+-------+-----------+-------+



In [31]:
#Agregar una variable numerica (IndexUniqueCarrier) basada en la variable alfanumerica 
#de la compañia que opera el vuelo (UniqueCarrier) 

from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol='UniqueCarrier',outputCol='IndexUniqueCarrier') #el índice empieza en el 0!
bd1=indexer.fit(bd).transform(bd)

#Se muestra el numero de vuelos operados por cada compañia, la variable IndexUniqueCarrier 
#ya se puede utilizar en el modelo

bd1.groupBy('UniqueCarrier','IndexUniqueCarrier').count().sort('IndexUniqueCarrier').show()


                                                                                

+-------------+------------------+-----+
|UniqueCarrier|IndexUniqueCarrier|count|
+-------------+------------------+-----+
|           AA|               0.0| 8853|
|           UA|               1.0| 6112|
|           WN|               2.0| 5395|
|           DL|               3.0| 4239|
|           VX|               4.0| 1703|
|           NK|               5.0| 1581|
|           F9|               6.0| 1295|
|           OO|               7.0| 1166|
|           B6|               8.0|  121|
|           EV|               9.0|    1|
+-------------+------------------+-----+



## Ajuste del modelo

In [32]:
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.sql.functions import col

#Crear un arreglo de variables predictoras llamdo 'features'
#ArrDelay representa el numero de minutos que un vuelo tiene de retraso
#Si un vuelo llega con mas de 15 minutos de retraso la variable 'Retraso' tiene valor 1 
#Renombrar la variable objetivo (Retraso) como 'label'

#En el caso particular de los árboles de clasificación, la variable objetivo debe ser de tipo doble. 
#Por lo tanto, transformar la variable a tipo doble. 
#Además, la variable debe estar convertida a través de la función stringIndexer 
#para poder ser analizada por el modelo
#Por lo tanto, la variable de trabajo, en este caso, será 'label2'


a1  = VectorAssembler(
    inputCols=['DepDelay','Distance','DayOfWeek',
               'CRSDepTime','IndexUniqueCarrier'],
    outputCol='features')

bd2 = a1.transform(bd1).select(col("Retraso").cast('double').alias("label"),'features')

stringIndexer = StringIndexer(inputCol = 'label', outputCol = 'label2')
sI = stringIndexer.fit(bd2)
bd2 = sI.transform(bd2)
bd2.dtypes



[('label', 'double'), ('features', 'vector'), ('label2', 'double')]

In [33]:
#Mostrar un solo renglon de la BD
bd2.show(5)

+-----+--------------------+------+
|label|            features|label2|
+-----+--------------------+------+
|  0.0|[-5.0,1235.0,4.0,...|   0.0|
|  0.0|[5.0,1235.0,5.0,8...|   0.0|
|  0.0|[-3.0,1235.0,6.0,...|   0.0|
|  0.0|[-7.0,1235.0,7.0,...|   0.0|
|  0.0|[-6.0,1235.0,1.0,...|   0.0|
+-----+--------------------+------+
only showing top 5 rows



### Partición Test - Train

In [34]:
#70% Train
#30% Test
(bd_train, bd_test) = bd2.randomSplit([0.7, 0.3],seed=123)
print("Renglones de la BD Train: ", bd_train.count())
print("Renglones de la BD Test: ",bd_test.count())

                                                                                

Renglones de la BD Train:  21219
Renglones de la BD Test:  9247


                                                                                

In [35]:
# Utilizamos el modelo DecisionTreeClassifier para generar un prediccion basado en los features
# disponibles
#Con una profundidad (maxDepth) de 5
#Espeficiar la variable objetivo (labelCol)

from pyspark.ml.classification import DecisionTreeClassifier as DTC

rt = DTC(maxDepth=5, labelCol = 'label2')

model = rt.fit(bd_train)
pred = model.transform(bd_test)


                                                                                

In [37]:
#La columna rawPrediction, está especificando el número de casos negativos y  positivos 
#en cada uno de los nodos terminales pertinentes para cada observación. 
#[Casos para 0, Casos para 1]

#El campo probability muestra la probabilidad de ser 0 o 1  
#El valor predicho, estableciendo un punto de corte del 50% se muestra en el campo prediction

pred.show()

+-----+--------------------+------+---------------+--------------------+----------+
|label|            features|label2|  rawPrediction|         probability|prediction|
+-----+--------------------+------+---------------+--------------------+----------+
|  0.0|[-19.0,731.0,7.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-17.0,888.0,7.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-16.0,641.0,1.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-16.0,888.0,3.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-15.0,337.0,7.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-15.0,967.0,3.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-14.0,236.0,3.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-14.0,337.0,2.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|       0.0|
|  0.0|[-14.0,337.0,5.0,...|   0.0|[13665.0,985.0]|[0.93276450511945...|    

In [38]:
#Validar el modelo
#Calcular el Areba bajo la curva 
#El AUC proporciona una medición agregada del rendimiento en todos los umbrales de clasificación 
#posibles

from pyspark.ml.evaluation import BinaryClassificationEvaluator as BCE
print('AUC=',BCE(metricName="areaUnderROC", rawPredictionCol = 'probability').evaluate(pred))

                                                                                

AUC= 0.9009880482870561


In [39]:
#Generar una tabla de frecuencias de las distintan probabilidades, es decir de los distintos 
#nodos terminales

pred.groupBy('probability').count().sort('count').show(50)



+--------------------+-----+
|         probability|count|
+--------------------+-----+
|[0.65116279069767...|   16|
|[0.35526315789473...|   42|
|[0.25984251968503...|   62|
|[0.66666666666666...|   63|
|[0.50531914893617...|   94|
|[0.09150326797385...|  146|
|[0.46683673469387...|  176|
|[0.27331887201735...|  195|
|[0.70650032829940...|  654|
|[0.01118838826731...| 1436|
|[0.93276450511945...| 6363|
+--------------------+-----+



                                                                                

In [40]:
#Generar la matriz de confusion
pred.groupBy('label','prediction').count().show()




+-----+----------+-----+
|label|prediction|count|
+-----+----------+-----+
|  1.0|       1.0| 1873|
|  0.0|       1.0|  184|
|  1.0|       0.0|  705|
|  0.0|       0.0| 6485|
+-----+----------+-----+



                                                                                

In [41]:
#Generar algunas estadísticas para tener una idea de cómo fueron las predicciones

numSuccesses = pred.where("""(prediction = 0.0 AND label2 = 0.0) OR (prediction = 1.0 AND label2 = 1.0)""").count()
numInspections = pred.count()

print ("Se realizaron", numInspections, "inspeciones y existen", numSuccesses, "predicciones existosas")
print ("Esta es una tasa de éxito del", str((float(numSuccesses) / float(numInspections)) * 100) + "%")

                                                                                

Se realizaron 9247 inspeciones y existen 8358 predicciones existosas
Esta es una tasa de éxito del 90.38607115821348%


In [42]:
# DecisionTreeClassifier(featuresCol="features",
#    labelCol="label",
#    predictionCol="prediction",
#    probabilityCol="probability",
#    rawPredictionCol="rawPrediction",
#    maxDepth=5,
#    maxBins=32,
#    minInstancesPerNode=1,
#    minInfoGain=0.0,
#    maxMemoryInMB=256,
#    impurity="gini"  / impurity="entropy" )

In [44]:
#Cambiando las propiedades del ejecutamos un nuevo arbol con una profundidad de 20
rt = DTC(maxDepth=20, labelCol = 'label2')
model = rt.fit(bd_train)
pred = model.transform(bd_test)

#Evaluar el modelo con AUC que ahora ha aumentado
print('AUC=',BCE(metricName="areaUnderROC", rawPredictionCol = 'probability').evaluate(pred))



AUC= 0.8193169919620452


                                                                                

### Validación externa

In [45]:
#Validación externa con la BD test

predtest = model.transform(bd_test)

print('AUC=',BCE(metricName="areaUnderROC",rawPredictionCol = 'probability').evaluate(predtest))

                                                                                

AUC= 0.8193169919620452
