# Job ETL: Silver para Gold

**Objetivo:** Transformar os dados confiáveis da camada Silver em um modelo dimensional otimizado (Star Schema) com mnemônicos padronizados para a camada Gold.

**Etapas do Pipeline:**
1. **Extração:** Ler dados da tabela `tb_games_silver` do PostgreSQL (ou usar dados de exemplo)
2. **Transformação:** Criar dimensões, aplicar mnemônicos e construir tabela fato
3. **Carga (Load):** Salvar todas as tabelas dimensionais e fato no PostgreSQL
4. **Validação:** Verificar integridade dos dados carregados

**Modelo Dimensional (Star Schema):**
- 6 Dimensões (DIM_*)
- 1 Tabela Fato (FAT_JGO)
- Nomenclatura em português com prefixos mnemônicos


## Importações e Configurações
Setup inicial com bibliotecas necessárias e configurações do Pandas para melhor visualização.


In [1]:
import pandas as pd
import numpy as np
import warnings
from sqlalchemy import create_engine, text

pd.set_option('display.max_columns', None)
warnings.filterwarnings('ignore')

print("Bibliotecas importadas e configurações definidas.")

Bibliotecas importadas e configurações definidas.


## 1. Extração (Extract)

Carregamento dos dados da camada Silver do PostgreSQL.

**Conexão PostgreSQL:**
- Conecta ao banco de dados `steam_bi` usando as credenciais `steam_bi_user`
- Lê a tabela `tb_games_silver` com todos os registros disponíveis
- A conexão é obrigatória para prosseguir com o pipeline

In [2]:
# 1. Extração da Camada Silver
# Conecta ao banco de dados PostgreSQL e carrega os dados
engine = None
df_silver = None

print("Conectando ao banco de dados PostgreSQL...\n")

engine = create_engine('postgresql://steam_bi_user:steam_bi_user@localhost:5432/steam_bi')

# Teste de conexão
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    _ = result.fetchone()

print("✓ Conexão estabelecida com sucesso!\n")

print("Extraindo dados da camada Silver...")
# Carrega a tabela silver
df_silver = pd.read_sql("SELECT * FROM tb_games_silver", engine)
print(f"✓ {len(df_silver)} registros carregados do PostgreSQL.")
print(f"✓ Colunas: {list(df_silver.columns)}")

Conectando ao banco de dados PostgreSQL...

✓ Conexão estabelecida com sucesso!

Extraindo dados da camada Silver...
✓ 122610 registros carregados do PostgreSQL.
✓ Colunas: ['id_jogo', 'nome_jogo', 'idade_minima', 'preco', 'nota_metacritic', 'nota_usuario', 'desenvolvedores', 'publicadoras', 'categorias', 'generos', 'tags', 'ano_lancamento', 'tier_preco', 'tem_ptbr_interface', 'tem_ptbr_audio']


## 2. Transformação (Transform)

Construção do modelo dimensional com o padrão Star Schema. O processo segue estas etapas:

### 2.1 Criação de Dimensões (DIM_*)
Transformação das colunas da Silver em dimensões normalizadas:
- **DIM_OFR**: Ofertas únicas (preço + tier de preço)
- **DIM_DEV**: Desenvolvedoras distintas
- **DIM_PBS**: Publicadoras distintas
- **DIM_CAT**: Categorias de jogos
- **DIM_GEN**: Gêneros de jogos
- **DIM_TAG**: Tags descritivas

Cada dimensão recebe:
- **SRK_XXX**: Surrogate Key (chave substituta numérica)
- **NME_XXX**: Nome/Descrição do elemento

### 2.2 Construção da Tabela Fato (FAT_JGO)
Integração das dimensões com as métricas de negócio:
- Merge com cada dimensão criando relacionamentos
- **Otimização**: Extrai primeiro valor de atributos multivalorados (evita explosão de memória)
- Mantém proporção 1:1 com Silver (122k linhas)
- Inclui todas as chaves estrangeiras (SRK_*)

### 2.3 Nomenclatura Mnemônica
Padronização de nomes em português:
- **SRK_**: Surrogate Key (chave substituta)
- **NME_**: Nome / Descrição
- **VLR_**: Valor numérico (preço)
- **TIR_**: Tier / Categoria
- **NTA_**: Nota / Score
- **ANO_**: Ano
- **TEM_**: Booleano (tem/possui)


In [None]:
# Função para criar dimensões baseadas em listas (ex: Gêneros, Tags)
def create_text_dimension(df, cols_origem: list, nome_tabela, col_id, col_nome):
    """
    Converte uma coluna com valores multivalorados separados por vírgula em uma dimensão normalizada.
    
    Exemplo:
        Input:  col_origem = 'generos' → ['Indie, Ação', 'RPG, Indie', ...]
        Output: DIM_GEN com SRK_GEN [1, 2, 3, ...] e NME_GEN ['Indie', 'Ação', 'RPG', ...]
    
    Parâmetros:
        df: DataFrame com os dados brutos
        cols_origem: Nomes das colunas de origem (Ex: ['preco', 'tier preco'])
        nome_tabela: Nome da tabela (para logging)
        col_id: Nome da coluna ID a criar (ex: 'SRK_GEN')
        col_nome: Nome da coluna descritiva (ex: 'NME_GEN')
    
    Retorna:
        DataFrame com 2 colunas: [col_id, col_nome] - dimensão normalizada
    """
    print(f"Criando {nome_tabela}...")
    
    # PASSO 1: Seleciona apenas a coluna de origem
    dim = df[col_origem].copy()
    
    # PASSO 2: Reseta índice e cria ID sequencial
    dim = dim.reset_index(drop=True)
    dim[col_id] = dim.index + 1  # Surrogate Key: 1, 2, 3, ...
    
    # PASSO 3: Renomeia coluna de origem para o padrão mnemônico
    dim = dim.rename(columns={col_origem: col_nome})
    
    return dim

In [5]:
# Extrai o primeiro valor de uma string multivalorada separada por vírgula
def get_first_value(col_string):
    """
    Estratégia de otimização: extrai apenas o PRIMEIRO valor de uma lista multivalorada.
    
    Isso EVITA a explosão de dados (multiplicação de linhas) mantendo proporção 1:1 com Silver.
    
    Exemplo:
        "Indie, Ação, 2D" → "Indie"
        "RPG" → "RPG"
        NaN → ""
    
    Benefício:
        - 122k linhas permanecem 122k (sem explode)
        - Menos memória utilizada
        - Queries mais rápidas no DW
    
    Tradeoff:
        - Perde informação de múltiplos gêneros por jogo
        - Solução: Se precisar todos, usar explode_and_merge()
    """
    if pd.isna(col_string):
        return ''
    return str(col_string).split(',')[0].strip()

In [6]:
def build_gold_layer(df_silver: pd.DataFrame):
    """
    Constrói o modelo Star Schema com 6 dimensões e 1 tabela fato.
    
    ESTRUTURA FINAL:
    
    ┌─ DIM_OFR (Ofertas)
    │  ├─ SRK_OFR [1, 2, 3, ...]
    │  ├─ VLR_PRC [9.99, 19.99, ...]
    │  └─ TIR_PRC ['Budget', 'Padrão', ...]
    │
    ├─ DIM_DEV (Desenvolvedoras)
    │  ├─ SRK_DEV [1, 2, 3, ...]
    │  └─ NME_DEV ['EA Games', 'Valve', ...]
    │
    ├─ DIM_PBS (Publicadoras)
    ├─ DIM_CAT (Categorias)
    ├─ DIM_GEN (Gêneros)
    ├─ DIM_TAG (Tags)
    │
    └─ FAT_JGO (Tabela Fato)
       ├─ Dimensões do jogo: SRK_JGO, NME_JGO, VLR_IDD, TEM_PTB_ITF, TEM_PTB_AUD
       ├─ Métricas: NTA_USR, NTA_MTC, ANO_LNC
       └─ Chaves Estrangeiras: SRK_OFR, SRK_DEV, SRK_PBS, SRK_CAT, SRK_GEN, SRK_TAG
    """
    print("Iniciando transformação para o novo modelo Gold (Mnemônicos)...\n")
    df = df_silver.copy()

    # ==============================================================================
    # 1. CRIAÇÃO DAS DIMENSÕES (DIM)
    # ==============================================================================
    print("─" * 70)
    print("FASE 1: CRIAÇÃO DE DIMENSÕES")
    print("─" * 70)

    # --- DIM_OFR (Oferta) ---
    # ✓ Dimensão simples: 1 linha por combinação única de (preço, tier)
    # ✓ NÃO usa explode (não há valores multivalorados)
    # ✓ Resultado: ~20-30 linhas (poucas combinações únicas)
    print("\nCriando DIM_OFR...")
    dim_ofr = df[['preco', 'tier_preco']].drop_duplicates().reset_index(drop=True)
    dim_ofr['SRK_OFR'] = dim_ofr.index + 1  # Surrogate Key: 1, 2, 3, ...
    dim_ofr = dim_ofr.rename(columns={'preco': 'VLR_PRC', 'tier_preco': 'TIR_PRC'})
    print(f"  └─ {len(dim_ofr)} ofertas únicas")

    # --- DIMENSÕES DE TEXTO (Explode) ---
    # ✓ Dimensões complexas: valores separados por vírgula
    # ✓ ESTRATÉGIA: create_text_dimension() padroniza o processo
    # ✓ Processa cada coluna multivalorada de forma idêntica
    print("\nCriando dimensões de texto (com explode)...")
    
    dim_dev = create_text_dimension(df, 'desenvolvedoras', 'DIM_DEV', 'SRK_DEV', 'NME_DEV')
    print(f"  └─ {len(dim_dev)} desenvolvedoras distintas")
    
    dim_pbs = create_text_dimension(df, 'publicadoras',    'DIM_PBS', 'SRK_PBS', 'NME_PBS')
    print(f"  └─ {len(dim_pbs)} publicadoras distintas")
    
    dim_cat = create_text_dimension(df, 'categorias',      'DIM_CAT', 'SRK_CAT', 'NME_CAT')
    print(f"  └─ {len(dim_cat)} categorias distintas")
    
    dim_gen = create_text_dimension(df, 'generos',         'DIM_GEN', 'SRK_GEN', 'NME_GEN')
    print(f"  └─ {len(dim_gen)} gêneros distintos")
    
    dim_tag = create_text_dimension(df, 'tags',            'DIM_TAG', 'SRK_TAG', 'NME_TAG')
    print(f"  └─ {len(dim_tag)} tags distintas")

    # ==============================================================================
    # 2. CONSTRUÇÃO DA TABELA FATO (FAT_JGO)
    # ==============================================================================
    print("\n" + "─" * 70)
    print("FASE 2: CONSTRUÇÃO DA TABELA FATO")
    print("─" * 70 + "\n")
    print("Construindo FAT_JGO (Isso pode demorar dependendo do volume)...\n")
    fato = df.copy()

    # PASSO 1: Renomear colunas simples para mnemônicos
    # ✓ Aplica padrão: SRK_XXX (surrogate key), NME_XXX (nome), VLR_XXX (valor), etc
    print("PASSO 1: Renomeando colunas para padrão mnemônico...")
    fato = fato.rename(columns={
        'id_jogo': 'SRK_JGO',              # ID do jogo → Surrogate Key
        'nome_jogo': 'NME_JGO',            # Nome
        'idade_minima': 'VLR_IDD',         # Valor (mínimo)
        'tem_ptbr_interface': 'TEM_PTB_ITF',  # Tem (booleano)
        'tem_ptbr_audio': 'TEM_PTB_AUD',      # Tem (booleano)
        'nota_usuario': 'NTA_USR',         # Nota (score)
        'nota_metacritic': 'NTA_MTC',      # Nota (score)
        'ano_lancamento': 'ANO_LNC'        # Ano
    })

    # PASSO 2: Merge com DIM_OFR
    # ✓ Conecta cada jogo com sua oferta (preço + tier)
    # ✓ Adiciona a Foreign Key SRK_OFR à tabela fato
    print("PASSO 2: Mergeando com DIM_OFR (preço + tier)...")
    fato = fato.merge(dim_ofr, left_on=['preco', 'tier_preco'], right_on=['VLR_PRC', 'TIR_PRC'], how='left')
    print(f"  └─ Fato tem agora coluna SRK_OFR (Foreign Key)")

    # PASSO 3: ESTRATÉGIA DE OTIMIZAÇÃO
    # ✓ Extrai PRIMEIRO valor de colunas multivaloradas
    # ✓ Mantém 122k linhas (não explode para milhões)
    # ✓ Tradeoff: perde informação de múltiplos gêneros, mas ganha performance
    print("\nPASSO 3: Extraindo PRIMEIRO valor de atributos multivalorados...")
    print("  ⚠ ESTRATÉGIA: Evita explosão de dados")
    print("     Exemplo: 'Indie, Ação' → 'Indie'")
    
    fato['generos'] = fato['generos'].apply(get_first_value)
    fato['categorias'] = fato['categorias'].apply(get_first_value)
    fato['tags'] = fato['tags'].apply(get_first_value)
    fato['desenvolvedores'] = fato['desenvolvedores'].apply(get_first_value)
    fato['publicadoras'] = fato['publicadoras'].apply(get_first_value)
    print(f"  └─ Proporção mantida: {len(fato)} linhas (sem aumento)")

    # PASSO 4: Merges com as dimensões de texto
    # ✓ Agora são merges 1:1 (porque extraímos o primeiro valor)
    # ✓ Cada merge adiciona uma Foreign Key (SRK_*)
    print("\nPASSO 4: Mergeando com dimensões de texto (1:1)...")
    
    fato = fato.merge(dim_gen, left_on='generos', right_on='NME_GEN', how='left')
    print(f"  ├─ Merge DIM_GEN → Coluna SRK_GEN adicionada")
    
    fato = fato.merge(dim_cat, left_on='categorias', right_on='NME_CAT', how='left')
    print(f"  ├─ Merge DIM_CAT → Coluna SRK_CAT adicionada")
    
    fato = fato.merge(dim_tag, left_on='tags', right_on='NME_TAG', how='left')
    print(f"  ├─ Merge DIM_TAG → Coluna SRK_TAG adicionada")
    
    fato = fato.merge(dim_dev, left_on='desenvolvedores', right_on='NME_DEV', how='left')
    print(f"  ├─ Merge DIM_DEV → Coluna SRK_DEV adicionada")
    
    fato = fato.merge(dim_pbs, left_on='publicadoras', right_on='NME_PBS', how='left')
    print(f"  └─ Merge DIM_PBS → Coluna SRK_PBS adicionada")

    # PASSO 5: Seleção final das colunas
    # ✓ Remove colunas desnecessárias (descritivas, originais)
    # ✓ Mantém apenas: PKs, FKs, métricas e dimensões degeneras
    # ✓ Ordem: de acordo com o DLD (Data Lineage Diagram)
    print("\nPASSO 5: Selecionando colunas finais...")
    cols_finais = [
        # Dimensões degeneras (atributos do jogo)
        'SRK_JGO', 'NME_JGO', 'VLR_IDD', 'TEM_PTB_ITF', 'TEM_PTB_AUD', 
        # Métricas
        'NTA_USR', 'NTA_MTC', 'ANO_LNC',
        # Chaves estrangeiras (Foreign Keys → Dimensões)
        'SRK_OFR', 'SRK_DEV', 'SRK_PBS', 'SRK_CAT', 'SRK_GEN', 'SRK_TAG'
    ]

    fat_jgo = fato[cols_finais]
    
    print(f"  └─ Tabela fato final: {len(fat_jgo)} linhas × {len(cols_finais)} colunas")
    print(f"  └─ 8 colunas de dimensões degeneras + 6 Foreign Keys")

    # ==============================================================================
    # 3. RETORNO DO DICIONÁRIO DE TABELAS
    # ==============================================================================
    print("\n" + "─" * 70)
    print("RESUMO DO MODELO STAR SCHEMA")
    print("─" * 70)
    
    return {
        "DIM_OFR": dim_ofr,
        "DIM_DEV": dim_dev,
        "DIM_PBS": dim_pbs,
        "DIM_CAT": dim_cat,
        "DIM_GEN": dim_gen,
        "DIM_TAG": dim_tag,
        "FAT_JGO": fat_jgo
    }

## 3. Carregamento (Load)

Persistência de todas as tabelas no PostgreSQL usando SQLAlchemy.

**Modo de Operação:**
- **if_exists='replace'**: Recria as tabelas do zero para garantir atualização completa
- **Chunksize**: Dados enviados em lotes de 10.000 linhas para melhor performance
- **Índices**: Criados automaticamente no PostgreSQL para otimizar queries

**Tabelas Salvas:**
- 6 Dimensões (DIM_OFR, DIM_DEV, DIM_PBS, DIM_CAT, DIM_GEN, DIM_TAG)
- 1 Tabela Fato (FAT_JGO)


In [7]:
# --- EXECUÇÃO E CARGA ---
print("\n=== ETAPA 3: PROCESSAMENTO E CARREGAMENTO (LOAD) ===\n")

try:
    # Processamento
    tables = build_gold_layer(df_silver.copy())

    print("\n--- Resumo da Transformação ---")
    for nome, df_table in tables.items():
        print(f"{nome}: {len(df_table)} linhas")

    # Carga no Banco de Dados
    print("\n--- Salvando no PostgreSQL ---")
    
    for nome_tabela, df_tabela in tables.items():
        print(f"Salvando {nome_tabela}...", end=" ")
        df_tabela.to_sql(nome_tabela, engine, if_exists='replace', index=False)
        print(f"✓ ({len(df_tabela)} registros)")
        
    print("\n✓ Processo Gold concluído com sucesso!")

except Exception as e:
    print(f"⚠ Erro durante o processamento: {e}")
    print("\nℹ Se o banco estiver indisponível, os dados foram modelados em memória.")
    print("Execute novamente quando o PostgreSQL estiver acessível para salvar.")



=== ETAPA 3: PROCESSAMENTO E CARREGAMENTO (LOAD) ===

Iniciando transformação para o novo modelo Gold (Mnemônicos)...

──────────────────────────────────────────────────────────────────────
FASE 1: CRIAÇÃO DE DIMENSÕES
──────────────────────────────────────────────────────────────────────

Criando DIM_OFR...
  └─ 941 ofertas únicas

Criando dimensões de texto (com explode)...
Criando DIM_DEV...
⚠ Erro durante o processamento: "None of [Index(['desenvolvedoras'], dtype='object')] are in the [columns]"

ℹ Se o banco estiver indisponível, os dados foram modelados em memória.
Execute novamente quando o PostgreSQL estiver acessível para salvar.


## 4. Validação Pós-Carga (Verificação)

Consulta ao banco de dados para garantir que todos os dados foram carregados corretamente.

In [8]:
# --- VERIFICAÇÃO PÓS-CARGA ---
print("\n=== ETAPA 4: VALIDAÇÃO PÓS-CARGA ===\n")

try:
    # Verificar contagem de registros em cada tabela
    print("--- Contagem de Registros por Tabela ---")
    tabelas = [('DIM_OFR', 'Ofertas'), ('DIM_DEV', 'Desenvolvedoras'), ('DIM_PBS', 'Publicadoras'), 
               ('DIM_CAT', 'Categorias'), ('DIM_GEN', 'Gêneros'), ('DIM_TAG', 'Tags'), ('FAT_JGO', 'Fato Games')]
    
    for nome_tabela, desc in tabelas:
        from sqlalchemy import text
        qtd = pd.read_sql(text(f'SELECT COUNT(*) as total FROM "{nome_tabela}"'), engine)
        print(f"{desc:20} ({nome_tabela}): {qtd.iloc[0, 0]:>8,} registros")
    
    # Amostra de dados da fato
    print("\n--- Amostra da Tabela Fato (FAT_JGO) ---")
    amostra = pd.read_sql(
        text('SELECT "SRK_JGO", "NME_JGO", "VLR_IDD", "NTA_MTC", "NTA_USR", "ANO_LNC" FROM "FAT_JGO" LIMIT 5'), 
        engine
    )
    print(amostra.to_string(index=False))
    
    print("\n✓ Validação concluída com sucesso!")
    
except Exception as e:
    print(f"⚠ Erro na validação: {e}")



=== ETAPA 4: VALIDAÇÃO PÓS-CARGA ===

--- Contagem de Registros por Tabela ---
⚠ Erro na validação: (psycopg2.errors.UndefinedTable) relation "DIM_OFR" does not exist
LINE 1: SELECT COUNT(*) as total FROM "DIM_OFR"
                                      ^

[SQL: SELECT COUNT(*) as total FROM "DIM_OFR"]
(Background on this error at: https://sqlalche.me/e/20/f405)
