# Apache Iceberg Demo

## Apache Iceberg Demonstration with Spark

This notebook shows how to use Apache Iceberg to:
- Create Iceberg tables
- Insert data
- Query data
- Use versioning and time travel
- Integrate with PostgreSQL

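```python
# Configure Spark with Iceberg.
# Assumes the Iceberg Spark runtime JAR and the PostgreSQL JDBC driver are already on the classpath.
#   spark_catalog: the built-in session catalog, wrapped so it can also load Iceberg tables
#                  (type "hive", i.e. backed by a Hive metastore; the cells below do not use it)
#   iceberg:       a file-based Hadoop catalog under /home/jovyan/iceberg-warehouse, used by all cells below
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("IcebergDemo") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hadoop") \
    .config("spark.sql.catalog.iceberg.warehouse", "/home/jovyan/iceberg-warehouse") \
    .getOrCreate()

print("✅ Spark configured with Iceberg!")
print(f"Spark version: {spark.version}")
```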
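
The settings above follow one naming pattern. `spark.sql.catalog.<name>` selects the catalog implementation, and `spark.sql.catalog.<name>.<property>` configures it. The minimal sketch below uses a hypothetical helper, `iceberg_hadoop_catalog_conf`, which is not part of Spark or Iceberg. It collects only the entries the later cells rely on (the SQL extensions and the `iceberg` Hadoop catalog) into a dictionary so they can be reviewed or reused.

```python
from typing import Dict


def iceberg_hadoop_catalog_conf(name: str, warehouse: str) -> Dict[str, str]:
    """Return Spark settings that enable Iceberg's SQL extensions and a Hadoop catalog called `name`.

    Hypothetical helper for illustration; the keys and class names are the ones used in the cell above.
    """
    prefix = f"spark.sql.catalog.{name}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",  # catalog implementation class
        f"{prefix}.type": "hadoop",                       # table metadata kept as files in the warehouse
        f"{prefix}.warehouse": warehouse,                 # root directory for tables in this catalog
    }


conf = iceberg_hadoop_catalog_conf("iceberg", "/home/jovyan/iceberg-warehouse")
for key, value in sorted(conf.items()):
    print(f"{key} = {value}")
```

To apply the dictionary, one option is to loop over it and call `builder = builder.config(key, value)` on `SparkSession.builder` before `getOrCreate()`.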

```python
# Create an example Iceberg table in the "iceberg" catalog (no-op if it already exists)
spark.sql("""
CREATE TABLE IF NOT EXISTS iceberg.northwind_customers (
    customer_id STRING,
    company_name STRING,
    contact_name STRING,
    country STRING,
    created_at TIMESTAMP
) USING ICEBERG
""")

print("✅ Iceberg table created (or already present)!")
```

```python
# Connect to PostgreSQL and extract the customers table over JDBC.
# The credentials are demo values; outside a local sandbox, load them from a secret store instead.
postgres_df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://postgres_erp:5432/northwind") \
    .option("dbtable", "customers") \
    .option("user", "postgres") \
    .option("password", "postgres") \
    .option("driver", "org.postgresql.Driver") \
    .load()

print(f"📊 {postgres_df.count()} records extracted from PostgreSQL")
postgres_df.show(5)
```
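
Hard-coding the connection details works for a local demo, but the same reader options can be assembled from environment variables. The minimal sketch below assumes hypothetical variable names (`PG_HOST`, `PG_PORT`, `PG_DATABASE`, `PG_USER`, `PG_PASSWORD`) and a hypothetical helper, `postgres_jdbc_options`. It falls back to the demo values used above.

```python
import os
from typing import Dict


def postgres_jdbc_options(table: str) -> Dict[str, str]:
    """Build Spark JDBC reader options for PostgreSQL from environment variables.

    The PG_* names are hypothetical; the defaults mirror the demo cell above.
    """
    host = os.environ.get("PG_HOST", "postgres_erp")
    port = os.environ.get("PG_PORT", "5432")
    database = os.environ.get("PG_DATABASE", "northwind")
    return {
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        "dbtable": table,
        "user": os.environ.get("PG_USER", "postgres"),
        "password": os.environ.get("PG_PASSWORD", "postgres"),
        "driver": "org.postgresql.Driver",
    }


opts = postgres_jdbc_options("customers")
print(opts["url"])
```

The result can be passed to the reader with `spark.read.format("jdbc").options(**postgres_jdbc_options("customers")).load()`.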

```python
# Transform the data and insert it into Iceberg
from pyspark.sql.functions import current_timestamp, col

# Keep the columns the target table defines and stamp each row with the load time
iceberg_df = postgres_df.select(
    col("customer_id"),
    col("company_name"),
    col("contact_name"),
    col("country"),
    current_timestamp().alias("created_at")
)

# Append to the Iceberg table. Every append commits a new snapshot,
# so re-running this cell inserts the same customers again.
iceberg_df.writeTo("iceberg.northwind_customers").append()

print("✅ Data inserted into the Iceberg table!")
```

```python
# Query the Iceberg table
result = spark.sql("SELECT * FROM iceberg.northwind_customers LIMIT 10")
result.show()

# Table statistics
count = spark.sql("SELECT COUNT(*) AS total FROM iceberg.northwind_customers").collect()[0]['total']
print(f"📊 Total records in the Iceberg table: {count}")
```

```python
# Demonstrate versioning: insert more rows (this append commits another snapshot)
new_data = spark.createDataFrame([
    ("TEST1", "Test Company 1", "John Doe", "Brazil"),
    ("TEST2", "Test Company 2", "Jane Smith", "Argentina")
], ["customer_id", "company_name", "contact_name", "country"])

new_data_with_timestamp = new_data.withColumn("created_at", current_timestamp())
new_data_with_timestamp.writeTo("iceberg.northwind_customers").append()

print("✅ New rows inserted!")

# Check the new row count
new_count = spark.sql("SELECT COUNT(*) AS total FROM iceberg.northwind_customers").collect()[0]['total']
print(f"📊 New row count: {new_count}")
```

```python
# View the table history (one row per snapshot that became the current table state)
history = spark.sql("SELECT * FROM iceberg.northwind_customers.history")
history.show(truncate=False)

print("📈 Iceberg table version history")
```
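
Each write to an Iceberg table commits a new snapshot, and the `history` table lists the snapshots that became current together with their `parent_id`. The toy model below illustrates the core idea. An append reuses the previous snapshot's files by reference, adds the new ones, and leaves older snapshots readable. It is not how Iceberg actually stores metadata. Iceberg tracks files through manifest lists and manifests and generates its own long snapshot ids. The class names `Snapshot` and `ToyTable` are hypothetical.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Snapshot:
    snapshot_id: int
    parent_id: Optional[int]
    data_files: Tuple[str, ...]  # every data file visible in this version of the table


@dataclass
class ToyTable:
    """Toy model of snapshot-based versioning; not Iceberg's real metadata layout."""
    snapshots: List[Snapshot] = field(default_factory=list)

    def append(self, new_files: List[str]) -> Snapshot:
        parent = self.snapshots[-1] if self.snapshots else None
        # Files from the parent are reused by reference; nothing already written is rewritten.
        files = (parent.data_files if parent else ()) + tuple(new_files)
        snapshot = Snapshot(
            snapshot_id=len(self.snapshots) + 1,  # a counter here; Iceberg generates its own ids
            parent_id=parent.snapshot_id if parent else None,
            data_files=files,
        )
        self.snapshots.append(snapshot)  # older snapshots stay untouched
        return snapshot

    def files_as_of(self, snapshot_id: int) -> Tuple[str, ...]:
        """Time travel: return the file list recorded by the given snapshot."""
        for snapshot in self.snapshots:
            if snapshot.snapshot_id == snapshot_id:
                return snapshot.data_files
        raise KeyError(snapshot_id)


table = ToyTable()
first = table.append(["part-00000.parquet"])   # e.g. the PostgreSQL load
second = table.append(["part-00001.parquet"])  # e.g. the two TEST rows
print(table.files_as_of(first.snapshot_id))    # ('part-00000.parquet',)
print(table.files_as_of(second.snapshot_id))   # ('part-00000.parquet', 'part-00001.parquet')
```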

```python
# Demonstrate time travel (query an earlier version of the table).
# Snapshots are sorted by commit time because the metadata table does not guarantee an order.
snapshots = spark.sql("SELECT * FROM iceberg.northwind_customers.snapshots ORDER BY committed_at")
snapshots.show(truncate=False)

# If there is more than one snapshot, query the oldest one
# (VERSION AS OF in SQL requires Spark 3.3 or later)
snapshot_ids = [row['snapshot_id'] for row in snapshots.collect()]
if len(snapshot_ids) > 1:
    first_snapshot = snapshot_ids[0]
    time_travel_query = f"SELECT COUNT(*) AS count_at_snapshot FROM iceberg.northwind_customers VERSION AS OF {first_snapshot}"
    result = spark.sql(time_travel_query)
    result.show()
    print(f"🕐 Time travel: row count as of the first snapshot ({first_snapshot})")
```
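
Besides `VERSION AS OF`, Spark 3.3 and later can time travel with `TIMESTAMP AS OF`, which lets Iceberg pick the snapshot that was current at that moment. The sketch below imitates that lookup in plain Python over `(snapshot_id, made_current_at)` pairs such as those in the `history` table, and builds the corresponding query string. It is a simplified illustration, not Iceberg's actual resolution code. Both function names and the sample history are hypothetical.

```python
from datetime import datetime
from typing import List, Tuple


def snapshot_current_at(history: List[Tuple[int, datetime]], ts: datetime) -> int:
    """Return the id of the snapshot current at `ts`: the last one made current at or before it."""
    candidates = [entry for entry in history if entry[1] <= ts]
    if not candidates:
        raise ValueError(f"no snapshot was current at {ts}")
    return max(candidates, key=lambda entry: entry[1])[0]


def timestamp_travel_query(table: str, ts: datetime) -> str:
    """Build a Spark SQL query that lets Iceberg resolve the snapshot from a timestamp."""
    return f"SELECT COUNT(*) AS count_at_time FROM {table} TIMESTAMP AS OF '{ts:%Y-%m-%d %H:%M:%S}'"


# Hypothetical history: two snapshots made current two hours apart
history_rows = [(101, datetime(2024, 1, 1, 10, 0)), (202, datetime(2024, 1, 1, 12, 0))]
print(snapshot_current_at(history_rows, datetime(2024, 1, 1, 11, 0)))  # 101
print(timestamp_travel_query("iceberg.northwind_customers", datetime(2024, 1, 1, 11, 0)))
```

With the `history` DataFrame from the history cell, the pairs could be collected as `[(r["snapshot_id"], r["made_current_at"]) for r in history.collect()]`. The query string is then passed to `spark.sql(...)`, where the timestamp literal is interpreted in the Spark session time zone.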

```python
# Customers by country (top 10; ties are broken alphabetically so the output is deterministic)
country_analysis = spark.sql("""
SELECT country, COUNT(*) AS customer_count
FROM iceberg.northwind_customers
GROUP BY country
ORDER BY customer_count DESC, country
LIMIT 10
""")

country_analysis.show()
print("🌍 Customer analysis by country")
```

```python
# Information about the table's data files (path, format, record count, size, per-column statistics)
files_info = spark.sql("SELECT * FROM iceberg.northwind_customers.files")
files_info.show(truncate=False)

print("📁 Iceberg table data file information")
```
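
The `files` metadata table has one row per data file, including `record_count` and `file_size_in_bytes`. Aggregating those columns is a quick way to see whether small appends, such as the two-row insert in this notebook, have left many small files. The compaction item under the next steps is meant to address that situation. The minimal sketch below works on plain dictionaries. The helper name `summarize_data_files`, the 16 MiB cutoff, and the sample rows are arbitrary choices for illustration.

```python
from typing import Dict, List


def summarize_data_files(files: List[Dict], small_file_bytes: int = 16 * 1024 * 1024) -> Dict[str, float]:
    """Aggregate rows shaped like Iceberg's `files` metadata table (record_count, file_size_in_bytes)."""
    total_files = len(files)
    total_bytes = sum(f["file_size_in_bytes"] for f in files)
    return {
        "files": total_files,
        "records": sum(f["record_count"] for f in files),
        "avg_file_mib": total_bytes / total_files / (1024 * 1024) if total_files else 0.0,
        # Many files far below the cutoff suggest that compaction would help
        "small_files": sum(1 for f in files if f["file_size_in_bytes"] < small_file_bytes),
    }


# Hypothetical rows: one file from a bulk load, one from a two-row append
example = [
    {"record_count": 90, "file_size_in_bytes": 3_000},
    {"record_count": 2, "file_size_in_bytes": 1_500},
]
print(summarize_data_files(example))
```

In the notebook, the rows could be obtained with `[r.asDict() for r in files_info.select("record_count", "file_size_in_bytes").collect()]`.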

## Conclusion

This notebook demonstrated:

- ✅ **Iceberg configuration** with Spark
- ✅ **Creating** Iceberg tables
- ✅ **PostgreSQL integration** for data extraction
- ✅ **Inserting data** in Iceberg format
- ✅ **Automatic versioning** of data (every write commits a new snapshot)
- ✅ **Time travel** for historical queries
- ✅ **Data analysis** with SQL
- ✅ **Metadata** about the table and its files

### Next Steps

- Integrate with Apache Atlas for data cataloging
- Implement automated ETL pipelines
- Configure partitioning for better performance
- Add automatic compaction
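
Partitioning and compaction can both be handled in Spark SQL once the Iceberg SQL extensions are enabled, as they are in the first cell. `ALTER TABLE ... ADD PARTITION FIELD` evolves the partition spec for new writes, while existing files keep their old layout. The `rewrite_data_files` procedure compacts small files, and its `table` argument is resolved relative to the catalog named in the `CALL`. The hypothetical helper below only builds the statements. Partitioning by `country` is an assumption suggested by the analysis query, not a recommendation from the original text.

```python
from typing import List


def maintenance_statements(catalog: str, table: str, partition_column: str) -> List[str]:
    """Build Spark SQL statements for partition evolution and compaction of an Iceberg table."""
    return [
        # Iceberg SQL extension: new writes use the new partition spec; existing files are not rewritten
        f"ALTER TABLE {catalog}.{table} ADD PARTITION FIELD {partition_column}",
        # Iceberg stored procedure that rewrites small data files into larger ones
        f"CALL {catalog}.system.rewrite_data_files(table => '{table}')",
    ]


for stmt in maintenance_statements("iceberg", "northwind_customers", "country"):
    print(stmt)
```

Each statement would be run with `spark.sql(stmt)`. Scheduling the `CALL` to run periodically is one way to make compaction automatic.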