In [0]:

# 📥 Ingestão de arquivos Parquet da API para tabelas Delta (camada Bronze)

# COMMAND ----------
# 🧱 BLOCO 1 - Imports e Configurações Iniciais
import re
from pyspark.sql import SparkSession

catalog = "ted_dev"
schema = "bronze"
caminho_api = "/Volumes/ted_dev/bronze/raw/api/"

spark = SparkSession.builder.getOrCreate()



In [0]:
# COMMAND ----------
# 🧱 BLOCO 2 - Função para normalizar nomes de tabela

def normalizar_nome_tabela(caminho):
    nome = caminho.rstrip("/").split("/")[-1]
    nome = re.sub(r"[^a-zA-Z0-9_]", "_", nome).lower()
    return f"raw_api_{nome}_db"


In [0]:
# COMMAND ----------
# 🧱 BLOCO 3 - Função para criar tabela Delta a partir de uma subpasta

def criar_tabela_gerenciada(caminho_base):
    nome_tabela = normalizar_nome_tabela(caminho_base)
    if not nome_tabela:
        print(f"❌ Nome inválido para: {caminho_base}")
        return

    # Buscar arquivos .parquet dentro da subpasta (recursivamente)
    arquivos_parquet = dbutils.fs.ls(caminho_base)
    arquivos_validos = [f.path for f in arquivos_parquet if f.path.endswith(".parquet") or f.path.endswith(".gz.parquet")]

    # Se não encontrar diretamente, entra recursivamente
    if not arquivos_validos:
        for subdir in arquivos_parquet:
            if subdir.isDir():
                sub_arquivos = dbutils.fs.ls(subdir.path)
                arquivos_validos += [f.path for f in sub_arquivos if f.path.endswith(".parquet") or f.path.endswith(".gz.parquet")]

    if not arquivos_validos:
        print(f"⚠️ Nenhum arquivo Parquet encontrado em: {caminho_base}")
        return

    print(f"📦 Lendo arquivos: {arquivos_validos[:1]} ... (total: {len(arquivos_validos)})")

    # Lê todos os arquivos juntos
    df = spark.read.format("parquet").load(arquivos_validos)

    # Remove a tabela do metastore, se existir
    spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{nome_tabela}")
    
    # Escreve como tabela Delta gerenciada
    df.write.format("delta")\
      .mode("overwrite")\
      .saveAsTable(f"{catalog}.{schema}.{nome_tabela}")

    print(f"✅ Tabela {catalog}.{schema}.{nome_tabela} criada com sucesso!")

# COMMAND ----------

In [0]:
# 🧱 BLOCO 4 - Loop para processar todas as subpastas
subpastas = dbutils.fs.ls(caminho_api)

for pasta in subpastas:
    if pasta.isDir():
        criar_tabela_gerenciada(pasta.path)

print("🎉 Processo concluído!")

📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/bronze/raw/api/PurchaseOrderDetail/PurchaseOrderDetail-20250719_004511-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.bronze.raw_api_purchaseorderdetail_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/bronze/raw/api/PurchaseOrderHeader/PurchaseOrderHeader-20250719_004511-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.bronze.raw_api_purchaseorderheader_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/bronze/raw/api/SalesOrderDetail/SalesOrderDetail-20250719_004511-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.bronze.raw_api_salesorderdetail_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/bronze/raw/api/SalesOrderHeader/SalesOrderHeader-20250719_004511-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.bronze.raw_api_salesorderheader_db criada com sucesso!
🎉 Processo concluído!
