# SCD Type 2 (Slowly Changing Dimensions) - Tutorial Completo

## 🎯 **Objetivo**

Este notebook demonstra como implementar **SCD Type 2** usando **PostgreSQL** e **Python**, mantendo o histórico completo de mudanças em dimensões do Data Warehouse.

## 📚 **O que você vai aprender:**

1. **Conceitos fundamentais** de SCD Type 2
2. **Configuração** do ambiente com Docker + PostgreSQL
3. **Implementação prática** com Python e Pandas
4. **Consultas históricas** com Point-in-Time Joins
5. **Pipeline ETL completo** para processamento em lote

## 🗂️ **Estrutura do Data Warehouse:**

- **Staging**: `staging.clientes_source` - dados de origem
- **Dimensão**: `dw.dim_cliente` - SCD Type 2 com histórico
- **Fato**: `dw.fato_vendas` - vendas particionadas por `dt_ref`

---

**Antes de começar:** Certifique-se de que o Docker está rodando e execute:
```bash
docker-compose up -d
```

## 1️⃣ Environment Setup and Docker Configuration

### 🔧 **Melhorias implementadas:**

✅ **Arquivo `.env`**: Configurações centralizadas e seguras  
✅ **SQLAlchemy**: Instalado automaticamente via notebook  
✅ **Código limpo**: Eliminada duplicação e complexidade  

### 📁 **Estrutura de configuração:**
```
├── .env                    # Configurações do ambiente
├── docker-compose.yml     # PostgreSQL + PgAdmin
└── notebooks/
    └── scd_type2_tutorial.ipynb
```

### 🐳 **Para iniciar o ambiente:**
```bash
docker-compose up -d
```

In [26]:
# Importar bibliotecas necessárias
import pandas as pd
import numpy as np
import psycopg2
from datetime import datetime, date, timedelta
import os
import warnings
warnings.filterwarnings('ignore')

# Carregar variáveis do arquivo .env
from dotenv import load_dotenv
load_dotenv()

# Tentar importar SQLAlchemy
try:
    from sqlalchemy import create_engine, text
    SQLALCHEMY_AVAILABLE = True
    print("✅ SQLAlchemy importado com sucesso!")
except Exception as e:
    print(f"❌ Erro no SQLAlchemy: {e}")
    print("   Instalando dependências...")
    raise

# Configurações do banco de dados (carregadas do .env)
DB_CONFIG = {
    'host': os.getenv('DB_HOST', 'localhost'),
    'port': os.getenv('DB_PORT', '5432'),
    'database': os.getenv('DB_NAME', 'datawarehouse'),
    'username': os.getenv('DB_USER', 'dw_user'),
    'password': os.getenv('DB_PASSWORD', 'dw_password'),
}

print("🔧 Configuração carregada do .env:")
print(f"   Host: {DB_CONFIG['host']}:{DB_CONFIG['port']}")
print(f"   Database: {DB_CONFIG['database']}")
print(f"   User: {DB_CONFIG['username']}")
print("   ✅ Ambiente configurado!")

✅ SQLAlchemy importado com sucesso!
🔧 Configuração carregada do .env:
   Host: localhost:5432
   Database: datawarehouse
   User: dw_user
   ✅ Ambiente configurado!


## 2️⃣ PostgreSQL Connection and Database Creation

In [27]:
# Criar conexão com PostgreSQL
def create_db_connection():
    """Cria conexão com o banco PostgreSQL usando configurações do .env"""
    connection_string = f"postgresql://{DB_CONFIG['username']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['database']}"
    return create_engine(connection_string)

# Testar conexão
try:
    engine = create_db_connection()
    
    # Teste de conectividade
    with engine.connect() as conn:
        result = conn.execute(text("SELECT version()"))
        version = result.fetchone()[0]
        print("🎉 Conexão estabelecida com sucesso!")
        print(f"   PostgreSQL: {version.split(',')[0]}")
        
except Exception as e:
    print(f"❌ Erro na conexão: {e}")
    print("   💡 Verifique se o Docker está rodando: docker-compose up -d")

# Funções auxiliares (corrigidas para SQLAlchemy com auto-commit)
def execute_query(query, params=None):
    """Executa uma query SQL com sintaxe correta do SQLAlchemy"""
    with engine.begin() as conn:  # begin() garante auto-commit
        if params:
            return conn.execute(text(query), params)
        else:
            return conn.execute(text(query))

def load_dataframe(query, params=None):
    """Carrega dados do banco para DataFrame"""
    with engine.connect() as conn:
        if params:
            result = conn.execute(text(query), params)
        else:
            result = conn.execute(text(query))
        
        # Converter resultado para DataFrame
        rows = result.fetchall()
        columns = result.keys()
        return pd.DataFrame(rows, columns=columns)

print("✅ Funções de banco configuradas!")

🎉 Conexão estabelecida com sucesso!
   PostgreSQL: PostgreSQL 15.14 (Debian 15.14-1.pgdg13+1) on x86_64-pc-linux-gnu
✅ Funções de banco configuradas!


## 3️⃣ Create Initial Dimension and Fact Tables

Vamos verificar se as tabelas foram criadas corretamente pelo script de inicialização.

In [10]:
# Verificar as tabelas criadas
print("📊 Verificando estrutura das tabelas...")

# 1. Tabela de staging
print("\n1️⃣ STAGING.CLIENTES_SOURCE:")
staging_df = load_dataframe("SELECT * FROM staging.clientes_source LIMIT 5")
print(f"   Registros: {len(staging_df)}")
print(f"   Colunas: {list(staging_df.columns)}")
display(staging_df)

# 2. Dimensão SCD Type 2
print("\n2️⃣ DW.DIM_CLIENTE (SCD Type 2):")
dim_df = load_dataframe("SELECT * FROM dw.dim_cliente ORDER BY id_cliente, sk_cliente")
print(f"   Registros: {len(dim_df)}")
print(f"   Colunas: {list(dim_df.columns)}")
display(dim_df)

# 3. Fato de vendas
print("\n3️⃣ DW.FATO_VENDAS:")
fato_df = load_dataframe("SELECT * FROM dw.fato_vendas LIMIT 5")
print(f"   Registros: {len(fato_df)}")
print(f"   Colunas: {list(fato_df.columns)}")
display(fato_df)

print("\n✅ Estrutura inicial verificada!")

📊 Verificando estrutura das tabelas...

1️⃣ STAGING.CLIENTES_SOURCE:
   Registros: 4
   Colunas: ['id_cliente', 'nm_cliente', 'ds_email', 'cidade', 'uf', 'telefone', 'dt_nascimento', 'dt_processamento']


Unnamed: 0,id_cliente,nm_cliente,ds_email,cidade,uf,telefone,dt_nascimento,dt_processamento
0,1,João Silva,joao.silva@email.com,São Paulo,SP,11999999999,1985-03-15,2024-10-01
1,2,Maria Santos,maria.santos@email.com,Rio de Janeiro,RJ,21888888888,1990-07-22,2024-10-01
2,3,Carlos Oliveira,carlos.oliveira@email.com,Belo Horizonte,MG,31777777777,1987-12-10,2024-10-01
3,4,Ana Costa,ana.costa@email.com,Porto Alegre,RS,51666666666,1992-05-18,2024-10-01



2️⃣ DW.DIM_CLIENTE (SCD Type 2):
   Registros: 4
   Colunas: ['sk_cliente', 'id_cliente', 'nm_cliente', 'ds_email', 'cidade', 'uf', 'telefone', 'dt_nascimento', 'dt_inicio', 'dt_fim', 'fl_corrente', 'dt_criacao', 'dt_atualizacao']


Unnamed: 0,sk_cliente,id_cliente,nm_cliente,ds_email,cidade,uf,telefone,dt_nascimento,dt_inicio,dt_fim,fl_corrente,dt_criacao,dt_atualizacao
0,1,1,João Silva,joao.silva@email.com,São Paulo,SP,11999999999,1985-03-15,2024-10-01,9999-12-31,True,2025-10-27 02:56:43.865833,2025-10-27 02:56:43.865833
1,2,2,Maria Santos,maria.santos@email.com,Rio de Janeiro,RJ,21888888888,1990-07-22,2024-10-01,9999-12-31,True,2025-10-27 02:56:43.865833,2025-10-27 02:56:43.865833
2,3,3,Carlos Oliveira,carlos.oliveira@email.com,Belo Horizonte,MG,31777777777,1987-12-10,2024-10-01,9999-12-31,True,2025-10-27 02:56:43.865833,2025-10-27 02:56:43.865833
3,4,4,Ana Costa,ana.costa@email.com,Porto Alegre,RS,51666666666,1992-05-18,2024-10-01,9999-12-31,True,2025-10-27 02:56:43.865833,2025-10-27 02:56:43.865833



3️⃣ DW.FATO_VENDAS:
   Registros: 4
   Colunas: ['sk_venda', 'sk_cliente', 'id_produto', 'nm_produto', 'dt_venda', 'vl_venda', 'qtd_vendida', 'dt_ref', 'dt_criacao']


Unnamed: 0,sk_venda,sk_cliente,id_produto,nm_produto,dt_venda,vl_venda,qtd_vendida,dt_ref,dt_criacao
0,1,1,110,Produto 110,2024-10-15,550.0,1,2024-10-15,2025-10-27 02:56:43.868938
1,2,2,120,Produto 120,2024-10-15,600.0,1,2024-10-15,2025-10-27 02:56:43.868938
2,3,3,130,Produto 130,2024-10-15,650.0,1,2024-10-15,2025-10-27 02:56:43.868938
3,4,4,140,Produto 140,2024-10-15,700.0,1,2024-10-15,2025-10-27 02:56:43.868938



✅ Estrutura inicial verificada!


## 4️⃣ Load Sample Data for SCD2 Demo

Vamos simular a chegada de novos dados com mudanças para demonstrar o SCD Type 2.

In [11]:
# Simular chegada de novos dados (dt_ref = 2024-10-25)
dt_ref = '2024-10-25'
print(f"📅 Simulando processamento para dt_ref = {dt_ref}")

# Primeiro, limpar dados duplicados se existirem
try:
    execute_query("DELETE FROM staging.clientes_source WHERE dt_processamento = :dt_ref", {'dt_ref': dt_ref})
    print(f"🧹 Limpeza: dados antigos de {dt_ref} removidos")
except Exception as e:
    print(f"ℹ️  Limpeza: {e}")

# Novos dados que chegaram no sistema
novos_dados = [
    # Cliente 1: João mudou de cidade (São Paulo → Brasília)
    (1, 'João Silva', 'joao.silva@email.com', 'Brasília', 'DF', '11999999999', '1985-03-15'),
    
    # Cliente 2: Maria mudou email e telefone
    (2, 'Maria Santos', 'maria.santos.new@gmail.com', 'Rio de Janeiro', 'RJ', '21777777777', '1990-07-22'),
    
    # Cliente 3: Carlos sem mudanças
    (3, 'Carlos Oliveira', 'carlos.oliveira@email.com', 'Belo Horizonte', 'MG', '31777777777', '1987-12-10'),
    
    # Cliente 5: Novo cliente
    (5, 'Pedro Costa', 'pedro.costa@email.com', 'Salvador', 'BA', '71555555555', '1995-08-30'),
]

# Inserir na tabela de staging
insert_query = """
INSERT INTO staging.clientes_source 
(id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento, dt_processamento)
VALUES (:id_cliente, :nm_cliente, :ds_email, :cidade, :uf, :telefone, :dt_nascimento, :dt_processamento)
"""

inseridos = 0
for dados in novos_dados:
    try:
        params = {
            'id_cliente': dados[0],
            'nm_cliente': dados[1],
            'ds_email': dados[2],
            'cidade': dados[3],
            'uf': dados[4],
            'telefone': dados[5],
            'dt_nascimento': dados[6],
            'dt_processamento': dt_ref
        }
        execute_query(insert_query, params)
        inseridos += 1
        print(f"   ✓ Cliente {dados[0]} - {dados[1]} inserido")
    except Exception as e:
        print(f"   ❌ Erro ao inserir cliente {dados[0]}: {e}")

# Verificar dados inseridos
print(f"\n📋 Resumo: {inseridos} registros inseridos")
staging_novos = load_dataframe("SELECT * FROM staging.clientes_source WHERE dt_processamento = :dt_ref", {'dt_ref': dt_ref})
print(f"📊 Dados na staging para {dt_ref}:")
display(staging_novos)

print(f"\n✅ {len(staging_novos)} registros prontos para processamento!")

📅 Simulando processamento para dt_ref = 2024-10-25
🧹 Limpeza: dados antigos de 2024-10-25 removidos
   ✓ Cliente 1 - João Silva inserido
   ✓ Cliente 2 - Maria Santos inserido
   ✓ Cliente 3 - Carlos Oliveira inserido
   ✓ Cliente 5 - Pedro Costa inserido

📋 Resumo: 4 registros inseridos
📊 Dados na staging para 2024-10-25:


Unnamed: 0,id_cliente,nm_cliente,ds_email,cidade,uf,telefone,dt_nascimento,dt_processamento
0,1,João Silva,joao.silva@email.com,Brasília,DF,11999999999,1985-03-15,2024-10-25
1,2,Maria Santos,maria.santos.new@gmail.com,Rio de Janeiro,RJ,21777777777,1990-07-22,2024-10-25
2,3,Carlos Oliveira,carlos.oliveira@email.com,Belo Horizonte,MG,31777777777,1987-12-10,2024-10-25
3,5,Pedro Costa,pedro.costa@email.com,Salvador,BA,71555555555,1995-08-30,2024-10-25



✅ 4 registros prontos para processamento!


## 5️⃣ Implement SCD2 Logic - Identify Changes

Agora vamos implementar a lógica para identificar que tipos de mudanças ocorreram.

In [12]:
def analisar_mudancas_scd2(dt_ref):
    """
    Analisa as mudanças entre os dados de staging e a dimensão atual
    Retorna DataFrames categorizados por tipo de mudança
    """
    
    # 1. Carregar dados de origem (staging)
    df_source = load_dataframe(f"""
        SELECT id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento
        FROM staging.clientes_source 
        WHERE dt_processamento = '{dt_ref}'
    """)
    
    # 2. Carregar dimensão atual (apenas registros correntes)
    df_current = load_dataframe("""
        SELECT sk_cliente, id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento
        FROM dw.dim_cliente 
        WHERE fl_corrente = TRUE
    """)
    
    print(f"📊 Análise de mudanças para {dt_ref}:")
    print(f"   Source: {len(df_source)} registros")
    print(f"   Current: {len(df_current)} registros")
    
    # 3. Fazer merge para identificar tipos de mudança
    df_merged = df_source.merge(
        df_current, 
        on='id_cliente', 
        how='outer', 
        suffixes=('_source', '_current')
    )
    
    # 4. Categorizar mudanças
    
    # Novos clientes (estão no source, mas não no current)
    novos_clientes = df_merged[df_merged['sk_cliente'].isna()].copy()
    
    # Clientes que saíram (estão no current, mas não no source) - opcional
    clientes_saidos = df_merged[df_merged['nm_cliente_source'].isna()].copy()
    
    # Clientes existentes (estão em ambos)
    clientes_existentes = df_merged[
        df_merged['sk_cliente'].notna() & 
        df_merged['nm_cliente_source'].notna()
    ].copy()
    
    # Identificar mudanças nos clientes existentes
    def verificar_mudanca(row):
        campos_comparacao = ['nm_cliente', 'ds_email', 'cidade', 'uf', 'telefone', 'dt_nascimento']
        for campo in campos_comparacao:
            if str(row[f'{campo}_source']) != str(row[f'{campo}_current']):
                return True
        return False
    
    if not clientes_existentes.empty:
        clientes_existentes['tem_mudanca'] = clientes_existentes.apply(verificar_mudanca, axis=1)
        
        # Separar em com mudança e sem mudança
        clientes_com_mudanca = clientes_existentes[clientes_existentes['tem_mudanca']].copy()
        clientes_sem_mudanca = clientes_existentes[~clientes_existentes['tem_mudanca']].copy()
    else:
        clientes_com_mudanca = pd.DataFrame()
        clientes_sem_mudanca = pd.DataFrame()
    
    # 5. Mostrar resultados da análise
    print(f"\n🔍 Resultados da análise:")
    print(f"   🆕 Novos clientes: {len(novos_clientes)}")
    print(f"   🔄 Clientes com mudança: {len(clientes_com_mudanca)}")
    print(f"   ➡️  Clientes sem mudança: {len(clientes_sem_mudanca)}")
    print(f"   🚪 Clientes que saíram: {len(clientes_saidos)}")
    
    return {
        'novos_clientes': novos_clientes,
        'clientes_com_mudanca': clientes_com_mudanca,
        'clientes_sem_mudanca': clientes_sem_mudanca,
        'clientes_saidos': clientes_saidos,
        'df_source': df_source,
        'df_current': df_current
    }

# Executar análise
resultado_analise = analisar_mudancas_scd2(dt_ref)

# Mostrar detalhes das mudanças
if len(resultado_analise['clientes_com_mudanca']) > 0:
    print("\n📋 Detalhes dos clientes com mudança:")
    for _, row in resultado_analise['clientes_com_mudanca'].iterrows():
        print(f"\n   Cliente {row['id_cliente']} - {row['nm_cliente_source']}:")
        campos = ['nm_cliente', 'ds_email', 'cidade', 'uf', 'telefone']
        for campo in campos:
            valor_antigo = row[f'{campo}_current']
            valor_novo = row[f'{campo}_source']
            if str(valor_antigo) != str(valor_novo):
                print(f"     {campo}: '{valor_antigo}' → '{valor_novo}'")

if len(resultado_analise['novos_clientes']) > 0:
    print("\n📋 Novos clientes:")
    display(resultado_analise['novos_clientes'][['id_cliente', 'nm_cliente_source', 'cidade_source']].rename(columns={
        'nm_cliente_source': 'nome',
        'cidade_source': 'cidade'
    }))

📊 Análise de mudanças para 2024-10-25:
   Source: 4 registros
   Current: 4 registros

🔍 Resultados da análise:
   🆕 Novos clientes: 1
   🔄 Clientes com mudança: 2
   ➡️  Clientes sem mudança: 1
   🚪 Clientes que saíram: 1

📋 Detalhes dos clientes com mudança:

   Cliente 1 - João Silva:
     cidade: 'São Paulo' → 'Brasília'
     uf: 'SP' → 'DF'

   Cliente 2 - Maria Santos:
     ds_email: 'maria.santos@email.com' → 'maria.santos.new@gmail.com'
     telefone: '21888888888' → '21777777777'

📋 Novos clientes:


Unnamed: 0,id_cliente,nome,cidade
4,5,Pedro Costa,Salvador


## 6️⃣ Process Historical Records (Expire Old Versions)

Agora vamos "aposentar" os registros antigos que tiveram mudanças.

In [13]:
def expirar_registros_antigos(dt_ref, clientes_com_mudanca):
    """
    Expira os registros antigos setando fl_corrente = FALSE e dt_fim = dt_ref
    """
    if clientes_com_mudanca.empty:
        print("   ℹ️  Nenhum registro para expirar.")
        return
    
    print(f"🕒 Expirando registros antigos (dt_fim = {dt_ref})...")
    
    # Lista de IDs de clientes que tiveram mudança
    ids_clientes_mudanca = clientes_com_mudanca['id_cliente'].tolist()
    
    # Atualizar registros na dimensão (sintaxe correta SQLAlchemy)
    update_query = """
        UPDATE dw.dim_cliente 
        SET fl_corrente = FALSE,
            dt_fim = :dt_ref,
            dt_atualizacao = CURRENT_TIMESTAMP
        WHERE id_cliente = ANY(:ids_clientes)
          AND fl_corrente = TRUE
    """
    
    params = {
        'dt_ref': dt_ref,
        'ids_clientes': ids_clientes_mudanca
    }
    
    execute_query(update_query, params)
    
    print(f"   ✅ {len(ids_clientes_mudanca)} registros expirados.")
    
    # Verificar resultado (usando parâmetros também)
    verificacao = load_dataframe("""
        SELECT id_cliente, nm_cliente, cidade, dt_inicio, dt_fim, fl_corrente
        FROM dw.dim_cliente 
        WHERE id_cliente = ANY(:ids_clientes)
          AND dt_fim = :dt_ref
        ORDER BY id_cliente
    """, {
        'ids_clientes': ids_clientes_mudanca,
        'dt_ref': dt_ref
    })
    
    if not verificacao.empty:
        print("\n📋 Registros expirados:")
        display(verificacao)
    
    return verificacao

# Executar expiração dos registros antigos
registros_expirados = expirar_registros_antigos(
    dt_ref, 
    resultado_analise['clientes_com_mudanca']
)

🕒 Expirando registros antigos (dt_fim = 2024-10-25)...
   ✅ 2 registros expirados.

📋 Registros expirados:


Unnamed: 0,id_cliente,nm_cliente,cidade,dt_inicio,dt_fim,fl_corrente
0,1,João Silva,São Paulo,2024-10-01,2024-10-25,False
1,2,Maria Santos,Rio de Janeiro,2024-10-01,2024-10-25,False


## 7️⃣ Insert New Versions for Changed Records

Agora vamos inserir as novas versões dos clientes que mudaram + novos clientes.

In [14]:
def inserir_novas_versoes(dt_ref, clientes_com_mudanca, novos_clientes):
    """
    Insere novas versões dos clientes que mudaram + novos clientes
    """
    
    total_insercoes = 0
    
    # 1. Inserir novas versões de clientes que mudaram
    if not clientes_com_mudanca.empty:
        print(f"🔄 Inserindo novas versões de clientes que mudaram...")
        
        for _, row in clientes_com_mudanca.iterrows():
            insert_query = """
                INSERT INTO dw.dim_cliente 
                (id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento, 
                 dt_inicio, dt_fim, fl_corrente, dt_criacao, dt_atualizacao)
                VALUES (:id_cliente, :nm_cliente, :ds_email, :cidade, :uf, 
                        :telefone, :dt_nascimento, :dt_inicio, '9999-12-31', 
                        TRUE, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            """
            
            params = {
                'id_cliente': row['id_cliente'],
                'nm_cliente': row['nm_cliente_source'],
                'ds_email': row['ds_email_source'],
                'cidade': row['cidade_source'],
                'uf': row['uf_source'],
                'telefone': row['telefone_source'],
                'dt_nascimento': row['dt_nascimento_source'],
                'dt_inicio': dt_ref
            }
            
            execute_query(insert_query, params)
            total_insercoes += 1
        
        print(f"   ✅ {len(clientes_com_mudanca)} novas versões inseridas.")
    
    # 2. Inserir novos clientes
    if not novos_clientes.empty:
        print(f"🆕 Inserindo novos clientes...")
        
        for _, row in novos_clientes.iterrows():
            insert_query = """
                INSERT INTO dw.dim_cliente 
                (id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento, 
                 dt_inicio, dt_fim, fl_corrente, dt_criacao, dt_atualizacao)
                VALUES (:id_cliente, :nm_cliente, :ds_email, :cidade, :uf, 
                        :telefone, :dt_nascimento, :dt_inicio, '9999-12-31', 
                        TRUE, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
            """
            
            params = {
                'id_cliente': row['id_cliente'],
                'nm_cliente': row['nm_cliente_source'],
                'ds_email': row['ds_email_source'],
                'cidade': row['cidade_source'],
                'uf': row['uf_source'],
                'telefone': row['telefone_source'],
                'dt_nascimento': row['dt_nascimento_source'],
                'dt_inicio': dt_ref
            }
            
            execute_query(insert_query, params)
            total_insercoes += 1
        
        print(f"   ✅ {len(novos_clientes)} novos clientes inseridos.")
    
    print(f"\n🎉 Total de inserções: {total_insercoes}")
    
    # Verificar resultado final
    print(f"\n📊 Estado atual da dimensão:")
    resultado_final = load_dataframe("""
        SELECT 
            id_cliente,
            nm_cliente,
            cidade,
            dt_inicio,
            dt_fim,
            fl_corrente,
            sk_cliente
        FROM dw.dim_cliente 
        ORDER BY id_cliente, sk_cliente
    """)
    
    display(resultado_final)
    
    return resultado_final

# Executar inserção das novas versões
resultado_final = inserir_novas_versoes(
    dt_ref,
    resultado_analise['clientes_com_mudanca'],
    resultado_analise['novos_clientes']
)

🔄 Inserindo novas versões de clientes que mudaram...
   ✅ 2 novas versões inseridas.
🆕 Inserindo novos clientes...
   ✅ 1 novos clientes inseridos.

🎉 Total de inserções: 3

📊 Estado atual da dimensão:


Unnamed: 0,id_cliente,nm_cliente,cidade,dt_inicio,dt_fim,fl_corrente,sk_cliente
0,1,João Silva,São Paulo,2024-10-01,2024-10-25,False,1
1,1,João Silva,Brasília,2024-10-25,9999-12-31,True,5
2,2,Maria Santos,Rio de Janeiro,2024-10-01,2024-10-25,False,2
3,2,Maria Santos,Rio de Janeiro,2024-10-25,9999-12-31,True,6
4,3,Carlos Oliveira,Belo Horizonte,2024-10-01,9999-12-31,True,3
5,4,Ana Costa,Porto Alegre,2024-10-01,9999-12-31,True,4
6,5,Pedro Costa,Salvador,2024-10-25,9999-12-31,True,7


## 8️⃣ Query Historical Data with Point-in-Time Joins

Agora vamos demonstrar como consultar dados históricos usando Point-in-Time Joins.

In [15]:
# Primeiro, vamos adicionar algumas vendas para demonstrar o histórico
print("💰 Adicionando vendas para demonstrar Point-in-Time Joins...")

# Vendas antes das mudanças (2024-10-20)
vendas_antigas = [
    (1, 'Produto A', '2024-10-20', 1000.00),
    (2, 'Produto B', '2024-10-20', 750.00),
]

# Vendas depois das mudanças (2024-10-26) 
vendas_novas = [
    (1, 'Produto C', '2024-10-26', 1200.00),  # João já em Brasília
    (2, 'Produto D', '2024-10-26', 850.00),   # Maria com novo email
    (5, 'Produto E', '2024-10-26', 600.00),   # Pedro (novo cliente)
]

def inserir_vendas(vendas, dt_ref_venda):
    for id_cliente, produto, dt_venda, valor in vendas:
        # Encontrar a sk_cliente válida na data da venda (sintaxe corrigida)
        sk_query = """
            SELECT sk_cliente 
            FROM dw.dim_cliente 
            WHERE id_cliente = :id_cliente 
              AND :dt_venda BETWEEN dt_inicio AND dt_fim
        """
        
        result = execute_query(sk_query, {
            'id_cliente': id_cliente,
            'dt_venda': dt_venda
        })
        
        sk_cliente = result.fetchone()
        if sk_cliente:
            insert_venda = """
                INSERT INTO dw.fato_vendas 
                (sk_cliente, id_produto, nm_produto, dt_venda, vl_venda, qtd_vendida, dt_ref)
                VALUES (:sk_cliente, :id_produto, :nm_produto, :dt_venda, 
                        :vl_venda, 1, :dt_ref)
            """
            
            execute_query(insert_venda, {
                'sk_cliente': sk_cliente[0],
                'id_produto': hash(produto) % 1000,
                'nm_produto': produto,
                'dt_venda': dt_venda,
                'vl_venda': valor,
                'dt_ref': dt_ref_venda
            })

# Inserir vendas
inserir_vendas(vendas_antigas, '2024-10-20')
inserir_vendas(vendas_novas, '2024-10-26')

print("✅ Vendas inseridas!")

# Agora fazer as consultas históricas
print("\n🔍 CONSULTA 1: Vendas com informações históricas corretas")
print("   (Point-in-Time Join - mostra a cidade onde o cliente estava na época da venda)")

consulta_historica = load_dataframe("""
    SELECT 
        f.dt_venda,
        f.nm_produto,
        f.vl_venda,
        d.id_cliente,
        d.nm_cliente,
        d.cidade AS cidade_na_epoca_da_venda,
        d.ds_email AS email_na_epoca_da_venda,
        d.dt_inicio AS cliente_valido_desde,
        d.dt_fim AS cliente_valido_ate,
        d.fl_corrente
    FROM dw.fato_vendas f
    INNER JOIN dw.dim_cliente d 
        ON f.sk_cliente = d.sk_cliente
    ORDER BY f.dt_venda, d.id_cliente
""")

display(consulta_historica)

print("\n📊 Observe que:")
print("   • João comprou em 2024-10-20 quando estava em São Paulo")
print("   • João comprou em 2024-10-26 quando já estava em Brasília")
print("   • Maria comprou com o email antigo em 2024-10-20")
print("   • Maria comprou com o email novo em 2024-10-26")
print("   • Pedro só aparece nas vendas de 2024-10-26 (é cliente novo)")

💰 Adicionando vendas para demonstrar Point-in-Time Joins...
✅ Vendas inseridas!

🔍 CONSULTA 1: Vendas com informações históricas corretas
   (Point-in-Time Join - mostra a cidade onde o cliente estava na época da venda)


Unnamed: 0,dt_venda,nm_produto,vl_venda,id_cliente,nm_cliente,cidade_na_epoca_da_venda,email_na_epoca_da_venda,cliente_valido_desde,cliente_valido_ate,fl_corrente
0,2024-10-15,Produto 110,550.0,1,João Silva,São Paulo,joao.silva@email.com,2024-10-01,2024-10-25,False
1,2024-10-15,Produto 120,600.0,2,Maria Santos,Rio de Janeiro,maria.santos@email.com,2024-10-01,2024-10-25,False
2,2024-10-15,Produto 130,650.0,3,Carlos Oliveira,Belo Horizonte,carlos.oliveira@email.com,2024-10-01,9999-12-31,True
3,2024-10-15,Produto 140,700.0,4,Ana Costa,Porto Alegre,ana.costa@email.com,2024-10-01,9999-12-31,True
4,2024-10-20,Produto A,1000.0,1,João Silva,São Paulo,joao.silva@email.com,2024-10-01,2024-10-25,False
5,2024-10-20,Produto B,750.0,2,Maria Santos,Rio de Janeiro,maria.santos@email.com,2024-10-01,2024-10-25,False
6,2024-10-26,Produto C,1200.0,1,João Silva,Brasília,joao.silva@email.com,2024-10-25,9999-12-31,True
7,2024-10-26,Produto D,850.0,2,Maria Santos,Rio de Janeiro,maria.santos.new@gmail.com,2024-10-25,9999-12-31,True
8,2024-10-26,Produto E,600.0,5,Pedro Costa,Salvador,pedro.costa@email.com,2024-10-25,9999-12-31,True



📊 Observe que:
   • João comprou em 2024-10-20 quando estava em São Paulo
   • João comprou em 2024-10-26 quando já estava em Brasília
   • Maria comprou com o email antigo em 2024-10-20
   • Maria comprou com o email novo em 2024-10-26
   • Pedro só aparece nas vendas de 2024-10-26 (é cliente novo)


## 9️⃣ Query Current Data Only

Agora vamos mostrar como obter apenas os dados atuais dos clientes.

In [16]:
print("🔍 CONSULTA 2: Apenas dados atuais (fl_corrente = TRUE)")
print("   (Para dashboards, APIs, relatórios que só precisam do 'estado atual')")

clientes_atuais = load_dataframe("""
    SELECT 
        id_cliente,
        nm_cliente,
        ds_email,
        cidade,
        uf,
        telefone,
        dt_nascimento,
        dt_inicio AS valido_desde,
        sk_cliente
    FROM dw.dim_cliente 
    WHERE fl_corrente = TRUE
    ORDER BY id_cliente
""")

print(f"\n📊 Clientes atuais ({len(clientes_atuais)} registros):")
display(clientes_atuais)

print("\n🔍 CONSULTA 3: Comparação Antes vs Depois")
print("   (Histórico completo mostrando todas as versões)")

historico_completo = load_dataframe("""
    SELECT 
        id_cliente,
        nm_cliente,
        cidade,
        ds_email,
        dt_inicio,
        dt_fim,
        fl_corrente,
        sk_cliente,
        CASE 
            WHEN fl_corrente = TRUE THEN '🟢 ATUAL'
            ELSE '🔴 HISTÓRICO'
        END as status
    FROM dw.dim_cliente 
    ORDER BY id_cliente, sk_cliente
""")

print(f"\n📊 Histórico completo ({len(historico_completo)} registros):")
display(historico_completo)

print("\n💡 INSIGHTS:")
print("   • Cliente 1 (João): 2 versões - mudou de São Paulo para Brasília")
print("   • Cliente 2 (Maria): 2 versões - mudou email e telefone") 
print("   • Cliente 3 (Carlos): 1 versão - sem mudanças")
print("   • Cliente 4 (Ana): 1 versão - não apareceu nos novos dados")
print("   • Cliente 5 (Pedro): 1 versão - cliente novo")

🔍 CONSULTA 2: Apenas dados atuais (fl_corrente = TRUE)
   (Para dashboards, APIs, relatórios que só precisam do 'estado atual')

📊 Clientes atuais (5 registros):


Unnamed: 0,id_cliente,nm_cliente,ds_email,cidade,uf,telefone,dt_nascimento,valido_desde,sk_cliente
0,1,João Silva,joao.silva@email.com,Brasília,DF,11999999999,1985-03-15,2024-10-25,5
1,2,Maria Santos,maria.santos.new@gmail.com,Rio de Janeiro,RJ,21777777777,1990-07-22,2024-10-25,6
2,3,Carlos Oliveira,carlos.oliveira@email.com,Belo Horizonte,MG,31777777777,1987-12-10,2024-10-01,3
3,4,Ana Costa,ana.costa@email.com,Porto Alegre,RS,51666666666,1992-05-18,2024-10-01,4
4,5,Pedro Costa,pedro.costa@email.com,Salvador,BA,71555555555,1995-08-30,2024-10-25,7



🔍 CONSULTA 3: Comparação Antes vs Depois
   (Histórico completo mostrando todas as versões)

📊 Histórico completo (7 registros):


Unnamed: 0,id_cliente,nm_cliente,cidade,ds_email,dt_inicio,dt_fim,fl_corrente,sk_cliente,status
0,1,João Silva,São Paulo,joao.silva@email.com,2024-10-01,2024-10-25,False,1,🔴 HISTÓRICO
1,1,João Silva,Brasília,joao.silva@email.com,2024-10-25,9999-12-31,True,5,🟢 ATUAL
2,2,Maria Santos,Rio de Janeiro,maria.santos@email.com,2024-10-01,2024-10-25,False,2,🔴 HISTÓRICO
3,2,Maria Santos,Rio de Janeiro,maria.santos.new@gmail.com,2024-10-25,9999-12-31,True,6,🟢 ATUAL
4,3,Carlos Oliveira,Belo Horizonte,carlos.oliveira@email.com,2024-10-01,9999-12-31,True,3,🟢 ATUAL
5,4,Ana Costa,Porto Alegre,ana.costa@email.com,2024-10-01,9999-12-31,True,4,🟢 ATUAL
6,5,Pedro Costa,Salvador,pedro.costa@email.com,2024-10-25,9999-12-31,True,7,🟢 ATUAL



💡 INSIGHTS:
   • Cliente 1 (João): 2 versões - mudou de São Paulo para Brasília
   • Cliente 2 (Maria): 2 versões - mudou email e telefone
   • Cliente 3 (Carlos): 1 versão - sem mudanças
   • Cliente 4 (Ana): 1 versão - não apareceu nos novos dados
   • Cliente 5 (Pedro): 1 versão - cliente novo


## 🔟 Complete SCD2 ETL Pipeline Function

Agora vamos criar uma função completa que automatiza todo o processo SCD2.

In [28]:
def processar_scd2_completo(dt_ref):
    """
    Pipeline ETL completo para SCD Type 2
    
    Args:
        dt_ref (str): Data de referência no formato 'YYYY-MM-DD'
    
    Returns:
        dict: Relatório do processamento
    """
    
    print(f"🚀 INICIANDO PROCESSAMENTO SCD2 para {dt_ref}")
    print("=" * 60)
    
    relatorio = {
        'dt_ref': dt_ref,
        'novos_clientes': 0,
        'clientes_atualizados': 0,
        'registros_expirados': 0,
        'total_inseridos': 0,
        'erros': []
    }
    
    try:
        # PASSO 1: Carregar dados de origem
        print(f"📥 PASSO 1: Carregando dados de origem...")
        df_source = load_dataframe("""
            SELECT id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento
            FROM staging.clientes_source 
            WHERE dt_processamento = :dt_ref
        """, {'dt_ref': dt_ref})
        
        if df_source.empty:
            print(f"   ⚠️  Nenhum dado encontrado para {dt_ref}")
            return relatorio
        
        print(f"   ✅ {len(df_source)} registros carregados")
        
        # PASSO 2: Carregar dimensão atual
        print(f"📊 PASSO 2: Carregando dimensão atual...")
        df_current = load_dataframe("""
            SELECT sk_cliente, id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento
            FROM dw.dim_cliente 
            WHERE fl_corrente = TRUE
        """)
        print(f"   ✅ {len(df_current)} registros atuais carregados")
        
        # PASSO 3: Identificar mudanças
        print(f"🔍 PASSO 3: Identificando mudanças...")
        df_merged = df_source.merge(df_current, on='id_cliente', how='outer', suffixes=('_source', '_current'))
        
        # Novos clientes
        novos_clientes = df_merged[df_merged['sk_cliente'].isna()].copy()
        
        # Clientes existentes
        clientes_existentes = df_merged[
            df_merged['sk_cliente'].notna() & df_merged['nm_cliente_source'].notna()
        ].copy()
        
        # Verificar mudanças
        def verificar_mudanca(row):
            campos = ['nm_cliente', 'ds_email', 'cidade', 'uf', 'telefone', 'dt_nascimento']
            return any(str(row[f'{campo}_source']) != str(row[f'{campo}_current']) for campo in campos)
        
        if not clientes_existentes.empty:
            clientes_existentes['tem_mudanca'] = clientes_existentes.apply(verificar_mudanca, axis=1)
            clientes_com_mudanca = clientes_existentes[clientes_existentes['tem_mudanca']].copy()
        else:
            clientes_com_mudanca = pd.DataFrame()
        
        print(f"   🆕 Novos clientes: {len(novos_clientes)}")
        print(f"   🔄 Clientes com mudança: {len(clientes_com_mudanca)}")
        
        relatorio['novos_clientes'] = len(novos_clientes)
        relatorio['clientes_atualizados'] = len(clientes_com_mudanca)
        
        # PASSO 4: Expirar registros antigos
        if not clientes_com_mudanca.empty:
            print(f"🕒 PASSO 4: Expirando registros antigos...")
            ids_clientes_mudanca = clientes_com_mudanca['id_cliente'].tolist()
            
            update_query = """
                UPDATE dw.dim_cliente 
                SET fl_corrente = FALSE, dt_fim = :dt_ref, dt_atualizacao = CURRENT_TIMESTAMP
                WHERE id_cliente = ANY(:ids_clientes) AND fl_corrente = TRUE
            """
            
            execute_query(update_query, {
                'dt_ref': dt_ref,
                'ids_clientes': ids_clientes_mudanca
            })
            
            relatorio['registros_expirados'] = len(ids_clientes_mudanca)
            print(f"   ✅ {len(ids_clientes_mudanca)} registros expirados")
        
        # PASSO 5: Inserir novas versões
        print(f"💾 PASSO 5: Inserindo novas versões...")
        
        # Preparar dados para inserção
        registros_para_inserir = []
        
        # Novos clientes
        for _, row in novos_clientes.iterrows():
            registros_para_inserir.append({
                'id_cliente': row['id_cliente'],
                'nm_cliente': row['nm_cliente_source'],
                'ds_email': row['ds_email_source'],
                'cidade': row['cidade_source'],
                'uf': row['uf_source'],
                'telefone': row['telefone_source'],
                'dt_nascimento': row['dt_nascimento_source'],
                'dt_inicio': dt_ref
            })
        
        # Clientes atualizados
        for _, row in clientes_com_mudanca.iterrows():
            registros_para_inserir.append({
                'id_cliente': row['id_cliente'],
                'nm_cliente': row['nm_cliente_source'],
                'ds_email': row['ds_email_source'],
                'cidade': row['cidade_source'],
                'uf': row['uf_source'],
                'telefone': row['telefone_source'],
                'dt_nascimento': row['dt_nascimento_source'],
                'dt_inicio': dt_ref
            })
        
        # Inserir em lote (sintaxe corrigida)
        insert_query = """
            INSERT INTO dw.dim_cliente 
            (id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento, 
             dt_inicio, dt_fim, fl_corrente, dt_criacao, dt_atualizacao)
            VALUES (:id_cliente, :nm_cliente, :ds_email, :cidade, :uf, 
                    :telefone, :dt_nascimento, :dt_inicio, '9999-12-31', 
                    TRUE, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)
        """
        
        for registro in registros_para_inserir:
            execute_query(insert_query, registro)
        
        relatorio['total_inseridos'] = len(registros_para_inserir)
        print(f"   ✅ {len(registros_para_inserir)} registros inseridos")
        
        # PASSO 6: Relatório final
        print(f"📋 PASSO 6: Gerando relatório final...")
        
        total_dim = execute_query("SELECT COUNT(*) FROM dw.dim_cliente").fetchone()[0]
        total_atuais = execute_query("SELECT COUNT(*) FROM dw.dim_cliente WHERE fl_corrente = TRUE").fetchone()[0]
        
        print(f"\n🎉 PROCESSAMENTO CONCLUÍDO COM SUCESSO!")
        print(f"   📊 Total de registros na dimensão: {total_dim}")
        print(f"   📊 Registros atuais: {total_atuais}")
        print(f"   🆕 Novos clientes: {relatorio['novos_clientes']}")
        print(f"   🔄 Clientes atualizados: {relatorio['clientes_atualizados']}")
        print(f"   💾 Total inserido: {relatorio['total_inseridos']}")
        
    except Exception as e:
        print(f"❌ ERRO durante o processamento: {e}")
        relatorio['erros'].append(str(e))
    
    print("=" * 60)
    return relatorio

# TESTE: Simular um novo lote de dados
print("🧪 TESTANDO PIPELINE COMPLETO COM NOVOS DADOS")
print()

# Adicionar dados para teste (dt_ref = 2024-10-27)
dt_ref_teste = '2024-10-27'

# Novos dados de teste
dados_teste = [
    # João mudou telefone
    (1, 'João Silva', 'joao.silva@email.com', 'Brasília', 'DF', '11888888888', '1985-03-15'),
    
    # Carlos mudou de cidade
    (3, 'Carlos Oliveira', 'carlos.oliveira@email.com', 'São Paulo', 'SP', '31777777777', '1987-12-10'),
    
    # Ana voltou (estava ausente no lote anterior)
    (4, 'Ana Costa', 'ana.costa@email.com', 'Porto Alegre', 'RS', '51666666666', '1992-05-18'),
    
    # Cliente totalmente novo
    (6, 'Lucas Fernandes', 'lucas.fernandes@email.com', 'Curitiba', 'PR', '41444444444', '1988-11-25'),
]

# Inserir na staging (sintaxe corrigida)
for dados in dados_teste:
    params = {
        'id_cliente': dados[0],
        'nm_cliente': dados[1],
        'ds_email': dados[2],
        'cidade': dados[3],
        'uf': dados[4],
        'telefone': dados[5],
        'dt_nascimento': dados[6],
        'dt_processamento': dt_ref_teste
    }
    execute_query("""
        INSERT INTO staging.clientes_source 
        (id_cliente, nm_cliente, ds_email, cidade, uf, telefone, dt_nascimento, dt_processamento)
        VALUES (:id_cliente, :nm_cliente, :ds_email, :cidade, :uf, :telefone, :dt_nascimento, :dt_processamento)
    """, params)

# Executar pipeline
relatorio_teste = processar_scd2_completo(dt_ref_teste)

🧪 TESTANDO PIPELINE COMPLETO COM NOVOS DADOS

🚀 INICIANDO PROCESSAMENTO SCD2 para 2024-10-27
📥 PASSO 1: Carregando dados de origem...
   ✅ 4 registros carregados
📊 PASSO 2: Carregando dimensão atual...
   ✅ 4 registros atuais carregados
🔍 PASSO 3: Identificando mudanças...
   🆕 Novos clientes: 1
   🔄 Clientes com mudança: 2
🕒 PASSO 4: Expirando registros antigos...
   ✅ 2 registros expirados
💾 PASSO 5: Inserindo novas versões...
   ✅ 3 registros inseridos
📋 PASSO 6: Gerando relatório final...

🎉 PROCESSAMENTO CONCLUÍDO COM SUCESSO!
   📊 Total de registros na dimensão: 7
   📊 Registros atuais: 5
   🆕 Novos clientes: 1
   🔄 Clientes atualizados: 2
   💾 Total inserido: 3


## 🎯 Resumo e Próximos Passos

### ✅ O que você aprendeu:

1. **Conceitos SCD Type 2**: Como manter histórico completo de mudanças
2. **Estrutura de dados**: Surrogate Key, Business Key, dt_inicio, dt_fim, fl_corrente
3. **Point-in-Time Joins**: Como consultar dados históricos corretamente
4. **Pipeline ETL**: Processo completo automatizado para SCD2
5. **Dados atuais vs históricos**: Como obter ambas as visões conforme necessário

### 🚀 Próximos passos sugeridos:

1. **Implementar com PySpark**: Para volumes maiores de dados
2. **Delta Lake MERGE**: Usar `MERGE INTO` para SCD2 mais eficiente
3. **Monitoramento**: Adicionar logs e métricas ao pipeline
4. **Testes automatizados**: Validar integridade dos dados SCD2
5. **Performance**: Otimizar índices e particionamento

### 📚 Comandos Docker úteis:

```bash
# Iniciar ambiente
docker-compose up -d

# Parar ambiente  
docker-compose down

# Ver logs
docker-compose logs postgres

# Acessar PgAdmin: http://localhost:8080
# Email: admin@datawarehouse.com
# Senha: admin123
```

### 💡 Pontos importantes:

- **SCD Type 2** é essencial para Data Warehousing
- **Point-in-Time Joins** garantem precisão histórica
- **Pipeline automatizado** reduz erros manuais
- **fl_corrente** facilita consultas de dados atuais
- **Particionamento** melhora performance em grandes volumes

**🎉 Parabéns! Você dominou os conceitos fundamentais de SCD Type 2!**