In [7]:
from pyspark.sql import functions as F

# Caminho físico do lakehouse de destino
path_destino = "abfss://ws_tecnologia_infomacao@onelake.dfs.fabric.microsoft.com/lk_tecnologia_informacao.Lakehouse/Tables/tab_gold_fato_processos_execucoes_mensais"

# Leitura das tabelas Delta de origem
def_proces = spark.read.format("delta").load(
    "abfss://ws_dados_central@onelake.dfs.fabric.microsoft.com/lk_fluig_database.Lakehouse/Tables/def_proces"
)
def_proces.createOrReplaceTempView("def_proces")

proces_workflow = spark.read.format("delta").load(
    "abfss://ws_dados_central@onelake.dfs.fabric.microsoft.com/lk_fluig_database.Lakehouse/Tables/proces_workflow"
)
proces_workflow.createOrReplaceTempView("proces_workflow")

# ============================================================
# 1) Cria tabela "salted" para reduzir Data Skew em COD_DEF_PROCES
# ============================================================

# Número de "baldes" (salts). Pode aumentar se ainda tiver skew forte.
NUM_SALTS = 50

proces_salted_df = (
    proces_workflow
        # Limita pelos registros relevantes (até ontem)
        .where(F.col("START_DATE") <= F.date_sub(F.current_date(), 1))
        # Mês de referência truncado
        .withColumn("MES_REF", F.date_trunc("month", F.col("START_DATE")))
        # Salt baseado em COD_DEF_PROCES + NUM_PROCES
        .withColumn(
            "salt",
            F.pmod(F.abs(F.hash(F.col("COD_DEF_PROCES"), F.col("NUM_PROCES"))), F.lit(NUM_SALTS))
        )
)

# Repartitiona usando as colunas críticas para distribuir melhor os dados
proces_salted_df = proces_salted_df.repartition(
    "COD_DEF_PROCES", "MES_REF", "salt"
)

# Cria view temporária para usar no SQL
proces_salted_df.createOrReplaceTempView("proces_salted")

# ============================================================
# 2) SQL com meses, agregação salted e join com def_proces
# ============================================================

df_resultado = spark.sql(f"""
WITH meses AS (
    SELECT
        explode(
            sequence(
                to_date('2025-01-01'),
                trunc(date_sub(current_date, 1), 'month'),
                interval 1 month
            )
        ) AS dt
),

-- 1) Primeiro agrupamento: por COD_DEF_PROCES + MES_REF + SALT
pw_agg_salted AS (
    SELECT
        COD_DEF_PROCES,
        MES_REF,
        salt,

        COUNT(NUM_PROCES) AS QTD_PROCESSOS,

        SUM(CASE WHEN STATUS = 2 THEN 1 ELSE 0 END) AS QTD_CONCLUIDOS,
        SUM(CASE WHEN STATUS = 1 THEN 1 ELSE 0 END) AS QTD_CANCELADOS,
        SUM(
            CASE 
                WHEN STATUS = 0
                     OR (STATUS IS NOT NULL AND END_DATE IS NULL)
                THEN 1 ELSE 0
            END
        ) AS QTD_EM_ANDAMENTO
    FROM proces_salted
    GROUP BY
        COD_DEF_PROCES,
        MES_REF,
        salt
),

-- 2) Segundo agrupamento: remove o SALT (volta para o nível do processo/mês)
pw_agg AS (
    SELECT
        COD_DEF_PROCES,
        MES_REF,

        SUM(QTD_PROCESSOS)    AS QTD_PROCESSOS,
        SUM(QTD_CONCLUIDOS)   AS QTD_CONCLUIDOS,
        SUM(QTD_CANCELADOS)   AS QTD_CANCELADOS,
        SUM(QTD_EM_ANDAMENTO) AS QTD_EM_ANDAMENTO
    FROM pw_agg_salted
    GROUP BY
        COD_DEF_PROCES,
        MES_REF
),

-- 3) Gera grade de (mês x processo)
grade AS (
    SELECT
        dp.COD_DEF_PROCES,
        dp.DES_DEF_PROCES AS PROCESSO,
        dp.COD_CATEG      AS SETOR,
        CASE 
            WHEN dp.LOG_ATIV = 1 THEN 'Ativo' ELSE 'Inativo' 
        END               AS SITUACAO,
        m.dt              AS MES_REF
    FROM meses m
    CROSS JOIN def_proces dp
)

-- 4) Junta grade + agregados
SELECT
    g.PROCESSO,
    g.SETOR,
    g.SITUACAO,
    year(g.MES_REF)  AS ANO,
    month(g.MES_REF) AS MES,

    COALESCE(pw.QTD_PROCESSOS,    0) AS QTD_PROCESSOS,
    COALESCE(pw.QTD_CONCLUIDOS,   0) AS QTD_CONCLUIDOS,
    COALESCE(pw.QTD_CANCELADOS,   0) AS QTD_CANCELADOS,
    COALESCE(pw.QTD_EM_ANDAMENTO, 0) AS QTD_EM_ANDAMENTO

FROM grade g
LEFT JOIN pw_agg pw
    ON pw.COD_DEF_PROCES = g.COD_DEF_PROCES
   AND pw.MES_REF        = g.MES_REF
""")

# ============================================================
# 3) Escrita da tabela destino em Delta
# ============================================================

df_resultado.write.format("delta").mode("overwrite").save(path_destino)


StatementMeta(, 59606aae-4220-489a-bfd2-d729d504e6b3, 9, Finished, Available, Finished)