# ETL: Popular Banco de Dados (Raw to Silver)

Este notebook implementa o pipeline ETL para popular o Banco:
- **Bronze**: Dados brutos do CSV
- **Silver**: Star Schema (dimensões + fato)

## Estrutura 

**Bronze:**
- `bronze.vehicle_prices` - Dados brutos do CSV

**Silver (Star Schema):**
- `silver.dim_modelo` - Dimensão de modelos
- `silver.dim_especificacao` - Dimensão de especificações
- `silver.fato_veiculo` - Tabela fato com métricas

## 1. Importações e Configuração

In [25]:
import sys
import os
import time
import pandas as pd
from datetime import datetime
from sqlalchemy import create_engine, text

In [None]:
# Configuração do banco de dados
DB_CONFIG = {
    'host': os.getenv('POSTGRES_HOST', 'postgres'),
    'port': os.getenv('POSTGRES_PORT', '5432'),
    'database': os.getenv('POSTGRES_DB', 'sbd2_vehicle'),
    'user': os.getenv('POSTGRES_USER', 'sbd2_vehicle'),
    'password': os.getenv('POSTGRES_PASSWORD', 'sbd2_vehicle')
}

# Caminhos dos arquivos (relativos ao diretório do notebook)
CSV_PATH = '../DataLayer/raw/vehicle_price_prediction.csv'
DDL_PATH = '../DataLayer/silver/ddl.sql'

CSV_PATH = os.path.abspath(CSV_PATH)
DDL_PATH = os.path.abspath(DDL_PATH)

print("Configuração carregada!")
print(f"Diretório atual: {os.getcwd()}")
print(f"CSV: {CSV_PATH}")
print(f"DDL: {DDL_PATH}")
print(f"CSV existe? {os.path.exists(CSV_PATH)}")
print(f"DDL existe? {os.path.exists(DDL_PATH)}")


Configuração carregada!
Diretório atual: /home/jovyan
CSV: /home/DataLayer/raw/vehicle_price_prediction.csv
DDL: /home/DataLayer/silver/ddl.sql
CSV existe? True
DDL existe? True


In [31]:
def get_engine():
    """
    Cria e retorna a engine SQLAlchemy para conexão com PostgreSQL
    """
    connection_string = (
        f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}"
        f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    )
    return create_engine(connection_string)

def wait_for_database(max_attempts=30, delay=5):
    """
    Aguarda o banco de dados ficar disponível
    """
    print("Aguardando banco de dados...")
    for attempt in range(max_attempts):
        try:
            engine = get_engine()
            with engine.connect() as conn:
                conn.execute(text("SELECT 1"))
            print(f" Banco disponível após {attempt + 1} tentativa(s)")
            return True, engine
        except Exception as e:
            if attempt < max_attempts - 1:
                print(f"Tentativa {attempt + 1}/{max_attempts}: Aguardando...")
                time.sleep(delay)
    
    print("ERRO: Timeout - banco não ficou disponível")
    return False, None

print("Funções auxiliares definidas!")


Funções auxiliares definidas!


## 2. Conectar ao Banco de Dados

In [32]:
print("="*60)
print("ETL: CSV -> BRONZE -> SILVER (Star Schema)")
print("="*60)
print(f"Início: {datetime.now().strftime('%H:%M:%S')}\n")

# Conectar ao banco
success, engine = wait_for_database()
if not success:
    raise Exception("Não foi possível conectar ao banco de dados")

print("\n Conexão estabelecida com sucesso!")


ETL: CSV -> BRONZE -> SILVER (Star Schema)
Início: 04:09:36

Aguardando banco de dados...
 Banco disponível após 1 tentativa(s)

 Conexão estabelecida com sucesso!


## 3. Criar Tabela Bronze


In [33]:
print("\n[1/4] Criando tabela Bronze...")

ddl = """
CREATE SCHEMA IF NOT EXISTS bronze;
DROP TABLE IF EXISTS bronze.vehicle_prices CASCADE;
CREATE TABLE IF NOT EXISTS bronze.vehicle_prices (
    id SERIAL PRIMARY KEY,
    make VARCHAR(100),
    model VARCHAR(100),
    year INTEGER,
    mileage INTEGER,
    engine_hp FLOAT,
    transmission VARCHAR(50),
    fuel_type VARCHAR(50),
    drivetrain VARCHAR(50),
    body_type VARCHAR(50),
    exterior_color VARCHAR(50),
    interior_color VARCHAR(50),
    owner_count INTEGER,
    accident_history VARCHAR(50),
    seller_type VARCHAR(50),
    condition VARCHAR(50),
    trim VARCHAR(50),
    vehicle_age INTEGER,
    mileage_per_year FLOAT,
    brand_popularity FLOAT,
    price FLOAT,
    _ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    _source_file VARCHAR(255)
);
"""

try:
    with engine.connect() as conn:
        conn.execute(text(ddl))
        conn.commit()
    print("  Tabela Bronze criada")
except Exception as e:
    print(f"  Erro ao criar tabela Bronze: {e}")
    raise



[1/4] Criando tabela Bronze...
  Tabela Bronze criada


## 4. Criar Tabelas Silver (Star Schema)


In [34]:
print("\n[2/4] Criando tabelas Silver (Star Schema)...")

try:
    # Ler o arquivo DDL usando a variável DDL_PATH
    with open(DDL_PATH, 'r') as f:
        ddl_content = f.read()
    
    # Executar o DDL
    with engine.connect() as conn:
        # Criar schema silver se não existir
        conn.execute(text("CREATE SCHEMA IF NOT EXISTS silver"))
        
        # Executar cada statement do DDL
        for statement in ddl_content.split(';'):
            if statement.strip():
                conn.execute(text(statement))
        
        conn.commit()
    
    print("  Tabelas Silver criadas:")
    print("    - silver.dim_modelo")
    print("    - silver.dim_especificacao")
    print("    - silver.fato_veiculo")
    
except Exception as e:
    print(f"  Erro ao criar tabelas Silver: {e}")
    raise



[2/4] Criando tabelas Silver (Star Schema)...
  Tabelas Silver criadas:
    - silver.dim_modelo
    - silver.dim_especificacao
    - silver.fato_veiculo


## 5. Carregar Dados Bronze (CSV -> Banco)


In [35]:
print("\n[3/4] Carregando dados Bronze (CSV -> Banco)...")
print("="*60)

start_time = datetime.now()

try:
    # Verificar se já tem dados
    with engine.connect() as conn:
        result = conn.execute(text("SELECT COUNT(*) FROM bronze.vehicle_prices"))
        count = result.scalar()
        if count > 0:
            print(f"  Bronze já contém {count:,} registros. Pulando.")
        else:
            # Ler CSV
            print("  Lendo CSV...")
            df = pd.read_csv(CSV_PATH)
            print(f"  {len(df):,} linhas carregadas")
            
            # TRANSFORMAÇÃO: Tratar nulos em accident_history
            print("  Aplicando transformação: accident_history")
            df['accident_history'] = df['accident_history'].fillna('None')
            
            # Adicionar metadados
            df['_ingestion_timestamp'] = datetime.now()
            df['_source_file'] = 'vehicle_price_prediction.csv'
            
            # Inserir no banco em chunks
            print("  Inserindo no banco...")
            df.to_sql(
                'vehicle_prices',
                engine,
                schema='bronze',
                if_exists='append',
                index=False,
                chunksize=10000
            )
            
            duration = (datetime.now() - start_time).total_seconds()
            print(f"  Bronze carregado: {len(df):,} registros em {duration:.1f}s")
            
except Exception as e:
    print(f"  Erro: {e}")
    raise



[3/4] Carregando dados Bronze (CSV -> Banco)...
  Lendo CSV...
  1,000,000 linhas carregadas
  Aplicando transformação: accident_history
  Inserindo no banco...
  Bronze carregado: 1,000,000 registros em -47.3s


## 6. Transformar e Carregar Silver (Star Schema)


In [36]:
print("\n[4/4] Transformando e carregando Silver...")
print("="*60)

start_time = datetime.now()

try:
    # Verificar se já tem dados
    with engine.connect() as conn:
        result = conn.execute(text("SELECT COUNT(*) FROM silver.fato_veiculo"))
        count = result.scalar()
        if count > 0:
            print(f"  Silver já contém {count:,} registros. Pulando.")
        else:
            # Ler dados da Bronze
            print("  Lendo dados Bronze...")
            df = pd.read_sql("SELECT * FROM bronze.vehicle_prices", engine)
            print(f"  {len(df):,} registros lidos")
            
            # PASSO 1: Criar dim_modelo
            print("\n  Criando dim_modelo...")
            dim_modelo_cols = [
                'make', 'model', 'year', 'engine_hp',
                'transmission', 'fuel_type', 'drivetrain',
                'body_type', 'trim'
            ]
            df_dim_modelo = df[dim_modelo_cols].drop_duplicates().reset_index(drop=True)
            df_dim_modelo.insert(0, 'id_modelo', range(1, len(df_dim_modelo) + 1))
            df_dim_modelo.to_sql('dim_modelo', engine, schema='silver', if_exists='append', index=False, chunksize=5000)
            print(f"  {len(df_dim_modelo):,} modelos únicos")
            
            # PASSO 2: Criar dim_especificacao
            print("\n  Criando dim_especificacao...")
            dim_espec_cols = [
                'exterior_color', 'interior_color', 'owner_count',
                'accident_history', 'seller_type', 'condition',
                'vehicle_age'
            ]
            df_dim_espec = df[dim_espec_cols].drop_duplicates().reset_index(drop=True)
            df_dim_espec.insert(0, 'id_especificacao', range(1, len(df_dim_espec) + 1))
            df_dim_espec.to_sql('dim_especificacao', engine, schema='silver', if_exists='append', index=False, chunksize=5000)
            print(f"  {len(df_dim_espec):,} especificações únicas")
            
            # PASSO 3: Criar tabela FATO
            print("\n  Criando fato_veiculo...")
            df_fato = df.merge(df_dim_modelo, on=dim_modelo_cols, how='left')
            df_fato = df_fato.merge(df_dim_espec, on=dim_espec_cols, how='left')
            df_fato = df_fato[['id_modelo', 'id_especificacao', 'mileage', 'mileage_per_year', 'brand_popularity', 'price']].copy()
            df_fato.insert(0, 'id_fato', range(1, len(df_fato) + 1))
            df_fato.to_sql('fato_veiculo', engine, schema='silver', if_exists='append', index=False, chunksize=10000)
            
            duration = (datetime.now() - start_time).total_seconds()
            print(f"\n  Silver completo: {len(df_fato):,} fatos em {duration:.1f}s")
            
except Exception as e:
    print(f"  Erro: {e}")
    import traceback
    traceback.print_exc()
    raise



[4/4] Transformando e carregando Silver...
  Silver já contém 1,000,000 registros. Pulando.


## 7. Verificação de Dados


In [37]:
print("\n" + "="*60)
print("VERIFICAÇÃO DE DADOS")
print("="*60)

try:
    with engine.connect() as conn:
        # Contar registros
        result = conn.execute(text("SELECT COUNT(*) FROM bronze.vehicle_prices"))
        bronze_count = result.scalar()
        print(f"\n  Bronze: {bronze_count:,} registros")
        
        result = conn.execute(text("SELECT COUNT(*) FROM silver.dim_modelo"))
        modelo_count = result.scalar()
        print(f"  Dim Modelo: {modelo_count:,} modelos únicos")
        
        result = conn.execute(text("SELECT COUNT(*) FROM silver.dim_especificacao"))
        espec_count = result.scalar()
        print(f"  Dim Especificação: {espec_count:,} especificações únicas")
        
        result = conn.execute(text("SELECT COUNT(*) FROM silver.fato_veiculo"))
        fato_count = result.scalar()
        print(f"  Fato Veículo: {fato_count:,} registros")
        
        # Estatísticas de preço
        print("\n" + "-"*60)
        print("ESTATÍSTICAS DE PREÇO:")
        print("-"*60)
        result = conn.execute(text("""
            SELECT 
                ROUND(AVG(price)::numeric, 2) as preco_medio,
                ROUND(MIN(price)::numeric, 2) as preco_minimo,
                ROUND(MAX(price)::numeric, 2) as preco_maximo
            FROM silver.fato_veiculo
        """))
        stats = result.fetchone()
        print(f"Preço Médio:  ${stats[0]:,.2f}")
        print(f"Preço Mínimo: ${stats[1]:,.2f}")
        print(f"Preço Máximo: ${stats[2]:,.2f}")
        
        # Top 5 marcas
        print("\n" + "-"*60)
        print("TOP 5 MARCAS (por volume):")
        print("-"*60)
        result = conn.execute(text("""
            SELECT m.make, COUNT(*) as total
            FROM silver.fato_veiculo f
            JOIN silver.dim_modelo m ON f.id_modelo = m.id_modelo
            GROUP BY m.make
            ORDER BY total DESC
            LIMIT 5
        """))
        for row in result:
            print(f"  {row[0]:15s}: {row[1]:,}")
    
    print("\n" + "="*60)
    print("  ETL CONCLUÍDO COM SUCESSO!")
    print("="*60)
    print(f"Fim: {datetime.now().strftime('%H:%M:%S')}")
    print("\nEstrutura criada:")
    print("  • bronze.vehicle_prices (1M registros)")
    print("  • silver.dim_modelo (modelos únicos)")
    print("  • silver.dim_especificacao (especificações únicas)")
    print("  • silver.fato_veiculo (1M fatos)")
    
except Exception as e:
    print(f"  Erro na verificação: {e}")
    raise



VERIFICAÇÃO DE DADOS

  Bronze: 1,000,000 registros
  Dim Modelo: 981,719 modelos únicos
  Dim Especificação: 29,671 especificações únicas
  Fato Veículo: 1,000,000 registros

------------------------------------------------------------
ESTATÍSTICAS DE PREÇO:
------------------------------------------------------------
Preço Médio:  $20,329.30
Preço Mínimo: $1,500.00
Preço Máximo: $93,422.09

------------------------------------------------------------
TOP 5 MARCAS (por volume):
------------------------------------------------------------
  Kia            : 40,484
  Mazda          : 40,247
  Subaru         : 40,230
  Tesla          : 40,226
  Nissan         : 40,217

  ETL CONCLUÍDO COM SUCESSO!
Fim: 04:16:21

Estrutura criada:
  • bronze.vehicle_prices (1M registros)
  • silver.dim_modelo (modelos únicos)
  • silver.dim_especificacao (especificações únicas)
  • silver.fato_veiculo (1M fatos)
