# **Finger 3**

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Finger 3") \
    .getOrCreate()

In [2]:
airlines = spark.read.csv('./dataset/airlines.csv', sep=",", inferSchema="true", header="true")
airlines.createOrReplaceTempView('airlines')

In [3]:
flights = spark.read.csv('./dataset/flights.csv', sep=",", inferSchema="true", header="true")
flights.createOrReplaceTempView('flights')

In [4]:
airports = spark.read.csv('./dataset/airports.csv', sep=",", inferSchema="true", header="true")
airports.createOrReplaceTempView('airports')

### **1. Show the 5 origin airports with the highest number of cancellations.**

In [5]:
top_5_cancelations = spark.sql('''
    SELECT airports.AIRPORT
    FROM flights INNER JOIN airports
    ON flights.ORIGIN_AIRPORT = airports.IATA_CODE
    WHERE flights.CANCELLED = 1
    GROUP BY airports.AIRPORT
    ORDER BY COUNT(flights.CANCELLED) DESC
    LIMIT 5
    ''')

top_5_cancelations.show()

+--------------------+
|             AIRPORT|
+--------------------+
|Chicago O'Hare In...|
|Dallas/Fort Worth...|
|LaGuardia Airport...|
|Newark Liberty In...|
|Gen. Edward Lawre...|
+--------------------+



### **2. Show the airline names and the number of flights from Atlanta (ATL) to Los Angeles (LAX), ordered by number of flights**

In [6]:
from_atl_to_lax = spark.sql('''
    SELECT airlines.AIRLINE, COUNT(flights.AIRLINE) as FLIGHT_COUNT
    FROM flights INNER JOIN airlines
    ON flights.AIRLINE = airlines.IATA_CODE
    WHERE flights.ORIGIN_AIRPORT = 'ATL' AND flights.DESTINATION_AIRPORT = 'LAX'
    GROUP BY airlines.AIRLINE
    ORDER BY FLIGHT_COUNT DESC
''')

from_atl_to_lax.show()

+--------------------+------------+
|             AIRLINE|FLIGHT_COUNT|
+--------------------+------------+
|Delta Air Lines Inc.|        3624|
|Southwest Airline...|         962|
|American Airlines...|         765|
|Frontier Airlines...|         215|
|    Spirit Air Lines|         103|
+--------------------+------------+



### **3. Show and analyze the query plan for question 2**

In [7]:
from_atl_to_lax.explain(True)

== Parsed Logical Plan ==
'Sort ['FLIGHT_COUNT DESC NULLS LAST], true
+- 'Aggregate ['airlines.AIRLINE], ['airlines.AIRLINE, 'COUNT('flights.AIRLINE) AS FLIGHT_COUNT#124]
   +- 'Filter (('flights.ORIGIN_AIRPORT = ATL) && ('flights.DESTINATION_AIRPORT = LAX))
      +- 'Join Inner, ('flights.AIRLINE = 'airlines.IATA_CODE)
         :- 'UnresolvedRelation `flights`
         +- 'UnresolvedRelation `airlines`

== Analyzed Logical Plan ==
AIRLINE: string, FLIGHT_COUNT: bigint
Sort [FLIGHT_COUNT#124L DESC NULLS LAST], true
+- Aggregate [AIRLINE#11], [AIRLINE#11, count(AIRLINE#28) AS FLIGHT_COUNT#124L]
   +- Filter ((ORIGIN_AIRPORT#31 = ATL) && (DESTINATION_AIRPORT#32 = LAX))
      +- Join Inner, (AIRLINE#28 = IATA_CODE#10)
         :- SubqueryAlias flights
         :  +- Relation[YEAR#24,MONTH#25,DAY#26,DAY_OF_WEEK#27,AIRLINE#28,FLIGHT_NUMBER#29,TAIL_NUMBER#30,ORIGIN_AIRPORT#31,DESTINATION_AIRPORT#32,SCHEDULED_DEPARTURE#33,DEPARTURE_TIME#34,DEPARTURE_DELAY#35,TAXI_OUT#36,WHEELS_OFF#37,SCHEDULE

#### **Is any logical optimization performed, such as filter pushdown? At which stage?**

A _filter pushdown_ is performed: in the _Analyzed Logical Plan_ the join comes first and the filter is applied afterwards, but in the _Optimized Logical Plan_ the Catalyst Optimizer moves the filter as early as possible, so that unneeded rows are discarded before the join and memory use is reduced.

_Projection pruning_ is also applied to discard the columns that the query does not use.
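To make the effect of these two optimizations concrete, here is a minimal plain-Python sketch (not Spark code). The rows are made-up toy data and the function names are hypothetical. It compares joining first and filtering afterwards with filtering and pruning first, and counts how many rows pass through the join in each case.

```python
# Hypothetical toy rows standing in for the flights and airlines tables.
flights = [
    {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX", "TAIL_NUMBER": "N1"},
    {"AIRLINE": "DL", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "JFK", "TAIL_NUMBER": "N2"},
    {"AIRLINE": "WN", "ORIGIN_AIRPORT": "ATL", "DESTINATION_AIRPORT": "LAX", "TAIL_NUMBER": "N3"},
    {"AIRLINE": "AA", "ORIGIN_AIRPORT": "ORD", "DESTINATION_AIRPORT": "LAX", "TAIL_NUMBER": "N4"},
]
airlines = [
    {"IATA_CODE": "DL", "AIRLINE": "Delta Air Lines Inc."},
    {"IATA_CODE": "WN", "AIRLINE": "Southwest Airlines Co."},
    {"IATA_CODE": "AA", "AIRLINE": "American Airlines Inc."},
]


def is_atl_to_lax(row):
    """Predicate from the WHERE clause of question 2."""
    return row["ORIGIN_AIRPORT"] == "ATL" and row["DESTINATION_AIRPORT"] == "LAX"


def join_then_filter():
    # Order of the analyzed plan: join every flight, then apply the filter.
    joined = [(f, a) for f in flights for a in airlines if f["AIRLINE"] == a["IATA_CODE"]]
    kept = [a["AIRLINE"] for f, a in joined if is_atl_to_lax(f)]
    return sorted(kept), len(joined)


def filter_then_join():
    # Order of the optimized plan: filter first and keep only the join key
    # (pruning the unused columns), then join the reduced input.
    keys = [f["AIRLINE"] for f in flights if is_atl_to_lax(f)]
    joined = [(k, a) for k in keys for a in airlines if k == a["IATA_CODE"]]
    kept = [a["AIRLINE"] for k, a in joined]
    return sorted(kept), len(joined)


naive_result, naive_joined = join_then_filter()
pushed_result, pushed_joined = filter_then_join()

print(naive_result == pushed_result)  # both orders return the same airlines
print(naive_joined, pushed_joined)    # rows that went through the join in each order
```

Both orders return the same answer, but the second one feeds fewer and narrower rows into the join, which is the saving that Catalyst is after.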

#### **What type of physical join is performed? At which stage?**

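In the _Physical Plan_ stage, Spark decides to perform a _BroadcastHashJoin_. This is the usual choice when one side of the join (here, presumably the small `airlines` table) is small enough to be shipped whole to every executor.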
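The idea behind a broadcast hash join can be sketched in plain Python. This illustrates the general technique, not Spark's actual implementation. The data, the partition layout, and the function name `broadcast_hash_join` are all hypothetical.

```python
from collections import Counter

# Hypothetical small table: (IATA_CODE, AIRLINE) pairs.
small_airlines = [
    ("DL", "Delta Air Lines Inc."),
    ("WN", "Southwest Airlines Co."),
]

# Hypothetical large table, already filtered to ATL -> LAX and split into
# partitions; each entry is the airline code of one flight.
flight_partitions = [
    ["DL", "WN", "DL"],
    ["DL", "XX"],  # "XX" has no matching airline
]


def broadcast_hash_join(small_rows, partitions):
    # Build side: turn the small table into a hash table. In a cluster this
    # table would be copied (broadcast) to every worker.
    lookup = dict(small_rows)
    counts = Counter()
    # Probe side: each partition looks up its own rows locally, so the large
    # table never has to be shuffled across the network.
    for partition in partitions:
        for code in partition:
            name = lookup.get(code)
            if name is not None:  # an inner join drops unmatched rows
                counts[name] += 1
    # Flight counts per airline, highest first, as in question 2.
    return counts.most_common()


result = broadcast_hash_join(small_airlines, flight_partitions)
print(result)
```

Only the small table is copied, so the cost of the join is dominated by a single local pass over each partition of the large table.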