# Processamento de Dados no Data Lake com Pandas e S3

Este notebook apresenta o processo de ingestão, limpeza, transformação e enriquecimento de dados em um ambiente de Data Lake utilizando Pandas, com leitura e escrita diretamente no Amazon S3.


In [1]:
import pandas as pd
import numpy as np
from datetime import datetime

## Definição dos caminhos S3
Definimos os prefixos de cada camada (raw, silver e gold) para acesso aos arquivos armazenados no bucket do S3.

In [2]:
RAW_PREFIX = 's3://aula-data-lake/raw/'
SILVER_PREFIX = 's3://aula-data-lake/silver/'
GOLD_PREFIX = 's3://aula-data-lake/gold/'

## Silver Layer: Leitura e Limpeza dos Dados
Nesta etapa, os dados são lidos da camada **raw**, as colunas são padronizadas e os dados são limpos para remoção de duplicidades e nulos críticos.

In [3]:
clientes = pd.read_csv(RAW_PREFIX + 'clientes.csv')
produtos = pd.read_csv(RAW_PREFIX + 'produtos.csv')
tipos_produto = pd.read_csv(RAW_PREFIX + 'tipos_produto.csv')
pedidos = pd.read_csv(RAW_PREFIX + 'pedidos.csv')
itens_pedido = pd.read_csv(RAW_PREFIX + 'itens_pedido.csv')

# Padronizar colunas para lower case
for df in [clientes, produtos, tipos_produto, pedidos, itens_pedido]:
    df.columns = [col.lower() for col in df.columns]

# Limpeza básica
clientes = clientes.drop_duplicates().dropna(subset=['id_cliente'])
produtos = produtos.drop_duplicates().dropna(subset=['id_produto'])
tipos_produto = tipos_produto.drop_duplicates().dropna(subset=['id_tipo'])
pedidos = pedidos.drop_duplicates().dropna(subset=['id_pedido', 'id_cliente'])
itens_pedido = itens_pedido.drop_duplicates().dropna(subset=['id_item', 'id_pedido', 'id_produto'])

# Padronização de datas
if 'data_pedido' in pedidos.columns:
    pedidos['data_pedido'] = pd.to_datetime(pedidos['data_pedido'], errors='coerce')

# Salvar camada Silver em Parquet no S3
clientes.to_parquet(SILVER_PREFIX + 'clientes/', index=False)
produtos.to_parquet(SILVER_PREFIX + 'produtos/', index=False)
tipos_produto.to_parquet(SILVER_PREFIX + 'tipos_produto/', index=False)
pedidos.to_parquet(SILVER_PREFIX + 'pedidos/', index=False)
itens_pedido.to_parquet(SILVER_PREFIX + 'itens_pedido/', index=False)

## Gold Layer: Enriquecimento e Agregações
Criação de fato de vendas e agregações por cliente e tipo de produto, com persistência dos dados em formato particionado por `anomesdia`.

In [4]:
gold_vendas = itens_pedido.merge(pedidos, on='id_pedido') \
    .merge(produtos, on='id_produto') \
    .merge(tipos_produto, on='id_tipo') \
    .merge(clientes, on='id_cliente')

# Valor total do item
if 'quantidade' in gold_vendas.columns and 'preco_unitario' in gold_vendas.columns:
    gold_vendas['valor_total_item'] = gold_vendas['quantidade'] * gold_vendas['preco_unitario']

# Criação do campo anomesdia
if 'data_pedido' in gold_vendas.columns:
    gold_vendas['anomesdia'] = gold_vendas['data_pedido'].dt.strftime('%Y%m%d')

# Resumo por cliente
gold_vendas_por_cliente = gold_vendas.groupby(['id_cliente', 'nome']) \
    .agg({'valor_total_item': 'sum', 'id_pedido': 'nunique'}) \
    .rename(columns={'valor_total_item': 'valor_total_comprado', 'id_pedido': 'num_pedidos'}) \
    .reset_index()

# Resumo por tipo de produto
gold_vendas_por_tipo = gold_vendas.groupby(['id_tipo', 'nome_tipo']) \
    .agg({'valor_total_item': 'sum', 'quantidade': 'sum'}) \
    .rename(columns={'valor_total_item': 'total_vendido', 'quantidade': 'quantidade_total'}) \
    .reset_index()

# Salvar camada Gold no S3, particionando por anomesdia
gold_vendas.to_parquet(GOLD_PREFIX + 'fato_vendas/', partition_cols=['anomesdia'], index=False)
gold_vendas_por_cliente.to_parquet(GOLD_PREFIX + 'vendas_por_cliente/', index=False)
gold_vendas_por_tipo.to_parquet(GOLD_PREFIX + 'vendas_por_tipo/', index=False)

print("Processamento das camadas Silver e Gold no S3 particionado por anomesdia finalizado com sucesso!")

Processamento das camadas Silver e Gold no S3 particionado por anomesdia finalizado com sucesso!
