In [1]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql import SparkSession

# Reuse the active Spark session if one exists, otherwise create one for this notebook.
spark = (
    SparkSession.builder
    .appName("FlightsApp")
    .getOrCreate()
)

# Explicit schema for the CSV, so every column is loaded with a fixed type.
# Each entry is (column name, Spark type); all columns are declared nullable.
_flight_columns = [
    ("id", IntegerType()),
    ("year", IntegerType()),
    ("month", IntegerType()),
    ("day", IntegerType()),
    ("dep_time", DoubleType()),
    ("sched_dep_time", IntegerType()),
    ("dep_delay", DoubleType()),
    ("arr_time", DoubleType()),
    ("sched_arr_time", IntegerType()),
    ("arr_delay", DoubleType()),
    ("carrier", StringType()),
    ("flight", IntegerType()),
    ("tailnum", StringType()),
    ("origin", StringType()),
    ("dest", StringType()),
    ("air_time", DoubleType()),
    ("distance", IntegerType()),
    ("hour", IntegerType()),
    ("minute", IntegerType()),
    ("time_hour", TimestampType()),
    ("name", StringType()),
]

schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _flight_columns]
)

# Load the CSV using the schema defined above; the first line of the file is a header.
flights_database = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("../data/airports-database.csv")
)

## Primeira Seção - Exploratory Data Analysis

### Nesta primeira seção, respondo às 17 perguntas do teste com base no CSV já carregado em DataFrame na etapa anterior.

In [2]:
#Fazendo os imports necessários

from pyspark.sql.functions import col, count, avg, desc, weekofyear, dayofweek, dayofmonth, month, year, stddev, when, sum, round, last_day, to_date, concat, lit
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Sanity check: make sure the DataFrame from the loading cell exists, then show its columns.
if 'flights_database' not in locals():
    print("O DataFrame 'flights_database' não está definido. Por favor, carregue-o antes de executar este código.")
else:
    print("Colunas disponíveis no DataFrame:")
    flights_database.printSchema()

Colunas disponíveis no DataFrame:
root
 |-- id: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- dep_time: double (nullable = true)
 |-- sched_dep_time: integer (nullable = true)
 |-- dep_delay: double (nullable = true)
 |-- arr_time: double (nullable = true)
 |-- sched_arr_time: integer (nullable = true)
 |-- arr_delay: double (nullable = true)
 |-- carrier: string (nullable = true)
 |-- flight: integer (nullable = true)
 |-- tailnum: string (nullable = true)
 |-- origin: string (nullable = true)
 |-- dest: string (nullable = true)
 |-- air_time: double (nullable = true)
 |-- distance: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- time_hour: timestamp (nullable = true)
 |-- name: string (nullable = true)



#### 1. Qual é o número total de voos no conjunto de dados?

In [3]:
# The answer is the number of rows in the loaded DataFrame.
total_flights = flights_database.count()

print(f"1. Número total de voos: {total_flights}")

1. Número total de voos: 336776


#### 2. Quantos voos foram cancelados?

In [4]:
# A flight is counted as cancelled when both its departure and arrival times are null.
never_departed = col("dep_time").isNull()
never_arrived = col("arr_time").isNull()
cancelled_flights = flights_database.where(never_departed & never_arrived).count()

print(f"2. Número de voos cancelados: {cancelled_flights}")

2. Número de voos cancelados: 8255


#### 3. Qual é o atraso médio na partida dos voos (dep_delay)?

In [5]:
# Single-row aggregate over the whole dataset; the value is pulled out of the first (only) row.
avg_dep_delay = flights_database.agg(avg("dep_delay")).first()[0]

print(f"3. Atraso médio na partida: {avg_dep_delay:.2f} minutos")

3. Atraso médio na partida: 12.64 minutos


#### 4. Quais são os 5 aeroportos com maior número de pousos?

In [6]:
# Count flights per destination airport and keep the five largest counts.
landings_per_dest = flights_database.groupBy("dest").agg(count("*").alias("landings"))
top_5_airports = landings_per_dest.orderBy(desc("landings")).limit(5)

print("4. Top 5 aeroportos com maior número de pousos:")
top_5_airports.show()

4. Top 5 aeroportos com maior número de pousos:
+----+--------+
|dest|landings|
+----+--------+
| ORD|   17283|
| ATL|   17215|
| LAX|   16174|
| BOS|   15508|
| MCO|   14082|
+----+--------+



#### 5. Qual é a rota mais frequente (par origin-dest)?

In [7]:
# A route is an (origin, dest) pair; keep the pair with the most flights.
most_frequent_route = (
    flights_database
    .groupBy("origin", "dest")
    .agg(count("*").alias("frequency"))
    .orderBy(desc("frequency"))
    .limit(1)
)

print("5. Rota mais frequente:")
most_frequent_route.show()

5. Rota mais frequente:
+------+----+---------+
|origin|dest|frequency|
+------+----+---------+
|   JFK| LAX|    11262|
+------+----+---------+



#### 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada?

In [8]:
# Average arrival delay per carrier (code and name), worst five first.
top_5_delayed_carriers = (
    flights_database
    .groupBy("carrier", "name")
    .agg(avg("arr_delay").alias("avg_arr_delay"))
    .orderBy(desc("avg_arr_delay"))
    .limit(5)
)

print("6. Top 5 companhias aéreas com maior tempo médio de atraso na chegada:")
top_5_delayed_carriers.show()

6. Top 5 companhias aéreas com maior tempo médio de atraso na chegada:
+-------+--------------------+------------------+
|carrier|                name|     avg_arr_delay|
+-------+--------------------+------------------+
|     F9|Frontier Airlines...|21.920704845814978|
|     FL|AirTran Airways C...|20.115905511811025|
|     EV|ExpressJet Airlin...| 15.79643108710965|
|     YV|  Mesa Airlines Inc.|15.556985294117647|
|     OO|SkyWest Airlines ...|11.931034482758621|
+-------+--------------------+------------------+



#### 7. Qual é o dia da semana com maior número de voos?

In [9]:
# Derive the weekday from time_hour and keep the weekday with the most flights.
# Spark's dayofweek numbers the days from 1 (Sunday) to 7 (Saturday).
flights_with_weekday = flights_database.withColumn("day_of_week", dayofweek("time_hour"))
busiest_day = (
    flights_with_weekday
    .groupBy("day_of_week")
    .agg(count("*").alias("flights"))
    .orderBy(desc("flights"))
    .limit(1)
)

print("7. Dia da semana com maior número de voos:")
busiest_day.show()

7. Dia da semana com maior número de voos:
+-----------+-------+
|day_of_week|flights|
+-----------+-------+
|          2|  50690|
+-----------+-------+



#### 8. Qual o percentual mensal dos voos que tiveram atraso na partida superior a 30 minutos?

In [10]:
# Share of flights per month whose departure delay exceeded 30 minutes.
# `sum` here is the Spark column aggregate imported above, not the Python builtin.
# The denominator is every flight of the month, including rows with a null dep_delay.
long_delay_flag = when(col("dep_delay") > 30, 1).otherwise(0)
long_delay_share = (sum(long_delay_flag) / count("*") * 100).alias("delay_percentage")

monthly_delay_percentage = (
    flights_database
    .withColumn("month", month("time_hour"))
    .groupBy("month")
    .agg(long_delay_share)
    .orderBy("month")
)

print("8. Percentual mensal de voos com atraso na partida superior a 30 minutos:")
monthly_delay_percentage.show()

8. Percentual mensal de voos com atraso na partida superior a 30 minutos:
+-----+------------------+
|month|  delay_percentage|
+-----+------------------+
|    1|12.405569545252556|
|    2|12.752995871908942|
|    3|14.944163140736629|
|    4|15.993646311330744|
|    5|15.335463258785943|
|    6| 20.24218390397621|
|    7| 20.97875955819881|
|    8|14.450847342039758|
|    9|  8.77275694494814|
|   10| 9.335733324102598|
|   11| 8.757517969781428|
|   12|17.312955393637818|
+-----+------------------+



#### 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?

In [11]:
# Restrict to flights whose destination is SEA, then find the origin with the most flights.
flights_to_sea = flights_database.filter(col("dest") == "SEA")
most_common_origin_to_sea = (
    flights_to_sea
    .groupBy("origin")
    .agg(count("*").alias("flights"))
    .orderBy(desc("flights"))
    .limit(1)
)

print("9. Origem mais comum para voos que pousaram em Seattle (SEA):")
most_common_origin_to_sea.show()

9. Origem mais comum para voos que pousaram em Seattle (SEA):
+------+-------+
|origin|flights|
+------+-------+
|   JFK|   2092|
+------+-------+



#### 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?

In [12]:
# Average departure delay per weekday.
# Spark's dayofweek numbers the days from 1 (Sunday) to 7 (Saturday).
# The result is ordered by the integer weekday directly; casting it to string first
# would sort lexicographically, which is not a numeric ordering in general.
avg_delay_by_day = (
    flights_database
    .withColumn("day_of_week", dayofweek("time_hour"))
    .groupBy("day_of_week")
    .agg(avg("dep_delay").alias("avg_dep_delay"))
    .orderBy("day_of_week")
)

print("10. Média de atraso na partida para cada dia da semana:")
avg_delay_by_day.show()

10. Média de atraso na partida para cada dia da semana:
+-----------+------------------+
|day_of_week|     avg_dep_delay|
+-----------+------------------+
|          1|11.589531801152422|
|          2|14.778936729330908|
|          3|10.631682565455652|
|          4|11.803512219083876|
|          5|16.148919990957108|
|          6| 14.69605749486653|
|          7| 7.650502333676133|
+-----------+------------------+



#### 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?

In [13]:
# Mean air_time per (origin, dest) route; keep the route with the highest mean.
avg_airtime_per_route = flights_database.groupBy("origin", "dest").agg(
    avg("air_time").alias("avg_air_time")
)
route_longest_airtime = avg_airtime_per_route.orderBy(desc("avg_air_time")).limit(1)

print("11. Rota com maior tempo de voo médio:")
route_longest_airtime.show()

11. Rota com maior tempo de voo médio:
+------+----+-----------------+
|origin|dest|     avg_air_time|
+------+----+-----------------+
|   JFK| HNL|623.0877192982456|
+------+----+-----------------+



#### 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [14]:
# Rank destinations within each origin by number of flights and keep the top-ranked one.
# row_number() keeps exactly one row per origin.
busiest_dest_first = Window.partitionBy("origin").orderBy(desc("flights"))

most_common_dest_by_origin = (
    flights_database
    .groupBy("origin", "dest")
    .agg(count("*").alias("flights"))
    .withColumn("rank", F.row_number().over(busiest_dest_first))
    .filter(col("rank") == 1)
    .drop("rank", "flights")
)

print("12. Destino mais comum para cada aeroporto de origem:")
most_common_dest_by_origin.show()

12. Destino mais comum para cada aeroporto de origem:
+------+----+
|origin|dest|
+------+----+
|   EWR| ORD|
|   JFK| LAX|
|   LGA| ATL|
+------+----+



#### 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time)?

In [15]:
# Variation is measured as the standard deviation of air_time per route; keep the three largest.
top_3_airtime_variation = (
    flights_database
    .groupBy("origin", "dest")
    .agg(stddev("air_time").alias("airtime_stddev"))
    .orderBy(desc("airtime_stddev"))
    .limit(3)
)

print("13. Top 3 rotas com maior variação no tempo de voo:")
top_3_airtime_variation.show()

13. Top 3 rotas com maior variação no tempo de voo:
+------+----+------------------+
|origin|dest|    airtime_stddev|
+------+----+------------------+
|   LGA| MYR| 25.32455988429677|
|   EWR| HNL| 21.26613546847427|
|   JFK| HNL|20.688824842787056|
+------+----+------------------+



#### 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?

In [16]:
# Average arrival delay, restricted to flights that left more than 60 minutes late.
flights_departed_over_1h_late = flights_database.filter(col("dep_delay") > 60)
avg_arr_delay_for_long_dep_delay = flights_departed_over_1h_late.agg(avg("arr_delay")).first()[0]

print(f"14. Média de atraso na chegada para voos com atraso na partida > 1 hora: {avg_arr_delay_for_long_dep_delay:.2f} minutos")

14. Média de atraso na chegada para voos com atraso na partida > 1 hora: 119.05 minutos


#### 15. Qual é a média de voos diários para cada mês do ano?

In [17]:
# Average number of flights per day for each month.
# `round` here is the Spark column function imported above, not the Python builtin.

# Step 1: total flights per (year, month), both derived from time_hour.
monthly_totals = (
    flights_database
    .withColumn("month", month("time_hour"))
    .withColumn("year", year("time_hour"))
    .groupBy("year", "month")
    .agg(count("*").alias("total_flights"))
)

# Step 2: calendar length of the month, taken from the last day of "<year>-<month>-01".
first_day_of_month = to_date(concat(col("year"), lit("-"), col("month"), lit("-01")))
monthly_daily_avg = (
    monthly_totals
    .withColumn("days_in_month", dayofmonth(last_day(first_day_of_month)))
    .withColumn("avg_daily_flights", round(col("total_flights") / col("days_in_month"), 2))
)

# Step 3: average across years for each month number.
avg_daily_flights_by_month = (
    monthly_daily_avg
    .groupBy("month")
    .agg(round(avg("avg_daily_flights"), 2).alias("avg_daily_flights"))
    .orderBy("month")
)

print("15. Média de voos diários para cada mês do ano:")
avg_daily_flights_by_month.show()

15. Média de voos diários para cada mês do ano:
+-----+-----------------+
|month|avg_daily_flights|
+-----+-----------------+
|    1|            871.1|
|    2|           891.11|
|    3|           930.13|
|    4|           944.33|
|    5|            928.9|
|    6|           941.43|
|    7|           949.19|
|    8|           946.03|
|    9|           919.13|
|   10|            931.9|
|   11|           908.93|
|   12|           907.58|
+-----+-----------------+



#### 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?

In [18]:
# Among flights that arrived more than 30 minutes late, the three routes with the most such flights.
late_arrivals = flights_database.filter(col("arr_delay") > 30)
top_3_delayed_routes = (
    late_arrivals
    .groupBy("origin", "dest")
    .agg(count("*").alias("delayed_flights"))
    .orderBy(desc("delayed_flights"))
    .limit(3)
)

print("16. Top 3 rotas mais comuns com atrasos na chegada > 30 minutos:")
top_3_delayed_routes.show()

16. Top 3 rotas mais comuns com atrasos na chegada > 30 minutos:
+------+----+---------------+
|origin|dest|delayed_flights|
+------+----+---------------+
|   LGA| ATL|           1563|
|   JFK| LAX|           1286|
|   LGA| ORD|           1188|
+------+----+---------------+



#### 17. Para cada origem, qual o principal destino?

In [19]:
# For each origin, the destination with the most flights.
# row_number() over a per-origin window ordered by flight count keeps one row per origin.
flights_per_route = flights_database.groupBy("origin", "dest").agg(count("*").alias("flights"))
rank_within_origin = F.row_number().over(
    Window.partitionBy("origin").orderBy(desc("flights"))
)

main_dest_by_origin = (
    flights_per_route
    .withColumn("rank", rank_within_origin)
    .filter(col("rank") == 1)
    .drop("rank", "flights")
)

print("17. Principal destino para cada origem:")
main_dest_by_origin.show()

17. Principal destino para cada origem:
+------+----+
|origin|dest|
+------+----+
|   EWR| ORD|
|   JFK| LAX|
|   LGA| ATL|
+------+----+



## Segunda Parte - Data Enrichment

In [20]:
from pyspark.sql.functions import col, desc, to_timestamp, lpad, concat, lit

# Columns carried along for the enrichment steps.
colunas_relevantes = ["id", "carrier", "flight", "origin", "dest", "arr_delay", "time_hour", "year", "month",  "day", "sched_arr_time"]

# The five flights with the largest arrival delay, restricted to the relevant columns.
top_5_arr_delay = (
    flights_database
    .orderBy(desc("arr_delay"))
    .limit(5)
    .select(colunas_relevantes)
)

# Build "yyyy-MM-dd HHmm" text from the flight's year/month/day and the scheduled arrival
# time (zero-padded to four digits), then parse it into a timestamp.
scheduled_arrival_text = concat(
    col("year"), lit("-"),
    lpad(col("month"), 2, "0"), lit("-"),
    lpad(col("day"), 2, "0"), lit(" "),
    lpad(col("sched_arr_time").cast("string"), 4, "0"),
)
top_5_arr_delay = top_5_arr_delay.withColumn(
    "scheduled_arrival",
    to_timestamp(scheduled_arrival_text, "yyyy-MM-dd HHmm"),
)

# Display only the columns of interest.
filtro_colunas = ["id", "carrier", "flight", "origin", "dest", "arr_delay", "time_hour", "scheduled_arrival"]
top_5_arr_delay.select(filtro_colunas).show()

+------+-------+------+------+----+---------+-------------------+-------------------+
|    id|carrier|flight|origin|dest|arr_delay|          time_hour|  scheduled_arrival|
+------+-------+------+------+----+---------+-------------------+-------------------+
|  7072|     HA|    51|   JFK| HNL|   1272.0|2013-01-09 09:00:00|2013-01-09 15:30:00|
|235778|     MQ|  3535|   JFK| CMH|   1127.0|2013-06-15 19:00:00|2013-06-15 21:20:00|
|  8239|     MQ|  3695|   EWR| ORD|   1109.0|2013-01-10 16:00:00|2013-01-10 18:10:00|
|327043|     AA|   177|   JFK| SFO|   1007.0|2013-09-20 18:00:00|2013-09-20 22:10:00|
|270376|     MQ|  3075|   JFK| CVG|    989.0|2013-07-22 16:00:00|2013-07-22 18:15:00|
+------+-------+------+------+----+---------+-------------------+-------------------+



In [21]:
from pyspark.sql.functions import col, collect_set

# Distinct origin and destination codes among the top-5 delayed flights.
origins = top_5_arr_delay.select("origin").distinct()
destinations = top_5_arr_delay.select("dest").distinct()

# Both origin and destination airports need enrichment, so merge the two sets.
# union() matches columns by position, so the merged column is still called "origin"
# and is renamed to "airport".
all_airports = (
    origins
    .union(destinations)
    .distinct()
    .withColumnRenamed("origin", "airport")
)

all_airports.show()

+-------+
|airport|
+-------+
|    EWR|
|    JFK|
|    CMH|
|    HNL|
|    CVG|
|    SFO|
|    ORD|
+-------+



In [22]:
import requests
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# airportdb.io API token, read from the AIRPORTDB_API_KEY environment variable so the
# secret is not stored in the notebook.
# NOTE(review): the token that used to be hardcoded here should be treated as exposed
# and rotated.
import os
import warnings

API_KEY = os.environ.get("AIRPORTDB_API_KEY", "")
if not API_KEY:
    warnings.warn(
        "AIRPORTDB_API_KEY is not set; requests to airportdb.io will be sent without a valid token."
    )

# Some airports have an ICAO code that does not start with "K", which made the API
# return null values for them. This mapping lists the ICAO code to use for those
# "special" IATA codes.
special_icao_mapping = dict(
    HNL="PHNL",
    SJU="TJSJ",
    BQN="TJBQ",
)

# Call the airport API and return the airport's latitude and longitude.
def get_airport_coordinates(iata_code):
    """Look up an airport's coordinates on airportdb.io.

    Args:
        iata_code: Airport code as found in the flights data (e.g. "JFK").

    Returns:
        A ``(latitude, longitude)`` tuple of floats, or ``(None, None)`` when the
        code is missing, the API answers with a non-200 status, or the request or
        response parsing fails.
    """
    if iata_code is None:
        # Nothing to look up; avoid querying the API for a bogus "KNone" code.
        return (None, None)

    # Codes listed in special_icao_mapping use their explicit ICAO code; every
    # other code is turned into an ICAO code by prefixing "K".
    icao_code = special_icao_mapping.get(iata_code, f"K{iata_code}")

    url = f"https://airportdb.io/api/v1/airport/{icao_code}"
    try:
        # The token goes in as a query parameter instead of being formatted into the
        # URL string; the timeout keeps an unresponsive API from hanging the job.
        response = requests.get(url, params={"apiToken": API_KEY}, timeout=10)
        if response.status_code != 200:
            print(f"Erro ao buscar dados para {iata_code}: {response.status_code}")
            return (None, None)

        data = response.json()
        latitude = data.get('latitude_deg')
        longitude = data.get('longitude_deg')
        # Coerce to float so the values match the DoubleType fields declared for
        # the UDF result; missing values stay None.
        return (
            float(latitude) if latitude is not None else None,
            float(longitude) if longitude is not None else None,
        )
    except Exception as e:
        # Best effort: report the failure and leave the coordinates empty.
        print(f"Exceção ocorreu para {iata_code}: {str(e)}")
        return (None, None)

# Result type of the UDF: a struct with nullable latitude and longitude doubles.
coordinates_schema = (
    StructType()
    .add(StructField("latitude", DoubleType(), True))
    .add(StructField("longitude", DoubleType(), True))
)

# Wrap the lookup function as a Spark UDF (User Defined Function) returning that struct.
get_coordinates_udf = udf(get_airport_coordinates, returnType=coordinates_schema)

# Unique airport codes (origins and destinations) among the top-5 delayed flights.
# union() matches columns by position, so the merged column keeps the name "origin".
origin_codes = top_5_arr_delay.select("origin")
dest_codes = top_5_arr_delay.select("dest")
unique_airports = (
    origin_codes
    .union(dest_codes)
    .select(col("origin").alias("airport_code"))
    .distinct()
)

# Look up the coordinates of each airport, then flatten the returned struct into
# separate latitude / longitude columns.
airports_with_coordinates = (
    unique_airports
    .withColumn("coordinates", get_coordinates_udf(col("airport_code")))
    .withColumn("latitude", col("coordinates.latitude"))
    .withColumn("longitude", col("coordinates.longitude"))
    .drop("coordinates")
)

# Show the results
print("Aeroportos com coordenadas:")
airports_with_coordinates.show()

Aeroportos com coordenadas:
+------------+-----------------+-----------+
|airport_code|         latitude|  longitude|
+------------+-----------------+-----------+
|         EWR|        40.692501| -74.168701|
|         JFK|        40.639801|   -73.7789|
|         CMH|        39.998001| -82.891899|
|         HNL|         21.32062|-157.924228|
|         CVG|        39.048801| -84.667801|
|         SFO|37.61899948120117|   -122.375|
|         ORD|          41.9786|   -87.9048|
+------------+-----------------+-----------+



In [23]:
# Attach coordinates to the delayed flights twice: once matching the origin airport,
# once matching the destination airport. Left joins keep flights without a match.
origin_matches = top_5_arr_delay.origin == airports_with_coordinates.airport_code
dest_matches = top_5_arr_delay.dest == airports_with_coordinates.airport_code

flights_with_origin_coordinates = (
    top_5_arr_delay
    .join(airports_with_coordinates, origin_matches, "left")
    .withColumnRenamed("latitude", "origin_latitude")
    .withColumnRenamed("longitude", "origin_longitude")
    .drop("airport_code")
)

enriched_delayed_flights = (
    flights_with_origin_coordinates
    .join(airports_with_coordinates, dest_matches, "left")
    .withColumnRenamed("latitude", "dest_latitude")
    .withColumnRenamed("longitude", "dest_longitude")
    .drop("airport_code")
)

# Show the enriched flights DataFrame
print("DataFrame de voos atrasados enriquecido com coordenadas:")
enriched_delayed_flights.show()

DataFrame de voos atrasados enriquecido com coordenadas:
+------+-------+------+------+----+---------+-------------------+----+-----+---+--------------+-------------------+---------------+----------------+-----------------+--------------+
|    id|carrier|flight|origin|dest|arr_delay|          time_hour|year|month|day|sched_arr_time|  scheduled_arrival|origin_latitude|origin_longitude|    dest_latitude|dest_longitude|
+------+-------+------+------+----+---------+-------------------+----+-----+---+--------------+-------------------+---------------+----------------+-----------------+--------------+
|  7072|     HA|    51|   JFK| HNL|   1272.0|2013-01-09 09:00:00|2013|    1|  9|          1530|2013-01-09 15:30:00|      40.639801|        -73.7789|         21.32062|   -157.924228|
|235778|     MQ|  3535|   JFK| CMH|   1127.0|2013-06-15 19:00:00|2013|    6| 15|          2120|2013-06-15 21:20:00|      40.639801|        -73.7789|        39.998001|    -82.891899|
|  8239|     MQ|  3695|   EWR| OR

In [24]:
from pyspark.sql.functions import col, hour, date_format, date_add

# One row per (flight, airport): the origin rows use the flight's time_hour ...
prep_df1 = enriched_delayed_flights.select(
    "id",
    col("origin").alias("location"),
    "time_hour",
    col("origin_latitude").alias("latitude"),
    col("origin_longitude").alias("longitude"),
)

# ... and the destination rows use the scheduled arrival as their time_hour.
prep_df2 = enriched_delayed_flights.select(
    "id",
    col("dest").alias("location"),
    col("scheduled_arrival").alias("time_hour"),
    col("dest_latitude").alias("latitude"),
    col("dest_longitude").alias("longitude"),
)

# Stack both sets, derive the date window (the day of time_hour and the following day)
# and the hour truncated to HH:00:00, then order by flight id.
prep_df = (
    prep_df1
    .union(prep_df2)
    .withColumn("start_date", date_format(col("time_hour"), "yyyy-MM-dd"))
    .withColumn("end_date", date_format(date_add(col("time_hour"), 1), "yyyy-MM-dd"))
    .withColumn("hour", date_format(col("time_hour"), "HH:00:00"))
    .orderBy("id")
)

prep_df.show()

+------+--------+-------------------+-----------------+-----------+----------+----------+--------+
|    id|location|          time_hour|         latitude|  longitude|start_date|  end_date|    hour|
+------+--------+-------------------+-----------------+-----------+----------+----------+--------+
|  7072|     JFK|2013-01-09 09:00:00|        40.639801|   -73.7789|2013-01-09|2013-01-10|09:00:00|
|  7072|     HNL|2013-01-09 15:30:00|         21.32062|-157.924228|2013-01-09|2013-01-10|15:00:00|
|  8239|     ORD|2013-01-10 18:10:00|          41.9786|   -87.9048|2013-01-10|2013-01-11|18:00:00|
|  8239|     EWR|2013-01-10 16:00:00|        40.692501| -74.168701|2013-01-10|2013-01-11|16:00:00|
|235778|     JFK|2013-06-15 19:00:00|        40.639801|   -73.7789|2013-06-15|2013-06-16|19:00:00|
|235778|     CMH|2013-06-15 21:20:00|        39.998001| -82.891899|2013-06-15|2013-06-16|21:00:00|
|270376|     JFK|2013-07-22 16:00:00|        40.639801|   -73.7789|2013-07-22|2013-07-23|16:00:00|
|270376|  

In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType
import requests

import os
import warnings

# Weatherbit API key, read from the WEATHERBIT_API_KEY environment variable so the
# secret is not stored in the notebook. It is read once at module level and referenced
# as a global by the function below.
# NOTE(review): the key that used to be hardcoded here should be treated as exposed
# and rotated.
WEATHERBIT_API_KEY = os.environ.get("WEATHERBIT_API_KEY", "")
if not WEATHERBIT_API_KEY:
    warnings.warn(
        "WEATHERBIT_API_KEY is not set; requests to weatherbit.io will be sent without a valid key."
    )


# Function applied to each row of the prepared DataFrame.
def get_wind_speed(lat, lon, start_date, end_date, hour):
    """Fetch the wind speed recorded at a location for a given local hour.

    Queries the Weatherbit sub-hourly history endpoint for the window
    ``start_date``..``end_date`` and returns the ``wind_spd`` of the first
    observation whose local time-of-day equals ``hour``.

    Args:
        lat: Latitude of the location, or None when unknown.
        lon: Longitude of the location, or None when unknown.
        start_date: First day of the window, as ``yyyy-MM-dd``.
        end_date: End of the window, as ``yyyy-MM-dd``.
        hour: Time of day to match, as ``HH:00:00``.

    Returns:
        The wind speed as a float, or None when the coordinates are missing, the
        API answers with a non-200 status, or no observation matches ``hour``.
    """
    if lat is None or lon is None:
        # Airports whose coordinate lookup failed cannot be queried.
        return None

    # The URL is deliberately not printed: it carries the API key.
    response = requests.get(
        "https://api.weatherbit.io/v2.0/history/subhourly",
        params={
            "lat": lat,
            "lon": lon,
            "start_date": start_date,
            "end_date": end_date,
            "key": WEATHERBIT_API_KEY,
        },
        timeout=30,
    )
    if response.status_code != 200:
        return None

    for item in response.json().get('data', []):
        # timestamp_local is split on "T" and its time part compared with `hour`.
        if item['timestamp_local'].split('T')[1] == hour:
            wind_spd = item['wind_spd']
            return float(wind_spd) if wind_spd is not None else None
    return None

# Wrap the wind lookup as a UDF that returns a double.
wind_speed_udf = udf(get_wind_speed, returnType=DoubleType())

# One lookup per row, using the row's coordinates, date window and hour.
wind_lookup_args = [
    prep_df["latitude"],
    prep_df["longitude"],
    prep_df["start_date"],
    prep_df["end_date"],
    prep_df["hour"],
]
enriched_df_wind = prep_df.withColumn("wind_spd", wind_speed_udf(*wind_lookup_args))

# Show the results
enriched_df_wind.show()

+------+--------+-------------------+-----------------+-----------+----------+----------+--------+--------+
|    id|location|          time_hour|         latitude|  longitude|start_date|  end_date|    hour|wind_spd|
+------+--------+-------------------+-----------------+-----------+----------+----------+--------+--------+
|  7072|     JFK|2013-01-09 09:00:00|        40.639801|   -73.7789|2013-01-09|2013-01-10|09:00:00|    0.52|
|  7072|     HNL|2013-01-09 15:30:00|         21.32062|-157.924228|2013-01-09|2013-01-10|15:00:00|     9.8|
|  8239|     EWR|2013-01-10 16:00:00|        40.692501| -74.168701|2013-01-10|2013-01-11|16:00:00|    4.34|
|  8239|     ORD|2013-01-10 18:10:00|          41.9786|   -87.9048|2013-01-10|2013-01-11|18:00:00|    5.09|
|235778|     JFK|2013-06-15 19:00:00|        40.639801|   -73.7789|2013-06-15|2013-06-16|19:00:00|    5.92|
|235778|     CMH|2013-06-15 21:20:00|        39.998001| -82.891899|2013-06-15|2013-06-16|21:00:00|    3.22|
|270376|     JFK|2013-07-22 

In [26]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import FloatType

# Bring the wind readings to the driver as a {(id, location): wind_spd} dictionary
# for quick lookups.
wind_dict = {
    (row['id'], row['location']): row['wind_spd']
    for row in enriched_df_wind.collect()
}

# Look up the wind speed recorded at a flight's origin airport.
def get_wind_spd_origin(id, origin):
    """Return the wind_dict entry for (id, origin), or None when there is none."""
    lookup_key = (id, origin)
    return wind_dict.get(lookup_key)

# Look up the wind speed recorded at a flight's destination airport.
def get_wind_spd_dest(id, dest):
    """Return the wind_dict entry for (id, dest), or None when there is none."""
    lookup_key = (id, dest)
    return wind_dict.get(lookup_key)

# Wrap both lookups as UDFs returning floats.
get_wind_spd_origin_udf = udf(get_wind_spd_origin, returnType=FloatType())
get_wind_spd_dest_udf = udf(get_wind_spd_dest, returnType=FloatType())

# Add one wind-speed column for the origin and one for the destination of each flight.
origin_wind = get_wind_spd_origin_udf(col("id"), col("origin"))
dest_wind = get_wind_spd_dest_udf(col("id"), col("dest"))
enriched_delayed_flights = (
    enriched_delayed_flights
    .withColumn("wind_spd_origin", origin_wind)
    .withColumn("wind_spd_dest", dest_wind)
)

# Display the enriched DataFrame
enriched_delayed_flights.show()

+------+-------+------+------+----+---------+-------------------+----+-----+---+--------------+-------------------+---------------+----------------+-----------------+--------------+---------------+-------------+
|    id|carrier|flight|origin|dest|arr_delay|          time_hour|year|month|day|sched_arr_time|  scheduled_arrival|origin_latitude|origin_longitude|    dest_latitude|dest_longitude|wind_spd_origin|wind_spd_dest|
+------+-------+------+------+----+---------+-------------------+----+-----+---+--------------+-------------------+---------------+----------------+-----------------+--------------+---------------+-------------+
|  7072|     HA|    51|   JFK| HNL|   1272.0|2013-01-09 09:00:00|2013|    1|  9|          1530|2013-01-09 15:30:00|      40.639801|        -73.7789|         21.32062|   -157.924228|           0.52|          9.8|
|235778|     MQ|  3535|   JFK| CMH|   1127.0|2013-06-15 19:00:00|2013|    6| 15|          2120|2013-06-15 21:20:00|      40.639801|        -73.7789|    

## Treinamento do Modelo

#### O modelo abaixo foi feito no Databricks, que dá suporte a arquivos Pickle nativamente. Neste notebook, ele não irá gerar o arquivo pkl. Por conta disso, fiz outro script em Python puro com Scikit-Learn para poder exportar o modelo em pkl.

In [27]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
import pickle


# Explicit, fully nullable schema for the flights CSV (same columns and types as the
# schema used in the loading cell at the top of the notebook).
schema = (
    StructType()
    .add("id", IntegerType(), True)
    .add("year", IntegerType(), True)
    .add("month", IntegerType(), True)
    .add("day", IntegerType(), True)
    .add("dep_time", DoubleType(), True)
    .add("sched_dep_time", IntegerType(), True)
    .add("dep_delay", DoubleType(), True)
    .add("arr_time", DoubleType(), True)
    .add("sched_arr_time", IntegerType(), True)
    .add("arr_delay", DoubleType(), True)
    .add("carrier", StringType(), True)
    .add("flight", IntegerType(), True)
    .add("tailnum", StringType(), True)
    .add("origin", StringType(), True)
    .add("dest", StringType(), True)
    .add("air_time", DoubleType(), True)
    .add("distance", IntegerType(), True)
    .add("hour", IntegerType(), True)
    .add("minute", IntegerType(), True)
    .add("time_hour", TimestampType(), True)
    .add("name", StringType(), True)
)

# Obtain the Spark session BEFORE it is used. Previously the session was created after
# the read, so the cell only worked because `spark` was left over from an earlier cell.
# getOrCreate() returns the already-running session when there is one.
spark = SparkSession.builder.appName("FlightDelayPrediction").getOrCreate()

# Read the CSV with the schema defined above
df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .schema(schema)
    .load("../data/airports-database.csv")
)

# Drop every row that has a null in any column.
df = df.na.drop()

# Target column: 1 when the flight arrived late (arr_delay > 0), 0 otherwise.
df = df.withColumn("delayed", (df.arr_delay > 0).cast("integer"))

# Model inputs; the string-typed ones must be indexed before assembling.
features = ["month", "day", "dep_time", "distance", "carrier"]
string_features = ["carrier"]

# Pre-processing stages. The comprehension variable is called `name` so that it does
# not reuse the name of the `col` function imported from pyspark.sql.functions.
indexers = [
    StringIndexer(inputCol=name, outputCol=name + "_index")
    for name in string_features
]
assembler = VectorAssembler(
    inputCols=[name + "_index" if name in string_features else name for name in features],
    outputCol="features",
)

# Classifier. The seed is fixed so that re-running the notebook trains the same
# forest and reports the same metrics.
rf = RandomForestClassifier(labelCol="delayed", featuresCol="features", numTrees=10, seed=42)

# Full pipeline: indexers -> assembler -> classifier.
pipeline = Pipeline(stages=indexers + [assembler, rf])

# Split the data into training and test sets
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

# Train the model
model = pipeline.fit(train_data)

# Predict on the test set. The predictions feed three separate actions below (AUC and
# two counts), so they are cached instead of being recomputed each time.
predictions = model.transform(test_data).cache()

# Evaluate the model
evaluator = BinaryClassificationEvaluator(labelCol="delayed")
auc = evaluator.evaluate(predictions)
print(f"AUC: {auc}")

# Accuracy: share of test rows whose prediction equals the label.
correct_predictions = predictions.filter(predictions.delayed == predictions.prediction).count()
total_predictions = predictions.count()
accuracy = correct_predictions / float(total_predictions)
print(f"Accuracy: {accuracy}")

# Salvar o modelo treinado # Não funciona sem ser no databricks, por isso está comentado
# model_path = "../models/flight_delay_model.pkl"
# with open(model_path, "wb") as f:
#     pickle.dump(model, f)

# print(f"Modelo salvo em: {model_path}")

AUC: 0.6456298974592598
Accuracy: 0.6370583200880303


In [28]:
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, accuracy_score
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
import pickle

# Load the CSV with pandas and discard every row that has a missing value.
df = pd.read_csv("../data/airports-database.csv").dropna()

# Target column: 1 when the flight arrived late (arr_delay > 0), 0 otherwise.
df['delayed'] = (df['arr_delay'] > 0).astype(int)

# Model inputs and target.
features = ["month", "day", "dep_time", "distance", "carrier"]
X, y = df[features], df['delayed']

# Hold out 20% of the rows for testing; the split is seeded.
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

# Pre-processing: numeric columns get median imputation; categorical columns get a
# constant-fill imputation followed by one-hot encoding that ignores unseen categories.
numeric_features = ["month", "day", "dep_time", "distance"]
categorical_features = ["carrier"]

numeric_transformer = SimpleImputer(strategy='median')

categorical_steps = [
    ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
    ('onehot', OneHotEncoder(handle_unknown='ignore')),
]
categorical_transformer = Pipeline(steps=categorical_steps)

column_transformers = [
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
]
preprocessor = ColumnTransformer(transformers=column_transformers)

# Classifier: a seeded random forest with 10 trees.
rf = RandomForestClassifier(n_estimators=10, random_state=42)

# Full pipeline: pre-processing followed by the classifier.
pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', rf),
])

# Fit the whole pipeline on the training split.
model = pipeline.fit(X_train, y_train)

# Score the held-out split: hard labels for accuracy, and the probability in
# column 1 of predict_proba for AUC.
predictions = model.predict(X_test)
probabilities = model.predict_proba(X_test)[:, 1]

auc = roc_auc_score(y_test, probabilities)
accuracy = accuracy_score(y_test, predictions)

print(f"AUC: {auc}")
print(f"Accuracy: {accuracy}")

# Save the trained pipeline as a pickle file.
import os

model_path = "../models/flight_delay_model.pkl"

# Create the target directory if it does not exist yet; open() would otherwise fail
# with FileNotFoundError on a fresh checkout.
os.makedirs(os.path.dirname(model_path), exist_ok=True)

with open(model_path, "wb") as f:
    pickle.dump(model, f)

print(f"Modelo salvo em: {model_path}")

AUC: 0.7807576113737936
Accuracy: 0.7270811058500076
Modelo salvo em: ../models/flight_delay_model.pkl
