# **Big Data Pipeline: HDFS, Spark, and Kafka Integration**

---

Este notebook debe quedar alojado en la carpeta GCS del cluster creado. Hay que seleccionar el kernel de PySpark para ejecutarlo correctamente.

## Uso de Apache Spark

### Análisis y tratamiento de los datos

In [1]:
# El archivo flights.csv contiene información sobre vuelos, como el año, el mes, el día, la hora, el origen, el destino, la duración del vuelo, el retraso en la salida y el retraso en la llegada
ruta_hdfs = "/DataCluster_Test/flights.csv" # Ruta HDFS en la que se encuentra el archivo flights.csv

# Cargo el archivo flights.csv en un DataFrame con PySpark
# Indico que el archivo contiene encabezados y que intente inferir el esquema.
flightsDF = spark.read.option("header", "true").option("inferSchema", "true").csv(ruta_hdfs) 


                                                                                

Imprimo el esquema para comprobar si ha inferido correctamente el tipo de dato en cada columna.

In [2]:
flightsDF.printSchema() # Muestro el esquema

flightsDF.count() # Cuento la cantidad de registros

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



162049

Veo que tenemos 162049 registros, coincide con el archivo. Si imprimo por pantalla las 5 primeras filas, veré qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [3]:
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

Al comparar lo inferido por Spark con el valor que tienen en el dataset algunas columnas, se puede ver que no les ha asignado el tipo de dato correctamente. La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como *no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna el tipo de dato string. 

Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Voy a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time por ejemplo:

In [4]:
from pyspark.sql import functions as F
cuantos_NA = flightsDF\
                .where(F.col("dep_time") == "NA")\
                .count()
cuantos_NA

                                                                                

857

Existen 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). 

Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base a la cantidad de datos que tenemos. En este caso, el dataset dispone de un número considerable de filas en total en comparación con las que tienen valores nulos, así que quitaré todas las filas donde hay un NA en cualquiera de las columnas.

In [5]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # Para cada columna, me quedo solo con las filas que no tienen NA en esa columna
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache() # Guardo el DataFrame en memoria para que las operaciones sean más rápidas

DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Una vez que se han eliminado los NA, voy a convertir a entero cada una las columnas que erróneamente eran de tipo string.

Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. 
Convertiré también la columna `arr_delay` de entero a número real, será necesario para los pasos posteriores donde ajustaré un modelo predictivo.

In [6]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar: # Método que crea una columna o reemplaza una existente con el mismo nombre, pero con un tipo de dato diferente
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType())) # Cambio el tipo de dato de la columna arr_delay a DoubleType
flightsConvertido.cache()  # Cacheo el DataFrame para que las operaciones sean más rápidas

DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [7]:
flightsConvertido.printSchema() # Muestro el esquema

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Ahora el esquema si muestra el tipo de datos correcto en cada columna y Spark si está tratando como enteros las columnas que deberían serlo, y si quisiera podría hacer operaciones aritméticas
con ellas.

### Tareas varias

Partiendo del DataFrame `flightsConvertido` en cada tarea: 

#### Tarea 1

- Crear un nuevo DataFrame llamado `aeropuertosOrigenDF` que tenga una columna `origin` y que tenga tantas filas como aeropuertos distintos de *origen* existan. ¿Cuántas filas tiene? Almacenar dicho recuento en la variable entera `n_origen`.

In [8]:
aeropuertosOrigenDF = flightsConvertido.select("origin").distinct()
n_origen = aeropuertosOrigenDF.count()
print(n_origen)



2


                                                                                

#### Tarea 2

- Crear un nuevo DataFrame llamado `rutasDistintasDF` que tenga dos columnas `origin`, `dest` y que tenga tantas filas como rutas diferentes existan (es decir, como combinaciones distintas haya entre un origen y un destino). Una vez creado, contar cuántas combinaciones hay, almacenando dicho recuento en la variable entera `n_rutas`.

In [9]:
rutasDistintasDF = flightsConvertido.select("origin", "dest").distinct()
n_rutas = rutasDistintasDF.count()
print(n_rutas)

115


#### Tarea 3

- Calcular, *sólo para los vuelos que llegan con* ***retraso positivo***, el retraso medio a la llegada de dichos vuelos, para cada aeropuerto de destino. La nueva columna con el retraso medio a la llegada debe llamarse `retraso_medio`. El DF resultante debe estar ordenado de mayor a menor retraso medio. El código que calcule esto debería ir encapsulado en una función de Python llamada `retrasoMedio` que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo descrito anteriormente.
  
  Una vez hecha la función, invocarla pasándole como argumento `flightsConvertido` y almacenar el resultado devuelto en la variable `retrasoMedioDF`.

In [10]:
from pyspark.sql.functions import avg

def retrasoMedio(df):                          # Función que recibe un DataFrame y devuelve un nuevo DataFrame con el retraso medio por destino
    df_filtrado = df.filter(df.arr_delay > 0)
    df_resultado = df_filtrado.groupBy("dest") \
                              .agg(avg("arr_delay").alias("retraso_medio")) \
                              .orderBy("retraso_medio", ascending=False)
    return df_resultado

retrasoMedioDF = retrasoMedio(flightsConvertido) # DataFrame con el retraso medio por destino

Ahora llamo a la función `retrasoMedio` pasándole como argumento `flightsConvertido`. 
- ¿Cuáles son los tres aeropuertos con mayor retraso medio? 
- ¿Cuáles son sus retrasos medios en minutos?

In [11]:
retrasoMedioDF = retrasoMedio(flightsConvertido)
retrasoMedioDF.show(3)

+----+------------------+
|dest|     retraso_medio|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
+----+------------------+
only showing top 3 rows



### Aplicación de Spark MLlib

A continuación, voy a ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso (problema de clasificación binaria), utilizando como variables predictoras el mes, el día del mes, la hora de partida (`dep_time`), la hora de llegada (`arr_time`), el tipo de avión (`carrier`), la distancia y el tiempo que permanece en el aire.

#### Indexación

En estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Hay que indicar a Spark cuáles son categóricas e indexarlas. Para ello tengo que: 

- Crear un `StringIndexer` al que llamaré `indexerMonth` y otro al que llamaré `indexerCarrier` sobre las variables categóricas `month` y `carrier`. El nombre de las columnas indexadas que se van a crear son, respectivamente, `monthIndexed` y `carrierIndexed`.

In [12]:
from pyspark.ml.feature import StringIndexer # Importo la clase StringIndexer

indexerMonth = StringIndexer(inputCol="month", outputCol="monthIndexed") # Creo un objeto StringIndexer para la columna month

indexerCarrier = StringIndexer(inputCol="carrier", outputCol="carrierIndexed") # Creo un objeto StringIndexer para la columna carrier

#### Ensamble

Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, hay que fusionarlas en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Por tanto:

- Crearé en una variable llamada `vectorAssembler` un `VectorAssembler` que reciba como entrada una lista de todas las variables de entrada (sin incluir `arr_delay`) que serán las que formarán parte del modelo. 
  
    Crearé primero esta lista de variables (lista de strings) en la variable `columnas_ensamblar` y pasaré dicha variable como argumento al crear el `VectorAssembler`. En el caso de las columnas `month` y `carrier`, no usaré las variables originales sino las indexadas en el apartado anterior. La columna de tipo vector creada con las características ensambladas se llamará `features`.

In [13]:
from pyspark.ml.feature import VectorAssembler # Importo la clase VectorAssembler

columnas_ensamblar = ["monthIndexed", "carrierIndexed", "day", "dep_time", "arr_time", "distance", "air_time"] # Columnas que quiero ensamblar

vectorAssembler = VectorAssembler(inputCols=columnas_ensamblar, outputCol="features")  # Creo un objeto VectorAssembler

In [14]:
assert(isinstance(vectorAssembler, VectorAssembler))
assert(vectorAssembler.getOutputCol() == "features")
input_cols = vectorAssembler.getInputCols() 
assert(len(input_cols) == 7)
assert("arr_delay" not in input_cols)

#### Binarización de la columna objetivo

Finalmente, veo que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Por tanto, voy a convertirla en binaria. Para ello:

-  Utilizaré un binarizador de Spark, fijando a 15 el umbral, y guardarlo en la variable `delayBinarizer`. Consideraré retrasado un vuelo que llegue con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria se llamará `arr_delay_binary` y será interpretada como la columna target para el algoritmo. Por ese motivo, esta columna **no** se incluyó en el apartado anterior entre las columnas que se ensamblan para formar las features.

In [15]:
from pyspark.ml.feature import Binarizer

delayBinarizer = Binarizer(threshold=15, inputCol="arr_delay", outputCol="arr_delay_binary") # Creo un objeto Binarizer para la columna arr_delay con un threshold de 15 minutos de retraso en la llegada

#### Creación del modelo de árbol de decisión

Por último, creo el modelo de clasificación.

- Creo en una variable `decisionTree` un árbol de clasificación de Spark (`DecisionTreeClassifier` del paquete `pyspark.ml.classification`)
- Indico como columna de entrada la columna creada por el `VectorAssembler`.
- Indico como columna objetivo (target) la columna creada por el `Binarizer`.

In [16]:
from pyspark.ml.classification import DecisionTreeClassifier

decisionTree = DecisionTreeClassifier(featuresCol="features", labelCol="arr_delay_binary") # Creo un objeto DecisionTreeClassifier con las columnas features y arr_delay_binary

Ahora encapsulo todas las fases en un sólo pipeline y lo entrenaré:

- Crearé en una variable llamada `pipeline` un objeto `Pipeline` de Spark con las etapas anteriores en el orden adecuado para poder entrenar un modelo. 

- Entrenarlo llamando al método `fit` y guardaré el pipeline entrenado devuelto en una variable llamada `pipelineModel`. 

- Aplico el pipeline entrenado para predecir el DataFrame `flightsConvertido`, guardando las predicciones devueltas en la variable `flightsPredictions` que será un DataFrame. 

- Crearé un evaluador con el fin de calcular la precisión del modelo.

In [17]:
from pyspark.ml import Pipeline

trainData, testData = flightsConvertido.randomSplit([0.8, 0.2], seed=7) # Divido los datos en entrenamiento y test (80% - 20%)

pipeline = Pipeline(stages=[indexerMonth, indexerCarrier, vectorAssembler, delayBinarizer, decisionTree]) # Creo un objeto Pipeline con los distintos objetos

pipelineModel = pipeline.fit(flightsConvertido) # Entreno el modelo

flightsPredictions = pipelineModel.transform(flightsConvertido) # Realizo las predicciones

flightsPredictions.select("features", "prediction", "arr_delay_binary").show() # Muestro algunas predicciones

                                                                                

+--------------------+----------+----------------+
|            features|prediction|arr_delay_binary|
+--------------------+----------+----------------+
|[10.0,0.0,1.0,1.0...|       1.0|             1.0|
|[10.0,6.0,1.0,4.0...|       0.0|             0.0|
|[10.0,4.0,1.0,8.0...|       0.0|             0.0|
|[10.0,6.0,1.0,28....|       0.0|             0.0|
|[10.0,0.0,1.0,34....|       1.0|             1.0|
|[10.0,3.0,1.0,37....|       0.0|             1.0|
|[10.0,4.0,1.0,346...|       0.0|             1.0|
|[10.0,4.0,1.0,526...|       0.0|             0.0|
|[10.0,4.0,1.0,527...|       0.0|             1.0|
|[10.0,4.0,1.0,536...|       0.0|             0.0|
|[10.0,4.0,1.0,541...|       0.0|             0.0|
|[10.0,6.0,1.0,549...|       0.0|             0.0|
|[10.0,3.0,1.0,550...|       0.0|             0.0|
|[10.0,5.0,1.0,557...|       0.0|             0.0|
|[10.0,0.0,1.0,557...|       0.0|             0.0|
|[10.0,0.0,1.0,558...|       0.0|             0.0|
|[10.0,9.0,1.0,559...|       0.

Muestro la matriz de confusión. Agrupo por la variable que tiene la clase verdadera y la que tiene la clase predicha, para ver en cuántos casos coinciden y en cuántos difieren.

In [18]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()



+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             1.0|       1.0|   530|
|             0.0|       1.0|    70|
|             1.0|       0.0| 23719|
|             0.0|       0.0|136429|
+----------------+----------+------+



                                                                                

In [19]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator( # Creo un evaluador para la precisión del modelo
    labelCol="arr_delay_binary",
    predictionCol="prediction",
    metricName="accuracy"
)

accuracy = evaluator.evaluate(flightsPredictions) # Calculo la precisión del modelo
print(f"Accuracy del modelo: {accuracy:.2f}")




Accuracy del modelo: 0.85


                                                                                

## Manejo de Spark Streaming y Kafka

Ahora se va a implementar un flujo de datos en tiempo real utilizando Apache Kafka y Spark Streaming. El objetivo es leer mensajes JSON desde un topic de Kafka, estructurar y transformar estos datos en columnas estructuradas para analizar y actualizar dinámicamente los resultados mediante el cálculo del retraso promedio en los vuelos.

Lo primero que haré es crear un topic llamado "retrasos" en Apache Kafka. Un topic representa una cola lógica donde se publican y almacenan mensajes. Es el mecanismo principal para organizar y categorizar los datos en Kafka.

- Comando de creación del topic -> /usr/lib/kafka/bin/kafka-topics.sh --bootstrap-server cluster1-w-0:9092 --create --replication-factor 1 --partitions 1 --topic retrasos

Para comprobar si se ha creado correctamente puedo usar el siguiente comando:
- Comando para obtener una lista de los topics existentes -> /usr/lib/kafka/bin/kafka-topics.sh --bootstrap-server cluster1-w-0:9092 --list 

Debería aparecer que se ha creado el topic "retrasos" 

*Para los comandos anteriores "cluster1" debe ser sustituido por el nombre del cluster en cuestión.*


Partiré de la función `retrasoMedio`, creada en la Tarea 3.

**El objetivo es calcular para cada aeropuerto de llegada el retraso medio de los vuelos que llegan con retraso positivo.**

Copio la función retrasoMedio, pero le añodo una columna al final llamada `Ingeniero_responsable`. De forma que ahora el DataFrame devuelto por la función tendrá tres columnas: `dest`, `retraso_medio` e `Ingeniero_responsable`.

In [20]:
# 
def retrasoMedio(df):
    df_filtrado = df.filter(df.arr_delay > 0)
    df_resultado = df_filtrado.groupBy("dest") \
                              .agg(avg("arr_delay").alias("retraso_medio")) \
                              .orderBy("retraso_medio", ascending=False) \
                              .withColumn("Ingeniero_responsable", F.lit("Rubén Jaime")) # Añado una columna con mi nombre
    return df_resultado

### Procesamiento en tiempo real con Kafka: Transformación y análisis de datos en streaming

Utilizaremos Kafka para actualizar en tiempo real el resultado calculado en el apartado anterior. 

Asumiré que los mensajes leídos de Kafka tienen solamente dos campos que son los únicos necesarios para llevar a cabo la operación anterior: dest y arr_delay. La idea será crear un Streaming DataFrame para leer de Kafka, y después a la función retrasoMedio pasándolo como argumento. Voy a leer del topic `retrasos` por lo que debe quedar indicado.

El paso a paso para crear la variable `retrasosStreamingDF`, un Streaming DataFrame leyendo de Apache Kafka, es el siguiente:
  - Uso la variable `readStream` interna de la SparkSession `spark` (en lugar de `read` como se usó anteriormente).
  
  - Indico que el formato es `"kafka"`.

  - Indico cuáles son los brokers de Kafka de los que voy a leer y el puerto al que me quiero conectar para leer (9092 es el que usa Kafka por defecto), con `.option("kafka.bootstrap.servers", "nombre_cluster-w-0:9092,nombre_cluster-w-1:9092")`. De esta manera puedo leer el mensaje si el productor de Kafka lo envía a cualquiera de los dos brokers existentes, que son los nodos del cluster identificados como `nombre_cluster-w-0` y `nombre_cluster-w-1`
  
  - Indico que quiero suscribirme al topic `"retrasos"`, así el programa escuchará continuamente el topic retrasos y leerá todos los mensajes que se publiquen en él.
  
  - Finalmente añado `load()` para realizar la lectura.

In [21]:
# Creación de un DataFrame con el retraso medio por destino y el ingeniero responsable
retrasosStreamingDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "cluster1-w-0:9092,cluster1-w-1:9092") \
    .option("subscribe", "retrasos") \
    .load()

retrasosStreamingDF.printSchema() # Muestro el esquema del DataFrame de streaming 

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



También he mostrado el esquema del DataFrame resultante. Todas las columnas que aparecen son creadas automáticamente por Spark cuando leemos de Kafka. De ellas, la que me interesa es `value` que contiene el mensaje de Kafka, en formato de datos binarios. 

Tendré que estructurar estos datos para poder extraer los campos. Para ello:

- Convierto la columna `value` a tipo String. De esta forma tendremos una columna que contendrá en cada fila** un fichero JSON completo. 

- Necesito extraer los dos campos de cada uno de los JSON y convertirlos en una columna llamada `parejas`, de tipo `struct`, utilizó la función `from_json` de Spark, que se aplica a cada fila de la columna "value" y parsea el String según un esquema indicado, devolviendo una columna de tipo `struct` (una estructura formada por dos campos de tipo String e Integer respectivamente).

- `parejas` al ser de tipo `struct`, permite acceder a cada uno de sus dos campos (`dest` y `arr_delay`) con el operador . (punto). Utilizando `withColumn` dos veces, crea dos columnas llamadas `dest` y `arr_delay` como el resultado de acceder a `parejas.dest` y `parejas.arr_delay` respectivamente.

In [22]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
import pyspark.sql.functions as F

esquema = StructType([                              # Creo un esquema para el DataFrame de streaming
    StructField("dest", StringType(), True),        # Campo "dest" de tipo StringType
    StructField("arr_delay", DoubleType(), True)    # Campo "arr_delay" de tipo DoubleType
])

# Parseo el DataFrame de streaming para obtener las columnas "dest" y "arr_delay" con los tipos de datos definidos en el esquema
parsedDF = retrasosStreamingDF \
    .select("value") \
    .withColumn("value", F.col("value").cast(StringType())) \
    .withColumn("parejas", F.from_json(F.col("value"), esquema)) \
    .withColumn("dest", F.col("parejas.dest")) \
    .withColumn("arr_delay", F.col("parejas.arr_delay"))


El DataFrame ya contiene una columna `dest` con el nombre del aeropuerto destino y una columna de números reales `arr_delay` con el retraso. Ahora puedo efectuar el mismo tipo de agregación que estaba haciendo en la función `retrasoMedio`. Por tanto, llamo a `retrasoMedio` pasando `parsedDF` como argumento.

In [23]:
retrasoMedioStreamingDF = retrasoMedio(parsedDF) # Creo un DataFrame de streaming con el retraso medio por destino y el ingeniero responsable

# Escribo el DataFrame de streaming en memoria para poder consultarlo con Spark SQL cuando se va actualizando y arranco la ejecución en streaming con start() 
consoleOutput = retrasoMedioStreamingDF\
                    .writeStream\
                    .queryName("retrasosAgg")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()


25/01/22 11:59:33 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-409f0c09-93e5-4fc0-9a40-8b50dad64562. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/01/22 11:59:33 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


Con la celda anterior ejecutada, abro el kafka-console-producer (que es un productor de Kafka que envía los mensajes que escribamos por teclado al topic que le indiquemos) entrando por SSH a cualquiera de las máquinas. Hay que cambiar el nombre del clúster por el que se tenga en cada caso (se puede ver en la propia terminal, justo detrás de @: es el nombre que viene antes de «-m»): 

Comando para ejecutar el kafka-console-producer -> /usr/lib/kafka/bin/kafka-console-producer.sh --broker-list cluster1-w-0:9092 --topic retrasos 

- El productor de consola suele utilizarse para desarrollo y testeo, pero nunca para entornos productivos. Tras cada mensaje, debemos pulsar ENTER. El nombre del bróker de Kafka al que va dirigido el mensaje puede ser tanto el worker 0 como el 1 del clúster. Lo que se indica es el nombre de la máquina, como nombrecluster-w-0 (o bien w-1).  

- Cada mensaje debe ser una línea de texto con estructura JSON. Tras cada mensaje, debemos pulsar ENTER para que el productor lo envíe a Kafka.  

- El programa no admite borrado de caracteres, pero puede finalizarse en cualquier momento pulsando Ctrl + C, y volverse a ejecutar escribiendo de nuevo el comando anterior en la terminal.

Cada vez que pegue un mensaje, ejecutaré la consulta `select * from retrasosAgg` a través del método `spark.sql(...)` y mostraré el DataFrame `agregadosDF` devuelto por dicho método. Eso hará una consulta contra la vista temporal (volátil) `retrasosAgg` que se ha creado en el metastore de Hive gracias al `writeStream` anterior. 

Ejecutaré la celda de `show` tantas veces como sea necesario hasta ver un resultado distinto al que se ve en la ejecución anterior, para asegurarme de que Spark ya ha leído e incorporado el nuevo dato en su cálculo de la agregación y por tanto ha actualizado el resultado.

El método `.sql(...)` es una transformación, y por tanto, se re-ejecuta la consulta cada vez que invoco a la acción `show()` sobre el resultado, ya que no voy a cachear nada, precisamente para forzar la reevaluación de la consulta y poder ver así el contenido actualizado de dicha tabla (en memoria) de Hive cada vez que hacemos `show()`.

- No hay problema por evaluar muchas veces una misma celda, ya que el cálculo sólo se actualizará una vez. Las siguientes veces que la ejecute seguirá mostrando el mismo resultado mientras no se envíe otro nuevo mensaje en Kafka.

Introduciré los siguientes mensajes JSON uno por uno en el productor de consola:

- {"dest": "GRX", "arr_delay": 2.6}
- {"dest": "MAD", "arr_delay": 5.4}
- {"dest": "GRX", "arr_delay": 1.5}
- {"dest": "MAD", "arr_delay": 20.0}

<div style="text-align: center;">
  <img src="images/10kafkamensajes.png">
</div>


Estos mensajes tienen un campo `dest` y un campo `arr_delay`, simulando la información que estaríamos recibiendo en tiempo real de los distintos aeropuertos a medida que los vuelos van aterrizando. 

In [29]:
agregadosDF = spark.sql("SELECT * FROM retrasosAgg") # Consulto el DataFrame de streaming con Spark SQL



In [32]:
agregadosDF.show() # Ejecuto varias veces esta celda tras enviar el primer mensaje, hasta ver que el DataFrame no es vacío

+----+-------------+---------------------+
|dest|retraso_medio|Ingeniero_responsable|
+----+-------------+---------------------+
| GRX|          2.6|          Rubén Jaime|
+----+-------------+---------------------+



                                                                                

Se recibe correctamente el primer mensaje. 

In [33]:
agregadosDF.show() # Ejecuto varias veces esta celda tras enviar el segundo mensaje, hasta ver que el DataFrame ha cambiado

+----+-------------+---------------------+
|dest|retraso_medio|Ingeniero_responsable|
+----+-------------+---------------------+
| MAD|          5.4|          Rubén Jaime|
| GRX|          2.6|          Rubén Jaime|
+----+-------------+---------------------+



                                                                                

Se recibe correctamente el segundo mensaje. 

In [35]:
agregadosDF.show() # Ejecuta varias veces esta celda tras enviar el tercer mensaje, hasta ver que el DataFrame ha cambiado

+----+-------------+---------------------+
|dest|retraso_medio|Ingeniero_responsable|
+----+-------------+---------------------+
| MAD|          5.4|          Rubén Jaime|
| GRX|         2.05|          Rubén Jaime|
+----+-------------+---------------------+



                                                                                

Se recibe correctamente el tercer mensaje, ya que el retraso medio de GRX ahora refleja la media del retraso del 1er mensaje y del 3er mensaje.

In [36]:
agregadosDF.show() # Ejecuta varias veces esta celda tras enviar el cuarto mensaje, hasta ver que el DataFrame ha cambiado

+----+-------------+---------------------+
|dest|retraso_medio|Ingeniero_responsable|
+----+-------------+---------------------+
| MAD|         12.7|          Rubén Jaime|
| GRX|         2.05|          Rubén Jaime|
+----+-------------+---------------------+



Se recibe correctamente el cuarto mensaje, ya que el retraso medio de MAD ahora refleja la media del retraso del 2o mensaje y del 4o mensaje, consiguiendo así el objetivo propuesto.