In [0]:
# ==============================================================================
# CÉLULA 1: DEFINIÇÃO DE PARÂMETROS E PREPARAÇÃO
# ==============================================================================
# Este notebook é projetado para ser executado como um job do Databricks.

# 1. Define os widgets que serão preenchidos pelo job do Databricks.
print("1. Definindo widgets de entrada...")
dbutils.widgets.text("catalog", "", "Catálogo do Unity Catalog")
dbutils.widgets.text("schema", "", "Schema de Destino")
dbutils.widgets.text("folder_volume", "", "Pasta no Volume que contém o arquivo Parquet")
dbutils.widgets.text("file_parquet", "", "Nome do Arquivo Parquet")
dbutils.widgets.text("table_name", "", "Nome Base da Tabela Delta (vindo do nome da pasta)")
# A pasta de destino não é mais necessária, pois a tabela será gerenciada
# dbutils.widgets.text("delta_folder", "api_deltatables", "Pasta de Destino para Tabelas Delta")
print("   - Widgets definidos com sucesso.")

# 2. Lê os valores dos widgets para variáveis Python.
print("\n2. Lendo parâmetros do job...")
catalog_name = dbutils.widgets.get("catalog")
schema_name = dbutils.widgets.get("schema")
folder_path_in_volume = dbutils.widgets.get("folder_volume")
parquet_filename = dbutils.widgets.get("file_parquet")
delta_table_name_raw = dbutils.widgets.get("table_name")

# Define o nome do volume base.
base_volume_name = "raw"

# 3. Constrói os caminhos e nomes de tabela.
print("\n3. Construindo nomes e caminhos...")
full_parquet_path = f"/Volumes/{catalog_name}/{schema_name}/{base_volume_name}/{folder_path_in_volume}/{parquet_filename}"

# CORREÇÃO APLICADA AQUI: O nome da tabela agora é apenas o nome do endpoint.
final_table_name = delta_table_name_raw.lower().replace('-', '_')
full_table_name = f"{catalog_name}.{schema_name}.{final_table_name}"

# Imprime as variáveis para verificação no log do job.
print(f"   - Arquivo de Origem: {full_parquet_path}")
print(f"   - Tabela de Destino Final: {full_table_name}")


In [0]:
# ==============================================================================
# CÉLULA 2: LÓGICA DE CRIAÇÃO E CARGA
# ==============================================================================

# 4. Lê o arquivo Parquet para obter seu schema dinamicamente.
print("\n4. Lendo o schema do arquivo Parquet de origem...")
try:
    source_df = spark.read.parquet(full_parquet_path)
    # Converte o schema do DataFrame para uma string no formato DDL (ex: "`col1` STRING, `col2` INT").
    schema_ddl = ", ".join([f"`{field.name}` {field.dataType.simpleString()}" for field in source_df.schema.fields])
    print("   - Schema DDL gerado com sucesso.")
except Exception as e:
    dbutils.notebook.exit(f"ERRO: Falha ao ler o arquivo Parquet para inferir o schema. Caminho: {full_parquet_path}. Erro: {e}")

# 5. Apaga a tabela antiga (se existir) e a recria com o novo schema.
# Esta abordagem de "overwrite" completo é simples e garante que a tabela reflita o arquivo mais recente.
print(f"\n5. Criando/Recriando a tabela '{full_table_name}'...")
spark.sql(f"DROP TABLE IF EXISTS {full_table_name}")
spark.sql(f"""
    CREATE TABLE {full_table_name} ({schema_ddl})
    USING DELTA
""") # Cria como uma tabela gerenciada, a melhor prática.
print("   - Tabela criada com sucesso.")

# 6. Carrega os dados na tabela recém-criada usando COPY INTO.
print(f"\n6. Carregando dados com COPY INTO...")
copy_into_sql = f"""
    COPY INTO {full_table_name}
    FROM '{full_parquet_path}'
    FILEFORMAT = PARQUET
    COPY_OPTIONS (
      'force' = 'true',            -- Força o carregamento, já que a tabela está sempre vazia neste ponto.
      'mergeSchema' = 'true'       -- Boa prática para permitir que novas colunas sejam adicionadas no futuro.
    )
"""
try:
    result_df = spark.sql(copy_into_sql)
    print("   - Carga de dados concluída.")
    display(result_df) # Mostra um resumo da operação de cópia.
except Exception as e:
    dbutils.notebook.exit(f"ERRO ao executar o COPY INTO. Erro: {e}")

# 7. Verificação Final.
print(f"\n7. Verificando o resultado final...")
try:
    record_count = spark.sql(f"SELECT COUNT(*) FROM {full_table_name}").collect()[0][0]
    print(f"   - Sucesso! A tabela '{full_table_name}' agora contém {record_count} registros.")
except Exception as e:
    dbutils.notebook.exit(f"ERRO ao verificar a contagem de registros na tabela final. Erro: {e}")

dbutils.notebook.exit("Processo finalizado com sucesso.")