In [0]:
%python
# Databricks notebook source
# MAGIC %md
# MAGIC ## Ingestão de Parquet do SQL Server para Delta Lake
# MAGIC Este notebook percorre os diretórios de origem contendo arquivos Parquet exportados do SQL Server e converte cada diretório em uma tabela Delta gerenciada no Databricks com prefixo `raw_sqlserver_`.

# COMMAND ----------
# 🧱 BLOCO 1 - Configurações
catalog = "ted_dev"
schema = "dev_alexandre_filho"
caminho_raw = "/Volumes/ted_dev/dev_alexandre_filho/raw/sqlserver/"

# COMMAND ----------
# 🧱 BLOCO 2 - Função para normalizar nomes de tabelas
import re

def normalizar_nome_tabela(caminho):
    # Extrai nome do diretório
    nome = caminho.rstrip("/").split("/")[-1]
    nome_normalizado = re.sub(r"[^a-zA-Z0-9]", "_", nome).lower()
    if nome_normalizado:
        return f"raw_sqlserver_{nome_normalizado}_db"
    return None

# COMMAND ----------
# 🧱 BLOCO 3 - Função para criar tabela Delta

def criar_tabela_delta(caminho_base):
    nome_tabela = normalizar_nome_tabela(caminho_base)
    if not nome_tabela:
        print(f"❌ Nome inválido para: {caminho_base}")
        return

    # Buscar arquivos parquet
    arquivos_parquet = dbutils.fs.ls(caminho_base)
    arquivos_validos = [f.path for f in arquivos_parquet if f.path.endswith(".parquet") or f.path.endswith(".gz.parquet")]

    if not arquivos_validos:
        for subdir in arquivos_parquet:
            if subdir.isDir():
                sub_arquivos = dbutils.fs.ls(subdir.path)
                arquivos_validos += [f.path for f in sub_arquivos if f.path.endswith(".parquet") or f.path.endswith(".gz.parquet")]

    if not arquivos_validos:
        print(f"⚠️ Nenhum arquivo Parquet encontrado em: {caminho_base}")
        return

    print(f"📦 Lendo arquivos: {arquivos_validos[:1]} ... (total: {len(arquivos_validos)})")
    df = spark.read.format("parquet").load(arquivos_validos)

    # Renomear colunas inválidas
    for col in df.columns:
        new_col = re.sub(r"[ ,;{}()\n\t=]", "_", col)
        df = df.withColumnRenamed(col, new_col)

    df.write.format("delta") \
      .mode("overwrite") \
      .saveAsTable(f"{catalog}.{schema}.{nome_tabela}")

    print(f"✅ Tabela {catalog}.{schema}.{nome_tabela} criada com sucesso!")

# COMMAND ----------
# 🧱 BLOCO 4 - Execução para cada subdiretório
subpastas = dbutils.fs.ls(caminho_raw)

for pasta in subpastas:
    if pasta.isDir():
        criar_tabela_delta(pasta.path)

📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/dev_alexandre_filho/raw/sqlserver/HumanResources-Department/HumanResources-Department-20250614_180707-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.dev_alexandre_filho.raw_sqlserver_humanresources_department_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/dev_alexandre_filho/raw/sqlserver/HumanResources-Employee/HumanResources-Employee-20250614_180707-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.dev_alexandre_filho.raw_sqlserver_humanresources_employee_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/dev_alexandre_filho/raw/sqlserver/HumanResources-EmployeeDepartmentHistory/HumanResources-EmployeeDepartmentHistory-20250614_180707-0-0.gz.parquet'] ... (total: 1)
✅ Tabela ted_dev.dev_alexandre_filho.raw_sqlserver_humanresources_employeedepartmenthistory_db criada com sucesso!
📦 Lendo arquivos: ['dbfs:/Volumes/ted_dev/dev_alexandre_filho/raw/sqlserver/HumanResources-EmployeePayHistory/HumanResources-Employee