In [0]:
# Imports

import traceback
import unicodedata
import re
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Inicializar spark
spark = SparkSession.builder.getOrCreate()

# Funções Utilitárias
def log(msg: str, level: str = "INFO"):
    """Logger padronizado com timestamp."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{timestamp}] [{level}] {msg}")

def normalize_col_name(col_name):
    """
    Remove acentos e caracteres especiais dos nomes das colunas.
    Ex: 'DT. NOTIFICAÇÃO' -> 'dt_notificacao'
    """
    # Normaliza unicode (remove acentos)
    n = unicodedata.normalize('NFKD', col_name).encode('ASCII', 'ignore').decode('utf-8').lower()
    # Substitui tudo que não for letra/número por underscore
    n = re.sub(r'[^a-z0-9]', '_', n)
    # Remove underscores duplicados e das pontas
    return re.sub(r'_+', '_', n).strip('_')

# Widgets
try:
    dbutils.widgets.text("catalog", "srag_dev", "1. Catálogo")
    dbutils.widgets.text("schema", "raw", "2. Schema")
    dbutils.widgets.text("table_name", "srag_raw", "3. Tabela Destino")
except:
    pass

# Parâmetros
catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")
table_name = dbutils.widgets.get("table_name")

# Caminhos
volume_path = f"/Volumes/{catalog}/{schema}/raw/srag_downloads"
bronze_table = f"{catalog}.{schema}.{table_name}"

# Log Inicial
log("--- SETUP CARGA BRONZE (RAW -> DELTA) ---")
log(f"Origem: {volume_path}")
log(f"Destino: {bronze_table}")

[2026-01-07 18:56:45] [INFO] === SETUP CARGA BRONZE (RAW -> DELTA) ===
[2026-01-07 18:56:45] [INFO] Origem: /Volumes/srag_dev/raw/raw/srag_downloads
[2026-01-07 18:56:45] [INFO] Destino: srag_dev.raw.srag_raw


In [0]:
# Leitura e Tratamento

log("--- Iniciando leitura e transformação ---")

try:
    # Leitura CSV (DataSUS: ; e latin1)
    df_raw = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("delimiter", ";")          
        .option("encoding", "latin1")
        .option("inferSchema", "false")
        .option("recursiveFileLookup", "true") # Garante leitura de todos arquivos no path
        .load(volume_path)
    )

    # Padronização de Colunas
    # Aplica a função de normalização em todas as colunas
    new_cols = [normalize_col_name(c) for c in df_raw.columns]
    df_renamed = df_raw.toDF(*new_cols)

    # Resiliência de Tipos (Tudo String)
    df_casted = df_renamed.select(
        [F.col(c).cast(StringType()) for c in df_renamed.columns]
    )

    # Colunas de Auditoria
    df_final = df_casted \
        .withColumn("source_file", F.col("_metadata.file_path")) \
        .withColumn("ingestion_at", F.current_timestamp())

    # Validação
    if df_final.isEmpty():
        raise ValueError("O DataFrame está vazio. Verifique se há arquivos na pasta de origem.")

    log(f"Leitura concluída. Schema normalizado com {len(df_final.columns)} colunas.")
    
except Exception as e:
    log(f"FALHA NA LEITURA: {traceback.format_exc()}", level="ERROR")
    raise

[2026-01-07 18:56:45] [INFO] --- Iniciando leitura e transformação ---
[2026-01-07 18:57:17] [INFO] Leitura concluída. Schema normalizado com 196 colunas.


In [0]:
# Escrita Bronze

log(f"--- Escrevendo na tabela Delta: {bronze_table} ---")

try:
    (
        df_final.write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .option("userMetadata", "Carga Automática Pipeline SRAG") 
        .saveAsTable(bronze_table)
    )

    # Otimização
    spark.sql(f"OPTIMIZE {bronze_table}")

    # Coletando métricas
    total_rows = spark.table(bronze_table).count()
    log(f"✅ SUCESSO! Carga finalizada.", level="SUCCESS")
    log(f"Total de registros na Bronze: {total_rows}")
    
    # Governança
    spark.sql(f"""
        COMMENT ON TABLE {bronze_table} IS 
        'Camada Bronze (Raw) de dados de SRAG. Origem: CSVs OpenDataSUS. 
        Tratamento: Encoding Latin1 -> UTF8, Nomes de colunas normalizados (snake_case), Tipagem 100% String.'
    """)
    
except Exception as e:
    log(f"FALHA NA ESCRITA: {traceback.format_exc()}", level="CRITICAL")
    raise

[2026-01-07 18:58:19] [INFO] --- Escrevendo na tabela Delta: srag_dev.raw.srag_raw ---
[2026-01-07 18:58:39] [SUCCESS] ✅ SUCESSO! Carga finalizada.
[2026-01-07 18:58:39] [INFO] Total de registros na Bronze: 316945
