# Actividad 1: HDFS, Spark SQL y MLlib

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

Lee con detenimiento cada ejercicio. Las variables utilizadas para almacenar las soluciones, al igual que las nuevas columnas creadas, deben llamarse **exactamente** como indica el ejercicio, o de lo contrario los tests fallarán y el ejercicio no puntuará. Debe reemplazarse el valor `None` al que están inicializadas por el código necesario para resolver el ejercicio.

## Leemos el fichero flights.csv que hemos subido a HDFS

Indicamos que contiene encabezados (nombres de columnas) y que intente inferir el esquema, aunque después comprobaremos si lo
ha inferido correctamente o no. La ruta del archivo en HDFS debería ser /<nombre_alumno>/flights.csv

In [180]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null #Se instala el JDK8
#Instalamos spark con pip
!pip install -q findspark
!pip install pyspark
!pip install pyspark==2.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install pyspark==3.2.0 -i https://pypi.tuna.tsinghua.edu.cn/simple
!pip install pyspark==2.4.0 -i https://pypi.tuna.tsinghua.edu.cn/simple

from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import functions as F
from pyspark.sql.types import StructType,StructField, StringType, IntegerType


import findspark
findspark.init()

spark = SparkSession. \
builder. \
appName('UNIR'). \
config("spark.executor.memory","1g"). \
config("spark.driver.memory","2g"). \
config("spark.driver.maxResultSize","1g"). \
getOrCreate()
ruta_hdfs = "hdfs://localhost:8020/GuillermoGomez/flights.csv" # Reemplaza esto por la ruta correcta del fichero flights.csv en HDFS
#flightsDF = None

# Descomentar estas líneas
flightsDF = spark.read\
             .option("header", "true")\
             .option("inferSchema", "true")\
             .csv(ruta_hdfs)

# YOUR CODE HERE


E: Could not open lock file /var/lib/dpkg/lock-frontend - open (13: Permission denied)
E: Unable to acquire the dpkg frontend lock (/var/lib/dpkg/lock-frontend), are you root?
Looking in indexes: https://pypi.tuna.tsinghua.edu.cn/simple
Collecting pyspark==2.4.0
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/88/01/a37e827c2d80c6a754e40e99b9826d978b55254cc6c6672b5b08f2e18a7f/pyspark-2.4.0.tar.gz (213.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m213.4/213.4 MB[0m [31m229.9 kB/s[0m eta [36m0:00:00[0m00:01[0m00:22[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting py4j==0.10.7 (from pyspark==2.4.0)
  Downloading https://pypi.tuna.tsinghua.edu.cn/packages/e3/53/c737818eb9a7dc32a7cd4f1396e787bd94200c3997c72c1dbe028587bd76/py4j-0.10.7-py2.py3-none-any.whl (197 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m197.3/197.3 kB[0m [31m477.4 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hBuilding wheels for collec

                                                                                

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [181]:
flightsDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [182]:
flightsDF.count()

162049

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [183]:
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [186]:
#spark.sql("SET spark.sql.adaptive.enabled=true")
#cuantos_NA1 = flightsDF.where(F.col("dep_time").isNull()).count()
#from pyspark.sql import functions as F

#cuantos_NA = flightsDF.where(F.col("dep_time") == "NA").count()
cuantos_NA = flightsDF.filter(flightsDF["dep_time"] == "NA").count()
cuantos_NA

857

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [187]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # para cada columna, nos quedamos con las filas que no tienen NA en esa columna
    #flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")
    flightsLimpiado = flightsLimpiado.filter(flightsDF[nombreColumna] != "NA")
flightsLimpiado.cache()

24/07/07 07:18:20 WARN CacheManager: Asked to cache already cached data.


DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, vemos que ha disminuido ligeramente
pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [188]:
flightsLimpiado.count()

160748

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. 
Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. Vamos también a convertir la columna `arr_delay` de tipo entero a número real, necesario para los pasos posteriores donde ajustaremos un modelo predictivo.

In [189]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar:
    # método que crea una columna o reemplaza una existente
    #flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType()))
    flightsConvertido = flightsConvertido.withColumn(c, flightsConvertido[c].cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", flightsConvertido["arr_delay"].cast(DoubleType()))

#flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()

24/07/07 07:18:24 WARN CacheManager: Asked to cache already cached data.


DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [190]:
flightsConvertido.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [191]:
flightsConvertido.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

### Ejercicio 1

Partiendo del DataFrame `flightsConvertido` que ya tiene los tipos correctos en las columnas, se pide: 

* Crear un nuevo DataFrame llamado `aeropuertosOrigenDF` que tenga una columna `origin` y que tenga tantas filas como aeropuertos distintos de *origen* existan. ¿Cuántas filas tiene? Almacenar dicho recuento en la variable entera `n_origen`.
* Crear un nuevo DataFrame llamado `rutasDistintasDF` que tenga dos columnas `origin`, `dest` y que tenga tantas filas como rutas diferentes existan (es decir, como combinaciones distintas haya entre un origen y un destino). Una vez creado, contar cuántas combinaciones hay, almacenando dicho recuento en la variable entera `n_rutas`.


In [192]:
# Reemplaza None por el código necesario para calcular sus valores correctos
aeropuertosOrigenDF = flightsConvertido.groupBy(flightsConvertido['origin']).count()
aeropuertosOrigenDF.show()
n_origen = aeropuertosOrigenDF.count()
rutasDistintasDF = flightsConvertido.groupBy(flightsConvertido['origin'],flightsConvertido['dest']).count()
rutasDistintasDF.show()
n_rutas = rutasDistintasDF.count()

# YOUR CODE HERE
#raise NotImplementedError

+------+------+
|origin| count|
+------+------+
|   SEA|107940|
|   PDX| 52808|
+------+------+

+------+----+-----+
|origin|dest|count|
+------+----+-----+
|   SEA| RNO|  157|
|   SEA| DTW| 1572|
|   SEA| CLE|   40|
|   SEA| LAX| 7430|
|   PDX| SEA| 2241|
|   SEA| BLI|   92|
|   PDX| IAH|  986|
|   PDX| PHX| 3536|
|   SEA| SLC| 3503|
|   SEA| SBA|  364|
|   SEA| BWI|  319|
|   PDX| IAD|  368|
|   PDX| SFO| 5090|
|   SEA| KOA|  559|
|   SEA| JAC|   14|
|   PDX| MCI|  364|
|   SEA| SJC| 3948|
|   SEA| ABQ|  607|
|   SEA| SAT|  364|
|   PDX| ONT|  914|
+------+----+-----+
only showing top 20 rows



In [193]:
assert(n_origen == 2)
assert(n_rutas == 115)
assert(aeropuertosOrigenDF.count() == n_origen)
assert(rutasDistintasDF.count() == n_rutas)

### Ejercicio 2

* Partiendo de nuevo de `flightsConvertido`, se pide calcular, *sólo para los vuelos que llegan con* ***retraso positivo***, el retraso medio a la llegada de dichos vuelos, para cada aeropuerto de destino. La nueva columna con el retraso medio a la llegada debe llamarse `retraso_medio`. El DF resultante debe estar **ordenado de mayor a menor retraso medio**. El código que calcule esto debería ir encapsulado en una función de Python llamada `retrasoMedio` que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo descrito anteriormente.

* Una vez hecha la función, invocarla pasándole como argumento `flightsConvertido` y almacenar el resultado devuelto en la variable `retrasoMedioDF`.

In [194]:
import pandas as pd

#def retrasoMedio(df):
  # Filtrar los vuelos que llegan con retraso positivo
#  vuelos_retraso_positivo = df[df['arr_delay'] > 0]
  
  # Calcular el retraso medio a la llegada para cada aeropuerto de destino
#  retraso_medio = vuelos_retraso_positivo.groupby(df['dest'])
    
    #.agg(df.mean(df['arr_delay']).alias('retraso_medio'))
    
  
  # Ordenar el DataFrame resultante de mayor a menor retraso medio
#  retraso_medio = retraso_medio.sort_values(by=df['arr_delay'], ascending=False)
  
  # Renombrar la columna con el retraso medio a la llegada
  #retraso_medio = retraso_medio.rename(columns={'arr_delay': 'retraso_medio'})
  
#  return retraso_medio

def retrasoMedio(df):
  # Filtrar los vuelos que llegan con retraso positivo
    
  vuelos_retraso_positivo = df[df['arr_delay'] > 0]
  vuelos_retraso_positivo = vuelos_retraso_positivo.toPandas()
  
  # Calcular el retraso medio a la llegada para cada aeropuerto de destino
  retraso_medio = vuelos_retraso_positivo.groupby(['dest'])['arr_delay'].mean().reset_index()
  
  # Ordenar el DataFrame resultante de mayor a menor retraso medio
  retraso_medio = retraso_medio.sort_values(by='arr_delay', ascending=False)
  
  # Renombrar la columna con el retraso medio a la llegada
  retraso_medio = retraso_medio.rename(columns={'arr_delay': 'retraso_medio'})
  mySchema = StructType([ StructField("dest", StringType(), True)\
                       ,StructField("retraso_medio", DoubleType(), True)])
  sparkDF2 = spark.createDataFrame(retraso_medio,schema=mySchema)
  
  return sparkDF2

# Invocar la función retrasoMedio pasando como argumento flightsConvertido
retrasoMedioDF = retrasoMedio(flightsConvertido)


#Create DataFrame by changing schema

sparkDF2.printSchema()
sparkDF2.show()



root
 |-- dest: string (nullable = true)
 |-- retraso_medio: double (nullable = true)

+----+------------------+
|dest|     retraso_medio|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
| CLE| 35.74193548387097|
| SBA|35.391752577319586|
| COS| 35.05607476635514|
| BWI|34.585798816568044|
| EWR| 33.52972258916777|
| DFW| 33.27519181585678|
| MIA| 32.66187050359712|
| ORD| 32.47909024211299|
| BNA| 31.94871794871795|
| JFK|31.255884586180713|
| JAC|             30.25|
| PHL|29.245989304812834|
| OGG|27.511111111111113|
| IAD|27.430875576036865|
| HOU| 27.33009708737864|
| LGB| 27.07634730538922|
| FAT|26.852589641434264|
+----+------------------+
only showing top 20 rows



In [195]:
lista = retrasoMedio(flightsConvertido).take(3)
assert((lista[0].retraso_medio == 64.75) & (lista[0].dest == "BOI"))
assert((lista[1].retraso_medio == 46.8) & (lista[1].dest == "HDN"))
assert((round(lista[2].retraso_medio, 2) == 41.19) & (lista[2].dest == "SFO"))

Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?

In [196]:
# ESCRIBE AQUÍ TU CÓDIGO PARA MOSTRAR EL CONTENIDO DE retrasoMedioDF


### Ejercicio 3

Ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso (problema de clasificación binaria), utilizando como variables predictoras el mes, el día del mes, la hora de partida `dep_time`, la hora de llegada `arr_time`, el tipo de avión (`carrier`), la distancia y el tiempo que permanece en el aire. Para ello, sigue los siguientes pasos.

Notemos que en estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Debemos indicar a Spark cuáles son categóricas e indexarlas. Para ello se pide: 

* Crear un `StringIndexer` llamado `indexerMonth` y otro llamado `indexerCarrier` sobre las variables categóricas `month` y `carrier` (tipo de avión). El nombre de las columnas indexadas que se crearán debe ser, respectivamente, `monthIndexed` y `carrierIndexed`.

In [209]:
# Incluye aquí los imports que necesites
# import ......  

# Se importa el StringIndexer
from pyspark.ml.feature import StringIndexer
from pyspark.sql.functions import lit
# Se crean las columnas indexadas con la funcion StringIndexer
#indexerMonth = StringIndexer(inputCol=flightsConvertido["month"], outputCol=flightsConvertido["monthIndexed"])
#indexerCarrier = StringIndexer(inputCol=flightsConvertido["carrier"], outputCol=flightsConvertido["carrierIndexed"])


#indexerMonth = StringIndexer(inputCol=flightsConvertido["month"], outputCol="monthIndexed")
#indexerCarrier = StringIndexer(inputCol=flightsConvertido["carrier"], outputCol="carrierIndexed")

#indexerMonth = StringIndexer(inputCol="month", outputCol="monthIndexed")
#indexerCarrier = StringIndexer(inputCol="carrier", outputCol="carrierIndexed")
print(spark)
print(spark.sparkContext.getConf().getAll())
SparkContext._active_spark_context = spark.sparkContext
indexerMonth = StringIndexer(). \
setInputCol("month"). \
setOutputCol("monthIndexed") 

indexerCarrier = StringIndexer(). \
setInputCol("carrier"). \
setOutputCol("carrierIndexed") 


<pyspark.sql.session.SparkSession object at 0x7f037d604ad0>
[('spark.master', 'local'), ('spark.app.id', 'local-1720151757532'), ('spark.app.submitTime', '1720150042955'), ('spark.executor.id', 'driver'), ('spark.driver.host', '10.0.2.15'), ('spark.driver.port', '44953'), ('spark.app.name', 'pyspark-shell'), ('spark.driver.extraJavaOptions', '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add

In [210]:
assert(isinstance(indexerMonth, StringIndexer))
assert(isinstance(indexerCarrier, StringIndexer))
assert(indexerMonth.getInputCol() == "month")
assert(indexerMonth.getOutputCol() == "monthIndexed")
assert(indexerCarrier.getInputCol() == "carrier")
assert(indexerCarrier.getOutputCol() == "carrierIndexed")

Recordemos también que Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, tendremos que fusionar en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Se pide:

* Crear en una variable llamada `vectorAssembler` un `VectorAssembler` que reciba como entrada una lista de todas las variables de entrada (y que no debe incluir `arr_delay`) que serán las que formarán parte del modelo. Crear primero esta lista de variables (lista de strings) en la variable `columnas_ensamblar` y pasar dicha variable como argumento al crear el `VectorAssembler`. Como es lógico, en el caso de las columnas `month` y `carrier`, no usaremos las variables originales sino las indexadas en el apartado anterior. La columna de tipo vector creada con las características ensambladas debe llamarse `features`.

In [211]:
# Incluye aquí los imports que necesites
# import ...........

columnas_ensamblar = None
vectorAssembler = None

# Se importa el vector assembler
from pyspark.ml.feature import VectorAssembler
# Recorremos las columnas de entrada
columnas_ensamblar = [x for x in flightsConvertido.columns if x in {"day", "dep_time", "arr_time", "distance","air_time"}]
# Se listan las columnas de entrada con las columnas indexadas
columnas_ensamblar = columnas_ensamblar + ["monthIndexed"] + ["carrierIndexed"]
# Se crea el vector de assembler con las columnas ensambladas
vectorAssembler = VectorAssembler(inputCols=columnas_ensamblar, outputCol="features")
vectorAssembler

VectorAssembler_785c44eb584e

In [212]:
assert(isinstance(vectorAssembler, VectorAssembler))
assert(vectorAssembler.getOutputCol() == "features")
input_cols = vectorAssembler.getInputCols()
assert(len(input_cols) == 7)
assert("arr_delay" not in input_cols)

Finalmente, vemos que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Vamos a convertirla en binaria. Para ello se pide:

* Utilizar un binarizador de Spark, fijando a 15 el umbral, y guardarlo en la variable `delayBinarizer`. Consideramos retrasado un vuelo que ha llegado con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria debe llamarse `arr_delay_binary` y debe ser interpretada como la columna target para nuestro algoritmo. Por ese motivo, esta columna **no** se incluyó en el apartado anterior entre las columnas que se ensamblan para formar las features.

In [213]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
# import .........

delayBinarizer = None

# Se importa el binarizador
from pyspark.ml.feature import Binarizer
# Se utiliza el binarizador considerando un umbral de 15 con threshold
delayBinarizer = Binarizer(threshold=15, inputCol="arr_delay", outputCol="arr_delay_binary")

In [214]:
assert(isinstance(delayBinarizer, Binarizer))
assert(delayBinarizer.getThreshold() == 15)
assert(delayBinarizer.getInputCol() == "arr_delay")
assert(delayBinarizer.getOutputCol() == "arr_delay_binary")

Por último, crearemos el modelo de clasificación.

* Crear en una variable `decisionTree` un árbol de clasificación de Spark (`DecisionTreeClassifier` del paquete `pyspark.ml.classification`)
* Indicar como columna de entrada la nueva columna creada por el `VectorAssembler` creado en un apartado anterior.
* Indicar como columna objetivo (target) la nueva columna creada por el `Binarizer` del apartado anterior.

In [215]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
# import .........

decisionTree = None

# Se importa el modelo de clasificacion DecisionTreeClassifier
from pyspark.ml.classification import DecisionTreeClassifier
# Se entrena el modelo con la columna crea en el binarizador arr_delay_binary
decisionTree = DecisionTreeClassifier(labelCol='arr_delay_binary', featuresCol='features')

In [216]:
assert(isinstance(decisionTree, DecisionTreeClassifier))
assert(decisionTree.getFeaturesCol() == "features")
assert(decisionTree.getLabelCol() == "arr_delay_binary")

Ahora vamos a encapsular todas las fases en un sólo pipeline y procederemos a entrenarlo. Se pide:

* Crear en una variable llamada `pipeline` un objeto `Pipeline` de Spark con las etapas anteriores en el orden adecuado para poder entrenar un modelo. 

* Entrenarlo invocando sobre ella al método `fit` y guardar el pipeline entrenado devuelto por dicho método en una variable llamada `pipelineModel`. 

* Aplicar el pipeline entrenado para transformar (predecir) el DataFrame `flightsConvertido`, guardando las predicciones devueltas en la variable `flightsPredictions` que será un DataFrame. Nótese que estamos prediciendo los propios datos de entrenamiento y que, por simplicidad, no habíamos hecho (aunque habría sido lo correcto) ninguna división de nuestros datos originales en subconjuntos distintos de entrenamiento y test antes de entrenar.

In [217]:
# Incluye aquí los imports que necesites y que no hayas incluido ya en alguna celda anterior
# import .........

pipeline = None
pipelineModel = None
flightsPredictions = None

# Se importa Pipeline
from pyspark.ml import Pipeline
#Se crea el Pipeline en paralelo
pipeline = Pipeline(stages=[indexerMonth, indexerCarrier, vectorAssembler, delayBinarizer, decisionTree])
# Estrenamiento usando el metodo fit
pipelineModel = pipeline.fit(flightsConvertido)
# Se aplica la predeccion con el metodo transform al Pipeline entrenado
flightsPredictions = pipelineModel.transform(flightsConvertido)
#Se visualiza el DataFrame
flightsPredictions.show()

                                                                                

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+--------------+--------------------+----------------+----------------+--------------------+----------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|monthIndexed|carrierIndexed|            features|arr_delay_binary|   rawPrediction|         probability|prediction|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+------------+--------------+--------------------+----------------+----------------+--------------------+----------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|        10.0|           0.0|[1.0,1.0,235.0,19...|             1.0|     [30.0,70.0]|           [0.3,0.7]|       1.0|
|2014|    1|  1|       4|       -6|     738|    -23.0|     U

In [218]:
from pyspark.ml import PipelineModel
assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 5)
assert(isinstance(pipelineModel, PipelineModel))
assert("probability" in flightsPredictions.columns)
assert("prediction" in flightsPredictions.columns)
assert("rawPrediction" in flightsPredictions.columns)

Vamos a mostrar la matriz de confusión (este apartado no es evaluable). Agrupamos por la variable que tiene la clase verdadera y la que tiene la clase predicha, para ver en cuántos casos coinciden y en cuántos difieren.

In [219]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()

[Stage 318:>                                                        (0 + 1) / 1]

+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             1.0|       1.0|   495|
|             0.0|       1.0|    61|
|             1.0|       0.0| 23754|
|             0.0|       0.0|136438|
+----------------+----------+------+



                                                                                