In [0]:
# print da sessão spark já definida pelo DataBricks
print(spark)

# gerando um spark dataframe a partir do file_path de upload de arquivo do dbfs:
file_path = "dbfs:/FileStore/shared_uploads/vinisegan@gmail.com/airports_database.csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)



<pyspark.sql.session.SparkSession object at 0x7fb0bb664160>


In [0]:
# importing pyspark.sql
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window


##### 1. Qual é o número total de voos no conjunto de dados?
**Resposta:** contar o número total de ids distintos = 336776

In [0]:
resposta_1 = df.select(countDistinct("id").alias("voos_total"))
resposta_1.show()

+----------+
|voos_total|
+----------+
|    336776|
+----------+



##### 2. Quantos voos foram cancelados? (Considerando que voos cancelados têm dep_time e arr_time nulos)
**Resposta:** contar ids distintos onde 'dep_time' **e** 'arr_time' são nulos = 8255

In [0]:
resposta_2 = df.filter((col("dep_time").isNull()) & (col("arr_time")).isNull()).select(countDistinct("id").alias("voos_cancelados"))
resposta_2.show()

+---------------+
|voos_cancelados|
+---------------+
|           8255|
+---------------+



##### 3. Qual é o atraso médio na partida dos voos (dep_delay)?
**Resposta:** considerando tanto voos adiantados quanto voos atrasados, temos uma média de 12.64 minutos de atraso.


In [0]:
resposta_3 = (
    df.filter((col("dep_time").isNotNull()) | (col("arr_time")).isNotNull())
    .select(round(avg("dep_delay"),2).alias("avg_dep_delay"))
)

resposta_3.show()

+-------------+
|avg_dep_delay|
+-------------+
|        12.64|
+-------------+



##### 4. Quais são os 5 aeroportos com maior número de pousos?
**Resposta:** desconsiderando os voos cancelados e analisando somente os pousos reais, os 5 aeroportos com maior número de pousos são: 
1. ATL
2. ORD
3. LAX
4. BOS
5. MCO

In [0]:
resposta_4 = (
    df.filter((col("dep_time").isNotNull()) | (col("arr_time")).isNotNull())
    .groupBy(col("dest"))
    .agg(countDistinct(col("id")).alias("num_pouso"))
    .sort(col("num_pouso").desc())
    .limit(5)
)

display(resposta_4)

dest,num_pouso
ATL,16898
ORD,16642
LAX,16076
BOS,15049
MCO,13982


##### 5. Qual é a rota mais frequente (par origin-dest)?
**Resposta:** pensando somente nas rotas independente do voo ter sido ou não cancelado, a rota mais frequente é: "JFK -> LAX" 

In [0]:
resposta_5 = (
    df.groupBy(col("origin"),col("dest"))
    .agg(countDistinct(col("id")).alias("num_rotas"))
    .sort(col("num_rotas").desc())
    .limit(1)
)
display(resposta_5)

origin,dest,num_rotas
JFK,LAX,11262


##### 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada? (Exiba também o tempo)
**Resposta:** As companhias aéreas com maior tempo médio de atraso e a respectiva média estão na tabela abaixo.

In [0]:
resposta_6 = (
    df.groupBy(col("name"))
    .agg(round(avg(col("arr_delay")),2).alias("avg_arr_delay"))
    .sort(col("avg_arr_delay").desc())
    .limit(5)
)
display(resposta_6)

name,avg_arr_delay
Frontier Airlines Inc.,21.92
AirTran Airways Corporation,20.12
ExpressJet Airlines Inc.,15.8
Mesa Airlines Inc.,15.56
SkyWest Airlines Inc.,11.93


##### 7. Qual é o dia da semana com maior número de voos?
**Resposta:** Considerando todos os voos programados e apenas horários programados:  
Segunda-feira é o dia com mais voos.

In [0]:
df_with_day_of_week = df.withColumn(
    "day_of_week",
    date_format(df["time_hour"], "EEEE")
)
resposta_7 = (
    df_with_day_of_week.groupBy(col("day_of_week"))
    .agg(countDistinct(col("id")).alias("week_day"))
    .sort(col("week_day").desc())
    .limit(1)
)

display(resposta_7)

day_of_week,week_day
Monday,50690


##### 8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?
**Resposta:** o resultado final pode ser visto na tabela abaixo.  
A data inicial do mês corresponde a cada mês e o valor da porcentagem **NÃO** foi multiplicado por 100.


In [0]:
delayed_flights_df = df.filter(col("dep_delay") > 30)
monthly_total_df = df.groupBy(date_trunc('month',col("time_hour")).alias("month")).agg(countDistinct("id").alias("total_flights"))
monthly_delayed_df = delayed_flights_df.groupBy(date_trunc('month',col("time_hour")).alias("month")).agg(countDistinct("id").alias("delayed_flights"))

monthly_base = monthly_total_df.join(
    monthly_delayed_df, 
    on = "month",
    how = "left")
resposta_8 = (
    monthly_base.select(to_date(col("month")).alias('month'),
                        round(col("delayed_flights") / col("total_flights") ,4).alias("delay_percentage"))
                .orderBy(col("month"))
)
display(resposta_8)


month,delay_percentage
2013-01-01,0.1241
2013-02-01,0.1275
2013-03-01,0.1494
2013-04-01,0.1599
2013-05-01,0.1534
2013-06-01,0.2024
2013-07-01,0.2098
2013-08-01,0.1445
2013-09-01,0.0877
2013-10-01,0.0934


##### 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?
**Resposta:** Considerando todos os voos programados, a origem mais comum é **JFK**  
 (Aeroporto Internacional John F. Kennedy, em NY)

In [0]:
resposta_9 = (df.filter(col("dest") == 'SEA')
                .groupBy('origin')
                .agg(countDistinct(col("id")).alias("total_flights"))
                .orderBy(col('total_flights').desc())
                .limit(1)
)
display(resposta_9)

origin,total_flights
JFK,2092



##### 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?
**Resposta:** A média de atraso por dia, considerando tanto voos adiantados quanto atrasados, está na tabela abaixo.

In [0]:
# Recriando a tabela caso ela não esteja em memória:
df_with_day_of_week = df.withColumn(
    "day_of_week",
    date_format(df["time_hour"], "EEEE")
)

resposta_10 = (
    df_with_day_of_week.groupBy(col('day_of_week'))
    .agg(round(avg(col('dep_delay')),2).alias('avg_dep_delay'))
    .orderBy(col('avg_dep_delay').desc())
)
display(resposta_10)

day_of_week,avg_dep_delay
Thursday,16.15
Monday,14.78
Friday,14.7
Wednesday,11.8
Sunday,11.59
Tuesday,10.63
Saturday,7.65


##### 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?
**Resposta:**  A rota com maior tempo médio de voo foi: **JFK -> HNL**

In [0]:
resposta_11 = (
    df.groupBy(col("origin"),col("dest"))
    .agg(avg('air_time').alias('avg_air_time'))
    .sort(col('avg_air_time').desc())
    .limit(1)

)
display(resposta_11)

origin,dest,avg_air_time
JFK,HNL,623.0877192982456


##### 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?
**Resposta:** 
1. EWR -> ORD 
2. JFK -> LAX
3. LGA -> ATL

In [0]:
window_12 = Window.orderBy(col("num_rotas").desc()).partitionBy("origin")

all_routs_volume = (
    df.groupBy(col("origin"),col("dest"))
    .agg(countDistinct(col("id")).alias("num_rotas"))
    .withColumn("rank",rank().over(window_12))
)

resposta_12 = all_routs_volume.filter(col('rank') == 1).select('origin',col('dest').alias('most_common_dest'))
display(resposta_12)


origin,most_common_dest
EWR,ORD
JFK,LAX
LGA,ATL


##### 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time) ?
**Resposta:** O desvio padrão mede a dispersão dos valores em torno da média, então pensando nisso as 3 rotas com maior variação são:
1. LGA -> MYR
2. EWR -> HNL
3. JFK -> HNL

In [0]:
resposta_13 = (df.groupBy("origin", "dest")
                .agg(stddev("air_time").alias("air_time_stdev"))
                .orderBy(col('air_time_stdev').desc())
                .limit(3)   
                )
display(resposta_13)

origin,dest,air_time_stdev
LGA,MYR,25.32455988429677
EWR,HNL,21.266135468474197
JFK,HNL,20.688824842787028


##### Pergunta: 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?
**Resposta:** A média de atraso para esses voos é de 119.05 minutos.

In [0]:
df_1hour_dep_delay = df.filter(col('dep_delay') > 60)

resposta_14 = df_1hour_dep_delay.select(round(avg(col('arr_delay')),2).alias('avg_arr_delay'))
display(resposta_14)


avg_arr_delay
119.05


##### 15. Qual é a média de voos diários para cada mês do ano?
**Resposta:** Considerando todos os voos agendados e também que nessa base existem voos para todos os dias do período filtrado, a tabela está abaixo:

In [0]:
#df com quantidade de dias presente em cada mês:
monthly_days_df = (df.groupBy(to_date(date_trunc('month',col("time_hour"))).alias("month_trunc"))
                   .agg(countDistinct('day').alias('total_days'))
                   .orderBy('month_trunc')
                   )

monthly_total_df = (df.groupBy(to_date(date_trunc('month',col("time_hour"))).alias("month_trunc"))
                      .agg(countDistinct("id").alias("total_flights"))
                    )

resposta_15 = (monthly_total_df.join(
        monthly_days_df,
        on = 'month_trunc',
        how = 'left')
    .select(col('month_trunc'), round(col('total_flights')/col('total_days'),1).alias('avg_daily_flights'))
    .sort('month_trunc')
)
display(resposta_15)

month_trunc,avg_daily_flights
2013-01-01,871.1
2013-02-01,891.1
2013-03-01,930.1
2013-04-01,944.3
2013-05-01,928.9
2013-06-01,941.4
2013-07-01,949.2
2013-08-01,946.0
2013-09-01,919.1
2013-10-01,931.9


##### 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?
**Resposta:** Das rotas com atrasos na chegada superiores a 30 minutos, as 3 mais comuns são:
1. LGA -> ATL
2. JFK -> LAX
3. LGA -> ORD

In [0]:
df_30min_arr_delay = df.filter(col('arr_delay') > 30)

resposta_16 = (df_30min_arr_delay.groupBy('origin','dest')
                .agg(countDistinct('id').alias('total_flights_with_delay'))
                .sort(col('total_flights_with_delay').desc())
                .limit(3)
)
display(resposta_16)


origin,dest,total_flights_with_delay
LGA,ATL,1563
JFK,LAX,1286
LGA,ORD,1188


##### 17. Para cada origem, qual o principal destino?
**Resposta:** 
1. EWR -> ORD 
2. JFK -> LAX
3. LGA -> ATL


In [0]:
# de forma semelhante a questao 12
window_17 = Window.orderBy(col("num_rotas").desc()).partitionBy("origin")

all_routs_volume = (
    df.groupBy(col("origin"),col("dest"))
    .agg(countDistinct(col("id")).alias("num_rotas"))
    .withColumn("rank",rank().over(window_17))
)

resposta_17 = all_routs_volume.filter(col('rank') == 1).select('origin',col('dest').alias('most_common_dest'))
display(resposta_17)

origin,most_common_dest
EWR,ORD
JFK,LAX
LGA,ATL


##### Criando funções para as APIs

In [0]:
import requests
from datetime import datetime, timedelta

# chaves expostas
airportdb_key = '3e82af3f61afc227845300b55754a2773d15c5b0feb942767916c80946c6e29ca7736dc63451fe964dac0f5717bcaab1'
weatherbit_key = '6fc410b714e2493fbfa537336920712e'

def get_airport_location(airport_code):
    # a exemplo do case onde adota-se a primeira letra do código como 'K' (primeira letra do ICAO da região do US)
    url = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_key}"
    response = requests.get(url)

    if response.status_code == requests.codes.ok:
        data = response.json()
        return data['latitude_deg'] , data['longitude_deg']
    elif response.status_code == requests.codes.not_found:
        return None, None
    else:
        raise Exception("Erro na request!")
    

def get_wind_speed(location, date):
    # chamando a funcao get_airport_location
    location = get_airport_location(location)
    latitude, longitude = location
    if latitude is None or longitude is None:
        return None

    end_datetime = date + timedelta(days=1) 
    end_date = end_datetime.strftime("%Y-%m-%d")
    start_date = date.strftime("%Y-%m-%d") 
     

    url = 'https://api.weatherbit.io/v2.0/history/daily'
    params = {
    'lat': latitude,
    'lon': longitude,
    'start_date': start_date,
    'end_date': end_date,
    'key': weatherbit_key,
    }
    headers = {
    'Accept': 'application/json',
    }

    response = requests.get(url, params=params, headers=headers)

    # parar codigo se atingir o limite de requests.
    if response.status_code == 429:
        raise Exception("Too many requests!")
    
    wheter_data = response.json()
    try:    
        return wheter_data['data'][0]['wind_spd']
    except:
        return None

# registrando UserDefinedFunction no pyspark
get_wind_speed_udf = udf(get_wind_speed, FloatType())

##### Pergunta Final: Enriqueça a base de dados de voos com as condições meteorológicas (velocidade do vento) para os aeroportos de origem e destino. Mostre as informações enriquecidas para os 5 voos com maior atraso na chegada.
**Considerações:** 
1. Não enriqueci a base completa devido ao limite de requisições das APIs, mas enriqueci os 5 voos com maior atraso na chegada.
2. O aeroporto de HNL não foi enriquecido pois seu código ICAO inicia-se com 'P' ao invés de 'K' como definido anteriormente.

In [0]:
most_delayed_flights = df.orderBy(col('arr_delay').desc()).select('id','origin','dest','arr_delay','time_hour').limit(5)

df_new = most_delayed_flights.withColumn('wind_speed_origin',get_wind_speed_udf(col('origin'),to_date(col('time_hour'))))
df_new = df_new.withColumn('wind_speed_dest',get_wind_speed_udf(col('dest'),to_date(col('time_hour'))))

display(df_new)


id,origin,dest,arr_delay,time_hour,wind_speed_origin,wind_speed_dest
7072,JFK,HNL,1272.0,2013-01-09T09:00:00.000+0000,3.5,
235778,JFK,CMH,1127.0,2013-06-15T19:00:00.000+0000,4.1,1.9
8239,EWR,ORD,1109.0,2013-01-10T16:00:00.000+0000,4.1,4.1
327043,JFK,SFO,1007.0,2013-09-20T18:00:00.000+0000,3.7,4.3
270376,JFK,CVG,989.0,2013-07-22T16:00:00.000+0000,3.2,2.3
