## Bibliotecas

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from statsmodels.tsa.stattools import coint
import itertools

## Fun√ß√µes

In [3]:
# Carregar dados
def carregar_dados(moeda1, moeda2):

    path_ativo_1 = f'../../data/fechamentos/{moeda1}USDT_5m_data.csv'
    path_ativo_2 = f'../../data/fechamentos/{moeda2}USDT_5m_data.csv'

    ativo1 = pd.read_csv(path_ativo_1, parse_dates=['timestamp'], index_col='timestamp')
    ativo2 = pd.read_csv(path_ativo_2, parse_dates=['timestamp'], index_col='timestamp')

    # Elimina NaNs restantes
    df = pd.DataFrame({
        'Ativo1': ativo1['close'],
        'Ativo2': ativo2['close']
    }).dropna()

    return df



# Teste de cointegra√ß√£o
def testar_cointegracao(series1, series2):
    _, pvalor, _ = coint(series1, series2)
    return pvalor

# Zscore
def calcular_zscore(spread, window=60):
    rolling_mean = spread.rolling(window=window).mean() #m√©dia da √∫ltima janela
    rolling_std = spread.rolling(window=window).std() #desvio padr√£o da √∫ltima janela

    # Substituir stds muito baixos por um m√≠nimo
    epsilon = 1e-8
    rolling_std = rolling_std.replace(0, epsilon).fillna(epsilon)

    zscore = (spread - rolling_mean) / rolling_std  #zscore = (x- m√©dia_janela)/(desvio_padrao_janela)

    return rolling_mean, rolling_std, zscore #m√©dia da janela, desvio padrao, zscore

# Zscore
def calcular_media_ativos(ativo1, ativo2, window=60):
    ativo1_mean = ativo1.rolling(window=window).mean() #m√©dia da √∫ltima janela
    ativo2_mean = ativo2.rolling(window=window).mean() #m√©dia da √∫ltima janela

    ativo1_std = ativo1.rolling(window=window).std() #std da media movel
    ativo2_std = ativo2.rolling(window=window).std() #std da media movel


    return ativo1_mean, ativo2_mean, ativo1_std, ativo2_std  #m√©dia da janela, desvio padrao, zscore


#spread
def calcular_spread(serie1, serie2):
    serie1 = pd.to_numeric(serie1, errors='coerce')
    serie2 = pd.to_numeric(serie2, errors='coerce')
    spread = serie1 - serie2

    return spread

def vale_a_pena_operar(cotacao_atual, cotacao_futura_esperada, taxa=0.01, tipo_operacao='compra'):
    """
    Verifica se vale a pena operar o ativo (comprado ou vendido), considerando as taxas.

    Par√¢metros:
    - cotacao_atual: pre√ßo atual do ativo
    - cotacao_futura_esperada: pre√ßo esperado na venda (se comprado) ou recompra (se vendido)
    - taxa: taxa cobrada em cada opera√ß√£o (compra ou venda), como 0.01 para 1%
    - tipo_operacao: 'compra' ou 'venda' (representa a posi√ß√£o inicial)

    Retorna:
    - True se a opera√ß√£o vale a pena (lucro cobre as taxas), False caso contr√°rio
    """

    if tipo_operacao == 'compra':
        custo_total = cotacao_atual * (1 + taxa)
        valor_recebido = cotacao_futura_esperada * (1 - taxa)
        return valor_recebido > custo_total
    
    elif tipo_operacao == 'venda':
        receita_liquida = cotacao_atual * (1 - taxa)
        custo_recompra = cotacao_futura_esperada * (1 + taxa)
        return receita_liquida > custo_recompra
    
    else:
        raise ValueError("tipo_operacao deve ser 'compra' ou 'venda'")


# Estrat√©gia de sinaliza√ß√£o
def gerar_sinais(df, zscore_compra_e_venda, zscore_encerrar_posicao, stop_loss, cooldown_stop_loss, janela, taxa=0.01):
    series1 = df['Ativo1']
    series2 = df['Ativo2']

    limite_superior =zscore_compra_e_venda
    limite_inferior = -zscore_compra_e_venda

    spread = calcular_spread(series1, series2)
    rolling_mean, rolling_std, zscore = calcular_zscore(spread, janela) #Zscore com janela movel
    media_movel_ativo1, media_movel_ativo2, ativo1_std, ativo2_std = calcular_media_ativos(series1, series2, janela) #media movel do ativo 1 e 2

    nomes_posicoes = ["neutro", "compra_1_vende_2", "vende_1_compra_2"]
    posicao = nomes_posicoes[0] #comprado_vendido, vendido_comprado, encerrar, Neutro

    
    sinais_compra_e_venda = []

    for i in range(len(df)):
        linha_df = df.iloc[i]
        '''
        if zscore.iloc[i] > limite_superior:
            posicao = nomes_posicoes[2] #entra vendido no ativo 1 e comprado no 2
        if zscore.iloc[i] < -limite_inferior:
            posicao = nomes_posicoes[1] #entra comprado no ativo 1 e vendido no 2
        '''
        if zscore.iloc[i] > limite_superior and vale_a_pena_operar(df['Ativo1'].iloc[i], media_movel_ativo1.iloc[i], taxa, 'venda'):
            posicao = nomes_posicoes[2] #entra vendido no ativo 1 e comprado no 2
        if zscore.iloc[i] < -limite_inferior and vale_a_pena_operar(df['Ativo1'].iloc[i], media_movel_ativo1.iloc[i], taxa, 'compra'):
            posicao = nomes_posicoes[1] #entra comprado no ativo 1 e vendido no 2


        if (-0.5 < zscore.iloc[i] < zscore_encerrar_posicao):
            posicao = nomes_posicoes[0] #posicao mantem neutra ou encerra posicao anterior
        ##if stoploss
        sinais_compra_e_venda.append(posicao)
    
    # Resultado final
    df_sinais = pd.DataFrame({
        'timestamp': df.index,
        'Ativo1': pd.to_numeric(series1, errors='coerce'),
        'Ativo2': pd.to_numeric(series2, errors='coerce'),
        'ativo1_mean': media_movel_ativo1,
        'ativo2_mean': media_movel_ativo2,
        'ativo1_std': ativo1_std, 
        'ativo2_std': ativo2_std,
        'spread': pd.to_numeric(spread, errors='coerce'),
        'rolling_mean': rolling_mean.values,
        'rolling_std': rolling_std.values,
        'zscore': zscore.values,
        'sinal': sinais_compra_e_venda
    }).dropna()
    
    df_sinais.to_excel('resultados/estrategia.xlsx', index=False)

    return df_sinais


def simular_retorno_por_trade(df, capital_inicial=10000, taxa=0.01):
    series1 = df['Ativo1']
    series2 = df['Ativo2']
    sinais = df['sinal']

    capital = capital_inicial
    capital_series = [capital]
    posicao = None
    entrada_indice = None
    retornos_trade = [None]

    for i in range(1, len(sinais)):
        ret = None

        if posicao is None and sinais.iloc[i] != 'neutro':
            posicao = sinais.iloc[i]
            entrada_indice = i

        elif posicao is not None and sinais.iloc[i] == 'neutro':
            # Entrada
            preco_entrada_1 = series1.iloc[entrada_indice]
            preco_entrada_2 = series2.iloc[entrada_indice]

            # Sa√≠da
            preco_saida_1 = series1.iloc[i]
            preco_saida_2 = series2.iloc[i]

            if posicao == 'compra_1_vende_2':
                # Compra ativo 1 (paga mais), vende ativo 2 (recebe menos)
                preco_entrada_1 *= (1 + taxa)
                preco_entrada_2 *= (1 - taxa)

                # Vende ativo 1 (recebe menos), recompra ativo 2 (paga mais)
                preco_saida_1 *= (1 - taxa)
                preco_saida_2 *= (1 + taxa)

                ret = ((preco_saida_1 - preco_entrada_1) / preco_entrada_1) - \
                      ((preco_saida_2 - preco_entrada_2) / preco_entrada_2)

            elif posicao == 'vende_1_compra_2':
                # Vende ativo 1 (recebe menos), compra ativo 2 (paga mais)
                preco_entrada_1 *= (1 - taxa)
                preco_entrada_2 *= (1 + taxa)

                # Recompra ativo 1 (paga mais), vende ativo 2 (recebe menos)
                preco_saida_1 *= (1 + taxa)
                preco_saida_2 *= (1 - taxa)

                ret = ((preco_saida_2 - preco_entrada_2) / preco_entrada_2) - \
                      ((preco_saida_1 - preco_entrada_1) / preco_entrada_1)

            capital *= (1 + ret)
            posicao = None
            entrada_indice = None

        capital_series.append(capital)
        retornos_trade.append(ret)

    retorno_pct = (capital / capital_inicial) - 1

    df_resultado = df.copy()
    df_resultado['capital'] = capital_series
    df_resultado['retorno_trade'] = retornos_trade

    return capital, retorno_pct, capital_series, df_resultado

def simular_estrategia(moeda1, moeda2, zscore_compra_e_venda, zscore_encerrar_posicao, 
                       stop_loss, cooldown_stop_loss, janela, data_inicial, data_final, 
                       taxa=0.01, capital_inicial=10000):
    
    # Carregar dados
    df = carregar_dados(moeda1, moeda2).loc[data_inicial:data_final]


    # Testar cointegra√ß√£o
    '''
    p_valor = testar_cointegracao(df['Ativo1'], df['Ativo2'])
    if p_valor > 0.05:
        print(f"As s√©ries {moeda1}-{moeda2} n√£o s√£o cointegradas (p-valor = {p_valor:.4f})")
        return None, None, None
    '''

    # Gerar sinais
    df_sinais = gerar_sinais(df, zscore_compra_e_venda, zscore_encerrar_posicao, stop_loss, cooldown_stop_loss, janela, taxa)

    # Simular retorno
    capital_final, retorno_pct, capital_series, df_resultado = simular_retorno_por_trade(df_sinais, capital_inicial, taxa)

    # Calcular retornos percentuais por trade
    retornos_trade = df_resultado['retorno_trade'].dropna()

    # Calcular m√©tricas
    sharpe = calcular_sharpe(retornos_trade)
    retorno_risco = retorno_ajustado_ao_risco(retornos_trade)

    return retorno_pct, retorno_risco, sharpe

def retorno_ajustado_ao_risco(serie_retorno):
    serie_retorno = pd.Series(serie_retorno).dropna()
    if len(serie_retorno) == 0:
        return 0
    retorno_total = (1 + serie_retorno).prod() - 1
    risco = serie_retorno.std()
    if risco == 0:
        return 0
    return retorno_total / risco

def calcular_sharpe(serie_retorno, taxa_livre_risco=0.0):
    serie_retorno = pd.Series(serie_retorno).dropna()
    if len(serie_retorno) == 0:
        return 0
    media_excesso_retorno = serie_retorno.mean() - taxa_livre_risco
    desvio = serie_retorno.std()
    if desvio == 0:
        return 0
    sharpe = media_excesso_retorno / desvio
    return sharpe
    

## Teste da estrat√©gia

In [5]:

# Algoritmo de otimizacao
zscv = 2.5
zse = 0.5

stop_loss = 0

cooldown = 0
janela = 3000

lista_moedas = ['BTC', 'WBTC', 'ETH', 'ADA', 'BNB', 'SOL', 'XRP']
dfcodigo = carregar_dados("BTC", "BNB")

# Intervalo para teste de cointegra√ß√£o
data_inicial = '2025-06-20'
data_final   = '2025-07-19'
df_filtrado = dfcodigo.loc[data_inicial:data_final]


df_sinalizado = gerar_sinais(df_filtrado, zscv, zse, stop_loss, cooldown, janela, 0.01) #gera os sinais de compra_venda_encerramento

capital_final, retorno_percentual, capital_series, df_com_retorno_financeiro = simular_retorno_por_trade(df_sinalizado) #simula os retornos

#Valores pra gr√°fico
df_com_retorno_financeiro['ativo1_desvio_preco'] =  df_com_retorno_financeiro['Ativo1'] - df_com_retorno_financeiro['ativo1_mean']
df_com_retorno_financeiro['ativo1_std_preco'] = df_com_retorno_financeiro['ativo1_desvio_preco'] / df_com_retorno_financeiro['ativo1_std']

df_com_retorno_financeiro['ativo2_desvio_preco'] = df_com_retorno_financeiro['Ativo2'] - df_com_retorno_financeiro['ativo2_mean'] 
df_com_retorno_financeiro['ativo2_std_preco'] = df_com_retorno_financeiro['ativo2_desvio_preco'] / df_com_retorno_financeiro['ativo2_std']




df_com_retorno_financeiro.to_excel('resultados/retornos.xlsx', index=False)

print(f"\nRetorno acumulado: {retorno_percentual * 100:.2f}%")
print(f"Capital final: ${capital_final:.2f}")

retorno_pct, retorno_risco, sharpe = simular_estrategia(
    moeda1='BTC',
    moeda2='BNB',
    zscore_compra_e_venda=2,
    zscore_encerrar_posicao=0.5,
    stop_loss=0,
    cooldown_stop_loss=0,
    janela=3000,
    data_inicial='2025-06-20',
    data_final='2025-07-19'
)
print(f"Retorno percentual Simulado: {retorno_pct*100:.2f}%")




Retorno acumulado: 1.23%
Capital final: $10122.62
Retorno percentual Simulado: -2.37%


## Testar pares de cointegrados

In [None]:

moedas = ['BTC', 'WBTC', 'ETH', 'ADA', 'BNB', 'SOL', 'XRP']
caminho_base = '../../data/fechamentos'

path_ativo_1 = f'../../data/fechamentos/{moedas[1]}USDT_5m_data.csv'
path_ativo_2 = f'../../data/fechamentos/{moedas[2]}USDT_5m_data.csv'

stop_loss_pct = -0.02
cooldown_periodos = 12

# Carregar todos os dados
df = carregar_dados (moedas[1], moedas[2])

# Intervalo para teste de cointegra√ß√£o
data_inicial = '2024-07-19'
data_final   = '2025-07-19'
df_filtrado = df.loc[data_inicial:data_final]

pvalor = testar_cointegracao(df_filtrado['Ativo1'], df_filtrado['Ativo2'])
print(f'P-valor da cointegra√ß√£o no per√≠odo {data_inicial} a {data_final}: {pvalor:.4f}')


def carregar_ativo(nome):
    caminho = f'{caminho_base}/{nome}USDT_5m_data.csv'
    df = pd.read_csv(caminho, parse_dates=['timestamp'], index_col='timestamp')
    return df['close'].loc[data_inicial:data_final]


# üìä Rodar teste de cointegra√ß√£o em todos os pares
resultados = []

for moeda1, moeda2 in itertools.combinations(moedas, 2):
    try:
        serie1 = carregar_ativo(moeda1)
        serie2 = carregar_ativo(moeda2)

        df = pd.DataFrame({moeda1: serie1, moeda2: serie2}).dropna()

        if len(df) < 100:
            continue  # ignora s√©ries com dados insuficientes

        pvalor = coint(df[moeda1], df[moeda2])[1]

        resultados.append({
            'Ativo1': moeda1,
            'Ativo2': moeda2,
            'P-valor': round(pvalor, 5),
            'N_observacoes': len(df)
        })

    except Exception as e:
        print(f'Erro ao processar {moeda1} x {moeda2}: {e}')

# üìÑ Resultado final
df_resultados = pd.DataFrame(resultados)
df_resultados = df_resultados.sort_values('P-valor')
df_resultados.to_csv('resultados/pares_cointegrados.csv', index=False)

# üìã Mostrar apenas os pares cointegrados
cointegrados = df_resultados[df_resultados['P-valor'] < 0.05]
print(f"\nPares cointegrados encontrados (p < 0.05):\n")
print(cointegrados)


P-valor da cointegra√ß√£o no per√≠odo 2024-07-19 a 2025-07-19: 0.8951

Pares cointegrados encontrados (p < 0.05):

   Ativo1 Ativo2  P-valor  N_observacoes
0     BTC   WBTC  0.00000         105303
19    BNB    XRP  0.01545         105305
3     BTC    BNB  0.03062         105304
8    WBTC    BNB  0.03259         105303
