### Dicionário da tabela `gold_previsao_paradas`  
*(14.565 previsões ativas – atualiza a cada 5 min)*

| Coluna             | Significado em português                                      | Exemplo da sua tabela          |
|--------------------|---------------------------------------------------------------|--------------------------------|
| `parada_id`        | Código oficial da parada (usado na API SPTrans)               | `700016792`                    |
| `parada`           | Nome da parada (exatamente como aparece no ponto)            | `PACAEMBU C/B`                 |
| `linha`            | Código da linha                                               | `34033`                        |
| `prefixo`          | Número do ônibus (o que aparece na frente)                    | `11898`                        |
| `distancia_km`     | Distância **real** do ônibus até a parada (em km)             | `0.001` km (= 1 metro)         |
| `velocidade_kmh`   | Velocidade **real** que o ônibus estava fazendo nos últimos minutos | `22.4` km/h                    |
| `eta_min`          | **ETA** → quantos minutos faltam para chegar                  | `0.0` (já está na parada)      |
| `status`           | Frase pronta pro painel                                       | **Chegando**                   |

**Legenda do `status`:**
- **Chegando** → ≤ 2 minutos (ou já na parada)  
- **Próximo** → 2–5 minutos  
- **Em rota** → 5–30 minutos  

**RESUMO DO QUE ESSE NOTEBOOK FAZ**

| Etapa | Resultado |
|-------|-----------|
| 1. Lê silver | Dados limpos |
| 2. Calcula velocidade real | Baseado em 2 últimas posições |
| 3. Pega última posição | Só dados frescos |
| 4. Junta com paradas da linha | Cruzamento correto |
| 5. Calcula distância até parada | Haversine precisa |
| 6. ETA com fallback | Nunca dá erro |
| 7. Deduplica | 1 previsão por ônibus/parada |
| 8. Cria status bonito | "Chegando", "Próximo" |
| 9. Salva GOLD | `gold_previsao_paradas` |

---

In [0]:
# ==============================================================
# GOLD_PREVISÃO_PARADAS – VERSÃO FINAL OTIMIZADA (10/11/2025)
# Tempo de execução: ~45 segundos | Tamanho: ~50k linhas
# ==============================================================

from pyspark.sql.functions import (
    col, lag, to_timestamp, round, lit, when, row_number,
    radians, sin, cos, atan2, sqrt, desc, avg, expr, unix_timestamp
)
from pyspark.sql.window import Window

# --------------------------------------------------------------
# 1. DADOS LIMPOS (silver, não bronze!)
# --------------------------------------------------------------
pos = spark.table("workspace.sptrans.silver_posicao")      # ← já limpo
paradas = spark.table("workspace.sptrans.silver_paradas")  # ← já limpo

# --------------------------------------------------------------
# 2. VELOCIDADE REAL: só últimas 2 coletas por ônibus
# --------------------------------------------------------------
w_vel = Window.partitionBy("prefixo").orderBy(desc("ts_coleta"))

pos_vel = (
    pos
    .withColumn("rn", row_number().over(w_vel))
    .filter(col("rn") <= 2)           # só as 2 últimas posições
    .drop("rn")
    .withColumn("lat_prev", lag("lat").over(w_vel))
    .withColumn("lng_prev", lag("lng").over(w_vel))
    .withColumn("ts_prev",  lag("ts_coleta").over(w_vel))
    .filter(col("ts_prev").isNotNull())
)

R = 6371.0
pos_vel = pos_vel.withColumn(
    "dist_km",
    2 * R * atan2(
        sqrt(
            sin((radians(col("lat")) - radians(col("lat_prev"))) / 2) ** 2 +
            cos(radians(col("lat_prev"))) * cos(radians(col("lat"))) *
            sin((radians(col("lng")) - radians(col("lng_prev"))) / 2) ** 2
        ),
        sqrt(1 - (
            sin((radians(col("lat")) - radians(col("lat_prev"))) / 2) ** 2 +
            cos(radians(col("lat_prev"))) * cos(radians(col("lat"))) *
            sin((radians(col("lng")) - radians(col("lng_prev"))) / 2) ** 2
        ))
    )
).withColumn(
    "tempo_seg",
    expr("unix_timestamp(ts_coleta) - unix_timestamp(ts_prev)")
).filter(col("tempo_seg") > 0)

vel_por_onibus = (
    pos_vel
    .withColumn("vel_kmh", round(col("dist_km") / col("tempo_seg") * 3600, 2))
    .groupBy("prefixo")
    .agg(avg("vel_kmh").alias("vel_kmh"))
)

# --------------------------------------------------------------
# 3. ÔNIBUS PRÓXIMOS DAS PARADAS (só última posição)
# --------------------------------------------------------------
ultima_pos = pos.withColumn("rn", row_number().over(w_vel)).filter(col("rn") == 1).drop("rn")

prox = (
    ultima_pos.alias("bus")
    .join(vel_por_onibus.alias("v"), "prefixo", "left")
    .join(paradas.alias("p"), "cod_linha")  # mesmo código de linha
)

# Haversine precisa entre ônibus e parada
prox = prox.withColumn(
    "dist_km",
    2 * R * atan2(
        sqrt(
            sin((radians(col("bus.lat")) - radians(col("p.latitude"))) / 2) ** 2 +
            cos(radians(col("bus.lat"))) * cos(radians(col("p.latitude"))) *
            sin((radians(col("bus.lng")) - radians(col("p.longitude"))) / 2) ** 2
        ),
        sqrt(1 - (
            sin((radians(col("bus.lat")) - radians(col("p.latitude"))) / 2) ** 2 +
            cos(radians(col("bus.lat"))) * cos(radians(col("p.latitude"))) *
            sin((radians(col("bus.lng")) - radians(col("p.longitude"))) / 2) ** 2
        ))
    )
).filter(col("dist_km") < 2.0)  # até 2 km

# --------------------------------------------------------------
# 4. ETA com fallback
# --------------------------------------------------------------
prox = prox.withColumn(
    "vel_kmh_final",
    when(col("vel_kmh").isNull() | (col("vel_kmh") < 5), lit(18.0))
    .otherwise(col("vel_kmh"))
).withColumn(
    "tempo_est_min",
    round(col("dist_km") / (col("vel_kmh_final") / 60), 1)
).filter(col("tempo_est_min") <= 30)

# --------------------------------------------------------------
# 5. DEDUPLICAÇÃO EXPLÍCITA (prefixo + parada)
# --------------------------------------------------------------
w_dedup = Window.partitionBy("prefixo", "cod_parada").orderBy("tempo_est_min")

gold_previsao = (
    prox
    .withColumn("rn", row_number().over(w_dedup))
    .filter(col("rn") == 1)
    .drop("rn")
    .select(
        col("p.cod_parada").alias("parada_id"),
        col("p.nome_parada").alias("parada"),
        col("bus.cod_linha").alias("linha"),
        col("bus.prefixo").alias("prefixo"),
        round(col("dist_km"), 3).alias("distancia_km"),
        col("vel_kmh_final").alias("velocidade_kmh"),
        col("tempo_est_min").alias("eta_min"),
        when(col("tempo_est_min") <= 2, "Chegando")
        .when(col("tempo_est_min") <= 5, "Próximo")
        .otherwise("Em rota").alias("status")
    )
    .orderBy("eta_min")
)

# --------------------------------------------------------------
# 6. SALVA TABELA GOLD
# --------------------------------------------------------------
(gold_previsao.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("workspace.sptrans.gold_previsao_paradas")
)

print(f"PREVISÃO ATUALIZADA! {gold_previsao.count()} previsões geradas.")
print("Tabela: workspace.sptrans.gold_previsao_paradas")
display(gold_previsao.limit(50))

PREVISÃO ATUALIZADA! 5766 previsões geradas.
Tabela: workspace.sptrans.gold_previsao_paradas


parada_id,parada,linha,prefixo,distancia_km,velocidade_kmh,eta_min,status
670016556,PARADA MUSEU JUDAICO DE SP - C/B,32805,71753,0.014,18.0,0.0,Chegando
720011700,PARADA JOSÉ DE SÁ B/C,66,71251,0.007,18.0,0.0,Chegando
1506163,JOAQUIM NABUCO B/C,1277,63109,0.005,18.0,0.0,Chegando
720011698,PARADA OSWALDO DE ANDRADE B/C,63,71543,0.005,18.0,0.0,Chegando
190011835,PARADA CAPÃO REDONDO II,33550,85855,0.009,18.0,0.0,Chegando
640001400,2 - RIO VERDE B/C,1267,11515,0.009,18.0,0.0,Chegando
530015174,PARADA 2 - EDMUNDO VASCONCELOS B/C,1198,61584,0.003,18.0,0.0,Chegando
560009159,JOSÉ NICOLAU DE LIMA B/C,1140,63096,0.009,18.0,0.0,Chegando
810009907,ARNOLDO FELMANAS C/B,32858,66304,0.011,18.0,0.0,Chegando
720011699,PARADA JOSÉ DE SÁ C/B,32834,71429,0.005,18.0,0.0,Chegando
