# Data Engineering Test
# Solvex - 2024

## Resuelto por Santiago Taracena Puga

### Ejercicio 2: Procesamiento de datos con Spark y conjunto de datos de vuelos

Utiliza Apache Spark y un conjunto de datos público de vuelos, como el conjunto de datos de vuelos de 2015 proporcionado por la Oficina de Estadísticas de Transporte de EE. UU., para realizar las siguientes tareas:

- Descarga los datos de vuelos en formato CSV desde la URL pública.
- Carga los datos en un DataFrame de Spark.
- Calcula la cantidad promedio de retrasos en la llegada de vuelos en un aeropuerto específico.
- Encuentra las 10 rutas de vuelo más populares (pares de aeropuertos) en términos de la cantidad de vuelos.

Puedes encontrar conjuntos de datos de vuelos en sitios web de datos abiertos, como el Portal de Datos Abiertos del Gobierno de EE. UU.: https://www.transtats.bts.gov/DL_SelectFields.asp

### Solución

Al igual que para la realización del primer ejercicio, el primer paso del presente ejercicio consiste en descargar un dataset que contiene información sobre vuelos en los Estados Unidos durante el año 2015. Este dataset contiene múltiples columnas con información interesante sobre los vuelos, como tiempos de salida, atrasos, cancelaciones de vuelos y mucho más.

El archivo también fue descargado en una carpeta `./data/`, por lo que con el mismo disponible para la resolución del ejercicio podemos proceder a cumplir con el segundo inciso del ejercicio, que consiste en cargar los datos a Spark. Lo primero que debemos hacer es iniciar una `SparkSession` para trabajar con Spark.

In [1]:
# Instrucción para importar SparkSession e iniciar una sesión
from pyspark.sql import SparkSession

Con `SparkSession` disponible, podemos proceder a instanciar una sesión de Spark para trabajar. Esto lo podemos hacer llamando a la propiedad estática `builder` para construir una sesión, nombrar la sesión con `appName`, y finalmente crear la sesión si no ha sido creada aún con la función `getOrCreate`.

In [2]:
# Creación de una sesión de Spark para trabajar
spark = SparkSession.builder.appName("Flights").getOrCreate()
spark

Con Spark finalmente disponible para trabajar, podemos hacer la carga del dataset a un DataFrame de Spark utilizando la función `load` para cargar un archivo de formato .csv y obtener como resultado un DataFrame utilizable para la resolución de los ejercicios planteados.

In [3]:
# Lectura del dataset con los vuelos
data = spark.read.format("csv").option("header", "true").load("./data/flights.csv")
data

DataFrame[YEAR: string, MONTH: string, DAY: string, DAY_OF_WEEK: string, AIRLINE: string, FLIGHT_NUMBER: string, TAIL_NUMBER: string, ORIGIN_AIRPORT: string, DESTINATION_AIRPORT: string, SCHEDULED_DEPARTURE: string, DEPARTURE_TIME: string, DEPARTURE_DELAY: string, TAXI_OUT: string, WHEELS_OFF: string, SCHEDULED_TIME: string, ELAPSED_TIME: string, AIR_TIME: string, DISTANCE: string, WHEELS_ON: string, TAXI_IN: string, SCHEDULED_ARRIVAL: string, ARRIVAL_TIME: string, ARRIVAL_DELAY: string, DIVERTED: string, CANCELLED: string, CANCELLATION_REASON: string, AIR_SYSTEM_DELAY: string, SECURITY_DELAY: string, AIRLINE_DELAY: string, LATE_AIRCRAFT_DELAY: string, WEATHER_DELAY: string]

Podemos observar que se ha retornado un DataFrame con las columnas que se encuentran entre los corchetes del mismo. Para ver con más detalle el esquema del dataset con el que contamos actualmente, podemos utilizar la función `printSchema`.

In [4]:
# Esquema de los datos
data.printSchema()

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

Finalmente, para poder observar los datos con los que contamos de una forma similar a la que lo haríamos con una librería como Pandas, podemos utilizar la función `show` para mostrar una sección del dataset con el que contamos actualmente.

In [5]:
# Diez filas del dataset
data.show(10)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

El primer ejercicio a resolver propiamente consiste en obtener la cantidad promedio de retrasos en la llegada de vuelos en un aeropuerto específico. Lo primero que debemos realizar es definit un aeropuerto a utilizar como foco principal del ejercicio. Por esta razón, se definió una constante `AIRPORT` con el aeropuerto a utilizar.

In [6]:
# Aeropuerto a utilizar para resolver el ejercicio
AIRPORT = "JFK"
AIRPORT

'JFK'

Con nuestro aeropuerto definido, podemos observar cómo es necesario importar algunas funciones de Spark con el objetivo de encontrar, por ejemplo, un promedio. Estas funciones las podemos importar del paquete `pyspark.sql.functions`.

In [7]:
from pyspark.sql.functions import col, avg

Con estas funciones necesarias importadas, podemos comenzar filtrando los datos que tenemos actualmente. Necesitamos únicamente las filas que correspondan al aeropuerto definido anteriormente, y también debemos asegurarnos de no incluir vuelos que hayan sido cancelados.

In [8]:
airport_delays = data.filter((col("DESTINATION_AIRPORT") == AIRPORT) & (col("CANCELLED") == "0"))
airport_delays.show(5)

+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+--------+---------+-------+-----------------+------------+-------------+--------+---------+-------------------+----------------+--------------+-------------+-------------------+-------------+
|YEAR|MONTH|DAY|DAY_OF_WEEK|AIRLINE|FLIGHT_NUMBER|TAIL_NUMBER|ORIGIN_AIRPORT|DESTINATION_AIRPORT|SCHEDULED_DEPARTURE|DEPARTURE_TIME|DEPARTURE_DELAY|TAXI_OUT|WHEELS_OFF|SCHEDULED_TIME|ELAPSED_TIME|AIR_TIME|DISTANCE|WHEELS_ON|TAXI_IN|SCHEDULED_ARRIVAL|ARRIVAL_TIME|ARRIVAL_DELAY|DIVERTED|CANCELLED|CANCELLATION_REASON|AIR_SYSTEM_DELAY|SECURITY_DELAY|AIRLINE_DELAY|LATE_AIRCRAFT_DELAY|WEATHER_DELAY|
+----+-----+---+-----------+-------+-------------+-----------+--------------+-------------------+-------------------+--------------+---------------+--------+----------+--------------+------------+--------+-

Ya tenemos los vuelos cuyo destino son el aeropuerto especificado. Ahora únicamente necesitamos obtener el promedio de la columna de delays solicitada. Esto lo podemos realizar utilizando la función `avg` importada anteriormente. Podemos mostrar el dato con la misma función `show`.

In [9]:
# Retrasos promedio del aeropuerto especificado
average_delay = airport_delays.select(avg("ARRIVAL_DELAY"))
average_delay.show()

+------------------+
|avg(ARRIVAL_DELAY)|
+------------------+
| 6.699455162850623|
+------------------+



El siguiente ejercicio a realizar, y el último de este apartado, consiste en encontrar las 10 rutas de vuelo más populares (pares de aeropuertos) en términos de la cantidad de vuelos. Encontrar una cantidad de vuelos por rutas implica la necesidad de importar la función de agregación `count` para obtener la cuenta de los vuelos.

In [10]:
# Instrucción para importar la función de agregación count
from pyspark.sql.functions import count

Con esta función a nuestra disposición, podemos agrupar los datos por origen y destino utilizando la función `groupBy`, y posteriormente aplicar la función de agregación `count` para obtener la cuenta de vuelos dado un par de aeropuertos.

In [11]:
# Cuenta de rutas dado un par de aeropuertos
route_counts = data.groupBy("ORIGIN_AIRPORT", "DESTINATION_AIRPORT").agg(count("*").alias("NUMBER_OF_FLIGHTS"))
route_counts.show(10)

+--------------+-------------------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|NUMBER_OF_FLIGHTS|
+--------------+-------------------+-----------------+
|           BQN|                MCO|              441|
|           PHL|                MCO|             4869|
|           MCI|                IAH|             1698|
|           SPI|                ORD|              998|
|           SNA|                PHX|             3846|
|           LBB|                DEN|              618|
|           ORD|                PDX|             2149|
|           EWR|                STT|              239|
|           ATL|                GSP|             2470|
|           MCI|                MKE|              612|
+--------------+-------------------+-----------------+
only showing top 10 rows



Finalmente, para obtener el top 10 seleccionado, únicamente necesitamos utilizar la función `orderBy` para obtener el dataset ordenado de mayor a menor. Finalmente podemos mostrar las 10 líneas más populares volviendo a utilizar la función `show` y pasando 10 como argumento.

In [12]:
# Top 10 líneas aéreas más populares por cantidad de vuelos
popular_routes = route_counts.orderBy(col("NUMBER_OF_FLIGHTS").desc())
popular_routes.show(10)

+--------------+-------------------+-----------------+
|ORIGIN_AIRPORT|DESTINATION_AIRPORT|NUMBER_OF_FLIGHTS|
+--------------+-------------------+-----------------+
|           SFO|                LAX|            13744|
|           LAX|                SFO|            13457|
|           JFK|                LAX|            12016|
|           LAX|                JFK|            12015|
|           LAS|                LAX|             9715|
|           LGA|                ORD|             9639|
|           LAX|                LAS|             9594|
|           ORD|                LGA|             9575|
|           SFO|                JFK|             8440|
|           JFK|                SFO|             8437|
+--------------+-------------------+-----------------+
only showing top 10 rows

