# 02 - Feature Engineering com Dask (Otimizado para Big Data)

Este notebook implementa feature engineering usando **Dask**, uma biblioteca projetada para processar datasets maiores que a RAM disponível.

## Vantagens do Dask:
- ✅ **Processamento Out-of-Core**: Datasets maiores que a RAM
- ✅ **Paralelização Automática**: Usa todos os cores da CPU
- ✅ **API Similar ao Pandas**: Fácil migração
- ✅ **Computação Lazy**: Executa apenas quando necessário
- ✅ **Escalabilidade**: Funciona em clusters distribuídos

In [None]:
import dask.dataframe as dd
import dask.array as da
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Configurar Dask
from dask.distributed import Client, progress
import dask

# Configurar para usar threads (melhor para I/O)
dask.config.set(scheduler='threads')

print('🚀 Dask configurado para processamento de Big Data')
print('💡 Datasets maiores que RAM serão processados automaticamente')

## 1. Carregamento com Dask

In [None]:
# Carregar dados com Dask (lazy loading)
print('📂 Carregando dados com Dask...')

# Carregar apenas colunas essenciais para economizar memória
colunas_essenciais = [
    'internal_store_id', 
    'internal_product_id', 
    'transaction_date', 
    'quantity',
    'gross_value',
    'distributor_id'
]

# Dask lê o arquivo sem carregar na memória
transacoes_dask = dd.read_parquet(
    '../data/part-00000-tid-5196563791502273604-c90d3a24-52f2-4955-b4ec-fb143aae74d8-4-1-c000.snappy.parquet',
    columns=colunas_essenciais
)

print(f'📊 Dados carregados (lazy): {transacoes_dask.shape[0].compute():,} registros')
print(f'📊 Colunas: {list(transacoes_dask.columns)}')
print(f'🧠 Memória: Não carregado na RAM ainda (lazy evaluation)')

# Renomear colunas
transacoes_dask = transacoes_dask.rename(columns={
    'internal_store_id': 'pdv_id',
    'internal_product_id': 'produto_id',
    'transaction_date': 'data',
    'quantity': 'quantidade',
    'gross_value': 'valor'
})

# Converter data para datetime
transacoes_dask['data'] = dd.to_datetime(transacoes_dask['data'])

print('✅ Estrutura de dados preparada (lazy)')

## 2. Criação de Features Temporais com Dask

In [None]:
# Adicionar features temporais (ainda lazy)
print('📅 Criando features temporais...')

# Semana (usando map_partitions para operações customizadas)
def add_week_features(df):
    df = df.copy()
    df['semana'] = df['data'].dt.to_period('W-MON').dt.start_time
    df['mes'] = df['data'].dt.month
    df['semana_ano'] = df['data'].dt.isocalendar().week
    df['ano'] = df['data'].dt.year
    return df

transacoes_dask = transacoes_dask.map_partitions(
    add_week_features, 
    meta=transacoes_dask._meta.assign(
        semana=pd.Timestamp('2022-01-01'),
        mes=1,
        semana_ano=1,
        ano=2022
    )
)

print('✅ Features temporais adicionadas (lazy)')

## 3. Agregação Semanal com Dask

In [None]:
# Agregação semanal usando Dask (processamento paralelo)
print('🔄 Iniciando agregação semanal com Dask...')

# Dask agrupa e processa em paralelo
agregacao_semanal_dask = transacoes_dask.groupby(['semana', 'pdv_id', 'produto_id']).agg({
    'quantidade': ['sum', 'count'],
    'valor': 'sum',
    'distributor_id': 'first'
})

# Flatten columns
agregacao_semanal_dask.columns = [
    'quantidade', 'num_transacoes', 'valor', 'distributor_id'
]

# Reset index
agregacao_semanal_dask = agregacao_semanal_dask.reset_index()

print('🔄 Agregação configurada (lazy). Executando...')

# EXECUTAR a agregação (aqui que realmente processa)
agregacao_semanal = agregacao_semanal_dask.compute()

print(f'📊 Agregação semanal concluída: {agregacao_semanal.shape}')
print(f'   • Combinações semana/PDV/produto: {len(agregacao_semanal):,}')
print(f'   • PDVs únicos: {agregacao_semanal["pdv_id"].nunique():,}')
print(f'   • Produtos únicos: {agregacao_semanal["produto_id"].nunique():,}')

# Converter de volta para Dask para próximas operações
agregacao_semanal_dask = dd.from_pandas(agregacao_semanal, npartitions=4)

del transacoes_dask  # Liberar memória
print('✅ Dados de transação liberados da memória')

## 4. Grid Inteligente com Dask

In [None]:
# Estratégia Grid Inteligente usando Dask
print('🎯 Criando Grid Inteligente com Dask...')

# Obter combinações ativas e semanas únicas
combinacoes_ativas = agregacao_semanal[['pdv_id', 'produto_id']].drop_duplicates()
semanas_unicas = sorted(agregacao_semanal['semana'].unique())

print(f'   • Combinações ativas: {len(combinacoes_ativas):,}')
print(f'   • Semanas: {len(semanas_unicas)}')
print(f'   • Total registros no grid: {len(combinacoes_ativas) * len(semanas_unicas):,}')

# Criar grid usando processamento em lotes otimizado
def create_grid_batch(combo_batch, semanas):
    """Criar grid para um lote de combinações"""
    import pandas as pd
    from itertools import product
    
    # Criar produto cartesiano
    grid_data = []
    for _, row in combo_batch.iterrows():
        for semana in semanas:
            grid_data.append({
                'semana': semana,
                'pdv_id': row['pdv_id'],
                'produto_id': row['produto_id']
            })
    
    return pd.DataFrame(grid_data)

# Processar em lotes usando Dask
batch_size = 5000
grid_parts = []

for i in range(0, len(combinacoes_ativas), batch_size):
    batch = combinacoes_ativas.iloc[i:i+batch_size]
    print(f'   📦 Lote {i//batch_size + 1}: {len(batch)} combinações')
    
    # Criar grid para este lote
    batch_grid = create_grid_batch(batch, semanas_unicas)
    
    # Converter para Dask DataFrame
    batch_grid_dask = dd.from_pandas(batch_grid, npartitions=2)
    
    # Merge com vendas reais
    batch_merged = batch_grid_dask.merge(
        agregacao_semanal_dask,
        on=['semana', 'pdv_id', 'produto_id'],
        how='left'
    )
    
    # Preencher zeros
    batch_merged['quantidade'] = batch_merged['quantidade'].fillna(0)
    batch_merged['valor'] = batch_merged['valor'].fillna(0)
    batch_merged['num_transacoes'] = batch_merged['num_transacoes'].fillna(0)
    
    # Computar e armazenar
    grid_parts.append(batch_merged.compute())

# Concatenar todas as partes
print('🔗 Concatenando grid completo...')
dados_completos = pd.concat(grid_parts, ignore_index=True)

print(f'✅ Grid Inteligente criado: {dados_completos.shape}')
print(f'   • Zeros: {(dados_completos["quantidade"] == 0).sum():,} ({(dados_completos["quantidade"] == 0).mean()*100:.1f}%)')
print(f'   • Não-zeros: {(dados_completos["quantidade"] > 0).sum():,}')

## 5. Features Avançadas com Dask

In [None]:
# Converter para Dask para features avançadas
dados_dask = dd.from_pandas(dados_completos, npartitions=8)

print('🚀 Criando features avançadas com Dask...')

# Ordenar dados
dados_dask = dados_dask.set_index(['pdv_id', 'produto_id', 'semana']).sort_index()

# Features temporais
def add_temporal_features(df):
    df = df.copy()
    df['mes'] = df.index.get_level_values('semana').month
    df['semana_ano'] = df.index.get_level_values('semana').isocalendar().week
    df['mes_sin'] = np.sin(2 * np.pi * df['mes'] / 12)
    df['mes_cos'] = np.cos(2 * np.pi * df['mes'] / 12)
    return df

dados_dask = dados_dask.map_partitions(
    add_temporal_features,
    meta=dados_dask._meta.assign(
        mes=1, semana_ano=1, mes_sin=0.0, mes_cos=1.0
    )
)

# Features de lag usando Dask
print('⏰ Criando features de lag...')
for lag in [1, 2, 3, 4]:
    dados_dask[f'quantidade_lag_{lag}'] = (
        dados_dask.groupby(level=[0, 1])['quantidade']
        .shift(lag)
    )

# Rolling features
print('📊 Criando rolling features...')
dados_dask['quantidade_media_4w'] = (
    dados_dask.groupby(level=[0, 1])['quantidade']
    .rolling(window=4, min_periods=1)
    .mean()
)

dados_dask['quantidade_std_4w'] = (
    dados_dask.groupby(level=[0, 1])['quantidade']
    .rolling(window=4, min_periods=1)
    .std()
    .fillna(0)
)

# Features categóricas
def add_categorical_features(df):
    df = df.copy()
    df['pdv_hash'] = df.index.get_level_values('pdv_id').astype(str).map(hash) % 100
    df['produto_hash'] = df.index.get_level_values('produto_id').astype(str).map(hash) % 100
    return df

dados_dask = dados_dask.map_partitions(
    add_categorical_features,
    meta=dados_dask._meta.assign(pdv_hash=1, produto_hash=1)
)

print('✅ Features avançadas configuradas (lazy)')

## 6. Execução e Limpeza Final

In [None]:
# Executar todas as transformações
print('🔄 Executando todas as transformações...')
dados_final = dados_dask.compute()

# Reset index
dados_final = dados_final.reset_index()

print(f'📊 Dados processados: {dados_final.shape}')

# Limpeza: remover registros sem lag_4
print('🧹 Aplicando limpeza final...')
dados_limpos = dados_final[dados_final['quantidade_lag_4'].notna()].copy()

print(f'✅ Dados limpos: {dados_limpos.shape}')
print(f'   • Período: {dados_limpos["semana"].min()} até {dados_limpos["semana"].max()}')
print(f'   • Semanas: {dados_limpos["semana"].nunique()}')
print(f'   • Features: {len(dados_limpos.columns)}')

## 7. Salvamento e Metadados

In [None]:
# Salvar dataset final
print('💾 Salvando dataset final...')
dados_limpos.to_csv('../data/dados_features_completo.csv', index=False)

# Salvar também em parquet (mais eficiente)
dados_limpos.to_parquet('../data/dados_features_completo.parquet', index=False)

print('✅ Dataset salvo em CSV e Parquet')

# Metadados
import pickle

metadata = {
    'data_processamento': pd.Timestamp.now(),
    'total_registros': len(dados_limpos),
    'total_features': len(dados_limpos.columns),
    'combinacoes_pdv_produto': dados_limpos[['pdv_id', 'produto_id']].drop_duplicates().shape[0],
    'semanas_cobertas': dados_limpos['semana'].nunique(),
    'periodo_treino': f"{dados_limpos['semana'].min()} a {dados_limpos['semana'].max()}",
    'estrategia': 'Grid Inteligente com Dask - Big Data Optimized',
    'features_criadas': list(dados_limpos.columns),
    'tecnologia': 'Dask for Out-of-Core Processing',
    'memoria_maxima_usada': 'Limitada pelo número de partições'
}

with open('../data/feature_engineering_metadata.pkl', 'wb') as f:
    pickle.dump(metadata, f)

print('📋 Metadados salvos')

# Estatísticas finais
print('\n🎉 FEATURE ENGINEERING COM DASK CONCLUÍDO!')
print('=' * 60)
print(f'📊 Dataset final: {dados_limpos.shape}')
print(f'💾 Arquivos salvos:')
print('   • dados_features_completo.csv')
print('   • dados_features_completo.parquet')
print('   • feature_engineering_metadata.pkl')
print(f'\n🏷️ Features principais:')
features_importantes = ['quantidade', 'quantidade_lag_1', 'quantidade_lag_2', 
                       'quantidade_lag_4', 'quantidade_media_4w', 'mes_sin', 'mes_cos']
for feat in features_importantes:
    if feat in dados_limpos.columns:
        print(f'   ✅ {feat}')

print('\n🚀 Pronto para Modelagem com dados otimizados!')
print(f'📈 Distribuição target:')
print(f'   • Zeros: {(dados_limpos["quantidade"] == 0).sum():,} ({(dados_limpos["quantidade"] == 0).mean()*100:.1f}%)')
print(f'   • Não-zeros: {(dados_limpos["quantidade"] > 0).sum():,} ({(dados_limpos["quantidade"] > 0).mean()*100:.1f}%)')