In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, TimestampType
from delta.tables import DeltaTable

TABELA_ORIGEM = "v_credit.bronze.base_motivos"
TABELA_DESTINO_VALIDOS = "v_credit.silver.tb_motivo"
TABELA_DESTINO_INVALIDOS = "v_credit.silver.tb_motivo_invalidos"

CATEGORIAS = {
    "CADASTRAL": "cadastr|dados|telefone|email|vencimento|transferência|agência|cancelamento",
    "CARTAO": "cartão|bloqueio|desbloqueio|adicional|compra|contraç",
    "FINANCEIRO": "fatura|limite|contrato|renegociação|dívida|pagamento",
    "ATENDIMENTO": "aplicativo|app|site|sistema|erro|problema|técnico",
    "BENEFICIOS": "pontos|programa|benefício"
}

### Leitura e Classificacao Inteligente
Le ultima carga do Bronze e aplica transformacoes:
- **ds_categoria**: Classificacao automatica via regex (CADASTRAL, CARTAO, FINANCEIRO, ATENDIMENTO, BENEFICIOS)
- **ds_criticidade**: Padronizacao de valores (MEDIA, ALTA, BAIXA)

**Importante**: Classificacao automatica evita dependencia de cadastro manual na origem.

### Validacao de Qualidade
Adiciona flags de validacao para garantir que todos os campos criticos estao preenchidos.
Para registros invalidos, cria coluna **motivo_rejeicao** com descricao textual dos problemas.

### MERGE de Registros Validos
Faz MERGE (UPSERT) em tb_motivo. Remove flags de validacao antes do MERGE.

### Configuracao
Define origens, destinos e dicionario de regex para classificacao automatica de categorias.

In [None]:
from pyspark.sql import functions as F

df_bronze = spark.table(TABELA_ORIGEM)

max_dt_ingestao = (
    df_bronze
    .agg(F.max(F.col("ingestion_timestamp")).alias("max_ts"))
    .first()["max_ts"]
)

df_filtrado = df_bronze.filter(
    F.col("ingestion_timestamp") == F.lit(max_dt_ingestao)
)


df_transformado = (
    df_filtrado
    .select(
        F.col("id_motivo").cast("bigint").alias("cd_motivo"),
        F.regexp_replace(
            F.trim(F.col("nome_motivo")),
            r"Compra no autorizada",
            "Compra não autorizada"
        ).alias("ds_motivo"),

        F.when(
            F.lower(F.col("nome_motivo")).rlike(CATEGORIAS["CADASTRAL"]), 
            F.lit("CADASTRAL")
        ).when(
            F.lower(F.col("nome_motivo")).rlike(CATEGORIAS["CARTAO"]),
            F.lit("CARTAO")
        ).when(
            F.lower(F.col("nome_motivo")).rlike(CATEGORIAS["FINANCEIRO"]),
            F.lit("FINANCEIRO")
        ).when(
            F.lower(F.col("nome_motivo")).rlike(CATEGORIAS["ATENDIMENTO"]),
            F.lit("ATENDIMENTO")
        ).when(
            F.lower(F.col("nome_motivo")).rlike(CATEGORIAS["BENEFICIOS"]),
            F.lit("BENEFICIOS")
        ).otherwise(F.lit("NAO CLASSIFICADO")).alias("ds_categoria"),

        F.when(
            F.upper(F.trim(F.col("criticidade"))).isin(["MÉDIA", "MEDIA"]), 
            F.lit("MEDIA")
        ).when(
            F.upper(F.trim(F.col("criticidade"))) == "ALTA", 
            F.lit("ALTA")
        ).when(
            F.upper(F.trim(F.col("criticidade"))) == "BAIXA", 
            F.lit("BAIXA")
        ).alias("ds_criticidade"),

        F.col("ingestion_timestamp").alias("dt_ingestion"),
        F.coalesce(F.col("origem"), F.lit("base_motivos")).alias("dc_origem")
    )
    .dropDuplicates(["cd_motivo"])
)

print(f"✅ Transformação concluída")

In [0]:
df_validacao = (
    df_transformado
    .withColumn("flag_id_valido", F.col("cd_motivo").isNotNull())
    .withColumn(
        "flag_nome_valido", 
        (F.col("ds_motivo").isNotNull()) & (F.length(F.trim(F.col("ds_motivo"))) > 0)
    )
    .withColumn(
        "flag_categoria_valida", 
        (F.col("ds_categoria").isNotNull()) & (F.length(F.trim(F.col("ds_categoria"))) > 0)
    )
    .withColumn(
        "flag_criticidade_valida", 
        (F.col("ds_criticidade").isNotNull()) & (F.length(F.trim(F.col("ds_criticidade"))) > 0)
    )
    .withColumn(
        "flag_qualidade",
        F.when(
            F.col("flag_id_valido") &
            F.col("flag_nome_valido") &
            F.col("flag_categoria_valida") &
            F.col("flag_criticidade_valida"),
            F.lit("OK")
        ).otherwise(F.lit("ERRO"))
    )
)

df_silver = (
    df_validacao
    .filter(F.col("flag_qualidade") == "OK")
    .drop("flag_id_valido", "flag_nome_valido", "flag_categoria_valida", "flag_criticidade_valida", "flag_qualidade")
)

df_invalidos = (
    df_validacao
    .filter(F.col("flag_qualidade") == "ERRO")
    .withColumn(
        "motivo_rejeicao",
        F.concat_ws(
            "; ",
            F.when(~F.col("flag_id_valido"), F.lit("cd_motivo NULL")),
            F.when(~F.col("flag_nome_valido"), F.lit("ds_motivo vazio")),
            F.when(~F.col("flag_categoria_valida"), F.lit("ds_categoria vazio ou NULL")),
            F.when(~F.col("flag_criticidade_valida"), F.lit("ds_criticidade vazio ou NULL"))
        )
    )
    .drop("flag_id_valido", "flag_nome_valido", "flag_categoria_valida", "flag_criticidade_valida")
)

In [0]:
delta_table = DeltaTable.forName(spark, TABELA_DESTINO_VALIDOS)

delta_table.alias("target").merge(
    df_silver.alias("source"),
    "target.cd_motivo = source.cd_motivo"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()

print(f"✅ MERGE concluído: registros válidos em {TABELA_DESTINO_VALIDOS}")
