### Finger 3
Considerando la siguiente información del set de datos

 - airlines.csv
 - airports.csv
 - flights.csv

Se plantean los siguientes puntos a realizar:

 - Mostrar los 5 aeropuertos de origen que tienen mayor cantidad de cancelaciones.
 - Mostrar el nombre de aerolíneas y la cantidad de vuelos desde Atlanta (ATL) a Los Ángeles (LAX) ordenadas cantidad de vuelos
 - Mostrar y Analizar el Query Plan del punto 2 para entender las optimizaciones que realiza Catalyst Optimizer, contestando las siguientes preguntas:
 - ¿Se realiza alguna optimización lógica, como filter pushdown? ¿En qué etapa?
 - ¿Que tipo de Join Físico se realiza? ¿En qué etapa?


In [1]:
airlines_df = sqlContext.read.option('header','true').csv('../dataPractica/airlines.csv')
airports_df = sqlContext.read.option('header','true').csv('../dataPractica/airports.csv')
flights_df = sqlContext.read.option('header','true').csv('../dataPractica/flights.csv')

In [2]:
##Conociendo los datos
airlines_df.first()

Row(IATA_CODE='UA', AIRLINE='United Air Lines Inc.')

In [3]:
airports_df.describe()

DataFrame[summary: string, IATA_CODE: string, AIRPORT: string, CITY: string, STATE: string, COUNTRY: string, LATITUDE: string, LONGITUDE: string]

In [4]:
flights_df.columns

['YEAR',
 'MONTH',
 'DAY',
 'DAY_OF_WEEK',
 'AIRLINE',
 'FLIGHT_NUMBER',
 'TAIL_NUMBER',
 'ORIGIN_AIRPORT',
 'DESTINATION_AIRPORT',
 'SCHEDULED_DEPARTURE',
 'DEPARTURE_TIME',
 'DEPARTURE_DELAY',
 'TAXI_OUT',
 'WHEELS_OFF',
 'SCHEDULED_TIME',
 'ELAPSED_TIME',
 'AIR_TIME',
 'DISTANCE',
 'WHEELS_ON',
 'TAXI_IN',
 'SCHEDULED_ARRIVAL',
 'ARRIVAL_TIME',
 'ARRIVAL_DELAY',
 'DIVERTED',
 'CANCELLED',
 'CANCELLATION_REASON',
 'AIR_SYSTEM_DELAY',
 'SECURITY_DELAY',
 'AIRLINE_DELAY',
 'LATE_AIRCRAFT_DELAY',
 'WEATHER_DELAY']

In [5]:
flights_df.first()

Row(YEAR='2015', MONTH='1', DAY='1', DAY_OF_WEEK='4', AIRLINE='AS', FLIGHT_NUMBER='98', TAIL_NUMBER='N407AS', ORIGIN_AIRPORT='ANC', DESTINATION_AIRPORT='SEA', SCHEDULED_DEPARTURE='0005', DEPARTURE_TIME='2354', DEPARTURE_DELAY='-11', TAXI_OUT='21', WHEELS_OFF='0015', SCHEDULED_TIME='205', ELAPSED_TIME='194', AIR_TIME='169', DISTANCE='1448', WHEELS_ON='0404', TAXI_IN='4', SCHEDULED_ARRIVAL='0430', ARRIVAL_TIME='0408', ARRIVAL_DELAY='-22', DIVERTED='0', CANCELLED='0', CANCELLATION_REASON=None, AIR_SYSTEM_DELAY=None, SECURITY_DELAY=None, AIRLINE_DELAY=None, LATE_AIRCRAFT_DELAY=None, WEATHER_DELAY=None)

In [6]:
#Creo las "tablas" estilo SQL
flights_df.createOrReplaceTempView('flights')
airlines_df.createOrReplaceTempView('airlines')
airports_df.createOrReplaceTempView('airports')


###  Mostrar los 5 aeropuertos de origen que tienen mayor cantidad de cancelaciones

In [7]:
query_cancelaciones = 'SELECT A.AIRPORT, F.ORIGIN_AIRPORT, COUNT(*) as cant_cancelaciones\
                       FROM flights AS F\
                       INNER JOIN airports AS A\
                           ON F.ORIGIN_AIRPORT = A.IATA_CODE\
                       WHERE CANCELLED = 1\
                       GROUP BY A.AIRPORT, F.ORIGIN_AIRPORT\
                       ORDER BY cant_cancelaciones DESC\
                       LIMIT 5'

vuelos_cancelados = spark.sql(query_cancelaciones).show()


+--------------------+--------------+------------------+
|             AIRPORT|ORIGIN_AIRPORT|cant_cancelaciones|
+--------------------+--------------+------------------+
|Chicago O'Hare In...|           ORD|              8548|
|Dallas/Fort Worth...|           DFW|              6254|
|LaGuardia Airport...|           LGA|              4531|
|Newark Liberty In...|           EWR|              3110|
|Gen. Edward Lawre...|           BOS|              2654|
+--------------------+--------------+------------------+



###  Mostrar el nombre de aerolíneas y la cantidad de vuelos desde Atlanta (ATL) a Los Ángeles (LAX) ordenadas cantidad de vuelos

In [8]:
query_vuelos_atl_lax = "SELECT A.AIRLINE, COUNT(*) as cant_vuelos\
                        FROM flights AS F\
                        INNER JOIN airlines AS A\
                            ON F.AIRLINE = A.IATA_CODE\
                        WHERE \
                            F.ORIGIN_AIRPORT = 'ATL' AND DESTINATION_AIRPORT = 'LAX'\
                        GROUP BY A.AIRLINE\
                        ORDER BY cant_vuelos DESC"


vuelos_atl_lax = spark.sql(query_vuelos_atl_lax)
vuelos_atl_lax.show()

+--------------------+-----------+
|             AIRLINE|cant_vuelos|
+--------------------+-----------+
|Delta Air Lines Inc.|       3624|
|Southwest Airline...|        962|
|American Airlines...|        765|
|Frontier Airlines...|        215|
|    Spirit Air Lines|        103|
+--------------------+-----------+



### Mostrar y Analizar el Query Plan del punto 2 para entender las optimizaciones que realiza Catalyst Optimizer

In [9]:
spark.sql(query_vuelos_atl_lax).explain(True)

== Parsed Logical Plan ==
'Sort ['cant_vuelos DESC NULLS LAST], true
+- 'Aggregate ['A.AIRLINE], ['A.AIRLINE, 'COUNT(1) AS cant_vuelos#624]
   +- 'Filter (('F.ORIGIN_AIRPORT = ATL) && ('DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('F.AIRLINE = 'A.IATA_CODE)
         :- 'SubqueryAlias F
         :  +- 'UnresolvedRelation `flights`
         +- 'SubqueryAlias A
            +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, cant_vuelos: bigint
Sort [cant_vuelos#624L DESC NULLS LAST], true
+- Aggregate [AIRLINE#13], [AIRLINE#13, count(1) AS cant_vuelos#624L]
   +- Filter ((ORIGIN_AIRPORT#63 = ATL) && (DESTINATION_AIRPORT#64 = LAX))
      +- Join Inner, (AIRLINE#60 = IATA_CODE#12)
         :- SubqueryAlias F
         :  +- SubqueryAlias flights
         :     +- Relation[YEAR#56,MONTH#57,DAY#58,DAY_OF_WEEK#59,AIRLINE#60,FLIGHT_NUMBER#61,TAIL_NUMBER#62,ORIGIN_AIRPORT#63,DESTINATION_AIRPORT#64,SCHEDULED_DEPARTURE#65,DEPARTURE_TIME#66,DEPARTURE_DELAY#67,TAXI_O

### ¿Se realiza alguna optimización lógica, como filter pushdown? ¿En qué etapa?
Se realizan filter pushdowns filtrando valores nulos de las tablas a "joinear", en Optimized Logical Plan.
También proyecta qué columnas utilizará de cada tabla, en la misma etapa.
### ¿Que tipo de Join Físico se realiza? ¿En qué etapa?
Termina reaizando un Broadcast Hash Join en la etapa de Physical Plan, por más que en la query se eligiera Inner Join.