In [0]:
from pyspark.sql import functions as F

# Carrega a tabela da camada silver
def read_silver():
    df_silver = spark.table("silver.ipca_silver")
    return df_silver

# Trata coluna ano
def year(df_silver):
    df_silver = df_silver.withColumn("ano", F.year(F.col("data")))
    return df_silver

# Agrupa por ano e calcula o IPCA acumulado (soma dos valores mensais)
def accumulated(df_silver):
    return(
        df_silver.groupBy("ano")
        .agg(F.sum("valor").alias("ipca_acumulado"))
        .orderBy("ano")
        # Filtra últimos 20 anos
        .filter(F.col("ano") >= (F.year(F.current_date()) - 20))
    )

# Salva a tabela IPCA acumulado na camada gold
def write_accumulated(df_anual):
    (df_anual.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold.ipca_anual"))

# KPI - média dos últimos 5 anos, desconsiderando 2020 e 2021
def average_5_years(df_anual):
    ano_atual = int(df_anual.agg(F.max("ano")).first()[0])
    ultimos_5_anos = [ano for ano in range(ano_atual - 4, ano_atual + 1) if ano not in [2020, 2021]]
    return df_anual.filter(F.col("ano").isin(ultimos_5_anos)).agg(F.avg("ipca_acumulado")).first()[0]

# Ano com maior IPCA
def bigger_year(df_anual):
    row = df_anual.orderBy(F.desc("ipca_acumulado")).first()
    return row["ano"], row["ipca_acumulado"]


# Ano com menor IPCA
def smaller_year(df_anual):
    row = df_anual.orderBy(F.asc("ipca_acumulado")).first()
    return row["ano"], row["ipca_acumulado"]

# Cria um dataframe com os KPIs
def kpi_dataframe(media_5_anos, maior_ipca_ano, maior_ipca_valor, menor_ipca_ano, menor_ipca_valor):
    kpi_data = [(media_5_anos, maior_ipca_ano, maior_ipca_valor, menor_ipca_ano, menor_ipca_valor)]
    kpi_columns = [
        "media_ipca_5_anos_sem_pandemia",
        "maior_ipca_ano",
        "maior_ipca_valor",
        "menor_ipca_ano",
        "menor_ipca_valor"
    ]
    return spark.createDataFrame(kpi_data, kpi_columns)

# Salva a tabela de KPIs na camada gold
def write_kpis(df_kpi):
    (df_kpi.write
        .format("delta")
        .mode("overwrite")
        .saveAsTable("gold.ipca_kpis")
    )

# Executa o passo a passo e salva em duas tabelas
def main():
    df_silver = read_silver()
    df_silver = year(df_silver)
    df_anual = accumulated(df_silver)
    write_accumulated(df_anual)
    media_5_anos = average_5_years(df_anual)
    maior_ipca_ano, maior_ipca_valor = bigger_year(df_anual)
    menor_ipca_ano, menor_ipca_valor = smaller_year(df_anual)
    df_kpi = kpi_dataframe(media_5_anos, maior_ipca_ano, maior_ipca_valor, menor_ipca_ano, menor_ipca_valor)
    write_kpis(df_kpi)
    return df_anual, df_kpi
main()