#Processamento de Raw XML to bronze

In [0]:
# ================================================================
# Bronze - Importar XML (IBGE)
# ================================================================
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from datetime import datetime
import os
import xml.etree.ElementTree as ET


In [0]:
spark = SparkSession.builder.appName("Processamento_raw_xml_to_bronze").getOrCreate()

In [0]:
# ================================================================
# Par√¢metro recebido via Job Databricks
# ================================================================
dbutils.widgets.text("data_ref_carga", "")
data_ref_carga = dbutils.widgets.get("data_ref_carga")

if not data_ref_carga:
    raise ValueError("‚ùå Par√¢metro 'data_ref_carga' n√£o informado (formato esperado: yyyy-MM-dd)")

print(f"üóìÔ∏è Data de refer√™ncia da carga: {data_ref_carga}")


In [0]:
# ================================================================
# 1Ô∏è‚É£ Configura√ß√µes
# ===============================================================
catalog = "datamasters"
schema = "b_tbra"
data_log = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
path_raw = "/Volumes/datamasters/raw/raw_tbra/"
display(dbutils.fs.ls("/Volumes/datamasters/raw/raw_tbra/"))

In [0]:
# ================================================================
# 2Ô∏è‚É£ Listar arquivos XML
# ================================================================
arquivos = [f.path.rstrip("/") for f in dbutils.fs.ls(path_raw) if f.name.lower().endswith(".xml")]
print("üìÇ Arquivos XML encontrados:")
for a in arquivos:
    print("-", a)

In [0]:
# ================================================================
# 3Ô∏è‚É£ Detectar automaticamente a tag raiz (rowTag)
# ================================================================
def detectar_rowtag(arquivo_dbfs):
    try:
        xml_str = dbutils.fs.head(arquivo_dbfs, 32768)  # l√™ at√© 32 KB
        root = ET.fromstring(xml_str)
        return root.tag
    except Exception:
        try:
            df_tmp = spark.read.format("com.databricks.spark.xml").option("rowTag", "*").load(arquivo_dbfs)
            return df_tmp.columns[0] if df_tmp.columns else "root"
        except:
            return "root"

In [0]:
# ================================================================
# 3Ô∏è‚É£ Inicializa lista de logs
# ================================================================
logs = []

In [0]:
# ================================================================
# 5Ô∏è‚É£ Processar arquivos XML
# ================================================================

for arquivo in arquivos:
    nome_arquivo = os.path.basename(arquivo)
    # prefixo "e_" + nome do arquivo em min√∫sculas, sem espa√ßos
    nome_tabela = "e_" + os.path.splitext(nome_arquivo)[0].lower().replace(" ", "_")

    print(f"\nüöÄ Processando {nome_arquivo} -> {catalog}.{schema}.{nome_tabela}")

    row_tag = detectar_rowtag(arquivo)
    print(f"   üîé rowTag detectado: {row_tag}")

    status = "OK"
    erro_msg = ""
    inicio = datetime.now()
    qtd_registros = 0

    try:
        # Ler XML
        df_xml = spark.read.format("com.databricks.spark.xml") \
            .option("rowTag", "gmd:MD_Metadata") \
            .option("inferSchema", "true") \
            .option("ignoreNamespace", "true") \
            .load(arquivo)
        
        # Adiciona coluna de parti√ß√£o √∫nica
        df_xml = df_xml.withColumn("data_ref_carga", F.lit(data_ref_carga))

        # Contar registros
        qtd_registros = df_xml.count()

        # Gravar tabela Delta particionada
        df_xml.write.format("delta") \
            .mode("overwrite") \
            .option("overwriteSchema", "true") \
            .saveAsTable(f"{catalog}.{schema}.{nome_tabela}")

        print(f"‚úÖ Tabela gravada: {catalog}.{schema}.{nome_tabela} ({qtd_registros} registros)")

    except Exception as e:
        status = "ERRO"
        erro_msg = str(e)
        print(f"‚ùå Erro ao processar {nome_arquivo}: {erro_msg}")

    fim = datetime.now()
    duracao = round((fim - inicio).total_seconds(), 2)

    # Registrar log
    logs.append(
        (
            nome_arquivo,
            nome_tabela,
            row_tag,
            status,
            erro_msg,
            data_log,
            data_ref_carga,
            qtd_registros,
            duracao,
        )
    )

# Mostrar logs finais
df_logs = spark.createDataFrame(logs, schema=[
    "arquivo",
    "tabela",
    "row_tag",
    "status",
    "erro_msg",
    "data_log",
    "data_ref_carga",
    "qtd_registros",
    "duracao_segundos"
])
display(df_logs)

In [0]:
# ================================================================
# 6Ô∏è‚É£ Gravar Log T√©cnico
# ================================================================
df_log = spark.createDataFrame(
    logs,
    [
        "arquivo",
        "tabela",
        "rowTag_detectado",
        "status",
        "mensagem",
        "data_log",
        "data_ref_carga",
        "linhas_lidas",
        "duracao_segundos",
    ],
)

(
    df_log.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(f"{catalog}.{schema}.log_carga_xml")
)

print("\nüóíÔ∏è Log salvo em:", f"{catalog}.{schema}.log_carga_xml")
print("üéØ Processo conclu√≠do para todos os XMLs.")