# Gold loader: promover Silver -> Gold

Este notebook lê tabelas Delta da camada `silver` e grava tabelas na camada `gold` usando os prefixos `dim_` (dimensões) e `fact_` (fatos).

Notas:
- Mantém convenções de nome do catálogo e schema (ex.: `mkl_bank.default.silver_<table>`).
- Em ambientes sem Databricks, passos de otimização (OPTIMIZE / ZORDER) serão ignorados.
- Execute este notebook em um ambiente com Spark e Delta habilitados.

In [None]:
# Imports e configuração
from pyspark.sql.functions import current_timestamp, col, to_date, lit, monotonically_increasing_id
import pyspark.sql.functions as F
import uuid

# Garantir sessão Spark disponível (em Databricks já existe 'spark')
catalog_name = "mkl_bank"
silver_schema = "default"
gold_schema = "gold"

# Opções de escrita comuns
WRITE_FORMAT = "delta"
WRITE_MODE = "overwrite"  # conforme especificado no outline

# Identificador de execução / carga
batch_id = str(uuid.uuid4())
load_ts = current_timestamp()

print(f"Gold loader initialized. catalog={catalog_name}, silver_schema={silver_schema}, gold_schema={gold_schema}, batch_id={batch_id}")

In [None]:
# Mapear tabelas silver -> gold (nome e tipo)
mapping = {
    'silver_agencia': ('dim_agencia', 'dim'),
    'silver_cliente': ('dim_cliente', 'dim'),
    'silver_conta': ('dim_conta', 'dim'),
    'silver_cartao': ('dim_cartao', 'dim'),
    'silver_chave_pix': ('dim_chave_pix', 'dim'),
    'silver_transacao_conta': ('fact_transacao', 'fact'),
    'silver_movimentacao_transferencia_realizada': ('fact_transferencia_realizada', 'fact'),
    'silver_movimentacao_transferencia_recebida': ('fact_transferencia_recebida', 'fact'),
    'silver_movimentacao_pix_realizado': ('fact_pix_realizado', 'fact'),
    'silver_movimentacao_pix_recebido': ('fact_pix_recebido', 'fact'),
    'silver_movimentacao_deposito_recebido': ('fact_deposito_recebido', 'fact'),
    'silver_movimentacao_pagamento_cartao': ('fact_pagamento_cartao', 'fact'),
    'silver_movimentacao_boleto_pago': ('fact_boleto_pago', 'fact')
}

# Chaves naturais/PK esperadas para dimensões (usar os IDs conforme indicado)
# O usuário pediu para considerar as chaves naturais como os campos id_* conforme os schemas
natural_keys = {
    'dim_agencia': 'codigo_agencia',   # agência usa código
    'dim_cliente': 'id_cliente',        # cliente: id_cliente
    'dim_conta': 'id_conta',            # conta: id_conta
    'dim_cartao': 'id_cartao',          # cartao: id_cartao
    'dim_chave_pix': 'id_chave_pix'     # chave pix: id_chave_pix
}

# Lista de tabelas a processar (por conveniência)
tables_to_process = list(mapping.keys())
print(f"Tables to process: {tables_to_process}")

In [None]:
# Ler tabelas Silver para DataFrames (sem cache para evitar PERSIST TABLE em serverless)
dfs = {}
for silver_name in tables_to_process:
    full_name = f"{catalog_name}.{silver_schema}.{silver_name}"
    try:
        if not spark.catalog.tableExists(full_name):
            print(f"Table {full_name} does not exist, skipping")
            dfs[silver_name] = None
            continue

        # leitura direta sem cache/persist
        df = spark.table(full_name)
        dfs[silver_name] = df

        # contar linhas é opcional; em ambientes serverless pode ser custoso, mas tentamos com tratamento
        try:
            cnt = df.count()
        except Exception as e_count:
            cnt = None
            print(f"Warning counting rows for {full_name}: {e_count}")

        print(f"Loaded {full_name} -> {cnt} rows")
    except Exception as e:
        dfs[silver_name] = None
        print(f"Warning: could not load {full_name}: {e}")

In [None]:
# Transformar e carregar dimensões (dim_)
from pyspark.sql.utils import AnalysisException

for silver_name, (gold_name, typ) in mapping.items():
    if typ != 'dim':
        continue

    df = dfs.get(silver_name)
    if df is None:
        print(f"Skipping {silver_name}: no DataFrame")
        continue

    print(f"Processing dimension {silver_name} -> {gold_name}")
    nat_key = natural_keys.get(gold_name)

    # dedupe por chave natural quando disponível
    if nat_key and nat_key in df.columns:
        df_out = df.dropDuplicates([nat_key])
    else:
        df_out = df.dropDuplicates()

    # adicionar surrogate key e timestamp de carga
    df_out = df_out.withColumn('surrogate_id', monotonically_increasing_id()).withColumn('load_datetime', current_timestamp())

    # Normalização específica: criar chave canônica para cartão (cartao_key)
    # Isso permite usar numero_cartao ou id_cartao nos fatos e juntar consistentemente.
    if gold_name == 'dim_cartao':
        df_out = df_out.withColumn('cartao_key', F.coalesce(col('numero_cartao'), col('id_cartao').cast('string')))

    target_table = f"{catalog_name}.{gold_schema}.{gold_name}"
    try:
        df_out.write.format(WRITE_FORMAT).mode(WRITE_MODE).option('overwriteSchema','true').saveAsTable(target_table)
        print(f"Wrote dimension table {target_table} (rows={spark.table(target_table).count()})")
    except AnalysisException as e:
        print(f"Failed to write {target_table}: {e}")
    except Exception as e:
        print(f"Unexpected error writing {target_table}: {e}")

In [None]:
# Transformar e carregar fatos (fact_) com particionamento por event_date quando aplicável
from pyspark.sql import DataFrame

def enrich_with_dim(df_fact: DataFrame, dim_table: str, fact_cols_candidates: list, dim_cols_candidates: list, fk_alias: str) -> DataFrame:
    """Tenta encontrar uma coluna em comum entre df_fact e dim_table para fazer o join
    - fact_cols_candidates: lista de nomes esperados no fato (prioridade)
    - dim_cols_candidates: lista de nomes esperados na dimensão (prioridade)
    - fk_alias: nome da coluna surrogate id a ser criada no fato (ex: 'conta_sk')
    """
    # verificar se fato tem pelo menos uma das colunas candidatas
    fact_col = next((c for c in fact_cols_candidates if c in df_fact.columns), None)
    if fact_col is None:
        return df_fact

    if not spark.catalog.tableExists(dim_table):
        print(f"{dim_table} not found, skipping {fk_alias}")
        return df_fact

    dim_df = spark.table(dim_table)

    # escolher coluna da dimensão para juntar
    if fact_col in dim_df.columns:
        dim_key = fact_col
    else:
        dim_key = next((c for c in dim_cols_candidates if c in dim_df.columns), None)

    if dim_key is None:
        print(f"No suitable join key found between fact and {dim_table} for {fk_alias} (fact_col={fact_col})")
        return df_fact

    # Evitar colisão de nomes de colunas renomeando a coluna da dimensão antes do join
    dim_prefixed = f"dim_{dim_key}"
    dim_sel = dim_df.select(col(dim_key).alias(dim_prefixed), col('surrogate_id').alias(fk_alias))

    # Fazer join usando a coluna renomeada
    df_joined = df_fact.join(dim_sel, df_fact[fact_col] == dim_sel[dim_prefixed], how='left')

    # Remover coluna temporária da dimensão
    if dim_prefixed in df_joined.columns:
        df_joined = df_joined.drop(dim_prefixed)

    return df_joined


for silver_name, (gold_name, typ) in mapping.items():
    if typ != 'fact':
        continue

    df = dfs.get(silver_name)
    if df is None:
        print(f"Skipping {silver_name}: no DataFrame")
        continue

    print(f"Processing fact {silver_name} -> {gold_name}")
    df_fact = df

    # tentar extrair data do evento
    if 'data_transacao' in df_fact.columns:
        df_fact = df_fact.withColumn('event_date', to_date(col('data_transacao')))
    elif 'data' in df_fact.columns:
        df_fact = df_fact.withColumn('event_date', to_date(col('data')))
    else:
        df_fact = df_fact.withColumn('event_date', lit(None).cast('date'))

    # dedupe por id_transacao quando disponível
    if 'id_transacao' in df_fact.columns:
        df_fact = df_fact.dropDuplicates(['id_transacao'])

    df_fact = df_fact.withColumn('load_datetime', current_timestamp())

    # --- Normalizar e enriquecer fatos com chaves estrangeiras (surrogate_id das dimensões) ---
    # Normalizar cartão: criar cartao_key no fato também (coalesce)
    if 'numero_cartao' in df_fact.columns or 'id_cartao' in df_fact.columns:
        df_fact = df_fact.withColumn('cartao_key', F.coalesce(col('numero_cartao'), col('id_cartao').cast('string')))

    # Conta: id_conta, id_conta
    df_fact = enrich_with_dim(
        df_fact,
        f"{catalog_name}.{gold_schema}.dim_conta",
        ['id_conta', 'id_conta'],
        ['id_conta', 'id_conta'],
        'conta_sk'
    )

    # Cartão: preferir cartao_key criado; suporte numero_cartao/id_cartao como fallback
    df_fact = enrich_with_dim(
        df_fact,
        f"{catalog_name}.{gold_schema}.dim_cartao",
        ['cartao_key', 'numero_cartao', 'id_cartao'],
        ['cartao_key', 'numero_cartao', 'id_cartao'],
        'cartao_sk'
    )

    # Chave PIX: id_chave_pix, valor_chave
    df_fact = enrich_with_dim(
        df_fact,
        f"{catalog_name}.{gold_schema}.dim_chave_pix",
        ['id_chave_pix', 'valor_chave'],
        ['id_chave_pix', 'valor_chave'],
        'chave_pix_sk'
    )

    # Cliente: id_cliente, cpf
    df_fact = enrich_with_dim(
        df_fact,
        f"{catalog_name}.{gold_schema}.dim_cliente",
        ['id_cliente', 'cpf'],
        ['id_cliente', 'cpf'],
        'cliente_sk'
    )

    # Agência: codigo_agencia
    df_fact = enrich_with_dim(
        df_fact,
        f"{catalog_name}.{gold_schema}.dim_agencia",
        ['codigo_agencia'],
        ['codigo_agencia'],
        'agencia_sk'
    )

    # Escrever fato enriquecido
    target_table = f"{catalog_name}.{gold_schema}.{gold_name}"
    try:
        if 'event_date' in df_fact.columns:
            df_fact.write.format(WRITE_FORMAT).mode(WRITE_MODE).option('overwriteSchema','true').partitionBy('event_date').saveAsTable(target_table)
        else:
            df_fact.write.format(WRITE_FORMAT).mode(WRITE_MODE).option('overwriteSchema','true').saveAsTable(target_table)
        print(f"Wrote fact table {target_table} (rows={spark.table(target_table).count()})")
    except Exception as e:
        print(f"Failed to write {target_table}: {e}")


In [None]:
# Verificação de counts, checagens básicas de qualidade e integridade referencial
for silver_name, (gold_name, typ) in mapping.items():
    silver_full = f"{catalog_name}.{silver_schema}.{silver_name}"
    gold_full = f"{catalog_name}.{gold_schema}.{gold_name}"
    try:
        s_cnt = spark.table(silver_full).count()
    except Exception:
        s_cnt = None
    try:
        g_cnt = spark.table(gold_full).count()
    except Exception:
        g_cnt = None

    print(f"{silver_full} -> silver_count={s_cnt}; {gold_full} -> gold_count={g_cnt}")

    # checar nulos nas chaves naturais para dimensões
    if typ == 'dim':
        pk = natural_keys.get(gold_name)
        if pk and spark.catalog.tableExists(gold_full):
            nulls = spark.table(gold_full).filter(col(pk).isNull()).count()
            print(f"Null key count for {gold_full}.{pk}: {nulls}")

    # checar integridade referencial mínima para fatos (FK surrogate ids criadas)
    if typ == 'fact':
        try:
            gdf = spark.table(gold_full)
            fk_candidates = [c for c in ['conta_sk','cartao_sk','chave_pix_sk','cliente_sk','agencia_sk'] if c in gdf.columns]
            for fk in fk_candidates:
                nnull = gdf.filter(col(fk).isNull()).count()
                print(f"FK nulls {gold_full}.{fk}: {nnull}")
        except Exception as e:
            print(f"Could not inspect FK columns for {gold_full}: {e}")

print("Verificações concluídas.")

In [None]:
# Opcional: otimização/compactação das tabelas Gold (Databricks)
for silver_name, (gold_name, typ) in mapping.items():
    gold_full = f"{catalog_name}.{gold_schema}.{gold_name}"
    try:
        print(f"Attempting OPTIMIZE on {gold_full}")
        spark.sql(f"OPTIMIZE {gold_full}")
        # z-order on natural key when available
        zkey = natural_keys.get(gold_name)
        if zkey:
            spark.sql(f"OPTIMIZE {gold_full} ZORDER BY ({zkey})")
    except Exception as e:
        print(f"OPTIMIZE skipped or not supported for {gold_full}: {e}")

print("Optional optimization step finished.")