# Structured Streaming: convirtiendo consultas batch en streaming

<div class="alert alert-block alert-info">
<p><b>PARA SABER MÁS</b>: Notebook interesante sobre Structured Streaming hecho por Databricks 
    <a target = "_blank" href="https://docs.databricks.com/_static/notebooks/structured-streaming-python.html">aquí</a>
</p>
</div>


## Descripción de las variables

El dataset está compuesto por las siguientes variables:

1. **Year** 2008
2. **Month** 1
3. **DayofMonth** 1-31
4. **DayOfWeek** 1 (Monday) - 7 (Sunday)
5. **DepTime** hora real de salida (local, hhmm)
6. **CRSDepTime** hora prevista de salida (local, hhmm)
7. **ArrTime** hora real de llegada (local, hhmm)
8. **CRSArrTime** hora prevista de llegada (local, hhmm)
9. **UniqueCarrier** código del aparato
10. **FlightNum** número de vuelo
11. **TailNum** identificador de cola: aircraft registration, unique aircraft identifier
12. **ActualElapsedTime** tiempo real invertido en el vuelo
13. **CRSElapsedTime** en minutos
14. **AirTime** en minutos
15. **ArrDelay** retraso a la llegada, en minutos: se considera que un vuelo ha llegado "on time" si aterrizó menos de 15 minutos más tarde de la hora prevista en el Computerized Reservations Systems (CRS).
16. **DepDelay** retraso a la salida, en minutos
17. **Origin** código IATA del aeropuerto de origen
18. **Dest** código IATA del aeropuerto de destino
19. **Distance** en millas
20. **TaxiIn** taxi in time, in minutes
21. **TaxiOut** taxi out time in minutes
22. **Cancelled** *si el vuelo fue cancelado (1 = sí, 0 = no)
23. **CancellationCode** razón de cancelación (A = aparato, B = tiempo atmosférico, C = NAS, D = seguridad)
24. **Diverted** *si el vuelo ha sido desviado (1 = sí, 0 = no)
25. **CarrierDelay** en minutos: El retraso del transportista está bajo el control del transportista aéreo. Ejemplos de sucesos que pueden determinar el retraso del transportista son: limpieza de la aeronave, daño de la aeronave, espera de la llegada de los pasajeros o la tripulación de conexión, equipaje, impacto de un pájaro, carga de equipaje, servicio de comidas, computadora, equipo del transportista, problemas legales de la tripulación (descanso del piloto o acompañante) , daños por mercancías peligrosas, inspección de ingeniería, abastecimiento de combustible, pasajeros discapacitados, tripulación retrasada, servicio de inodoros, mantenimiento, ventas excesivas, servicio de agua potable, denegación de viaje a pasajeros en mal estado, proceso de embarque muy lento, equipaje de mano no válido, retrasos de peso y equilibrio.
26. **WeatherDelay** en minutos: causado por condiciones atmosféricas extremas o peligrosas, previstas o que se han manifestado antes del despegue, durante el viaje, o a la llegada.
27. **NASDelay** en minutos: retraso causado por el National Airspace System (NAS) por motivos como condiciones meteorológicas (perjudiciales pero no extremas), operaciones del aeropuerto, mucho tráfico aéreo, problemas con los controladores aéreos, etc.
28. **SecurityDelay** en minutos: causado por la evacuación de una terminal, re-embarque de un avión debido a brechas en la seguridad, fallos en dispositivos del control de seguridad, colas demasiado largas en el control de seguridad, etc.
29. **LateAircraftDelay** en minutos: debido al propio retraso del avión al llegar, problemas para conseguir aterrizar en un aeropuerto a una hora más tardía de la que estaba prevista.

#### Descargamos una versión del dataset reducido dividida en 10 ficheros pequeños y los subimos a la carpeta /tmp/flightsFolder de HDFS

**Vamos a simular** procesamiento en Streaming leyendo cada vez un fichero de esa carpeta, de los 10 existentes, como si hubiese un proceso externo que los va creando en esa carpeta (aunque ya existan todos desde el principio; por eso los vamos procesando uno a uno). Podéis probar a no subir los 10 sino subir solo 3 o 4, y después de un rato, en cualquier momento subir más ficheros.

In [1]:
#!wget https://github.com/olbapjose/xapi-clojure/raw/master/flightsFolder.zip
#!unzip flightsFolder.zip
#!hdfs dfs -copyFromLocal flightsFolder /tmp

--2021-09-27 19:24:56--  https://github.com/olbapjose/xapi-clojure/raw/master/flightsFolder.zip
Resolviendo github.com (github.com)... 140.82.121.4
Conectando con github.com (github.com)[140.82.121.4]:443... conectado.
Petición HTTP enviada, esperando respuesta... 302 Found
Ubicación: https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flightsFolder.zip [siguiente]
--2021-09-27 19:24:57--  https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flightsFolder.zip
Resolviendo raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.111.133, 185.199.109.133, 185.199.108.133, ...
Conectando con raw.githubusercontent.com (raw.githubusercontent.com)[185.199.111.133]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 2728655 (2,6M) [application/zip]
Guardando como: “flightsFolder.zip”


2021-09-27 19:24:59 (7,10 MB/s) - “flightsFolder.zip” guardado [2728655/2728655]

Archive:  flightsFolder.zip
   creating: flightsFolder/
  inflating: 

In [None]:
#!hdfs dfs -ls /tmp/flightsFolder

#### Descargamos también una versión completa de ese dataset, que todavía no incluye la columna ArrDelayCat que le vamos a añadir a continuación. Los 10 ficheros anteriores sí tienen ya esa columna y siguen el mismo esquema que este fichero. El objetivo de descargar este fichero es para aprovechar su esquema y no tener que construir nosotros a mano un esquema

In [2]:
#!wget https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flights_jan08.csv
#!hdfs dfs -copyFromLocal flights_jan08.csv /tmp

--2021-09-27 19:28:51--  https://raw.githubusercontent.com/olbapjose/xapi-clojure/master/flights_jan08.csv
Resolviendo raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.108.133, 185.199.109.133, ...
Conectando con raw.githubusercontent.com (raw.githubusercontent.com)[185.199.110.133]:443... conectado.
Petición HTTP enviada, esperando respuesta... 200 OK
Longitud: 9719583 (9,3M) [text/plain]
Guardando como: “flights_jan08.csv”


2021-09-27 19:28:53 (9,23 MB/s) - “flights_jan08.csv” guardado [9719583/9719583]



In [4]:
from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("Tema2")\
    .getOrCreate()

In [5]:
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Leemos los datos, quitamos filas con NA y convertimos a numérico
flightsDF = spark.read\
                 .option("header", "true")\
                 .option("inferSchema", "true")\
                 .csv("flights_jan08.csv")

cleanFlightsDF = flightsDF.where("ArrDelay != 'NA' and DepDelay != 'NA'")\
                          .withColumn("ArrDelay", F.col("ArrDelay").cast(IntegerType()))\
                          .withColumn("DepDelay", F.col("DepDelay").cast(IntegerType()))\
                          .withColumn("ArrDelayCat", F.when(F.col("ArrDelay") < 15, "None")\
                                                      .when((F.col("ArrDelay") >= 15) & (F.col("ArrDelay") < 60), "Slight")\
                                                      .otherwise("Huge"))\
                          .cache() # we will be working with it from now on!

### Agregaciones en streaming

<div class="alert alert-block alert-success">
<p><b>PREGUNTA</b>: ¿Cuál es el retraso medio por cada destino para vuelos que salen de SFO?
    Convierte esta consulta en streaming</p>
</div>

In [14]:
flightsSchema = cleanFlightsDF.schema # in Structured Streaming the schema is mandatory

streamingFlights = spark.readStream.schema(flightsSchema)\
                        .option("maxFilesPerTrigger", 1)\
                        .csv("flightsFolder") # leer un solo fichero nuevo en cada operación de lectura de esta carpeta

# Operación de agregación, igual que con un DataFrame convencional
# COMPLETA LA CONSULTA: de los vuelos que salen de SFO, calcular el retraso medio para cada destino
largestAverageSFOstreamingDF = streamingFlights.<COMPLETAR>

# Ahora escribimos continuamente el resultado. Solo para test, escribimos en memoria
countQuery = largestAverageSFOstreamingDF\
                .writeStream.queryName("meanArrDelaySFO")\
                .format("memory")\
                .outputMode("complete")\
                .start() # esto lanza el stream

# Puesto que el driver es este notebook de Jupyter y no lo pensamos cerrar,
# hasta que hayamos visualizado correctamente todas las salidas, entonces podemos omitir la línea siguiente

#countQuery.awaitTermination() # obligatorio en aplicaciones en producción para evitar que finalice el Driver

import time 

resultDF = spark.sql("select * from meanArrDelaySFO")

resultDF.show()

time.sleep(15)

resultDF.show()

time.sleep(10)

resultDF.show()

time.sleep(5)

resultDF.show()

time.sleep(5)

resultDF.show()

time.sleep(5)

AttributeError: 'DataFrame' object has no attribute 'avg'

### Escribir en memoria implica que se crea una tabla automáticamente en memoria con el nombre de la consulta

En la celda anterior, lo que hemos hecho es que vamos consultando periódicamente el contenido de esa tabla y vemos que cada vez es distinto (va cambiando a lo largo del tiempo a medida que Spark va procesando nuevos ficheros y actualizando el resultado en la tabla)