# Actividad 1: HDFS, Spark SQL y MLlib

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

Lee con detenimiento cada ejercicio. Las variables utilizadas para almacenar las soluciones, al igual que las nuevas columnas creadas, deben llamarse **exactamente** como indica el ejercicio, o de lo contrario los tests fallarán y el ejercicio no puntuará. Debe reemplazarse el valor `None` al que están inicializadas por el código necesario para resolver el ejercicio.

## Leemos el fichero flights.csv que hemos subido a HDFS

Indicamos que contiene encabezados (nombres de columnas) y que intente inferir el esquema, aunque después comprobaremos si lo
ha inferido correctamente o no. La ruta del archivo en HDFS debería ser /<nombre_alumno>/flights.csv

In [1]:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType

# Ruta del archivo en HDFS
ruta_hdfs = "david_figueroa/flights.csv"

# Lectura del CSV desde HDFS
flightsDF = spark.read.option("header", "true") \
                      .option("inferSchema", "true") \
                      .option("delimiter", "\t") \
                      .csv(f"/{ruta_hdfs}")

# Comprobar cantidad de valores "NA" en dep_time
cuantos_NA = flightsDF.where(F.col("dep_time") == "NA").count()

# Eliminar filas con "NA" en columnas clave
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]
flightsLimpiado = flightsDF
for col in columnas_limpiar:
    flightsLimpiado = flightsLimpiado.where(F.col(col) != "NA")
flightsLimpiado.cache()

# Convertir columnas a Integer y arr_delay a Double
flightsConvertido = flightsLimpiado
for col in columnas_limpiar:
    flightsConvertido = flightsConvertido.withColumn(col, F.col(col).cast(IntegerType()))
flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()
flightsConvertido.printSchema()
flightsConvertido.show(5)

                                                                                

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



[Stage 5:>                                                          (0 + 1) / 1]

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

                                                                                

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [2]:
flightsDF.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: string (nullable = true)
 |-- dep_delay: string (nullable = true)
 |-- arr_time: string (nullable = true)
 |-- arr_delay: string (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: string (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: string (nullable = true)
 |-- minute: string (nullable = true)



Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [3]:
flightsDF.count()

162049

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [4]:
flightsDF.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|       70|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|      -23|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|       -4|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|      -23|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|       43|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [5]:
from pyspark.sql import functions as F
cuantos_NA = flightsDF\
                .where(F.col("dep_time") == "NA")\
                .count()
cuantos_NA

857

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [6]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # para cada columna, nos quedamos con las filas que no tienen NA en esa columna
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache()

25/08/03 21:51:32 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.


DataFrame[year: int, month: int, day: int, dep_time: string, dep_delay: string, arr_time: string, arr_delay: string, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: string, distance: int, hour: string, minute: string]

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, vemos que ha disminuido ligeramente
pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [7]:
flightsLimpiado.count()

                                                                                

160748

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. 
Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. Vamos también a convertir la columna `arr_delay` de tipo entero a número real, necesario para los pasos posteriores donde ajustaremos un modelo predictivo.

In [8]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar:
    # método que crea una columna o reemplaza una existente
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()

25/08/03 21:51:35 WARN org.apache.spark.sql.execution.CacheManager: Asked to cache already cached data.


DataFrame[year: int, month: int, day: int, dep_time: int, dep_delay: int, arr_time: int, arr_delay: double, carrier: string, tailnum: string, flight: int, origin: string, dest: string, air_time: int, distance: int, hour: int, minute: int]

In [9]:
flightsConvertido.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: integer (nullable = true)
 |-- dep_delay: integer (nullable = true)
 |-- arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: integer (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)



Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [10]:
flightsConvertido.show(5)

+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|
+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+
|2014|    1|  1|       1|       96|     235|     70.0|     AS| N508AS|   145|   PDX| ANC|     194|    1542|   0|     1|
|2014|    1|  1|       4|       -6|     738|    -23.0|     US| N195UW|  1830|   SEA| CLT|     252|    2279|   0|     4|
|2014|    1|  1|       8|       13|     548|     -4.0|     UA| N37422|  1609|   PDX| IAH|     201|    1825|   0|     8|
|2014|    1|  1|      28|       -2|     800|    -23.0|     US| N547UW|   466|   PDX| CLT|     251|    2282|   0|    28|
|2014|    1|  1|      34|       44|     325|     43.0|     AS| N762AS|   121|   SEA| ANC|     201|    1448|   0|    34|
+----+-----+---+--------+---------+-----

### Ejercicio 1

Partiendo del DataFrame `flightsConvertido` que ya tiene los tipos correctos en las columnas, se pide: 

* Crear un nuevo DataFrame llamado `aeropuertosOrigenDF` que tenga una columna `origin` y que tenga tantas filas como aeropuertos distintos de *origen* existan. ¿Cuántas filas tiene? Almacenar dicho recuento en la variable entera `n_origen`.
* Crear un nuevo DataFrame llamado `rutasDistintasDF` que tenga dos columnas `origin`, `dest` y que tenga tantas filas como rutas diferentes existan (es decir, como combinaciones distintas haya entre un origen y un destino). Una vez creado, contar cuántas combinaciones hay, almacenando dicho recuento en la variable entera `n_rutas`.


In [11]:
# Ejercicio 1
aeropuertosOrigenDF = flightsConvertido.select("origin").distinct()
n_origen = aeropuertosOrigenDF.count()

rutasDistintasDF = flightsConvertido.select("origin", "dest").distinct()
n_rutas = rutasDistintasDF.count()

print(f"Número de aeropuertos de origen distintos: {n_origen}")
print(f"Número de rutas distintas: {n_rutas}")

print("Aeropuertos de origen:")
aeropuertosOrigenDF.show()

print("Rutas distintas:")
rutasDistintasDF.show()  # puedes ajustar el número de filas


                                                                                

Número de aeropuertos de origen distintos: 2
Número de rutas distintas: 115
Aeropuertos de origen:
+------+
|origin|
+------+
|   SEA|
|   PDX|
+------+

Rutas distintas:
+------+----+
|origin|dest|
+------+----+
|   SEA| RNO|
|   SEA| DTW|
|   SEA| CLE|
|   SEA| LAX|
|   PDX| SEA|
|   SEA| BLI|
|   PDX| IAH|
|   PDX| PHX|
|   SEA| SLC|
|   SEA| SBA|
|   SEA| BWI|
|   PDX| IAD|
|   PDX| SFO|
|   SEA| KOA|
|   SEA| JAC|
|   PDX| MCI|
|   SEA| SJC|
|   SEA| ABQ|
|   SEA| SAT|
|   PDX| ONT|
+------+----+
only showing top 20 rows



In [12]:
assert(n_origen == 2)
assert(n_rutas == 115)
assert(aeropuertosOrigenDF.count() == n_origen)
assert(rutasDistintasDF.count() == n_rutas)

### Ejercicio 2

* Partiendo de nuevo de `flightsConvertido`, se pide calcular, *sólo para los vuelos que llegan con* ***retraso positivo***, el retraso medio a la llegada de dichos vuelos, para cada aeropuerto de destino. La nueva columna con el retraso medio a la llegada debe llamarse `retraso_medio`. El DF resultante debe estar **ordenado de mayor a menor retraso medio**. El código que calcule esto debería ir encapsulado en una función de Python llamada `retrasoMedio` que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo descrito anteriormente.

* Una vez hecha la función, invocarla pasándole como argumento `flightsConvertido` y almacenar el resultado devuelto en la variable `retrasoMedioDF`.

In [13]:
from pyspark.sql import functions as F

def retrasoMedio(df):
    return df.where(F.col("arr_delay") > 0) \
             .groupBy("dest") \
             .agg(F.avg("arr_delay").alias("retraso_medio")) \
             .orderBy(F.desc("retraso_medio"))

retrasoMedioDF = retrasoMedio(flightsConvertido)

print("Top 10 aeropuertos con mayor retraso medio:")
retrasoMedioDF.show(10)

top3 = retrasoMedioDF.take(3)
for i, row in enumerate(top3, 1):
    print(f"{i}. {row['dest']} - {round(row['retraso_medio'], 2)} minutos")


Top 10 aeropuertos con mayor retraso medio:
+----+------------------+
|dest|     retraso_medio|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
| CLE| 35.74193548387097|
| SBA|35.391752577319586|
| COS| 35.05607476635514|
| BWI|34.585798816568044|
| EWR| 33.52972258916777|
| DFW| 33.27519181585678|
| MIA| 32.66187050359712|
+----+------------------+
only showing top 10 rows

1. BOI - 64.75 minutos
2. HDN - 46.8 minutos
3. SFO - 41.19 minutos


In [14]:
lista = retrasoMedio(flightsConvertido).take(3)
assert((lista[0].retraso_medio == 64.75) & (lista[0].dest == "BOI"))
assert((lista[1].retraso_medio == 46.8) & (lista[1].dest == "HDN"))
assert((round(lista[2].retraso_medio, 2) == 41.19) & (lista[2].dest == "SFO"))

Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?

In [15]:
# ESCRIBE AQUÍ TU CÓDIGO PARA MOSTRAR EL CONTENIDO DE retrasoMedioDF


### Ejercicio 3

Ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso (problema de clasificación binaria), utilizando como variables predictoras el mes, el día del mes, la hora de partida `dep_time`, la hora de llegada `arr_time`, el tipo de avión (`carrier`), la distancia y el tiempo que permanece en el aire. Para ello, sigue los siguientes pasos.

Notemos que en estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Debemos indicar a Spark cuáles son categóricas e indexarlas. Para ello se pide: 

* Crear un `StringIndexer` llamado `indexerMonth` y otro llamado `indexerCarrier` sobre las variables categóricas `month` y `carrier` (tipo de avión). El nombre de las columnas indexadas que se crearán debe ser, respectivamente, `monthIndexed` y `carrierIndexed`.

In [16]:
from pyspark.ml.feature import StringIndexer

indexerMonth = StringIndexer(inputCol="month", outputCol="monthIndexed")
indexerCarrier = StringIndexer(inputCol="carrier", outputCol="carrierIndexed")

In [17]:
assert(isinstance(indexerMonth, StringIndexer))
assert(isinstance(indexerCarrier, StringIndexer))
assert(indexerMonth.getInputCol() == "month")
assert(indexerMonth.getOutputCol() == "monthIndexed")
assert(indexerCarrier.getInputCol() == "carrier")
assert(indexerCarrier.getOutputCol() == "carrierIndexed")

Recordemos también que Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, tendremos que fusionar en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Se pide:

* Crear en una variable llamada `vectorAssembler` un `VectorAssembler` que reciba como entrada una lista de todas las variables de entrada (y que no debe incluir `arr_delay`) que serán las que formarán parte del modelo. Crear primero esta lista de variables (lista de strings) en la variable `columnas_ensamblar` y pasar dicha variable como argumento al crear el `VectorAssembler`. Como es lógico, en el caso de las columnas `month` y `carrier`, no usaremos las variables originales sino las indexadas en el apartado anterior. La columna de tipo vector creada con las características ensambladas debe llamarse `features`.

In [18]:
from pyspark.ml.feature import VectorAssembler

columnas_ensamblar = [
    "monthIndexed", "day", "dep_time", "arr_time",
    "carrierIndexed", "distance", "air_time"
]
vectorAssembler = VectorAssembler(inputCols=columnas_ensamblar, outputCol="features")

In [19]:
assert(isinstance(vectorAssembler, VectorAssembler))
assert(vectorAssembler.getOutputCol() == "features")
input_cols = vectorAssembler.getInputCols()
assert(len(input_cols) == 7)
assert("arr_delay" not in input_cols)

Finalmente, vemos que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Vamos a convertirla en binaria. Para ello se pide:

* Utilizar un binarizador de Spark, fijando a 15 el umbral, y guardarlo en la variable `delayBinarizer`. Consideramos retrasado un vuelo que ha llegado con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria debe llamarse `arr_delay_binary` y debe ser interpretada como la columna target para nuestro algoritmo. Por ese motivo, esta columna **no** se incluyó en el apartado anterior entre las columnas que se ensamblan para formar las features.

In [20]:
from pyspark.ml.feature import Binarizer

delayBinarizer = Binarizer(threshold=15, inputCol="arr_delay", outputCol="arr_delay_binary")

In [21]:
assert(isinstance(delayBinarizer, Binarizer))
assert(delayBinarizer.getThreshold() == 15)
assert(delayBinarizer.getInputCol() == "arr_delay")
assert(delayBinarizer.getOutputCol() == "arr_delay_binary")

Por último, crearemos el modelo de clasificación.

* Crear en una variable `decisionTree` un árbol de clasificación de Spark (`DecisionTreeClassifier` del paquete `pyspark.ml.classification`)
* Indicar como columna de entrada la nueva columna creada por el `VectorAssembler` creado en un apartado anterior.
* Indicar como columna objetivo (target) la nueva columna creada por el `Binarizer` del apartado anterior.

In [22]:
from pyspark.ml.classification import DecisionTreeClassifier

decisionTree = DecisionTreeClassifier(featuresCol="features", labelCol="arr_delay_binary")

In [23]:
assert(isinstance(decisionTree, DecisionTreeClassifier))
assert(decisionTree.getFeaturesCol() == "features")
assert(decisionTree.getLabelCol() == "arr_delay_binary")

Ahora vamos a encapsular todas las fases en un sólo pipeline y procederemos a entrenarlo. Se pide:

* Crear en una variable llamada `pipeline` un objeto `Pipeline` de Spark con las etapas anteriores en el orden adecuado para poder entrenar un modelo. 

* Entrenarlo invocando sobre ella al método `fit` y guardar el pipeline entrenado devuelto por dicho método en una variable llamada `pipelineModel`. 

* Aplicar el pipeline entrenado para transformar (predecir) el DataFrame `flightsConvertido`, guardando las predicciones devueltas en la variable `flightsPredictions` que será un DataFrame. Nótese que estamos prediciendo los propios datos de entrenamiento y que, por simplicidad, no habíamos hecho (aunque habría sido lo correcto) ninguna división de nuestros datos originales en subconjuntos distintos de entrenamiento y test antes de entrenar.

In [24]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[
    indexerMonth,
    indexerCarrier,
    vectorAssembler,
    delayBinarizer,
    decisionTree
])

pipelineModel = pipeline.fit(flightsConvertido)
flightsPredictions = pipelineModel.transform(flightsConvertido)

                                                                                

In [25]:
from pyspark.ml import PipelineModel
assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 5)
assert(isinstance(pipelineModel, PipelineModel))
assert("probability" in flightsPredictions.columns)
assert("prediction" in flightsPredictions.columns)
assert("rawPrediction" in flightsPredictions.columns)

Vamos a mostrar la matriz de confusión (este apartado no es evaluable). Agrupamos por la variable que tiene la clase verdadera y la que tiene la clase predicha, para ver en cuántos casos coinciden y en cuántos difieren.

In [26]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()



+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             1.0|       1.0|   505|
|             0.0|       1.0|    73|
|             1.0|       0.0| 23744|
|             0.0|       0.0|136426|
+----------------+----------+------+



                                                                                

In [27]:
print("Predicciones generadas por el modelo:")
flightsPredictions.select("features", "arr_delay_binary", "prediction", "probability").show(10, truncate=False)

print("Matriz de confusión (arr_delay_binary vs. prediction):")
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().orderBy("arr_delay_binary", "prediction").show()



Predicciones generadas por el modelo:
+----------------------------------------+----------------+----------+----------------------------------------+
|features                                |arr_delay_binary|prediction|probability                             |
+----------------------------------------+----------------+----------+----------------------------------------+
|[10.0,1.0,1.0,235.0,0.0,1542.0,194.0]   |1.0             |1.0       |[0.28865979381443296,0.711340206185567] |
|[10.0,1.0,4.0,738.0,6.0,2279.0,252.0]   |0.0             |0.0       |[0.9424501424501425,0.05754985754985755]|
|[10.0,1.0,8.0,548.0,4.0,1825.0,201.0]   |0.0             |0.0       |[0.9424501424501425,0.05754985754985755]|
|[10.0,1.0,28.0,800.0,6.0,2282.0,251.0]  |0.0             |0.0       |[0.9424501424501425,0.05754985754985755]|
|[10.0,1.0,34.0,325.0,0.0,1448.0,201.0]  |1.0             |1.0       |[0.28865979381443296,0.711340206185567] |
|[10.0,1.0,37.0,747.0,3.0,1927.0,224.0]  |1.0             |0.0    



+----------------+----------+------+
|arr_delay_binary|prediction| count|
+----------------+----------+------+
|             0.0|       0.0|136426|
|             0.0|       1.0|    73|
|             1.0|       0.0| 23744|
|             1.0|       1.0|   505|
+----------------+----------+------+



                                                                                

# ---

# Actividad 2 (continuación de Actividad 1)

# Actividad 2: Structured Streaming y Kafka

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

### Punto de partida (final de la actividad 1): función `retrasoMedio` 
***Para los vuelos que llegan con retraso positivo, calcular para cada aeropuerto de llegada el retraso medio.***

Recordatorio: *El código que calcule esto debería ir encapsulado en una función de Python que reciba como argumento un DataFrame y devuelva como resultado el DataFrame con el cálculo del retraso medio por aeropuerto, ordenado de mayor a menor retraso medio. La columna creada con el retraso medio debe llamarse `retraso_medio`.*

**Copia en la siguiente celda el código de tu función retrasoMedio que has completado en la actividad 1**. El DataFrame devuelto por la función debería tener solamente dos columnas: `dest` y `retraso_medio`.

In [28]:
from pyspark.sql import functions as F

def retrasoMedio(df):
    return df.where(F.col("arr_delay") > 0) \
             .groupBy("dest") \
             .agg(F.avg("arr_delay").alias("retraso_medio")) \
             .orderBy(F.desc("retraso_medio"))

retrasoMedioDF = retrasoMedio(flightsConvertido)

print("Top 10 aeropuertos con mayor retraso medio:")
retrasoMedioDF.show(10)

top3 = retrasoMedioDF.take(3)
for i, row in enumerate(top3, 1):
    print(f"{i}. {row['dest']} - {round(row['retraso_medio'], 2)} minutos")

Top 10 aeropuertos con mayor retraso medio:
+----+------------------+
|dest|     retraso_medio|
+----+------------------+
| BOI|             64.75|
| HDN|              46.8|
| SFO|41.193768844221104|
| CLE| 35.74193548387097|
| SBA|35.391752577319586|
| COS| 35.05607476635514|
| BWI|34.585798816568044|
| EWR| 33.52972258916777|
| DFW| 33.27519181585678|
| MIA| 32.66187050359712|
+----+------------------+
only showing top 10 rows

1. BOI - 64.75 minutos
2. HDN - 46.8 minutos
3. SFO - 41.19 minutos


### Ejercicio 1

Utilizaremos Kafka para actualizar en tiempo real el resultado calculado en el apartado anterior. 

Para simplificar, asumimos que los mensajes leídos de Kafka tiene solamente dos campos que son los únicos necesarios para llevar a cabo la operación anterior: dest y arr_delay. La idea será crear un Streaming DataFrame para leer de Kafka, y después invocar a nuestra función retrasoMedio pasándolo como argumento. Vamos a leer del topic `retrasos` por lo que debes indicar esta opción a continuación.

Se pide crear, en la variable `retrasosStreamingDF`, un Streaming DataFrame leyendo de Apache Kafka, configurando las siguientes opciones:
  * Usar la variable `readStream` (en lugar de `read` como solemos hacer) interna de la SparkSession `spark`
  * Indicar que el formato es `"kafka"` con `.format("kafka")`
  * Indicar cuáles son los brokers de Kafka de los que vamos a leer y el puerto al que queremos conectarnos para leer (9092 es el que usa Kafka por defecto), con `.option("kafka.bootstrap.servers", "<nombre_cluster>-w-0:9092,<nombre_cluster>-w-1:9092")`. De esa manera podremos leer el mensaje si el productor de Kafka lo envía a cualquiera de los dos brokers existentes, que son los nodos del cluster identificados como `<nombre_cluster>-w-0` y `<nombre_cluster>-w-1`
  * Indicar que queremos subscribirnos al topic `"retrasos"` con `.option("subscribe", "retrasos")`.
  * Finalmente ponemos `load()` para realizar la lectura.

In [29]:
from pyspark.sql.functions import from_json, col, avg
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from threading import Thread

# Esquema del JSON recibido desde Kafka
schema = StructType([
    StructField("dest", StringType(), True),
    StructField("arr_delay", DoubleType(), True)
])

# IP del nodo master o bootstrap server de Kafka
bootstrap_servers = "10.132.15.195:9092"

# Crear el Streaming DataFrame desde Kafka
retrasosStreamingDF = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", bootstrap_servers) \
    .option("subscribe", "retrasos") \
    .load()

# Parsear el JSON
datosKafkaDF = retrasosStreamingDF.selectExpr("CAST(value AS STRING) as json_string") \
    .select(from_json(col("json_string"), schema).alias("data")) \
    .select("data.dest", "data.arr_delay")

# Agregación de retraso medio
def retrasoMedio(df):
    return df.groupBy("dest").agg(avg("arr_delay").alias("retraso_medio"))

retrasoMedioStreamingDF = retrasoMedio(datosKafkaDF)

# Escribir la salida en memoria (en una vista temporal)
query = retrasoMedioStreamingDF.writeStream \
    .queryName("retrasosAgg") \
    .outputMode("complete") \
    .format("memory") \
    .start()

# Iniciar el stream en un hilo separado para que no bloquee el notebook
def monitor_query(q):
    q.awaitTermination()

Thread(target=monitor_query, args=(query,)).start()


25/08/03 21:52:04 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-912c79f5-0794-49ce-8660-237948b3a850. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/08/03 21:52:04 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

In [30]:
# Mostramos el esquema de este DataFrame
types = retrasosStreamingDF.dtypes
assert(retrasosStreamingDF.isStreaming)
assert((types[0][0] == "key")       & (types[0][1] == "binary"))
assert((types[1][0] == "value")     & (types[1][1] == "binary"))
assert((types[2][0] == "topic")     & (types[2][1] == "string"))
assert((types[3][0] == "partition") & (types[3][1] == "int"))
assert((types[4][0] == "offset")    & (types[4][1] == "bigint"))
assert((types[5][0] == "timestamp") & (types[5][1] == "timestamp"))
assert((types[6][0] == "timestampType") & (types[6][1] == "int"))

In [31]:
# Mostrar el esquema de los datos de Kafka
retrasosStreamingDF.printSchema()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



Muestra por pantalla el esquema del DataFrame resultante de la lectura con `printSchema()`. Verás que todas estas columnas son creadas automáticamente por Spark cuando leemos de Kafka. De ellas, la que nos interesa es `value` que contiene propiamente el mensaje de Kafka, en formato datos binarios. 

### Ejercicio 2

Tendremos que estructurar estos datos para poder extraer los campos. Para ello sigue los siguientes pasos, ayudándote de la plantilla que hay en la celda siguiente (descoméntala y complétala):

* **Selecciona** la columna `value` y conviértela (`.cast`) a `StringType()` utilizando `withColumn` para reemplazar la columna existente `"value"` por el objeto Column resultante de la conversión. De esta forma tendremos una columna que contendrá en cada **fila** un **fichero JSON completo**, tal como se muestra en cada una de las plantillas anteriores. 
* Para extraer los dos campos de cada uno de los JSON y convertirlos en una columna llamada `parejas`, de tipo `struct` (una estructura formada por dos campos de tipo String e Integer respectivamente), utilizamos la función `from_json` de Spark, que se aplica a cada elemento (cada fila) de la columna "value" y parsea el String según un esquema que le indiquemos, devolviendo una columna de tipo `struct`.
* La columna `parejas` es de tipo `struct` por lo que puedes acceder a cada uno de sus dos campos (`dest` y `arr_delay`) con el operador `.` (punto). Utilizando `withColumn` dos veces, crea dos columnas llamadas `dest` y `arr_delay` como el resultado de acceder a `parejas.dest` y `parejas.arr_delay` respectivamente.

In [32]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F

# Definimos el esquema del JSON recibido en la columna 'value'
esquema = StructType([
    StructField("dest", StringType()),
    StructField("arr_delay", DoubleType())
])

# Parseamos el contenido de 'value' y extraemos los campos
parsedDF = retrasosStreamingDF \
    .withColumn("value", F.col("value").cast(StringType())) \
    .withColumn("parejas", F.from_json(F.col("value"), esquema)) \
    .withColumn("dest", F.col("parejas.dest")) \
    .withColumn("arr_delay", F.col("parejas.arr_delay"))


In [33]:
tipos = parsedDF.dtypes
assert(("value", "string") in tipos)
assert(('parejas', 'struct<dest:string,arr_delay:double>') in tipos)
assert(('dest', 'string') in tipos)
assert(('arr_delay', 'double') in tipos)


Nuestro DataFrame ya contiene una columna `dest` con el nombre del aeropuerto destino y una columna de números reales `arr_delay` con el retraso. Ya podemos efectuar el mismo tipo de agregación que estamos haciendo en nuestra función `retrasoMedio`. Por tanto, invocamos a `retrasoMedio` pasando `parsedDF` como argumento.

In [34]:
# Evalúa el siguiente código pero no lo modifiques
# Indicamos que este DataFrame se guarde en memoria cuando se va actualizando,
# y arrancamos la ejecución en Streaming con la acción start()

retrasoMedioStreamingDF = retrasoMedio(parsedDF)

consoleOutput = retrasoMedioStreamingDF\
                    .writeStream\
                    .queryName("retrasosAgg")\
                    .outputMode("complete")\
                    .format("memory")\
                    .start()


25/08/03 21:57:51 WARN org.apache.spark.sql.streaming.StreamingQueryManager: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-92ed3b02-298f-4a3c-95b8-abfcd0f6c6a6. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/08/03 21:57:51 WARN org.apache.spark.sql.streaming.StreamingQueryManager: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


IllegalArgumentException: Cannot start query with name retrasosAgg as a query with that name is already active in this SparkSession

Una vez evaluada la celda anterior, abre el productor de Kafka console entrando por SSH a cualquiera de las máquinas (revisa el enunciado de la práctica para recordarlo), y copia y pega (literalmente) los siguientes 4 mensajes en formato JSON. Como ves,  tienen un campo `dest` y un campo `arr_delay`, simulando la información que estaríamos recibiendo en tiempo real de los distintos aeropuertos a medida que los vuelos van aterrizando. 

Cada vez que pegues un mensaje, ejecuta la consulta `select * from retrasosAgg` a través del método `spark.sql(...)` y muestra el DataFrame `agregadosDF` devuelto por dicho método. Eso hará una consulta contra la vista temporal (volátil) `retrasosAgg` que se ha creado en el metastore de Hive gracias al `writeStream` del apartado anterior. Ejecuta la celda de `show` tantas veces como sea necesario hasta ver un resultado distinto al que has visto en la ejecución anterior, para asegurarte de que Spark ya ha leído e incorporado el nuevo dato en su cálculo de la agregación y por tanto ha actualizado el resultado.

Recuerda que el método `.sql(...)` es una transformación, y por tanto, se re-ejecuta la consulta cada vez que invocas a la acción `show()` sobre el resultado, ya que **no vamos a cachear nada**, precisamente para forzar la reevaluación de la consulta y poder ver así el contenido actualizado de dicha tabla (en memoria) de Hive cada vez que hacemos `show()`.

Se pide: 
* Cada vez que envíes un mensaje y te hayas asegurado de que Spark ha incorporado el dato a su cálculo, apunta el resultado de la agregación (valor de la columna `retraso_medio`) para MAD y GRX en las variables habilitadas para ello
* No te preocupes por evaluar muchas veces una misma celda, ya que el cálculo sólo se actualizará una vez. Las siguientes veces que la evalúes seguirá mostrando el mismo resultado mientras no envíes otro nuevo mensaje en Kafka.

Los 4 mensajes que hay que introducir sucesivamente en Kafka son:

`
{"dest": "GRX", "arr_delay": 2.6}
{"dest": "MAD", "arr_delay": 5.4}
{"dest": "GRX", "arr_delay": 1.5}
{"dest": "MAD", "arr_delay": 20.0}
`

In [44]:
# Consulta la vista en memoria creada por writeStream
agregadosDF = spark.sql("SELECT * FROM retrasosAgg ORDER BY retraso_medio DESC")
agregadosDF.show()




+----+-------------+
|dest|retraso_medio|
+----+-------------+
| MAD|         12.7|
| GRX|         2.05|
|null|         null|
+----+-------------+



                                                                                

In [45]:
resultados = agregadosDF.collect()
retrasoMAD = None
retrasoGRX = None

for row in resultados:
    if row['dest'] == 'MAD':
        retrasoMAD = row['retraso_medio']
    elif row['dest'] == 'GRX':
        retrasoGRX = row['retraso_medio']

if retrasoMAD is not None:
    print(f"Retraso medio MAD: {round(retrasoMAD, 2)}")
else:
    print("No hay datos para MAD.")

if retrasoGRX is not None:
    print(f"Retraso medio GRX: {round(retrasoGRX, 2)}")
else:
    print("No hay datos para GRX.")


Retraso medio MAD: 12.7
Retraso medio GRX: 2.05


In [None]:
columnas = agregadosDF.columns
assert(len(columnas) == 2)
assert("dest" in columnas)
assert("retraso_medio" in columnas)

In [None]:
# Tras enviar el primer mensaje
agregadosDF = spark.sql("SELECT * FROM retrasosAgg ORDER BY retraso_medio DESC")
agregadosDF.show()
retraso_medio_GRX_primer_mensaje = 2.6  # GRX: 2.6


In [None]:
# Tras enviar el segundo mensaje
agregadosDF = spark.sql("SELECT * FROM retrasosAgg ORDER BY retraso_medio DESC")
agregadosDF.show()
retraso_medio_GRX_segundo_mensaje = 2.6
retraso_medio_MAD_segundo_mensaje = 5.4


In [None]:
# Tras enviar el tercer mensaje
agregadosDF = spark.sql("SELECT * FROM retrasosAgg ORDER BY retraso_medio DESC")
agregadosDF.show()
retraso_medio_GRX_tercer_mensaje = 2.05  # promedio de 2.6 y 1.5
retraso_medio_MAD_tercer_mensaje = 5.4


In [None]:
# Tras enviar el cuarto mensaje
agregadosDF = spark.sql("SELECT * FROM retrasosAgg ORDER BY retraso_medio DESC")
agregadosDF.show()
retraso_medio_GRX_cuarto_mensaje = 2.05
retraso_medio_MAD_cuarto_mensaje = 12.7  # promedio de 5.4 y 20.0


In [None]:
# Ver resumen de los valores capturados
print("Resumen de valores de retraso medio:")
print(f"GRX después del 1er mensaje: {retraso_medio_GRX_primer_mensaje}")
print(f"GRX después del 2do mensaje: {retraso_medio_GRX_segundo_mensaje}")
print(f"GRX después del 3er mensaje: {retraso_medio_GRX_tercer_mensaje}")
print(f"GRX después del 4to mensaje: {retraso_medio_GRX_cuarto_mensaje}")
print(f"MAD después del 2do mensaje: {retraso_medio_MAD_segundo_mensaje}")
print(f"MAD después del 3er mensaje: {retraso_medio_MAD_tercer_mensaje}")
print(f"MAD después del 4to mensaje: {retraso_medio_MAD_cuarto_mensaje}")
