# QuantNote - Tutorial Didático

## Sistema Quantitativo para Probabilidades de Retorno Condicionadas por Regime

Este notebook demonstra passo a passo o funcionamento do sistema QuantNote.

### Objetivo
Calcular a probabilidade de um ativo atingir um retorno alvo em H períodos, condicionada ao regime de mercado atual.

### Conceitos Principais
1. **Log-Retornos**: Usamos log(P_t/P_{t-1}) pois são aditivos e simétricos
2. **Slope (Inclinação)**: Tendência calculada via regressão linear do log-preço
3. **Volatilidade**: Desvio padrão dos retornos em janela móvel
4. **Regimes**: Estados do mercado (bull/bear/flat combinados com alta/baixa volatilidade)
5. **K-Means**: Clustering para detectar regimes automaticamente
6. **Walk-Forward**: Validação para evitar overfitting

## Etapa 1: Setup e Imports

Primeiro, configuramos o ambiente e importamos os módulos necessários.

In [None]:
# Setup do path para imports
import sys
import os
sys.path.insert(0, os.path.dirname(os.getcwd()))

# Imports padrão
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Verificar se os imports funcionam
print("Python path configurado!")
print(f"Diretório de trabalho: {os.getcwd()}")

# Configuração de visualização
%matplotlib inline
plt.style.use('seaborn-v0_8-whitegrid')
pd.set_option('display.max_columns', 20)
pd.set_option('display.width', 200)

## Etapa 2: Configuração do Sistema

O sistema usa Pydantic para validar configurações. Isso garante que parâmetros inválidos sejam rejeitados.

In [None]:
from config.settings import AnalysisConfig, InfrastructureConfig, Config
from config.search_space import GAConfig, GASearchSpace

# =============================================================================
# CONFIGURAÇÃO CENTRALIZADA
# =============================================================================
# Altere os parâmetros abaixo conforme necessário.
# Todos os notebooks usarão estas configurações.

# --- Ticker a analisar ---
TICKER = "BOVA11.SA"

# --- Parâmetros de Predição (usados pelo GA) ---
TARGET_RETURN = 0.03   # Target: 1% de variação
HORIZON = 7            # Horizonte: 2 dias

# --- Parâmetros do GA Principal ---
GA_POPULATION_SIZE = 100
GA_GENERATIONS = 1000
GA_N_FOLDS = 3
GA_EARLY_STOPPING = True

# --- Parâmetros do GA para Multi-Target/Strike Grid (reduzido para demo) ---
MULTI_GA_POPULATION_SIZE = 50
MULTI_GA_GENERATIONS = 100

# --- Parâmetros de Análise Exploratória (Etapas 1-11) ---
ANALYSIS_WINDOW_SLOPE = 20
ANALYSIS_WINDOW_VOLATILITY = 20
ANALYSIS_WINDOW_ROLLING_RETURN = 20
ANALYSIS_N_CLUSTERS = 6
ANALYSIS_TARGET_RETURN = 0.05  # 5% para exploração didática

# --- Parâmetros de Médias Móveis ---
MA_FAST_PERIOD = 9     # Média móvel rápida
MA_SLOW_PERIOD = 21    # Média móvel lenta

# =============================================================================
# Criar objetos de configuração
# =============================================================================

# Config para análise exploratória (etapas iniciais do notebook)
config = Config()
config.analysis.future_return_periods = HORIZON
config.analysis.window_slope = ANALYSIS_WINDOW_SLOPE
config.analysis.window_volatility = ANALYSIS_WINDOW_VOLATILITY
config.analysis.window_rolling_return = ANALYSIS_WINDOW_ROLLING_RETURN
config.analysis.n_clusters = ANALYSIS_N_CLUSTERS
config.analysis.target_return = ANALYSIS_TARGET_RETURN
config.analysis.ma_fast_period = MA_FAST_PERIOD
config.analysis.ma_slow_period = MA_SLOW_PERIOD

# Config do GA para otimização principal
ga_config = GAConfig(
    target_return=TARGET_RETURN,
    horizon=HORIZON,
    population_size=GA_POPULATION_SIZE,
    generations=GA_GENERATIONS,
    n_folds=GA_N_FOLDS,
    stability_penalty=0.1,
    elite_size=2,
    early_stopping=GA_EARLY_STOPPING,
)

# Config do GA para multi-target e strike grid (reduzido)
multi_ga_config = GAConfig(
    target_return=TARGET_RETURN,  # Será sobrescrito pelo optimizer
    horizon=HORIZON,
    population_size=MULTI_GA_POPULATION_SIZE,
    generations=MULTI_GA_GENERATIONS,
    n_folds=GA_N_FOLDS,
    early_stopping=GA_EARLY_STOPPING,
)

# =============================================================================
# Exibir configurações
# =============================================================================
print("=" * 60)
print("CONFIGURAÇÃO CENTRALIZADA")
print("=" * 60)

print(f"\n📊 TICKER: {TICKER}")

print(f"\n🎯 Parâmetros de Predição:")
print(f"   Target Return: {TARGET_RETURN:.1%}")
print(f"   Horizonte: {HORIZON} dias")

print(f"\n🧬 GA Principal:")
print(f"   População: {GA_POPULATION_SIZE}")
print(f"   Gerações: {GA_GENERATIONS}")
print(f"   Folds: {GA_N_FOLDS}")
print(f"   Early Stopping: {GA_EARLY_STOPPING}")

print(f"\n🎯 GA Multi-Target/Strike Grid:")
print(f"   População: {MULTI_GA_POPULATION_SIZE}")
print(f"   Gerações: {MULTI_GA_GENERATIONS}")

print(f"\n📈 Análise Exploratória:")
print(f"   Window Slope: {ANALYSIS_WINDOW_SLOPE}")
print(f"   Window Volatility: {ANALYSIS_WINDOW_VOLATILITY}")
print(f"   Clusters: {ANALYSIS_N_CLUSTERS}")
print(f"   Target (exploratório): {ANALYSIS_TARGET_RETURN:.1%}")

print(f"\n📉 Médias Móveis:")
print(f"   MA Rápida: {MA_FAST_PERIOD} períodos")
print(f"   MA Lenta: {MA_SLOW_PERIOD} períodos")

print("\n" + "=" * 60)

## Etapa 3: Obtenção de Dados

Usamos `YahooDataSource` para baixar dados OHLCV. O sistema tem rate limiting para evitar bloqueio.

In [None]:
from src.infrastructure.yahoo_data_source import YahooDataSource
from src.infrastructure.parquet_repository import ParquetRepository
from src.infrastructure.file_logger import FileLogger, NullLogger

# Criar logger (usar NullLogger para menos output)
logger = NullLogger()  # ou FileLogger("quantnote") para logs detalhados

# Data source e repository
data_source = YahooDataSource(calls_per_minute=5, logger=logger)
repository = ParquetRepository(data_dir="../data", logger=logger)

# Usar ticker da configuração centralizada
ticker = TICKER

print(f"Buscando dados para {ticker}...")

# Verificar se dados no cache estão atualizados
is_current, last_date = repository.is_data_current(ticker, max_age_days=1)

if is_current:
    print(f"✅ Cache atualizado (última data: {last_date.date()})")
    df = repository.load(ticker)
else:
    if last_date:
        print(f"⚠️  Cache desatualizado (última data: {last_date.date()})")
    else:
        print("📥 Dados não encontrados no cache.")
    
    print("Baixando dados atualizados do Yahoo Finance...")
    df = data_source.fetch_ohlcv(ticker)
    
    # Limpar arquivos antigos e salvar novos dados
    deleted = repository.delete_old_files(ticker, keep_latest=0)
    if deleted:
        print(f"🗑️  {deleted} arquivo(s) antigo(s) removido(s)")
    
    repository.save(df, ticker)
    print("✅ Dados atualizados salvos no cache.")

print(f"\n=== Dados Obtidos ===")
print(f"Shape: {df.shape}")
print(f"Período: {df['date'].min().date()} a {df['date'].max().date()}")
print(f"\nPrimeiras linhas:")
df.head()

## Etapa 4: Validação de Dados

Antes de processar, validamos os dados para garantir qualidade.

O sistema usa o padrão **Composite** para combinar múltiplos validadores.

In [None]:
from src.infrastructure.validators import create_default_validator

# Criar validador composto
validator = create_default_validator(
    min_length=config.analysis.min_data_points,
    max_window=max(config.analysis.window_slope, config.analysis.window_volatility)
)

# Executar validação
validation = validator.validate(df)

print("=== Resultado da Validação ===")
print(f"Válido: {'SIM' if validation.is_valid else 'NÃO'}")

if validation.errors:
    print(f"\nERROS:")
    for error in validation.errors:
        print(f"  - {error}")

if validation.warnings:
    print(f"\nAVISOS:")
    for warning in validation.warnings:
        print(f"  - {warning}")

if validation.is_valid and not validation.warnings:
    print("Todos os testes passaram sem avisos!")

## Etapa 5: Calculadores Individuais

Vamos explorar cada calculador individualmente para entender o que faz.

Cada calculador implementa `IColumnCalculator` e segue o princípio de **Single Responsibility**.

In [None]:
from src.calculators.log_price_calculator import LogPriceCalculator

# 5.1 - Log Price Calculator
log_price_calc = LogPriceCalculator()

print("=== LogPriceCalculator ===")
print(f"Nome: {log_price_calc.name}")
print(f"Colunas requeridas: {log_price_calc.required_columns}")
print(f"Colunas produzidas: {log_price_calc.output_columns}")

# Aplicar
df_step1 = log_price_calc.calculate(df)

# Visualizar resultado
print(f"\nFormula: log_close = ln(close)")
print(f"\nExemplo:")
print(df_step1[['date', 'close', 'log_close']].head())

In [None]:
from src.calculators.log_return_calculator import LogReturnCalculator

# 5.2 - Log Return Calculator
log_return_calc = LogReturnCalculator(window=config.analysis.window_rolling_return)

print("=== LogReturnCalculator ===")
print(f"Nome: {log_return_calc.name}")
print(f"Colunas requeridas: {log_return_calc.required_columns}")
print(f"Colunas produzidas: {log_return_calc.output_columns}")

# Aplicar
df_step2 = log_return_calc.calculate(df_step1)

print(f"\nFormulas:")
print(f"  log_return = ln(close_t / close_{{t-1}})")
print(f"  log_return_rolling_20 = sum(log_return over 20 days)")
print(f"\nExemplo:")
print(df_step2[['date', 'close', 'log_return', f'log_return_rolling_{config.analysis.window_rolling_return}']].head(25))

In [None]:
from src.calculators.volatility_calculator import VolatilityCalculator

# 5.3 - Volatility Calculator
vol_calc = VolatilityCalculator(window=config.analysis.window_volatility)

print("=== VolatilityCalculator ===")
print(f"Nome: {vol_calc.name}")
print(f"Colunas requeridas: {vol_calc.required_columns}")
print(f"Colunas produzidas: {vol_calc.output_columns}")

# Aplicar
df_step3 = vol_calc.calculate(df_step2)

print(f"\nFormula: volatility = std(log_return over {config.analysis.window_volatility} days)")
print(f"\nExemplo:")
print(df_step3[['date', 'log_return', f'volatility_{config.analysis.window_volatility}']].tail(10))

In [None]:
from src.calculators.slope_calculator import SlopeCalculator

# 5.4 - Slope Calculator
slope_calc = SlopeCalculator(window=config.analysis.window_slope)

print("=== SlopeCalculator ===")
print(f"Nome: {slope_calc.name}")
print(f"Colunas requeridas: {slope_calc.required_columns}")
print(f"Colunas produzidas: {slope_calc.output_columns}")

# Aplicar
df_step4 = slope_calc.calculate(df_step3)

print(f"\nFormula: slope = coef angular da regressão linear de log_close sobre {config.analysis.window_slope} dias")
print(f"\nInterpretação:")
print(f"  slope > 0: tendência de alta")
print(f"  slope < 0: tendência de baixa")
print(f"  slope ≈ 0: mercado lateral")
print(f"\nExemplo:")
print(df_step4[['date', 'log_close', f'slope_{config.analysis.window_slope}']].tail(10))

In [None]:
from src.calculators.ma_distance_calculator import MADistanceCalculator

# 5.5 - MA Distance Calculator
ma_dist_calc = MADistanceCalculator(
    fast_period=config.analysis.ma_fast_period,
    slow_period=config.analysis.ma_slow_period
)

print("=== MADistanceCalculator ===")
print(f"Nome: {ma_dist_calc.name}")
print(f"Colunas requeridas: {ma_dist_calc.required_columns}")
print(f"Colunas produzidas: {ma_dist_calc.output_columns}")

# Aplicar
df_step4b = ma_dist_calc.calculate(df_step4)

ma_col = f'ma_dist_{config.analysis.ma_fast_period}_{config.analysis.ma_slow_period}'

print(f"\nFormula:")
print(f"  MA_fast = SMA(close, {config.analysis.ma_fast_period})")
print(f"  MA_slow = SMA(close, {config.analysis.ma_slow_period})")
print(f"  distance = MA_fast - MA_slow")
print(f"  {ma_col} = normalize(distance, min=-1, max=1)")

print(f"\nInterpretação:")
print(f"  > 0: tendência de alta (MA rápida > MA lenta)")
print(f"  < 0: tendência de baixa (MA rápida < MA lenta)")
print(f"  ≈ 0: médias cruzando/convergindo")

print(f"\nExemplo:")
print(df_step4b[['date', 'close', ma_col]].tail(10))

In [None]:
from src.calculators.future_return_calculator import FutureReturnCalculator

# 5.6 - Future Return Calculator
future_calc = FutureReturnCalculator(horizon=config.analysis.future_return_periods)

print("=== FutureReturnCalculator ===")
print(f"Nome: {future_calc.name}")
print(f"Colunas requeridas: {future_calc.required_columns}")
print(f"Colunas produzidas: {future_calc.output_columns}")

# Aplicar
df_step5 = future_calc.calculate(df_step4)

print(f"\nFormula: log_return_future_{config.analysis.future_return_periods} = ln(close_{{t+{config.analysis.future_return_periods}}} / close_t)")
print(f"\nNOTA: Esta coluna é a variável TARGET que queremos prever!")
print(f"\nExemplo:")
print(df_step5[['date', 'close', f'log_return_future_{config.analysis.future_return_periods}']].head(10))

## Etapa 6: Pipeline com Resolução Automática de Dependências

O `CalculatorPipeline` usa **topological sort** para ordenar os calculadores automaticamente.

Isso implementa o princípio **Open/Closed** - podemos adicionar calculadores sem modificar o pipeline.

In [None]:
from src.calculators.pipeline import CalculatorPipeline
from src.calculators.log_price_calculator import LogPriceCalculator
from src.calculators.log_return_calculator import LogReturnCalculator
from src.calculators.future_return_calculator import FutureReturnCalculator
from src.calculators.volatility_calculator import VolatilityCalculator
from src.calculators.slope_calculator import SlopeCalculator
from src.calculators.ma_distance_calculator import MADistanceCalculator

# Criar pipeline (note que a ordem não importa - será resolvida automaticamente)
pipeline = CalculatorPipeline([
    SlopeCalculator(window=config.analysis.window_slope),      # Depende de log_close
    LogReturnCalculator(window=config.analysis.window_rolling_return),  # Depende de close
    VolatilityCalculator(window=config.analysis.window_volatility),     # Depende de log_return
    LogPriceCalculator(),                                       # Depende de close
    FutureReturnCalculator(horizon=config.analysis.future_return_periods),  # Depende de close
    MADistanceCalculator(                                       # Depende de close
        fast_period=config.analysis.ma_fast_period,
        slow_period=config.analysis.ma_slow_period
    )
], logger=logger)

# Executar pipeline
df_analysis = pipeline.run(df)

print("=== Pipeline Executado ===")
print(f"Ordem de execução: {pipeline.get_execution_order()}")
print(f"\nColunas originais: {len(df.columns)}")
print(f"Colunas após pipeline: {len(df_analysis.columns)}")
print(f"\nNovas colunas: {list(pipeline.get_all_output_columns())}")

In [None]:
# Visualizar resultado do pipeline
print("=== Dados Após Pipeline ===")
df_analysis.head()

## Etapa 7: Classificação Manual de Regimes

Uma abordagem é usar thresholds manuais para classificar regimes baseado em slope e volatilidade.

In [None]:
from src.analysis.regime_classifier import ManualRegimeClassifier

# Nomes das colunas
slope_col = f'slope_{config.analysis.window_slope}'
vol_col = f'volatility_{config.analysis.window_volatility}'

# Criar classificador manual (thresholds automáticos)
manual_classifier = ManualRegimeClassifier(
    slope_column=slope_col,
    volatility_column=vol_col
)

# Classificar
df_manual = manual_classifier.classify(df_analysis)

# Resultados
print("=== Classificação Manual de Regimes ===")
print(f"\nThresholds usados: {manual_classifier.get_thresholds()}")
print(f"\nDistribuição de regimes:")
print(df_manual['regime'].value_counts())

In [None]:
# Visualizar distribuição por regime
regime_counts = df_manual['regime'].value_counts()

fig, ax = plt.subplots(figsize=(10, 5))
colors = {
    'bull_high_vol': 'lightgreen',
    'bull_low_vol': 'darkgreen',
    'bear_high_vol': 'lightcoral',
    'bear_low_vol': 'darkred',
    'flat_high_vol': 'yellow',
    'flat_low_vol': 'gold'
}
bar_colors = [colors.get(r, 'gray') for r in regime_counts.index]
regime_counts.plot(kind='bar', ax=ax, color=bar_colors, edgecolor='black')
ax.set_title('Distribuição de Regimes (Classificação Manual)')
ax.set_xlabel('Regime')
ax.set_ylabel('Frequência')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

## Etapa 8: Classificação com K-Means

K-Means detecta regimes automaticamente baseado em múltiplas features.

Isso é mais robusto que thresholds manuais pois considera todas as features simultaneamente.

In [None]:
from src.analysis.kmeans_regimes import KMeansRegimeClassifier

# Criar classificador K-Means
kmeans = KMeansRegimeClassifier(
    n_clusters=config.analysis.n_clusters,
    logger=logger
)

# Fit e Predict
df_kmeans = kmeans.fit_predict(df_analysis)

print("=== Classificação K-Means ===")
print(f"\nFeatures usadas: {kmeans.feature_columns}")
print(f"Número de clusters: {config.analysis.n_clusters}")
print(f"\nDistribuição de clusters:")
print(df_kmeans['cluster'].value_counts().sort_index())

In [None]:
# Estatísticas por cluster
future_col = f'log_return_future_{config.analysis.future_return_periods}'
stats = kmeans.compute_statistics(df_kmeans, future_col)
interpretations = kmeans.interpret_clusters(df_kmeans, slope_col)

print("=== Estatísticas por Cluster ===")
for stat in stats:
    interp = interpretations.get(stat.cluster_id, 'unknown')
    print(f"\nCluster {stat.cluster_id} ({interp.upper()}):")
    print(f"  Observações: {stat.count} ({stat.percentage:.1f}%)")
    print(f"  Retorno futuro médio: {stat.future_return_mean:.4f}")
    print(f"  Desvio padrão: {stat.future_return_std:.4f}")
    print(f"  Features médias: {stat.feature_means}")

## Etapa 9: Cálculo de Probabilidades

Agora calculamos a probabilidade de atingir o retorno alvo, condicionada por regime.

In [None]:
from src.analysis.probability_calculator import ProbabilityCalculator

# Usando classificação manual
prob_calc_manual = ProbabilityCalculator(
    future_return_column=future_col,
    target_return=config.analysis.target_return,
    regime_column='regime'
)

# Gerar relatório completo
report = prob_calc_manual.generate_report(df_manual)

print("=== Relatório de Probabilidades (Regimes Manuais) ===")
print(f"\nRetorno alvo: {report['target_return']:.1%}")
print(f"Log-retorno alvo: {report['log_target']:.4f}")
print(f"\nProbabilidade incondicional: {report['raw_probability_pct']:.2f}%")

print(f"\nProbabilidades condicionais:")
for regime, data in report['conditional_probabilities'].items():
    print(f"  {regime}:")
    print(f"    P(hit) = {data['probability_pct']:.2f}%")
    print(f"    n = {data['count']}")
    print(f"    retorno médio = {data['mean_return']:.4f}")

print(f"\nMétricas de separação:")
print(f"  Delta P: {report['separation_metrics']['delta_p']:.4f}")
print(f"  (diferença entre maior e menor probabilidade)")
print(f"  Information Ratio: {report['separation_metrics']['information_ratio']:.4f}")

In [None]:
# Usando classificação K-Means
prob_calc_kmeans = ProbabilityCalculator(
    future_return_column=future_col,
    target_return=config.analysis.target_return,
    regime_column='cluster'
)

report_kmeans = prob_calc_kmeans.generate_report(df_kmeans)

print("=== Relatório de Probabilidades (K-Means) ===")
print(f"\nRetorno alvo: {report_kmeans['target_return']:.1%}")
print(f"\nProbabilidade incondicional: {report_kmeans['raw_probability_pct']:.2f}%")

print(f"\nProbabilidades por cluster:")
for cluster_id, data in sorted(report_kmeans['conditional_probabilities'].items()):
    interp = interpretations.get(int(float(cluster_id)), 'unknown')
    print(f"  Cluster {cluster_id} ({interp}): P(hit) = {data['probability_pct']:.2f}% (n={data['count']})")

print(f"\nDelta P: {report_kmeans['separation_metrics']['delta_p']:.4f}")

## Etapa 10: Visualizações

Visualizamos a distribuição de retornos por regime.

In [None]:
from src.visualization.histogram_plotter import HistogramPlotter, PriceRegimePlotter

# Histograma geral
hist_plotter = HistogramPlotter(
    return_column=future_col,
    regime_column='regime'
)

fig = hist_plotter.plot(df_manual)
plt.show()

In [None]:
# Histogramas por regime
fig = hist_plotter.plot_by_regime(df_manual, target_return=config.analysis.target_return)
plt.show()

In [None]:
# Preço com background de regime
df_manual_indexed = df_manual.set_index('date')

price_plotter = PriceRegimePlotter(regime_column='regime')
fig = price_plotter.plot(df_manual_indexed)
plt.show()

## Etapa 11: Walk-Forward Validation

Para evitar overfitting, usamos walk-forward validation.

Isso simula como o modelo performaria em tempo real, sempre treinando no passado e testando no futuro.

In [None]:
from src.analysis.time_series_splitter import TimeSeriesSplitter

# Criar splitter
splitter = TimeSeriesSplitter(train_ratio=0.7)

# Demonstrar walk-forward splits
print("=== Walk-Forward Splits ===")
for split in splitter.walk_forward_split(df, n_folds=5, min_train_size=252):
    train_start = split.train['date'].min().date()
    train_end = split.train['date'].max().date()
    test_start = split.test['date'].min().date()
    test_end = split.test['date'].max().date()
    
    print(f"\nFold {split.fold}:")
    print(f"  Train: {train_start} a {train_end} ({len(split.train)} obs)")
    print(f"  Test:  {test_start} a {test_end} ({len(split.test)} obs)")

## Etapa 12: Otimização com Algoritmo Genético (Otimizado)

O GA busca os melhores parâmetros automaticamente.

**Otimizações implementadas:**
1. **Paralelização**: Avaliação de fitness usa múltiplos cores CPU
2. **Cache**: Cromossomos idênticos não são reavaliados
3. **Numba**: Cálculo de slope ~10-50x mais rápido (após warm-up)
4. **KMeans otimizado**: Algoritmo Lloyd para melhor performance

**NOTA sobre Numba**: A primeira execução será mais lenta devido à compilação JIT. Execuções subsequentes serão significativamente mais rápidas. O cache do Numba persiste entre sessões.

**NOTA**: Esta etapa é computacionalmente intensiva. Reduzimos os parâmetros para demonstração.

In [None]:
from src.calculators.slope_calculator import SlopeCalculator
import multiprocessing as mp

# Verificar otimizações disponíveis
print("=== Verificação de Otimizações ===")
print(f"Numba disponível para SlopeCalculator: {SlopeCalculator.is_numba_available()}")
print(f"CPUs disponíveis para paralelização: {mp.cpu_count()}")

# ga_config já definido na célula de configuração centralizada
print("\n=== Configuração do GA (da célula centralizada) ===")
print(f"Target Return: {ga_config.target_return:.1%}")
print(f"Horizonte: {ga_config.horizon} dias")
print(f"População: {ga_config.population_size}")
print(f"Gerações: {ga_config.generations}")
print(f"Folds: {ga_config.n_folds}")
print(f"Early Stopping: {ga_config.early_stopping}")

# Estimativa de tempo
tempo_por_gen = ga_config.population_size * 0.035  # ~0.035s por cromossomo
tempo_total = ga_config.generations * tempo_por_gen
print(f"\nTempo estimado: {tempo_total/60:.1f} minutos")
print("\nNOTA: Use Ctrl+C para interromper e salvar checkpoint a qualquer momento.")

In [None]:
from src.optimization.genetic_algorithm import GeneticAlgorithm, GACheckpoint
from src.optimization.progress_callback import LiveProgressCallback

# Criar callback de progresso visual
progress_callback = LiveProgressCallback(
    total_generations=ga_config.generations,
    population_size=ga_config.population_size,
    update_every=1
)

# Executar GA
# NOTA: parallel=False é mais estável em Jupyter notebooks
ga = GeneticAlgorithm(
    df, 
    ga_config, 
    logger=logger,
    progress_callback=progress_callback,
    n_workers=None
)

try:
    result = ga.run(
        verbose=False,
        parallel=False,  # Desativado para evitar problemas no Jupyter
        auto_checkpoint_path="ga_checkpoint.json",
        checkpoint_every=10
    )
except KeyboardInterrupt:
    print("\nInterrompido! Salvando checkpoint...")
    ga.save_checkpoint("ga_checkpoint.json")
    print("Checkpoint salvo em 'ga_checkpoint.json'")
    print("Para retomar: checkpoint = GACheckpoint.load('ga_checkpoint.json')")
    raise

progress_callback.finalize()

print("\n=== Melhores Parâmetros Encontrados ===")
best = result.best_chromosome
print(f"  Window Slope: {best.window_slope}")
print(f"  Window Volatility: {best.window_volatility}")
print(f"  Window Rolling Return: {best.window_rolling_return}")
print(f"  N Clusters: {best.n_clusters}")
print(f"  MA Fast Period: {best.ma_fast_period}")
print(f"  MA Slow Period: {best.ma_slow_period}")
print(f"  Use Volatility: {best.use_volatility}")
print(f"  Use Rolling Return: {best.use_rolling_return}")
print(f"  Use MA Distance: {best.use_ma_distance}")

print(f"\nParâmetros fixos (da configuração):")
print(f"  Target Return: {ga_config.target_return:.2%}")
print(f"  Horizon: {ga_config.horizon} dias")

print(f"\nMétricas:")
print(f"  Fitness: {result.best_fitness:.4f}")
print(f"  Delta P (test): {result.best_metrics.delta_p_test:.4f}")
print(f"  Overfitting Ratio: {result.best_metrics.overfitting_ratio:.2f}")

print(f"\nEstatísticas:")
print(f"  Total avaliações: {result.all_evaluations}")

In [None]:
# Plotar evolução do fitness
generations = [h[0] for h in result.history]
best_fitness = [h[1] for h in result.history]
mean_fitness = [h[2] for h in result.history]

fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(generations, best_fitness, 'b-', label='Best Fitness', linewidth=2)
ax.plot(generations, mean_fitness, 'g--', label='Mean Fitness', alpha=0.7)
ax.set_xlabel('Generation')
ax.set_ylabel('Fitness')
ax.set_title('Evolução do Algoritmo Genético')
ax.legend()
ax.grid(True, alpha=0.3)
plt.show()

## Etapa 13: Aplicar Melhores Parâmetros

Agora aplicamos os parâmetros otimizados e recalculamos as probabilidades.

In [None]:
from src.optimization.calculator_factory import CalculatorFactory

# Factory para criar pipeline com os melhores parâmetros
# horizon vem da configuração, não do cromossomo
factory = CalculatorFactory(horizon=ga_config.horizon)
best_pipeline = factory.create_pipeline(best)
feature_cols = factory.get_feature_columns(best)
future_col_best = factory.get_future_return_column()

# Processar dados
df_best = best_pipeline.run(df)

# Clustering
best_kmeans = KMeansRegimeClassifier(
    n_clusters=best.n_clusters,
    feature_columns=feature_cols
)
df_best = best_kmeans.fit_predict(df_best)

# Probabilidades (target_return vem da configuração)
best_prob = ProbabilityCalculator(
    future_return_column=future_col_best,
    target_return=ga_config.target_return,
    regime_column='cluster'
)

best_report = best_prob.generate_report(df_best)

print("=== Relatório com Parâmetros Otimizados ===")
print(f"\nRetorno alvo: {ga_config.target_return:.1%}")
print(f"Horizonte: {ga_config.horizon} dias")
print(f"\nProbabilidade incondicional: {best_report['raw_probability_pct']:.2f}%")

# Interpretar clusters
slope_col_best = f'slope_{best.window_slope}'
best_interp = best_kmeans.interpret_clusters(df_best, slope_col_best)

print(f"\nProbabilidades por cluster:")
for cluster_id, data in sorted(best_report['conditional_probabilities'].items()):
    interp = best_interp.get(int(float(cluster_id)), 'unknown')
    print(f"  Cluster {cluster_id} ({interp.upper()}): P(hit) = {data['probability_pct']:.2f}% (n={data['count']})")

print(f"\nDelta P: {best_report['separation_metrics']['delta_p']:.4f}")
print(f"Information Ratio: {best_report['separation_metrics']['information_ratio']:.4f}")

## Etapa 14: Visualizações com Parâmetros Otimizados

Agora repetimos todas as visualizações usando os parâmetros do indivíduo vencedor do GA.

In [None]:
# Estatísticas por cluster com parâmetros otimizados
stats_best = best_kmeans.compute_statistics(df_best, future_col_best)

print("=== Estatísticas por Cluster (Parâmetros Otimizados) ===")
for stat in stats_best:
    interp = best_interp.get(stat.cluster_id, 'unknown')
    print(f"\nCluster {stat.cluster_id} ({interp.upper()}):")
    print(f"  Observações: {stat.count} ({stat.percentage:.1f}%)")
    print(f"  Retorno futuro médio: {stat.future_return_mean:.4f}")
    print(f"  Desvio padrão: {stat.future_return_std:.4f}")
    print(f"  Features médias: {stat.feature_means}")

In [None]:
# Histograma de retornos com parâmetros otimizados
hist_plotter_best = HistogramPlotter(
    return_column=future_col_best,
    regime_column='cluster'
)

fig = hist_plotter_best.plot(df_best)
plt.suptitle(f'Distribuição de Retornos Futuros ({ga_config.horizon} dias) - Parâmetros Otimizados', y=1.02)
plt.show()

In [None]:
# Histogramas por cluster com linha do target
fig = hist_plotter_best.plot_by_regime(df_best, target_return=ga_config.target_return)
plt.suptitle(f'Distribuição por Cluster - Target: {ga_config.target_return:.1%} em {ga_config.horizon} dias', y=1.02)
plt.show()

In [None]:
# Preço com background de cluster otimizado
df_best_indexed = df_best.set_index('date')

price_plotter_best = PriceRegimePlotter(regime_column='cluster')
fig = price_plotter_best.plot(df_best_indexed)
plt.suptitle('Preço com Regimes Otimizados pelo GA', y=1.02)
plt.show()

In [None]:
# Gráfico de barras: Probabilidade por Cluster
clusters = []
probs = []
colors = []
color_map = {'bull': 'green', 'bear': 'red', 'flat': 'gold'}

for cluster_id, data in sorted(best_report['conditional_probabilities'].items()):
    interp = best_interp.get(int(float(cluster_id)), 'flat')
    clusters.append(f"Cluster {int(float(cluster_id))}\n({interp})")
    probs.append(data['probability_pct'])
    colors.append(color_map.get(interp, 'gray'))

fig, ax = plt.subplots(figsize=(10, 5))
bars = ax.bar(clusters, probs, color=colors, edgecolor='black', alpha=0.8)

# Linha horizontal para probabilidade incondicional
ax.axhline(y=best_report['raw_probability_pct'], color='blue', linestyle='--', 
           linewidth=2, label=f"P incondicional: {best_report['raw_probability_pct']:.1f}%")

# Adicionar valores nas barras
for bar, prob in zip(bars, probs):
    ax.text(bar.get_x() + bar.get_width()/2, bar.get_height() + 0.5, 
            f'{prob:.1f}%', ha='center', va='bottom', fontsize=11, fontweight='bold')

ax.set_ylabel('Probabilidade (%)')
ax.set_title(f'Probabilidade de Atingir {ga_config.target_return:.1%} em {ga_config.horizon} dias por Cluster')
ax.legend()
ax.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

print(f"\nDelta P (separação): {best_report['separation_metrics']['delta_p']:.4f}")
print(f"Melhor cluster: {max(best_report['conditional_probabilities'].items(), key=lambda x: x[1]['probability_pct'])[0]}")
print(f"Pior cluster: {min(best_report['conditional_probabilities'].items(), key=lambda x: x[1]['probability_pct'])[0]}")

In [None]:
# Identificar regime atual e probabilidade
last_row = df_best.iloc[-1]
current_cluster = int(last_row['cluster'])
current_interp = best_interp.get(current_cluster, 'unknown')
current_prob = best_report['conditional_probabilities'][str(float(current_cluster))]['probability_pct']

print("=" * 60)
print("SITUAÇÃO ATUAL")
print("=" * 60)
print(f"\nData mais recente: {last_row['date'].strftime('%Y-%m-%d')}")
print(f"Preço de fechamento: R$ {last_row['close']:.2f}")
print(f"\nRegime atual: Cluster {current_cluster} ({current_interp.upper()})")
print(f"\nProbabilidade de atingir {ga_config.target_return:.1%} em {ga_config.horizon} dias:")
print(f"  -> {current_prob:.1f}%")
print(f"\nProbabilidade incondicional (média histórica): {best_report['raw_probability_pct']:.1f}%")

# Comparação
diff = current_prob - best_report['raw_probability_pct']
if diff > 0:
    print(f"\n✅ Regime atual tem probabilidade {diff:.1f} pontos percentuais ACIMA da média")
else:
    print(f"\n⚠️ Regime atual tem probabilidade {abs(diff):.1f} pontos percentuais ABAIXO da média")

<cell_type>markdown</cell_type>## Etapa 15: Análise Dual - Fechar vs Tocar

Comparamos duas probabilidades diferentes:
- **P(fechar)**: Probabilidade do preço FECHAR acima do alvo em t+H
- **P(tocar)**: Probabilidade do preço TOCAR o alvo em algum momento entre t+1 e t+H

Esta análise é útil para operações de opções e stop-loss/take-profit.

In [None]:
from src.calculators import FutureTouchCalculatorVectorized
from src.analysis import DualProbabilityCalculator

# Adicionar colunas de touch ao DataFrame com parâmetros otimizados
touch_calc = FutureTouchCalculatorVectorized(horizon=ga_config.horizon)
df_dual = touch_calc.calculate(df_best)

print("=== Novas Colunas de Touch ===")
print(f"Colunas adicionadas: {touch_calc.output_columns}")
print(f"\nExemplo dos dados:")
print(df_dual[['date', 'close', f'log_return_future_{ga_config.horizon}', 
               f'log_return_touch_max_{ga_config.horizon}']].head(10))

In [None]:
# Criar calculador dual
dual_calc = DualProbabilityCalculator(
    close_return_column=f'log_return_future_{ga_config.horizon}',
    touch_return_column=f'log_return_touch_max_{ga_config.horizon}',  # Para alvos de alta
    target_return=ga_config.target_return,
    regime_column='cluster',
    horizon=ga_config.horizon
)

# Imprimir comparação formatada
dual_calc.print_comparison(df_dual)

In [None]:
# Gráfico comparativo: P(fechar) vs P(tocar) por cluster
dual_report = dual_calc.generate_report(df_dual)

fig, ax = plt.subplots(figsize=(12, 6))

clusters_list = sorted(dual_report['conditional_probabilities'].keys())
x = np.arange(len(clusters_list))
width = 0.35

close_probs = [dual_report['conditional_probabilities'][c]['prob_close'] * 100 for c in clusters_list]
touch_probs = [dual_report['conditional_probabilities'][c]['prob_touch'] * 100 for c in clusters_list]

bars1 = ax.bar(x - width/2, close_probs, width, label='P(fechar)', color='steelblue', alpha=0.8)
bars2 = ax.bar(x + width/2, touch_probs, width, label='P(tocar)', color='coral', alpha=0.8)

# Labels nos clusters
cluster_labels = []
for c in clusters_list:
    interp = best_interp.get(int(float(c)), 'unknown')
    cluster_labels.append(f"Cluster {int(float(c))}\n({interp})")

ax.set_xticks(x)
ax.set_xticklabels(cluster_labels)
ax.set_ylabel('Probabilidade (%)')
ax.set_title(f'Comparação: P(fechar) vs P(tocar) - Target: {ga_config.target_return:.1%} em {ga_config.horizon} dias')
ax.legend()

# Adicionar valores nas barras
for bar in bars1:
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height, f'{height:.1f}%',
            ha='center', va='bottom', fontsize=9)

for bar in bars2:
    height = bar.get_height()
    ax.text(bar.get_x() + bar.get_width()/2., height, f'{height:.1f}%',
            ha='center', va='bottom', fontsize=9)

ax.grid(axis='y', alpha=0.3)
plt.tight_layout()
plt.show()

# Ratio touch/close
print("\n=== Ratio Touch/Close por Cluster ===")
for c in clusters_list:
    data = dual_report['conditional_probabilities'][c]
    ratio = data['touch_vs_close_ratio']
    interp = best_interp.get(int(float(c)), 'unknown')
    print(f"Cluster {int(float(c))} ({interp}): {ratio:.2f}x mais provável tocar que fechar")

In [None]:
# Situação atual com análise dual
last_row = df_dual.iloc[-1]
current_cluster = int(last_row['cluster'])
current_interp = best_interp.get(current_cluster, 'unknown')

current_data = dual_report['conditional_probabilities'][str(float(current_cluster))]
current_prob_close = current_data['prob_close'] * 100
current_prob_touch = current_data['prob_touch'] * 100

print("=" * 60)
print("SITUAÇÃO ATUAL - ANÁLISE DUAL")
print("=" * 60)
print(f"\nData: {last_row['date'].strftime('%Y-%m-%d')}")
print(f"Preço: R$ {last_row['close']:.2f}")
print(f"Regime: Cluster {current_cluster} ({current_interp.upper()})")

print(f"\nProbabilidade de atingir {ga_config.target_return:.1%} em {ga_config.horizon} dias:")
print(f"  P(fechar ≥ alvo): {current_prob_close:.1f}%")
print(f"  P(tocar o alvo):  {current_prob_touch:.1f}%")
print(f"  Ratio:            {current_prob_touch/current_prob_close:.2f}x")

print(f"\n📈 Interpretação:")
print(f"  É {current_prob_touch/current_prob_close:.1f}x mais provável o preço TOCAR o alvo")
print(f"  do que FECHAR acima dele.")
print(f"\n  Isso é útil para:")
print(f"  - Opções: P(tocar) relevante para barreiras knock-in/knock-out")
print(f"  - Trading: P(tocar) para stop-loss e take-profit")

## Etapa 16: Salvando o Modelo para Produção

Usamos `RegimePredictor` para encapsular o modelo treinado e facilitar previsões futuras.

In [None]:
from src.prediction import RegimePredictor

# Criar predictor a partir dos resultados do GA
predictor = RegimePredictor(
    chromosome=best,                      # Cromossomo otimizado
    target_return=ga_config.target_return,
    horizon=ga_config.horizon,
    ticker=ticker,
    data_dir="../data"
)

# Treinar com os dados históricos
predictor.fit(df)

# Salvar modelo para uso futuro
predictor.save("../models/bova11_predictor")
print("✅ Modelo salvo em ../models/bova11_predictor.joblib e .json")

# Carregar modelo (simulando uso em produção)
predictor_loaded = RegimePredictor.load("../models/bova11_predictor", data_dir="../data")
print("✅ Modelo carregado com sucesso!")

# Fazer previsão do regime atual
result = predictor_loaded.predict_current()

# Exibir status formatado
predictor_loaded.print_status(result)

## Etapa 17: Otimização Multi-Alvo

Para diferentes targets de retorno, o GA pode encontrar parâmetros diferentes.
O `MultiTargetOptimizer` roda GAs independentes para cada alvo e cria uma matriz de probabilidades.

**NOTA**: Esta etapa é MUITO intensiva computacionalmente. Use com cautela.

In [None]:
# Exemplo de uso do MultiTargetOptimizer (NÃO EXECUTAR - apenas demonstração)
# Este código levaria MUITO tempo para rodar

from config.search_space import TargetGrid
from src.optimization import MultiTargetOptimizer
from src.prediction import MultiTargetPredictor

# Configurar grid de targets (1% a 5%, step 1%)
target_grid = TargetGrid(
    target_min=0.01,   # 1%
    target_max=0.05,   # 5%
    target_step=0.01   # 1%
)

print("=== Configuração Multi-Target ===")
print(f"Targets: {target_grid.to_array()}")
print(f"Total de GAs a executar: {len(target_grid)}")

# multi_ga_config já definido na célula de configuração centralizada
print(f"\nUsando multi_ga_config da célula centralizada:")
print(f"  População: {multi_ga_config.population_size}")
print(f"  Gerações: {multi_ga_config.generations}")
print(f"  Early Stopping: {multi_ga_config.early_stopping}")

# DESCOMENTE PARA EXECUTAR (demora!)
# optimizer = MultiTargetOptimizer(
#     df=df,
#     horizon=HORIZON,
#     target_grid=target_grid,
#     ga_config=multi_ga_config
# )
# 
# results = optimizer.run(verbose=True)
# results.save("../models/multi_target_h2")
# 
# # Criar predictor
# multi_predictor = MultiTargetPredictor.from_optimization(results, df)
# multi_predictor.save("../models/multi_target_h2_predictor")
# 
# # Ver matriz de probabilidades
# matrix = multi_predictor.predict_current(ticker)
# multi_predictor.print_matrix(ticker)

print("\n⚠️ Código comentado para evitar execução longa.")
print("Descomente as linhas acima para executar a otimização multi-alvo.")

## Etapa 18: Grade de Strikes para Opções

O `StrikeGridOptimizer` é similar ao MultiTargetOptimizer, mas trabalha com preços absolutos (strikes).
Ideal para precificação de opções.

**Fluxo:**
1. Define strikes (ex: R$ 120, 125, 130...)
2. Converte cada strike para target return: `(strike / preço_atual) - 1`
3. Roda GA para cada target exato
4. Gera matriz de probabilidades por strike

In [None]:
# Exemplo de uso do StrikeGridOptimizer com Progress Callback

from src.optimization import StrikeGridOptimizer
from src.optimization.progress_callback import SimpleProgressCallback
from src.prediction import StrikeGridPredictor

# Preço atual do ativo
current_price = df['close'].iloc[-1]
print(f"Preço atual: R$ {current_price:.2f}")

# Definir strikes (exemplo para BOVA11)
strikes = list(range(int(current_price * 0.95), int(current_price * 1.10), 1))  # -5% a +10%, step de R$ 1
print(f"\nStrikes definidos: {strikes}")
print(f"Total de GAs a executar: {len(strikes)}")

# Preview da conversão strike → target
print("\n=== Preview Strike → Target ===")
print(f"{'Strike':<12} {'Retorno':<12} {'Direção':<10}")
print("-" * 34)
for strike in strikes[:5]:
    target_return = (strike / current_price) - 1
    direction = 'UP' if target_return > 0 else 'DOWN' if target_return < 0 else 'ATM'
    print(f"R$ {strike:<9.2f} {target_return*100:+.2f}%       {direction:<10}")
if len(strikes) > 5:
    print(f"... ({len(strikes) - 5} mais)")

# Usando multi_ga_config da célula de configuração centralizada
print(f"\nUsando multi_ga_config da célula centralizada:")
print(f"  População: {multi_ga_config.population_size}")
print(f"  Gerações: {multi_ga_config.generations}")
print(f"  Early Stopping: {multi_ga_config.early_stopping}")

# Factory para criar callback de progresso para cada GA
def create_ga_callback(strike_idx, strike):
    return SimpleProgressCallback(
        total_generations=multi_ga_config.generations,
        print_every=5  # Atualiza a cada 5 gerações
    )

# Executar otimização
strike_optimizer = StrikeGridOptimizer(
    df=df,
    current_price=current_price,
    strikes=strikes,
    horizon=HORIZON,
    ga_config=multi_ga_config
)

print(f"\n⏳ Iniciando otimização para {len(strikes)} strikes...")

strike_results = strike_optimizer.run(
    verbose=True,
    ga_progress_callback_factory=create_ga_callback,
    parallel_ga=False  # Melhor para Jupyter notebooks
)

# Salvar resultados
strike_results.save("../models/strike_grid_h2")

# Criar predictor
strike_predictor = StrikeGridPredictor.from_optimization(strike_results, df)
strike_predictor.save("../models/strike_grid_h2_predictor")

# Ver matriz de strikes
strike_predictor.print_matrix(ticker)

## Conclusão

Este notebook demonstrou o fluxo completo do sistema QuantNote:

1. **Configuração** com validação via Pydantic
2. **Obtenção de dados** do Yahoo Finance com rate limiting
3. **Validação** de dados com múltiplos validators
4. **Pipeline de calculadores** com resolução automática de dependências
5. **Classificação de regimes** (manual e K-Means)
6. **Cálculo de probabilidades** condicionais
7. **Visualização** de distribuições
8. **Walk-forward validation** para evitar overfitting
9. **Otimização** com algoritmo genético
10. **Análise dual** - P(fechar) vs P(tocar)

### Métricas de Probabilidade

- **P(fechar)**: Probabilidade de fechar acima do alvo no final do período
- **P(tocar)**: Probabilidade de atingir o alvo em algum momento durante o período

### Próximos Passos

- Testar com outros ativos
- Ajustar parâmetros do GA para busca mais ampla
- Implementar novos indicadores (RSI, MACD, etc.)
- Integrar com sistema de backtesting
- Usar P(tocar) para otimização de opções