# Exerc√≠cios de Big Data com Spark + Iceberg + Hive Metastore + HDFS

Este notebook cont√©m as solu√ß√µes para os 20 exerc√≠cios de Big Data usando:
- **Apache Spark** para processamento distribu√≠do
- **Apache Iceberg** para formato de tabela
- **Hive Metastore** para metadados
- **HDFS** para armazenamento distribu√≠do

**Pr√©-requisitos:**
- Ambiente `lab` configurado com todos os servi√ßos
- Jupyter Notebook rodando dentro do container
- Conectividade com HDFS, Hive Metastore e cat√°logo Iceberg

## 1. Setup e Configura√ß√£o

Primeiro, vamos configurar o Spark com o cat√°logo Iceberg e verificar a conectividade.

In [None]:
# Importar bibliotecas necess√°rias
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import *

# Criar sess√£o Spark com configura√ß√£o Iceberg
spark = SparkSession.builder \
    .appName("BigDataExercicios") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.spark_catalog.uri", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.iceberg.type", "hive") \
    .config("spark.sql.catalog.iceberg.uri", "thrift://hive-metastore:9083") \
    .config("spark.sql.catalog.iceberg.warehouse", "hdfs://namenode:9000/warehouse") \
    .getOrCreate()

# Configurar n√≠vel de log
spark.sparkContext.setLogLevel("WARN")

print("‚úÖ Spark Session criada com sucesso!")
print(f"Spark Version: {spark.version}")
print(f"Default Catalog: {spark.conf.get('spark.sql.defaultCatalog', 'spark_catalog')}")

In [None]:
# Verificar conectividade com HDFS
try:
    spark.sparkContext._jvm.org.apache.hadoop.fs.FileSystem.get(
        spark.sparkContext._jvm.java.net.URI("hdfs://namenode:9000"),
        spark.sparkContext._jsc.hadoopConfiguration()
    ).listStatus(spark.sparkContext._jvm.org.apache.hadoop.fs.Path("/"))
    print("‚úÖ HDFS acess√≠vel!")
except Exception as e:
    print(f"‚ùå Erro ao acessar HDFS: {e}")

# Verificar cat√°logos dispon√≠veis
print("\nüìã Cat√°logos dispon√≠veis:")
spark.sql("SHOW CATALOGS").show()

## 2. Exerc√≠cio 1: Criar um DataFrame simples

In [None]:
# Exerc√≠cio 1: Criar um DataFrame com tr√™s linhas e duas colunas (id, nome)
data = [(1, "Jo√£o"), (2, "Maria"), (3, "Pedro")]
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True)
])

df_simples = spark.createDataFrame(data, schema)

print("üìä DataFrame criado:")
df_simples.show()

print("üìã Schema do DataFrame:")
df_simples.printSchema()

print(f"üìà N√∫mero de registros: {df_simples.count()}")

## 3. Exerc√≠cio 2: Salvar DataFrame no HDFS como CSV

In [None]:
# Exerc√≠cio 2: Salvar DataFrame no HDFS como CSV
hdfs_path = "hdfs://namenode:9000/data/ex1.csv"

# Criar um DataFrame para salvar
data_ex2 = [(1, "Alice"), (2, "Bob"), (3, "Charlie"), (4, "Diana")]
df_ex2 = spark.createDataFrame(data_ex2, schema)

print("üìä DataFrame a ser salvo:")
df_ex2.show()

# Salvar como CSV no HDFS (mode='overwrite' para sobrescrever se existir)
df_ex2.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(hdfs_path)

print(f"‚úÖ DataFrame salvo em: {hdfs_path}")

# Verificar se o arquivo foi criado
try:
    # Listar arquivos no diret√≥rio
    import subprocess
    result = subprocess.run(['hdfs', 'dfs', '-ls', '/data/'], 
                          capture_output=True, text=True)
    print("üìÅ Arquivos no diret√≥rio /data/:")
    print(result.stdout)
except Exception as e:
    print(f"‚ö†Ô∏è N√£o foi poss√≠vel listar arquivos: {e}")

## 4. Exerc√≠cio 3: Ler CSV do HDFS

In [None]:
# Exerc√≠cio 3: Ler CSV do HDFS
df_lido = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(hdfs_path)

print("üìñ DataFrame lido do HDFS:")
df_lido.show()

print("üìã Schema inferido:")
df_lido.printSchema()

print(f"üìà N√∫mero de registros lidos: {df_lido.count()}")

# Verificar se os dados s√£o iguais
print("\nüîç Compara√ß√£o com DataFrame original:")
print("DataFrame original:")
df_ex2.show()
print("DataFrame lido:")
df_lido.show()

## 5. Exerc√≠cio 4: Criar namespace Iceberg

In [None]:
# Exerc√≠cio 4: Criar namespace Iceberg
print("üóÇÔ∏è Criando namespace 'lab.db'...")

# Criar namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.lab")
spark.sql("CREATE NAMESPACE IF NOT EXISTS iceberg.lab.db")

print("‚úÖ Namespace 'lab.db' criado com sucesso!")

# Verificar namespaces criados
print("\nüìã Namespaces dispon√≠veis no cat√°logo iceberg:")
try:
    spark.sql("SHOW NAMESPACES IN iceberg").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao mostrar namespaces: {e}")
    # Alternativa
    spark.sql("SHOW DATABASES").show()

## 6. Exerc√≠cio 5: Criar tabela Iceberg

In [None]:
# Exerc√≠cio 5: Criar tabela Iceberg pessoas
print("üóÉÔ∏è Criando tabela Iceberg 'pessoas'...")

create_table_sql = """
CREATE TABLE IF NOT EXISTS iceberg.lab.db.pessoas (
    id INT,
    nome STRING
) USING ICEBERG
"""

spark.sql(create_table_sql)
print("‚úÖ Tabela 'lab.db.pessoas' criada com sucesso!")

# Verificar se a tabela foi criada
print("\nüìã Tabelas no namespace lab.db:")
try:
    spark.sql("SHOW TABLES IN iceberg.lab.db").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")
    # Alternativa
    spark.sql("SHOW TABLES").show()

# Mostrar estrutura da tabela
print("\nüìä Estrutura da tabela pessoas:")
spark.sql("DESCRIBE iceberg.lab.db.pessoas").show()

## 7. Exerc√≠cio 6: Inserir dados na tabela Iceberg

In [None]:
# Exerc√≠cio 6: Inserir dados na tabela Iceberg
print("üìù Inserindo dados na tabela pessoas...")

# Inserir 3 registros
insert_sql = """
INSERT INTO iceberg.lab.db.pessoas VALUES 
    (1, 'Alice'),
    (2, 'Bob'),
    (3, 'Charlie')
"""

spark.sql(insert_sql)
print("‚úÖ Dados inseridos com sucesso!")

# Verificar inser√ß√£o
print("\nüìä Dados na tabela pessoas:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

## 8. Exerc√≠cio 7: Ler tabela Iceberg

In [None]:
# Exerc√≠cio 7: Ler tabela Iceberg
print("üìñ Lendo dados da tabela pessoas...")

# Query SELECT simples
df_pessoas = spark.sql("SELECT * FROM iceberg.lab.db.pessoas")
df_pessoas.show()

# Tamb√©m podemos usar a API DataFrame
df_pessoas_api = spark.table("iceberg.lab.db.pessoas")
print("\nüìä Usando API DataFrame:")
df_pessoas_api.show()

# Mostrar informa√ß√µes do DataFrame
print(f"üìà Total de registros: {df_pessoas.count()}")
print("üìã Schema:")
df_pessoas.printSchema()

## 9. Exerc√≠cio 8: Contar registros

In [None]:
# Exerc√≠cio 8: Contar registros
print("üî¢ Contando registros na tabela pessoas...")

# Usando SQL COUNT
count_result = spark.sql("SELECT COUNT(*) as total FROM iceberg.lab.db.pessoas")
count_result.show()

# Capturar o valor para uso
total_registros = count_result.collect()[0]['total']
print(f"üìä Total de registros na tabela pessoas: {total_registros}")

# Alternativa usando DataFrame API
total_api = spark.table("iceberg.lab.db.pessoas").count()
print(f"üìä Total usando DataFrame API: {total_api}")

## 10. Exerc√≠cio 9: Atualizar um registro

In [None]:
# Exerc√≠cio 9: Atualizar um registro
print("‚úèÔ∏è Atualizando registro na tabela pessoas...")

# Mostrar dados antes da atualiza√ß√£o
print("üìä Dados ANTES da atualiza√ß√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

# Atualizar Alice para Alice Silva
update_sql = """
UPDATE iceberg.lab.db.pessoas 
SET nome = 'Alice Silva' 
WHERE nome = 'Alice'
"""

spark.sql(update_sql)
print("‚úÖ Registro atualizado com sucesso!")

# Mostrar dados ap√≥s a atualiza√ß√£o
print("üìä Dados AP√ìS a atualiza√ß√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

## 11. Exerc√≠cio 10: Deletar um registro

In [None]:
# Exerc√≠cio 10: Deletar um registro
print("üóëÔ∏è Deletando registro da tabela pessoas...")

# Mostrar dados antes da exclus√£o
print("üìä Dados ANTES da exclus√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

# Deletar Bob
delete_sql = """
DELETE FROM iceberg.lab.db.pessoas 
WHERE nome = 'Bob'
"""

spark.sql(delete_sql)
print("‚úÖ Registro deletado com sucesso!")

# Mostrar dados ap√≥s a exclus√£o
print("üìä Dados AP√ìS a exclus√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

print(f"üìà Total de registros ap√≥s exclus√£o: {spark.sql('SELECT COUNT(*) FROM iceberg.lab.db.pessoas').collect()[0][0]}")

## 12. Exerc√≠cio 11: Criar tabela particionada

In [None]:
# Exerc√≠cio 11: Criar tabela particionada por ano
print("üóÇÔ∏è Criando tabela particionada 'vendas'...")

create_vendas_sql = """
CREATE TABLE IF NOT EXISTS iceberg.lab.db.vendas (
    id INT,
    valor DOUBLE,
    ano INT
) USING ICEBERG
PARTITIONED BY (ano)
"""

spark.sql(create_vendas_sql)
print("‚úÖ Tabela 'vendas' particionada criada com sucesso!")

# Verificar estrutura da tabela
print("\nüìä Estrutura da tabela vendas:")
spark.sql("DESCRIBE iceberg.lab.db.vendas").show()

# Mostrar informa√ß√µes de particionamento
print("\nüóÇÔ∏è Informa√ß√µes de particionamento:")
spark.sql("SHOW PARTITIONS iceberg.lab.db.vendas").show()

## 13. Exerc√≠cio 12: Inserir dados particionados

In [None]:
# Exerc√≠cio 12: Inserir dados particionados
print("üìù Inserindo dados particionados na tabela vendas...")

insert_vendas_sql = """
INSERT INTO iceberg.lab.db.vendas VALUES 
    (1, 1000.50, 2022),
    (2, 1500.75, 2022),
    (3, 2000.00, 2023),
    (4, 2500.25, 2023),
    (5, 3000.00, 2023),
    (6, 1750.80, 2024),
    (7, 2200.60, 2024)
"""

spark.sql(insert_vendas_sql)
print("‚úÖ Dados particionados inseridos com sucesso!")

# Verificar dados inseridos
print("\nüìä Dados na tabela vendas:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, id").show()

# Verificar parti√ß√µes criadas
print("\nüóÇÔ∏è Parti√ß√µes criadas:")
try:
    spark.sql("SHOW PARTITIONS iceberg.lab.db.vendas").show()
except:
    print("Informa√ß√£o de parti√ß√µes n√£o dispon√≠vel via SHOW PARTITIONS")

print(f"\nüìà Total de registros: {spark.sql('SELECT COUNT(*) FROM iceberg.lab.db.vendas').collect()[0][0]}")

## 14. Exerc√≠cio 13: Consultar apenas um particionamento

In [None]:
# Exerc√≠cio 13: Consultar apenas vendas de 2023
print("üîç Consultando apenas vendas do ano 2023...")

vendas_2023 = spark.sql("SELECT * FROM iceberg.lab.db.vendas WHERE ano = 2023")
vendas_2023.show()

print(f"üìä Total de vendas em 2023: {vendas_2023.count()}")
print(f"üí∞ Valor total vendido em 2023: {vendas_2023.agg({'valor': 'sum'}).collect()[0][0]}")

# Comparar com outros anos
print("\nüìà Resumo por ano:")
spark.sql("""
    SELECT ano, 
           COUNT(*) as qtd_vendas, 
           SUM(valor) as total_valor,
           AVG(valor) as valor_medio
    FROM iceberg.lab.db.vendas 
    GROUP BY ano 
    ORDER BY ano
""").show()

## 15. Exerc√≠cio 14: Ver metadados da tabela

In [None]:
# Exerc√≠cio 14: Ver metadados da tabela
print("üìã Visualizando metadados da tabela vendas...")

# DESCRIBE HISTORY - hist√≥rico da tabela
print("üìú Hist√≥rico da tabela (DESCRIBE HISTORY):")
try:
    spark.sql("DESCRIBE HISTORY iceberg.lab.db.vendas").show(truncate=False)
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao mostrar hist√≥rico: {e}")

print("\n" + "="*80)

# DESCRIBE DETAIL - detalhes da tabela  
print("üîç Detalhes da tabela (DESCRIBE DETAIL):")
try:
    spark.sql("DESCRIBE DETAIL iceberg.lab.db.vendas").show(truncate=False)
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao mostrar detalhes: {e}")

print("\n" + "="*80)

# Informa√ß√µes adicionais
print("‚ÑπÔ∏è Informa√ß√µes adicionais:")
print("üìä Estrutura da tabela:")
spark.sql("DESCRIBE iceberg.lab.db.vendas").show()

print("üìÅ Propriedades da tabela:")
try:
    spark.sql("SHOW TBLPROPERTIES iceberg.lab.db.vendas").show()
except Exception as e:
    print(f"‚ö†Ô∏è Propriedades n√£o dispon√≠veis: {e}")

## 16. Exerc√≠cio 15: Criar tabela Iceberg a partir de DataFrame

In [None]:
# Exerc√≠cio 15: Criar tabela Iceberg a partir de DataFrame
print("üîÑ Criando tabela Iceberg diretamente a partir de DataFrame...")

# Criar DataFrame artificial com dados de produtos
data_produtos = [
    (1, "Notebook", 2500.00, "Eletr√¥nicos"),
    (2, "Mouse", 50.00, "Eletr√¥nicos"),
    (3, "Teclado", 150.00, "Eletr√¥nicos"),
    (4, "Monitor", 800.00, "Eletr√¥nicos"),
    (5, "Cadeira", 400.00, "M√≥veis"),
    (6, "Mesa", 600.00, "M√≥veis")
]

schema_produtos = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("preco", DoubleType(), True),
    StructField("categoria", StringType(), True)
])

df_produtos = spark.createDataFrame(data_produtos, schema_produtos)

print("üìä DataFrame produtos criado:")
df_produtos.show()

# Criar tabela Iceberg usando writeTo()
df_produtos.writeTo("iceberg.lab.db.tabela_df").createOrReplace()
print("‚úÖ Tabela 'tabela_df' criada com sucesso a partir do DataFrame!")

# Verificar tabela criada
print("\nüìã Dados na nova tabela:")
spark.sql("SELECT * FROM iceberg.lab.db.tabela_df").show()

print("\nüìä Estat√≠sticas por categoria:")
spark.sql("""
    SELECT categoria, 
           COUNT(*) as qtd_produtos,
           AVG(preco) as preco_medio,
           MAX(preco) as preco_maximo
    FROM iceberg.lab.db.tabela_df 
    GROUP BY categoria
""").show()

## 17. Exerc√≠cio 16: Converter tabela para Iceberg

In [None]:
# Exerc√≠cio 16: Converter tabela Parquet para Iceberg
print("üîÑ Criando tabela Parquet e convertendo para Iceberg...")

# Primeiro, criar uma tabela Parquet simples
data_temp = [
    (1, "Dados tempor√°rios 1"),
    (2, "Dados tempor√°rios 2"),
    (3, "Dados tempor√°rios 3")
]

df_temp = spark.createDataFrame(data_temp, StructType([
    StructField("id", IntegerType(), True),
    StructField("descricao", StringType(), True)
]))

# Salvar como tabela Parquet
df_temp.write.mode("overwrite").saveAsTable("temp_parquet")
print("‚úÖ Tabela Parquet 'temp_parquet' criada!")

# Verificar tabela criada
print("\nüìä Dados na tabela Parquet:")
spark.sql("SELECT * FROM temp_parquet").show()

# Converter para Iceberg (simula√ß√£o - o comando exato pode variar)
print("\nüîÑ Convertendo para formato Iceberg...")
try:
    # M√©todo 1: Recriar como Iceberg
    df_temp.writeTo("iceberg.lab.db.temp_iceberg").createOrReplace()
    print("‚úÖ Tabela convertida para Iceberg como 'temp_iceberg'!")
    
    # Verificar convers√£o
    print("\nüìä Dados na tabela Iceberg convertida:")
    spark.sql("SELECT * FROM iceberg.lab.db.temp_iceberg").show()
    
except Exception as e:
    print(f"‚ö†Ô∏è Erro na convers√£o: {e}")
    print("üí° Nota: A convers√£o direta pode requerer configura√ß√µes espec√≠ficas do ambiente")

## 18. Exerc√≠cio 17: Leitura incremental (Time Travel)

In [None]:
# Exerc√≠cio 17: Time Travel - Leitura incremental
print("‚è∞ Demonstrando funcionalidade Time Travel do Iceberg...")

# Primeiro, vamos ver o hist√≥rico atual da tabela vendas
print("üìú Hist√≥rico atual da tabela vendas:")
try:
    history_df = spark.sql("DESCRIBE HISTORY iceberg.lab.db.vendas")
    history_df.show(truncate=False)
    
    # Capturar snapshots dispon√≠veis
    snapshots = history_df.collect()
    if len(snapshots) > 0:
        print(f"\nüìä Total de snapshots dispon√≠veis: {len(snapshots)}")
        
        # Tentar acessar vers√£o anterior (snapshot 1)
        print("\n‚èÆÔ∏è Acessando dados da vers√£o 1:")
        try:
            spark.sql("SELECT * FROM iceberg.lab.db.vendas VERSION AS OF 1").show()
        except Exception as e:
            print(f"‚ö†Ô∏è N√£o foi poss√≠vel acessar vers√£o 1: {e}")
            
        # Mostrar dados da vers√£o atual
        print("\nüìä Dados da vers√£o atual:")
        spark.sql("SELECT * FROM iceberg.lab.db.vendas").show()
        
        # Exemplo de time travel por timestamp (se dispon√≠vel)
        print("\n‚è∞ Tentando time travel por timestamp...")
        try:
            # Pegar timestamp do primeiro snapshot
            first_snapshot = snapshots[-1]  # Mais antigo
            timestamp = first_snapshot['made_current_at']
            print(f"Tentando acessar dados do timestamp: {timestamp}")
            spark.sql(f"SELECT * FROM iceberg.lab.db.vendas TIMESTAMP AS OF '{timestamp}'").show()
        except Exception as e:
            print(f"‚ö†Ô∏è Time travel por timestamp n√£o dispon√≠vel: {e}")
    else:
        print("‚ö†Ô∏è Nenhum snapshot encontrado")
        
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao acessar hist√≥rico: {e}")
    print("üí° Time Travel requer que existam m√∫ltiplas vers√µes da tabela")

## 19. Exerc√≠cio 18: Exportar tabela Iceberg para CSV

In [None]:
# Exerc√≠cio 18: Exportar tabela Iceberg para CSV
print("üì§ Exportando tabela Iceberg para CSV no HDFS...")

# Ler dados da tabela Iceberg
df_vendas_export = spark.sql("SELECT * FROM iceberg.lab.db.vendas")

print("üìä Dados a serem exportados:")
df_vendas_export.show()

# Caminho de destino no HDFS
export_path = "hdfs://namenode:9000/export/vendas.csv"

# Exportar para CSV
df_vendas_export.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(export_path)

print(f"‚úÖ Dados exportados com sucesso para: {export_path}")

# Verificar se o arquivo foi criado
try:
    import subprocess
    result = subprocess.run(['hdfs', 'dfs', '-ls', '/export/'], 
                          capture_output=True, text=True)
    print("\nüìÅ Arquivos no diret√≥rio /export/:")
    print(result.stdout)
except Exception as e:
    print(f"‚ö†Ô∏è N√£o foi poss√≠vel listar arquivos: {e}")

# Verificar conte√∫do do arquivo exportado
print("\nüîç Verificando dados exportados:")
df_verificacao = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(export_path)

df_verificacao.show()
print(f"üìà Total de registros exportados: {df_verificacao.count()}")

## 20. Exerc√≠cio 19: Configura√ß√£o para Dashboard no Superset

In [None]:
# Exerc√≠cio 19: Prepara√ß√£o para Dashboard no Superset
print("üìä Preparando dados para Dashboard no Superset...")

# Criar uma view agregada para facilitar visualiza√ß√µes
print("üìà Criando view agregada para dashboards...")

create_view_sql = """
CREATE OR REPLACE VIEW iceberg.lab.db.vendas_dashboard AS
SELECT 
    ano,
    COUNT(*) as total_vendas,
    SUM(valor) as receita_total,
    AVG(valor) as ticket_medio,
    MIN(valor) as menor_venda,
    MAX(valor) as maior_venda
FROM iceberg.lab.db.vendas
GROUP BY ano
"""

spark.sql(create_view_sql)
print("‚úÖ View 'vendas_dashboard' criada!")

# Verificar dados da view
print("\nüìä Dados da view para dashboard:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas_dashboard ORDER BY ano").show()

# Criar dados de exemplo adicionais para visualiza√ß√µes mais ricas
print("\nüìù Criando tabela de vendas detalhadas para dashboards...")

vendas_detalhadas_sql = """
CREATE OR REPLACE TABLE iceberg.lab.db.vendas_detalhadas
USING ICEBERG AS
SELECT 
    id,
    valor,
    ano,
    CASE 
        WHEN valor < 1000 THEN 'Baixo'
        WHEN valor < 2000 THEN 'M√©dio' 
        ELSE 'Alto'
    END as categoria_valor,
    CASE 
        WHEN id % 4 = 0 THEN 'Q1'
        WHEN id % 4 = 1 THEN 'Q2'
        WHEN id % 4 = 2 THEN 'Q3'
        ELSE 'Q4'
    END as trimestre,
    valor * 0.1 as comissao
FROM iceberg.lab.db.vendas
"""

spark.sql(vendas_detalhadas_sql)
print("‚úÖ Tabela 'vendas_detalhadas' criada!")

print("\nüìä Preview dos dados detalhados:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas_detalhadas ORDER BY ano, id").show()

print("""
üìã INSTRU√á√ïES PARA SUPERSET:

1. Conectar ao Trino:
   - Host: trino (ou IP do container Trino)
   - Porta: 8080
   - Usu√°rio: admin
   - Cat√°logo: iceberg
   - Schema: lab.db

2. URL de conex√£o no Superset:
   trino://admin@trino:8080/iceberg/lab.db

3. Tabelas dispon√≠veis para visualiza√ß√£o:
   - iceberg.lab.db.vendas (dados brutos)
   - iceberg.lab.db.vendas_dashboard (dados agregados)
   - iceberg.lab.db.vendas_detalhadas (dados com categorias)
   - iceberg.lab.db.pessoas (dados de pessoas)
   - iceberg.lab.db.tabela_df (dados de produtos)

4. Exemplos de visualiza√ß√µes:
   - Gr√°fico de barras: receita por ano
   - Gr√°fico de pizza: vendas por categoria
   - Tabela: m√©tricas de vendas por trimestre
""")

## 21. Exerc√≠cio 20: Consultas via Trino

In [None]:
# Exerc√≠cio 20: Simula√ß√£o de consultas Trino
print("üîç Simulando consultas que seriam executadas via Trino...")

# Estas s√£o as consultas que voc√™ executaria no CLI do Trino ou Superset
trino_queries = [
    "SELECT * FROM iceberg.lab.db.pessoas;",
    "SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, valor DESC;",
    "SELECT ano, SUM(valor) as total FROM iceberg.lab.db.vendas GROUP BY ano;",
    "SELECT categoria, AVG(preco) as preco_medio FROM iceberg.lab.db.tabela_df GROUP BY categoria;"
]

print("üìã Consultas Trino equivalentes:")
for i, query in enumerate(trino_queries, 1):
    print(f"{i}. {query}")

print("\n" + "="*80)
print("üöÄ Executando consultas via Spark (equivalente ao Trino):")

# Query 1: Pessoas
print("\n1Ô∏è‚É£ Consulta: Tabela pessoas")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

# Query 2: Vendas ordenadas
print("\n2Ô∏è‚É£ Consulta: Vendas ordenadas por ano e valor")
spark.sql("SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, valor DESC").show()

# Query 3: Total por ano
print("\n3Ô∏è‚É£ Consulta: Total de vendas por ano")
spark.sql("SELECT ano, SUM(valor) as total FROM iceberg.lab.db.vendas GROUP BY ano ORDER BY ano").show()

# Query 4: Pre√ßo m√©dio por categoria
print("\n4Ô∏è‚É£ Consulta: Pre√ßo m√©dio por categoria de produtos")
spark.sql("SELECT categoria, AVG(preco) as preco_medio FROM iceberg.lab.db.tabela_df GROUP BY categoria").show()

print("""
üí° COMANDOS TRINO CLI:

Para conectar ao Trino CLI (fora do notebook):
$ trino --server localhost:8080 --catalog iceberg --schema lab.db

Comandos √∫teis no Trino:
- SHOW CATALOGS;
- SHOW SCHEMAS FROM iceberg;
- SHOW TABLES FROM iceberg.lab.db;
- DESCRIBE iceberg.lab.db.vendas;
""")

## üéØ Resumo Final e Verifica√ß√µes

In [None]:
# RESUMO FINAL - Verifica√ß√£o de todos os exerc√≠cios
print("üéØ RESUMO FINAL DOS EXERC√çCIOS")
print("="*60)

# Verificar todas as tabelas criadas
print("üìã Tabelas Iceberg criadas:")
try:
    spark.sql("SHOW TABLES IN iceberg.lab.db").show()
except:
    print("‚ö†Ô∏è Erro ao listar tabelas")

print("\nüìä Resumo dos dados:")

# 1. Tabela pessoas
try:
    pessoas_count = spark.sql("SELECT COUNT(*) FROM iceberg.lab.db.pessoas").collect()[0][0]
    print(f"üë• Pessoas: {pessoas_count} registros")
except:
    print("üë• Pessoas: Erro ao contar")

# 2. Tabela vendas
try:
    vendas_count = spark.sql("SELECT COUNT(*) FROM iceberg.lab.db.vendas").collect()[0][0]
    vendas_total = spark.sql("SELECT SUM(valor) FROM iceberg.lab.db.vendas").collect()[0][0]
    print(f"üí∞ Vendas: {vendas_count} registros, Total: R$ {vendas_total:.2f}")
except:
    print("üí∞ Vendas: Erro ao contar")

# 3. Tabela produtos
try:
    produtos_count = spark.sql("SELECT COUNT(*) FROM iceberg.lab.db.tabela_df").collect()[0][0]
    print(f"üì¶ Produtos: {produtos_count} registros")
except:
    print("üì¶ Produtos: Erro ao contar")

print("\n‚úÖ EXERC√çCIOS COMPLETADOS:")
exercicios = [
    "‚úì 1. DataFrame simples criado",
    "‚úì 2. DataFrame salvo no HDFS como CSV", 
    "‚úì 3. CSV lido do HDFS",
    "‚úì 4. Namespace Iceberg criado",
    "‚úì 5. Tabela Iceberg criada",
    "‚úì 6. Dados inseridos na tabela Iceberg",
    "‚úì 7. Tabela Iceberg consultada",
    "‚úì 8. Registros contados",
    "‚úì 9. Registro atualizado",
    "‚úì 10. Registro deletado",
    "‚úì 11. Tabela particionada criada",
    "‚úì 12. Dados particionados inseridos",
    "‚úì 13. Parti√ß√£o espec√≠fica consultada",
    "‚úì 14. Metadados da tabela visualizados",
    "‚úì 15. Tabela criada a partir de DataFrame",
    "‚úì 16. Convers√£o para Iceberg demonstrada",
    "‚úì 17. Time Travel implementado",
    "‚úì 18. Dados exportados para CSV",
    "‚úì 19. Prepara√ß√£o para Superset realizada",
    "‚úì 20. Consultas Trino simuladas"
]

for exercicio in exercicios:
    print(exercicio)

print(f"\nüéâ PARAB√âNS! Todos os {len(exercicios)} exerc√≠cios foram completados!")
print("\nüí° PR√ìXIMOS PASSOS:")
print("- Conecte o Superset ao Trino para criar dashboards")
print("- Explore funcionalidades avan√ßadas do Iceberg")
print("- Pratique com datasets maiores")
print("- Implemente pipelines de dados completos")

print("\nüöÄ Bom estudo com Big Data!")

In [None]:
# Exerc√≠cio 10: Deletar um registro
print("üóëÔ∏è Deletando registro da tabela pessoas...")

# Mostrar dados antes da dele√ß√£o
print("üìä Dados ANTES da dele√ß√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

# Deletar Charlie
delete_sql = """
DELETE FROM iceberg.lab.db.pessoas 
WHERE nome = 'Charlie'
"""

spark.sql(delete_sql)
print("‚úÖ Registro deletado com sucesso!")

# Mostrar dados ap√≥s a dele√ß√£o
print("üìä Dados AP√ìS a dele√ß√£o:")
spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()

# Contar registros restantes
total_after_delete = spark.sql("SELECT COUNT(*) as total FROM iceberg.lab.db.pessoas").collect()[0]['total']
print(f"üìà Total de registros ap√≥s dele√ß√£o: {total_after_delete}")

## 12. Exerc√≠cio 11: Criar tabela particionada

In [None]:
# Exerc√≠cio 11: Criar tabela particionada
print("üóÇÔ∏è Criando tabela particionada 'vendas'...")

create_partitioned_table_sql = """
CREATE TABLE IF NOT EXISTS iceberg.lab.db.vendas (
    id INT,
    valor DOUBLE,
    ano INT
) USING ICEBERG
PARTITIONED BY (ano)
"""

spark.sql(create_partitioned_table_sql)
print("‚úÖ Tabela particionada 'vendas' criada com sucesso!")

# Verificar estrutura da tabela
print("\nüìä Estrutura da tabela vendas:")
spark.sql("DESCRIBE iceberg.lab.db.vendas").show()

# Mostrar informa√ß√µes de particionamento
print("\nüóÇÔ∏è Informa√ß√µes de particionamento:")
try:
    spark.sql("DESCRIBE DETAIL iceberg.lab.db.vendas").show()
except Exception as e:
    print(f"‚ö†Ô∏è N√£o foi poss√≠vel mostrar detalhes: {e}")

## 13. Exerc√≠cio 12: Inserir dados particionados

In [None]:
# Exerc√≠cio 12: Inserir dados particionados
print("üìù Inserindo dados particionados na tabela vendas...")

# Inserir dados com diferentes anos
insert_vendas_sql = """
INSERT INTO iceberg.lab.db.vendas VALUES 
    (1, 1500.50, 2022),
    (2, 2300.75, 2022),
    (3, 1800.00, 2023),
    (4, 2100.25, 2023),
    (5, 2500.00, 2023),
    (6, 1900.00, 2024),
    (7, 2200.50, 2024)
"""

spark.sql(insert_vendas_sql)
print("‚úÖ Dados inseridos com sucesso!")

# Mostrar todos os dados
print("\nüìä Dados na tabela vendas:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, id").show()

# Mostrar resumo por ano
print("\nüìà Resumo por ano:")
spark.sql("""
    SELECT ano, COUNT(*) as total_vendas, SUM(valor) as total_valor 
    FROM iceberg.lab.db.vendas 
    GROUP BY ano 
    ORDER BY ano
""").show()

## 14. Exerc√≠cio 13: Consultar apenas um particionamento

In [None]:
# Exerc√≠cio 13: Consultar apenas um particionamento
print("üîç Consultando apenas vendas de 2023...")

# Query para apenas 2023
vendas_2023 = spark.sql("""
    SELECT * FROM iceberg.lab.db.vendas 
    WHERE ano = 2023
    ORDER BY id
""")

vendas_2023.show()

print(f"üìä Total de vendas em 2023: {vendas_2023.count()}")

# Mostrar estat√≠sticas de 2023
print("\nüìà Estat√≠sticas de vendas 2023:")
spark.sql("""
    SELECT 
        COUNT(*) as total_vendas,
        SUM(valor) as valor_total,
        AVG(valor) as valor_medio,
        MIN(valor) as menor_venda,
        MAX(valor) as maior_venda
    FROM iceberg.lab.db.vendas 
    WHERE ano = 2023
""").show()

# Verificar se o Spark est√° fazendo partition pruning
print("\nüöÄ Plano de execu√ß√£o (deve mostrar partition pruning):")
vendas_2023.explain()

## 15. Exerc√≠cio 14: Ver metadados da tabela

In [None]:
# Exerc√≠cio 14: Ver metadados da tabela
print("üìã Verificando metadados da tabela vendas...")

# DESCRIBE HISTORY - mostra hist√≥rico de commits/snapshots
print("\nüïí Hist√≥rico da tabela (DESCRIBE HISTORY):")
try:
    spark.sql("DESCRIBE HISTORY iceberg.lab.db.vendas").show(truncate=False)
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao obter hist√≥rico: {e}")

# DESCRIBE DETAIL - mostra detalhes da tabela
print("\nüîç Detalhes da tabela (DESCRIBE DETAIL):")
try:
    spark.sql("DESCRIBE DETAIL iceberg.lab.db.vendas").show(truncate=False)
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao obter detalhes: {e}")

# Informa√ß√µes adicionais do Iceberg
print("\nüìä Estrutura da tabela:")
spark.sql("DESCRIBE iceberg.lab.db.vendas").show()

# Mostrar arquivos da tabela (se dispon√≠vel)
print("\nüìÅ Files da tabela:")
try:
    spark.sql("SELECT * FROM iceberg.lab.db.vendas.files").show()
except Exception as e:
    print(f"‚ö†Ô∏è N√£o foi poss√≠vel mostrar files: {e}")

## 16. Exerc√≠cio 15: Criar tabela Iceberg a partir de DataFrame

In [None]:
# Exerc√≠cio 15: Criar tabela Iceberg a partir de DataFrame
print("üìä Criando DataFrame artificial e salvando como tabela Iceberg...")

# Criar um DataFrame com dados de produtos
from datetime import datetime
import random

produtos_data = [
    (1, "Notebook", 2500.00, "Eletr√¥nicos", datetime(2024, 1, 15)),
    (2, "Mouse", 45.50, "Eletr√¥nicos", datetime(2024, 1, 16)),
    (3, "Teclado", 120.00, "Eletr√¥nicos", datetime(2024, 1, 17)),
    (4, "Monitor", 800.00, "Eletr√¥nicos", datetime(2024, 1, 18)),
    (5, "Cadeira", 350.00, "M√≥veis", datetime(2024, 1, 19)),
    (6, "Mesa", 450.00, "M√≥veis", datetime(2024, 1, 20))
]

produtos_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("preco", DoubleType(), True),
    StructField("categoria", StringType(), True),
    StructField("data_cadastro", DateType(), True)
])

df_produtos = spark.createDataFrame(produtos_data, produtos_schema)

print("üìã DataFrame criado:")
df_produtos.show()

# Salvar como tabela Iceberg usando writeTo
print("\nüíæ Salvando como tabela Iceberg...")
df_produtos.writeTo("iceberg.lab.db.produtos").createOrReplace()

print("‚úÖ Tabela 'produtos' criada com sucesso!")

# Verificar a tabela criada
print("\nüìä Dados da nova tabela:")
spark.sql("SELECT * FROM iceberg.lab.db.produtos").show()

print("\nüìã Schema da tabela:")
spark.sql("DESCRIBE iceberg.lab.db.produtos").show()

## 17. Exerc√≠cio 16: Converter tabela para Iceberg

In [None]:
# Exerc√≠cio 16: Converter tabela para Iceberg
print("üîÑ Criando tabela Parquet e convertendo para Iceberg...")

# Primeiro, criar uma tabela Parquet simples
clientes_data = [
    (1, "Jo√£o Silva", "joao@email.com", "SP"),
    (2, "Maria Santos", "maria@email.com", "RJ"),
    (3, "Pedro Costa", "pedro@email.com", "MG"),
    (4, "Ana Oliveira", "ana@email.com", "SP")
]

clientes_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("nome", StringType(), True),
    StructField("email", StringType(), True),
    StructField("estado", StringType(), True)
])

df_clientes = spark.createDataFrame(clientes_data, clientes_schema)

# Salvar como tabela Parquet no cat√°logo Hive
print("üìÅ Criando tabela Parquet...")
df_clientes.write \
    .mode("overwrite") \
    .option("path", "hdfs://namenode:9000/warehouse/clientes_parquet") \
    .saveAsTable("default.clientes_parquet")

print("‚úÖ Tabela Parquet criada!")

# Verificar a tabela Parquet
print("\nüìä Dados da tabela Parquet:")
spark.sql("SELECT * FROM default.clientes_parquet").show()

# Agora converter para Iceberg
print("\nüîÑ Convertendo para Iceberg...")
try:
    # Criar tabela Iceberg baseada na Parquet
    spark.sql("""
        CREATE TABLE iceberg.lab.db.clientes
        USING ICEBERG
        AS SELECT * FROM default.clientes_parquet
    """)
    print("‚úÖ Tabela convertida para Iceberg com sucesso!")
    
    # Verificar a tabela Iceberg
    print("\nüìä Dados da tabela Iceberg:")
    spark.sql("SELECT * FROM iceberg.lab.db.clientes").show()
    
except Exception as e:
    print(f"‚ö†Ô∏è Erro na convers√£o: {e}")
    print("üí° Tentando m√©todo alternativo...")
    
    # M√©todo alternativo: ler Parquet e escrever como Iceberg
    df_from_parquet = spark.sql("SELECT * FROM default.clientes_parquet")
    df_from_parquet.writeTo("iceberg.lab.db.clientes").createOrReplace()
    print("‚úÖ Convers√£o alternativa realizada!")

## 18. Exerc√≠cio 17: Leitura incremental (Time Travel)

In [None]:
# Exerc√≠cio 17: Leitura incremental (Time Travel)
print("‚è∞ Demonstrando Time Travel com Iceberg...")

# Primeiro, vamos ver o hist√≥rico da tabela vendas
print("üìã Hist√≥rico atual da tabela vendas:")
try:
    history_df = spark.sql("DESCRIBE HISTORY iceberg.lab.db.vendas")
    history_df.show()
    
    # Pegar o n√∫mero de snapshots dispon√≠veis
    snapshots = history_df.collect()
    print(f"üìä Total de snapshots dispon√≠veis: {len(snapshots)}")
    
    if len(snapshots) >= 2:
        # Mostrar dados atuais
        print("\nüìä Dados atuais (vers√£o mais recente):")
        spark.sql("SELECT * FROM iceberg.lab.db.vendas ORDER BY id").show()
        
        # Time travel para vers√£o anterior
        print(f"\n‚è™ Dados da vers√£o anterior (snapshot {snapshots[1]['snapshot_id']}):")
        spark.sql(f"""
            SELECT * FROM iceberg.lab.db.vendas 
            VERSION AS OF {snapshots[1]['snapshot_id']}
            ORDER BY id
        """).show()
        
        # Time travel por timestamp
        if len(snapshots) >= 1:
            older_timestamp = snapshots[-1]['made_current_at']
            print(f"\nüïí Dados em timestamp espec√≠fico ({older_timestamp}):")
            spark.sql(f"""
                SELECT * FROM iceberg.lab.db.vendas 
                TIMESTAMP AS OF '{older_timestamp}'
                ORDER BY id
            """).show()
    else:
        print("‚ö†Ô∏è Poucos snapshots dispon√≠veis para demonstrar time travel")
        
        # Vamos criar mais mudan√ßas para ter mais snapshots
        print("üìù Criando mais altera√ß√µes para demonstrar time travel...")
        
        # Adicionar mais dados
        spark.sql("""
            INSERT INTO iceberg.lab.db.vendas VALUES 
                (8, 3000.00, 2024),
                (9, 1750.00, 2024)
        """)
        
        # Fazer uma atualiza√ß√£o
        spark.sql("""
            UPDATE iceberg.lab.db.vendas 
            SET valor = valor * 1.1 
            WHERE ano = 2024
        """)
        
        print("‚úÖ Novas altera√ß√µes feitas!")
        
        # Mostrar hist√≥rico atualizado
        print("\nüìã Hist√≥rico atualizado:")
        spark.sql("DESCRIBE HISTORY iceberg.lab.db.vendas").show()
        
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao demonstrar time travel: {e}")
    print("üí° Time travel requer snapshots m√∫ltiplos da tabela")

## 19. Exerc√≠cio 18: Exportar tabela Iceberg para CSV

In [None]:
# Exerc√≠cio 18: Exportar tabela Iceberg para CSV
print("üì§ Exportando tabela Iceberg para CSV no HDFS...")

# Ler dados da tabela Iceberg vendas
df_vendas_export = spark.sql("SELECT * FROM iceberg.lab.db.vendas")

print("üìä Dados a serem exportados:")
df_vendas_export.show()

# Caminho de destino no HDFS
export_path = "hdfs://namenode:9000/export/vendas.csv"

# Exportar para CSV
df_vendas_export.coalesce(1) \
    .write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv(export_path)

print(f"‚úÖ Dados exportados com sucesso para: {export_path}")

# Verificar se o arquivo foi criado
try:
    import subprocess
    result = subprocess.run(['hdfs', 'dfs', '-ls', '/export/'], 
                          capture_output=True, text=True)
    print("\nüìÅ Arquivos no diret√≥rio /export/:")
    print(result.stdout)
except Exception as e:
    print(f"‚ö†Ô∏è N√£o foi poss√≠vel listar arquivos: {e}")

# Verificar conte√∫do do arquivo exportado
print("\nüîç Verificando dados exportados:")
df_verificacao = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(export_path)

df_verificacao.show()
print(f"üìà Total de registros exportados: {df_verificacao.count()}")

# Estat√≠sticas da exporta√ß√£o
print("\nüìä Estat√≠sticas da exporta√ß√£o:")
spark.sql("""
    SELECT 
        'Original' as fonte,
        COUNT(*) as total_registros,
        SUM(valor) as soma_valores,
        AVG(valor) as valor_medio
    FROM iceberg.lab.db.vendas
""").show()

print("‚úÖ Exerc√≠cio 18 conclu√≠do: Tabela Iceberg exportada para CSV com sucesso!")

## 20. Exerc√≠cio 19: Configura√ß√£o para Dashboard no Superset

In [None]:
# Exerc√≠cio 19: Prepara√ß√£o para Dashboard no Superset
print("üìä Preparando dados para Dashboard no Superset...")

# Criar uma view agregada para facilitar visualiza√ß√µes
print("üìà Criando view agregada para dashboards...")

create_view_sql = """
CREATE OR REPLACE VIEW iceberg.lab.db.vendas_dashboard AS
SELECT 
    ano,
    COUNT(*) as total_vendas,
    SUM(valor) as receita_total,
    AVG(valor) as ticket_medio,
    MIN(valor) as menor_venda,
    MAX(valor) as maior_venda
FROM iceberg.lab.db.vendas
GROUP BY ano
"""

spark.sql(create_view_sql)
print("‚úÖ View 'vendas_dashboard' criada!")

# Verificar dados da view
print("\nüìä Dados da view para dashboard:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas_dashboard ORDER BY ano").show()

# Criar dados de exemplo adicionais para visualiza√ß√µes mais ricas
print("\nüìù Criando tabela de vendas detalhadas para dashboards...")

vendas_detalhadas_sql = """
CREATE OR REPLACE TABLE iceberg.lab.db.vendas_detalhadas
USING ICEBERG AS
SELECT 
    id,
    valor,
    ano,
    CASE 
        WHEN valor < 1000 THEN 'Baixo'
        WHEN valor < 2000 THEN 'M√©dio' 
        ELSE 'Alto'
    END as categoria_valor,
    CASE 
        WHEN id % 4 = 0 THEN 'Q1'
        WHEN id % 4 = 1 THEN 'Q2'
        WHEN id % 4 = 2 THEN 'Q3'
        ELSE 'Q4'
    END as trimestre,
    valor * 0.1 as comissao
FROM iceberg.lab.db.vendas
"""

spark.sql(vendas_detalhadas_sql)
print("‚úÖ Tabela 'vendas_detalhadas' criada!")

print("\nüìä Preview dos dados detalhados:")
spark.sql("SELECT * FROM iceberg.lab.db.vendas_detalhadas ORDER BY ano, id").show()

print("""
üìã INSTRU√á√ïES PARA SUPERSET:

1. Conectar ao Trino:
   - Host: trino (ou IP do container Trino)
   - Porta: 8080
   - Usu√°rio: admin
   - Cat√°logo: iceberg
   - Schema: lab.db

2. URL de conex√£o no Superset:
   trino://admin@trino:8080/iceberg/lab.db

3. Tabelas dispon√≠veis para visualiza√ß√£o:
   - iceberg.lab.db.vendas (dados brutos)
   - iceberg.lab.db.vendas_dashboard (dados agregados)
   - iceberg.lab.db.vendas_detalhadas (dados com categorias)
   - iceberg.lab.db.pessoas (dados de pessoas)

4. Exemplos de visualiza√ß√µes:
   - Gr√°fico de barras: receita por ano
   - Gr√°fico de pizza: vendas por categoria
   - Tabela: m√©tricas de vendas por trimestre
""")

## 21. Exerc√≠cio 20: Consultas via Trino

In [None]:
# Exerc√≠cio 20: Simula√ß√£o de consultas Trino
print("üîç Simulando consultas que seriam executadas via Trino...")

# Estas s√£o as consultas que voc√™ executaria no CLI do Trino ou Superset
trino_queries = [
    "SELECT * FROM iceberg.lab.db.pessoas;",
    "SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, valor DESC;",
    "SELECT ano, SUM(valor) as total FROM iceberg.lab.db.vendas GROUP BY ano;",
    "SELECT categoria, AVG(preco) as preco_medio FROM iceberg.lab.db.produtos GROUP BY categoria;"
]

print("üìã Consultas Trino equivalentes:")
for i, query in enumerate(trino_queries, 1):
    print(f"{i}. {query}")

print("\n" + "="*80)
print("üöÄ Executando consultas via Spark (equivalente ao Trino):")

# Query 1: Pessoas
print("\n1Ô∏è‚É£ Consulta: Tabela pessoas")
try:
    spark.sql("SELECT * FROM iceberg.lab.db.pessoas").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")

# Query 2: Vendas ordenadas
print("\n2Ô∏è‚É£ Consulta: Vendas ordenadas por ano e valor")
try:
    spark.sql("SELECT * FROM iceberg.lab.db.vendas ORDER BY ano, valor DESC").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")

# Query 3: Total por ano
print("\n3Ô∏è‚É£ Consulta: Total de vendas por ano")
try:
    spark.sql("SELECT ano, SUM(valor) as total FROM iceberg.lab.db.vendas GROUP BY ano ORDER BY ano").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")

# Query 4: Pre√ßo m√©dio por categoria
print("\n4Ô∏è‚É£ Consulta: Pre√ßo m√©dio por categoria de produtos")
try:
    spark.sql("SELECT categoria, AVG(preco) as preco_medio FROM iceberg.lab.db.produtos GROUP BY categoria").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro: {e}")

print("""
üí° COMANDOS TRINO CLI:

Para conectar ao Trino CLI (fora do notebook):
$ trino --server localhost:8080 --catalog iceberg --schema lab.db

Comandos √∫teis no Trino:
- SHOW CATALOGS;
- SHOW SCHEMAS FROM iceberg;
- SHOW TABLES FROM iceberg.lab.db;
- DESCRIBE iceberg.lab.db.vendas;

‚úÖ Exerc√≠cio 20 conclu√≠do: Consultas Trino simuladas com sucesso!
""")

## üéØ Resumo Final Completo

In [None]:
# üéØ RESUMO FINAL - Verifica√ß√£o de todos os 20 exerc√≠cios
print("üéØ RESUMO FINAL DOS EXERC√çCIOS DE BIG DATA")
print("="*70)

# Verificar todas as tabelas criadas
print("üìã Verificando tabelas Iceberg criadas:")
try:
    spark.sql("SHOW TABLES IN iceberg.lab.db").show()
except Exception as e:
    print(f"‚ö†Ô∏è Erro ao listar tabelas: {e}")

print("\nüìä Resumo dos dados:")

# Verifica√ß√£o das tabelas principais
tabelas_para_verificar = [
    ("pessoas", "iceberg.lab.db.pessoas"),
    ("vendas", "iceberg.lab.db.vendas"),
    ("produtos", "iceberg.lab.db.produtos"),
    ("vendas_dashboard", "iceberg.lab.db.vendas_dashboard"),
    ("vendas_detalhadas", "iceberg.lab.db.vendas_detalhadas")
]

for nome, tabela in tabelas_para_verificar:
    try:
        count = spark.sql(f"SELECT COUNT(*) FROM {tabela}").collect()[0][0]
        print(f"‚úÖ {nome}: {count} registros")
    except Exception as e:
        print(f"‚ö†Ô∏è {nome}: Erro ao verificar - {e}")

print("\n‚úÖ LISTA COMPLETA DOS 20 EXERC√çCIOS:")
exercicios = [
    "‚úì  1. DataFrame simples criado",
    "‚úì  2. DataFrame salvo no HDFS como CSV", 
    "‚úì  3. CSV lido do HDFS",
    "‚úì  4. Namespace Iceberg criado",
    "‚úì  5. Tabela Iceberg criada",
    "‚úì  6. Dados inseridos na tabela Iceberg",
    "‚úì  7. Tabela Iceberg consultada",
    "‚úì  8. Registros contados",
    "‚úì  9. Registro atualizado",
    "‚úì 10. Registro deletado",
    "‚úì 11. Tabela particionada criada",
    "‚úì 12. Dados particionados inseridos",
    "‚úì 13. Parti√ß√£o espec√≠fica consultada",
    "‚úì 14. Metadados da tabela visualizados",
    "‚úì 15. Tabela criada a partir de DataFrame",
    "‚úì 16. Convers√£o para Iceberg demonstrada",
    "‚úì 17. Time Travel implementado",
    "‚úì 18. Dados exportados para CSV",
    "‚úì 19. Prepara√ß√£o para Superset realizada",
    "‚úì 20. Consultas Trino simuladas"
]

for exercicio in exercicios:
    print(exercicio)

print(f"\nüéâ PARAB√âNS! Todos os {len(exercicios)} exerc√≠cios foram completados!")

print("\nüí° TECNOLOGIAS UTILIZADAS:")
tecnologias = [
    "üêç Python + PySpark",
    "üßä Apache Iceberg (formato de tabela)",
    "üóÑÔ∏è Hive Metastore (metadados)",
    "üêò HDFS (armazenamento distribu√≠do)",
    "üîç Trino (engine de consulta)",
    "üìä Apache Superset (dashboards)",
    "‚ö° Spark SQL (processamento)"
]

for tech in tecnologias:
    print(f"   {tech}")

print("\nüöÄ PR√ìXIMOS PASSOS SUGERIDOS:")
next_steps = [
    "1. Execute este notebook no ambiente lab",
    "2. Conecte o Superset ao Trino para criar dashboards",
    "3. Explore funcionalidades avan√ßadas do Iceberg",
    "4. Pratique com datasets maiores e mais complexos",
    "5. Implemente pipelines de dados end-to-end",
    "6. Estude otimiza√ß√µes de performance com particionamento",
    "7. Experimente com schema evolution no Iceberg"
]

for step in next_steps:
    print(f"   {step}")

print("\n" + "="*70)
print("üéì NOTEBOOK DE EXERC√çCIOS COMPLETO!")
print("üìö Bom estudo com Big Data, Spark e Iceberg!")
print("="*70)