# Tarea BDA04

# David Carlón Cembranos

# Introducción
Realiza las tareas que se plantean en cada ejercicio. En algunas tareas deberás completar las celdas que están incompletas en otras añadir nuevas celdas. Se trata de que implementes una serie de consultas con Spark Dataframes y con Spark SQL (el último ejercicio lo harás con Spark RDD).

Vamos a seguir utilizando el `dataset` de retrasos en vuelos en EEUU de la práctica BDA03. A modo de recordatorio, en el siguiente apartado, repetimos la explicación del significado de los campos.

# Dataset de retrasos en vuelos

Vamos a usar [este](https://www.kaggle.com/datasets/tylerx/flights-and-airports-data) de Kaggle
para aprender a usar tanto Hive como Pig. Kaggle es un sitio muy popular en ciencia de datos. En este sitio los científicos de datos pueden publicar y compartir sus trabajos. Además también se pueden proponer concursos en los que los participantes compiten en la construcción del mejor modelo para el problema propuesto.

El `dataset` contiene información sobre retrasos en vuelos en EEUU. Hay dos ficheros de interés: `airports.csv` y `flights.csv`.

El primero tiene información sobre los aeropuertos y consta de los siguientes campos:
   * airport_id: identificador del aeropuerto. Numérico, aunque se utilizará un campo `string` en Hive.
   * city: ciudad del aeropuerto.
   * state: estado del aeropuerto.
   * name: nombre del aeropuerto.
   
El fichero `flights` tiene la siguiente estructura:
   * DayofMonth: día del mes del vuelo.
   * DayOfWeek: día de la semana del vuelo.
   * Carrier: Identificador de la compañía aérea.
   * OriginAirportID: Identificador del aeropuerto de origen.
   * DestAirportID: Identificador del aeropuerto de destino.
   * DepDelay: Minutos de retraso en la salida de un vuelo (puede ser negativo si el vuelo sale antes de lo previsto).
   * ArrDelay: Minutos de retraso en la llegada de un vuelo (puede ser negativo si el vuelo sale antes de lo previsto).

El fichero `Tarea_BDA04.zip` contiene los dos ficheros. Para descargarlo de Kaggle hay que estar registrado y se ha incluido para que no tengas que registrarte.

# Preparación
Usa la opción `Data` del panel lateral para subir a DBDS los ficheros `airports.csv`, `flights.csv`.


#### Comprueba que los archivos "airports.csv" y "flights.csv" están en DBFS

In [None]:
%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/airports.csv,airports.csv,16308,1709814141000
dbfs:/FileStore/tables/flights.csv,flights.csv,72088113,1709814137000
dbfs:/FileStore/tables/notas.txt,notas.txt,61,1709882919000


#### Crea los dataframes `airports_df` y `flights_df` a partir de los ficheros anteriores y muestra las primeras filas y su esquema y comprueba que es el esquema es correcto.

In [None]:
# Crear el DataFrame "airports_df"
airports_df = spark.read.option("header", "true").csv("dbfs:/FileStore/tables/airports.csv")

# Crear el DataFrame "flights_df"
flights_df = spark.read.option("header", "true").csv("dbfs:/FileStore/tables/flights.csv")


In [None]:
# Mostrar contenido de airports_df
airports_df.display()

airport_id,city,state,name
10165,Adak Island,AK,Adak
10299,Anchorage,AK,Ted Stevens Anchorage International
10304,Aniak,AK,Aniak Airport
10754,Barrow,AK,Wiley Post/Will Rogers Memorial
10551,Bethel,AK,Bethel Airport
10926,Cordova,AK,Merle K Mudhole Smith
14709,Deadhorse,AK,Deadhorse Airport
11336,Dillingham,AK,Dillingham Airport
11630,Fairbanks,AK,Fairbanks International
11997,Gustavus,AK,Gustavus Airport


In [None]:
airports_df.printSchema()

root
 |-- airport_id: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- name: string (nullable = true)



In [None]:
# Mostrar contenido de flights_df
flights_df.display()


DayofMonth,DayOfWeek,Carrier,OriginAirportID,DestAirportID,DepDelay,ArrDelay
19,5,DL,11433,13303,-3,1
19,5,DL,14869,12478,0,-8
19,5,DL,14057,14869,-4,-15
19,5,DL,15016,11433,28,24
19,5,DL,11193,12892,-6,-11
19,5,DL,10397,15016,-1,-19
19,5,DL,15016,10397,0,-1
19,5,DL,10397,14869,15,24
19,5,DL,10397,10423,33,34
19,5,DL,11278,10397,323,322


In [None]:
flights_df.printSchema()

root
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- Carrier: string (nullable = true)
 |-- OriginAirportID: string (nullable = true)
 |-- DestAirportID: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- ArrDelay: string (nullable = true)




#### Usando los dataframes anteriores muestra las cinco compañías que más vuelos retrasados tienen (las celdas tiene que ser de tipo Python).
* El campo `Carrier` contiene la compañía aérea.
* Vamos a considerar que un vuelo llega con retraso cuando el vuelo llega más de 15 minutos tarde (campo `ArrDelay` > 15).
* El nombre del campo que contine la cuenta de vuelos retrasados se llamará `delayed_flights`.

In [None]:
# API DataFrame que permite usar consultas similares a Spark SQL



display(flights_df
         # Filtrar vuelos con retrasos en la llegada mayores a 15 minutos
        .where("arrdelay >15") 
        # Agrupar por aerolínea y contar el número de vuelos con retraso por aerolínea
        .groupBy("carrier")
        .count()
        .withColumnRenamed("count", "delayed_flights")
        # Ordenar por el número de vuelos retrasados en orden descendente (más a menos)
        .orderBy("delayed_flights", ascending= False)
        # Seleccionar las 5 aerolíneas principales con más vuelos retrasados
        .limit(5)
       )

carrier,delayed_flights
WN,127601
AA,59842
DL,57668
UA,57367
US,40943



#### Almacena los dataframes `airports_df` y `flights_df` en sendas tablas temporales y realiza el ejercicio anterior usando una celda de tipo SQL.

In [None]:
# Alamacenar airports_df en la tabla airports y flights_df en la tabla flights
airports_df.createOrReplaceTempView("airports")
flights_df.createOrReplaceTempView("flights")

In [None]:
%sql
-- Mostrar las cinco compañías con más vuelos retrasados
-- Seleccionar aerolínea y contar el número de vuelos retrasados (delayed_flights)
SELECT carrier, COUNT(*) as delayed_flights 
-- De la tabla 'flights'
FROM flights 
-- Filtrar vuelos con retrasos en la llegada mayores a 15 minutos
WHERE arrdelay > 15 
-- Agrupar por aerolínea
GROUP BY carrier 
-- Ordenar por el número de vuelos con retraso en orden descendente (más a menos)
ORDER BY delayed_flights DESC 
-- Limitar a las 5 aerolíneas principales con más vuelos retrasados
LIMIT 5

carrier,delayed_flights
WN,127601
AA,59842
DL,57668
UA,57367
US,40943



#### Usando los dataframes `airports_df` y `flights_df` muestra los 5 AEROPUERTOS DE DESTINO que mejor recuperación de tiempo en vuelo tienen (las celdas tienen que ser de tipo Python).
* Se considera que se ha recuperado el tiempo de un vuelo cuando habiendo salido con retraso (`DepDelay` > 15), llega sin retraso (`ArrDelay` <= 15).
* Se trata de que muestres los nombres de los 5 aeropuertos de llegada que han recuperado el tiempo en un mayor porcentaje de vuelos que salieron retrasados.

In [None]:
from pyspark.sql.functions import col, count, when


display(
    # Unir los dataframes 'flights_df' y 'airports_df' usando el 'airport_id'
    flights_df.join(airports_df.select("airport_id", "name"), col("DestAirportID") == col("airport_id"))
    
    # Filtrar los vuelos que salieron con retraso (DepDelay > 15)
    .filter(col("depdelay") > 15) 
    
    # Agrupar por el nombre del aeropuerto y calcular el número de vuelos que se recuperaron y el total de vuelos
    .groupBy("name").agg(
        count(when(col("arrdelay") <= 15, 1)).alias("recovered"),  # vuelos que se recuperaron
        count("*").alias("totals")  # total de vuelos
    )
    
    # Calcular el porcentaje de vuelos que se recuperaron
    .select(
        "name", 
        (col("recovered") / col("totals")).alias("percent_recovered")  # porcentaje de recuperación
    )
    
    # Ordenar por el porcentaje de recuperación en orden descendente y mostramos los primeros 5
    .orderBy("percent_recovered", ascending=False).limit(5)
)

name,percent_recovered
Ted Stevens Anchorage International,0.3480885311871227
John Wayne Airport-Orange County,0.2563380281690141
Metropolitan Oakland International,0.2470757603023214
Seattle/Tacoma International,0.2413364939609436
Chicago Midway International,0.2406879513492968



### Sube el fichero `notas.txt` a DBFS y calcula la nota media de cada alumno usando Spark (celdas en Python).

Nota: El fichero `notas.txt` no tiene estructura de tabla y no vas a poderlo procesar como un `Dataframe`. Lo recomendable es usar transformaciones y acciones con RDD. En la [guía de programación de RDD](https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds) puedes aprender los conceptos básicos.

Nota2: La implementación debe tener en cuenta que el fichero `notas.txt` es un simple ejemplo de un fichero más grande que contendría millones de filas. Sin embargo el número de alumnos será de unos cientos. 

In [None]:
# Comprobar si notas.txt esta DBFS


%fs ls /FileStore/tables

path,name,size,modificationTime
dbfs:/FileStore/tables/airports.csv,airports.csv,16308,1709814141000
dbfs:/FileStore/tables/flights.csv,flights.csv,72088113,1709814137000
dbfs:/FileStore/tables/notas.txt,notas.txt,61,1709882919000


In [None]:
# Mostrar contenido del archivo


%fs head dbfs:/FileStore/tables/notas.txt


In [None]:
# Leer el archivo
notas = sc.textFile("dbfs:/FileStore/tables/notas.txt")

# Separar lineas
lineas = notas.map(lambda s: s.split())

# Separar datos en tuplas
estudiantes= lineas.map(lambda x: (x[0], list(map(int, x[1:]))))

# Convertir los pares en una lista con el nombre y la nota
lista_pares = estudiantes.flatMapValues(lambda x: [(x[i],) for i in range(len(x))])

# Recoger datos por nombre y valor
union = lista_pares.reduceByKey(lambda x, y: x + y)

# Sumar datos de las notas y dividir entre su longitud para la media
suma = union.map(lambda x: (x[0], float(sum(x[1]))/len(x[1])))
media = suma.collect()
print(media)

[('pedro', 5.0), ('luis', 2.3333333333333335), ('ana', 7.0)]
