# Exercícios de Big Data com Spark + Iceberg + Hive Metastore + HDFS

Este notebook contém 20 exercícios desenvolvidos para o ambiente lab (Jupyter + Spark + Iceberg + HDFS + Hive Metastore).

**Instruções:**
- Execute as células em ordem sequencial
- Alguns exercícios dependem dos anteriores
- Verifique os outputs de cada célula

## Exercício 1: Criar um DataFrame simples
Crie um DataFrame com três linhas e duas colunas (`id`, `nome`) e mostre seu conteúdo.

In [None]:
from pyspark.sql import SparkSession

# Inicializar Spark Session
spark = SparkSession.builder.appName("Exercicio1").getOrCreate()

# Criar DataFrame
data = [
    (1, "Alice"),
    (2, "Bob"),
    (3, "Carlos")
]

df = spark.createDataFrame(data, ["id", "nome"])

# Mostrar conteúdo
df.show()

## Exercício 2: Salvar DataFrame no HDFS como CSV
Crie um DataFrame e salve em `hdfs://namenode:9000/data/ex1.csv` no formato CSV.

In [None]:
# Salvar no HDFS como CSV
df.write.mode("overwrite").csv("hdfs://namenode:9000/data/ex1.csv", header=True)

print("DataFrame salvo em HDFS com sucesso!")

## Exercício 3: Ler CSV do HDFS
Leia o arquivo salvo no exercício anterior usando `spark.read.csv()` e exiba o DataFrame.

In [None]:
# Ler CSV do HDFS
df_read = spark.read.csv("hdfs://namenode:9000/data/ex1.csv", header=True, inferSchema=True)

# Exibir DataFrame
df_read.show()
df_read.printSchema()

## Exercício 4: Criar namespace Iceberg
Crie um namespace chamado `lab.db` no catálogo Iceberg.

In [None]:
# Criar namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS lab.db")

print("Namespace lab.db criado com sucesso!")

# Verificar namespaces existentes
spark.sql("SHOW NAMESPACES").show()

## Exercício 5: Criar tabela Iceberg
Crie uma tabela Iceberg `lab.db.pessoas (id INT, nome STRING)` usando SQL.

In [None]:
# Criar tabela Iceberg
spark.sql("""
    CREATE TABLE IF NOT EXISTS lab.db.pessoas (
        id INT,
        nome STRING
    ) USING ICEBERG
""")

print("Tabela lab.db.pessoas criada com sucesso!")

# Verificar tabelas no namespace
spark.sql("SHOW TABLES IN lab.db").show()

## Exercício 6: Inserir dados na tabela Iceberg
Insira 3 valores manualmente usando SQL `INSERT INTO`.

In [None]:
# Inserir dados na tabela
spark.sql("INSERT INTO lab.db.pessoas VALUES (1, 'Alice')")
spark.sql("INSERT INTO lab.db.pessoas VALUES (2, 'Bob')")
spark.sql("INSERT INTO lab.db.pessoas VALUES (3, 'Carlos')")

print("Dados inseridos com sucesso!")

# Verificar dados inseridos
spark.sql("SELECT * FROM lab.db.pessoas").show()

## Exercício 7: Ler tabela Iceberg
Faça uma query `SELECT * FROM lab.db.pessoas` e exiba o resultado.

In [None]:
# Ler tabela Iceberg
df_pessoas = spark.sql("SELECT * FROM lab.db.pessoas")

# Exibir resultado
df_pessoas.show()

print(f"Total de colunas: {len(df_pessoas.columns)}")
print(f"Colunas: {df_pessoas.columns}")

## Exercício 8: Contar registros
Conte quantos registros existem na tabela `lab.db.pessoas`.

In [None]:
# Contar registros
count = spark.sql("SELECT COUNT(*) as total FROM lab.db.pessoas")
count.show()

# Alternativa usando DataFrame API
df_pessoas = spark.table("lab.db.pessoas")
total = df_pessoas.count()
print(f"Total de registros: {total}")

## Exercício 9: Atualizar um registro
Atualize um nome usando Iceberg SQL.

In [None]:
# Mostrar dados antes da atualização
print("Dados antes da atualização:")
spark.sql("SELECT * FROM lab.db.pessoas").show()

# Atualizar registro
spark.sql("UPDATE lab.db.pessoas SET nome = 'Alice Silva' WHERE nome = 'Alice'")

# Mostrar dados após atualização
print("Dados após atualização:")
spark.sql("SELECT * FROM lab.db.pessoas").show()

## Exercício 10: Deletar um registro
Remova uma linha da tabela usando `DELETE FROM`.

In [None]:
# Mostrar dados antes da deleção
print("Dados antes da deleção:")
spark.sql("SELECT * FROM lab.db.pessoas").show()

# Deletar registro
spark.sql("DELETE FROM lab.db.pessoas WHERE id = 3")

# Mostrar dados após deleção
print("Dados após deleção:")
spark.sql("SELECT * FROM lab.db.pessoas").show()

## Exercício 11: Criar tabela particionada
Crie uma tabela Iceberg com partição por ano.

In [None]:
# Criar tabela particionada
spark.sql("""
    CREATE TABLE IF NOT EXISTS lab.db.vendas (
        id INT,
        valor DOUBLE,
        ano INT
    ) USING ICEBERG
    PARTITIONED BY (ano)
""")

print("Tabela lab.db.vendas criada com sucesso!")

# Verificar detalhes da tabela
spark.sql("DESCRIBE EXTENDED lab.db.vendas").show(truncate=False)

## Exercício 12: Inserir dados particionados
Insira dados variando o valor de `ano`.

In [None]:
# Inserir dados de diferentes anos
spark.sql("INSERT INTO lab.db.vendas VALUES (1, 100.50, 2022)")
spark.sql("INSERT INTO lab.db.vendas VALUES (2, 250.75, 2022)")
spark.sql("INSERT INTO lab.db.vendas VALUES (3, 150.00, 2023)")
spark.sql("INSERT INTO lab.db.vendas VALUES (4, 300.25, 2023)")
spark.sql("INSERT INTO lab.db.vendas VALUES (5, 450.80, 2024)")

print("Dados inseridos com sucesso!")

# Verificar dados inseridos
spark.sql("SELECT * FROM lab.db.vendas ORDER BY ano, id").show()

## Exercício 13: Consultar apenas um particionamento
Leia somente as vendas do ano 2023.

In [None]:
# Consultar apenas vendas de 2023
print("Vendas do ano 2023:")
spark.sql("SELECT * FROM lab.db.vendas WHERE ano = 2023").show()

# Alternativa usando DataFrame API
df_vendas = spark.table("lab.db.vendas")
vendas_2023 = df_vendas.filter(df_vendas.ano == 2023)
print("\nUsando DataFrame API:")
vendas_2023.show()

## Exercício 14: Ver metadados da tabela
Use `DESCRIBE HISTORY` e `DESCRIBE DETAIL` para ver metadados.

In [None]:
# Ver histórico da tabela
print("Histórico da tabela vendas:")
spark.sql("DESCRIBE HISTORY lab.db.vendas").show(truncate=False)

print("\n" + "="*80 + "\n")

# Ver detalhes da tabela
print("Detalhes da tabela vendas:")
spark.sql("DESCRIBE DETAIL lab.db.vendas").show(truncate=False)

## Exercício 15: Criar tabela Iceberg a partir de DataFrame
Crie um DataFrame artificial e grave diretamente com `writeTo()`.

In [None]:
# Criar DataFrame artificial
data = [
    (1, "Produto A", 50.00),
    (2, "Produto B", 75.50),
    (3, "Produto C", 120.00),
    (4, "Produto D", 35.99),
    (5, "Produto E", 200.00)
]

df_produtos = spark.createDataFrame(data, ["id", "produto", "preco"])

print("DataFrame criado:")
df_produtos.show()

# Gravar como tabela Iceberg
df_produtos.writeTo("lab.db.tabela_df").createOrReplace()

print("\nTabela criada com sucesso!")

# Verificar tabela criada
spark.sql("SELECT * FROM lab.db.tabela_df").show()

## Exercício 16: Converter tabela para Iceberg
Crie uma tabela Parquet simples e converta para Iceberg.

In [None]:
# Criar DataFrame
data = [
    (1, "Item A"),
    (2, "Item B"),
    (3, "Item C")
]

df_items = spark.createDataFrame(data, ["id", "descricao"])

# Salvar como tabela Parquet
df_items.write.mode("overwrite").saveAsTable("lab.db.tabela_parquet", format="parquet")

print("Tabela Parquet criada!")

# Ler dados da tabela Parquet e recriar como Iceberg
df_parquet = spark.table("lab.db.tabela_parquet")
df_parquet.writeTo("lab.db.tabela_iceberg_convertida").createOrReplace()

print("Tabela convertida para Iceberg!")

# Verificar nova tabela
spark.sql("SELECT * FROM lab.db.tabela_iceberg_convertida").show()

## Exercício 17: Leitura incremental (Time Travel)
Volte para uma versão anterior da tabela usando `VERSION AS OF`.

In [None]:
# Mostrar histórico da tabela
print("Histórico da tabela vendas:")
spark.sql("DESCRIBE HISTORY lab.db.vendas").show(truncate=False)

print("\n" + "="*80 + "\n")

# Ler versão atual
print("Dados da versão atual:")
spark.sql("SELECT * FROM lab.db.vendas").show()

print("\n" + "="*80 + "\n")

# Ler versão específica (versão 1) - Time Travel
print("Dados da versão 1 (Time Travel):")
spark.sql("SELECT * FROM lab.db.vendas VERSION AS OF 1").show()

## Exercício 18: Exportar tabela Iceberg para CSV
Leia a tabela Iceberg e salve para `hdfs://namenode:9000/export/vendas.csv`.

In [None]:
# Ler tabela Iceberg
df_export = spark.sql("SELECT * FROM lab.db.vendas")

print("Dados da tabela vendas:")
df_export.show()

# Exportar para CSV no HDFS
output_path = "hdfs://namenode:9000/export/vendas.csv"
df_export.write.mode("overwrite").csv(output_path, header=True)

print(f"\nTabela exportada para {output_path} com sucesso!")

# Verificar se o arquivo foi criado
print("\nLendo arquivo exportado:")
df_exported = spark.read.csv(output_path, header=True, inferSchema=True)
df_exported.show()

## Exercício 19: Criar um Dashboard no Superset

### Objetivo
Adicionar o banco Trino no Superset, conectar ao catálogo Iceberg e criar uma visualização simples.

### Passos:

#### 1. Acessar o Superset
- Acesse: `http://localhost:8088`
- Faça login com as credenciais configuradas

#### 2. Adicionar conexão com Trino
1. Vá em **Data** → **Databases** → **+ Database**
2. Selecione **Trino**
3. Connection string: `trino://trino@trino:8080/iceberg`
4. Teste e salve

#### 3. Conectar ao catálogo Iceberg
1. **Data** → **Datasets** → **+ Dataset**
2. Database: Trino, Schema: lab.db, Table: vendas
3. Salve o dataset

#### 4. Criar visualização
1. **Charts** → **+ Chart**
2. Configure métricas e dimensões
3. Execute e salve

#### 5. Criar Dashboard
1. **Dashboards** → **+ Dashboard**
2. Adicione os charts
3. Organize e salve

**Nota:** Este exercício é manual e deve ser realizado na interface web do Superset.

## Exercício 20: Ler tabela Iceberg via Trino

### Queries SQL para executar no Superset ou CLI do Trino:

```sql
-- Conectar ao catálogo
USE iceberg;

-- Listar schemas
SHOW SCHEMAS;

-- Ler tabela pessoas
SELECT * FROM iceberg.lab.db.pessoas;

-- Ler tabela vendas com agregação
SELECT 
    ano,
    COUNT(*) as total_vendas,
    SUM(valor) as valor_total,
    AVG(valor) as valor_medio
FROM iceberg.lab.db.vendas
GROUP BY ano
ORDER BY ano;
```

**Nota:** Execute essas queries na interface do Superset ou no CLI do Trino.