## ROBUSTA - 11.4.2 - Age of Triggers - Persistence


In [1]:
import yfinance
import requests
import matplotlib.pyplot
from bs4 import BeautifulSoup
import re
from numpy import percentile, sign, amax, amin, arange, where, random, log, exp, inf, where, select, sqrt, nan, mean, abs, linspace, argmax, argmin, zeros, maximum, isfinite, sign
import pandas
import pandas_ta
# Setup
pandas.set_option('future.no_silent_downcasting', True)
from pandas.tseries.offsets import BDay, MonthBegin
from datetime import datetime
from sklearn.linear_model import LinearRegression
import itertools
from tqdm import tqdm  # Biblioteca para mostrar uma barra de progresso
from icecream import ic
from scipy.stats import pearsonr

### História

In [2]:
# O retorno médio volta a ser um importante indicador de risco. 
# Apesar de ser difícil achar um indicador com correção positiva. Usar gatilhos para a escolha de indicadores aumenta os ganhos e diminui as perdas
# Além disso, confirmar algum rompimento por k dias pode aumentar a chance de descobrir qual indicador é mais potente nos mercados

# x.x.2 - Utiliza o 'Adj Close' do df exportado para todos os cálculos

### Versão 

In [3]:
# Versão da Robusta 

# 1 - 

versao = "11.4.2 - Age of Triggers - Persistence"

### Funções Gerais e Administrativas

In [4]:
def configurar_data(data_ajuste, ano=0, mes=0, dia=1):    

    adjusted_date = data_ajuste - pandas.DateOffset(years=ano, months=mes, days=dia)
    
    return adjusted_date


def ler_todos_tickers():    
    return pandas.read_excel('lista_tickers_corrigido.xlsx')    


def first_day_alert(date):
    """
    Função retorna true se hoje for o primeiro bussiness day
    """

    # Encontre o primeiro dia do mês atual
    primeiro_dia_do_mes = pandas.offsets.MonthBegin().rollback(hoje)

    # Vá para o próximo dia útil
    primeiro_dia_util = primeiro_dia_do_mes + BDay(0)

    # Verifique se a data atual é igual ao primeiro dia útil do mês
    if hoje == primeiro_dia_util:
        return True

    return False


def find_latest_market_days(total_days):
    """
    Lista as datas de operação do mercado.
    Importante para o backtest conseguir calcular corretamente 5, 10, 15 e 20 dias pois as datas de negociação não seguirem um padrão perfeitamente regular.
    """    
    # Baixando os dados do BOVA11
    ticker = yfinance.download('BOVA11.SA', progress=False, period='1y')

    # Obtendo as últimas 60 datas e convertendo para string
    latest_market_days = ticker.tail(total_days).index.strftime('%Y-%m-%d').tolist()    
    
    return latest_market_days


def alerta_final_do_mes():
    '''
    Final de mês tem oscilações técnicas de fluxo
    '''

    ultimo_dia_do_mes = hoje + pandas.tseries.offsets.MonthEnd(0)
    dois_dias_uteis_antes = ultimo_dia_do_mes - BDay(2)

    if hoje >= dois_dias_uteis_antes:
        
        return f'Final de Mês. Estresse na Posição'
    
    return False


def crie_variacao(stock_data, info_in, info_out): 
    '''
    Coloca a variação na coluna de nome info_out da coluna que recebe o argumento em info_in 
    
    '''        
                                 
    # adiciona coluna de variação
    stock_data[info_out] = stock_data[info_in].pct_change()
    
    # retorna o dataframe
    return stock_data


def crie_medias_moveis(stock_data, *args): #recebe o dataframe e as médias para cálculo 
    
    for i in args:
        
        # adiciona uma coluna extra no dataset com os dados da mma
        stock_data[f'MMA{i}'] = stock_data['Adj Close'].rolling(window=i).mean()  
        
    return stock_data
    

def release_date_alert(df):

      
    df['Alerta'] = df['Data de Divulgação de resultados'].apply(lambda x: 'Inverter posição. Publicação de Resultado' 
                                                if 0 < (x - hoje).days < 2 else f'Robust_{versao}')
    
    return df


def verticalize_preços_fechamento(df):
    """
    Cria colunas *_var_desloc* usando a média móvel de fechamento
    (SMA de `ma_window` dias, default = 14) para suavizar distorções.

    Para cada `d` em days_list:
        coluna 'Xd_var_desloc' = SMA14_t+X / SMA14_t  - 1
    """

    ma_window = 4
    
    # 1) SMA-14 do preço de fechamento
    sma = df['Adj Close'].rolling(ma_window).mean()

    # 2) Deslocamentos futuros baseados nessa média suavizada
    for days in days_list:
        col = f'{days}d_var_desloc'
        df[col] = (sma.shift(-days) / sma) - 1

    return df

### Todos os parâmetros

In [5]:
# Datas
hoje = pandas.Timestamp.now()
ic(hoje)

data_formatada = hoje.date()

# A data de início do download do dataframe
data_inicio = configurar_data(hoje, 5, 0, 0)

# Dias de backtest
total_days_backtest = 25 # um mês e um dia para evitar que faltem 20 registros a partir da última da de montagem realizada

# Encontrar a data de inicio
backtest_inicial = hoje - BDay(total_days_backtest + 1)

# Últimas 60 datas de negociação do mercado
latest_market_days = find_latest_market_days(60)
        
# Obtendo a data mais recente do df 
end_date = latest_market_days[-1]

# BACKTEST - Dias para os quais você deseja calcular as datas futuras
days_list = [10, 20, 30, 45, 90]

# Persistance - Manutenção da flag após k dias.
'''
df["break_n"] = (
    (df["Close"] > df["High"].rolling(20).max().shift(1))  # rompeu topo de 20 d
    .rolling(k).sum() == k                                 # fez isso k dias seguidos
)
'''
global k 
k = 10

# Ações
# Lista de tickers que darão problemas
probleminhas = []

probleminhas_temp = [] # Antes de mandar para o registro definitivo, cai aqui

# Setup técnico
# Listas de MMA (Médias móveis Aritméticas)
mma_list = [9, 26, 200]

# Lista de Momentum
# momentum_list = [30]

# Volatilidade Anualizada
mes_anualized_vol = ['30']

# Lista de ATRs
atr_windows = [5 , 14] 

# Tolerâncias
# de perda  
tolerancia_perda = 0.05
# Introduzir um fator de desvio para determinar sinais significativos
tolerancia_erro = 0.005

# Window de atr_stops e RSI (14 dias porque é o padrão de mercado)
stop_atr = 14
rsi_window = stop_atr

# Macro
# Lista
macro = ['^TNX', 'BOVA11.SA','SMAL11.SA'] 

# Setup
"""
Por que não usa só uma variável em macro e técnico? Porque o backtest fica muito grande e 
perde a função primordial, além de ficar muito pesado
"""
macro_mma_list = [10, 50] # Listas de mma
macro_momentum_list = [10, 20] # Lista de Momentum


# Strings de nomes e endereços
diretorio_arquivo = 'C:/Users/gioch/Desktop/Python/Acompanhamento do Mercado - Jupyter/'
# Nome que completará o arquivo
nome_arquivo = f'{diretorio_arquivo}Robust-{versao}-{data_formatada}' 
# URLs para informações fundamentalistas
url_release_dates = "https://www.empiricus.com.br/artigos/investimentos/agenda-de-resultados-4t23-divulgacao-calendario-temporada-balancos-4t2023-quarto-trimestre-2023/"
# https://www.moneytimes.com.br/agenda-de-resultados-do-4t23-veja-datas-e-o-que-esperar-dos-balancos-das-empresas-da-b3/

# Indicadores Financeiros
url_financials = f'https://www.fundamentus.com.br/detalhes.php?papel='

# DFs, Listas e Dicionários

# Cria dicionários para armazenar os dados já buscado
data_cache_macro_tickers = {}
# Datas de publicações de resultados
all_ticker_release_dates = None
# Lista para armazenar os resultados temporários
resultados_temp_backtest = []  

# Instruções
# Indicador para rodar o test comparativo de indicadores
test_indicadores = False

ic| hoje: Timestamp('2025-06-11 19:58:58.914194')


YF.download() has changed argument auto_adjust default to True


### Bloco Técnico

#### Cálculos indicadores   

In [6]:
def analisa_tecnicamente_cotacoes(ticker):
    """

    """ 
    
    stock_data = yfinance.download(
        ticker, 
        progress=False, 
        start=data_inicio,
        auto_adjust=False,
        multi_level_index=False)      

    
    # se o método download retornar vazio, encerrar a função
    if stock_data.empty:
        return False

    # Verificar se o volume médio dos últimos 30 dias é menor que 10.000
    if stock_data['Volume'].tail(30).mean().item() < 10000:
        return False
       
    else:
                       
        # adicionar coluna de variação do ticker
        stock_data = crie_variacao(stock_data, 'Adj Close','Return')
        
        # adicionar colunas de médias móveis aritméticas
        stock_data = crie_medias_moveis(stock_data, *mma_list)
        # stock_data = create_mma_position(stock_data, mma_list)       
        stock_data = create_mma_direction(stock_data, mma_list)       
                
        # Indicadores de volatilidade
        # stock_data = calcule_volatilidade_anualizada(stock_data, 30) # 30 porque são os dias que mais se assemelham  ao utilizado pelo mercado de derivativos 
        # stock_data = calculate_obv(stock_data)
        # stock_data = calcule_exaustao_atr(stock_data)
        # stock_data = calcule_vwap(stock_data)

        # Confirmadores
        # 1 - Volume
        # Window =20, 150% acima da média
        stock_data = alto_volume_diario(stock_data, 20, 1.5)
        # stock_data = alto_volume_diario(stock_data, 20, 2)
        # Movivento forte que faciita reversão   
        # defautlt -> def up_or_down_trend(df, trend_pct=0.03, trending_days=5): ->0.03
        # stock_data = up_or_down_trend(stock_data)
        # 10%
        # stock_data = up_or_down_trend(stock_data,0.1)
        # 20%
        # stock_data = up_or_down_trend(stock_data,0.20)
        # 30%
        # stock_data = up_or_down_trend(stock_data,0.30)

        # Indicadores Técnicos
        # stock_data = calculate_rsi(stock_data, 9)
        # stock_data = calculate_rsi(stock_data, 21)
        # stock_data = calculate_rsi(stock_data, 30)
        
        # Indicadores Candlestick
        # 1 - Martelo
        # def candlestick_martelo(df, wick_pct=2., head_pct=0.1):
        # stock_data = candlestick_martelo(stock_data) #default NÃO RETIRE ele é usado para combinar indicadores
        # stock_data = candlestick_martelo(stock_data, 2.5,0)
        # 2 - Engolfo
        # stock_data = candlestick_engolfo(stock_data)
              
        # Combinação de indicadores
        #stock_data = combine_indicadores(stock_data, 'Candlestick_martelo_2,0.1_?value', 'up_or_down_trend.25_?value')
        #stock_data = combine_indicadores(stock_data, 'Candlestick_martelo_2,0.1_?value', 'up_or_down_trend.03_?value')
        # DESLIG stock_data = combine_indicadores(stock_data, 'Candlestick_martelo_2,0.1_?value', 'Alto_volume_1_?value')
        # stock_data = combine_indicadores(stock_data, 'Alto_volume_1.5_?value', 'reversal_uptrend0.03_?value')
        # DESLIG stock_data = combine_indicadores(stock_data, 'reversal_uptrend0.03_?value', 'Candlestick_martelo_2,0.1_?value+Alto_volume_1_?value_combine_?value')
        # DESLIG stock_data = combine_indicadores(stock_data, 'Candlestick_engolfo_?value', 'Alto_volume_1_?value')

        
        
        
        
        """
        Voltará no futuro se achar que comparação com etf ajuda a entender o movimento técnico
                       
        # Colando os valores macro necessários para o backtest individual e adicionando info de variações
        stock_data = adicione_dados_outro_ticker(stock_data, data_cache_macro_tickers["BOVA11.SA"],'Close',"bova11")
        # adicionar colunas de médias móveis
        stock_data = crie_medias_moveis(stock_data, "BOVA11.SA", *macro_mme_list)
        stock_data = adicione_dados_outro_ticker(stock_data, data_cache_macro_tickers["SMAL11.SA"],'Close',"smal11")                                 
        # adicionar colunas de médias móveis
        stock_data = crie_medias_moveis(stock_data, "SMAL11.SA", *macro_mme_list)
        """ 

        return stock_data


In [7]:
def alto_volume_diario(df, volume_window=20, volume_multiplier=1.5):
    """
    Função que é combinada com outros indicadores, principalmente de candlestick de reversão
    Sozinho não é um indicadror relevante. 
    
    Retorna se o volume diário foi superior que a média.

    Combinado com o fechamento do dia. Se o volume foi superior e o sinal acompanha a variação do dia.
    
    -1 para variação negativa.
    +1 para positiva.   
    
    Análises de candle são mais eficientes se confirmarem bom volume de negociações no dia.
    
    """
    
    # Cálculo da média de volume para o período definido.
    df['Vol_media'] = df['Volume'].rolling(window=volume_window, min_periods=20).mean()

    df[f'Alto_volume_{volume_multiplier}_?value'] = where(df['Volume'] >= df['Vol_media'] * volume_multiplier * (1 - tolerancia_erro),
        where(df['Return'] < 0, -1, where(df['Return'] > 0, 1, 0)),
        0
    )       
    
    # Remove a coluna auxiliar de volume
    df.drop(columns=['Vol_media'], inplace=True)

    return df

def up_or_down_trend(df, trend_pct=0.03, trending_days=5):
    """
    Função que é combinada com outros indicadores, principalmente de candlestick de reversão
    Sozinho não é um indicadror relevante. 
    Famoso caiu porque subiu
    """

    # Inicializando as colunas
    df[f'reversal_uptrend{trend_pct}_?value'] = df[f'uptrend_following{trend_pct}_?value'] = 0
    df[f'reversal_downtrend{trend_pct}_?value'] = df[f'downtrend_following{trend_pct}_?value'] = 0

    
    for i in range(len(df)):

        close = df.iloc[i]['Adj Close']

        # Verifica a condição de tendência usando os valores de trending_days dias atrás.
        downtrend = False
        uptrend = False
        days = 0

        if i >= 1:

            # Primeiro descobre se rolou algum movimento significativo nos últimos trendig_days dias
            while days <= trending_days and not (downtrend or uptrend):
                close_days_ago = df.iloc[i - days]['Adj Close']
                
                # downtrend: fechamento até trend_pct abaixo
                if close <= (1 - trend_pct) * close_days_ago * (1 + tolerancia_erro):
                    downtrend = True
                # uptrend: fechamento até trend_pct acima
                elif close >= (1 + trend_pct) * close_days_ago * (1 - tolerancia_erro):
                    uptrend = True
                else:
                    days += 1
                    
        # Depois assinala no df. Botei separado, porque dentro do while iria ficar assinalando para todos os laços
        # Cria os dois indicadores para ver qual tem mais correlação. Se continua o movimento ou se reverte
        if downtrend:
            df.at[df.index[i], f'reversal_downtrend{trend_pct}_?value'] = 1
            df.at[df.index[i], f'downtrend_following{trend_pct}_?value'] = -1

        elif uptrend:
            df.at[df.index[i], f'reversal_uptrend{trend_pct}_?value'] = -1
            df.at[df.index[i], f'uptrend_following{trend_pct}_?value'] = 1

        # else permanece zero
    
    return df
    

def candlestick_martelo(df, wick_pct=2, head_pct=0.1):
    """
    Para cada candle (linha do DataFrame), verifica se o padrão corresponde a um martelo (hammer/takuri)
    e cria duas colunas:
      - "Candlestick_martelo": "Compra" se o candle com martelo surgir após uma tendência de baixa,
        "Venda" se o martelo ocorrer após uma tendência de alta e, caso contrário, "Neutro".
      - "sinal": 1 para sinal de compra, -1 para sinal de venda e NaN para os demais casos.
      
    Critérios:
      1. O candle deve ter uma sombra (inferior ou superior) >= 2.5 vezes o tamanho do corpo 
         e a sombra oposta deve ser irrisória (<= 0.3 vezes o corpo).
      2. Deve surgir após um movimento significativo:
         - Tendência de baixa: fechamento atual <= (downtrend_pct) × fechamento de 3 dias atrás.
         - Tendência de alta: fechamento atual >= 1.03 × fechamento de 3 dias atrás.
      3. O volume do candle deve ser maior que a média dos últimos 'volume_window' dias.
    """
    df = df.copy() # para não dar erro de modificar o df 

    for i in range(len(df)):
        # Valores do candle atual
        open_ = df.iloc[i]['Open']
        close = df.iloc[i]['Adj Close']
        high = df.iloc[i]['High']
        low = df.iloc[i]['Low']
        
        # Calcula o tamanho do corpo e das sombras
        body = abs(close - open_)
        total_range = high - low
        lower_shadow = min(open_, close) - low
        upper_shadow = high - max(open_, close)
        
        # Inicializa a condição de padrão de martelo
        forma_martelo_long = False
        forma_martelo_short = False
        
        if body > 0:
            # Martelo normal: longa sombra inferior e sombra superior pequena
            if lower_shadow >= wick_pct * body * (1 - tolerancia_erro) and upper_shadow <= head_pct * body * (1 + tolerancia_erro):
                forma_martelo_long = True
            # Martelo invertido: longa sombra superior e sombra inferior pequena
            elif upper_shadow >= wick_pct * body * (1 - tolerancia_erro) and lower_shadow <= head_pct * body * (1 + tolerancia_erro):
                forma_martelo_short = True        
        
        # Se o candle preenche o padrão de martelo e tem volume acima da média,
        # verifica a tendência para sinalizar:
        if forma_martelo_long:
            df.at[df.index[i], f'Candlestick_martelo_{wick_pct},{head_pct}_?value'] = 1
                
        elif forma_martelo_short:    
            df.at[df.index[i], f'Candlestick_martelo_{wick_pct},{head_pct}_?value'] = -1
            
        else:
            df.at[df.index[i], f'Candlestick_martelo_{wick_pct},{head_pct}_?value'] = 0
    
    return df


def candlestick_engolfo(df):
    """
    Para cada candle do DataFrame, identifica:
      - Engolfo de Alta (Bullish Engulfing) após queda → sinal +1
      - Engolfo de Baixa (Bearish Engulfing) após alta → sinal -1
    Insere na coluna 'Candlestick_engolfo' esses sinais, ou NaN caso contrário.
    
    Critérios para Engolfo de Alta (Bullish Engulfing) [Investopedia]:
      1. Candle anterior de baixa (Close < Open).
      2. Candle atual de alta (Close > Open).
      3. Abertura atual < Fechamento anterior E Fechamento atual > Abertura anterior  
         (“engulfa” o corpo do candle anterior) :contentReference[oaicite:0]{index=0}.
      4. Deve ocorrer após um movimento de queda de pelo menos `trend_pct` em até `trending_days` atrás.

    Critérios para Engolfo de Baixa (Bearish Engulfing) [Investopedia]:
      1. Candle anterior de alta (Close > Open).
      2. Candle atual de baixa (Close < Open).
      3. Abertura atual > Fechamento anterior E Fechamento atual < Abertura anterior  
         (“engulfa” o corpo do candle anterior) :contentReference[oaicite:1]{index=1}.
      4. Deve ocorrer após um movimento de alta de pelo menos `trend_pct` em até `trending_days` atrás.
    """
    df = df.copy()
    col = 'Candlestick_engolfo_?value'
    df[col] = 0

    for i in range(1, len(df)):
        
        prev_open  = df.iloc[i-1]['Open']
        prev_close = df.iloc[i-1]['Adj Close']
        curr_open  = df.iloc[i  ]['Open']
        curr_close = df.iloc[i  ]['Adj Close']

        # Detecta engolfo de alta
        is_bullish_engulfing = (
            prev_close < prev_open * (1 + tolerancia_erro) and           # anterior de baixa
            curr_close > curr_open * (1 - tolerancia_erro) and           # atual de alta
            curr_open  < prev_close * (1 + tolerancia_erro) and          # engolfa abertura anterior
            curr_close > prev_open * (1 - tolerancia_erro)               # engolfa fechamento anterior
        )

        # Detecta engolfo de baixa
        is_bearish_engulfing = (
            prev_close > prev_open and
            curr_close < curr_open and
            curr_open  > prev_close and
            curr_close < prev_open
        )
        

        # Sinaliza conforme padrão + tendência
        if is_bullish_engulfing:
            df.at[df.index[i], col] =  1
        elif is_bearish_engulfing:
            df.at[df.index[i], col] = -1
            
        # Caso contrário, permanece 0

    return df


def calcule_exaustao_atr(df, atr_period=14, multiplier=1.5):
    """
    Calcula o ATR (Average True Range) e verifica se o movimento (True Range) do candle atual
    é, pelo menos, 'multiplier' vezes maior que o ATR do dia anterior.
    
    Se sim, adiciona "1" na coluna "Exaustao ATR", caso contrário, NaN.
    
    Parâmetros:
      - atr_period: período em dias para cálculo da média (padrão 14).
      - multiplier: fator multiplicador para definir “movimento forçado” (padrão 1.5).
    """
    df = df.copy()
        
    # True Range (TR): a maior entre (High - Low), |High - prev_Close| e |Low - prev_Close|
    # Fechamento do dia anterior - df['Adj Close'].shift(1)
    df['TR'] = maximum(df['High'] - df['Low'],
               maximum(abs(df['High'] - df['Adj Close'].shift(1)), abs(df['Low'] - df['Adj Close'].shift(1))))
    
    # Cálculo do ATR: média móvel simples do TR, com janela definida por atr_period
    df['ATR'] = df['TR'].rolling(window=atr_period).mean()
    
    # Se o TR do candle atual for pelo menos "multiplier" vezes maior que o ATR do dia anterior,
    # então:
    # - Se df['Return'] for negativo, atribui -1 (movimento forçado de baixa).
    # - Se df['Return'] for positivo, atribui 1 (movimento forçado de alta).
    # Caso contrário, atribui 0.
    
    df['Exaustao ATR_?value'] = where(
        df['TR'] >= multiplier * df['ATR'].shift(1) * (1 - tolerancia_erro),
        where(df['Return'] < 0, -1, where(df['Return'] > 0, 1, 0)),
        0
    )

    return df


def create_mma_direction(stock_data, mma_list):
    """
    Função para adicionar _?value analisys ao df, além de distância do close price para a média móvel 200 dias
    
    """     
    
    # Cruzamentos de MMA com np.where
    '''
    stock_data[f'MMA9/MMA26_?value'] = where(
        (stock_data[f'MMA26'] < stock_data[f'MMA9'] * (1 + tolerancia_erro)) & stock_data[f'MMA26'].notna() & stock_data[f'MMA9'].notna(), 
        1, 
        where(
            (stock_data[f'MMA26'] > stock_data[f'MMA9'] * (1 - tolerancia_erro)) & stock_data[f'MMA26'].notna() & stock_data[f'MMA9'].notna(),
            -1,
            nan
        )
    )
    '''
    for deno in mma_list: # DENOminador
        for num in mma_list: # Numerador
            if num < deno:                
                stock_data[f'MMA{num}/MMA{deno}_?value'] = where(
                    (stock_data[f'MMA{deno}'] < stock_data[f'MMA{num}'] * (1 + tolerancia_erro)) & stock_data[f'MMA{deno}'].notna() & stock_data[f'MMA{num}'].notna(), 
                    1, 
                    where(
                        (stock_data[f'MMA{deno}'] > stock_data[f'MMA{num}'] * (1 - tolerancia_erro)) & stock_data[f'MMA{deno}'].notna() & stock_data[f'MMA{num}'].notna(),
                        -1,
                        nan
                    )
                )
        
    # MA e Preço
    for mma in mma_list:
        
        stock_data[f'Price_MMA{mma}_?value'] = where(
            (stock_data[f'MMA{mma}'] < stock_data['Adj Close'] * (1 + tolerancia_erro)) & stock_data[f'MMA{mma}'].notna() & stock_data['Adj Close'].notna(), 
            1, 
            where(
                (stock_data[f'MMA{mma}'] > stock_data['Adj Close'] * (1 - tolerancia_erro)) & stock_data[f'MMA{mma}'].notna() & stock_data['Adj Close'].notna(),
                -1,
                nan
            )
        )
            
    return stock_data


def create_mma_position(df, mma_list):
    """
    Função para controlar o momento do papel, de curto, médio e longos prazos.
    Se ele está acima ou abaixo da mma. Combinado com outros indicadores, qual tem melhor correlação. 
    
    """       
    
    # Position
    for mma in mma_list:

        # Cria variáveis 
        df[f'Position_mma_over{mma}_?value'] = df[f'Position_mma_bellow{mma}_?value'] = 0
        
        df[f'Position_mma_over{mma}_?value'] = where(
            (df[f'MMA{mma}'] < df['Adj Close'] * (1 + tolerancia_erro)) & df[f'MMA{mma}'].notna() & df['Adj Close'].notna(), 
            1, 
            where(
                (df[f'MMA{mma}'] > df['Adj Close'] * (1 - tolerancia_erro)) & df[f'MMA{mma}'].notna() & df['Adj Close'].notna(),
                -1,
                0
            )
        )

        # Inverte o indicador
        df[f'Position_mma_bellow{mma}_?value'] = where(df[f'Position_mma_over{mma}_?value'] == 1, -1,
            where(df[f'Position_mma_over{mma}_?value'] == -1, 1, 0))
                                                       
            
    return df

def calculate_obv(stock_data):
    """
    """
    
    # Inicializando a lista OBV com o primeiro valor como 0
    obv = [0]
    
    # Iterando sobre os preços de fechamento e volumes
    for i in range(1, len(stock_data['Adj Close'])):
        if stock_data['Adj Close'].iloc[i] > stock_data['Adj Close'].iloc[i-1]:
            # Se o preço de fechamento atual é maior que o anterior, adicionar o volume
            obv.append(obv[-1] + stock_data['Volume'].iloc[i])
        elif stock_data['Adj Close'].iloc[i] < stock_data['Adj Close'].iloc[i-1]:
            # Se o preço de fechamento atual é menor que o anterior, subtrair o volume
            obv.append(obv[-1] - stock_data['Volume'].iloc[i])
        else:
            # Se o preço de fechamento é igual ao anterior, OBV não muda
            obv.append(obv[-1])
            
    # Adicionar a coluna OBV ao DataFrame
    stock_data['OBV'] = obv
    
    # Calcular o OBV para todas as mma_list
    # Fiz o código para calcular obv para toda a mma. Mas acho que em se tratando de volume, não faz sentido olhar obv_MA_50/200days
    for window in [10, 20]: #mma_list: 
        
        stock_data[f'OBV_MA_{window}days'] = stock_data['OBV'].rolling(window=window).mean()
    
        # Atribuindo value
        stock_data[f'OBV_{window}d_?value'] = [1 if stock_data['OBV'].iloc[i] > stock_data[f'OBV_MA_{window}days'].iloc[i] else -1 for i in range(len(stock_data))]
    
        # Apagando colunas desnecessárias
        stock_data.drop(columns=[f'OBV_MA_{window}days'], inplace=True)
    
    return stock_data

def adicione_dados_outro_ticker(stock_data, df_added, col_name, col_rename="Nova coluna"):
    """
    Função que faz merge de uma coluna do df_added ao stock_data
    
    Col_name = Nome da coluna que entrará no merge
    col_rename = Quase sempre, para evitar conflito, será necessário mudar o nome da coluna
    
    """
    # Isolando a coluna que irá para o merge
    df_added = df_added[[col_name]]

    # trocando o nome da coluna para o merge não ficar com nomes em duplicidade
    df_added_col_renamed = df_added.rename(columns={col_name: col_rename})    
    
    # Realizando o merge com base na coluna 'Date'
    stock_data = pandas.merge(stock_data, df_added_col_renamed, on='Date', how='outer')
    
    return stock_data


def calcule_vwap(dados):
    
    # Calcular o VWAP
    dados['VWAP'] = (dados['Adj Close'] * dados['Volume']).cumsum() / dados['Volume'].cumsum()

    # Sinal de compra/venda baseado na comparação do preço de fechamento com o VWAP
    dados['Sinal_Num'] = where(dados['Adj Close'] > dados['VWAP'] * (1 + tolerancia_erro), 1, 
                              where(dados['Adj Close'] < dados['VWAP'] * (1 + tolerancia_erro), -1, 0))
  
    # Calcular a soma em uma janela deslizante de 20 dias
    for window in [20]: 
        dados[f'Soma_Sinal_{window}d'] = dados['Sinal_Num'].rolling(window=window, min_periods=1).sum()
    
        # Determinar o sinal baseado na soma: Se a maioria é compra (>0), venda (<0), ou neutro (==0)
        dados[f'VWAP_Sinal_{window}d_?value'] = select(
            [
                dados[f'Soma_Sinal_{window}d'] > 0, 
                dados[f'Soma_Sinal_{window}d'] <= 0
            ], 
            [1,-1])
    
        # Apagar coluna desnecessária
        dados.drop(columns=[f'Soma_Sinal_{window}d'], inplace=True)

    # Apagar coluna desnecessária
    dados.drop(columns=['Sinal_Num'], inplace=True)
        
    return dados


def calculate_rsi(dados, rsi_window=14):
    """
    Horrível
    corr de -0,004 a -0,07
    """
    delta = dados['Adj Close'].diff()
    gain = delta.where(delta > 0, 0)
    loss = -delta.where(delta < 0, 0)

    avg_gain = gain.rolling(window=rsi_window, min_periods=1).mean()
    avg_loss = loss.rolling(window=rsi_window, min_periods=1).mean()

    rs = avg_gain / avg_loss
    rsi = 100 - (100 / (1 + rs))

    dados[f'RSI_{rsi_window}days'] = rsi

    # Atribuindo _?value
    dados[f'RSI_{rsi_window}days_?value'] = dados[f'RSI_{rsi_window}days'].apply(lambda x: 1 if x <= 30 else -1 )    
    
    return dados

def calculate_stocastic(dados):
    '''
    '''
   
    # Cálculo do Estocástico Lento (14, 3, 3)
    estocastico = pandas_ta.stoch(dados['High'], dados['Low'], dados['Adj Close'], k=14, d=3, smooth_k=3)
    dados['SlowK'] = estocastico['STOCHk_14_3_3']
    dados['SlowD'] = estocastico['STOCHd_14_3_3']

    # Inicializar coluna de sinais de compra/venda
    dados['estocastico_?value'] = 0

    # Condições para sinais de compra e venda
    for i in range(1, len(dados)):
        # Sinal de Compra: %K cruza acima de %D e ambos estão abaixo de 20
        if dados['SlowK'].iloc[i-1] < dados['SlowD'].iloc[i-1] and dados['SlowK'].iloc[i] > dados['SlowD'].iloc[i] and dados['SlowK'].iloc[i] < 20:
            dados.loc[dados.index[i], 'estocastico_?value'] = 1
        
        # Sinal de Venda: %K cruza abaixo de %D e ambos estão acima de 80
        elif dados['SlowK'].iloc[i-1] > dados['SlowD'].iloc[i-1] and dados['SlowK'].iloc[i] < dados['SlowD'].iloc[i] and dados['SlowK'].iloc[i] > 80:
            dados.loc[dados.index[i], 'estocastico_?value'] = -1

    return dados

def adx_prblc_sar_trigger_strtg(dados):
    """
    7. ADX + Parabolic SAR
    Trigger de Compra: Se o ADX (Índice de Direcional Médio) está acima de 25, e o Parabolic SAR gera um ponto abaixo do preço, é uma indicação de tendência de alta forte.
    Trigger de Venda: Se o ADX está acima de 25 e o Parabolic SAR gera um ponto acima do preço, sinaliza uma tendência de baixa forte.
    Combinação de Indicadores:
    ADX (14 períodos)
    Parabolic SAR (configurações padrão)
    """
    # Cálculo do ADX (14 períodos)
    adx = pandas_ta.adx(dados['High'], dados['Low'], dados['Adj Close'], length=14)
    dados['ADX'] = adx['ADX_14']

    # Cálculo do Parabolic SAR
    psar = pandas_ta.psar(dados['High'], dados['Low'], dados['Adj Close'])
    dados['SAR'] = psar['PSARl_0.02_0.2']  # "l" de "lower" para valores de suporte

    # Inicializar a coluna 'signal' para guardar os sinais de compra/venda
    dados['adx_sar_?value'] = 0

    # Condições para sinais de compra e venda
    for i in range(1, len(dados)):
        # Sinal de Compra: Preço cruza acima do Parabolic SAR e ADX maior que 25
        if dados['Adj Close'].iloc[i] > dados['SAR'].iloc[i] and dados['ADX'].iloc[i] > 25:
            dados.loc[dados.index[i], 'adx_sar_?value'] = 1
        
        # Sinal de Venda: Preço cruza abaixo do Parabolic SAR e ADX maior que 25
        elif dados['Adj Close'].iloc[i] < dados['SAR'].iloc[i] and dados['ADX'].iloc[i] > 25:
            dados.loc[dados.index[i], 'adx_sar_?value'] = -1

    return dados

def calculate_ichimoku(data):
    """
    Função para calcular os componentes da Nuvem Ichimoku e os sinais de compra/venda.
    """
    # Calcular a Linha de Conversão (Tenkan-sen) para cada linha do DataFrame
    data['Tenkan_sen'] = (data['High'].rolling(window=9).max() + data['Low'].rolling(window=9).min()) / 2

    # Calcular a Linha de Base (Kijun-sen) para cada linha do DataFrame
    data['Kijun_sen'] = (data['High'].rolling(window=26).max() + data['Low'].rolling(window=26).min()) / 2

    # Calcular Senkou Span A (primeiro componente da nuvem) e deslocar 26 períodos para frente
    data['Senkou_span_A'] = ((data['Tenkan_sen'] + data['Kijun_sen']) / 2).shift(26)

    # Calcular Senkou Span B (segundo componente da nuvem) e deslocar 26 períodos para frente
    data['Senkou_span_B'] = (data['High'].rolling(window=52).max() + data['Low'].rolling(window=52).min()) / 2
    data['Senkou_span_B'] = data['Senkou_span_B'].shift(26)

    # Calcular a Linha Chikou (Lagging Span) e deslocar 26 períodos para trás
    data['Chikou_span'] = data['Adj Close'].shift(-26)

    # Inicializar a coluna 'ichimoku_signal' com 0 (Hold)
    data['ichimoku_?value'] = 0

    # Sinal de Compra: Preço de fechamento acima da nuvem e Tenkan-sen acima da Kijun-sen
    data['ichimoku_?value'] = where(
        (data['Adj Close'] > data[['Senkou_span_A', 'Senkou_span_B']].max(axis=1)) & 
        (data['Tenkan_sen'] > data['Kijun_sen']), 
        1, 
        data['ichimoku_?value']
    )

    # Sinal de Venda: Preço de fechamento abaixo da nuvem e Tenkan-sen abaixo da Kijun-sen
    data['ichimoku_?value'] = where(
        (data['Adj Close'] < data[['Senkou_span_A', 'Senkou_span_B']].min(axis=1)) & 
        (data['Tenkan_sen'] < data['Kijun_sen']), 
        -1, 
        data['ichimoku_?value']
    )

    return data
    

def calcule_volatilidade_anualizada(dados, vol_window):
    """
    Calcula a volatilidade anualizada com uma janela móvel para os retornos logarítmicos.

    Args:
    dados (DataFrame): DataFrame contendo os preços com uma coluna 'Return' para os retornos diários logarítmicos.
    vol_window (int): Janela de tempo em dias para o cálculo da volatilidade móvel.

    Returns:
    DataFrame: O mesmo DataFrame de entrada com uma nova coluna adicionada para a volatilidade anualizada.
    """    
    
    # Volatilidade anualizada para comparação com outros ativos
    dados[f'vol_anualized_{vol_window}days'] = dados['Return'].rolling(window=vol_window, min_periods=1).std() * sqrt(252)
    
    # Calculando média e std da volatilidade para saber quando o ticker está mudando de patamar de risco 
    # Garantindo que só calculamos média e std se houver dados suficientes
    dados['media_vol_200d'] = where(dados['Return'].rolling(window=200, min_periods=200).count() >= 200,
                                       dados[f'vol_anualized_{vol_window}days'].rolling(window=200).mean(),
                                       0)
    dados['std_vol_200d'] = where(dados['Return'].rolling(window=200, min_periods=200).count() >= 200,
                                     dados[f'vol_anualized_{vol_window}days'].rolling(window=200).std(),
                                     0)
    
    # Analisando a variação de volatilidade do ativo para saber se o risco está aumentando
    dados['change_in_volatility_?value'] = where(dados['media_vol_200d'].notna() & dados['std_vol_200d'].notna(),
                                           where(dados[f'vol_anualized_{vol_window}days'] > dados['media_vol_200d'] + dados['std_vol_200d'], 1,
                                                    where(dados[f'vol_anualized_{vol_window}days'] < dados['media_vol_200d'] - dados['std_vol_200d'], -1, 0)),
                                           0)
    
        
    dados.drop(columns=['media_vol_200d','std_vol_200d'], inplace=True)
  
    return dados

def calculate_cci(df, window=20):
    """
    Calcula o Commodity Channel Index (CCI) para os preços fornecidos e gera sinais de negociação.
    
    Parâmetros:
    - df: DataFrame contendo os preços com colunas ['High', 'Low', 'Adj Close'].
    - window: O período para calcular o CCI. Padrão é 20.

    Retorna:
    - DataFrame com a coluna 'CCI' e 'CCI_Signal' (1 para compra, -1 para venda, 0 para neutro).
    """
    # Calcular o Typical Price (Preço Típico)
    df['Typical_Price'] = (df['High'] + df['Low'] + df['Adj Close']) / 3

    # Calcular a Média Móvel do Preço Típico
    df['TP_MA'] = df['Typical_Price'].rolling(window=window).mean()

    # Calcular o Desvio Médio do Preço Típico
    df['TP_Deviation'] = df['Typical_Price'].rolling(window=window).apply(lambda x: mean(abs(x - mean(x))), raw=True)

    # Calcular o CCI
    df['CCI'] = (df['Typical_Price'] - df['TP_MA']) / (0.015 * df['TP_Deviation'])

    # Inicializar a coluna de sinal como 0
    df['CCI_?value'] = 0

    # Sinal de Compra: CCI cruza acima de -100
    df['CCI_?value'] = where(df['CCI'] > -100, 1, df['CCI_?value'])

    # Sinal de Venda: CCI cruza abaixo de 100
    df['CCI_?value'] = where(df['CCI'] < 100, -1, df['CCI_?value'])

    # Limpar colunas temporárias
    df.drop(['Typical_Price', 'TP_MA', 'TP_Deviation'], axis=1, inplace=True)

    return df

def bollinger_keltner_squeeze(df, window=20):
    """
    Calcula o Bollinger Squeeze usando Bandas de Bollinger e Canais de Keltner.
    
    Parâmetros:
    - df: DataFrame contendo os preços com a coluna 'Adj Close'.
    - window: O período para calcular as Bandas de Bollinger e os Canais de Keltner. Padrão é 20.

    Retorna:
    - DataFrame com colunas 'Bollinger_Squeeze' para sinais de negociação: 
      1 para compra, -1 para venda, e 0 para neutro.
    """
    # Calcular a Média Móvel Simples (SMA) e o Desvio Padrão para as Bandas de Bollinger
    df['SMA'] = df['Adj Close'].rolling(window=window).mean()
    df['STD'] = df['Adj Close'].rolling(window=window).std()
    
    # Calcular as Bandas de Bollinger
    df['Bollinger_Upper'] = df['SMA'] + 2 * df['STD']
    df['Bollinger_Lower'] = df['SMA'] - 2 * df['STD']
    
    # Calcular a Média Móvel Exponencial (EMA) para os Canais de Keltner
    df['EMA'] = df['Adj Close'].ewm(span=window, adjust=False).mean()
    
    # Calcular o True Range e o ATR para os Canais de Keltner
    df['High_Low'] = df['High'] - df['Low']
    df['High_Close'] = abs(df['High'] - df['Adj Close'].shift(1))
    df['Low_Close'] = abs(df['Low'] - df['Adj Close'].shift(1))
    df['True_Range'] = df[['High_Low', 'High_Close', 'Low_Close']].max(axis=1)
    df['ATR'] = df['True_Range'].rolling(window=window).mean()
    
    # Calcular os Canais de Keltner
    df['Keltner_Upper'] = df['EMA'] + 1.5 * df['ATR']
    df['Keltner_Lower'] = df['EMA'] - 1.5 * df['ATR']
    
    # Calcular o Bollinger Squeeze
    df['Squeeze'] = ((df['Bollinger_Upper'] < df['Keltner_Upper']) & 
                     (df['Bollinger_Lower'] > df['Keltner_Lower'])).astype(int)
    
    # Inicializar a coluna de sinal como 0
    df['Bollinger_Squeeze_?value'] = 0

    # Sinal de Compra: O squeeze está presente e o preço rompe acima da banda superior de Bollinger
    df['Bollinger_Squeeze_?value'] = where(
        (df['Squeeze'] == 1) & (df['Adj Close'] > df['Bollinger_Upper']), 
        1, 
        df['Bollinger_Squeeze_?value']
    )

    # Sinal de Venda: O squeeze está presente e o preço rompe abaixo da banda inferior de Bollinger
    df['Bollinger_Squeeze_?value'] = where(
        (df['Squeeze'] == 1) & (df['Adj Close'] < df['Bollinger_Lower']), 
        -1, 
        df['Bollinger_Squeeze_?value']
    )

    # Limpar colunas temporárias
    df.drop(['SMA', 'STD', 'Bollinger_Upper', 'Bollinger_Lower', 'EMA', 
             'High_Low', 'High_Close', 'Low_Close', 'True_Range', 'ATR', 
             'Keltner_Upper', 'Keltner_Lower', 'Squeeze'], axis=1, inplace=True)

    return df

def calculate_volume_profile(df, period=200, num_bins=40):
    """
    Calcula o Volume Profile para um período específico e gera sinais de compra/venda.
    
    Parâmetros:
    - df: DataFrame contendo os preços com as colunas 'High', 'Low', 'Adj Close', e 'Volume'.
    - period: O período para análise de swing trade (em dias). Padrão é 30 dias.
    - num_bins: Número de faixas de preço (bins) para calcular o volume profile. Padrão é 20.

    Retorna:
    - DataFrame com colunas 'Volume_Profile_Signal' contendo 1 (compra), -1 (venda), ou 0 (neutro).
    """
    # Inicializar listas para armazenar hvn_price e lvn_price
    hvn_prices = [nan] * len(df)
    lvn_prices = [nan] * len(df)

    # Inicializar a coluna de sinais
    df['hvn_price'] = 0
    df['lvn_price'] = 0
    df['Volume_Profile_?value'] = 0

    # Iterar sobre o DataFrame para calcular o Volume Profile em cada janela de tempo de 'period' dias
    for i in range(period, len(df)):
        # Selecionar a janela de tempo para a análise
        data_slice = df.iloc[i-period:i]
        
        # Calcular o preço máximo e mínimo no período
        price_min = data_slice['Low'].min()
        price_max = data_slice['High'].max()
        
        # Criar bins de preço para o volume profile
        price_bins = linspace(price_min, price_max, num_bins)
        
        # Inicializar o volume profile
        volume_profile = zeros(len(price_bins) - 1)
        
        # Calcular o volume profile
        for j in range(len(price_bins) - 1):
            # Volume total para cada bin de preço
            volume_profile[j] = data_slice[(data_slice['Adj Close'] >= price_bins[j]) & (data_slice['Adj Close'] < price_bins[j+1])]['Volume'].sum()
        
        # Encontrar os High Volume Node (HVN) e Low Volume Node (LVN)
        hvn_index = argmax(volume_profile)
        lvn_index = argmin(volume_profile)
        
        hvn_price = (price_bins[hvn_index] + price_bins[hvn_index + 1]) / 2
        lvn_price = (price_bins[lvn_index] + price_bins[lvn_index + 1]) / 2

        # Armazenar os valores de hvn_price e lvn_price nas listas
        hvn_prices[i] = hvn_price
        lvn_prices[i] = lvn_price

    # Adicionar as listas como novas colunas no DataFrame
    df['hvn_price'] = hvn_prices
    df['lvn_price'] = lvn_prices

    # Usar where para criar os sinais de compra e venda
    df['Volume_Profile_?value'] = where(
        (df['Adj Close'] <= df['hvn_price'] * 1.02) & (df['Adj Close'] >= df['hvn_price'] * 0.98), 
        1, 
        df['Volume_Profile_?value']
    )

    df['Volume_Profile_?value'] = where(
        (df['Adj Close'] <= df['lvn_price'] * 1.02) & (df['Adj Close'] >= df['lvn_price'] * 0.98), 
        -1, 
        df['Volume_Profile_?value']
    )

    return df

#### Screeener


In [8]:
def combine_indicadores(df, col1: str, col2: str):
    """
    Cria uma coluna que é a soma de duas colunas (col1 e col2),
    nomeada como "col1+col2_combine_?value". Em seguida, transforma
    todos os 0 em NaN e todos os 2 em 1, percorrendo com um laço.

    Parâmetros:
    -----------
    df : pd.DataFrame
        DataFrame de entrada.
    col1 : str
        Nome da primeira coluna.
    col2 : str
        Nome da segunda coluna.

    Retorna:
    --------
    pd.DataFrame
        Mesmo DataFrame de entrada, com a coluna combinada adicionada.
    """

    # Define o nome da nova coluna
    new_col = f"{col1}+{col2}_combine_?value"

    # Soma as duas colunas
    df[new_col] = df[col1] + df[col2]

    # Loop para ajustar valores
    for idx in df.index:
        val = df.at[idx, new_col]
        if val in (1, -1, 0):
            df.at[idx, new_col] = 0
        elif val == 2:
            df.at[idx, new_col] = 1
        elif val == -2:
            df.at[idx, new_col] = -1
    return df


def adicione_variacoes_dataframe(df, days_var_list, operarion_days_per_var_days=3):
    """
    operarion_days_per_var_days é a quantidade de dias que permite por Var_X_days.
    Exemplo: as últimas 5 operações com variação de 3 dias 
    
    """
    
    # Adicionando novas colunas para armazenar as variações percentuais
    for days in days_var_list:
        df[f'Var_{days}d'] = None
                           
    # Iterando sobre o DataFrame e calculando as variações percentuais
    for idx, row in df.iterrows():       

        operation_date = str(row['Date']) # O objeto é pandas.datetime. Se não mudar para string a função latest_market_days volta vazia
        closing_value = row['Fechamento']
        ticker = row['Ticker'] + ".SA" #Os dados em data_cache_backtest estão com .SA

        for days in days_var_list:
            
            if operation_date >= latest_market_days[- days - operarion_days_per_var_days]:

                # Encontrando a data futura
                try:
                    future_date_str = latest_market_days[latest_market_days.index(operation_date) + days]              
                    future_date = pandas.to_datetime(future_date_str) # devolve a data para o objeto datetime            

                except (ValueError, IndexError):
                    # Se a data futura não estiver disponível, continue para a próxima iteração
                    continue

                # Encontrando o valor de fechamento futuro
                try:             
                    future_closing_value = data_cache_backtest[ticker].loc[future_date, 'Adj Close']

                except (KeyError, IndexError): # Quando você roda o robo durante a operação, algumas vezes os dados de hoje não estão disponíveis
                    print(f'Dados de {ticker} em {future_date.date()} não disponíveis' )
                    continue

                # Calculando a variação percentual e armazenando no DataFrame
                variation = ((future_closing_value - closing_value) / closing_value) * 100
                df.at[idx, f'Var_{days}d'] = variation


    # Deletando as colunas desnecessárias. Deixam o df muito pesado
    for col in df.columns:

        if col in latest_market_days:

            df.drop(col, axis=1, inplace=True)
            
    return df


def screener(lista):
    '''
    Parametros: Lista de tickers
                Informação se o dataframe será do dia atual ou para backtest
                    
    Return: carteira_analise_todos_tickers -> dfs com a soma de todas as últimas linhas de cada extração
            absolut_todas_montagens_indicadores_analises -> Todos os dfs somados
    '''    
    
    # Inicializando variváveis 
    carteira_analise_todos_tickers = pandas.DataFrame()
    absolut_todas_montagens_indicadores_analises = pandas.DataFrame()
    
    for values in tqdm(lista, desc="Processando Tickers"):

        # filtro para quando o papel está com algum erro de dados na Yahoo Finance
        if values not in probleminhas:

            values = f'{values}.SA' # Para atender ao código do ticker no Yfinance

            try:
    
                fechamento_com_todos_indicadores_tecnicos = analisa_tecnicamente_cotacoes(values)

    
                if not fechamento_com_todos_indicadores_tecnicos.empty:
                    
                    # Adicionando o ticker às colunas
                    fechamento_com_todos_indicadores_tecnicos.insert(0, 'Ticker', values[:-3])    
                    # Movendo o index "data" para coluna. O Index será ignorado e zerado a seguir
                    # Importante porque o backtest usa o método loc, que só funciona com indice em interger
                    fechamento_com_todos_indicadores_tecnicos = fechamento_com_todos_indicadores_tecnicos.reset_index()
                    # Adicionando dados de retorno da operação para days_list            
                    fechamento_com_todos_indicadores_tecnicos_valorizacao = verticalize_preços_fechamento(fechamento_com_todos_indicadores_tecnicos)
                    fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao = avalie_alta_baixa(fechamento_com_todos_indicadores_tecnicos_valorizacao, days_list)

            except:   
                   
                print(f'Erro ao calcular {values}.')                
                if values not in probleminhas_temp:
                    probleminhas_temp.append(values)
                    continue
            
            # Adiciona as flags de gatilhos
            fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao_gatilhos = encontre_gatilhos_persistences(fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao, k)
            
            if test_indicadores:
                # INTEGRAÇÃO DOS DFS DE TICKERS
                absolut_todas_montagens_indicadores_analises = pandas.concat([absolut_todas_montagens_indicadores_analises, fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao_gatilhos], ignore_index=True) 


                    
            # EXPORTAÇÕES 
            if len(lista) <=2: # Aciona a exportação só se a lista de tickers é pequena
                autoriza_exportar = input(f'Exportar arquivo?') # INPUT SERVE SÓ PARA TRAVAR UM ERRO DE BLOQUEIO E CÓDIGO NÃO EXPORTAR 300 DF
                fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao_gatilhos.to_excel(f'{values}.xlsx')
            
            # Selecionar a última linha e adicionar a coluna "Ticker"
            ultima_linha = fechamento_com_todos_indicadores_tecnicos_valorizacao_avaliacao_gatilhos.iloc[[-1]].copy()
            
            # Gerando df com todas as montagens e adicionando a última linha de cada ticker
            if carteira_analise_todos_tickers.empty:
                
                carteira_analise_todos_tickers = ultima_linha # Empacotando como DataFrame
            
            else:
                
                carteira_analise_todos_tickers = pandas.concat([carteira_analise_todos_tickers, ultima_linha], ignore_index=True)  # Passando uma lista de DataFrames                    
            
    # Se estamos testando indicadores, retorna o df com todos os as montagens.
    if test_indicadores:
        return carteira_analise_todos_tickers, absolut_todas_montagens_indicadores_analises
    
    else:
    
        return carteira_analise_todos_tickers
   

## Funções do Backtest

In [9]:
def avalie_alta_baixa(df, days_list):
    """
    Converte o valor percentual do retorno da operação em uma informação de alta ou baixa.
    Ex. A variação em 10 dias foi 0.09. Vai virar 1. A variação de 60 dias foi -0.53. Vira -1.
    """
    for days in days_list:
    
        df[f'{days}d_?alta'] = where(df[f'{days}d_var_desloc'] > 0, 1, -1)

    return df


def encontre_gatilhos_persistences(df, k):
    """
    Procura em toda coluna _?value uma mudança de valor. A mudança de valor é um gatilho
    Ex. cruzamento de mmas estava 0 ontem, hoje 1. Então a coluna _?gatilho será 1
    """

    # 1) Gera todas as séries de gatilho em dict
    new_cols = {}
    #persistence_ks = 3
    persistence_ks = range(3, 4)
    delay_days= (2,3,4,5)
    
    for col in df.columns:
        if not col.endswith('_?value'):
            continue
        
        # Caso especial: colunas de Position. Exemplo - todas os fechamentos com close acima da mma50
        if 'Position' in col:
            gat_col = col.replace('_?value', '_position_not_?gatilho')
            # atribui cópia inteira da série original
            new_cols[gat_col] = df[col].copy()  
        
        # Caso padrão: sinaliza mudança de valor
        else:
            gat_col = col.replace('_?value', '_?gatilho')
            delta = df[col].diff()
            gat = sign(delta).fillna(0).astype(int)
            gat[df[col] == 0] = 0           # exclui transições para 0
            new_cols[gat_col] = gat

            # -------------- persistência k-dias ---------------------------
            # streak de valores iguais
            streak = (
                df[col]
                .groupby((df[col] != df[col].shift()).cumsum())
                .cumcount() + 1
            )

            if type(persistence_ks) == int:
                pers_col = col.replace("_?value", f"_?persistence_{k}days")
                # mantém o valor original se a sequência atual >= k dias
                new_cols[pers_col] = df[col].where(streak >= k, 0)
            
            else:
               
                for k in persistence_ks:
                    pers_col = col.replace("_?value", f"_?persistence_{k}days")
                    # mantém o valor original se a sequência atual >= k dias
                    new_cols[pers_col] = df[col].where(streak >= k, 0)

            if delay_days != None:
                # --- delay_last d dias ---
                for d in delay_days:
                    delay_col = f"delay_last{d}days_{gat_col}"
                    roll_sum = gat.rolling(window=d, min_periods=d).sum()
                    new_cols[delay_col] = sign(roll_sum).astype('Int8')
            
    # 2) Concatena explicitamente antes do return
    #gat_df = pandas.DataFrame(new_cols, index=df.index)
    #df = pandas.concat([df, gat_df], axis=1)
    df[list(new_cols)] = pandas.DataFrame(new_cols, index=df.index)
    
    return df
    
'''
def combine_dois_gatilhos(df: pandas.DataFrame, days: int = 1) -> pandas.DataFrame:
    """
    Para cada par de colunas '*_?gatilho':
      1) calcula o rolling sum de 'days' dias apenas uma vez
      2) soma as duas séries já roladas
      3) guarda no dict new_cols
    No fim, concatena new_cols ao df em um único passo, evitando fragmentação.
    """
    # 1) identifica e calcula rolling
    gatilho_cols = [
        c for c in df.columns
        if c.endswith('_?gatilho') or '_?persistence' in c
    ]
    print(f'Total de _?gatilhos: {len(gatilho_cols)}')
    rolling_sums = df[gatilho_cols].rolling(window=days).sum()

    # 2) monta todas as combinações
    new_cols = {}
    total = len(gatilho_cols) * (len(gatilho_cols) - 1) // 2
    for col1, col2 in tqdm(itertools.combinations(gatilho_cols, 2),
                           total=total,
                           desc="Combinando gatilhos"):
        base1 = col1.replace('_?gatilho', '')
        base2 = col2.replace('_?gatilho', '')
        new_name = f"{base1}_{base2}&2_?gatilho"

        combined = rolling_sums[col1] + rolling_sums[col2]
        ic(new_name, combined.iloc[-1])
        new_cols[new_name] = combined

    # 3) concatena tudo de uma vez
    new_df = pandas.DataFrame(new_cols, index=df.index)
    df = pandas.concat([df, new_df], axis=1)

    # 4) opcional: consolida blocos para prevenir fragmentação residual
    df = df.copy()

    return df
'''

def combine_dois_gatilhos(df, days = 1):
    """
    Para cada par de colunas que terminam em '_?gatilho' OU contêm '_?persistence':
        1) calcula rolling-sum dos últimos `days`;
        2) converte para sinal (+1, 0, -1);
        3) cria coluna combinada:
               +1 se ambos +1,
               -1 se ambos -1,
                0 caso contrário.
    Retorna o DataFrame original com as novas colunas anexadas.
    """
    # ---------------- 1. Selecionar colunas de gatilho ----------------
    gatilho_cols = [
        c for c in df.columns
        if c.endswith('_?gatilho') or '_?persistence' in c
    ]

    # ---------------- 2. Rolling + sinal para cada coluna -------------
    roll_sign = {}
    for col in gatilho_cols:
        # soma dos últimos `days` valores
        roll_sum = df[col].rolling(window=days, min_periods=days).sum()
        # sinal: +1, 0, -1
        roll_sign[col] = sign(roll_sum).astype('Int8')

    # ---------------- 3. Combinar pares -------------------------------
    new_cols = {}
    total = len(gatilho_cols) * (len(gatilho_cols) - 1) // 2

    for col1, col2 in tqdm(itertools.combinations(gatilho_cols, 2),
                           total=total,
                           desc="Combinando gatilhos"):

        base1 = col1.replace('_?gatilho', '')
        base2 = col2.replace('_?gatilho', '')
        new_name = f"{base1}_{base2}&2_?gatilho"

        s1 = roll_sign[col1]
        s2 = roll_sign[col2]

        # +1 se ambos +1, -1 se ambos -1, senão 0
        same_sign = (s1 == s2) & (s1 != 0)
        combined = s1.where(same_sign, 0).astype('Int8')

        new_cols[new_name] = combined

    # ---------------- 4. Anexar ao DataFrame --------------------------
    df[list(new_cols)] = pandas.DataFrame(new_cols, index=df.index)

    return df

def encontre_corr_media_indicadores_tecnicos(df: pandas.DataFrame) -> pandas.DataFrame:
    """
    Para cada par (coluna *_?alta*, coluna *_?gatilho* ou *_?persistence*)
    calcula:
      · correlação de Pearson + p-valor
      · média de *var_desloc* filtrada
      · média de *var_desloc* quando o gatilho = 1  (Media_Desloc_1)
      · média de *var_desloc* quando o gatilho = –1 (Media_Desloc_-1)
    """
    cols_alta = [c for c in df.columns if c.endswith('_?alta')]

    # agora coleta tanto *_?gatilho* quanto *_?persistence_kdays*
    cols_gatilho = [
        c for c in df.columns
        if c.endswith('_?gatilho') or '_?persistence' in c
    ]

    resultados = []

    for col_alta in tqdm(cols_alta, desc="Calculando Correlações"):
        serie_alta = df[col_alta]

        # ---- coluna de deslocamento correspondente ----
        col_desloc = col_alta.replace('_?alta', '_var_desloc')
        serie_desloc = df[col_desloc] if col_desloc in df.columns else None

        for col_gat in cols_gatilho:
            serie_gat = df[col_gat]

            indicadores = 2 if '&2' in col_gat else 1

            # filtro básico pelo gatilho
            mask = serie_gat != 0
            mask &= serie_alta.notna() & serie_gat.notna()
            mask &= isfinite(serie_alta) & isfinite(serie_gat)

            if serie_desloc is not None:
                mask &= serie_desloc.notna() & isfinite(serie_desloc)

            qtd = int(mask.sum())
            if qtd > 1:
                x = serie_alta[mask]
                y = serie_gat[mask]

                corr, p_val = pearsonr(x, y)

                # --------- médias de deslocamento ---------
                if serie_desloc is not None:
                    media_desloc_all = float(serie_desloc[mask].mean())

                    mask_long  = mask & (serie_gat == 1)
                    media_desloc_1 = (
                        float(serie_desloc[mask_long].mean())
                        if mask_long.any() else nan
                    )

                    mask_short = mask & (serie_gat == -1)
                    media_desloc_minus1 = (
                        float(serie_desloc[mask_short].mean())
                        if mask_short.any() else nan
                    )
                else:
                    media_desloc_all = media_desloc_1 = media_desloc_minus1 = nan

                resultados.append({
                    'Coluna_Alta': col_alta,
                    'Coluna_Gatilho': col_gat,
                    'Correlacao': corr,
                    'Coincidência >0.05': p_val,
                    'Quantidade_Linhas': qtd,
                    'Indicadores': indicadores,
                    'Media_Desloc': media_desloc_all,
                    'Media_Desloc_1': media_desloc_1,
                    'Media_Desloc_-1': media_desloc_minus1
                })

    return pandas.DataFrame(resultados)


def calcular_medias(ticker, df, filtro, days_list):
    """
    Calcula as médias, pencentagem de montagens ganhadoras das colunas em colunas_necessarias   
    Parâmetros:
    ticker - Trago lá de extração yfinance o nome do ticker
    df - É o df que já passou pelo filtro na função que enviou esses argumentos
    filtro - são os filtros usados pelo df anterior para adicionar essa informação ao df
    

    """
    # Essa linha é igual a da função adicione_variacoes_dataframe do bloco técnico. Mas elas criam essas colunas em dfs diferentes
    colunas_necessarias = [ f'Var_{days}d' for days in days_list]
    
    # Calculando médias, desvios padrões e porcentagem de valores acima de 0
    medias = df[colunas_necessarias].mean()
    desvios = df[colunas_necessarias].std()
    quant_acima_de_0 = df[colunas_necessarias].gt(0).sum()
    acima_de_0 = quant_acima_de_0 / df[colunas_necessarias].count() * 100

    #if len(df) >= 30: # Registrar somente observações mais relevantes

    # Inicializando o dicionário para o DataFrame de apuração
    apuracao_dict = {
        'Filtros': [filtro],
        'Linhas': [len(df)],
        'Indicadores técnicos': [len(filtro)],
    }
    
    # Adicionando médias e percentuais dinamicamente
    for days in days_list:
        apuracao_dict[f'média {days}d'] = [medias[f'Var_{days}d']]
    for days in days_list:
        apuracao_dict[f'%alta_{days}d'] = [acima_de_0[f'Var_{days}d']]

    # Criando o DataFrame de apuração
    apuracao = pandas.DataFrame(apuracao_dict)

    return apuracao


def liste_combinacoes_tecnicas(df, x=None):
    """
    Pega um df com todos as montagens, encontra todas que as colunas que participaram da análise técnica: "_?value", 
    Depois, dependendo do valor de x, encontra todas combinações únicas entre x colunas ou entre todas as colunas, se x for None.
    Dúvidas, print "combinações"
    
    Parâmetros:
    - df: DataFrame com os dados.
    - x: Número opcional de colunas para combinar. Se None, combina todas as colunas.
    
    Retorna: Todas as colunas que participaram das combinações, todas as combinações
    Todos os valore únicos. Serão usados para levantar o range de coluna valor na função 
    apure_resultado_segmentado_combinacao_tecnica
    
    """
    # Selecionar colunas que terminam com "_?value"
    colunas_value = [col for col in df.columns if col.endswith('_?value')]

    # Registrar valores únicos para cada coluna "_?value"
    valores_unicos_colunas = {coluna: df[coluna].unique() for coluna in colunas_value}
        
    # Dicionário para armazenar as combinações de colunas e seus valores únicos correspondentes
    combinacoes_dict = {}
    
    # Se x for especificado e for menor que o número total de colunas, gerar combinações de colunas
    if x is not None and x <= len(colunas_value):
        combinacao_colunas = list(itertools.combinations(colunas_value, x))
        for cols in combinacao_colunas:
            # Gerar todas as combinações possíveis dos valores nas colunas selecionadas para cada conjunto de combinação de colunas
            combinacoes_valores = list(itertools.product(*(valores_unicos_colunas[col] for col in cols)))
            combinacoes_dict[cols] = combinacoes_valores
    else:
        # Para combinar todas as colunas, usar diretamente o itertools.product
        combinacoes_valores = list(itertools.product(*(valores_unicos_colunas[col] for col in colunas_value)))
        combinacoes_dict[tuple(colunas_value)] = combinacoes_valores

    return combinacoes_dict, colunas_value, valores_unicos_colunas
  

def apure_resultado_segmentado_combinacao_tecnica(ticker, df, combinacoes_dict, resultados_temp, colunas_value, valores_unicos_colunas):
    """
    Fitra o df para cada combinação na lista de combinações técnicas.
    Depois, com aquele filtro, levanta quanto deu as médias e os %alta, chamando a função calcular_medias
    O resultado será um df com as médias e desvios para 10d, 20d, 30d e 60d por filtro
                                
    """ 
    
    # Precisa zerar para não acumular os resultados do ticker anterior
    resultados_temp = []

    # Iterar sobre cada combinação de colunas no dicionário de combinações    
    for colunas_combinacao, combinacoes_valores in combinacoes_dict.items():
        # Iterar sobre cada combinação de valores para as colunas atuais
        for valores_combinacao in combinacoes_valores:
            filtro = dict(zip(colunas_combinacao, valores_combinacao))
            df_filtrado = df.copy()

            # Aplicar filtros
            for col, val in filtro.items():
                df_filtrado = df_filtrado[df_filtrado[col] == val]

            # Supondo que calcular_medias é uma função que você definirá para calcular as médias e desvios
            resultados_temp.append(calcular_medias(ticker, df_filtrado, filtro, days_list))            

    # Concatenar todos os resultados temporários em um único DataFrame, se resultados_temp não estiver vazio
    apuracao = pandas.concat(resultados_temp, ignore_index=True) if resultados_temp else pandas.DataFrame()    
    
    # Criar uma lista para armazenar todos os valores, sem repetições
    all_values_col = list(set([value for values in valores_unicos_colunas.values() for value in values]))
       
    # Apurando a contribuição que cada item no filtro
    for coluna in colunas_value:
        
        for values in all_values_col:
                    
            df_filtrado = df.copy()
            df_filtrado = df_filtrado[df_filtrado[coluna] == values]
            
            # Concatenar todos os resultados temporários em um único DataFrame
            resultados_temp.append(calcular_medias(ticker, df_filtrado, {coluna:values}, days_list))            
            apuracao = pandas.concat(resultados_temp, ignore_index=True)    
    
    return apuracao
       
        
def calcule_risco_retorno_combinacoes_tecnicas(ticker, apuracao):
    """
    Ao arquivo de apuração dos resultados combinados, essa função vai acrescentar colunas importantes:
    
    Soma das médias	Soma dos desvios	diferença das médias	sharp	ranking sharp	variação retorno	variação desvio	retorno-2risco	ranking retorno-2risco
    
    """
    
    # Ordenando o DataFrame pela melhor média para apuração de 20 dias de montagens
    apuracao = apuracao.sort_values(by="média 20d", ascending=False)
    # Adicionando uma coluna de "ranking" técnico
    apuracao["ranking média"] = range(1, len(apuracao) + 1)
    
    # Ordenando o DataFrame pela porcentagem de alta na apuração de 20 dias de montagens
    apuracao = apuracao.sort_values(by="%alta_20d", ascending=False)
    # Adicionando uma coluna de "ranking" técnico
    apuracao["ranking %alta"] = range(1, len(apuracao) + 1)
    
    # Adicionando o ranking por indicador técnico (filtros com um único indicador)
    apuracao['ranking Ind. Técnicos'] = None
    # Identificar as linhas que atendem à condição de ter um único indicador técnico
    condicao = (apuracao['Indicadores técnicos'] == 1)
    # Aplicar o ranking apenas nas linhas que satisfazem a condição
    apuracao.loc[condicao, 'ranking Ind. Técnicos'] = range(1, condicao.sum() + 1)
            
    # CHECK - Exportar resultados para um arquivo Excel--------------------------------------
    # apuracao.to_excel(f'RCOMB-{ticker}- {nome_arquivo}.xlsx', index=False)
    
    return apuracao

def buscar_e_adicionar_colunas(df, apuracao):
    """
    """
    
    # Inicializar as novas colunas com valores NaN
    df['%alta_20d'] = float('nan')
    df['ranking %alta'] = float('nan')
    
    # Iterar sobre cada linha do DataFrame de fechamento
    for idx, row in df.iterrows():
        
        chave = row['chave_configuracoes_tec']
        
        # Procurar a linha correspondente no DataFrame de análise
        linha_analise = apuracao[apuracao['Filtros'] == chave]
        
        if not linha_analise.empty:
            df.at[idx, '%alta_20d'] = linha_analise['%alta_20d'].values[0]
            df.at[idx, 'ranking %alta'] = linha_analise['ranking %alta'].values[0]
    
    
    return df

## Principal

In [10]:
#1. ANÁLISE TÉCNICA

#1.1 - Escaneando ticker e atribuindo leitura técnica 
ticker_list = ler_todos_tickers()

# Exportações de df técnico. Máximo 2 tickers--------------------------
# ticker_list = {'ticker':['mglu3', 'simh3', 'brav3','caml3']}
# ticker_list = {'ticker':['mrve3', 'BRAV3','PRIO3','AZUL4'] }
# ---------------------------------------------------------------------
# check que linhas do código rodar.
test_indicadores = True                      
todas_montagens_tecnicas = screener(ticker_list['ticker'])

# Check -----
todas_montagens_tecnicas[1].to_excel(f'aquela_testada.xlsx')

Processando Tickers:   3%|▎         | 9/277 [00:06<02:30,  1.78it/s]

Erro ao calcular ALPA3.SA.


Processando Tickers:  22%|██▏       | 60/277 [00:39<02:10,  1.66it/s]
1 Failed download:
['CLSA3.SA']: YFPricesMissingError('possibly delisted; no price data found  (1d 2020-06-11 19:58:58.914194 -> 2025-06-11) (Yahoo error = "No data found, symbol may be delisted")')
Processando Tickers:  22%|██▏       | 61/277 [00:40<02:19,  1.54it/s]

Erro ao calcular CLSA3.SA.


Processando Tickers:  23%|██▎       | 65/277 [00:43<02:40,  1.32it/s]

Erro ao calcular COCE5.SA.


Processando Tickers: 100%|██████████| 277/277 [03:07<00:00,  1.48it/s]


### Backtest

In [11]:
if test_indicadores:
    # Combinar 2 gatilhos
    todas_montagens_tecnicas = combine_dois_gatilhos(todas_montagens_tecnicas[1])
    relatorio_correlacoes = encontre_corr_media_indicadores_tecnicos(todas_montagens_tecnicas)                


Combinando gatilhos: 100%|██████████| 861/861 [00:02<00:00, 287.99it/s]
Calculando Correlações: 100%|██████████| 5/5 [00:55<00:00, 11.05s/it]


### Output

In [12]:
if test_indicadores:
    relatorio_correlacoes.to_excel(f'relatorio_correlacoes.xlsx')
    print('relatorio_correlacoes exportado')
else:
    todas_montagens_tecnicas.to_excel(f'relatorio_correlacoes.xlsx')
    print(nome_arquivo,' exportado')

relatorio_correlacoes exportado


In [13]:
"""3. Outras Formas de Backtesting Conhecidas
Há diversas metodologias para aprimorar seus backtests e capturar com maior profundidade a eficiência dos indicadores. Abaixo estão algumas delas:

a. Monte Carlo Simulations
Ao invés de simplesmente testar estratégias com dados históricos fixos, você pode aplicar Simulações de Monte Carlo para gerar múltiplos cenários futuros aleatórios baseados nas características do mercado (volatilidade, retorno esperado, etc). Isso ajuda a capturar uma variedade de resultados possíveis e a avaliar a robustez das suas estratégias.
b. Walk-Forward Testing
Esse método envolve recalibrar os parâmetros do modelo em uma janela de tempo específica e, em seguida, testar a estratégia na próxima janela (fora do conjunto de treinamento). Essa abordagem evita o overfitting e oferece uma melhor avaliação da performance real da estratégia em condições de mercado variáveis.
c. Análise de Sensibilidade
A análise de sensibilidade consiste em variar sistematicamente os parâmetros dos indicadores (por exemplo, diferentes períodos para médias móveis, largura das bandas de Bollinger, etc.) para identificar quais valores produzem melhores resultados e onde a estratégia é mais sensível a mudanças.
d. Estratégias Multi-Fatoriais
Indicadores técnicos frequentemente têm mais valor quando usados em combinação com outros fatores, como fatores fundamentais (P/L, crescimento de receita, etc.) ou fatores macroeconômicos. Incorporar um modelo multi-fatorial ou até utilizar técnicas como Análise de Componentes Principais (PCA) pode ajudar a identificar interações ocultas entre os fatores.
"""

'3. Outras Formas de Backtesting Conhecidas\nHá diversas metodologias para aprimorar seus backtests e capturar com maior profundidade a eficiência dos indicadores. Abaixo estão algumas delas:\n\na. Monte Carlo Simulations\nAo invés de simplesmente testar estratégias com dados históricos fixos, você pode aplicar Simulações de Monte Carlo para gerar múltiplos cenários futuros aleatórios baseados nas características do mercado (volatilidade, retorno esperado, etc). Isso ajuda a capturar uma variedade de resultados possíveis e a avaliar a robustez das suas estratégias.\nb. Walk-Forward Testing\nEsse método envolve recalibrar os parâmetros do modelo em uma janela de tempo específica e, em seguida, testar a estratégia na próxima janela (fora do conjunto de treinamento). Essa abordagem evita o overfitting e oferece uma melhor avaliação da performance real da estratégia em condições de mercado variáveis.\nc. Análise de Sensibilidade\nA análise de sensibilidade consiste em variar sistematicamen