# 01 - Pipeline Bronze (Ingestão de Dados Brutos)
## Camada Bronze: Raw Data

### O que é a Camada Bronze?
A Camada Bronze é o primeiro estágio do nosso pipeline de dados. Pense nela como um "armazém de dados brutos" onde guardamos as informações exatamente como elas vêm das fontes originais, sem nenhuma modificação ou limpeza.

### Por que precisamos dela?
- **Backup histórico**: Se algo der errado nas próximas etapas, podemos sempre voltar aos dados originais
- **Rastreabilidade**: Sabemos exatamente o que foi recebido da fonte e quando
- **Flexibilidade**: Podemos reprocessar os dados de formas diferentes sem precisar baixá-los novamente

### O que este notebook faz?
1. **Busca dados de 2 APIs públicas na internet**: Uma com informações de países e outra com taxas de câmbio
2. **Salva os dados em formato bruto**: JSON (formato de texto estruturado)
3. **Adiciona informações de controle**: Data e hora em que os dados foram coletados
4. **Organiza os dados por data**: Para facilitar a busca futura

### APIs utilizadas:
- **REST Countries API**: Dados de 195 países (população, área, capital, idiomas, etc.)
- **Exchange Rate API**: Taxas de câmbio (quanto vale cada moeda em relação ao dólar)


## 1. Preparação do Ambiente

### O que vai acontecer aqui?
Antes de começar a trabalhar, precisamos preparar nosso ambiente de trabalho. É como arrumar uma bancada antes de cozinhar: pegamos todos os utensílios e ingredientes que vamos usar.

### O que vamos preparar?
1. **Bibliotecas**: Ferramentas prontas que nos ajudam a fazer tarefas específicas (ex: baixar dados da internet, trabalhar com datas)
2. **Endereços das APIs**: URLs (links) de onde vamos buscar os dados
3. **Configurações de segurança**: Tempo máximo de espera, número de tentativas se algo falhar
4. **Nomes das tabelas**: Onde vamos salvar os dados no banco de dados
5. **Funções auxiliares**: "Receitas" de código que vamos reutilizar várias vezes

### Por que isso é importante?
Se organizarmos bem essa etapa, o resto do processo fica muito mais fácil e podemos reutilizar esse código no futuro.


In [0]:
# Importações
import requests
import json
from datetime import datetime, date
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time

# Configurações (inline para evitar problemas com %run)
API_URLS = {
    'countries': 'https://restcountries.com/v3.1/independent?status=true',
    'exchange_rates': 'https://api.exchangerate-api.com/v4/latest/USD'
}

REQUEST_CONFIG = {
    'timeout': 30,
    'max_retries': 3,
    'retry_delay': 5
}

SCHEMAS = {
    'bronze': 'workspace.bronze',
    'silver': 'workspace.silver',
    'gold': 'workspace.gold'
}

BRONZE_TABLES = {
    'countries': f"{SCHEMAS['bronze']}.countries_raw",
    'exchange_rates': f"{SCHEMAS['bronze']}.exchange_rates_raw"
}

# Funções utilitárias
def fetch_api_data(url, timeout=30, max_retries=3):
    """Faz requisição HTTP com retry logic"""
    for attempt in range(max_retries):
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.exceptions.RequestException as e:
            print(f"Tentativa {attempt + 1}/{max_retries} falhou: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(REQUEST_CONFIG['retry_delay'])
            else:
                raise Exception(f"Falha ao acessar API após {max_retries} tentativas: {str(e)}")

def get_execution_metadata():
    """Retorna metadados de execução com ID único por execução"""
    now = datetime.now()
    return {
        'ingestion_timestamp': now,
        'execution_date': now.date(),
        'execution_id': now.strftime('%Y%m%d_%H%M%S')  # ID único: YYYYMMDD_HHMMSS
    }

def log_metrics(stage, table_name, record_count, execution_time=None):
    """Loga métricas de execução"""
    print(f"\n{'='*60}")
    print(f"MÉTRICAS - {stage.upper()}")
    print(f"{'='*60}")
    print(f"Tabela: {table_name}")
    print(f"Registros: {record_count:,}")
    if execution_time:
        print(f"Tempo de execução: {execution_time:.2f} segundos")
    print(f"Timestamp: {datetime.now()}")
    print(f"{'='*60}\n")

def create_database_if_not_exists(spark, schema_name):
    """Cria database se não existir"""
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {schema_name}")
    print(f"Database '{schema_name}' verificado/criado com sucesso")

def optimize_table(spark, table_name, zorder_columns=None):
    """Otimiza tabela Delta com OPTIMIZE e Z-ORDER"""
    print(f"Otimizando tabela {table_name}...")
    if zorder_columns:
        zorder_cols = ', '.join(zorder_columns)
        spark.sql(f"OPTIMIZE {table_name} ZORDER BY ({zorder_cols})")
    else:
        spark.sql(f"OPTIMIZE {table_name}")
    print(f"Tabela {table_name} otimizada com sucesso")

print("✓ Configurações e funções carregadas com sucesso!")

✓ Configurações e funções carregadas com sucesso!


### Iniciando o Processo

**O que este código faz?**
- Marca o momento exato em que o pipeline começou a rodar
- Mostra na tela informações sobre quando o processo iniciou

**Por que fazer isso?**
Isso nos ajuda a medir quanto tempo o processo levou e a identificar quando os dados foram coletados. Se algo der errado, sabemos exatamente em que momento aconteceu.


In [0]:
import time
start_time = time.time()

print("Pipeline Bronze - Iniciado")
print(f"Timestamp: {datetime.now()}")
print(f"Data de Execução: {date.today()}")

Pipeline Bronze - Iniciado
Timestamp: 2025-12-07 13:04:48.931962
Data de Execução: 2025-12-07


## 2. Criando o "Armazém" Bronze

### O que é um schema/database?
Pense em um schema como uma **pasta no computador**. Dentro dele, vamos criar várias "planilhas" (tabelas) com nossos dados. Estamos criando uma pasta chamada `workspace.bronze` para guardar os dados brutos.

### Por que verificar se já existe?
Se esta pasta já existe de uma execução anterior, não queremos criar outra. É como verificar se uma pasta já existe antes de tentar criá-la no Windows.

### O que acontece aqui?
O código cria a pasta `bronze` se ela não existir ainda. Se já existir, apenas confirma que ela está lá.


In [0]:
# Criar schema bronze se não existir
create_database_if_not_exists(spark, SCHEMAS['bronze'])

Database 'workspace.bronze' verificado/criado com sucesso


## 3. Coletando Dados dos Países

### O que é uma API?
Uma API é como um **garçom em um restaurante**. Você faz um pedido (requisição), ele vai até a cozinha (servidor) e traz sua comida (dados). No nosso caso, estamos pedindo informações sobre países para a REST Countries API.

### Passo 1: Fazendo o Pedido à API

**O que este código faz?**
1. Acessa o endereço da API na internet (como abrir um site no navegador)
2. Faz o pedido: "Me dê informações sobre todos os países independentes"
3. Espera a resposta (até 30 segundos)
4. Se der erro (internet caiu, servidor lento), tenta novamente até 3 vezes
5. Recebe os dados em formato JSON (como um grande dicionário de informações)

**O que esperamos receber?**
Informações de aproximadamente **195 países por execução**, incluindo:
- Nome oficial e comum
- Capital
- População
- Área territorial
- Idiomas falados
- Moedas utilizadas
- Coordenadas geográficas
- E muito mais!


In [0]:
print("\n" + "="*60)
print("INGESTÃO: REST COUNTRIES API")
print("="*60)

# Fazer requisição à API
countries_data = fetch_api_data(
    url=API_URLS['countries'],
    timeout=REQUEST_CONFIG['timeout'],
    max_retries=REQUEST_CONFIG['max_retries']
)

print(f"✓ Dados recebidos: {len(countries_data)} países")


INGESTÃO: REST COUNTRIES API
✓ Dados recebidos: 195 países


### Passo 2: Preparando os Dados para Salvar

**O que está acontecendo aqui?**
Agora que temos os dados dos países, precisamos **adicionar informações de controle** antes de salvá-los. É como carimbar um documento com a data de recebimento.

**Informações que estamos adicionando:**
1. **data**: O JSON completo do país (todas as informações) convertido para texto
2. **ingestion_timestamp**: Data e hora exata em que coletamos esses dados (ex: 2025-12-05 14:32:15)
3. **data_source**: De onde vieram os dados (rest_countries_api)
4. **execution_date**: Apenas a data (ex: 2025-12-05) - usada para organizar os dados por dia

**Por que converter JSON para texto (string)?**
Na camada Bronze, queremos manter os dados **exatamente** como vieram da API. Guardar como texto garante que nada seja modificado ou perdido. Nas próximas etapas, vamos "abrir" esse texto e extrair as informações que precisamos.

**Analogia:**
É como receber 195 cartas lacradas (cada país = uma carta). Em vez de abrir e organizar o conteúdo agora, estamos apenas:
- Guardando as cartas lacradas
- Anotando quando recebemos cada uma
- Marcando de onde vieram


In [0]:
# Obter metadados de execução
metadata = get_execution_metadata()

# Preparar dados com metadados
countries_records = []
for country in countries_data:
    record = {
        'data': json.dumps(country),  # JSON como string
        'ingestion_timestamp': metadata['ingestion_timestamp'],
        'data_source': 'rest_countries_api',
        'execution_date': metadata['execution_date']
    }
    countries_records.append(record)

print(f"✓ Registros preparados: {len(countries_records)}")

✓ Registros preparados: 195


### Passo 3: Transformando em uma "Planilha" (DataFrame)

**O que é um DataFrame?**
Um DataFrame é como uma **planilha do Excel** em memória, com linhas e colunas. Cada linha é um país, e cada coluna é um tipo de informação (data, timestamp, fonte, etc.).

**O que estamos fazendo aqui?**
1. **Definindo a estrutura da planilha** (schema): Quais colunas teremos e que tipo de dado cada uma vai guardar
   - `data`: Texto (STRING) - obrigatório (NOT NULL)
   - `ingestion_timestamp`: Data e hora (TIMESTAMP) - obrigatório
   - `data_source`: Texto (STRING) - obrigatório
   - `execution_date`: Data (DATE) - obrigatório

2. **Criando a planilha**: Pegamos os 195 registros que preparamos e colocamos nessa estrutura

3. **Mostrando uma amostra**: Exibimos 3 linhas na tela para confirmar que está tudo certo

**Por que definir tipos de dados?**
Se dissermos que uma coluna é de data, o banco de dados sabe fazer operações específicas com ela (ex: filtrar por mês, calcular diferenças de dias). Se deixarmos tudo como texto, perdemos essas facilidades.


In [0]:
# Criar DataFrame
schema = StructType([
    StructField('data', StringType(), False),
    StructField('ingestion_timestamp', TimestampType(), False),
    StructField('data_source', StringType(), False),
    StructField('execution_date', DateType(), False)
])

df_countries_bronze = spark.createDataFrame(countries_records, schema=schema)

# Visualizar amostra
print("\nAmostra dos dados:")
df_countries_bronze.select(
    substring(col('data'), 1, 100).alias('data_sample'),
    'ingestion_timestamp',
    'data_source',
    'execution_date'
).show(3, truncate=False)


Amostra dos dados:
+----------------------------------------------------------------------------------------------------+--------------------------+------------------+--------------+
|data_sample                                                                                         |ingestion_timestamp       |data_source       |execution_date|
+----------------------------------------------------------------------------------------------------+--------------------------+------------------+--------------+
|{"name": {"common": "Antigua and Barbuda", "official": "Antigua and Barbuda", "nativeName": {"eng": |2025-12-07 13:04:51.165756|rest_countries_api|2025-12-07    |
|{"name": {"common": "Bhutan", "official": "Kingdom of Bhutan", "nativeName": {"dzo": {"official": "\|2025-12-07 13:04:51.165756|rest_countries_api|2025-12-07    |
|{"name": {"common": "Italy", "official": "Italian Republic", "nativeName": {"ita": {"official": "Rep|2025-12-07 13:04:51.165756|rest_countries_api|2025-12-07  

### Passo 4: Salvando no Banco de Dados (Delta Lake)

**O que é Delta Lake?**
Delta Lake é um **formato moderno de armazenamento de dados** que funciona como um banco de dados, mas é otimizado para grandes volumes e oferece recursos avançados como:
- "Time travel" (ver como os dados estavam no passado)
- Transações ACID (garantia de consistência)
- Performance otimizada para análises

**O que este código faz?**
1. **Pega a planilha** (DataFrame) que criamos
2. **Salva em formato Delta Lake**
3. **Modo append**: Adiciona os novos dados sem apagar os antigos (como adicionar linhas no final de uma planilha)
4. **Particionamento por data**: Organiza os dados em "pastas" separadas por dia
   - Exemplo: pasta `execution_date=2025-12-05/` vai ter os dados de hoje
   - pasta `execution_date=2025-12-06/` terá os dados de amanhã
5. **Cria uma tabela**: Registra a tabela no catálogo do Databricks com o nome `workspace.bronze.countries_raw`

**Por que particionar por data?**
Imagine que você tem 1 ano de dados (365 dias). Se precisar consultar só os dados de ontem, o sistema vai direto na "pasta" de ontem, sem precisar vasculhar os 365 dias. Isso deixa as consultas **muito mais rápidas**.

**Analogia:**
É como organizar documentos em pastas por mês. Se você precisa de algo de dezembro, vai direto na pasta de dezembro, em vez de procurar em um monte de papel solto.


In [0]:
# Salvar em Delta Lake (modo append, particionado por execution_date)
(
    df_countries_bronze
    .write
    .format('delta')
    .mode('append')
    .partitionBy('execution_date')
    .saveAsTable(BRONZE_TABLES['countries'])
)

print(f"\n✓ Dados salvos em: {BRONZE_TABLES['countries']}")


✓ Dados salvos em: workspace.bronze.countries_raw


In [0]:
# =====================================================
# DOCUMENTAÇÃO UNITY CATALOG - countries_raw
# =====================================================
# Registrar metadados da tabela e colunas no Unity Catalog

print("Registrando documentação no Unity Catalog...")

# Propriedades da tabela
spark.sql("""
ALTER TABLE workspace.bronze.countries_raw 
SET TBLPROPERTIES (
    'comment' = 'Dados brutos de paises coletados da REST Countries API. Armazena snapshot completo de 195 paises independentes por execucao em formato JSON.',
    'source' = 'REST Countries API v3.1',
    'source_url' = 'https://restcountries.com/v3.1/independent',
    'layer' = 'bronze',
    'update_frequency' = 'daily',
    'retention_days' = '90'
)
""")

# Comentários das colunas
spark.sql("ALTER TABLE workspace.bronze.countries_raw ALTER COLUMN data COMMENT 'JSON bruto completo retornado pela API com todos os atributos do pais (nome, populacao, area, moedas, idiomas, etc)'")
spark.sql("ALTER TABLE workspace.bronze.countries_raw ALTER COLUMN ingestion_timestamp COMMENT 'Timestamp UTC exato do momento da ingestao dos dados no pipeline'")
spark.sql("ALTER TABLE workspace.bronze.countries_raw ALTER COLUMN data_source COMMENT 'Identificador da fonte de dados: rest_countries_api'")
spark.sql("ALTER TABLE workspace.bronze.countries_raw ALTER COLUMN execution_date COMMENT 'Data de execucao do pipeline (YYYY-MM-DD), usada como chave de particionamento'")

print("✓ Documentação Unity Catalog aplicada: countries_raw")


Registrando documentação no Unity Catalog...
✓ Documentação Unity Catalog aplicada: countries_raw


### Passo 5: Verificando se Deu Tudo Certo

**O que este código faz?**
Depois de salvar os dados, precisamos **confirmar que realmente foram salvos corretamente**. É como conferir se um arquivo foi salvo no computador depois de clicar em "Salvar".

**Passos da verificação:**
1. **Lê a tabela que acabamos de criar** do banco de dados
2. **Conta quantas linhas tem** (esperamos **195 por execução**, uma para cada país)
3. **Mostra um resumo** com métricas:
   - Nome da tabela
   - Número de registros salvos
   - Data e hora da verificação

**Por que fazer isso?**
Às vezes algo pode dar errado no meio do caminho (memória insuficiente, erro de rede, etc.) e os dados não serem salvos completamente. Essa verificação garante que **todos os 195 países por execução** foram salvos com sucesso.

**O que significa "count"?**
É literalmente contar quantas linhas existem na tabela. Se salvamos 195 países, devemos ter 195 linhas.


In [0]:
# Verificar dados salvos
df_verify = spark.table(BRONZE_TABLES['countries'])
count = df_verify.count()

log_metrics(
    stage='bronze',
    table_name=BRONZE_TABLES['countries'],
    record_count=count
)


MÉTRICAS - BRONZE
Tabela: workspace.bronze.countries_raw
Registros: 975
Timestamp: 2025-12-07 13:05:00.732936



## 4. Coletando Taxas de Câmbio

### O que são taxas de câmbio?
Taxas de câmbio dizem **quanto vale cada moeda em relação ao dólar americano (USD)**. Por exemplo:
- 1 USD = 5,20 BRL (reais brasileiros)
- 1 USD = 0,92 EUR (euros)
- 1 USD = 150 JPY (ienes japoneses)

### Por que coletar essas taxas?
Vários países usam moedas diferentes. Ter as taxas de câmbio nos permite comparar economias e entender o poder de compra em cada país.

### Passo 1: Fazendo o Pedido à API

**O que este código faz?**
1. Acessa a Exchange Rate API na internet
2. Pede: "Me dê as taxas de câmbio atuais tendo o dólar (USD) como base"
3. Espera a resposta (até 30 segundos)
4. Se der erro, tenta novamente até 3 vezes
5. Recebe um JSON com todas as taxas (aproximadamente 160 moedas)

**Exemplo do que recebemos:**
```json
{
  "base": "USD",
  "date": "2025-12-05",
  "rates": {
    "BRL": 5.20,
    "EUR": 0.92,
    "GBP": 0.79,
    "JPY": 150.32,
    ...
  }
}
```


In [0]:
print("\n" + "="*60)
print("INGESTÃO: EXCHANGE RATE API")
print("="*60)

# Fazer requisição à API
exchange_data = fetch_api_data(
    url=API_URLS['exchange_rates'],
    timeout=REQUEST_CONFIG['timeout'],
    max_retries=REQUEST_CONFIG['max_retries']
)

print(f"✓ Dados recebidos: {len(exchange_data.get('rates', {}))} taxas de câmbio")


INGESTÃO: EXCHANGE RATE API
✓ Dados recebidos: 166 taxas de câmbio


### Passo 2: Preparando e Criando a Planilha

**O que está acontecendo aqui?**
Assim como fizemos com os países, agora vamos:

1. **Adicionar metadados de controle**:
   - Converter o JSON completo para texto
   - Marcar data e hora da coleta
   - Identificar a fonte (exchange_rate_api)
   - Registrar a data de execução

2. **Criar um DataFrame** (planilha) com a mesma estrutura de antes:
   - Coluna `data`: JSON completo como texto
   - Coluna `ingestion_timestamp`: Quando coletamos
   - Coluna `data_source`: De onde veio
   - Coluna `execution_date`: Data para particionamento

3. **Mostrar uma amostra** dos primeiros 150 caracteres do JSON

**Diferença em relação aos países:**
Com países, tínhamos 195 registros (uma linha para cada país). Com taxas de câmbio, temos **apenas 1 registro** contendo TODAS as taxas dentro do JSON. É uma diferença de modelagem: um único arquivo com todas as conversões.

**Analogia:**
- Países: 195 cartas lacradas (uma para cada país)
- Taxas de câmbio: 1 carta lacrada contendo uma tabela com 160 moedas


In [0]:
# Preparar dados com metadados
metadata = get_execution_metadata()

exchange_record = [{
    'data': json.dumps(exchange_data),
    'ingestion_timestamp': metadata['ingestion_timestamp'],
    'data_source': 'exchange_rate_api',
    'execution_date': metadata['execution_date']
}]

# Criar DataFrame
df_exchange_bronze = spark.createDataFrame(exchange_record, schema=schema)

print("\nAmostra dos dados:")
df_exchange_bronze.select(
    substring(col('data'), 1, 150).alias('data_sample'),
    'ingestion_timestamp',
    'data_source'
).show(truncate=False)


Amostra dos dados:
+------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+-----------------+
|data_sample                                                                                                                                           |ingestion_timestamp       |data_source      |
+------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+-----------------+
+------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+-----------------+



### Passo 3: Salvando as Taxas de Câmbio

**O que este código faz?**
Mesma operação que fizemos com os países:
1. Pega o DataFrame (planilha) com as taxas de câmbio
2. Salva em formato Delta Lake
3. Modo append: Adiciona aos dados existentes sem apagar
4. Particiona por data: Organiza em "pastas" por dia de coleta
5. Cria a tabela no catálogo: `workspace.bronze.exchange_rates_raw`

**Por que particionar se temos só 1 registro?**
Hoje temos 1 registro. Mas se rodarmos esse pipeline todos os dias por 1 ano, teremos 365 registros (um para cada dia). O particionamento permite consultar rapidamente "qual era a taxa de câmbio em 15 de março?" sem vasculhar todos os 365 dias.

**Uso futuro:**
Essas taxas podem mudar diariamente. Guardando o histórico particionado, podemos fazer análises como:
- "Como o real se valorizou/desvalorizou nos últimos 6 meses?"
- "Qual foi a maior taxa EUR/USD do ano?"


In [0]:
# Salvar em Delta Lake
(
    df_exchange_bronze
    .write
    .format('delta')
    .mode('append')
    .partitionBy('execution_date')
    .saveAsTable(BRONZE_TABLES['exchange_rates'])
)

print(f"\n✓ Dados salvos em: {BRONZE_TABLES['exchange_rates']}")


✓ Dados salvos em: workspace.bronze.exchange_rates_raw


In [0]:
# =====================================================
# DOCUMENTAÇÃO UNITY CATALOG - exchange_rates_raw
# =====================================================
# Registrar metadados da tabela e colunas no Unity Catalog

print("Registrando documentação no Unity Catalog...")

# Propriedades da tabela
spark.sql("""
ALTER TABLE workspace.bronze.exchange_rates_raw 
SET TBLPROPERTIES (
    'comment' = 'Taxas de cambio diarias em relacao ao USD coletadas da Exchange Rate API. Um registro por execucao contendo todas as moedas (~160 taxas).',
    'source' = 'Exchange Rate API',
    'source_url' = 'https://api.exchangerate-api.com/v4/latest/USD',
    'layer' = 'bronze',
    'update_frequency' = 'daily',
    'retention_days' = '90'
)
""")

# Comentários das colunas
spark.sql("ALTER TABLE workspace.bronze.exchange_rates_raw ALTER COLUMN data COMMENT 'JSON contendo taxas de cambio: {base: USD, date: YYYY-MM-DD, rates: {moeda: valor}}'")
spark.sql("ALTER TABLE workspace.bronze.exchange_rates_raw ALTER COLUMN ingestion_timestamp COMMENT 'Timestamp UTC do momento da ingestao dos dados no pipeline'")
spark.sql("ALTER TABLE workspace.bronze.exchange_rates_raw ALTER COLUMN data_source COMMENT 'Identificador da fonte de dados: exchange_rate_api'")
spark.sql("ALTER TABLE workspace.bronze.exchange_rates_raw ALTER COLUMN execution_date COMMENT 'Data de execucao do pipeline (YYYY-MM-DD), usada como chave de particionamento'")

print("✓ Documentação Unity Catalog aplicada: exchange_rates_raw")


Registrando documentação no Unity Catalog...
✓ Documentação Unity Catalog aplicada: exchange_rates_raw


**⚠️ Nota sobre a tabela exchange_rates_raw**

Esta tabela foi criada para demonstrar **ingestão multi-fonte** (coleta de dados de múltiplas APIs), uma boa prática em engenharia de dados. No entanto, os dados de taxas de câmbio **não foram utilizados** nas camadas Silver e Gold deste MVP.

**Por que coletar se não vamos usar?**
- Demonstrar capacidade técnica de integração com múltiplas fontes
- Preparar infraestrutura para evoluções futuras
- As 13 perguntas de negócio definidas no OBJETIVO.md focavam em demografia/geografia, não em análises econômicas que requereriam câmbio

**Oportunidades futuras:**
- Integrar as taxas de câmbio à `dim_currencies` para análises econômicas
- Normalizar PIB de países para USD usando essas taxas
- Analisar volatilidade cambial ao longo do tempo
- Criar métricas de poder de compra (ex: "PIB per capita ajustado por paridade")

Para o escopo do MVP acadêmico, optou-se por focar nas perguntas definidas no planejamento inicial.

### Passo 4: Verificando a Tabela de Câmbio

**O que este código faz?**
Mesma verificação que fizemos com países:
1. Lê a tabela recém-criada do banco de dados
2. Conta quantas linhas tem (esperamos 1 linha com todas as taxas)
3. Mostra um resumo com métricas de sucesso

**Confirmação de sucesso:**
Se aparecer "1 registro", significa que o JSON completo com as ~160 moedas foi salvo corretamente.


In [0]:
# Verificar dados salvos
df_verify_exchange = spark.table(BRONZE_TABLES['exchange_rates'])
count_exchange = df_verify_exchange.count()

log_metrics(
    stage='bronze',
    table_name=BRONZE_TABLES['exchange_rates'],
    record_count=count_exchange
)


MÉTRICAS - BRONZE
Tabela: workspace.bronze.exchange_rates_raw
Registros: 4
Timestamp: 2025-12-07 13:05:09.829069



## 5. Otimização das Tabelas

### O que é otimização?
Imagine que você tem uma estante com 1000 livros desorganizados. Encontrar um livro específico leva muito tempo. Se você organizar por autor e depois por título, fica muito mais rápido encontrar o que precisa. **Otimização faz isso com os dados**.

### O que o comando OPTIMIZE faz?
1. **Compactação de arquivos**: Delta Lake salva os dados em vários arquivos pequenos. OPTIMIZE junta esses arquivos em arquivos maiores e mais eficientes.
   - **Antes**: 1000 arquivos de 1 MB cada = lento para ler
   - **Depois**: 10 arquivos de 100 MB cada = muito mais rápido

2. **Organização interna**: Rearranja os dados de forma que leituras futuras sejam mais rápidas

### Por que não fazemos Z-ORDER aqui?
Z-ORDER é outra técnica de otimização que organiza os dados por colunas específicas. **MAS** não podemos usar Z-ORDER em colunas de partição (execution_date). Como nossas tabelas são particionadas por data, não especificamos zorder_columns.

### Quando isso é útil?
- Primeira execução: Ganho pequeno (poucos dados)
- Depois de 6 meses rodando diariamente: Ganho enorme (milhares de arquivos compactados)

**Analogia:**
É como desfragmentar um disco rígido no Windows: organiza os arquivos para que o acesso seja mais rápido.


In [0]:
# Otimizar tabelas (sem Z-ORDER em colunas de partição)
print("\nOtimizando tabelas Bronze...\n")

optimize_table(spark, BRONZE_TABLES['countries'])
optimize_table(spark, BRONZE_TABLES['exchange_rates'])

print("\n✓ Otimização concluída")


Otimizando tabelas Bronze...

Otimizando tabela workspace.bronze.countries_raw...
Tabela workspace.bronze.countries_raw otimizada com sucesso
Otimizando tabela workspace.bronze.exchange_rates_raw...
Tabela workspace.bronze.exchange_rates_raw otimizada com sucesso

✓ Otimização concluída


## 6. Resumo Final da Execução

### O que este código faz?
Calcula e mostra um **relatório final** com tudo que foi feito:

1. **Tempo total de execução**: Quanto tempo levou do início ao fim (diferença entre hora de início e hora atual)
2. **Tabelas criadas**: Quais tabelas foram criadas e quantos registros cada uma tem
3. **Status**: Se tudo deu certo (SUCESSO) ou se houve erros
4. **Timestamp final**: Quando o processo terminou

### Por que isso é importante?
Este resumo serve para:
- **Monitoramento**: Se o tempo aumentar muito de um dia para outro, pode indicar problemas
- **Documentação**: Registro de que o pipeline foi executado com sucesso
- **Auditoria**: Prova de que os dados foram coletados em determinado horário
- **Troubleshooting**: Se algo der errado, sabemos exatamente quando e onde parou

### Exemplo de output esperado:
```
PIPELINE BRONZE - RESUMO FINAL
Tabelas criadas:
  1. workspace.bronze.countries_raw (195 registros)
  2. workspace.bronze.exchange_rates_raw (1 registro)
Tempo total de execução: 12.45 segundos
Status: ✓ SUCESSO
Timestamp final: 2025-12-05 14:35:28
```


In [0]:
execution_time = time.time() - start_time

print("\n" + "="*60)
print("PIPELINE BRONZE - RESUMO FINAL")
print("="*60)
print(f"\nTabelas criadas:")
print(f"  1. {BRONZE_TABLES['countries']} ({count:,} registros)")
print(f"  2. {BRONZE_TABLES['exchange_rates']} ({count_exchange:,} registros)")
print(f"\nTempo total de execução: {execution_time:.2f} segundos")
print(f"Status: ✓ SUCESSO")
print(f"Timestamp final: {datetime.now()}")
print("="*60)


PIPELINE BRONZE - RESUMO FINAL

Tabelas criadas:
  1. workspace.bronze.countries_raw (975 registros)
  2. workspace.bronze.exchange_rates_raw (4 registros)

Tempo total de execução: 28.21 segundos
Status: ✓ SUCESSO
Timestamp final: 2025-12-07 13:05:17.140668


## 7. Consultas de Validação

### Para que servem essas consultas?
Agora que salvamos os dados, vamos fazer **perguntas ao banco de dados** para ter certeza de que tudo está correto. É como fazer perguntas para validar o que foi feito:
- "Os dados de hoje foram salvos?"
- "Quantos países temos para cada data?"
- "O JSON está íntegro?"

### Consulta 1: Verificar Partições

**O que esta consulta faz?**
Agrupa os dados por data de execução e mostra:
- Quantas linhas existem para cada data
- De qual fonte vieram (rest_countries_api)
- Ordena da data mais recente para a mais antiga

**Por que fazer isso?**
Se rodarmos este pipeline todos os dias, vamos acumulando dados:
- 2025-12-05: 195 países
- 2025-12-06: 195 países
- 2025-12-07: 195 países
- ...

Esta consulta confirma que não estamos perdendo dados no caminho e que o particionamento está funcionando.

**Output esperado (primeira execução):**
```
+---------------+-------------+-------------------+
|execution_date |total_records|data_source        |
+---------------+-------------+-------------------+
|2025-12-05     |195          |rest_countries_api |
+---------------+-------------+-------------------+
```


In [0]:
# Verificar partições da tabela countries
print("\nPartições - Countries:")
spark.sql(f"""
    SELECT 
        execution_date,
        COUNT(*) as total_records,
        data_source
    FROM {BRONZE_TABLES['countries']}
    GROUP BY execution_date, data_source
    ORDER BY execution_date DESC
""").show()


Partições - Countries:
+--------------+-------------+------------------+
|execution_date|total_records|       data_source|
+--------------+-------------+------------------+
|    2025-12-07|          585|rest_countries_api|
|    2025-12-06|          390|rest_countries_api|
+--------------+-------------+------------------+



### Consulta 2: Verificar Estrutura dos Dados

**O que esta consulta faz?**
Mostra os **primeiros 200 caracteres do JSON** de 2 países, junto com seus metadados (timestamp e data de execução).

**Por que fazer isso?**
É uma "inspeção visual" para confirmar que:
1. O JSON foi salvo corretamente (não foi corrompido)
2. Os timestamps estão corretos
3. A estrutura do JSON está como esperado

**O que esperamos ver?**
Algo assim:
```
{"name":{"common":"Brazil","official":"Federative Republic of Brazil"},"tld":[".br"],"cca2":"BR","cca3":"BRA","population":213993437,"area":8515767.0,"capital":["Brasília"],...
```

**Por que truncar (cortar) o JSON?**
O JSON completo de um país tem milhares de caracteres. Mostrar tudo na tela seria ilegível. Os primeiros 200 caracteres são suficientes para validar que a estrutura está correta.

**LIMIT 2:**
Mostra apenas 2 países como amostra. Não precisamos ver os 195 para validar.


In [0]:
# Verificar estrutura dos dados
print("\nEstrutura dos dados (primeiros caracteres do JSON):")
spark.sql(f"""
    SELECT 
        SUBSTRING(data, 1, 200) as data_sample,
        ingestion_timestamp,
        execution_date
    FROM {BRONZE_TABLES['countries']}
    LIMIT 2
""").show(truncate=False)


Estrutura dos dados (primeiros caracteres do JSON):
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+--------------+
|data_sample                                                                                                                                                                                             |ingestion_timestamp       |execution_date|
+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------+--------------+
|{"name": {"common": "Antigua and Barbuda", "official": "Antigua and Barbuda", "nativeName": {"eng": {"official": "Antigua and Barbuda", "common": "Antigua and Barbuda"}}}, "tld": [".ag"], "cca2": "AG"|2025-12-06