# Setup Data Mesh - Ingestão e Criação de Tabelas

Este notebook é responsável por:
1. Inicializar a Sessão Spark.
2. Criar o Catálogo `olist_dataset` (se suportado/Unity Catalog).
3. Criar os Schemas (Bancos de Dados Lógicos) para cada domínio dentro do catálogo.
4. Ler os arquivos CSV brutos.
5. Salvar as tabelas no formato gerenciado (Parquet/Delta) com `overwrite`.

In [None]:
%load_ext autoreload
%autoreload 2

import os
import sys
from pyspark.sql import SparkSession

# Garante que o diretório app seja visível
sys.path.append(os.getcwd())

In [None]:
# Inicialização da Sessão Spark
spark = SparkSession.builder \
    .appName("SetupDataMesh") \
    .enableHiveSupport() \
    .getOrCreate()

print("Spark Session Ativa")

In [None]:
# Configuração dos Caminhos
DATA_PATH = "data/"

# Definição dos Domínios e Arquivos
DOMAINS = {
    "olist_sales": [
        "olist_orders_dataset.csv", 
        "olist_order_items_dataset.csv", 
        "olist_products_dataset.csv"
    ],
    "olist_logistics": [
        "olist_sellers_dataset.csv", 
        "olist_geolocation_dataset.csv", 
        "olist_customers_dataset.csv"
    ],
    "olist_finance": [
        "olist_order_payments_dataset.csv"
    ],
    "olist_cx": [
        "olist_order_reviews_dataset.csv"
    ]
}

In [None]:
def ingest_domain(domain_name, file_list):
    print(f"\n>>> Configurando Domínio: {domain_name}")
    
    # 0. Criar Catálogo
    # Nota: CREATE CATALOG geralmente requer o Unity Catalog no Databricks.
    # Se rodar localmente sem UC, isso pode falhar ou precisa ser adaptado.
    try:
        print("Creating Catalog olist_dataset...")
        spark.sql("CREATE CATALOG IF NOT EXISTS olist_dataset")
    except Exception as e:
        print(f"Warning: Could not create catalog (might be local spark). Error: {e}")
        print("Creating database in default catalog instead...")
    
    # 1. Tentar definir o nome do banco com catálogo
    target_db = f"olist_dataset.{domain_name}"
    try:
        print(f"Creating Schema {target_db}...")
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_db}")
    except:
        target_db = domain_name
        print(f"Fallback: Creating Schema {target_db} (default catalog)...")
        spark.sql(f"CREATE DATABASE IF NOT EXISTS {target_db}")
    
    # 2. Iterar arquivos e criar tabelas
    for filename in file_list:
        file_path = os.path.join(DATA_PATH, filename)
        
        if not os.path.exists(file_path):
            print(f"  [SKIP] Arquivo não encontrado: {file_path}")
            continue
            
        # Nome da tabela limpo
        table_name = filename.replace("olist_", "").replace("_dataset.csv", "")
        full_table_name = f"{target_db}.{table_name}"
        
        print(f"  Ingestão: {filename} -> {full_table_name}")
        
        # Leitura do CSV
        try:
            df = spark.read.option("header", "true") \
                           .option("inferSchema", "true") \
                           .csv(file_path)
            
            # Escrita com Overwrite (SaveAsTable)
            df.write.mode("overwrite").saveAsTable(full_table_name)
            print(f"  [OK] Tabela {full_table_name} atualizada com sucesso.")
            
        except Exception as e:
            print(f"  [ERRO] Falha ao processar {filename}: {str(e)}")

In [None]:
# Execução Geral
for domain, files in DOMAINS.items():
    ingest_domain(domain, files)
    
print("\n=== Data Mesh Setup Concluído ===")

In [None]:
# Validação: Listar tabelas criadas
# Tenta listar do catálogo especifico ou do default
for domain in DOMAINS.keys():
    target_db = f"olist_dataset.{domain}"
    print(f"\nTabelas em {target_db} (ou fallback {domain}):")
    try:
        spark.sql(f"SHOW TABLES IN {target_db}").show(truncate=False)
    except:
        spark.sql(f"SHOW TABLES IN {domain}").show(truncate=False)