## Classe LeitorCSV

* __init__ – cria SparkSession local.
* ler(caminho) – lê o CSV original de voos com inferSchema=True, devolvendo um DataFrame.

## Classe EnriquecedorVentos

A classe automatiza o enriquecimento da base de voos com fuso (GMT) e velocidade do vento, gerando as colunas:

gmt_origin, gmt_dest

dep_real, arr_real, arr_real_arred

wind_origin, wind_dest

O processo consulta duas APIs públicas a AirportDB e Open‑Meteo (a API Weatherbit API se mostrou muito limitada na quantidade de requisições), e calcula horários de partidas e chegadas reais, arredonda para “blocos” de 1 hora (passo de 30 min) e grava o resultado num CSV.
 Nenhuma linha é descartada; quando não há dado disponível o campo fica None.

---

### Construtor  __init__(df_voos, spark)

* guarda o DataFrame Spark original e a SparkSession
* cria self.aero_infos, dicionário onde serão armazenados dados de cada aeroporto.

---

### coletar_aeroportos()

* varre colunas origin e dest; gera o conjunto de IATA únicos.
* inicializa self.aero_infos com chaves vazias ({IATA:{}}) para cada código.

---

### buscar_coordenadas()

* para cada código em self.aero_infos faz GET na AirportDB;
* salva lat/lon; se a consulta falhar apenas emite um aviso e continua.

---

### baixar_vento()

* descobre primeiro/último dia presente em time_hour; adiciona +1 dia.
* para cada aeroporto com coordenada válida faz chamada à Open‑Meteo (endpoint archive).
* grava em self.aero_infos[cod]:

  * GMT (deslocamento em horas, usando utc_offset_seconds)
  * vento → dicionário { "YYYY‑MM‑DD HH:MM:SS": velocidade }.

---

### adicionar_gmt()

* cria um broadcast só com aeroportos que têm GMT;
* UDF preenche colunas gmt_origin e gmt_dest (inteiro, ex. ‑3).

---

### calcular_partida_chegada()

* dep_real = to_timestamp(time_hour) + (dep_delay minutos) – se faltam dados, vira null.
* UDF chegada converte dep_real + air_time + GMTs em string local de chegada (arr_real).

  * falhas (GMT ausente, problema no parse etc.) devolvem None em vez de lançar exceção.

---

### arredonda_meia_hora(dt_str)

* função estática que arredonda um timestamp string para o início da hora ou hora seguinte, de acordo com os minutos (00‑29 → HH:00; 30‑59 → HH+1:00).
* retorna None se a entrada também for nula ou inválida.

---

### criar_arr_redondo()

* aplica a UDF arredonda_meia_hora sobre arr_real
* cria a coluna arr_real_arred (usada para casar com vento de destino).

---

### adicionar_velocidade_vento()

* faz broadcast do mapa {IATA: dicionário de vento}.
* UDF vento_origem procura velocidade pela chave “YYYY‑MM‑DD HH:MM:SS” derivada de time_hour
* UDF vento_destino faz o mesmo usando arr_real_arred.
* campos permanecem None se chave ou aeroporto não estiverem no mapa.

---

### garantir_integridade()

* adiciona colunas faltantes cheias de None; garante que cada linha possua todas as sete colunas novas, evitando perda de registros ao gravar CSV.

---

### salvar_csv(saida="base_enriquecida.csv")

* grava o DataFrame final em modo overwrite com cabeçalho, na pasta indicada.
* útil para inspeção posterior ou ingestão num data lake.

---

### executar(saida="base_enriquecida.csv")

Pipeline orquestrador; chama, em ordem:

1. coletar_aeroportos
2. buscar_coordenadas
3. baixar_vento
4. adicionar_gmt
5. calcular_partida_chegada
6. criar_arr_redondo
7. adicionar_velocidade_vento
8. salvar_csv

Retorna o DataFrame enriquecido; imprime mensagens em cada etapa para facilitar depuração.

In [None]:
# ────────────────────────────────── IMPORTS

from datetime import datetime, timedelta, timezone
from pathlib import Path
import requests

from pyspark.sql import SparkSession, functions as F, types as T
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


# ---------------------------------------------------------------------------
# 1. Classe de suporte: Carregar DataFrame
# ---------------------------------------------------------------------------
class LeitorCSV:
    """Abre o Spark localmente e lê o CSV de voos."""

    def __init__(self, nome_app="voos‑spark"):
        self.spark = (
            SparkSession.builder.appName(nome_app)
            .master("local[*]")
            .config("spark.sql.shuffle.partitions", "4")
            .getOrCreate()
        )

    def ler(self, caminho):
        print("🔹 Lendo CSV …")
        return (
            self.spark.read.option("header", True).option("inferSchema", True).csv(caminho)
        )


# ---------------------------------------------------------------------------
# 2. Classe principal: Enriquecimento
# ---------------------------------------------------------------------------
class EnriquecedorVentos:
    """Executa todo o fluxo de enriquecimento."""

    URL_AIRPORT = "https://airportdb.io/api/v1/airport/K{code}?apiToken={key}"
    URL_METEO = "https://archive-api.open-meteo.com/v1/archive"
    API_AIRPORT_KEY = '8618d96aca630e2b49d157fe2fa9a49aebeb24dd8eea5ad4832145ebf6b215e65a2d508691ff175571fc6d896443484d'

    # ---------------------------------------------------------------------
    # 2.0 – Construtor
    # ---------------------------------------------------------------------
    def __init__(self, df_voos, spark):
        self.df = df_voos
        self.spark = spark
        self.aero_infos = {}  # {IATA: {lat, lon, GMT, vento}}
    # ---------------------------------------------------------------------
    # 2.1 – Coleta todos os aeroportos que aparecem na base
    # ---------------------------------------------------------------------
    def coletar_aeroportos(self):
        print("🔹 Coletando aeroportos …")
        codigos = (
            self.df.select("origin").union(self.df.select("dest"))
            .distinct().rdd.flatMap(lambda r: r).collect()
        )
        self.aero_infos = {c: {} for c in codigos}
        print(f"  ✅ {len(codigos)} aeroportos detectados")
    # ---------------------------------------------------------------------
    # 2.2 – Buscar latitude/longitude de cada aeroporto via API
    # ---------------------------------------------------------------------
    def buscar_coordenadas(self):
        print("🔹 Buscando coordenadas …")
        for cod in self.aero_infos:
            url = self.URL_AIRPORT.format(code=cod, key=self.API_AIRPORT_KEY)
            r = requests.get(url, timeout=10)
            if r.ok:
                js = r.json()
                self.aero_infos[cod]["lat"] = float(js["latitude_deg"])
                self.aero_infos[cod]["lon"] = float(js["longitude_deg"])
            else:
                print(f"  ⚠️  {cod}: coordenadas não encontradas")

    # ---------------------------------------------------------------------
    # 2.3 – Baixar série histórica de vento e GMT de cada aeroporto
    # ---------------------------------------------------------------------
    def baixar_vento(self):
        print("🔹 Baixando vento horário (Open‑Meteo) …")
        dmin, dmax = (
            self.df.select(F.to_date("time_hour").alias("d")).agg(F.min("d"), F.max("d")).first()
        )
        dmax += timedelta(days=1)

        sess = requests.Session()
        sess.mount("https://", HTTPAdapter(max_retries=Retry(total=5, backoff_factor=1.5)))

        for cod, info in self.aero_infos.items():
            if "lat" not in info:
                continue
            params = dict(
                latitude=info["lat"], longitude=info["lon"],
                start_date=dmin.strftime("%Y-%m-%d"), end_date=dmax.strftime("%Y-%m-%d"),
                hourly="windspeed_10m", timezone="auto"
            )
            try:
                js = sess.get(self.URL_METEO, params=params, timeout=20).json()
            except Exception as e:
                print(f"  ⚠️  {cod}: falha Open‑Meteo ({e})")
                continue
            info["GMT"] = int(js["utc_offset_seconds"] / 3600)
            info["vento"] = {
                t.replace("T", " ") + ":00": v
                for t, v in zip(js["hourly"]["time"], js["hourly"]["windspeed_10m"])
            }
            print(f"  ✅ {cod}: {len(info['vento'])} horas de vento")

    # ---------------------------------------------------------------------
    # 2.4 – Adicionar colunas GMT
    # --------------------------------------------------------------------
    def adicionar_gmt(self):
        print("🔹 Inserindo GMT …")
        gmap = {c: v["GMT"] for c, v in self.aero_infos.items() if "GMT" in v}
        bc = self.spark.sparkContext.broadcast(gmap)

        udf_gmt = F.udf(lambda c: bc.value.get(c), T.IntegerType())
        self.df = (
            self.df.withColumn("gmt_origin", udf_gmt("origin"))
                   .withColumn("gmt_dest", udf_gmt("dest"))
        )
    # -----------------------------------------------------------------
    # 2.5  Horários real de partida e chegada
    # -----------------------------------------------------------------
    def calcular_partida_chegada(self):
        print("🔹 Calculando dep_real e arr_real …")

        # Coluna de partida real (dep_real) – se 'base' for null, devolve null
        self.df = (
            self.df
            .withColumn("base", F.to_timestamp("time_hour"))          # pode virar null
            .withColumn(
                "dep_real",
                F.when(
                    F.col("base").isNull() | F.col("dep_delay").isNull(),
                    F.lit(None).cast("timestamp")
                ).otherwise(
                    F.expr("base + INTERVAL 1 MINUTE * dep_delay")
                )
            )
        )

        # Broadcast de GMTs disponíveis
        gmt_map = {c: v["GMT"] for c, v in self.aero_infos.items() if "GMT" in v}
        bc = self.spark.sparkContext.broadcast(gmt_map)

        def chegada(dep_real, orig, dest, dur):
            """Retorna string chegada ou None; nunca lança exceção."""
            if None in (dep_real, dur):                             # dados faltantes
                return None
            go, gd = bc.value.get(orig), bc.value.get(dest)
            if None in (go, gd):                                    # GMT faltante
                return None
            try:
                dep_real = dep_real.replace(tzinfo=timezone(timedelta(hours=go)))
                arr_utc = dep_real.astimezone(timezone.utc) + timedelta(minutes=dur)
                arr_dest = arr_utc.astimezone(timezone(timedelta(hours=gd)))
                return arr_dest.strftime("%Y-%m-%d %H:%M:%S")
            except Exception:
                return None                                         # falha → nulo

        udf_chegada = F.udf(chegada, T.StringType())
        self.df = self.df.withColumn(
            "arr_real",
            udf_chegada("dep_real", "origin", "dest", "air_time"),
        )

    # -----------------------------------------------------------------
    # 2.6  Arredondar em 30 min (usa arr_real, mas devolve None se arr_real nulo)
    # -----------------------------------------------------------------
    @staticmethod
    def arredonda_meia_hora(dt_str):
        if dt_str is None:
            return None
        try:
            dt = datetime.strptime(dt_str, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return None
        meia_hora = dt.replace(minute=0, second=0)
        if dt.minute >= 30:
            meia_hora += timedelta(hours=1)
        return meia_hora.strftime("%Y-%m-%d %H:%M:%S")
    # -----------------------------------------------------------------
    # 2.7 – Criar coluna de chegada arredondada
    # -----------------------------------------------------------------
    def criar_arr_redondo(self):
        """
        Gera a coluna arr_real_arred (hora de chegada arredondada ao bloco de 30 min).

        • Se arr_real for nulo ou tiver formato inválido → devolve None
        • Mantém TODAS as linhas do DataFrame
        """
        print("🔹 Criando arr_real_arred (chegada arredondada) …")

        # UDF que aplica a função arredonda_meia_hora segura para nulos
        udf_round = F.udf(self.arredonda_meia_hora, T.StringType())

        self.df = self.df.withColumn("arr_real_arred", udf_round("arr_real"))
    # -----------------------------------------------------------------
    # 2.8  Velocidade do vento (deixa resultado nulo se chave não existir)
    # -----------------------------------------------------------------
    def adicionar_velocidade_vento(self):
        print("🔹 Inserindo velocidade do vento …")

        vmap = {c: v["vento"] for c, v in self.aero_infos.items() if "vento" in v}
        bc = self.spark.sparkContext.broadcast(vmap)

        def vento_origem(cod, ts):
            if ts is None: return None
            chave = ts.strftime("%Y-%m-%d %H:%M:%S")
            return bc.value.get(cod, {}).get(chave)

        def vento_destino(cod, ts_str):
            if not ts_str: return None
            return bc.value.get(cod, {}).get(ts_str)

        self.df = (
            self.df
            .withColumn("wind_origin",
                        F.udf(vento_origem, T.DoubleType())("origin", F.to_timestamp("time_hour")))
            .withColumn("wind_dest",
                        F.udf(vento_destino, T.DoubleType())("dest", "arr_real_arred"))
        )

    # -----------------------------------------------------------------
    # 2.9  Garante que todas as linhas continuem existindo
    # -----------------------------------------------------------------
    def garantir_integridade(self):
        """Adiciona colunas vazias (cheias de None) que porventura não existam."""
        colunas_esperadas = [
            "gmt_origin", "gmt_dest",
            "dep_real", "arr_real", "arr_real_arred",
            "wind_origin", "wind_dest",
        ]
        for col in colunas_esperadas:
            if col not in self.df.columns:
                self.df = self.df.withColumn(col, F.lit(None))
   # ---------------------------------------------------------------------
    # 2.10 – Salvar CSV
    # ---------------------------------------------------------------------
    def salvar_csv(self, saida="base_enriquecida.csv"):
        print("🔹 Salvando CSV …")
        self.df.write.mode("overwrite").option("header", True).csv(saida)
        print(f"✅ Arquivos gravados em {Path(saida).resolve()}")
    # ---------------------------------------------------------------------
    # 2.11 – Executando o pipeline
    # ---------------------------------------------------------------------
    def executar(self, saida="base_enriquecida.csv"):
        self.coletar_aeroportos()
        self.buscar_coordenadas()
        self.baixar_vento()
        self.adicionar_gmt()
        self.calcular_partida_chegada()
        self.criar_arr_redondo()
        self.adicionar_velocidade_vento()
        self.salvar_csv(saida)
        return self.df



# ---------------------------------------------------------------------------
# 3. Enriquecendo
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    leitor = LeitorCSV()
    voos = leitor.ler("airports-database.csv")
    voos.count()
    pipe = EnriquecedorVentos(voos, leitor.spark)
    df_final = pipe.executar("base_enriquecida.csv")
    print("🔸 Linhas finais:", df_final.count())

🔹 Lendo CSV …
🔹 Coletando aeroportos …
  ✅ 107 aeroportos detectados
🔹 Buscando coordenadas …
  ⚠️  BQN: coordenadas não encontradas
  ⚠️  PSE: coordenadas não encontradas
  ⚠️  SJU: coordenadas não encontradas
  ⚠️  HNL: coordenadas não encontradas
  ⚠️  STT: coordenadas não encontradas
  ⚠️  ANC: coordenadas não encontradas
🔹 Baixando vento horário (Open‑Meteo) …
  ✅ JFK: 8784 horas de vento
  ✅ LGA: 8784 horas de vento
  ✅ EWR: 8784 horas de vento
  ✅ IAH: 8784 horas de vento
  ✅ PBI: 8784 horas de vento
  ✅ BOS: 8784 horas de vento
  ✅ CLT: 8784 horas de vento
  ✅ SNA: 8784 horas de vento
  ✅ XNA: 8784 horas de vento
  ✅ SYR: 8784 horas de vento
  ✅ JAX: 8784 horas de vento
  ✅ CHS: 8784 horas de vento
  ✅ MEM: 8784 horas de vento
  ✅ SAN: 8784 horas de vento
  ✅ DCA: 8784 horas de vento
  ✅ MYR: 8784 horas de vento
  ✅ MDW: 8784 horas de vento
  ✅ BNA: 8784 horas de vento
  ✅ BTV: 8784 horas de vento
  ✅ EGE: 8784 horas de vento
  ✅ AVL: 8784 horas de vento
  ✅ IND: 8784 horas de 

## Classe Perguntas

A classe encapsula todas as queries de análise pedidas no case. Ela recebe um único DataFrame Spark no construtor, e cada método responde exatamente a uma das 17 perguntas, imprimindo no console o resultado. Todos os métodos utilizam apenas APIs padrão do PySpark (groupBy, agg, filter, orderBy, janelas) – simples de ler, manter e sem dependências externas.

---

### __Construtor init__(df)

* Construtor: guarda o DataFrame passado em self.df, que será reutilizado por todos os métodos – evita leituras repetidas de disco.

---

### pergunta_1

* Usa count() para obter o número total de registros na tabela de voos.

---

### pergunta_2

* Aplica filter() com condição dep_time IS NULL AND arr_time IS NULL;
  conta as linhas resultantes → total de voos cancelados.

---

### pergunta_3

* Remove cancelados (ambos horários nulos) e calcula avg(dep_delay) via agg().
  Retorna atraso médio na decolagem.

---

### pergunta_4

* Agrupa por dest, conta linhas, ordena por count decrescente e faz limit(5) → 5 aeroportos com mais pousos.

---

### pergunta_5

* Agrupa por par (origin, dest), conta e ordena decrescente;
  first() devolve rota mais frequente.

---

### pergunta_6

* Agrupa por carrier, calcula média de arr_delay, ordena e pega os 5 maiores.
  Retorna as 5 companhias com maior atraso médio na chegada.

---

### pergunta_7

* Cria coluna dow (day‑of‑week) a partir de time_hour,
  agrupa, conta e pega o maior → dia da semana com mais voos
  (Spark: 1 = Domingo, 7 = Sábado).

---

### pergunta_8

* Marca atrasado = dep_delay > 30, agrupa por month,
  calcula (sum(atrasado) / total) * 100.
  Resultado: percentual mensal de voos com decolagem atrasada > 30 min.

---

### pergunta_9

* Filtra somente voos com dest == 'SEA', agrupa por origin, conta e ordena.
  Retorna origem mais comum para Seattle (SEA).

---

### pergunta_10

* Repete lógica de dow, mas agora faz avg(dep_delay) por dia.
  Mostra atraso médio de partida por dia da semana.

---

### pergunta_11

* Agrupa (origin, dest), média de air_time;
  maior valor = rota com maior tempo médio de voo.

---

### pergunta_12

* Usa janela Window.partitionBy("origin").orderBy(desc("count"))
  para ranquear destinos dentro de cada origem; mantém rank = 1.
  Resultado: destino mais frequente por aeroporto de origem.

---

### pergunta_13

* Agrupa rotas e calcula stddev_pop(air_time); ordena e pega 3 maiores.
  Dá as três rotas com maior variação do tempo de voo.

---

### pergunta_14

* Filtra linhas onde dep_delay > 60, tira avg(arr_delay).
  Mostra média de atraso na chegada quando a partida atrasou > 1 h.

---

### pergunta_15

* Conta voos por (month, day), depois média de count dentro de cada mês.
  Retorna média de voos diários por mês.

---

### pergunta_16

* Filtra arr_delay > 30, agrupa rotas e pega top‑3 em contagem.
  Entrega as três rotas mais comuns com chegada atrasada > 30 min.

---

### pergunta_17

* Requisito repete a lógica da 12 → método simplesmente chama pergunta_12().

---

### executar_todas

* Itera de 1 a 17, faz getattr(self, f"pergunta_{i}") e executa.
  Facilita rodar todo o questionário em um único comando.

In [8]:
# -*- coding: utf‑8 -*-
"""
Classe Perguntas
────────────────
Resolve, com PySpark, as 17 perguntas pedidas sobre o DataFrame de voos.
Cada método usa **operações básicas** (groupBy, agg, orderBy, filter, etc.)
e imprime a resposta no formato:

    Pergunta X, resposta: <resultado>
"""

from pyspark.sql import functions as F, Window


class Perguntas:
    # ─────────────────────────────────────────────────────────────
    def __init__(self, df):
        """
        Recebe o DataFrame de voos já carregado em self.df
        """
        self.df = df

    # ───────────────────────── 1
    def pergunta_1(self):
        """Número total de voos."""
        total = self.df.count()
        print(f"Pergunta 1, resposta: {total}")

    # ───────────────────────── 2
    def pergunta_2(self):
        """Voos cancelados = dep_time e arr_time são nulos."""
        cancelados = (
            self.df.filter(F.col("dep_time").isNull() & F.col("arr_time").isNull())
            .count()
        )
        print(f"Pergunta 2, resposta: {cancelados}")

    def pergunta_3(self):
        """Atraso médio na partida (dep_delay), excluindo voos cancelados."""
        # Filtra voos que NÃO são cancelados (dep_time e arr_time não nulos)
        df_validos = self.df.filter(
            (F.col("dep_time").isNotNull()) &
            (F.col("arr_time").isNotNull())
        )
        # Calcula a média de dep_delay sobre esse subconjunto
        media = df_validos.agg(F.avg("dep_delay")).first()[0]
        print(f"Pergunta 3, resposta: {media:.2f} minutos")

    # ───────────────────────── 4
    def pergunta_4(self):
        """Top‑5 aeroportos de destino (mais pousos)."""
        top5 = (
            self.df.groupBy("dest").count().orderBy(F.desc("count")).limit(5).collect()
        )
        print("Pergunta 4, resposta (dest, pousos):", [(r["dest"], r["count"]) for r in top5])

    # ───────────────────────── 5
    def pergunta_5(self):
        """Rota (origin‑dest) mais frequente."""
        rota = (
            self.df.groupBy("origin", "dest")
            .count()
            .orderBy(F.desc("count"))
            .first()
        )
        print(f"Pergunta 5, resposta: {rota['origin']} → {rota['dest']} ({rota['count']} voos)")

    # ───────────────────────── 6
    def pergunta_6(self):
        """Top‑5 cias com maior atraso médio na chegada."""
        top5 = (
            self.df.groupBy("carrier")
            .agg(F.avg("arr_delay").alias("media_atraso"))
            .orderBy(F.desc("media_atraso"))
            .limit(5)
            .collect()
        )
        print(
            "Pergunta 6, resposta (carrier, atraso médio):",
            [(r["carrier"], round(r["media_atraso"], 2)) for r in top5],
        )

    # ───────────────────────── 7
    def pergunta_7(self):
        """Dia da semana com mais voos (1=Dom, 7=Sáb no Spark)."""
        dias = (
            self.df.withColumn("dow", F.dayofweek("time_hour"))
            .groupBy("dow")
            .count()
            .orderBy(F.desc("count"))
            .first()
        )
        print(f"Pergunta 7, resposta: Dia‑da‑semana {dias['dow']} ({dias['count']} voos)")

    # ───────────────────────── 8
    def pergunta_8(self):
        """Percentual mensal de voos c/ atraso de partida >30 min."""
        atrasos = (
            self.df.withColumn("atrasado", F.col("dep_delay") > 30)
            .groupBy("month")
            .agg(
                (F.sum(F.col("atrasado").cast("int")) / F.count("*") * 100).alias(
                    "perc_atraso"
                )
            )
            .orderBy("month")
            .collect()
        )
        print(
            "Pergunta 8, resposta (mês, % atrasado):",
            [(r["month"], round(r["perc_atraso"], 2)) for r in atrasos],
        )

    # ───────────────────────── 9
    def pergunta_9(self):
        """Origem mais comum para voos que pousaram em SEA."""
        origem = (
            self.df.filter(F.col("dest") == "SEA")
            .groupBy("origin")
            .count()
            .orderBy(F.desc("count"))
            .first()
        )
        print(f"Pergunta 9, resposta: {origem['origin']} ({origem['count']} voos)")

    # ───────────────────────── 10
    def pergunta_10(self):
        """Média de dep_delay por dia da semana."""
        medias = (
            self.df.withColumn("dow", F.dayofweek("time_hour"))
            .groupBy("dow")
            .agg(F.avg("dep_delay").alias("media"))
            .orderBy("dow")
            .collect()
        )
        print(
            "Pergunta 10, resposta (dow, atraso médio):",
            [(r["dow"], round(r["media"], 2)) for r in medias],
        )

    # ───────────────────────── 11
    def pergunta_11(self):
        """Rota com maior tempo médio de voo (air_time)."""
        rota = (
            self.df.groupBy("origin", "dest")
            .agg(F.avg("air_time").alias("media"))
            .orderBy(F.desc("media"))
            .first()
        )
        print(
            f"Pergunta 11, resposta: {rota['origin']} → {rota['dest']} "
            f"(média {round(rota['media'],2)} min)"
        )

    # ───────────────────────── 12
    def pergunta_12(self):
        """Destino mais comum para cada origem."""
        janela = Window.partitionBy("origin").orderBy(F.desc("count"))
        destino_frequente = (
            self.df.groupBy("origin", "dest")
            .count()
            .withColumn("rank", F.row_number().over(janela))
            .filter("rank = 1")
            .select("origin", "dest")
            .collect()
        )
        print(
            "Pergunta 12, resposta (origem → destino):",
            [(r["origin"], r["dest"]) for r in destino_frequente],
        )

    # ───────────────────────── 13
    def pergunta_13(self):
        """3 rotas com maior desvio‑padrão (variação) do air_time."""
        rotas = (
            self.df.groupBy("origin", "dest")
            .agg(F.stddev_pop("air_time").alias("std"))
            .orderBy(F.desc("std"))
            .limit(3)
            .collect()
        )
        print(
            "Pergunta 13, resposta (rota, desvio):",
            [
                (f"{r['origin']}→{r['dest']}", round(r["std"], 2))
                for r in rotas
            ],
        )

    # ───────────────────────── 14
    def pergunta_14(self):
        """Média de arr_delay quando dep_delay > 60 min."""
        media = (
            self.df.filter(F.col("dep_delay") > 60)
            .agg(F.avg("arr_delay"))
            .first()[0]
        )
        print(f"Pergunta 14, resposta: {round(media,2)} minutos")

    # ───────────────────────── 15
    def pergunta_15(self):
        """Média de voos diários para cada mês."""
        diarios = (
            self.df.groupBy("month", "day")
            .count()
            .groupBy("month")
            .agg(F.avg("count").alias("media_dia"))
            .orderBy("month")
            .collect()
        )
        print(
            "Pergunta 15, resposta (mês, média diária):",
            [(r["month"], round(r["media_dia"], 1)) for r in diarios],
        )

    # ───────────────────────── 16
    def pergunta_16(self):
        """3 rotas mais comuns com arr_delay > 30 min."""
        rotas = (
            self.df.filter(F.col("arr_delay") > 30)
            .groupBy("origin", "dest")
            .count()
            .orderBy(F.desc("count"))
            .limit(3)
            .collect()
        )
        print(
            "Pergunta 16, resposta (rota, qtd):",
            [(f"{r['origin']}→{r['dest']}", r["count"]) for r in rotas],
        )

    # ───────────────────────── 17
    def pergunta_17(self):
        """Principal destino para cada origem (mesmo que 12, mas exigido)."""
        self.pergunta_12()  # já resolve; reutilizamos

    # ───────────────────────── EXECUTA TODAS
    def executar_todas(self):
        """Chama uma a uma, na ordem."""
        for i in range(1, 18):
            metodo = getattr(self, f"pergunta_{i}")
            metodo()


perguntas = Perguntas(df_final)
perguntas.executar_todas()

Pergunta 1, resposta: 336776
Pergunta 2, resposta: 8255
Pergunta 3, resposta: 12.58 minutos
Pergunta 4, resposta (dest, pousos): [('ORD', 17283), ('ATL', 17215), ('LAX', 16174), ('BOS', 15508), ('MCO', 14082)]
Pergunta 5, resposta: JFK → LAX (11262 voos)
Pergunta 6, resposta (carrier, atraso médio): [('F9', 21.92), ('FL', 20.12), ('EV', 15.8), ('YV', 15.56), ('OO', 11.93)]
Pergunta 7, resposta: Dia‑da‑semana 2 (50690 voos)
Pergunta 8, resposta (mês, % atrasado): [(1, 12.41), (2, 12.75), (3, 14.94), (4, 15.99), (5, 15.34), (6, 20.24), (7, 20.98), (8, 14.45), (9, 8.77), (10, 9.34), (11, 8.76), (12, 17.31)]
Pergunta 9, resposta: JFK (2092 voos)
Pergunta 10, resposta (dow, atraso médio): [(1, 11.59), (2, 14.78), (3, 10.63), (4, 11.8), (5, 16.15), (6, 14.7), (7, 7.65)]
Pergunta 11, resposta: JFK → HNL (média 623.09 min)
Pergunta 12, resposta (origem → destino): [('EWR', 'ORD'), ('JFK', 'LAX'), ('LGA', 'ATL')]
Pergunta 13, resposta (rota, desvio): [('EWR→HNL', 21.24), ('LGA→MYR', 20.68), ('J

# Perguntas

### Pergunta 1

**Qual é o número total de voos no conjunto de dados?**

Resposta: 336776

### Pergunta 2

**Quantos voos foram cancelados? (Considerando que voos cancelados têm dep_time e arr_time nulos)**

Resposta: 8255

### Pergunta 3

**Qual é o atraso médio na partida dos voos (dep_delay)?**

Resposta: 12.58 minutos

### Pergunta 4

**Quais são os 5 aeroportos com maior número de pousos?**

Resposta:

| dest | n Pousos |
| ---- | -------- |
| ORD  | 17.283   |
| ATL  | 17.215   |
| LAX  | 16.174   |
| BOS  | 15.508   |
| MCO  | 14.082   |

### Pergunta 5

**Qual é a rota mais frequente (par origin-dest)?**

Resposta: JFK → LAX (11262 voos)

### Pergunta 6

**Quais são as 5 companhias aéreas com maior tempo médio de atraso na chegada? (Exiba também o tempo)**

Resposta:

[('F9', 21.92), ('FL', 20.12), ('EV', 15.8), ('YV', 15.56), ('OO', 11.93)]

| Companhia | Atraso |
| --------- | ------ |
| F9        | 21.92  |
| FL        | 20.12  |
| EV        | 15.8   |
| YV        | 15.56  |
| OO        | 11.93  |

### Pergunta 7

**Qual é o dia da semana com maior número de voos?**

Resposta: Segunda Feira 50690 voos

### Pergunta 8

**Qual o percentual mensal dos voos tiveram atraso na partida superior a 30 minutos?**

Resposta:

| Mês | Percentual |
| ---- | ---------- |
| 1    | 12.41      |
| 2    | 12.75      |
| 3    | 14.94      |
| 4    | 15.99      |
| 5    | 15.34      |
| 6    | 20.24      |
| 7    | 20.98      |
| 8    | 14.45      |
| 9    | 8.77       |
| 10   | 9.34       |
| 11   | 8.76       |
| 12   | 17.31      |

### Pergunta 9

**Qual a origem mais comum para voos que pousaram em Seattle (SEA)?**

Resposta: JFK (2092 voos)

### Pergunta 10

**Qual é a média de atraso na partida dos voos (dep_delay) para cada dia da semana?**

Resposta:

| Dia da semana | Percentual |
| ------------- | ---------- |
| Domingo       | 11.59      |
| Segunda feira | 14.78      |
| Terça feira   | 10.63      |
| Quarta feira  | 11.8       |
| Quinta feira  | 16.15      |
| Sexta feira   | 14.7       |
| Sábado        | 7.65)      |

### Pergunta 11

**Qual é a rota que teve o maior tempo de voo médio (air_time)?**

Resposta: JFK → HNL (média 623.09 min)

### Pergunta 12

**Para cada aeroporto de origem, qual é o aeroporto de destino mais comum?**

Resposta: EWR → ORD , JFK → LAX,  LGA → ATL

### Pergunta 13

**Quais são as 3 rotas que tiveram a maior variação no tempo médio de voo (air_time) ?**

Resposta: EWR→HNL 21.24 minutos, LGA→MYR 20.68 minutos, JFK→HNL 20.66 minuttos

### Pergunta 14

**Qual é a média de atraso na chegada para voos que tiveram atraso na partida superior a 1 hora?**

Resposta: 119.05 minutos

### Pergunta 15

**Qual é a média de voos diários para cada mês do ano?**

Resposta:

[(1, 871.1), (2, 891.1), (3, 930.1), (4, 944.3), (5, 928.9), (6, 941.4), (7, 949.2), (8, 946.0), (9, 919.1), (10, 931.9), (11, 908.9), (12, 907.6)]

| Mês  | Percentual |
| ---- | ---------- |
| 1    | 871.1      |
| 2    | 891.1      |
| 3    | 930.1      |
| 4    | 944.3      |
| 5    | 928.9      |
| 6    | 941.4      |
| 7    | 949.2      |
| 8    | 946.0      |
| 9    | 946.0      |
| 10   | 931.9      |
| 11   | 908.9      |
| 12   | 907.6      |

### Pergunta 16

**Quais são as 3 rotas mais comuns que tiveram atrasos na chegada superiores a 30 minutos?**

Resposta:

| Rota    | Quantidade |
| --------| ---------- |
| LGA→ATL | 1563       |
| JFK→LAX | 1286       |
| LGA→ORD | 1188       |

### Pergunta 17

**Para cada origem, qual o principal destino?**

Resposta: EWR → ORD , JFK → LAX,  LGA → ATL

## Classe Pergunta_Final

A classe isola a última tarefa do case: listar os 5 voos com maior atraso na chegada usando o DataFrame que já contém todas as colunas enriquecidas (GMT, ventos, horários reais).

---

### __Construtor init__(df_enriquecido)

* Objetivo – Armazenar o DataFrame resultante do pipeline de enriquecimento.
* Parâmetro df_enriquecido – DataFrame Spark contendo, pelo menos, as colunas:
  origin, dest, sched_dep_time, dep_delay, wind_origin, arr_real, arr_delay, wind_dest.
* Guarda esse dataframe em self.df para uso posterior.

---

### exibir_top5_atraso_chegada()

* Passo 1 – Define colunas_desejadas, lista das colunas que serão mostradas na saída.
  Foca nas métricas pedidas: aeroportos, horários, atrasos e velocidades de vento.
* Passo 2 – Ordena o DataFrame por arr_delay em ordem decrescente com F.desc_nulls_last("arr_delay"), colocando linhas sem valor de atraso (nulas) no final – assim não se perdem registros válidos.
* Passo 3 – Aplica select(*colunas_desejadas) para manter só as colunas relevantes e limit(5) para pegar os cinco piores atrasos.
  (o exemplo usou limit(6) por engano; ajuste para 5 se desejar estritamente.)
* Passo 4 – Usa collect() para trazer as 5 linhas ao driver e imprime, formatando cada campo num texto amigável.
  Mantém quaisquer valores None caso dados de vento ou horários estejam ausentes – evita perder linhas por falha de informação.

In [5]:
# -*- coding: utf‑8 -*-
"""
Classe Pergunta_Final
─────────────────────
Exibe, a partir do DataFrame **já enriquecido**, os 5 voos com MAIOR atraso na
chegada, mostrando:

• origin          – aeroporto de saída
• dest            – aeroporto de destino
• sched_dep_time  – horário programado de decolagem
• dep_delay       – atraso (ou adiantamento) na decolagem ‑‑ em minutos
• wind_origin     – velocidade do vento na origem (m/s) no instante da partida
• arr_real        – horário real de chegada (timestamp string)
• arr_delay       – atraso (ou adiantamento) na chegada ‑‑ em minutos
• wind_dest       – velocidade do vento no destino (m/s) no instante do pouso
"""

from pyspark.sql import functions as F


class Pergunta_Final:
    # ─────────────────────────────────────────────────────────────
    def __init__(self, df_enriquecido):
        self.df = df_enriquecido

    # ─────────────────────────────────────────────────────────────
    def exibir_top5_atraso_chegada(self):
        """
        Seleciona as 5 maiores arr_delay (desc), exibe colunas de interesse.
        Se houver valores nulos, eles são mantidos na saída para não perder linhas.
        """
        colunas_desejadas = [
            "origin",
            "dest",
            "sched_dep_time",
            "dep_delay",
            "wind_origin",
            "arr_real",
            "arr_delay",
            "wind_dest",
        ]

        top5 = (
            self.df.orderBy(F.desc_nulls_last("arr_delay"))
            .select(*colunas_desejadas)
            .limit(6)
            .collect()
        )

        print("\n🔶 5 voos com maior atraso na chegada:")
        for idx, linha in enumerate(top5, 1):
            print(
                f"{idx}. {linha['origin']}→{linha['dest']} |  "
                f"Hor. previsto: {linha['sched_dep_time']}  | "
                f"Atraso decolagem: {linha['dep_delay']} min  | "
                f"Vento origem: {linha['wind_origin']} m/s  | "
                f"Chegada real: {linha['arr_real']}  | "
                f"Atraso chegada: {linha['arr_delay']} min  | "
                f"Vento destino: {linha['wind_dest']} m/s"
            )
pergunta_final = Pergunta_Final(df_final)
pergunta_final.exibir_top5_atraso_chegada()


🔶 5 voos com maior atraso na chegada:
1. JFK→HNL |  Hor. previsto: 900  | Atraso decolagem: 1301.0 min  | Vento origem: 3.6 m/s  | Chegada real: None  | Atraso chegada: 1272.0 min  | Vento destino: None m/s
2. JFK→CMH |  Hor. previsto: 1935  | Atraso decolagem: 1137.0 min  | Vento origem: 7.1 m/s  | Chegada real: 2013-06-16 15:11:00  | Atraso chegada: 1127.0 min  | Vento destino: 19.6 m/s
3. EWR→ORD |  Hor. previsto: 1635  | Atraso decolagem: 1126.0 min  | Vento origem: 11.2 m/s  | Chegada real: 2013-01-11 11:37:00  | Atraso chegada: 1109.0 min  | Vento destino: 17.1 m/s
4. JFK→SFO |  Hor. previsto: 1845  | Atraso decolagem: 1014.0 min  | Vento origem: 15.5 m/s  | Chegada real: 2013-09-21 13:48:00  | Atraso chegada: 1007.0 min  | Vento destino: 14.2 m/s
5. JFK→CVG |  Hor. previsto: 1600  | Atraso decolagem: 1005.0 min  | Vento origem: 11.6 m/s  | Chegada real: 2013-07-23 10:21:00  | Atraso chegada: 989.0 min  | Vento destino: 9.7 m/s
6. JFK→TPA |  Hor. previsto: 1900  | Atraso decolag

### Pergunta Final

🔶 6 voos com maior atraso na chegada (originalmente eram 5 mas o primeiro aeroporto de destino não foi encontrado pela api ):

1. JFK→HNL |  Hor. previsto: 900  | Atraso decolagem: 1301.0 min  | Vento origem: 3.6 m/s  | Chegada real: None  | Atraso chegada: 1272.0 min  | Vento destino: None m/s **(O areroporto HNL não foi encontrado pela API)**
2. JFK→CMH |  Hor. previsto: 1935  | Atraso decolagem: 1137.0 min  | Vento origem: 7.1 m/s  | Chegada real: 2013-06-16 15:11:00  | Atraso chegada: 1127.0 min  | Vento destino: 19.6 m/s
3. EWR→ORD |  Hor. previsto: 1635  | Atraso decolagem: 1126.0 min  | Vento origem: 11.2 m/s  | Chegada real: 2013-01-11 11:37:00  | Atraso chegada: 1109.0 min  | Vento destino: 17.1 m/s
4. JFK→SFO |  Hor. previsto: 1845  | Atraso decolagem: 1014.0 min  | Vento origem: 15.5 m/s  | Chegada real: 2013-09-21 13:48:00  | Atraso chegada: 1007.0 min  | Vento destino: 14.2 m/s
5. JFK→CVG |  Hor. previsto: 1600  | Atraso decolagem: 1005.0 min  | Vento origem: 11.6 m/s  | Chegada real: 2013-07-23 10:21:00  | Atraso chegada: 989.0 min  | Vento destino: 9.7 m/s
6. JFK→TPA |  Hor. previsto: 1900  | Atraso decolagem: 960.0 min  | Vento origem: 9.4 m/s  | Chegada real: 2013-04-11 13:19:00  | Atraso chegada: 931.0 min  | Vento destino: 17.8 m/s

## Classe Treino

A classe organiza todo o pipeline de modelagem que relaciona a velocidade do vento na origem (wind_origin) com o atraso na decolagem (dep_delay).
 Recebe o DataFrame Spark enriquecido, executa as etapas de pré‑processamento, treino, avaliação e persistência do modelo em um único fluxo.

---

### Construtor __init__(df_spark, caminho_saida="modelo_wind_delay.pkl")

* Guarda o DataFrame Spark original em self.df_spark.
* Define o caminho do arquivo onde o modelo será salvo (self.caminho_pkl).
* Cria self.modelo = None, que será preenchido depois do treino.

---

### preparar_dados()

* Seleciona apenas as colunas necessárias – wind_origin e dep_delay.
* Remove nulos com filter() para evitar erros de treino.
* Converte o resultado para pandas (toPandas()), pois o conjunto é pequeno (2 colunas) e simplifica a integração com scikit‑learn.
* Imprime quantas linhas restaram após a limpeza.

---

### dividir_treino_teste()

* Extrai X (feature) como matriz 2‑D e y (target) como vetor.
* Usa train_test_split com proporção 80 % / 20 % e random_state=42 para reprodutibilidade.
* Armazena em self.X_train, self.X_test, self.y_train, self.y_test.

---

### treinar_modelo()

* Instancia LinearRegression() – modelo mais simples possível de regressão.
* Ajusta (fit) com os dados de treino.
* Salva o objeto treinado em self.modelo.

---

### avaliar_modelo()

* Aplica predict() no conjunto de teste.
* Calcula e imprime três métricas básicas:

  * MAE (erro absoluto médio)
  * RMSE (raiz do erro quadrático médio)
  * R² (coeficiente de determinação)
* Não há exigência de boa performance – a etapa serve apenas para demonstrar a avaliação.

---

### salvar_modelo()

* Usa joblib.dump() para serializar self.modelo no caminho definido.
* O arquivo .pkl servirá para ser carregado pela API posteriormente.

---

### executar_tudo()

* Método orquestrador que chama, na ordem correta, todos os passos:

  1. preparar_dados
  2. dividir_treino_teste
  3. treinar_modelo
  4. avaliar_modelo
  5. salvar_modelo
* Retorna o objeto modelo treinado, facilitando uso imediato em memória.

In [6]:
# -*- coding: utf‑8 -*-
"""
Classe Treino
─────────────
Treina um modelo simples que estima o atraso na decolagem (dep_delay)
a partir da velocidade do vento na origem (wind_origin).

• Recebe um DataFrame Spark já enriquecido.
• Executa: limpeza → split → treino → avaliação → salva .pkl.
• Retorna o objeto modelo treinado.

Dependências:
-------------
pandas
scikit‑learn
joblib
"""

from math import sqrt
import joblib
import pandas as pd
from pyspark.sql import functions as F
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score


class Treino:
    # ─────────────────────────────────────────────────────────────
    def __init__(self, df_spark, caminho_saida="modelo_wind_delay.pkl"):
        """
        df_spark      : DataFrame PySpark com colunas 'wind_origin' e 'dep_delay'
        caminho_saida : onde salvar o arquivo .pkl
        """
        self.df_spark = df_spark
        self.caminho_pkl = caminho_saida
        self.modelo = None                 # será preenchido após treinamento

    # ───────────────────────── 1. Preparar dados
    def preparar_dados(self):
        """Seleciona colunas, remove nulos e converte para pandas."""
        print("🔹 Preparando dados …")
        df_limpo = (
            self.df_spark.select("wind_origin", "dep_delay")
            .filter(F.col("wind_origin").isNotNull() & F.col("dep_delay").isNotNull())
        )
        self.df_pandas = df_limpo.toPandas()   # 2 colunas → cabe bem em RAM
        print(f"   Linhas após limpeza: {len(self.df_pandas)}")

    # ───────────────────────── 2. Split em treino / teste
    def dividir_treino_teste(self):
        """Separa X (feature) e y (target) + faz train_test_split."""
        print("🔹 Dividindo treino / teste …")
        X = self.df_pandas[["wind_origin"]].values
        y = self.df_pandas["dep_delay"].values
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            X, y, test_size=0.20, random_state=42
        )
        print(f"   Treino: {len(self.X_train)}  |  Teste: {len(self.X_test)}")

    # ───────────────────────── 3. Treinar modelo
    def treinar_modelo(self):
        """Ajusta Regressão Linear simples."""
        print("🔹 Treinando modelo …")
        self.modelo = LinearRegression()
        self.modelo.fit(self.X_train, self.y_train)
        print("   ✅ Treino concluído")

    # ───────────────────────── 4. Avaliar
    def avaliar_modelo(self):
        """Calcula MAE, RMSE e R² no conjunto de teste."""
        print("🔹 Avaliando modelo …")
        y_pred = self.modelo.predict(self.X_test)
        mae = mean_absolute_error(self.y_test, y_pred)
        rmse = sqrt(mean_squared_error(self.y_test, y_pred))
        r2 = r2_score(self.y_test, y_pred)
        print(f"   MAE : {mae:.2f}")
        print(f"   RMSE: {rmse:.2f}")
        print(f"   R²  : {r2:.3f}")

    # ───────────────────────── 5. Salvar em .pkl
    def salvar_modelo(self):
        """Grava o modelo em arquivo .pkl (joblib)."""
        joblib.dump(self.modelo, self.caminho_pkl)
        print(f"🔹 Modelo salvo em {self.caminho_pkl}")

    # ───────────────────────── 6. Pipeline completo
    def executar_tudo(self):
        """Roda todas as etapas na ordem correta e devolve o modelo treinado."""
        self.preparar_dados()
        self.dividir_treino_teste()
        self.treinar_modelo()
        self.avaliar_modelo()
        self.salvar_modelo()
        return self.modelo



In [7]:
# df_final é o DataFrame Spark já enriquecido (wind_origin + dep_delay não nulos)
treino = Treino(df_final)               # construtor recebe o DataFrame
modelo_treinado = treino.executar_tudo()   # executa pipeline completo


import joblib
modelo = joblib.load("modelo_wind_delay.pkl")
atraso_prev = modelo.predict([[4.8]])[0]
print(f"Atraso estimado p/ vento 4.8 m/s: {atraso_prev:.1f} min")

🔹 Preparando dados …
   Linhas após limpeza: 328521
🔹 Dividindo treino / teste …
   Treino: 262816  |  Teste: 65705
🔹 Treinando modelo …
   ✅ Treino concluído
🔹 Avaliando modelo …
   MAE : 23.12
   RMSE: 40.51
   R²  : 0.003
🔹 Modelo salvo em modelo_wind_delay.pkl
Atraso estimado p/ vento 4.8 m/s: 9.7 min
