# Actividad 1: Pipeline de procesamiento de datos con HDFS y Spark

## Actividad del Proyecto (actividad grupal)

Esta actividad está asociada al Proyecto transversal del título y para su desarrollo, tendrás que utilizar obligatoriamente el siguiente recurso:
* Dataset flights del Catálogo de Datos del proyecto

## Escribe aquí los nombres de los integrantes del grupo:

## Recuerda borrar siempre las líneas que dicen `raise NotImplementedError`

Lee con detenimiento cada ejercicio. Las variables utilizadas para almacenar las soluciones, al igual que las nuevas columnas creadas, deben llamarse **exactamente** como indica el ejercicio, o de lo contrario los tests fallarán y el ejercicio no puntuará. Debe reemplazarse el valor `None` al que están inicializadas por el código necesario para resolver el ejercicio.

## Leemos el fichero flights.csv que hemos subido a Google Cloud Storage

Aunque para las capturas de pantalla se pide subir el fichero a HDFS, el resto de la actividad puede hacerse leyendo el mismo fichero que hemos subido al bucket de Google Cloud Storage.

Indicamos que contiene encabezados (nombres de columnas) y que intente inferir el esquema, aunque después comprobaremos si lo
ha inferido correctamente o no.

In [1]:
# Ruta del fichero flights.csv (puedes usar HDFS o GCS)
ruta_gcs = "hdfs:///alejandro_gerena/flights.csv"

# Leer el fichero CSV con Spark
flightsDF = (
    spark.read 
    .option("header", "true") 
    .option("inferSchema", "true")
    .csv(ruta_gcs)
)

# Mostrar esquema y primeras filas para verificar
flightsDF.printSchema()
flightsDF.show(5)

AnalysisException: Path does not exist: hdfs://dataproc-unir-hagr27-m/alejandro_gerena/flights.csv

Imprimimos el esquema para comprobar qué tipo de dato ha inferido en cada columna

In [None]:
flightsDF.printSchema()

Mostramos el número de filas que tiene el DataFrame para hacernos una idea de su tamaño:

In [None]:
flightsDF.count()

Vemos que tenemos 162049 filas. Si imprimimos por pantalla las 5 primeras filas, veremos qué tipos parecen tener y en qué columnas no coincide el tipo que podríamos esperar con el tipo que ha inferido Spark.

In [None]:
flightsDF.show(5)

La causa del problema es que en muchas columnas existe un valor faltante llamado "NA". Spark no reconoce ese valor como
*no disponible* ni nada similar, sino que lo considera como un string de texto normal, y por tanto, asigna a toda la columna
el tipo de dato string (cadena de caracteres). Concretamente, las siguientes columnas deberían ser de tipo entero pero Spark
las muestra como string:
<ul>
 <li>dep_time: string (nullable = true)
 <li>dep_delay: string (nullable = true)
 <li>arr_time: string (nullable = true)
 <li>arr_delay: string (nullable = true)
 <li>air_time: string (nullable = true)
 <li>hour: string (nullable = true)
 <li>minute: string (nullable = true)    
</ul>


Vamos a averiguar cuántas filas tienen el valor "NA" (como string) en la columna dep_time:

In [None]:
from pyspark.sql import functions as F
cuantos_NA = (
    flightsDF
    .where(F.col("dep_time") == "NA")
    .count()
)
cuantos_NA

Por tanto, hay 857 filas que no tienen un dato válido en esa columna. Hay distintas maneras de trabajar con los valores faltantes, como por ejemplo imputarlos (reemplazarlos por un valor generado por nosotros según cierta lógica, por ejemplo la media de esa columna, etc). Lo más sencillo es quitar toda la fila, aunque esto depende de si nos lo podemos permitir en base
a la cantidad de datos que tenemos. En nuestro caso, como tenemos un número considerable de filas, vamos a quitar todas las filas donde hay un NA en cualquiera de las columnas.

In [None]:
columnas_limpiar = ["dep_time", "dep_delay", "arr_time", "arr_delay", "air_time", "hour", "minute"]

flightsLimpiado = flightsDF
for nombreColumna in columnas_limpiar:  # para cada columna, nos quedamos con las filas que no tienen NA en esa columna
    flightsLimpiado = flightsLimpiado.where(F.col(nombreColumna) != "NA")

flightsLimpiado.cache()

Si ahora mostramos el número de filas que tiene el DataFrame `flightsLimpiado` tras eliminar todas esas filas, vemos que ha disminuido ligeramente
pero sigue siendo un número considerable como para realizar analítica y sacar conclusiones sobre estos datos

In [None]:
flightsLimpiado.count()

Una vez que hemos eliminado los NA, vamos a convertir a tipo entero cada una de esas columnas que eran de tipo string. 
Ahora no debe haber problema ya que todas las cadenas de texto contienen dentro un número que puede ser convertido de texto a número. Vamos también a convertir la columna `arr_delay` de tipo entero a número real, necesario para los pasos posteriores donde ajustaremos un modelo predictivo.

In [None]:
from pyspark.sql.types import IntegerType, DoubleType

flightsConvertido = flightsLimpiado

for c in columnas_limpiar:
    # método que crea una columna o reemplaza una existente
    flightsConvertido = flightsConvertido.withColumn(c, F.col(c).cast(IntegerType())) 

flightsConvertido = flightsConvertido.withColumn("arr_delay", F.col("arr_delay").cast(DoubleType()))
flightsConvertido.cache()

In [None]:
flightsConvertido.printSchema()

Vamos a volver a mostrar las 5 primeras filas del DataFrame limpio. Aparentemente son iguales a las que ya teníamos, pero ahora
Spark sí está tratando como enteros las columnas que deberían serlo, y si queremos podemos hacer operaciones aritméticas
con ellas.

In [None]:
flightsConvertido.show(5)

In [None]:
flightsConvertido.groupBy("carrier").count().show()

### Ejercicio 1 (3 puntos) 
### (sólo puntúa 3 puntos si pasa la celda de autoevaluación sin errores, o 0 puntos si hay algún error. No hay puntaje intermedio)

Partiendo del DataFrame `flightsConvertido` que ya tiene los tipos correctos en las columnas, se pide **encadenar transformaciones sin crear ninguna variable intermedia**, de manera que se vayan creando las siguientes columnas:

* Una nueva columna llamada `flight_date` con la fecha del vuelo como objeto fecha. Utiliza para ello la función `F.make_date` aplicada a los argumentos (columnas) `year`, `month` y `day` tal como indica la documentación de `make_date` disponible [aquí](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.make_date.html).
* Encadenado con la transformación anterior, crear una nueva columna llamada `day_of_week` con el día de la semana correspondiente al vuelo, utilizando la función `F.dayofweek` cuya documentación puedes ver [aquí](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.dayofweek.html). Debes pasarle como argumento la columna `flight_date` creada en el apartado anterior.
* Encadenado con la transformación anterior, agrupar para obtener **tantas filas como aeropuertos de destino existan, y tantas columnas como días de la semana** hay en el DF original. 
  * Cada celda del DF resultante debe contener el *conteo* del número de vuelos que existen para ese destino en ese día de la semana.
  * El DF resultante de la agrupación seguida de pivot tendrá 8 columnas, que serán el destino, y los 7 días de la semana, llamados del 1 al 7.
* Encadenado con la transformación anterior, rellenar los valores nulos con un 0, puesto que todos los valores representan conteos, y los nulos han sido provocados porque ciertas combinaciones de (origen, destino, día de la semana) no existen en los datos (no hay vuelos para ciertas rutas en ciertos días de la semana). Por eso, tiene sentido sustituir los nulos por 0, que es el conteo para esa combinación no encontrada.
* Encadenado con la transformación anterior, renombrar las columnas de los días de la semana para que pasen a llamarse Domingo, Lunes, ..., Sabado (sin tildes). Puedes hacerlo encadenando `withColumnRenamed` varias veces, o bien con `selectExpr` utilizando `as` en cada argumento de tipo string. Consulta la documentación de `selectExpr` [aquí](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.selectExpr.html).
  * Si utilizas selectExpr, entonces debes referirte a las columnas como ``"`1`"`` (con acento grave) y no como `"1"` ya que, al tener por nombre un número entero, hay que referirse a ellas de ese modo. Si utilizas withColumnRenamed, no es necesario y puedes referirte a ellas como `"1"`.
  * Si lo deseas, puedes recategorizar la columna day_of_week, justo después de crearla, utilizando para ello otra llamada a `withColumn` para reemplazar dicha columna, usando `F.when` dentro de ella. De este modo, al llegar al groupBy(...).pivot(...) ya tendrá los nombres de columna correctos y no será necesario renombrar columnas.
* Encadenado con la transformación anterior, añadir una nueva columna llamada `n_vuelos` que contenga, en cada fila, la suma de las columnas de los 7 días de la semana. Esto representa el número de vuelos totales para ese destino, independientemente del día del vuelo.
* Encadenado con la transformación anterior, generar un nuevo DF que esté ordenado por la columna `n_vuelos` *descendentemente*.
* El DF resultante de todas estas transformaciones, que deben encadenarse sin utilizar ninguna otra variable, debe quedar guardado en la variable `conteos_df`.
* No está permitido utilizar construcciones de la forma df = df.algo(). **Sólo puede utilizarse una vez el operador de asignación (el signo `=`).**

In [None]:
from pyspark.sql import functions as F

conteos_df = (
    flightsConvertido
    .withColumn("flight_date", F.make_date(F.col("year"), F.col("month"), F.col("day")))
    .withColumn("day_of_week", F.dayofweek(F.col("flight_date")))
    .groupBy("dest")
    .pivot("day_of_week")
    .count()
    .fillna(0)
    .withColumnRenamed("1", "Domingo")
    .withColumnRenamed("2", "Lunes")
    .withColumnRenamed("3", "Martes")
    .withColumnRenamed("4", "Miercoles")
    .withColumnRenamed("5", "Jueves")
    .withColumnRenamed("6", "Viernes")
    .withColumnRenamed("7", "Sabado")
    .withColumn("n_vuelos", 
                F.col("Domingo") + F.col("Lunes") + F.col("Martes") + 
                F.col("Miercoles") + F.col("Jueves") + F.col("Viernes") + F.col("Sabado"))
    .orderBy(F.col("n_vuelos").desc())
)

# Verificar resultado
conteos_df.show(10)

In [None]:
from pyspark.sql import functions as F
c = conteos_df.columns
assert(len(c) == 9)  # debe tener 9 columnas que son dest, los días de la semana, y n_vuelos (no necesariamente en ese orden)
assert(all([x in c for x in ["dest", "n_vuelos", "Domingo", "Lunes", "Martes", "Miercoles", "Jueves", "Viernes", "Sabado"]]))  # Comprobar nombres de columnas
cnt_list = conteos_df.take(100)
assert(cnt_list[0].dest == "SFO" and cnt_list[0].Viernes == 1910)
assert(cnt_list[1].dest == "LAX" and cnt_list[1].Lunes == 1550)
assert(cnt_list[2].dest == "DEN" and cnt_list[2].n_vuelos == 9433)
assert(cnt_list[-1].Domingo == 0 and cnt_list[-1].n_vuelos == 2)  # la ruta con menos vuelos de todos. Nos aseguramos de que se hayan rellenado los nulos con 0.

### Ejercicio 2 (3 puntos)
### (sólo puntúa 3 puntos si pasa la celda de autoevaluación sin errores, o 0 puntos si hay algún error. No hay puntaje intermedio)

Partiendo de nuevo de `flightsConvertido`, se pide crear una función `retraso_medio_periodo`, que reciba como argumento un DF en el podemos asumir que existen las columnas `dep_time` y `arr_delay` y haga lo siguiente:

* Añada una nueva columna `periodo_dia` para recategorizar `dep_time` con los siguientes valores sin tildes (puedes hacerlo con withColumn y F.when):
  * `"maniana"` cuando la hora de salida del vuelo esté entre las 7 de la mañana no incluida (expresado como 700 en dep_time) y las 12 de la mañana incluida (expresado como 1200)
  * `"mediodia"` cuando la hora de salida del vuelo esté entre las 12 de la mañana no incluida (expresado como 1200) y las 5 de la tarde incluida (expresado como 1700)
  * `"tarde"` cuando la hora de salida del vuelo esté entre las 5 de la tarde no incluida (expresado como 1700) y las 9 de la noche incluida (expresado como 2100)
  * `"noche"` cuando la hora de salida del vuelo esté entre las 9 de la noche no incluida (expresado como 2100) y las 7 de la mañana incluida (expresado como 700)
  * PISTAS:
    * Recuerda que las condiciones compuestas con columnas booleanas en Spark requieren obligatoriamente utilizar paréntesis envolviendo a cada condición simple.
    * Recuerda también utilizar F.lit(...) para indicar el valor (constante) de la nueva columna para cada una de estas condiciones.
    * Cuidado con la noche: la condición es que la hora sea mayor que 2100 **o bien** (condición `|`) menor o igual que 700. En el resto de condiciones, necesitamos `&`.

* Encadenado con la transformación anterior, sólo para los vuelos que llegan con **retraso estrictamente positivo (>0)**, calcular el retraso medio a la llegada de dichos vuelos **para cada aeropuerto de destino y cada hora del día**, desplegando las franjas horarias como columnas independientes. 
  * El DF calculado debe tener tantas columnas como franjas horarias más la columna del aeropuerto de destino (es decir, 5 columnas en total)

* Encadenado con la transformación anterior, tras haber hecho la agrupación y agregación, añadir una columna constante (con `F.lit`) llamada `alumno` que contenga el primer apellido de cada miembro del grupo (ejemplo: si el grupo lo forman Pablo García, Francisco Pérez y Antonio Martín, el valor de la columna debe ser `García_Pérez_Martín`).
* Encadenado con la transformación anterior, ordenar alfabéticamente el DF resultante por aeropuerto de destino, ascendentemente. El resultado de la ordenación será un nuevo DF que debe ser devuelto como resultado de la función (recuerda que es imposible modificar el DF original).

In [None]:
from pyspark.sql import DataFrame
from pyspark.sql import functions as F

def retraso_medio_periodo(df: DataFrame) -> DataFrame:
    """
    Calcula el retraso medio a la llegada por aeropuerto de destino y periodo del día,
    solo para vuelos con retraso positivo.
    """
    return (
        df
        .withColumn("periodo_dia", 
                    F.when((F.col("dep_time") > 700) & (F.col("dep_time") <= 1200), F.lit("maniana"))
                     .when((F.col("dep_time") > 1200) & (F.col("dep_time") <= 1700), F.lit("mediodia"))
                     .when((F.col("dep_time") > 1700) & (F.col("dep_time") <= 2100), F.lit("tarde"))
                     .when((F.col("dep_time") > 2100) | (F.col("dep_time") <= 700), F.lit("noche"))
                     .otherwise(F.lit(None))
        )
        .filter(F.col("arr_delay") > 0)
        .groupBy("dest")
        .pivot("periodo_dia")
        .agg(F.avg("arr_delay"))
        .withColumn("alumno", F.lit("Granados"))
        .orderBy(F.col("dest").asc())
    )

In [None]:
retraso_medio_df = retraso_medio_periodo(flightsConvertido)
lista = retraso_medio_df.take(3)
c = retraso_medio_df.columns
assert(len(c) == 6)  #  el DF resultante debe tener 6 columnas
assert(all([x in c for x in ["dest", "maniana", "mediodia", "tarde", "noche", "alumno"]]))  # Comprobamos los nombres de columnas
assert(lista[0].dest == "ABQ" and round(lista[0].maniana, 2) == 12.91 and round(lista[0].noche, 2) == 8.33)
assert(lista[1].dest == "ANC" and round(lista[1].mediodia, 2) == 16.88 and round(lista[1].noche, 2) == 20.70)
assert(lista[2].dest == "ATL" and round(lista[2].tarde, 2) == 375.0 and round(lista[2].noche, 2) == 18.30)

Ahora invocamos a nuestra función `retrasoMedio` pasándole como argumento `flightsConvertido`. ¿Cuáles son los tres aeropuertos con mayor retraso medio? ¿Cuáles son sus retrasos medios en minutos?

### Ejercicio 3

Ajustar un modelo de DecisionTree de Spark para predecir si un vuelo vendrá o no con retraso (problema de clasificación binaria), utilizando como variables predictoras el mes, el día del mes, la hora de partida `dep_time`, la hora de llegada `arr_time`, el tipo de avión (`carrier`), la distancia y el tiempo que permanece en el aire. Para ello, sigue los siguientes pasos.

Notemos que en estos datos hay variables numéricas y variables categóricas que ahora mismo están tipadas como numéricas, como por ejemplo el mes del año (`month`), que es en realidad categórica. Debemos indicar a Spark cuáles son categóricas e indexarlas. Para ello se pide: 

* **(1 punto)** Crear un `StringIndexer` llamado `indexerMonth` y otro llamado `indexerCarrier` sobre las variables categóricas `month` y `carrier` (tipo de avión). El nombre de las columnas indexadas que se crearán debe ser, respectivamente, `monthIndexed` y `carrierIndexed`. Asegúrate de que, cuando necesite codificar una categoría que no existía cuando se entrenó, esta pieza no lance un error sino que le asigne el primer valor que esté libre (argumento `handleInvalid`).

In [None]:
# Imports necesarios
from pyspark.ml.feature import StringIndexer

# Crear StringIndexer para month
indexerMonth = StringIndexer(
    inputCol="month",
    outputCol="monthIndexed",
    handleInvalid="keep"
)

# Crear StringIndexer para carrier
indexerCarrier = StringIndexer(
    inputCol="carrier",
    outputCol="carrierIndexed",
    handleInvalid="keep"
)

In [None]:
assert(isinstance(indexerMonth, StringIndexer))
assert(isinstance(indexerCarrier, StringIndexer))
assert(indexerMonth.getInputCol() == "month" and indexerMonth.getOutputCol() == "monthIndexed" and indexerMonth.getHandleInvalid() == "keep")
assert(indexerCarrier.getInputCol() == "carrier" and indexerCarrier.getOutputCol() == "carrierIndexed" and indexerCarrier.getHandleInvalid() == "keep")

Recordemos también que Spark requiere que todas las variables estén en una única columna de tipo vector, por lo que después de indexar estas dos variables, tendremos que fusionar en una columna de tipo vector todas ellas, utilizando un `VectorAssembler`. Se pide:

* **(1 punto)** Crear en una variable llamada `vectorAssembler` un `VectorAssembler` que reciba como entrada una lista de todas las variables de entrada (y que no debe incluir `arr_delay`) que serán las que formarán parte del modelo. Crear primero esta lista de variables (lista de strings) en la variable `columnas_ensamblar` y pasar dicha variable como argumento al crear el `VectorAssembler`. Como es lógico, en el caso de las columnas `month` y `carrier`, no usaremos las variables originales sino las indexadas en el apartado anterior. La columna de tipo vector creada con las características ensambladas debe llamarse `features`.

In [None]:
# Imports necesarios
from pyspark.ml.feature import VectorAssembler

# Lista de columnas a ensamblar (variables predictoras)
columnas_ensamblar = [
    "monthIndexed",      # month indexado (categórico)
    "day",               # día del mes (numérico)
    "dep_time",          # hora de partida (numérico)
    "arr_time",          # hora de llegada (numérico)
    "carrierIndexed",    # carrier indexado (categórico)
    "distance",          # distancia (numérico)
    "air_time"           # tiempo en el aire (numérico)
]

# Crear VectorAssembler
vectorAssembler = VectorAssembler(
    inputCols=columnas_ensamblar,
    outputCol="features"
)

In [None]:
assert(isinstance(vectorAssembler, VectorAssembler))
assert(vectorAssembler.getOutputCol() == "features")
input_cols = vectorAssembler.getInputCols()
assert(len(input_cols) == 7)
assert("arr_delay" not in input_cols)

Finalmente, vemos que la columna `arr_delay` es continua, y no binaria como requiere un problema de clasificación con dos clases. Vamos a convertirla en binaria. Para ello se pide:

* **(0.5 puntos)** Utilizar un binarizador de Spark, fijando a 15 el umbral, y guardarlo en la variable `delayBinarizer`. Consideramos retrasado un vuelo que ha llegado con más de 15 minutos de retraso, y no retrasado en caso contrario. La nueva columna creada con la variable binaria debe llamarse `arr_delay_binary` y debe ser interpretada como la columna target para nuestro algoritmo. Por ese motivo, esta columna **no** se incluyó en el apartado anterior entre las columnas que se ensamblan para formar las features.

In [None]:
# Imports necesarios
from pyspark.ml.feature import Binarizer

# Crear Binarizer para arr_delay
delayBinarizer = Binarizer(
    threshold=15.0,
    inputCol="arr_delay",
    outputCol="arr_delay_binary"
)

In [None]:
assert(isinstance(delayBinarizer, Binarizer))
assert(delayBinarizer.getThreshold() == 15)
assert(delayBinarizer.getInputCol() == "arr_delay")
assert(delayBinarizer.getOutputCol() == "arr_delay_binary")

**(0.5 puntos)** Por último, crearemos el modelo de clasificación.

* Crear en una variable `decisionTree` un árbol de clasificación de Spark (`DecisionTreeClassifier` del paquete `pyspark.ml.classification`)
* Indicar como columna de entrada la nueva columna creada por el `VectorAssembler` creado en un apartado anterior.
* Indicar como columna objetivo (target) la nueva columna creada por el `Binarizer` del apartado anterior.

In [None]:
# Imports necesarios
from pyspark.ml.classification import DecisionTreeClassifier

# Crear DecisionTreeClassifier
decisionTree = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="arr_delay_binary"
)

In [None]:
assert(isinstance(decisionTree, DecisionTreeClassifier))
assert(decisionTree.getFeaturesCol() == "features")
assert(decisionTree.getLabelCol() == "arr_delay_binary")

**(1 punto)** Ahora vamos a encapsular todas las fases en un sólo pipeline y procederemos a entrenarlo. Se pide:

* Crear en una variable llamada `pipeline` un objeto `Pipeline` de Spark con las etapas anteriores en el orden adecuado para poder entrenar un modelo. 

* Entrenarlo invocando sobre ella al método `fit` y guardar el pipeline entrenado devuelto por dicho método en una variable llamada `pipelineModel`. 

* Aplicar el pipeline entrenado para transformar (predecir) el DataFrame `flightsConvertido`, guardando las predicciones devueltas en la variable `flightsPredictions` que será un DataFrame. Nótese que estamos prediciendo los propios datos de entrenamiento y que, por simplicidad, no habíamos hecho (aunque habría sido lo correcto) ninguna división de nuestros datos originales en subconjuntos distintos de entrenamiento y test antes de entrenar.

In [None]:
# Imports necesarios
from pyspark.ml import Pipeline

# Crear Pipeline con todas las etapas en orden
pipeline = Pipeline(stages=[
    indexerMonth,       # 1. Indexar month → monthIndexed
    indexerCarrier,     # 2. Indexar carrier → carrierIndexed
    delayBinarizer,     # 3. Binarizar arr_delay → arr_delay_binary
    vectorAssembler,    # 4. Ensamblar features → features
    decisionTree        # 5. Entrenar DecisionTreeClassifier
])

# Entrenar el pipeline
pipelineModel = pipeline.fit(flightsConvertido)

# Aplicar el pipeline entrenado para hacer predicciones
flightsPredictions = pipelineModel.transform(flightsConvertido)

In [None]:
from pyspark.ml import PipelineModel
assert(isinstance(pipeline, Pipeline))
assert(len(pipeline.getStages()) == 5)
assert(isinstance(pipelineModel, PipelineModel))
assert("probability" in flightsPredictions.columns)
assert("prediction" in flightsPredictions.columns)
assert("rawPrediction" in flightsPredictions.columns)

Vamos a mostrar la matriz de confusión (este apartado no es evaluable). Agrupamos por la variable que tiene la clase verdadera y la que tiene la clase predicha, para ver en cuántos casos coinciden y en cuántos difieren.

In [None]:
flightsPredictions.groupBy("arr_delay_binary", "prediction").count().show()