<a href="https://colab.research.google.com/github/JuanHermann/BT_IFR2/blob/main/BT_IFR2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import yfinance as yf
import pandas as pd
import numpy as np
from datetime import date, datetime, timedelta
from tabulate import tabulate

In [None]:
def rsi2_mm3(data, rsi_period=2, smooth_period=3):

    if "Close" not in data.columns or len(data) < smooth_period:
        print(f"Erro: Dados insuficientes. Necessário pelo menos {smooth_period} períodos.")
        return data

    # 2. Variação de Preço
    delta = data["Close"].diff(periods=1)

    # 3. Ganhos (Up) e Perdas (Down) - Perda como valor positivo
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    # 4. Suavização EMA (Wilder's Smoothing)
    # O método 'ewm' com adjust=False e span=smooth_period é o equivalente no Pandas
    # à suavização de Welles Wilder (RMA).

    # Usamos min_periods=smooth_period para o cálculo começar no ponto correto.
    avg_gain = gain.ewm(span=smooth_period, min_periods=smooth_period, adjust=False).mean()
    avg_loss = loss.ewm(span=smooth_period, min_periods=smooth_period, adjust=False).mean()

    # 5. Cálculo do RS e RSI
    epsilon = 1e-10 # Adiciona epsilon para estabilidade numérica e evitar divisão por zero

    # Força Relativa (RS)
    rs = avg_gain / (avg_loss + epsilon)

    # Índice de Força Relativa (RSI)
    rsi = 100 - (100 / (1 + rs))

    # 6. Tratamento de Casos Extremos (Divisão por zero)
    # Garante que o RSI = 100 quando não há perda (avg_loss = 0) e há ganho.
    loss_is_zero = np.isclose(avg_loss, 0, atol=epsilon)
    gain_is_positive = (avg_gain > 0)

    rsi[loss_is_zero & gain_is_positive] = 100

    # 7. Renomeia e Retorna
    # Nome da coluna ajustado para refletir o período de suavização real
    data["RSI_2"] = rsi

    # Retorna o DataFrame a partir do ponto onde o RSI é calculado
    return data.iloc[smooth_period - 1:]

In [None]:
def exitTrade(trades,entry_date, exit_date, entry_price,exit_price,max_holding_days):

  profit = exit_price - entry_price
  trades.append({
                'Entry_Date': entry_date,
                'Exit_Date': exit_date,
                'Entry_Price': entry_price,
                'Exit_Price': exit_price,
                'P/L': profit,
                'Result': 'Win' if profit > 0 else 'Loss',
                'Max_Hold_Days': max_holding_days

            })

In [None]:
def gerar_tabela(list_of_trades_data):
    """
    Gera uma tabela de métricas de backtest para uma lista de dados de trades.

    Args:
        list_of_trades_data (list): Uma lista onde cada elemento contém dados de trades
                                     (por exemplo, um DataFrame de trades para um conjunto de parâmetros).

    Returns:
        pd.DataFrame: Um DataFrame contendo as métricas agregadas para cada conjunto de trades.
    """

    # --- 1. PREPARAR OS DADOS PARA O CABEÇALHO MULTINÍVEL (MultiIndex) ---
    cabecalho_multi_nivel = []
    periodos = ["5 ANOS", "3 ANOS", "1 ANO", "6 MESES"]
    metricas = ["QTDE", "F. LUCRO", "%"]

    for periodo in periodos:
        for metrica in metricas:
            cabecalho_multi_nivel.append((periodo, metrica))

    cabecalho_multi_nivel.append(("CANDLES", ""))
    cabecalho_multi_nivel.append(("DROPDOWN MAX", ""))

    colunas = pd.MultiIndex.from_tuples(cabecalho_multi_nivel, names=['Período', 'Métrica'])

    # --- 2. Processar cada conjunto de trades na lista ---
    resultados = []
    for trades_data in list_of_trades_data:
        # Convert the trades data to a DataFrame if it's not already
        trades_df = pd.DataFrame(trades_data)

        if trades_df.empty:
            # Append a row of zeros or NaNs if there are no trades for this set
            resultados.append([0] * len(colunas))
            continue

        # Ensure 'Entry_Date' is datetime for filtering
        trades_df['Entry_Date'] = pd.to_datetime(trades_df['Entry_Date'])

        # Filter trades based on time periods
        trades_df_5_years  = trades_df[trades_df['Entry_Date'] >= (date.today() - pd.DateOffset(years=5))]
        trades_df_3_years  = trades_df[trades_df['Entry_Date'] >= (date.today() - pd.DateOffset(years=3))]
        trades_df_1_year   = trades_df[trades_df['Entry_Date'] >= (date.today() - pd.DateOffset(years=1))]
        trades_df_6_months = trades_df[trades_df['Entry_Date'] >= (date.today() - pd.DateOffset(months=6))]

        # Calculate metrics for each period
        row_data = [
            # 5 ANOS (QTDE, F. LUCRO, %)
            len(trades_df_5_years), getProfitFactor(trades_df_5_years), getWinRate(trades_df_5_years),
            # 3 ANOS (QTDE, F. LUCRO, %)
            len(trades_df_3_years), getProfitFactor(trades_df_3_years), getWinRate(trades_df_3_years),
            # 1 ANO (QTDE, F. LUCRO, %)
            len(trades_df_1_year), getProfitFactor(trades_df_1_year), getWinRate(trades_df_1_year),
            # 6 MESES (QTDE, F. LUCRO, %)
            len(trades_df_6_months), getProfitFactor(trades_df_6_months), getWinRate(trades_df_6_months),
            # CANDLES (Assuming you want an aggregate like mean)
            trades_df['Max_Hold_Days'][0],
            # DROP MAX
            getMaxDropdown(trades_df)
        ]
        resultados.append(row_data)


    # --- 3. CRIAR O DATAFRAME ---
    df = pd.DataFrame(resultados, columns=colunas)
    df.index.name = None # Opcional: Remover o nome do índice da primeira linha

    return df
#gerar_tabela(trades_list_of_lists)

In [None]:
def getWinRate(trades_df):
    winning_trades = len(trades_df[trades_df['P/L'] > 0])
    win_rate = (winning_trades / len(trades_df)) * 100
    return f"{win_rate:.2f}%"

In [None]:
def getProfitFactor(trades_df):
    total_gross_profit = trades_df[trades_df['P/L'] > 0]['P/L'].sum()
    total_gross_loss = abs(trades_df[trades_df['P/L'] < 0]['P/L'].sum())
    profit_factor = total_gross_profit / total_gross_loss if total_gross_loss > 0 else np.inf
    return f"{profit_factor:.2f}"

In [None]:
def getMaxDropdown(trades_df):
    # 6.3. Drawdown Máximo (Max Drawdown)
    # Calcula o retorno cumulativo (equity curve)
    trades_df['Cumulative_P/L'] = trades_df['P/L'].cumsum()
    equity_curve = trades_df['Cumulative_P/L']
    # Calcula o máximo valor da curva de patrimônio até o momento
    peak = equity_curve.expanding().max()
    # Calcula o Drawdown (do pico até o valor atual)
    drawdown = (equity_curve - peak) / peak.mask(peak == 0, 1)
    max_drawdown = abs(drawdown.min()) * 100
    return f"{max_drawdown:.2f}%"

In [None]:
def getTableAllResults(all_tickers_results):
  """
  Cria um DataFrame contendo o ticker e a primeira linha dos resultados de backtest para cada ticker.

  Args:
      all_tickers_results (dict): Um dicionário onde as chaves são os tickers
                                  e os valores são os DataFrames de resultados do backtest.

  Returns:
      pd.DataFrame: Um DataFrame com o ticker e a primeira linha dos resultados.
  """
  # Create an empty list to store the data for the new DataFrame
  data_for_new_df = []

  # Iterate through the results for each ticker
  for ticker, result_df in all_tickers_results.items():
    # Check if the result is a DataFrame and not empty
    if isinstance(result_df, pd.DataFrame) and not result_df.empty:
      # Get the first row of the results DataFrame
      best_row = selecionar_melhor_parametro_ponderado(result_df)
      # Add the ticker to the dictionary for the first row
      best_row['Ticker'] = ticker[:-3]
      # Append the data to the list
      data_for_new_df.append(best_row)
    else:
      # Handle cases where there are no results or an error occurred
      print(f"Could not get results for {ticker}: {result_df}")


  # Create a new DataFrame from the collected data
  new_df = pd.DataFrame(data_for_new_df)

  # Reorder columns to have Ticker as the first column
  if 'Ticker' in new_df.columns:
    cols = ['Ticker'] + [col for col in new_df.columns if col != 'Ticker']
    new_df = new_df[cols]

  return new_df

In [None]:
def getBestResult(result_df, metric_to_optimize='F. LUCRO', period_to_optimize='5 ANOS'):
    return result_df.iloc[2].to_dict()

In [None]:
def selecionar_melhor_parametro_ponderado(df_resultados: pd.DataFrame) -> pd.DataFrame:
    """
    Calcula um Score de Qualidade ponderado, verificando os resultados de 5 ANOS,
    3 ANOS, 1 ANO e 6 MESES para determinar o melhor período de retenção (CANDLES).

    O Score Final é a soma ponderada dos Scores de Qualidade de cada período.

    :param df_resultados: DataFrame de resultados de otimização (MultiIndex).
    :return: Linha do DataFrame com o melhor Score de Qualidade Ponderado.
    """

    # --- DEFINIÇÃO DE PESOS E CONFIGURAÇÕES ---

    PERIODOS_E_PESOS = {
        "5 ANOS": 1.0,
        "3 ANOS": 0.8,
        "1 ANO": 0.6,
        "6 MESES": 0.4,
    }

    # 1. Preparação: Limpeza, normalização e cálculo do Score para CADA PERÍODO

    # Ensure index name is set for the score DataFrame
    df_scores = df_resultados.index.to_frame(name='CANDLES').copy()


    for periodo, peso in PERIODOS_E_PESOS.items():

        # Extrai as métricas
        qtde = df_resultados[(periodo, 'QTDE')]

        # Convert 'F. LUCRO' to numeric, coercing errors
        f_lucro = pd.to_numeric(df_resultados[(periodo, 'F. LUCRO')], errors='coerce')

        # Convert '%' to numeric (float between 0 and 1), coercing errors
        taxa_acerto_str = df_resultados[(periodo, '%')].astype(str).str.replace('%', '').str.replace(',', '.')
        taxa_acerto = pd.to_numeric(taxa_acerto_str, errors='coerce') / 100

        # Create a column for the Raw Score of this period
        coluna_score_bruto = f'SCORE_BRUTO_{periodo.replace(" ", "_")}'

        # Formula for Raw Score (Quality)
        # Score = (Win Rate * Profit Factor) * sqrt(Quantity of Trades)
        # Handle potential NaN values from coercion by filling with 0 for score calculation
        df_scores[coluna_score_bruto] = (taxa_acerto.fillna(0) * f_lucro.fillna(0) * np.sqrt(qtde.fillna(0)))

        # Apply the Weight to the Final Score
        df_scores[coluna_score_bruto] *= peso

    # 2. Calculation of FINAL SCORE (Weighted Sum)

    score_cols = [col for col in df_scores.columns if col.startswith('SCORE_BRUTO')]
    df_scores['SCORE_FINAL_PONDERADO'] = df_scores[score_cols].sum(axis=1)

    # 3. Selection of the Best Result

    melhor_parametro_index = df_scores['SCORE_FINAL_PONDERADO'].idxmax()
    print(melhor_parametro_index)

    # Return the final result, including the complete row from the original DF and the Final Score
    melhor_linha = df_resultados.loc[melhor_parametro_index].to_frame().T
    # Add the SCORE FINAL PONDERADO to the output DataFrame
    melhor_linha[('SCORE', 'FINAL')] = df_scores.loc[melhor_parametro_index, 'SCORE_FINAL_PONDERADO']

    return  df_resultados.iloc[melhor_parametro_index].to_dict()

# --- EXECUTION OF THE ALGORITHM WITH PROVIDED DATA ---
# (Reusing the original data structure)

periodos = ["5 ANOS", "3 ANOS", "1 ANO", "6 MESES"]
metricas = ["QTDE", "F. LUCRO", "%"]
colunas = pd.MultiIndex.from_product([periodos, metricas], names=['Período', 'Métrica'])
colunas = colunas.append(pd.MultiIndex.from_tuples([("CANDLES", "")]))


dados_raw = [
    [158, '1.39', '62.03%', 97, '1.13', '58.76%', 30, '1.56', '56.67%', 14, '1.08', '50.00%', 3],
    [148, '1.27', '68.24%', 91, '1.02', '62.64%', 28, '2.11', '67.86%', 14, '2.31', '64.29%', 4],
    [138, '1.25', '68.12%', 84, '1.02', '65.48%', 26, '2.83', '73.08%', 13, '3.87', '76.92%', 5],
    [134, '1.38', '67.16%', 81, '1.08', '64.20%', 26, '2.10', '69.23%', 13, '2.18', '76.92%', 6],
    [129, '1.27', '68.22%', 80, '1.14', '67.50%', 25, '1.91', '68.00%', 12, '1.80', '75.00%', 7],
    [126, '1.24', '67.46%', 77, '0.96', '64.94%', 25, '2.32', '68.00%', 12, '1.98', '75.00%', 8]
]

df_resultados_multi = pd.DataFrame(dados_raw, columns=colunas)
#df_resultados_multi = df_resultados_multi.set_index(('CANDLES', '')).rename_axis('CANDLES')
#df_resultados_multi.index = pd.Series([3, 4, 5, 6, 7, 8])

# --- FINAL EXECUTION ---

melhor_parametro_ponderado = selecionar_melhor_parametro_ponderado(df_resultados_multi)

print("--- Análise Algorítmica Ponderada (Incluindo 3 Anos, 1 Ano, 6 Meses) ---")
print("O melhor parâmetro de CANDLES (Período de Retenção) é:")
#print(f"CANDLES: {melhor_parametro_ponderado.index.values[0]}")
#print(f"Score Final Ponderado: {melhor_parametro_ponderado[('SCORE', 'FINAL')].values[0]:.4f}")
print("-" * 65)

print("\nLinha do Parâmetro Vencedor:")
# Rendering in Colab/Jupyter:
melhor_parametro_ponderado

2
--- Análise Algorítmica Ponderada (Incluindo 3 Anos, 1 Ano, 6 Meses) ---
O melhor parâmetro de CANDLES (Período de Retenção) é:
-----------------------------------------------------------------

Linha do Parâmetro Vencedor:


{('5 ANOS', 'QTDE'): 138,
 ('5 ANOS', 'F. LUCRO'): '1.25',
 ('5 ANOS', '%'): '68.12%',
 ('3 ANOS', 'QTDE'): 84,
 ('3 ANOS', 'F. LUCRO'): '1.02',
 ('3 ANOS', '%'): '65.48%',
 ('1 ANO', 'QTDE'): 26,
 ('1 ANO', 'F. LUCRO'): '2.83',
 ('1 ANO', '%'): '73.08%',
 ('6 MESES', 'QTDE'): 13,
 ('6 MESES', 'F. LUCRO'): '3.87',
 ('6 MESES', '%'): '76.92%',
 ('CANDLES', ''): 5}

In [None]:
melhor_parametro_ponderado

Unnamed: 0_level_0,5 ANOS,5 ANOS,5 ANOS,3 ANOS,3 ANOS,3 ANOS,1 ANO,1 ANO,1 ANO,6 MESES,6 MESES,6 MESES,CANDLES
Unnamed: 0_level_1,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%,Unnamed: 13_level_1
2,138,1.25,68.12%,84,1.02,65.48%,26,2.83,73.08%,13,3.87,76.92%,5


In [None]:
# Remove the last column using column slicing
melhor_parametro_ponderado_cleaned = melhor_parametro_ponderado.iloc[:, :-1]

# Display the DataFrame without the last column
display(melhor_parametro_ponderado_cleaned)

Unnamed: 0_level_0,5 ANOS,5 ANOS,5 ANOS,3 ANOS,3 ANOS,3 ANOS,1 ANO,1 ANO,1 ANO,6 MESES,6 MESES,6 MESES
Unnamed: 0_level_1,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%,QTDE,F. LUCRO,%
2,138,1.25,68.12%,84,1.02,65.48%,26,2.83,73.08%,13,3.87,76.92%


In [None]:
def showAllResults(all_tickers_results):
# --- Display results for all tickers ---
  print("\n--- Backtest Results for All Tickers ---")
  for ticker, result in all_tickers_results.items():
      print(f"\nResults for {ticker}:")
      if isinstance(result, pd.DataFrame):
          display(result)
      else:
          print(result)

In [None]:
# -------------ANOTACOES -----------
# A QUANTIDADE DE TRADES PODE SEM 1 NUMERO MENOR QUE O PROFIT POIS O PROFIT CONTA OS TRADES NAO FINALIZADOS.

# -------------- TO DO -------------
# CONFERIR O CALCULO DO MAX DROPDOWN
# VALIDAR ALGORITIMO QUE DECIDE A MELHOR LINHA



TICKERS = ["MOTV3.SA","CPFE3.SA","ITUB4.SA"] # Changed to a list of tickers
RSI_PERIOD = 2
RSI_THRESHOLD = 25
MAX_HOLDING_DAYS_RANGE = range(3, 9) # Range of Max Holding Days to test
START_DATE = date.today() - pd.DateOffset(years=5)
DIAS_ANTERIORES = 30

all_tickers_results = {} # Dictionary to store results for each ticker

start_date_download = START_DATE - pd.DateOffset(days=DIAS_ANTERIORES)
print(f"realizando Backtest entre as datas {START_DATE.strftime("%d/%m/%Y")} e {date.today().strftime("%Y-%m-%d")}")

# --- 3. Obter Dados para todos os Tickers ---
try:
    print(f"Baixando dados para {TICKERS} desde {start_date_download}...")
    data = yf.download(TICKERS, start=start_date_download, end=date.today().strftime("%Y-%m-%d"), auto_adjust=True)

    if data.empty:
        raise ValueError(f"Não foi possível baixar dados para os tickers {TICKERS}. Verifique os símbolos.")

except Exception as e:
    print(f"ERRO: Não foi possível obter dados para os tickers {TICKERS}. Verifique sua conexão ou os tickers. Detalhe: {e}")



# --- Process data and perform backtest for each ticker ---
# Iterate through each ticker to process its data and run the backtest
for ticker in TICKERS:
    print(f"\n--- Processing Ticker: {ticker} ---")

    # Select the data for the current ticker from the multi-indexed DataFrame
    # Check if data was successfully downloaded and the ticker exists in the downloaded data
    if 'data' in locals() and not data.empty and ticker in data.columns.get_level_values(1):
        # Select all rows (:) and columns where the second level of the MultiIndex (ticker) matches the current ticker
        df = data.xs(ticker, level='Ticker', axis=1).copy()

        df = rsi2_mm3(df)

        # --- 4. Pré-processamento dos Dados ---
        # Remove linhas com valores nulos (início dos cálculos)
        df.dropna(inplace=True)

        trades_list_of_lists = [] # This will store trades for each maxHoldDay run for the current ticker

        # faz todos os testes para verificar qual a melhor quantidade de dias para levar a operacao
        for maxHoldDay in MAX_HOLDING_DAYS_RANGE:
            trades = [] # List to store trades for the current maxHoldDay
            in_trade = False
            entry_price = 0
            entry_date = None
            exit_target = 0
            holding_days = 0

            for i in range(len(df)):
                current_day = df.iloc[i]

                if current_day.name < START_DATE:
                    continue

                if not in_trade:
                    # Condição de Compra (Entrada): RSI(2) abaixo de 25

                    if current_day['RSI_2'].item() < RSI_THRESHOLD:
                        in_trade = True
                        entry_price = current_day['Close'].item()
                        entry_date = current_day.name
                        holding_days = 0


                else: # Está em um trade
                    holding_days += 1

                    exit_target= max(df.iloc[i-1]['High'].item(),df.iloc[i-2]['High'].item())

                    # 1. Condição de Venda (Take Profit): Preço atinge ou supera o alvo
                    if current_day['High'].item() >= exit_target:
                        exitTrade(trades,entry_date,current_day.name,entry_price,exit_target, maxHoldDay)
                        in_trade = False
                        continue

                    # 2. Condição de Venda (Time Stop): Fechamento do candle limite
                    if holding_days >= maxHoldDay:
                        exitTrade(trades,entry_date,current_day.name,entry_price,current_day['Close'].item(), maxHoldDay)
                        in_trade = False
                        continue


            # Case where the last trade was not closed for this maxHoldDay run
            if in_trade:
                #print(f"Attention: Last trade not finished for MAX_HOLDING_DAYS = {maxHoldDay} for ticker {ticker}. Ignored for metrics.")
                pass # Or handle as needed, e.g., close at the last price if desired


            # Append the trades list for the current maxHoldDay to the list of lists
            if trades:
                trades_list_of_lists.append(trades)


        # --- 6. Cálculo das Métricas de Desempenho e Geração da Tabela ---
        if trades_list_of_lists:
            # Pass the list of trades data to the gerar_tabela function and store in the dictionary
            all_tickers_results[ticker] = gerar_tabela(trades_list_of_lists)
        else:
            print(f"\nNenhum trade realizado para o ticker {ticker} no período de backtest com as condições especificadas para nenhum MAX_HOLDING_DAYS.")
            all_tickers_results[ticker] = "No trades found"
    else:
        # If data download failed for this specific ticker or ticker not found in downloaded data, skip processing
        if ticker not in all_tickers_results: # Avoid overwriting existing error message if download failed
             all_tickers_results[ticker] = "Ticker not found in downloaded data or data download failed"
        print(f"Skipping processing for {ticker} due to data issue.")

#showAllResults(all_tickers_results)
getTableAllResults(all_tickers_results)

[*********************100%***********************]  3 of 3 completed

realizando Backtest entre as datas 07/11/2020 e 2025-11-07
Baixando dados para ['MOTV3.SA', 'CPFE3.SA', 'ITUB4.SA'] desde 2020-10-08 00:00:00...

--- Processing Ticker: MOTV3.SA ---






--- Processing Ticker: CPFE3.SA ---

--- Processing Ticker: ITUB4.SA ---
3
4
1


Unnamed: 0,Ticker,"(5 ANOS, QTDE)","(5 ANOS, F. LUCRO)","(5 ANOS, %)","(3 ANOS, QTDE)","(3 ANOS, F. LUCRO)","(3 ANOS, %)","(1 ANO, QTDE)","(1 ANO, F. LUCRO)","(1 ANO, %)","(6 MESES, QTDE)","(6 MESES, F. LUCRO)","(6 MESES, %)","(CANDLES, )","(DROPDOWN MAX, )"
0,MOTV3,14,4.82,85.71%,14,4.82,85.71%,14,4.82,85.71%,13,4.58,84.62%,6,28.03%
1,CPFE3,134,2.08,73.13%,82,2.34,76.83%,28,4.37,82.14%,14,2.76,78.57%,7,209.17%
2,ITUB4,149,1.66,67.11%,87,1.71,68.97%,28,1.99,71.43%,13,1.45,76.92%,4,47.33%
