# Integração Apache Iceberg com Bancos Relacionais

Este notebook demonstra como integrar dados do Apache Iceberg com bancos relacionais (PostgreSQL), realizando operações de merge e sincronização de dados entre diferentes sistemas de armazenamento.

## Contexto e Objetivos:

### Cenário Empresarial:
- **Sistemas OLTP**: PostgreSQL com dados transacionais em tempo real
- **Data Lake**: Apache Iceberg para análises e histórico
- **Necessidade**: Sincronização eficiente entre os sistemas
- **Desafio**: Manter consistência e performance

### Benefícios da Integração:
- **Arquitetura Híbrida**: Melhor de ambos os mundos (OLTP + OLAP)
- **Análises em Tempo Real**: Dados atualizados para BI
- **Histórico Completo**: Versionamento e auditoria no Iceberg
- **Performance**: Otimizações específicas para cada workload
- **Escalabilidade**: Separação de cargas transacionais e analíticas

### Estratégias de Sincronização:
1. **Batch ETL**: Cargas periódicas completas ou incrementais
2. **Change Data Capture (CDC)**: Captura de mudanças em tempo real
3. **Merge Operations**: Upserts eficientes com MERGE INTO
4. **Event-Driven**: Sincronização baseada em eventos

## Setup do Ambiente

Configuração do Spark com suporte ao Iceberg e driver JDBC para PostgreSQL.

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql.functions import to_date, col

# Sessão Spark
# Para o Spark se estiver rodando
try:
    spark.stop()
except:
    pass
spark = SparkSession.builder \
    .appName("IcebergRollbacks") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop") \
    .config("spark.sql.catalog.hadoop_catalog.warehouse", "file:///home/tavares/warehouse") \
    .config("spark.sql.default.catalog", "hadoop_catalog") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .getOrCreate()

print(f"Warehouse configurado para: {spark.conf.get('spark.sql.catalog.hadoop_catalog.warehouse')}")

Warehouse configurado para: file:///home/tavares/warehouse


## Importações para Manipulação de Datas

Bibliotecas necessárias para trabalhar com datas e períodos, úteis para operações de sincronização incremental.

In [2]:
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta

## Criação da Tabela Iceberg de Destino

Criamos uma tabela Iceberg que espelha a estrutura da tabela `customers` do PostgreSQL. Esta tabela servirá como destino para os dados sincronizados e aproveitará todos os benefícios do Iceberg como versionamento, ACID transactions e schema evolution.

In [8]:
# Criamos a tabela customers no Iceberg
spark.sql("DROP TABLE IF EXISTS hadoop_catalog.default.customers")

''' Este formato de tabela apresentou erros nos scripts abaixo. Com isso foi retirado o NOT NULL dos dois primeiros campos.
spark.sql("""
    CREATE TABLE hadoop_catalog.default.customers (
        customer_id string NOT NULL,
        company_name string NOT NULL,
        contact_name string,
        contact_title string,
        address string,
        city string,
        region string,
        postal_code string,
        country string,
        phone string,
        fax string
    )
    USING iceberg          
""")
'''

spark.sql("""
    CREATE TABLE hadoop_catalog.default.customers (
        customer_id string,
        company_name string,
        contact_name string,
        contact_title string,
        address string,
        city string,
        region string,
        postal_code string,
        country string,
        phone string,
        fax string
    )
    USING iceberg          
""")

DataFrame[]

## Configuração da Conexão JDBC

Definimos os parâmetros de conexão com o banco PostgreSQL. O hostname `postgres-erp` corresponde ao nome do serviço definido no docker-compose, permitindo comunicação entre containers na mesma rede Docker.

In [9]:
# Credenciais do PostgreSQL
jdbc_hostname = "postgres-erp"
jdbc_port = 5432
jdbc_database = "northwind"
jdbc_username = "postgres"
jdbc_password = "postgres"

# URL JDBC de conexão
jdbc_url = f"jdbc:postgresql://{jdbc_hostname}:{jdbc_port}/{jdbc_database}"

# Propriedades de conexão JDBC
connection_properties = {
    "user": jdbc_username,
    "password": jdbc_password,
    "driver": "org.postgresql.Driver"
}

## Leitura de Dados do PostgreSQL

Implementamos uma função para ler dados da tabela `customers` do PostgreSQL usando JDBC. A query é encapsulada em uma subquery para permitir maior flexibilidade na seleção de dados. Esta abordagem permite:

- **Filtragem**: Aplicar WHERE clauses para dados específicos
- **Transformações**: Aplicar funções SQL durante a leitura
- **Performance**: Reduzir volume de dados transferidos
- **Flexibilidade**: Adaptar queries conforme necessário

In [10]:
def read_postgres_data():
    query = """
        (SELECT *
         FROM customers
        ) AS customers
    """
    df = spark.read.jdbc(
        url=jdbc_url,
        table=query,
        properties=connection_properties
    )
    return df

# Ler dados do PostgreSQL
df_postgres = read_postgres_data()

# Exibir os dados lidos
df_postgres.show()

+-----------+--------------------+------------------+--------------------+--------------------+------------+------+-----------+-----------+--------------+--------------+
|customer_id|        company_name|      contact_name|       contact_title|             address|        city|region|postal_code|    country|         phone|           fax|
+-----------+--------------------+------------------+--------------------+--------------------+------------+------+-----------+-----------+--------------+--------------+
|      ALFKI| Alfreds Futterkiste|      Maria Anders|Sales Representative|       Obere Str. 57|      Berlin|  null|      12209|    Germany|   030-0074321|   030-0076545|
|      ANATR|Ana Trujillo Empa...|      Ana Trujillo|               Owner|Avda. de la Const...| México D.F.|  null|      05021|     Mexico|  (5) 555-4729|  (5) 555-3745|
|      ANTON|Antonio Moreno Ta...|    Antonio Moreno|               Owner|     Mataderos  2312| México D.F.|  null|      05023|     Mexico|  (5) 555-3

## Carregamento Inicial dos Dados

Realizamos o carregamento inicial (full load) dos dados do PostgreSQL para a tabela Iceberg. Esta operação cria o primeiro snapshot da tabela e estabelece a base para futuras sincronizações incrementais.

In [11]:
# Carregar dados do PostgreSQL para Iceberg
df_postgres.writeTo("hadoop_catalog.default.customers").append()

print("Dados carregados na tabela Iceberg")

# Verificar dados na tabela Iceberg
spark.sql("SELECT COUNT(*) as total FROM hadoop_catalog.default.customers").show()
spark.sql("SELECT * FROM hadoop_catalog.default.customers LIMIT 5").show()

Dados carregados na tabela Iceberg
+-----+
|total|
+-----+
|   91|
+-----+

+-----------+--------------------+------------------+--------------------+--------------------+-----------+------+-----------+-------+--------------+--------------+
|customer_id|        company_name|      contact_name|       contact_title|             address|       city|region|postal_code|country|         phone|           fax|
+-----------+--------------------+------------------+--------------------+--------------------+-----------+------+-----------+-------+--------------+--------------+
|      ALFKI| Alfreds Futterkiste|      Maria Anders|Sales Representative|       Obere Str. 57|     Berlin|  null|      12209|Germany|   030-0074321|   030-0076545|
|      ANATR|Ana Trujillo Empa...|      Ana Trujillo|               Owner|Avda. de la Const...|México D.F.|  null|      05021| Mexico|  (5) 555-4729|  (5) 555-3745|
|      ANTON|Antonio Moreno Ta...|    Antonio Moreno|               Owner|     Mataderos  2312|Méxi

## Simulação de Mudanças nos Dados

Para demonstrar o processo de merge, vamos simular mudanças que normalmente ocorreriam no sistema PostgreSQL:

- **INSERT**: Novos clientes adicionados
- **UPDATE**: Informações de clientes existentes modificadas
- **DELETE**: Clientes removidos (se aplicável)

Em um ambiente real, essas mudanças seriam capturadas através de CDC ou queries incrementais baseadas em timestamps.

In [12]:
# Simular mudanças adicionando novos dados diretamente na tabela Iceberg
# (Em um cenário real, essas mudanças viriam do PostgreSQL)

# Adicionar um novo cliente
spark.sql("""
    INSERT INTO hadoop_catalog.default.customers VALUES
    ('NEWCO', 'New Company Ltd', 'John Doe', 'Manager', '123 New St', 'New City', 'NY', '12345', 'USA', '555-1234', '555-5678')
""")

# Atualizar um cliente existente
spark.sql("""
    UPDATE hadoop_catalog.default.customers 
    SET phone = '555-9999', contact_name = 'Maria Updated'
    WHERE customer_id = 'ALFKI'
""")

print("Mudanças simuladas aplicadas")

# Verificar as mudanças
spark.sql("SELECT COUNT(*) as total FROM hadoop_catalog.default.customers").show()
spark.sql("SELECT * FROM hadoop_catalog.default.customers WHERE customer_id IN ('NEWCO', 'ALFKI')").show()

Mudanças simuladas aplicadas
+-----+
|total|
+-----+
|   92|
+-----+

+-----------+-------------------+-------------+--------------------+-------------+--------+------+-----------+-------+--------+-----------+
|customer_id|       company_name| contact_name|       contact_title|      address|    city|region|postal_code|country|   phone|        fax|
+-----------+-------------------+-------------+--------------------+-------------+--------+------+-----------+-------+--------+-----------+
|      ALFKI|Alfreds Futterkiste|Maria Updated|Sales Representative|Obere Str. 57|  Berlin|  null|      12209|Germany|555-9999|030-0076545|
|      NEWCO|    New Company Ltd|     John Doe|             Manager|   123 New St|New City|    NY|      12345|    USA|555-1234|   555-5678|
+-----------+-------------------+-------------+--------------------+-------------+--------+------+-----------+-------+--------+-----------+



## Operação MERGE INTO

O `MERGE INTO` é uma operação fundamental para sincronização de dados, permitindo:

- **UPSERT**: Inserir novos registros ou atualizar existentes em uma única operação
- **Atomicidade**: Operação ACID que garante consistência
- **Performance**: Mais eficiente que INSERT + UPDATE separados
- **Flexibilidade**: Diferentes ações baseadas em condições

Esta operação é essencial para manter a sincronização entre PostgreSQL e Iceberg.

In [13]:
# Criar uma tabela temporária com dados "novos" do PostgreSQL
spark.sql("""
    CREATE OR REPLACE TEMPORARY VIEW customers_updates AS
    SELECT 'ALFKI' as customer_id, 'Alfreds Futterkiste UPDATED' as company_name, 'Maria Final' as contact_name, 
           'Sales Rep' as contact_title, 'New Address 123' as address, 'Berlin' as city, 
           NULL as region, '12209' as postal_code, 'Germany' as country, 
           '030-0074321' as phone, '030-0076545' as fax
    UNION ALL
    SELECT 'NEWC2' as customer_id, 'Another New Company' as company_name, 'Jane Smith' as contact_name,
           'Director' as contact_title, '456 Another St' as address, 'Another City' as city,
           'CA' as region, '54321' as postal_code, 'USA' as country,
           '555-4321' as phone, '555-8765' as fax
""")

# Executar MERGE INTO
spark.sql("""
    MERGE INTO hadoop_catalog.default.customers AS target
    USING customers_updates AS source
    ON target.customer_id = source.customer_id
    WHEN MATCHED THEN
        UPDATE SET 
            company_name = source.company_name,
            contact_name = source.contact_name,
            contact_title = source.contact_title,
            address = source.address,
            phone = source.phone
    WHEN NOT MATCHED THEN
        INSERT (customer_id, company_name, contact_name, contact_title, address, city, region, postal_code, country, phone, fax)
        VALUES (source.customer_id, source.company_name, source.contact_name, source.contact_title, 
                source.address, source.city, source.region, source.postal_code, source.country, source.phone, source.fax)
""")

print("MERGE INTO executado com sucesso")

# Verificar resultados
spark.sql("SELECT * FROM hadoop_catalog.default.customers WHERE customer_id IN ('ALFKI', 'NEWC2')").show(truncate=False)

MERGE INTO executado com sucesso
+-----------+---------------------------+------------+-------------+---------------+------------+------+-----------+-------+-----------+-----------+
|customer_id|company_name               |contact_name|contact_title|address        |city        |region|postal_code|country|phone      |fax        |
+-----------+---------------------------+------------+-------------+---------------+------------+------+-----------+-------+-----------+-----------+
|ALFKI      |Alfreds Futterkiste UPDATED|Maria Final |Sales Rep    |New Address 123|Berlin      |null  |12209      |Germany|030-0074321|030-0076545|
|NEWC2      |Another New Company        |Jane Smith  |Director     |456 Another St |Another City|CA    |54321      |USA    |555-4321   |555-8765   |
+-----------+---------------------------+------------+-------------+---------------+------------+------+-----------+-------+-----------+-----------+



## Monitoramento e Auditoria

Uma das grandes vantagens do Iceberg é o sistema de snapshots que permite auditoria completa de todas as operações. Cada operação (INSERT, UPDATE, DELETE, MERGE) cria um novo snapshot, proporcionando:

- **Rastreabilidade**: Histórico completo de mudanças
- **Rollback**: Capacidade de reverter para estados anteriores
- **Time Travel**: Consultas em pontos específicos do tempo
- **Compliance**: Atendimento a requisitos de auditoria

In [15]:
# Verificar snapshots após todas as operações
print("Histórico de Snapshots:")
spark.sql("""
    SELECT snapshot_id, committed_at, operation
    FROM hadoop_catalog.default.customers.snapshots 
    ORDER BY committed_at
""").show(truncate=False)

# Verificar histórico completo
print("\nHistórico da Tabela:")
spark.sql("SELECT * FROM hadoop_catalog.default.customers.history").show(truncate=False)

# Contagem final
print("\nContagem final de registros:")
spark.sql("SELECT COUNT(*) as total FROM hadoop_catalog.default.customers").show()

Histórico de Snapshots:
+-------------------+-----------------------+---------+
|snapshot_id        |committed_at           |operation|
+-------------------+-----------------------+---------+
|894653696084846250 |2025-11-06 23:55:01.399|append   |
|3151425052900505849|2025-11-06 23:55:09.852|append   |
|5501151205264156785|2025-11-06 23:55:14.978|overwrite|
|1516900522667827493|2025-11-06 23:55:22.347|overwrite|
+-------------------+-----------------------+---------+


Histórico da Tabela:
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2025-11-06 23:55:01.399|894653696084846250 |null               |true               |
|2025-11-06 23:55:09.852|3151425052900505849|894653696084846250 |true               |
|2025-11-06 23:55:14.978|5501151205264156785|3151425052900505849|true      

## Resumo dos Benefícios e Casos de Uso

### Vantagens da Integração Iceberg + PostgreSQL:

1. **Arquitetura Híbrida Otimizada**:
   - PostgreSQL: Transações OLTP de alta performance
   - Iceberg: Análises OLAP escaláveis e eficientes

2. **Operações ACID Completas**:
   - Consistência garantida em operações de merge
   - Isolamento entre leituras e escritas
   - Durabilidade dos dados

3. **Versionamento e Auditoria**:
   - Histórico completo de mudanças
   - Capacidade de rollback
   - Compliance regulatório

4. **Performance Otimizada**:
   - Particionamento inteligente no Iceberg
   - Compactação automática de arquivos
   - Índices e estatísticas para otimização de queries

5. **Flexibilidade Operacional**:
   - Schema evolution sem downtime
   - Múltiplas estratégias de sincronização
   - Integração com ferramentas de BI

### Casos de Uso Empresariais:

- **Data Warehousing**: ETL de sistemas transacionais para análise
- **Real-time Analytics**: Dashboards com dados atualizados
- **Data Lake Architecture**: Centralização de dados de múltiplas fontes
- **Backup e Arquivamento**: Histórico de dados transacionais
- **Compliance**: Auditoria e rastreabilidade de mudanças
- **Machine Learning**: Datasets para treinamento de modelos

### Considerações para Produção:

- **Monitoramento**: Acompanhar performance e latência
- **Segurança**: Criptografia e controle de acesso
- **Backup**: Estratégias de recuperação de desastres
- **Escalabilidade**: Planejamento para crescimento de dados
- **Manutenção**: Compactação e limpeza de snapshots antigos