# Algotrading - Aula 6

- Diebold-Mariano Test
- Monte Carlo Simulation
- Regime Detection
___

Retomando o último resultado da última aula

In [None]:
%matplotlib inline

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import yfinance as yf
from datetime import datetime, timedelta
import pandas_ta as ta
import statsmodels.api as sm
from scipy.stats import norm
from bcb import sgs

Agora vamos baixar dados de fechamento (**Historical Data**) de 5 anos de IBOV diretamente do site do Yahoo

In [None]:
ticker = '^BVSP'

# Selecionado uma data de hoje menos 5 anos
start_date = (datetime.now() - timedelta(days=5*365))

# Formatando para Ymd
start_date = start_date.strftime('%Y-%m-%d')

# Selecionando a data de ontem
end_date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')

data = yf.download(ticker, start=start_date, end=end_date, interval='1d', auto_adjust=True)

# Remove o ticker do index
ibov = data.droplevel(1, axis=1)

ibov['rsi'] = ibov.ta.rsi(length=2)
ibov = ibov.dropna()

# Puxando os dados de DI 1 dia do Banco Central do Brasil
# CDI = 12
cdi_data = sgs.get({'CDI': 12}, start=start_date, end=end_date)

# Incorporando o CDI no DataFrame
ibov = ibov.join(cdi_data, how='inner')
ibov

Simulação do Buy & Hold e CDI para benchmarking:

In [None]:
# Protótipo da estratégia:
k = 1_000_000
b = k
CDI = [] # Patrimonio líquido corrigido pelo CDI
BNH = [] # Buy and Hold
position = 0

for row in ibov.itertuples():
    k *= (1 + row.CDI/100)  # Corrigindo o caixa pelo CDI diário
    b *= (1 + row.CDI/100)  # Corrigindo o caixa pelo CDI diário
    if k > row.Close:
        position += k // row.Close
        k -= position * row.Close
    BNH.append(position * row.Close + k)
    CDI.append(b)

Função para simular a estratégia:

- ub: upper band da estratégia
- lb: lower band da estratégia
- init: índice inicial do DataFrame
- end: índice final do DataFrame
- k: capital inicial

In [None]:
# Custos aproximados para ações na B3
CUSTO_CORRETAGEM = 10   # por ordem
CUSTO_EMOLUMENTOS = (0.005 + 0.0275) / 100  # taxa de 0.005% + 0.0275% (emolumentos)
IMPOSTO_RENDA = 0.15  # sobre o lucro


def simulate(serie, lb, ub, init = 0, end = -1, k = 1_000_000, carry = True):

    position = 0
    PVCC = [] # Patrimonio líquido corrigido pelo CDI
    gaps = []
    trades = []
    mes = 0
    lucro = 0
    giro = 0

    days = 0

    for row in serie.iloc[init:end].itertuples():

        if row.Index.month != mes:
            imposto = 0        
            if lucro > 0:
                imposto = lucro * IMPOSTO_RENDA
                k -= imposto
            giro = 0
            if lucro > 0:
                lucro = 0

            mes = row.Index.month
        if carry:
            k *= (1 + row.CDI/100)  # Corrigindo o caixa pelo CDI diário
        if row.rsi < lb and position == 0:
            position = k // row.Close
            giro += position
            k -= position * row.Close  * (1 + CUSTO_EMOLUMENTOS)
            k -= CUSTO_CORRETAGEM
            buy_price = row.Close
            gaps.append(days)
            days = 0
        elif row.rsi > ub and position != 0:
            k += position * row.Close * (1 - CUSTO_EMOLUMENTOS)
            k -= CUSTO_CORRETAGEM
            sell_price = row.Close
            lucro += (sell_price - buy_price) * position
            trades.append((days, sell_price / buy_price - 1))
            giro += position
            position = 0            
            days = 0

        days += 1
        PVCC.append(k + position * row.Close)

    if position == 0:
        gaps.append(days)
    else:
        trades.append((days, (row.Close / buy_price) - 1))

    return (PVCC, (gaps, trades))

Grid Search para achar o melhor conjunto de parâmetros com dados In Sample (IS):

In [None]:
oost_of_sample = 504  # Último dois anos

lower_band = range(5, 21, 5)
upper_band = range(50, 101, 5)  
results = []
for lb in lower_band:
    for ub in upper_band:
        if lb >= ub:
            continue

        results.append((lb, ub, simulate(ibov, lb, ub, init = 0, end = -oost_of_sample)[0][-1]))

results_df = pd.DataFrame(results, columns=['Lower Band', 'Upper Band', 'Final Balance'])
results_df = results_df.sort_values(by='Final Balance', ascending=False).reset_index(drop=True)
results_df.head()

Separando o melhor resultado do periodo de calibração IS, vamos simular o período de testes OOS (Out of Sample):

In [None]:
lb = results_df.iloc[0]['Lower Band']
ub = results_df.iloc[0]['Upper Band']
benchmark = pd.Series(simulate(ibov, 10, 80, init = -oost_of_sample, end = -1)[0], name='Patrimonio')
result, report = simulate(ibov, lb, ub, init = -oost_of_sample, end = -1)
best_param = pd.Series(result, name='Patrimonio')
no_carry = pd.Series(simulate(ibov, lb, ub, init = -oost_of_sample, end = -1, carry = False)[0], name='Patrimonio')

CDI_OOS = pd.Series(CDI[-oost_of_sample:-1]) - CDI[-oost_of_sample] + 1_000_000
BNH_OOS = pd.Series(BNH[-oost_of_sample:-1]) - BNH[-oost_of_sample] + 1_000_000

CDI_OOS.plot();
BNH_OOS.plot();
benchmark.plot();
best_param.plot();
no_carry.plot();

# title
plt.title(f'Out of Sample');
# legenda
plt.legend(['CDI', 'B&H', 'Original Strategy', 'Best Parameters Strategy', 'No Carry Strategy']);
# eixo x
plt.xlabel('Dias');
# eixo y
plt.ylabel('Patrimônio líquido (R$)');
# preencher o eixo x com datas do ibov
plt.xticks(ticks=range(0, len(CDI_OOS), 30), labels=ibov.index[-oost_of_sample::30].strftime('%Y-%m-%d'), rotation=45);
plt.show();

## Teste Diebold-Mariano

Agora vamos realizar o teste Diebold-Mariano (DM) para verificar se duas estratégias são iguais.

O teste pode ser usado para medir qual estratégia é mais precisa em relação à um *groundtruth*. Mas também pode medir se duas estratégias apresentam resultados (PnL) similares.

Vamos usar o segundo arcabouço, pois não há predição de preços.

O teste indica que:

- $H_0$: As duas estratégias não possuem diferenças
- $H_A$: As duas estratégias possuem diferenças

In [None]:
# Teste Diebold-Mariano

def dm_test(pnl1, pnl2, h=1, q=None):
    """
    Implementa o teste Diebold-Mariano 
    
    Parameters:
    pnl1: patrimônio da primeira estratégia
    pnl2: patrimônio da segunda estratégia
    h: horizonte de previsão (default=1)
    q: número de defasagens na estimativa da variância (default=None, que usa max(h-1, 0))

    Quantidade de defasagens sugerida quando as diferença das entradas são altamente autocorrelacionadas:
    q = int(4*(T/100)**(2/9))
    
    Returns:
    dm_stat: estatística DM
    p_value: p-valor do teste
    dm: média da diferença das perdas

    Notes:
    A hipótese nula é que as duas estratégias têm a mesma capacidade preditiva.
    A hipótese alternativa é que a segunda estratégia é melhor que a primeira.
    A estatística DM é aproximadamente normal sob a hipótese nula.
    1.96 é o valor crítico para um nível de significância de 5%
    """
    
    # Diferença das loss functions
    d = (-pnl1) - (-pnl2)  # = pnl2 - pnl1

    T = d.size
    dm = d.mean()

    if q is None:
        q = max(h-1, 0)
    
    d_c = d - dm
    gamma0 = np.mean(d_c**2)
    var = gamma0
    for k in range(1, q+1):
        cov = np.mean(d_c[k:] * d_c[:-k])
        w = 1 - k/(q+1)
        var += 2*w*cov
    var_mean = var / T
    stat = dm / np.sqrt(var_mean)
    pval = 2*(1 - norm.cdf(abs(stat)))
    return stat, pval, dm  # dm is avg advantage of strategy 2 over 1

print(f'Best Parameters vs Benchmark (10, 80):')    
q = int(4*(len(best_param)/100)**(2/9))
dm_stat, p_value, dm = dm_test(best_param, benchmark, h=1, q=q)
print(f'DM Statistic: {dm_stat}, p-value: {p_value}, DM: {dm}')

In [None]:
q = int(4*(len(best_param)/100)**(2/9))
dm_stat, p_value, dm = dm_test(best_param, CDI_OOS, h=1, q=q)
print(f'Best Parameters vs CDI:')
print(f'DM Statistic: {dm_stat}, p-value: {p_value}, DM: {dm}')

In [None]:
q = int(4*(len(best_param)/100)**(2/9))
dm_stat, p_value, dm = dm_test(best_param, BNH_OOS, h=1, q=q)
print(f'Best Parameters vs Buy & Hold:')
print(f'DM Statistic: {dm_stat}, p-value: {p_value}, DM: {dm}')

Como vocês Interpretam os resultados? Lembrando que $H_0$ é que não há diferença entre o resultado das estratégias

### Para ir Além

Trocar a função de Loss:

- Mean–Variance (Risk-Adjusted)
- Drawdown-Aware
- Downside-Only Risk

---

## Monte Carlo Simulation

Agora vamos simular a estratégia com dados sintéticos. Essa técnica é chama de **Monte Carlo Simulation**.

A ideia é gerar dados sintéticos usando um modelo *Geometric Brownian Motion* (GMB) com os dados estatísticos da série real.

Ao final a estratégia é aplicada às séries sintéticas e é gerado a distribuição do PnL. Se o percentil do resultado da série real não for um *outlier*, isso pode indicar robustez da estratégia.

In [None]:
# Geometric Brownian Motion Simulation

# Gerar 1000 simulações de GBM para o IBOV com os mesmos parâmetros de drift e volatilidade do IBOV real
# e comparar o desempenho da estratégia otimizada nessas simulações
# com o desempenho da estratégia otimizada no IBOV real

# Parâmetros do GBM
S0 = ibov['Close'].iloc[-oost_of_sample-1]  # Preço inicial

mu = np.log(ibov['Close'].iloc[-oost_of_sample:]).diff().dropna().mean()  # Drift (retorno médio
sigma = np.log(ibov['Close'].iloc[-oost_of_sample:]).diff().dropna().std(ddof=1)  # Volatilidade (desvio padrão dos retornos)

print(f'Drift: {mu*252*100:.2f}%, Volatilidade: {sigma*np.sqrt(252)*100:.2f}% (anualizados)')

n_simulations = 1000  # Número de simulações
n_steps = len(ibov.iloc[-oost_of_sample-1:-1])  # Número de passos (dias)
np.random.seed(42)  # Para reprodutibilidade
simulations = np.zeros((n_steps, n_simulations))
dfs = []
for i in range(n_simulations):
    Z = np.random.standard_normal(n_steps)  # Números aleatórios normais
    X = (mu - 0.5 * sigma**2) + sigma * Z
    simulations[:, i] = S0 * np.exp(X.cumsum())  # Preço simulado
    dfs.append(pd.DataFrame(simulations[:, i], index=ibov.iloc[-oost_of_sample-1:-1].index, columns=[f'Close']))
    dfs[i] = dfs[i].join(cdi_data, how='inner')

pd.concat([df.Close for df in dfs], axis=1).plot(legend=False, alpha=0.1);
ibov['Close'].iloc[-oost_of_sample:].plot(color='black', label='IBOV Real', linewidth=2);
plt.title('Simulações de GBM para o IBOV');
plt.xlabel('Dias');
plt.ylabel('Preço Simulado');
plt.show();

In [None]:
lb = results_df.iloc[0]['Lower Band']
ub = results_df.iloc[0]['Upper Band']
results = []
for i in range(n_simulations):    
    dfs[i]['rsi'] = ta.rsi(dfs[i]['Close'], length=2)
    dfs[i] = dfs[i].dropna()    
    results.append(simulate(dfs[i], lb, ub, init = 0, end = -1)[0][-1])

results = pd.Series(results, name='Final Balance')
results.plot(kind='hist', bins=30, edgecolor='black');
plt.title('Distribuição do Patrimônio Final nas Simulações de GBM');
plt.xlabel('Patrimônio Final (R$)');
plt.ylabel('Frequência');
plt.axvline(x=results.mean(), color='r', linestyle='--', label='Média');
plt.axvline(x=results.median(), color='g', linestyle='--', label='Mediana');
plt.axvline(x=best_param.iloc[-1], color='b', linestyle='--', label='Resultado IBOV Real');
plt.axvline(x=CDI_OOS.iloc[-1], color='purple', linestyle='--', label='CDI OOS');
plt.axvline(x=BNH_OOS.iloc[-1], color='orange', linestyle='--', label='Buy & Hold OOS');
plt.legend();
plt.show();

In [None]:
percentile = (results <= best_param.iloc[-1]).mean()
print(f"O resultado real está no percentil {percentile:.1%}")

In [None]:
# Das simulações de IBOV, selecione apenas aquelas que tiveram retorno maior do que o CDI no período
# Analisar como foi o desempenho da estratégia otimizada nessas simulações

lb = results_df.iloc[0]['Lower Band']
ub = results_df.iloc[0]['Upper Band']
results = []
baseline = (CDI_OOS.iloc[-1] - CDI_OOS.iloc[0]) / CDI_OOS.iloc[0]
print(f'Baseline (CDI OOS): {baseline:.1%}')

for i in range(n_simulations):    
    dfs[i]['rsi'] = ta.rsi(dfs[i]['Close'], length=2)
    dfs[i] = dfs[i].dropna()    
    if dfs[i]['Close'].iloc[-1] >= dfs[i]['Close'].iloc[0] * (1 + baseline):
        results.append(simulate(dfs[i], lb, ub, init = 0, end = -1)[0][-1])

results = pd.Series(results, name='Final Balance')
results.plot(kind='hist', bins=30, edgecolor='black');
plt.title('Distribuição do Patrimônio Final nas Simulações de GBM');
plt.xlabel('Patrimônio Final (R$)');
plt.ylabel('Frequência');
plt.axvline(x=results.mean(), color='r', linestyle='--', label='Média');
plt.axvline(x=results.median(), color='g', linestyle='--', label='Mediana');
plt.axvline(x=best_param.iloc[-1], color='b', linestyle='--', label='Resultado IBOV Real');
plt.axvline(x=CDI_OOS.iloc[-1], color='purple', linestyle='--', label='CDI OOS');
plt.axvline(x=BNH_OOS.iloc[-1], color='orange', linestyle='--', label='Buy & Hold OOS');
plt.legend();
plt.show();

## Trades Bootstrapping

Agora vamos usar uma técnica chamada Bootstrapping (Efron, 1979) para analisar se o resultado que obtivemos com a série real foi apenas um resultado espúrio ou a estratégia é consistente.

A técnica envolve reamostrar a ordem das operações (com *replacement*). Isso deve embaralhar a ordem das operações e verificar se a ordem original obtida não é apenas um *oulier* da amostra (por exemplo: se a estratégia estiver com overfit, a estratégia deve performar mal quando fora de ordem).

Como a estratégia não fica 100% alocada, deve-se sortear também os momentos em que ele fica zerado.

In [None]:
# (k, allocation, position, price, cdi)
import numpy as np

B = 1_000
results = []

for i in range(B):
    gap = np.random.choice(report[0])
    k = 1_000_000

    count = 0
    for row in ibov.iloc[-oost_of_sample:-1].itertuples():
        # sample a gap in report[0]:
        if count == gap:
            idx = np.random.choice(range(len(report[1])))
            trade, ret = report[1][idx]
            gap = 0
            count = 0
        elif count < gap:
            k *= (1 + row.CDI/100)  # Corrigindo o caixa pelo CDI diário    
        elif count == trade:
            k += k * ret
            gap = np.random.choice(report[0])
            trade = 0
            count = 0

        count += 1    

    results.append(k)

results = pd.Series(results, name='Final Balance')
results.plot(kind='hist', bins=30, edgecolor='black');
plt.title('Distribuição do Patrimônio Final nas Simulações de Trades Aleatórios');
plt.xlabel('Patrimônio Final (R$)');
plt.ylabel('Frequência');
plt.axvline(x=results.mean(), color='r', linestyle='--', label='Média');
plt.axvline(x=results.median(), color='g', linestyle='--', label='Mediana');
plt.axvline(x=best_param.iloc[-1], color='b', linestyle='--', label='Resultado IBOV Real');
plt.axvline(x=CDI_OOS.iloc[-1], color='purple', linestyle='--', label='CDI OOS');
plt.axvline(x=BNH_OOS.iloc[-1], color='orange', linestyle='--', label='Buy & Hold OOS');
plt.legend();
plt.show();
print(f"O resultado real está no percentil {(results <= best_param.iloc[-1]).mean():.1%}")

Como podem ver do resultado, o resultado real não é um *outlier* e portanto não foi uma questão de sorte.

Porém muito cuidado! Bootstrap tem um viés de estimação que pode ser corrigido.

### Para ir Além

- Realizar o Bootstrapping com blocos ao invés de individualmente (i.i.d). O que talvez faça mais sentido por haver autocorrelação.
- Análisar também a volatilidade e outras métricas (Sharpe, Sortino, Drawdown, etc).

---

## Hurst Exponent para detecção de Regimes

A ideia foi publicada no livro The (mis)behaviour of the markets from Mandelbrot & em 2008. Sim, é o mesmo Mandelbrot especialista em fractais. Já Hurst foi um hidrologista e teve a ideia para medir os níveis de um rio.

Sobre o Indicador:

Se o valor for acima de 0.5: indica que há uma tendência
Se o valor for abaixo de 0.5 indica reversão a media
Se o valor for exatamente 0.5 indica movimento browniano

Vamos usar uma biblioteca para realizar a medição:

In [None]:
!pip install hurst

In [None]:
# Aplicando sobre a série histórica do IBOV dos últimos 5 anos

from hurst import compute_Hc
# Hurst Exponent
H, c, data = compute_Hc(np.log(ibov['Close']), kind='price', simplified=True)
print(f'Hurst Exponent: {H}')

ibov['Close'].plot();

Podemos ver o valor está próximo de 5 para todo o período. É possível realizar um teste de hipóteses para verificar se a série é realmente um movimento browniano. Ver o trabalho do Kleinow (2002) sobre o assunto.

Como desejamos detectar períodos para segregar o treinamento, vamos aplicar a medida sobre uma janela móvel:

In [None]:
# Sliding Window Hurst Exponent

window_size = 252  # 1 ano
hurst_values = []
dates = []
for start in range(0, len(ibov) - window_size + 1):
    end = start + window_size
    window_data = np.log(ibov['Close'].iloc[start:end])
    H, c, data = compute_Hc(window_data, kind='price', simplified=True)
    hurst_values.append(H)
    dates.append(ibov.index[start + window_size - 1])

hurst_series = pd.Series(hurst_values, index=dates, name='Hurst Exponent')
hurst_series.plot(title='Sliding Window Hurst Exponent', figsize=(10, 5));
plt.axhline(y=0.5, color='r', linestyle='--', label='Random Walk Threshold');
plt.xlabel('Date');
plt.ylabel('Hurst Exponent');
plt.legend();

Como podem ver, desconsiderando o ruído de alta frequência, há claros períodos em que o índice entra em tendência e períodos onde fica mais estacionário (lateral). 

Lembrando que essa métrica depende de uma janela de observação, logo, quando a medida ultrapassa o 0.5, isso aconteceu há algum tempo atrás. Logo isso não é uma predição, mas um filtragem para segregração de dados a posteriori.

Vamos dar uma suavizada com uma média móvel exponencial e plotar os intervalos segregados:

In [None]:
# Sliding Window Hurst Exponent

window_size = 252  # 1 ano
hurst_values = []
dates = []
for start in range(0, len(ibov) - window_size + 1):
    end = start + window_size
    window_data = np.log(ibov['Close'].iloc[start:end])
    H, c, data = compute_Hc(window_data, kind='price', simplified=True)
    hurst_values.append(H)
    dates.append(ibov.index[start + window_size - 1])

hurst_series = pd.Series(hurst_values, index=dates, name='Hurst Exponent').ewm(span=42).mean().dropna()


window_size = 252  # your sliding window size in days

fig, (ax1, ax2) = plt.subplots(
    2, 1, sharex=True, figsize=(10, 6),
    gridspec_kw={'height_ratios': [2, 1]}
)

# Plot IBOV Close (top)
ibov['Close'].plot(ax=ax1, color='black', lw=1)
ax1.set_title('IBOV Close Price')

# Plot Hurst (bottom)
hurst_series.plot(ax=ax2, color='tab:blue', lw=1)
ax2.axhline(y=0.5, color='r', linestyle='--', label='Random Walk Threshold')
ax2.set_title('Sliding Window Hurst Exponent')
ax2.set_xlabel('Date'); ax2.set_ylabel('Hurst Exponent')
ax2.legend(loc='upper left')

# --- Shade ax1 according to Hurst --- #
s = hurst_series.dropna().copy()

def _span(ax, start, end, state):
    if start is None or end is None or start >= end:
        return
    if state == 'above':   # H > 0.5
        ax.axvspan(start - pd.Timedelta(days=window_size),
                   end - pd.Timedelta(days=window_size),
                   facecolor='lightblue', alpha=0.3, zorder=0)
    elif state == 'below': # H < 0.5
        ax.axvspan(start - pd.Timedelta(days=window_size),
                   end - pd.Timedelta(days=window_size),
                   facecolor='lightcoral', alpha=0.3, zorder=0)

state = None
span_start = None
prev_t = None

for t, h in s.items():
    curr = 'above' if h > 0.5 else 'below'
    if state is None:
        state, span_start = curr, t
    elif curr != state:
        _span(ax1, span_start, prev_t, state)
        state, span_start = curr, t
    prev_t = t
_span(ax1, span_start, prev_t+pd.Timedelta(days=window_size), state)

# close last span
_span(ax1, span_start, prev_t, state)

plt.tight_layout()
plt.show()

O Resultado indica os períodos que poderiam ser usados como IS ou OOS para a estratégia.

Para avaliar a robustez, poderia-se segregar os dados apenas OOS e aplicar sobre os parâmetros calibrados, analisando o desempenho do modelo em regimes distintos

**Atenção**: Muito cuidado para não contaminar o IS com o OOS. Precisa excluir a janela inicial usada para o cálculo da amostra IS.

### Para ir Além

A mesma análise pode ser feita usando *expanding window* ao invés de *rolling window*. 

Outra forma de detecção de estacionaridade de séries é usando teste Dick-Fuller.

---

## Próxima aula: Backtrader

A mesma simulação feita nas últimas duas aulas pode ser feita com uma biblioteca especialista. Ela irá calcular as métricas automaticamente:

In [None]:
!pip install backtrader

In [None]:
# backtrader implementation of RSI(2, 10, 80)
%matplotlib inline

import backtrader as bt

class RSIMeanReversion(bt.Strategy):
    params = dict(
        period=2,    # RSI lookback period
        lb=5,        # lower bound
        ub=80,       # upper bound
        cdi=0
    )

    def __init__(self):
        # Create RSI indicator
        self.rsi = bt.indicators.RSI(self.data.close, period=self.p.period)

    def next(self):
        if self.broker.getcash() > 0:
            self.broker.add_cash(self.broker.getcash() * self.p.cdi)
        # --- Buy condition ---
        if self.rsi[0] < self.p.lb and not self.position:
            size = self.broker.getcash() // self.data.close[0]
            self.buy(size=size)  # market order on next bar

        # --- Sell condition ---
        elif self.rsi[0] > self.p.ub and self.position:
            size = self.position.size
            self.sell(size=size)  # close position (opposite order)

cdi = ibov['CDI'].iloc[-oost_of_sample-1:-1].mean() / 100 # Gambiarra
data = bt.feeds.PandasData(dataname=ibov.iloc[-oost_of_sample-1:-1])
cerebro = bt.Cerebro(stdstats=True)
cerebro.adddata(data)
cerebro.addstrategy(RSIMeanReversion, period=2, lb=lb, ub=ub, cdi=cdi)
cerebro.broker.setcash(1_000_000)
cerebro.broker.setcommission(commission=CUSTO_EMOLUMENTOS)
cerebro.broker.set_coc(True)  # carry overnight

cerebro.run()
figs = cerebro.plot(style='candlestick', volume=False, iplot=False, numfigs=1, figsize=(20, 12), dpi=120)
plt.show()

Vamos explorar melhor a biblioteca na próxima aula!