# Camada Bronze - Padronização SIAPE

---

## Objetivo
Este notebook é responsável por ingerir os arquivos CSV brutos armazenados na Staging Area (Volumes), aplicar um tratamento estrutural primário de esquema e salvá-los como Tabelas Gerenciadas no formato **Delta Lake**. O foco da Camada Bronze é manter os dados em sua granularidade original (raw), adicionando apenas metadados de governança.

## Decisões de Engenharia e Governança
1. **Tratamento Dinâmico de Esquema:** Os cabeçalhos originais do governo possuem caracteres inválidos para o formato Parquet/Delta (espaços, parênteses, etc.). A função `sanitize_column_names` utiliza Regex para higienizar todas as colunas de forma escalável e independente do domínio.
2. **Data Lineage (Rastreabilidade):** Atendendo ao requisito do desafio, foi incorporada a coluna `arquivo_origem` (utilizando metadados nativos do Spark `_metadata.file_path`) para garantir a rastreabilidade exata do arquivo de origem de cada registro.
3. **Encoding de Legado:** Configuração explícita de leitura em `ISO-8859-1` e separador `;` para garantir a integridade de caracteres da língua portuguesa.
4. **Native Partition Discovery:** O pipeline reconhece automaticamente a estrutura física de pastas `posicao_base=YYYYMM` gerada na etapa de Extract & Load, particionando o Delta Lake de forma otimizada para consultas temporais.

---
**Origem:** Arquivos CSV particionados (`/Volumes/workspace/default/staging-siape-tables`)

**Destino:** Delta Tables Gerenciadas (`public_informations.bronze_...`)

In [0]:
from pyspark.sql import functions as F
import logging
import re

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(message)s')
logger = logging.getLogger(__name__)

VOLUME_PATH = "/Volumes/workspace/default/staging-siape-tables"
SCHEMA_NAME = "public_informations"
DOMAINS = ["cadastro", "remuneracao", "afastamentos", "observacoes"]

spark.sql(f"CREATE DATABASE IF NOT EXISTS {SCHEMA_NAME}")

def sanitize_column_names(df):
    """
    Remove caracteres inválidos do Delta Lake e substitui espaços por underline.
    Exemplo: 'REMUNERAÇÃO BÁSICA (R$)' vira 'REMUNERAÇÃO_BÁSICA_R$'
    """
    for col_name in df.columns:
        # Substitui os caracteres proibidos do Delta e espaços por '_'
        clean_name = re.sub(r'[ ,;{}()\n\t=]+', '_', col_name)
        # Remove underlines duplicados ou no final
        clean_name = re.sub(r'_+', '_', clean_name).strip('_')
        
        df = df.withColumnRenamed(col_name, clean_name)
    return df

def run_bronze_from_partitioned_volumes():
    for domain in DOMAINS:
        logger.info(f"Processando domínio: {domain.upper()}")
        
        source_path = f"{VOLUME_PATH}/{domain}/"
        table_name = f"{SCHEMA_NAME}.bronze_{domain}"
        
        try:
            df_raw = (spark.read
                      .format("csv")
                      .option("header", "true")
                      .option("sep", ";")
                      .option("encoding", "ISO-8859-1")
                      .load(source_path))
            
            df_clean_cols = sanitize_column_names(df_raw)
            
            df_bronze = (df_clean_cols
                         .withColumn("arquivo_origem", F.col("_metadata.file_path"))
                         .withColumn("timestamp_carga_bronze", F.current_timestamp()))
            
            (df_bronze.write
             .format("delta")
             .mode("overwrite") 
             .partitionBy("posicao_base")
             .saveAsTable(table_name))
            
            logger.info(f" -> ✅ Tabela {table_name} catalogada com sucesso!")
            
        except Exception as e:
            logger.error(f" -> ❌ Erro ao processar {domain.upper()}: {e}", exc_info=False)

if __name__ == "__main__":
    logger.info("Iniciando processamento da Camada Bronze...")
    run_bronze_from_partitioned_volumes()
    logger.info("Camada Bronze finalizada!")

2026-02-25 22:01:07,653 - Iniciando processamento da Camada Bronze...
2026-02-25 22:01:07,653 - Processando domínio: CADASTRO
2026-02-25 22:01:48,627 -  -> ✅ Tabela public_informations.bronze_cadastro catalogada com sucesso!
2026-02-25 22:01:48,627 - Processando domínio: REMUNERACAO
2026-02-25 22:02:00,577 -  -> ✅ Tabela public_informations.bronze_remuneracao catalogada com sucesso!
2026-02-25 22:02:00,577 - Processando domínio: AFASTAMENTOS
2026-02-25 22:02:06,417 -  -> ✅ Tabela public_informations.bronze_afastamentos catalogada com sucesso!
2026-02-25 22:02:06,417 - Processando domínio: OBSERVACOES
2026-02-25 22:02:12,255 -  -> ✅ Tabela public_informations.bronze_observacoes catalogada com sucesso!
2026-02-25 22:02:12,256 - Camada Bronze finalizada!
