In [None]:
import pyspark
spark = pyspark.sql.SparkSession.builder.appName("Airlines").getOrCreate()
df_airlines = spark.read.csv('airlines.csv', inferSchema="true", header="true")
df_airlines.createOrReplaceTempView("airlines")
df_flights = spark.read.csv('flights.csv', inferSchema="true", header="true")
df_flights.createOrReplaceTempView("flights")
df_airports = spark.read.csv('airports.csv', inferSchema="true", header="true")
df_airports.createOrReplaceTempView("airports")

## 1. Mostrar los 5 aeropuertos de origen que tienen mayor cantidad de cancelaciones.

In [None]:
query = 'SELECT airports.AIRPORT as origin_airport, COUNT(flights.FLIGHT_NUMBER) as cancelled_count\
                        FROM flights INNER JOIN airports ON flights.ORIGIN_AIRPORT = airports.IATA_CODE\
                        WHERE CANCELLED = 1\
                        GROUP BY airports.AIRPORT\
                        ORDER BY cancelled_count DESC\
                        LIMIT 5'

spark.sql(query).show(20, False)

## 2. Mostrar el nombre de aerolíneas y la cantidad de vuelos desde Atlanta (ATL) a Los Ángeles (LAX) ordenadas cantidad de vuelos

In [None]:
query = "SELECT airlines.AIRLINE as airline_name, COUNT(flights.FLIGHT_NUMBER) as flights_ATL_to_ALX\
        FROM flights INNER JOIN airlines ON flights.AIRLINE=airlines.IATA_CODE\
        WHERE flights.ORIGIN_AIRPORT = 'ATL' AND flights.DESTINATION_AIRPORT = 'LAX'\
        GROUP BY airline_name\
        ORDER BY flights_ATL_to_ALX DESC"

spark.sql(query).show(20, False)

## 3. Mostrar y Analizar el Query Plan del punto 2 para entender las optimizaciones que realiza Catalyst Optimizer, contestando las siguientes preguntas:

### A. ¿Se realiza alguna optimización lógica, como filter pushdown? ¿En qué etapa?
### B. ¿Que tipo de Join Físico se realiza? ¿En qué etapa?

In [None]:
spark.sql(query).explain(True)

1. Unresolved Logical Plan: Se lee por primera vez (parser) el plan, no se hicieron las relaciones necesarias, todavía no hay optimizaciones ni acciones físicas

2. Analyzed Logical Plan: Se analiza el plan, todavía no hay optimizaciones ni acciones físicas

3. Optimized Logical Plan: Se hacen las optimizaciones, en este caso un filter pushdown. ¿Como nos damos cuenta? Comparamos el plan original contra el plan optimizado:

Plan original, truncado para ver solo operaciones:
```
Sort 
    Aggregate
        Filter
            Join Inner
```

Plan optimizado, truncado para ver solo operaciones:
```
Sort
    Aggregate
        Project
            Join Inner
                Project
                    Filter
                        Filter isnotnull
```

**Se hizo un pushdown del primer filter (paso a estar abajo del join inner). También, se agrego el filter de si no es nulo (en este caso, el IATA)**

No hay acciones físicas en esta etapa, solo optimizaciones.

4. Physical Plan: Se ejecuta el plan de acción.
    
**Leyendo el plan, se puede ver que el join ejecutado es un BroadcastHashJoin:**
    
`+- *(2) BroadcastHashJoin [AIRLINE#358], [IATA_CODE#340], Inner, BuildRight``