# Teste de Conexão DuckDB + MinIO + dbt

Este notebook testa a integração do ambiente de notebooks com:
- DuckDB (banco de dados analítico)
- MinIO (armazenamento S3-compatible)
- dbt (transformações)

As queries são baseadas em `scripts/example_queries.py`


## 1. Importar Bibliotecas


In [2]:
import duckdb
import os
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime

# Configurar visualização
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
%matplotlib inline


## 2. Configurar Conexão DuckDB


In [3]:
# Caminho do banco de dados (compartilhado com outros serviços)
db_path = "/app/lakehouse/lakehouse.duckdb"
os.makedirs(os.path.dirname(db_path), exist_ok=True)

# Conectar ao DuckDB
con = duckdb.connect(db_path)
print(f"✓ Conectado ao DuckDB: {db_path}")


✓ Conectado ao DuckDB: /app/lakehouse/lakehouse.duckdb


## 3. Configurar Conexão S3 (MinIO)


In [4]:
# Obter variáveis de ambiente
minio_endpoint = os.getenv('MINIO_ENDPOINT', 'minio:9000')
minio_access_key = os.getenv('MINIO_ACCESS_KEY', 'admin')
minio_secret_key = os.getenv('MINIO_SECRET_KEY', 'minioadmin123')
minio_bucket = os.getenv('MINIO_BUCKET', 'lakehouse')

print(f"Configurando conexão S3 (MinIO)...")
print(f"  Endpoint: {minio_endpoint}")
print(f"  Bucket: {minio_bucket}")

# Configurar S3 no DuckDB
con.execute(f"""
    INSTALL httpfs;
    LOAD httpfs;
    INSTALL iceberg;
    LOAD iceberg;
    SET s3_endpoint='{minio_endpoint}';
    SET s3_access_key_id='{minio_access_key}';
    SET s3_secret_access_key='{minio_secret_key}';
    SET s3_use_ssl=false;
    SET s3_url_style='path';
""")

print(f"✓ Conexão S3 configurada: s3://{minio_bucket}/")


Configurando conexão S3 (MinIO)...
  Endpoint: minio:9000
  Bucket: lakehouse
✓ Conexão S3 configurada: s3://lakehouse/


## 4. Verificar Tabelas Disponíveis


In [5]:
# Listar tabelas
tables = con.execute("SHOW TABLES").fetchdf()
print("Tabelas disponíveis:")
display(tables)

# Verificar se a tabela principal existe
try:
    count = con.execute("SELECT COUNT(*) FROM vendas_iceberg").fetchone()[0]
    print(f"\n✓ Tabela 'vendas_iceberg' encontrada com {count:,} registros")
except Exception as e:
    print(f"\n⚠ Tabela 'vendas_iceberg' não encontrada: {e}")
    print("Execute primeiro o script init_lakehouse.py")


Tabelas disponíveis:


Unnamed: 0,name
0,dim_clientes
1,dim_produtos
2,fct_vendas
3,mart_vendas_mensal
4,stg_vendas
5,vendas_enriquecidas
6,vendas_iceberg



✓ Tabela 'vendas_iceberg' encontrada com 55,000 registros


## 5. Query 1: Receita por Categoria

Baseada em `query_1_revenue_by_category` do `example_queries.py`


In [None]:
sql = """
SELECT 
    categoria,
    COUNT(*) as total_vendas,
    SUM(valor_final) as receita_total,
    AVG(valor_final) as ticket_medio,
    SUM(quantidade) as unidades_vendidas
FROM vendas_iceberg
WHERE status = 'Concluída'
GROUP BY categoria
ORDER BY receita_total DESC;
"""

df_revenue = con.execute(sql).fetchdf()
print("Receita por Categoria:")
display(df_revenue)

# Visualização
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Gráfico de barras - Receita Total
df_revenue.plot(x='categoria', y='receita_total', kind='bar', ax=ax1, color='steelblue')
ax1.set_title('Receita Total por Categoria', fontsize=14, fontweight='bold')
ax1.set_xlabel('Categoria', fontsize=12)
ax1.set_ylabel('Receita Total (R$)', fontsize=12)
ax1.tick_params(axis='x', rotation=45)
ax1.grid(axis='y', alpha=0.3)

# Gráfico de barras - Ticket Médio
df_revenue.plot(x='categoria', y='ticket_medio', kind='bar', ax=ax2, color='coral')
ax2.set_title('Ticket Médio por Categoria', fontsize=14, fontweight='bold')
ax2.set_xlabel('Categoria', fontsize=12)
ax2.set_ylabel('Ticket Médio (R$)', fontsize=12)
ax2.tick_params(axis='x', rotation=45)
ax2.grid(axis='y', alpha=0.3)

plt.tight_layout()
plt.show()


In [None]:
sql = """
SELECT 
    DATE_TRUNC('month', data_venda) as mes,
    COUNT(*) as total_vendas,
    SUM(valor_final) as receita_mensal,
    AVG(valor_final) as ticket_medio,
    COUNT(DISTINCT cliente_id) as clientes_unicos
FROM vendas_iceberg
WHERE status = 'Concluída'
GROUP BY DATE_TRUNC('month', data_venda)
ORDER BY mes;
"""

df_trend = con.execute(sql).fetchdf()
df_trend['mes'] = pd.to_datetime(df_trend['mes'])

print("Tendência de Vendas Mensal:")
display(df_trend)

# Visualização
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 10))

# Gráfico de linha - Receita Mensal
ax1.plot(df_trend['mes'], df_trend['receita_mensal'], marker='o', linewidth=2, markersize=8, color='steelblue')
ax1.set_title('Receita Mensal ao Longo do Tempo', fontsize=14, fontweight='bold')
ax1.set_xlabel('Mês', fontsize=12)
ax1.set_ylabel('Receita Mensal (R$)', fontsize=12)
ax1.grid(True, alpha=0.3)
ax1.tick_params(axis='x', rotation=45)

# Gráfico de linha - Total de Vendas
ax2.plot(df_trend['mes'], df_trend['total_vendas'], marker='s', linewidth=2, markersize=8, color='coral')
ax2.set_title('Total de Vendas Mensais', fontsize=14, fontweight='bold')
ax2.set_xlabel('Mês', fontsize=12)
ax2.set_ylabel('Total de Vendas', fontsize=12)
ax2.grid(True, alpha=0.3)
ax2.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()


## 7. Query 3: Top 10 Clientes

Baseada em `query_3_top_customers` do `example_queries.py`


In [None]:
sql = """
SELECT 
    cliente_id,
    cliente_nome,
    cliente_cidade,
    cliente_estado,
    COUNT(*) as total_compras,
    SUM(valor_final) as valor_total_gasto,
    AVG(valor_final) as ticket_medio
FROM vendas_iceberg
WHERE status = 'Concluída'
GROUP BY cliente_id, cliente_nome, cliente_cidade, cliente_estado
ORDER BY valor_total_gasto DESC
LIMIT 10;
"""

df_top_customers = con.execute(sql).fetchdf()
print("Top 10 Clientes por Valor Total Gasto:")
display(df_top_customers)

# Visualização
fig, ax = plt.subplots(figsize=(12, 6))
df_top_customers.plot(x='cliente_nome', y='valor_total_gasto', kind='barh', ax=ax, color='steelblue')
ax.set_title('Top 10 Clientes por Valor Total Gasto', fontsize=14, fontweight='bold')
ax.set_xlabel('Valor Total Gasto (R$)', fontsize=12)
ax.set_ylabel('Cliente', fontsize=12)
ax.grid(axis='x', alpha=0.3)
plt.tight_layout()
plt.show()


## 8. Verificar Tabelas do dbt

Se o dbt foi executado, podemos consultar as tabelas transformadas.


In [None]:
# Verificar se existem tabelas do dbt
try:
    # Tabelas do dbt geralmente seguem o padrão: dim_*, fct_*, mart_*
    dbt_tables = con.execute("""
        SELECT table_name 
        FROM information_schema.tables 
        WHERE table_schema = 'main'
        AND (table_name LIKE 'dim_%' OR table_name LIKE 'fct_%' OR table_name LIKE 'mart_%')
        ORDER BY table_name
    """).fetchdf()
    
    if len(dbt_tables) > 0:
        print("✓ Tabelas do dbt encontradas:")
        display(dbt_tables)
        
        # Exemplo: consultar uma tabela do dbt
        if 'mart_vendas_mensal' in dbt_tables['table_name'].values:
            print("\nExemplo: Consultando mart_vendas_mensal do dbt:")
            df_mart = con.execute("SELECT * FROM mart_vendas_mensal LIMIT 10").fetchdf()
            display(df_mart)
    else:
        print("⚠ Nenhuma tabela do dbt encontrada.")
        print("Execute o dbt run primeiro.")
except Exception as e:
    print(f"⚠ Erro ao verificar tabelas do dbt: {e}")


## 9. Estatísticas Gerais


In [None]:
sql = """
SELECT 
    COUNT(*) as total_vendas,
    SUM(valor_final) as receita_total,
    AVG(valor_final) as ticket_medio,
    MIN(data_venda) as primeira_venda,
    MAX(data_venda) as ultima_venda,
    COUNT(DISTINCT cliente_id) as clientes_unicos,
    COUNT(DISTINCT categoria) as categorias,
    COUNT(DISTINCT produto_id) as produtos_unicos
FROM vendas_iceberg
WHERE status = 'Concluída';
"""

df_stats = con.execute(sql).fetchdf()
print("Estatísticas Gerais:")
display(df_stats)

# Fechar conexão
con.close()
print("\n✓ Conexão DuckDB fechada.")


## Conclusão

✓ Notebook conectado com sucesso ao DuckDB
✓ Queries executadas com sucesso
✓ Visualizações geradas
✓ Integração com MinIO configurada
✓ Tabelas do dbt verificadas (se disponíveis)

O ambiente de notebooks está funcionando corretamente!
