## Imports/setup

In [44]:
import os
import pickle
from datetime import datetime, timedelta
from io import BytesIO
from pathlib import Path
from zipfile import ZipFile

import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
import requests
from dotenv import load_dotenv
from pyspark.sql import DataFrame, SparkSession
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_absolute_error, r2_score, root_mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OrdinalEncoder

spark: SparkSession = SparkSession.builder.getOrCreate()
load_dotenv()

True

## Download/leitura do dataset

In [2]:
def load_dataset() -> DataFrame:
    dataset_url = "https://github.com/PicPay/case-machine-learning-engineer-pleno/raw/main/notebook/airports-database.zip"
    dataset_file = "airports-database.csv"
    dataset_path = Path("../data", dataset_file)
    if not dataset_path.exists():
        ZipFile(BytesIO(requests.get(dataset_url).content)).extract(dataset_file, dataset_path.parent)
    return spark.read.csv(dataset_path.as_posix(), header=True, inferSchema=True)

In [3]:
df = load_dataset()
df.show(3)
df.printSchema()

+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
| id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+---+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  0|2013|    1|  1|   517.0|           515|      2.0|   830.0|           819|     11.0|     UA|  1545| N14228|   EWR| IAH|   227.0|    1400|   5|    15|2013-01-01 05:00:00|United Air Lines ...|
|  1|2013|    1|  1|   533.0|           529|      4.0|   850.0|           830|     20.0|     UA|  1714| N24211|   LGA| IAH|   227.0|    1416|   5|    29|2013-01-01 05:00:00|United Air Lines ...|
|  2|2013|    1|  1|   54

## Perguntas

### 1. Qual é o número total de voos no conjunto de dados?

In [4]:
df.count()

336776

### 2. Quantos voos foram cancelados? (Considerando que voos cancelados têm dep_time e arr_time nulos)

In [10]:
df.filter((df["dep_time"].isNull()) & (df["arr_time"].isNull())).count()

8255

### 3. Qual é o atraso médio na partida dos voos (dep_delay)?

In [38]:
# "atraso médio" é um pouco ambíguo aqui porque um "atraso" poderia considerar apenas dep_delay > 0.
# Nesta e em outras questões que pedem o atraso médio vou considerar todos os valores das colunas dep_delay e arr_delay,
# mas pode ser que a intenção fosse manter apenas os atrasos de fato.
df.select(F.mean("dep_delay")).show()

+------------------+
|    avg(dep_delay)|
+------------------+
|12.639070257304708|
+------------------+



### 4. Quais são os 5 aeroportos com maior número de pousos?

In [15]:
df.filter(df["arr_time"].isNotNull()).groupBy("dest").count().orderBy("count", ascending=False).limit(5).show()

+----+-----+
|dest|count|
+----+-----+
| ATL|16873|
| ORD|16607|
| LAX|16058|
| BOS|15028|
| MCO|13979|
+----+-----+



### 5. Qual é a rota mais frequente (par origin-dest)?

In [6]:
df.withColumn("rota", F.concat("origin", F.lit("-"), "dest")).groupBy("rota").count().orderBy(
    "count", ascending=False
).limit(1).show()

+-------+-----+
|   rota|count|
+-------+-----+
|JFK-LAX|11262|
+-------+-----+



### 6. Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada? (Exiba também o tempo)

In [41]:
df.groupBy("name").agg({"arr_delay": "avg"}).orderBy("avg(arr_delay)", ascending=False).limit(5).show(truncate=False)

+---------------------------+------------------+
|name                       |avg(arr_delay)    |
+---------------------------+------------------+
|Frontier Airlines Inc.     |21.920704845814978|
|AirTran Airways Corporation|20.115905511811025|
|ExpressJet Airlines Inc.   |15.79643108710965 |
|Mesa Airlines Inc.         |15.556985294117647|
|SkyWest Airlines Inc.      |11.931034482758621|
+---------------------------+------------------+



### 7. Qual é o dia da semana com maior número de voos?

In [21]:
df.withColumn("dia_semana", F.date_format("time_hour", "EEEE")).groupBy("dia_semana").count().orderBy(
    "count", ascending=False
).limit(1).show()

+----------+-----+
|dia_semana|count|
+----------+-----+
|    Monday|50690|
+----------+-----+



### 8. Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?

In [24]:
@F.pandas_udf(T.DoubleType())
def delay_percent(s: pd.Series) -> float:
    return (s > 30).mean() * 100

In [30]:
df.filter(df["dep_time"].isNotNull()).groupBy("month").agg(
    delay_percent("dep_delay").alias("percentual_atraso_maior_30_min")
).show()

+-----+------------------------------+
|month|percentual_atraso_maior_30_min|
+-----+------------------------------+
|    1|            12.649624287278632|
|    2|            13.431827775432673|
|    3|            15.404139706145212|
|    4|            16.379871303593376|
|    5|            15.641270853256827|
|    6|             20.99214217522215|
|    7|             21.67105494119712|
|    8|             14.69435872542561|
|    9|             8.918958778851117|
|   10|             9.412626950057586|
|   11|             8.832994266691326|
|   12|            17.967539653264478|
+-----+------------------------------+



### 9. Qual a origem mais comum para voos que pousaram em Seattle (SEA)?

In [11]:
df.filter((df["arr_time"].isNotNull()) & (df["dest"] == "SEA")).groupBy("origin").count().orderBy(
    "count", ascending=False
).limit(1).show()

+------+-----+
|origin|count|
+------+-----+
|   JFK| 2079|
+------+-----+



### 10. Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?

In [39]:
df.withColumn("dia_semana", F.date_format("time_hour", "EEEE")).groupBy("dia_semana").agg({"dep_delay": "avg"}).show()

+----------+------------------+
|dia_semana|    avg(dep_delay)|
+----------+------------------+
| Wednesday|11.803512219083876|
|   Tuesday|10.631682565455652|
|    Friday| 14.69605749486653|
|  Thursday|16.148919990957108|
|  Saturday| 7.650502333676133|
|    Monday|14.778936729330908|
|    Sunday|11.589531801152422|
+----------+------------------+



### 11. Qual é a rota que teve o maior tempo de voo médio (air_time)?

In [30]:
df.withColumn("rota", F.concat("origin", F.lit("-"), "dest")).groupBy("rota").agg({"air_time": "avg"}).orderBy(
    "avg(air_time)", ascending=False
).limit(1).show()

+-------+-----------------+
|   rota|    avg(air_time)|
+-------+-----------------+
|JFK-HNL|623.0877192982456|
+-------+-----------------+



### 12. Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?

In [31]:
df.groupBy("origin").agg({"dest": "mode"}).show()

+------+----------+
|origin|mode(dest)|
+------+----------+
|   LGA|       ATL|
|   EWR|       ORD|
|   JFK|       LAX|
+------+----------+



### 13. Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time) ?

In [35]:
df.withColumn("rota", F.concat("origin", F.lit("-"), "dest")).groupBy("rota").agg({"air_time": "stddev"}).orderBy(
    "stddev(air_time)", ascending=False
).limit(3).show()

+-------+-----------------+
|   rota| stddev(air_time)|
+-------+-----------------+
|LGA-MYR|25.32455988429677|
|EWR-HNL|21.26613546847422|
|JFK-HNL|20.68882484278704|
+-------+-----------------+



### 14. Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?

In [36]:
df.filter(df["dep_delay"] > 60).select(F.avg("arr_delay")).show()

+------------------+
|    avg(arr_delay)|
+------------------+
|119.04880549963919|
+------------------+



### 15. Qual é a média de voos diários para cada mês do ano?

In [46]:
df.groupBy("month", "day").count().groupBy("month").agg({"count": "avg"}).sort("month").show()

+-----+-----------------+
|month|       avg(count)|
+-----+-----------------+
|    1|871.0967741935484|
|    2|891.1071428571429|
|    3|930.1290322580645|
|    4|944.3333333333334|
|    5|928.9032258064516|
|    6|941.4333333333333|
|    7|949.1935483870968|
|    8|946.0322580645161|
|    9|919.1333333333333|
|   10|931.9032258064516|
|   11|908.9333333333333|
|   12|907.5806451612904|
+-----+-----------------+



### 16. Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?

In [49]:
df.filter(df["arr_delay"] > 30).withColumn("rota", F.concat("origin", F.lit("-"), "dest")).groupBy(
    "rota"
).count().orderBy("count", ascending=False).limit(3).show()

+-------+-----+
|   rota|count|
+-------+-----+
|LGA-ATL| 1563|
|JFK-LAX| 1286|
|LGA-ORD| 1188|
+-------+-----+



### 17. Para cada origem, qual o principal destino?

In [50]:
# Mesma pergunta que a 12?
df.groupBy("origin").agg({"dest": "mode"}).show()

+------+----------+
|origin|mode(dest)|
+------+----------+
|   LGA|       ATL|
|   EWR|       ORD|
|   JFK|       LAX|
+------+----------+



## Enriquecimento do dataset

In [12]:
enrichment_df = df.orderBy("arr_delay", ascending=False).limit(5)
enrichment_df.show()

+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|    id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|
+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+
|  7072|2013|    1|  9|   641.0|           900|   1301.0|  1242.0|          1530|   1272.0|     HA|    51| N384HA|   JFK| HNL|   640.0|    4983|   9|     0|2013-01-09 09:00:00|Hawaiian Airlines...|
|235778|2013|    6| 15|  1432.0|          1935|   1137.0|  1607.0|          2120|   1127.0|     MQ|  3535| N504MQ|   JFK| CMH|    74.0|     483|  19|    35|2013-06-15 19:00:00|           Envoy Air|
|  8239|20

In [83]:
airportdb_token=os.environ["AIRPORTDB_TOKEN"]
airportdb_cache = {}
def get_lat_lon(airport_code: str) -> tuple[float | None, float | None]:
    uri = f"https://airportdb.io/api/v1/airport/K{airport_code}?apiToken={airportdb_token}"
    if airport_code in airportdb_cache:
        return airportdb_cache[airport_code]
    data = requests.get(uri).json()
    try:
        ret = (data["latitude_deg"], data["longitude_deg"])
    except KeyError:
        ret = (None, None)
    airportdb_cache[airport_code] = ret
    return ret


In [84]:
weatherbit_key = os.environ["WEATHERBIT_API_KEY"]
def get_wind_speed(latitude: float, longitude: float, start_date: str) -> float:
    format = "%Y-%m-%d"
    end_date = (datetime.strptime(start_date, format) + timedelta(days=1)).strftime(format)
    url = "https://api.weatherbit.io/v2.0/history/daily"
    params = {
        "lat": latitude,
        "lon": longitude,
        "start_date": start_date,
        "end_date": end_date,
        "key": weatherbit_key,
    }
    headers = {
        "Accept": "application/json",
    }
    return requests.get(url, params=params, headers=headers).json()["data"][0]["wind_spd"]

In [90]:
schema = T.StructType(
    [   
        T.StructField("id", T.IntegerType()),
        T.StructField("origin_latitude", T.DoubleType()),
        T.StructField("origin_longitude", T.DoubleType()),
        T.StructField("dest_latitude", T.DoubleType()),
        T.StructField("dest_longitude", T.DoubleType()),
        T.StructField("origin_wind_speed", T.DoubleType()),
        T.StructField("dest_wind_speed", T.DoubleType()),
    ]
)
@F.udtf(returnType=schema)
class EnrichData:
    def eval(self, row: T.Row):
        origin_coords = get_lat_lon(row["origin"])
        dest_coords = get_lat_lon(row["dest"])
        start_date = row["time_hour"].strftime("%Y-%m-%d")
        origin_ws = get_wind_speed(*origin_coords, start_date) if origin_coords[0] else None
        dest_ws = get_wind_speed(*dest_coords, start_date) if dest_coords[0] else None
        yield (row["id"], *origin_coords, *dest_coords, origin_ws, dest_ws)
spark.udtf.register("enrich_data", EnrichData)


24/10/06 05:14:50 WARN SimpleTableFunctionRegistry: The function enrich_data replaced a previously registered function.


<pyspark.sql.udtf.UserDefinedTableFunction at 0x78c6fa6f03a0>

In [92]:
enrichment_df.createOrReplaceTempView("df")
spark.sql("""
    SELECT * 
    FROM df INNER JOIN (SELECT * FROM enrich_data(TABLE(df))) AS enrich
        ON df.id = enrich.id
""").show()

[Stage 24:>                                                         (0 + 1) / 1]

+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+------+---------------+----------------+-----------------+--------------+-----------------+---------------+
|    id|year|month|day|dep_time|sched_dep_time|dep_delay|arr_time|sched_arr_time|arr_delay|carrier|flight|tailnum|origin|dest|air_time|distance|hour|minute|          time_hour|                name|    id|origin_latitude|origin_longitude|    dest_latitude|dest_longitude|origin_wind_speed|dest_wind_speed|
+------+----+-----+---+--------+--------------+---------+--------+--------------+---------+-------+------+-------+------+----+--------+--------+----+------+-------------------+--------------------+------+---------------+----------------+-----------------+--------------+-----------------+---------------+
|  7072|2013|    1|  9|   641.0|           900|   1301.0|  1242.0|          1530|   1

                                                                                

## Treinamento do modelo

In [33]:
features = [
    "month",
    "day",
    "sched_dep_time",
    "sched_arr_time",
    "carrier",
    "tailnum",
    "origin",
    "dest",
    "distance",
]
target = "arr_delay"
df_ml = df.filter(df[target].isNotNull()).select(*[*features, target]).toPandas()

# transforma hhmm em "número de horas a partir de 0"
df_ml["sched_dep_time"] = (
    df_ml["sched_dep_time"].astype(str).str.zfill(4).apply(lambda x: float(x[:2]) + float(x[2:]) / 60)
)
df_ml["sched_arr_time"] = (
    df_ml["sched_arr_time"].astype(str).str.zfill(4).apply(lambda x: float(x[:2]) + float(x[2:]) / 60)
)

X = df_ml.loc[:, features]
y = df_ml[target]
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=15)

In [34]:
X.head()

Unnamed: 0,month,day,sched_dep_time,sched_arr_time,carrier,tailnum,origin,dest,distance
0,1,1,5.25,8.316667,UA,N14228,EWR,IAH,1400
1,1,1,5.483333,8.5,UA,N24211,LGA,IAH,1416
2,1,1,5.666667,8.833333,AA,N619AA,JFK,MIA,1089
3,1,1,5.75,10.366667,B6,N804JB,JFK,BQN,1576
4,1,1,6.0,8.616667,DL,N668DN,LGA,ATL,762


In [35]:
categorical_features = []
for feat in features:
    try:
        _ = X[feat].astype(float)
    except (TypeError, ValueError):
        categorical_features.append(feat)
numerical_features = list(set(features).difference(categorical_features))
print(categorical_features)
print(numerical_features)

['carrier', 'tailnum', 'origin', 'dest']
['sched_arr_time', 'sched_dep_time', 'distance', 'month', 'day']


In [36]:
ml_pipe = Pipeline(
    [
        (
            "transformer",
            ColumnTransformer(
                [
                    (
                        "categorical",
                        Pipeline(
                            [
                                ("imputer", SimpleImputer(strategy="constant", fill_value="<missing>")),
                                ("encoder", OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)),
                            ]
                        ),
                        categorical_features,
                    ),
                    ("numerical", SimpleImputer(strategy="median", add_indicator=True), numerical_features),
                ]
            ),
        ),
        ("model", RandomForestRegressor(n_jobs=-1)),
    ]
)
ml_pipe

In [38]:
model = ml_pipe.fit(X_train, y_train)
preds = model.predict(X_test)

In [41]:
print(f"R² Score: {r2_score(y_test, preds)}")
print(f"Mean absolute error: {mean_absolute_error(y_test, preds)}")
print(f"Root mean squared error: {root_mean_squared_error(y_test, preds)}")

R² Score: 0.2723140780507467
Mean absolute error: 23.04493508477165
Root mean squared error: 38.128910400074766


In [45]:
with open("../models/v1.pkl", "wb") as file:
    pickle.dump(model, file)