In [1]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ============================================================
# LER A SILVER (AGORA NO dbo_1)
# ============================================================
df = spark.read.table("dbo_1.voos_silver_v2")

# ============================================================
# FUNÇÃO PARA IDs DETERMINÍSTICOS
# ============================================================
def add_int_id(df, id_col_name, order_col):
    window = Window.orderBy(order_col)
    return df.withColumn(id_col_name, F.row_number().over(window).cast("int"))

# ============================================================
# 1) DIM DATA (REFERÊNCIA)
# ============================================================

dim_data = (
    df.select("dt_referencia")
      .dropDuplicates()
)

dim_data = add_int_id(dim_data, "id_data", "dt_referencia")

dim_data.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_data")

# ============================================================
# 2) DIM COMPANHIA
# ============================================================

dim_companhia = (
    df.select("sg_empresa_icao")
      .dropDuplicates()
)

dim_companhia = add_int_id(dim_companhia, "id_companhia", "sg_empresa_icao")

dim_companhia.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_companhia")

# ============================================================
# 3) DIM AERONAVE
# ============================================================

dim_aeronave = (
    df.select("sg_equipamento_icao")
      .dropDuplicates()
)

dim_aeronave = add_int_id(dim_aeronave, "id_aeronave", "sg_equipamento_icao")

dim_aeronave.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_aeronave")

# ============================================================
# 4) DIM TIPO SERVIÇO
# ============================================================

dim_tipo_servico = (
    df.select("ds_tipo_servico")
      .dropDuplicates()
)

dim_tipo_servico = add_int_id(dim_tipo_servico, "id_tipo_servico", "ds_tipo_servico")

dim_tipo_servico.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_tipo_servico")

# ============================================================
# 5) DIM AEROPORTO ORIGEM
# ============================================================

dim_aeroporto_origem = (
    df.select(F.col("sg_icao_origem"))
      .dropDuplicates()
)

dim_aeroporto_origem = add_int_id(
    dim_aeroporto_origem,
    "id_aeroporto_origem",
    "sg_icao_origem"
)

dim_aeroporto_origem.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_aeroporto_origem")

# ============================================================
# 6) DIM AEROPORTO DESTINO
# ============================================================

dim_aeroporto_destino = (
    df.select(F.col("sg_icao_destino"))
      .dropDuplicates()
)

dim_aeroporto_destino = add_int_id(
    dim_aeroporto_destino,
    "id_aeroporto_destino",
    "sg_icao_destino"
)

dim_aeroporto_destino.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_aeroporto_destino")

# ============================================================
# 7) DIM DATA PARTIDA
# ============================================================

dim_data_partida = (
    df.select(F.to_date("dt_partida_prevista_utc").alias("data_partida"))
      .dropDuplicates()
      .orderBy("data_partida")
)

dim_data_partida = add_int_id(dim_data_partida, "id_data_partida", "data_partida")

dim_data_partida.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_data_partida")

# ============================================================
# 8) DIM DATA CHEGADA
# ============================================================

dim_data_chegada = (
    df.select(F.to_date("dt_chegada_prevista_utc").alias("data_chegada"))
      .dropDuplicates()
      .orderBy("data_chegada")
)

dim_data_chegada = add_int_id(dim_data_chegada, "id_data_chegada", "data_chegada")

dim_data_chegada.write.format("delta").mode("overwrite").saveAsTable("dbo.dim_data_chegada")

# ============================================================
# 9) FATO VOOS (JOIN COMPLETO)
# ============================================================

fato = (
    df
    .join(dim_data, "dt_referencia", "left")
    .join(dim_companhia, "sg_empresa_icao", "left")
    .join(dim_aeronave, "sg_equipamento_icao", "left")
    .join(dim_tipo_servico, "ds_tipo_servico", "left")
    .join(dim_aeroporto_origem, "sg_icao_origem", "left")
    .join(dim_aeroporto_destino, "sg_icao_destino", "left")
    .join(dim_data_partida,
          F.to_date("dt_partida_prevista_utc") == F.col("data_partida"),
          "left")
    .join(dim_data_chegada,
          F.to_date("dt_chegada_prevista_utc") == F.col("data_chegada"),
          "left")
)

fato_final = fato.select(
    F.col("id_data").cast("int"),
    F.col("id_companhia").cast("int"),
    F.col("id_aeronave").cast("int"),
    F.col("id_tipo_servico").cast("int"),
    F.col("id_aeroporto_origem").cast("int"),
    F.col("id_aeroporto_destino").cast("int"),
    F.col("id_data_partida").cast("int"),
    F.col("id_data_chegada").cast("int"),

    "qt_assentos_previstos",
    "dt_partida_prevista_utc",
    "dt_chegada_prevista_utc"
)

fato_final.write.format("delta").mode("overwrite").saveAsTable("dbo.fato_voos")



StatementMeta(, f320ddfe-5e2a-4872-88fd-eb5aaa61c6b6, 3, Finished, Available, Finished)

In [3]:
from pyspark.sql import functions as F

# Carrega a tabela fato
df = spark.read.table("dbo.fato_voos")

# Datas mínimas e máximas
min_date = df.select(F.min("dt_partida_prevista_utc")).first()[0].date()
max_date = df.select(F.max("dt_chegada_prevista_utc")).first()[0].date()

# Quantidade de dias no intervalo
num_days = (max_date - min_date).days

# Cria intervalo contínuo de datas
calendar = (
    spark.range(0, num_days + 1)
        .withColumn("id_int", F.col("id").cast("int"))
        .withColumn("Date", F.expr(f"date_add('{min_date}', id_int)"))
        .drop("id", "id_int")
)

# Adiciona colunas de calendário
calendar = (
    calendar
        .withColumn("Ano", F.year("Date"))
        .withColumn("Mes", F.month("Date"))
        .withColumn("Dia", F.dayofmonth("Date"))
        .withColumn("AnoMes", F.date_format("Date", "yyyy-MM"))
)

# Salva tabela no schema dbo com nome Calendar
calendar.write.format("delta").mode("overwrite").saveAsTable("dbo.Calendar")




StatementMeta(, f320ddfe-5e2a-4872-88fd-eb5aaa61c6b6, 5, Finished, Available, Finished)