Desafío - Spark II
● Para realizar este desafío debes haber estudiado previamente todo el material
disponibilizado correspondiente a la unidad.
● Una vez terminado el desafío, comprime la carpeta que contiene el desarrollo de los
requerimientos solicitados y sube el .zip en el LMS.
● Desarrollo desafío:
○ El desafío se debe desarrollar de manera Individual/Grupal.
○ Debes tener precaución con las rutas y el sistema operativo que utilices.

Sobre los datos del ejercicio
● El Buró de Estadísticas de Transportes del Departamento de Transporte de los
Estados Unidos de América realiza un seguimiento en tiempo real del desempeño de
los vuelos domésticos realizados por operadores de gran escala.
● Para este ejercicio se trabajará con tres tablas en formato columnar parquet que se
detallan a continuación:
○ flights.parquet: Tabla correspondiente a todos los vuelos realizados en el
2015.
○ airports.parquet: Tabla correspondiente a todos los aeropuertos dentro
de los Estados Unidos de América.
○ airlines.parquet: Tabla correspondiente a todos los operadores de gran
escala en el 2015.

Ejercicio 1: Preliminares
● Genere una instancia de trabajo en AWS EMR con los componentes necesarios de
Spark y habilite un puerto dinámico para utilizar un notebook desde JupyterHub.
● Genere un objeto con SparkSession y asegúrese de habilitar el soporte con Hive.
● Utilizando su objeto creado con SparkSession, realice el import de los objetos
parquet que se encuentran en la siguiente dirección del bucket del curso
s3://bigdata-desafio/challenges/u4act2/.
● Infiera el schema de cada objeto creado.

In [1]:
sc

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1638643277577_0002,pyspark,idle,,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-1>

In [2]:
#Se generan objetos con las rutas
path_airlines='s3://bigdata-desafio/challenges/u4act2/airlines.parquet'
path_airport='s3://bigdata-desafio/challenges/u4act2/airports.parquet'
path_flights='s3://bigdata-desafio/challenges/u4act2/flights.parquet'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
#Se importan los parquets
df_airlines =spark.read.parquet(path_airlines)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
df_airport =spark.read.parquet(path_airport)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
df_flights =spark.read.parquet(path_flights)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Se infieren schemas del los parquets importados
df_airlines.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRLINE: string (nullable = true)

In [7]:
df_airport.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- IATA_CODE: string (nullable = true)
 |-- AIRPORT: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)

In [8]:
df_flights.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- YEAR: string (nullable = true)
 |-- MONTH: string (nullable = true)
 |-- DAY: string (nullable = true)
 |-- DAY_OF_WEEK: string (nullable = true)
 |-- AIRLINE: string (nullable = true)
 |-- FLIGHT_NUMBER: string (nullable = true)
 |-- TAIL_NUMBER: string (nullable = true)
 |-- ORIGIN_AIRPORT: string (nullable = true)
 |-- DESTINATION_AIRPORT: string (nullable = true)
 |-- SCHEDULED_DEPARTURE: string (nullable = true)
 |-- DEPARTURE_TIME: string (nullable = true)
 |-- DEPARTURE_DELAY: string (nullable = true)
 |-- TAXI_OUT: string (nullable = true)
 |-- WHEELS_OFF: string (nullable = true)
 |-- SCHEDULED_TIME: string (nullable = true)
 |-- ELAPSED_TIME: string (nullable = true)
 |-- AIR_TIME: string (nullable = true)
 |-- DISTANCE: string (nullable = true)
 |-- WHEELS_ON: string (nullable = true)
 |-- TAXI_IN: string (nullable = true)
 |-- SCHEDULED_ARRIVAL: string (nullable = true)
 |-- ARRIVAL_TIME: string (nullable = true)
 |-- ARRIVAL_DELAY: string (nullable = true)
 |-- D

In [9]:
#trabajando con spark si quiero tratar mis dataframes como si fueran tablas que pueda manipular con SQL debo hacer lo siguuente:
#debo materializar el dataframe como una vista temporal de spark sql y eso se hace asi =>

df_airlines.createOrReplaceTempView("airlinestb")

#ahora debo utilizar la funcion magic de jupyter para indicar que voy a lanzar una sentencia SQL

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
#voy a transformar mi df airport en una vista de spark sql
df_airport.createOrReplaceTempView("airport_tb")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
#vamos con el daframe vuelos para poder usar en sql context

df_flights.createOrReplaceTempView("flights_tb")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Ejercicio 2: Implementación de Queries
En su calidad de Científico de Datos, su jefe le genera una serie de consultas que deberá
implementar utilizando sus conocimientos en SparkSQL y sus objetos DataFrame. La única
limitante es que estará trabajando en un cluster habilitado sólo con el kernel PySpark3, por
lo que no podrá utilizar librerías como pandas, numpy y matplotlib. Cabe destacar que usted
no tendrá permisos de superusuario para instalar librerías.

● Query 1: Cantidad de vuelos por mes. Reporte los meses con una mayor cantidad de
vuelos.

In [12]:
%%sql
SELECT MONTH, COUNT(1)
FROM flights_tb 
GROUP BY MONTH
ORDER BY 2 DESC
LIMIT 10


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 2: Cantidad de vuelos por día y mes. Reporte los días con una mayor cantidad
de vuelos.

In [13]:
%%sql
SELECT MONTH, DAY, COUNT(1)
FROM flights_tb 
GROUP BY MONTH, DAY
ORDER BY 3 DESC
LIMIT 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 3: Cantidad de aeropuertos por Estado. Reporte los estados con una mayor
cantidad de aeropuertos.

In [14]:
%%sql
SELECT STATE, COUNT(AIRPORT)
FROM airport_tb 
GROUP BY STATE
ORDER BY 2 DESC
LIMIT 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

Query 4: Excluyendo los aeropuertos que no aparezcan en la tabla airports,
identifique los aeropuertos con una mayor cantidad de vuelos.
Tips:
● Para identificar los aeropuertos, utilice la columna ORIGIN_AIRPORT de la tabla
flights.
● Haga un join con la tabla airports, utilizando el IATA_CODE como registro
identificador.

In [15]:
%%sql
SELECT *
FROM airport_tb JOIN flights_tb  
where flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

In [16]:
%%sql
SELECT AIRPORT, COUNT(AIRPORT)
FROM airport_tb JOIN flights_tb  
where flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE
GROUP BY AIRPORT
ORDER BY 2 DESC
LIMIT 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 5: Excluyendo los aeropuertos que no aparezcan en la tabla airports,
identifique los estados con una mayor cantidad de vuelos.
Tips:
● Se sugiere implementar una query de SQL para resolver este problema.
● No se olvide de registrar las tablas temporales.

In [17]:
%%sql
SELECT STATE, COUNT(STATE)
FROM airport_tb JOIN flights_tb  
where flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE
GROUP BY STATE
ORDER BY 2 DESC
LIMIT 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 6: Excluyendo los aeropuertos que no aparezcan en la tabla airports,
identifique el promedio de retraso en partida (con la columna DEPARTURE_DELAY) y
llegada (con la columna ARRIVAL_DELAY) para cada aeropuerto de origen (con la
columna ORIGIN_AIRPORT). Reporte los cinco aeropuertos con un mayor retraso
promedio de partida.

In [18]:
%%sql
SELECT ORIGIN_AIRPORT, AVG(DEPARTURE_DELAY) AS PROMEDIO_DEPARTURE_DELAY, AVG(ARRIVAL_DELAY) AS PROMEDIO_ARRIVAL_DELAY
FROM airport_tb JOIN flights_tb 
where flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE
GROUP BY ORIGIN_AIRPORT
ORDER BY 2 DESC
LIMIT 10

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 7: Excluyendo los aeropuertos que no aparezcan en la tabla airports,
identifique las principales razones de cancelación de vuelos.

In [19]:
%%sql
SELECT CANCELLATION_REASON, COUNT(CANCELLATION_REASON)
FROM airport_tb JOIN flights_tb 
where flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE
GROUP BY CANCELLATION_REASON
ORDER BY 2 DESC

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

● Query 8: Excluyendo los aeropuertos que no aparezcan en la tabla airports y sólo
considerando los cinco aeropuertos con un mayor retraso promedio de partida,
identifique las principales causas de cancelación de vuelos.
Tips:
● Genere una lista con las siglas de cada aeropuerto con mayor retraso.
● Para filtrar registros por elementos dentro de una lista, pueden hacer uso de
las funciones where e isin.

In [20]:
%%sql
SELECT CANCELLATION_REASON, AVG(DEPARTURE_DELAY) AS PROMEDIO_DEPARTURE_DELAY
FROM airport_tb JOIN flights_tb 
WHERE flights_tb.ORIGIN_AIRPORT == airport_tb.IATA_CODE
GROUP BY CANCELLATION_REASON
ORDER BY 2 DESC
LIMIT 5

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()

Query 9: Excluyendo los aeropuertos que no aparezcan en la tabla airports,
identifique el tiempo promedio de retraso en partida y llegada para cada aerolínea.
Tips:
● Se sugiere implementar una query de SQL para resolver este problema.
● No se olvide de registrar las tablas temporales.

In [21]:
%%sql
SELECT airlinestb.AIRLINE, AVG(ARRIVAL_DELAY) AS PROMEDIO_ARRIVAL_DELAY, AVG(DEPARTURE_DELAY) AS PROMEDIO_DEPARTURE_DELAY
FROM airlinestb JOIN flights_tb 
where flights_tb.AIRLINE == airlinestb.IATA_CODE
GROUP BY airlinestb.AIRLINE

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

VBox(children=(HBox(children=(HTML(value='Type:'), Button(description='Table', layout=Layout(width='70px'), st…

Output()