# Ingeniería para el Procesado Masivo de Datos. Actividad 1. 
### Rubio Limon, Erick Javier: 

# PARTE 2: Manejo de Spark

Este notebook te guiará en la parte 2 de la práctica. Debes sustituir la palabra `<COMPLETAR>` de cada celda por los nombres de los métodos adecuados y sus argumentos (siempre sin `<>`).

**IMPORTANTE**: CUANDO VAYAS COMPLETANDO EL CÓDIGO, RECUERDA BORRAR COMPLETAMENTE LAS LÍNEAS DE COMENTARIO (empiezan por el carácter #) QUE VAYAS ENCONTRANDO A LA DERECHA DE LAS BARRAS INVERTIDAS \ A LO LARGO DEL CÓDIGO. De lo contrario Python te dará errores de compilación

## Inicializamos la sparkSession

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("prueba")\
    .config("spark.executor.extraClassPath", "/mnt/spark-sql-kafka-0-10_2.11-2.4.4.jar:/mnt/kafka-clients-2.0.0.jar:/mnt/lz4-java-1.4.1.jar:/mnt/org.osgi.core-4.3.0.jar:/mnt/scala-library-2.11.12.jar:/mnt/snappy-java-1.1.7.1.jar:/mnt/spark-streaming-kafka-0-10_2.11-2.4.4.jar:/mnt/spark-tags_2.11-2.4.4.jar")\
    .config("spark.driver.extraClassPath", "/mnt/spark-sql-kafka-0-10_2.11-2.4.4.jar:/mnt/kafka-clients-2.0.0.jar:/mnt/lz4-java-1.4.1.jar:/mnt/org.osgi.core-4.3.0.jar:/mnt/scala-library-2.11.12.jar:/mnt/snappy-java-1.1.7.1.jar:/mnt/spark-streaming-kafka-0-10_2.11-2.4.4.jar:/mnt/spark-tags_2.11-2.4.4.jar")\
    .getOrCreate()

## Leemos el fichero flights.csv que hemos subido a HDFS

Indicamos que contiene encabezados (nombres de columnas) y que intente inferir el esquema, aunque después comprobaremos si lo
ha inferido correctamente o no. En la ruta, hay que indicar explícitamente el protocolo hdfs:// y la dirección IP del namenode
y el puerto de lectura (9000 por defecto). Después, se indica la ruta del archivo, que debería ser /nombre_alumno/flights.csv

In [2]:
flightsDF = spark.read\
            .option("header", True)\
            .option("inferSchema", True)\
            .csv("hdfs://192.168.240.4:9000/Erick_Rubio/flights.csv")

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [3]:
flightsDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [4]:
flightsDF.count()

162049

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [5]:
flightsDF.head(5)

[Row(year=2014, month=1, day=1, dep_time='1', dep_delay='96', arr_time='235', arr_delay='70', carrier='AS', tailnum='N508AS', flight=145, origin='PDX', dest='ANC', air_time='194', distance=1542, hour='0', minute='1'),
 Row(year=2014, month=1, day=1, dep_time='4', dep_delay='-6', arr_time='738', arr_delay='-23', carrier='US', tailnum='N195UW', flight=1830, origin='SEA', dest='CLT', air_time='252', distance=2279, hour='0', minute='4'),
 Row(year=2014, month=1, day=1, dep_time='8', dep_delay='13', arr_time='548', arr_delay='-4', carrier='UA', tailnum='N37422', flight=1609, origin='PDX', dest='IAH', air_time='201', distance=1825, hour='0', minute='8'),
 Row(year=2014, month=1, day=1, dep_time='28', dep_delay='-2', arr_time='800', arr_delay='-23', carrier='US', tailnum='N547UW', flight=466, origin='PDX', dest='CLT', air_time='251', distance=2282, hour='0', minute='28'),
 Row(year=2014, month=1, day=1, dep_time='34', dep_delay='44', arr_time='325', arr_delay='43', carrier='AS', tailnum='N762

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [6]:
from pyspark.sql import functions as F
cuantos_NA = flightsDF\
                .filter(F.col("dep_time") == "NA")\
                .count()
cuantos_NA    

857

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [7]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache()

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, veremos que ha disminuido ligeramente (160748) pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [8]:
flightsLimpiado.count()

160748

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. Para ello utilizamos el método withColumn con el mismo nombre de columna que ya tenía para reemplazar una columna por su versión convertida a entero (IntegerType). Ahora
no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número.

In [9]:
from pyspark.sql.types import IntegerType

flightsConvertido = flightsLimpiado
for c in columnas_limpiar:
    flightsConvertido = flightsConvertido\
                            .withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido.cache()

DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: int, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [10]:
flightsConvertido.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: integer (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [11]:
flightsConvertido.head(5)

[Row(year=2014, month=1, day=1, dep_time=1, dep_delay=96, arr_time=235, arr_delay=70, carrier='AS', tailnum='N508AS', flight=145, origin='PDX', dest='ANC', air_time=194, distance=1542, hour=0, minute=1),
 Row(year=2014, month=1, day=1, dep_time=4, dep_delay=-6, arr_time=738, arr_delay=-23, carrier='US', tailnum='N195UW', flight=1830, origin='SEA', dest='CLT', air_time=252, distance=2279, hour=0, minute=4),
 Row(year=2014, month=1, day=1, dep_time=8, dep_delay=13, arr_time=548, arr_delay=-4, carrier='UA', tailnum='N37422', flight=1609, origin='PDX', dest='IAH', air_time=201, distance=1825, hour=0, minute=8),
 Row(year=2014, month=1, day=1, dep_time=28, dep_delay=-2, arr_time=800, arr_delay=-23, carrier='US', tailnum='N547UW', flight=466, origin='PDX', dest='CLT', air_time=251, distance=2282, hour=0, minute=28),
 Row(year=2014, month=1, day=1, dep_time=34, dep_delay=44, arr_time=325, arr_delay=43, carrier='AS', tailnum='N762AS', flight=121, origin='SEA', dest='ANC', air_time=201, distanc

## Ejercicio 1

### Contar el número de aeropuertos distintos que existen como origen de algún vuelo, y el número de aeropuertos distintos que son destino de algún vuelo.
Utilizar para ello el DataFrame `flightsConvertido` que ya tiene los tipos correctos en las columnas

In [12]:
from pyspark.sql.functions import countDistinct, lit

numAeropuertos = flightsConvertido.agg(countDistinct("origin")).selectExpr("`count(DISTINCT origin)` as Total").withColumn("TipoAeropuerto", lit("Origen")).union(\
flightsConvertido.agg(countDistinct("dest")).selectExpr("`count(DISTINCT dest)` as Total").withColumn("TipoAeropuerto", lit("Destino")))

numAeropuertos.select("TipoAeropuerto", "Total").show()

+--------------+-----+
|TipoAeropuerto|Total|
+--------------+-----+
|        Origen|    2|
|       Destino|   71|
+--------------+-----+



## Ejercicio 2

### Para los vuelos que llegan con retraso positivo, calcular para cada aeropuerto de llegada el retraso medio.
El código que calcule esto debería ir encapsulado en una función de Python que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo del retraso medio por aeropuerto, ordenado de mayor a menor retraso medio. La columna creada con el retraso medio debe llamarse `retraso_medio`.

In [13]:
def retrasoMedio(df):
    retrasoMedioOrdenadoDF = df\
                    .filter(F.col("arr_delay") > 0)\
                    .groupBy("dest")\
                    .agg(F.avg(F.col("arr_delay"))).selectExpr("dest as dest", "`avg(arr_delay)` as arr_delay")\
                    .orderBy(F.col("arr_delay").desc())
    return retrasoMedioOrdenadoDF

Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?

In [14]:
retrasoMedio(flightsConvertido).head(3)

[Row(dest='BOI', arr_delay=64.75),
 Row(dest='HDN', arr_delay=46.8),
 Row(dest='SFO', arr_delay=41.193768844221104)]

*Los tres aeropuertos con mayor retraso medio son **BOI, HDN y SFO** con tiempos promedio de 64.75, 46.8 y 41.19 respectivamente*

## Ejercicio 3

###  Ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso, utilizando como variables predictoras el mes, el día del mes, la hora de partida dep_time, la hora de llegada arr_time, el tipo de avión (carrier), la distancia y el tiempo que permanece en el aire.

Notemos que en estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Debemos indicar a Spark cuáles son de cada tipo, y para ello aplicaremos un `StringIndexer` sobre las variables categóricas `month` y `carrier` (tipo de avión). El nombre de las columnas indexadas debe ser, respectivamente, `monthIndexed` y `carrierIndexed`.

Recordemos también que Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, tendremos que fusionar en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Como es lógico, en el caso de las columnas `month` y `carrier`, no usaremos las originales sino las indexadas en el apartado anterior. La nueva columna de tipo vector creada por `VectorAssembler` con las características ensambladas debe llamarse `features`.

Finalmente, vemos que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Para convertirla en binaria utilizaremos un binarizador de Spark, fijando a 15 el umbral. Consideramos retrasado un vuelo que ha llegado con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria debe llamarse `arr_delay_binary` y debe ser interpretada como la columna target para nuestro algoritmo. Por ese motivo, esta columna **no** debe incluirse entre las columnas que se ensamblan para formar las features.

Tanto los indexadores como el conversor a una única columa formarán parte de un `Pipeline` de Spark, tal como hemos visto en las clases de teoría.

In [15]:
# Antes de empezar, convertimos la variable arr_delay, que era de tipo entero, a números reales puesto que Spark
# necesita que sea un número real para poder binarizarla
from pyspark.sql.types import DoubleType

flightsConvertido = flightsConvertido\
                        .withColumn("arr_delay", F.col("arr_delay").cast(DoubleType())) 

In [16]:
# Primera parte. Creamos dos objetos StringIndexer para las dos columnas que vamos a indexar
from pyspark.ml.feature import StringIndexer, Binarizer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline

# Indicamos que la columna de entrada es month y la que se creará como salida debe llamarse monthIndexed
indexerMonth = StringIndexer().setInputCol("month").setOutputCol("monthIndexed")

# Indicamos que la columna de entrada es carrier y la que se creará como salida debe llamarse carrierIndexed
indexerCarrier = StringIndexer().setInputCol("carrier").setOutputCol("carrierIndexed")

# Segunda parte. Binarizamos la variable arr_delay para usarla después como variable target (clase: 1 o 0)
# Indicamos que la columna de entrada es arr_delay y la de salida es arr_delay_binary
delayBinarizer = Binarizer().setInputCol("arr_delay").setOutputCol("arr_delay_binary").setThreshold(15.0)

# Tercera parte: ensamblamos las columnas que actúan como features en una sola columna
columnas_ensamblar = ["monthIndexed", "carrierIndexed", "day", "dep_time", "arr_time", "distance", "air_time"]
vectorAssembler = VectorAssembler().setInputCols(columnas_ensamblar).setOutputCol("features")

decisionTree = DecisionTreeClassifier().setFeaturesCol("features").setLabelCol("arr_delay_binary")

pipeline = Pipeline().setStages([indexerMonth, indexerCarrier, delayBinarizer, vectorAssembler, decisionTree])

# Ajustamos el pipeline a nuestros datos. No vamos a hacer ninguna división en train/test aunque deberíamos.
pipelineModel = pipeline.fit(flightsConvertido)

# Predecimos el propio conjunto de datos que hemos usado como entrenamiento.
flightsPredictions = pipelineModel.transform(flightsConvertido).cache()

In [17]:
# Veamos en cuántos casos coincide o difiere la predicción respecto a la variable real
aciertosErroresDF = flightsPredictions.groupBy("arr_delay_binary", "prediction").count()
# aciertosErroresDF = flightsPredictions.groupBy("arr_delay_binary", "prediction").agg({'arr_delay':'avg'}).show()
#test = flightsPredictions.filter(F.col("arr_delay_binary") == 1).filter(F.col("prediction") == 0)
#.filter(F.col("dep_time") == "NA")\
#aciertosErroresDF = test.groupBy("arr_delay_binary", "prediction", "arr_delay").agg(F.avg("arr_delay"), F.count("*")).sort("count(1)", ascending=False).show(500)
aciertosErroresDF.show() # lo mostramos por pantalla

+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             1.0|       1.0|   772|
|             0.0|       1.0|   245|
|             1.0|       0.0| 23477|
|             0.0|       0.0|136254|
+----------------+----------+------+



¿Qué conclusiones podemos sacar sobre la precisión del clasificador, a la vista de estos resultados? ¿En qué casos falla más?

**RESPUESTA:** El clasificador ha sido capaz de identificar correctamente el % de los registros, en casos como los retrasos mas largos ha hecho un buen trabajo, sin embargo, ha tenido problemas con la estimacion de los retrasos cuando estos se refieren a un periodo de tiempo corto (menos de veinte minutos). Es necesario robustecer el arbol de desicion para mejorar pronosticos con retrasos mas cortos.

## Ejercicio 4

### Utilizaremos Kafka para actualizar en tiempo real el resultado calculado en el Ejercicio 2. 

Para simplificar, asumimos que los mensajes leídos de Kafka tiene solamente dos campos que son los necesarios para la operación anterior: dest y arr_delay. La idea será crear un Streaming DataFrame para leer de Kafka, y después invocar a nuestra función `retrasoMedio` pasándolo como argumento. Vamos a leer del topic `retrasos` por lo que debes indicar esta opción a continuación.

In [None]:
# puerto 9092 para leer de un brocker de Kafka
# Queremos leer del topic "retrasos"
inputDF = spark.readStream\
  .format("kafka")\
  .option("kafka.bootstrap.servers", "192.168.240.2:9092")\
  .option("subscribe", "retrasos")\
  .load()
    
# Mostramos el esquema de este DataFrame
inputDF.printSchema()

Todas estas columnas son creadas automáticamente por Spark cuando leemos de Kafka. De ellas, la que nos interesa es `value` que contiene propiamente el mensaje de Kafka, en formato datos binarios. Tendremos que estructurar estos datos para poder extraer los campos. Al convertir a StringType, tendremos una columna que contendrá en cada fila un fichero JSON completo, tal como se muestra en cada una de las plantillas anteriores. Para extraer sus dos campos como dos columnas distintas `dest` y `arr_delay` con tipos String e Integer respectivamente, utilizamos la función `from_json` de Spark, que se aplica a cada elemento (cada fila) de la columna "value" y parsea el String según un esquema que le indiquemos. 

La función `from_json` devuelve una columna de tipo struct, que en el código hemos llamado "parejas", a cuyos campos internos podemos acceder con el operador '.' para poder llevarlos a columnas separadas. El nombre de esos campos internos coincide con
lo que hayamos indicado en el esquema que le hemos dado a `from_json`.

In [None]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

esquema = StructType([\
  StructField("dest", StringType()),\
  StructField("arr_delay", DoubleType())\
])

parsedDF = inputDF\
    .select("value")\
    .withColumn("value", F.col("value").cast(StringType()))\
    .withColumn("parejas", F.from_json(F.col("value"), esquema))\
    .withColumn("dest", F.col("parejas.dest"))\
    .withColumn("arr_delay", F.col("parejas.arr_delay"))\

# Comprobamos qué esquema tiene ahora parsedDF
parsedDF.printSchema()

Nuestro DataFrame ya contiene una columna `dest` con el nombre del aeropuerto destino y una columna entera arr_delay con el retraso.
Ya podemos efectuar el mismo tipo de agregación que estamos haciendo en nuestra función `retrasoMedio`. Por tanto, 
invocamos a `retrasoMedio` pasando `parsedDF` como argumento.

In [None]:
# Invocamos tal cual a la misma función que hicimos para la agregación del ejercicio 2... ¡ahora en streaming!
retrasoMedioStreamingDF = retrasoMedio(parsedDF)

# indicamos que este DataFrame se vaya imprimiendo por consola cuando se va actualizando,
# y arrancamos la ejecución en Streaming con la acción start()

consoleOutput = retrasoMedioStreamingDF\
                    .writeStream\
                    .queryName("retrasos")\
                    .outputMode("complete")\
                    .format("console")\
                    .start()

# Comando obligado cuando usamos Structured Streaming para evitar cerrar el driver!
consoleOutput.awaitTermination()

Una vez evaluada la celda anterior, perderás definitivamente el control sobre este notebook y sólo podrás volver a recuperarlo si paras manualmente el kernel (pinchando en el cuadrado STOP al lado del botón "Run" en la parte superior). En caso de hacerlo, tendrás que volver a evaluar la primera celda porque la sparkSession se habrá perdido, y también tendrás que volver a evaluar todas las celdas del notebook de las cuales dependa la celda actual. Por eso es buena idea guardar tu trabajo antes de ejecutarla.

Recuerda abrir el productor de Kafka haciendo doble click en `terminal-contenedor-kafka` y pegar allí algunos mensajes en formato JSON que tengan un campo `dest` de tipo cadena y un campo `arr_delay` de tipo número real (positivo o negativo, ya que nuestra función lo filtrará si es negativo), simulando la información que estaríamos recibiendo en tiempo real de los distintos aeropuertos a medida que los vuelos van aterrizando. Puedes copiar y pegar los de esta plantilla para ello. Para pegarlos en el productor, simplemente pulsa el botón derecho del ratón y luego la tecla Enter.

`
{"dest": "BCN", "arr_delay": 4.0}
{"dest": "MAD", "arr_delay": 3.0}
{"dest": "MAD", "arr_delay": -2}
{"dest": "BCN", "arr_delay": 8.0}
{"dest": "GRX", "arr_delay": 2.4}
`

**IMPORTANTE**: la salida no se verá en el Notebook sino en la *consola* que se muestra en la ventana negra que se desplegó al ejecutar docker-compose-up. En esa ventana se muestran los logs de los contenedores docker al arrancar, y también es el lugar donde Spark envía sus salidas mientras está leyendo de Kafka. Por tanto cada versión actualizada de la agregación se mostrará ahí. Ten paciencia, cada actualización tarda un tiempo debido al entorno limitado que tenemos.

**Realiza una captura de pantalla del último batch que se actualice y pégala en la documentación que entregarás.**