In [7]:
#importes
import yfinance as yf
import pandas as pd
from datetime import datetime
from datetime import timedelta

import re
import talib
import plotly.express as px
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error

from keras.models import Sequential
from keras.layers import Dense,LSTM,Dropout
from keras.callbacks import EarlyStopping

In [5]:
#PETR4
#EMBR3
#ITUB3
#TOTS3

In [116]:
class ExecucaoTherasAnaliseTecnica:
    
    def __init__(self, empresa_ = 'CSAN3', intervalo_ = '1wk', n_predicoes_ = 14 , quant_conjunto_tec_ = 14):
        self.empresa = empresa_
        self.intervalo = intervalo_
        self.n_predicoes = n_predicoes_ 
        self.quant_conjunto_tec = quant_conjunto_tec_
        self.conjunto_dados = None 
        self.indicadores = None
        self.historico = None
        self.df_resultado = None
        self.dados_processados_ia = None
        self.modelo = None
        self.erro_medio_quadratico = None
        
        
    #capturar dados
    def buscarDadosTecnicos(self, empresa = None, intervalo = None, n_predicoes = None):
        

        if (empresa is None): empresa = self.empresa 
        if (intervalo is None): intervalo = self.intervalo
        if (n_predicoes is None): n_predicoes = self.n_predicoes
        
        acao = yf.Ticker(f'{empresa}.sa')

        historico_dados_tecnicos = acao.history(period = 'max', interval= intervalo)
        historico_dados_tecnicos = historico_dados_tecnicos.drop(historico_dados_tecnicos.index[-self.n_predicoes:],axis=0)
        
        #historico_dados_tecnicos = historico_dados_tecnicos[historico_dados_tecnicos['Close'] <= 150]
        historico_dados_tecnicos['Data'] = historico_dados_tecnicos.index
        historico_dados_tecnicos['Data'] = historico_dados_tecnicos['Data'].apply(lambda x: datetime.strptime(re.findall('\d{4}-\d{2}-\d{2}', str(x))[0], '%Y-%m-%d').date())
        historico_dados_tecnicos = historico_dados_tecnicos.dropna()
        historico_dados_tecnicos = historico_dados_tecnicos.reset_index(drop ='index')
        self.historico = historico_dados_tecnicos
        self.ult_data = historico_dados_tecnicos['Data'][len(historico_dados_tecnicos)-1]
        return historico_dados_tecnicos

    def criarConjunto(self, dataframe = None, quant_itens_conjunto = None):
        
               
        if dataframe is None and self.historico is None: 
            dataframe = self.buscarDadosTecnicos()['Close']
        elif dataframe is None and self.historico is not None:
            dataframe = self.historico['Close']
        elif dataframe is not None:
            print('ja possui dados')
        else:
            return "Erro ao criar conjunto"
        
        if quant_itens_conjunto is None: quant_itens_conjunto = self.quant_conjunto_tec
            
        self.quant_conjunto_tec = quant_itens_conjunto
        conjunto = list() #armazena o conjunto de close
        y = list()
        for i in range(self.quant_conjunto_tec, len(dataframe)+1):
            conjunto.append(dataframe[i - self.quant_conjunto_tec: i])
        self.conjunto_dados = conjunto
        return conjunto
    #processar dados
    def calcularIndicadores(self, dados = None, quant_periodo_tec = None):
        
        if quant_periodo_tec is None: quant_periodo_tec = self.quant_conjunto_tec
        
        if dados is None and self.historico is None: 
            dados = self.buscarDadosTecnicos()['Close']
        elif dados is None and self.historico is not None:
            dados = self.historico['Close']
        elif dados is not None:
            if (isinstance(dados,list)):
                dados = pd.Series(dados)
        else:
            return "Houve alguma falha"
        
        #indice força relativa
        rsi = talib.RSI(dados, timeperiod=quant_periodo_tec-1)
        df_rsi = pd.DataFrame(rsi.dropna(), columns=['RSI'])

        #bandas de bollinger 
        upper, middle, lower = talib.BBANDS(dados, timeperiod = quant_periodo_tec)
        df_upper = pd.DataFrame(upper.dropna(), columns=['bb_upper'])
        df_middle = pd.DataFrame(middle.dropna(), columns=['bb_middle'])
        df_lower = pd.DataFrame(lower.dropna(), columns=['bb_lower'])

        #MACD
        macd = (talib.EMA(dados, timeperiod=6).dropna()-talib.EMA(dados, timeperiod=quant_periodo_tec).dropna())
        df_macd = pd.DataFrame(macd.dropna(), columns=['MACD'])
        sinal_macd = macd.ewm(span=quant_periodo_tec, adjust=False).mean()
        df_sinal_macd = pd.DataFrame(sinal_macd.dropna(), columns=['sinal_macd'])

        #Media Movel
        media_movel = talib.MA(dados, timeperiod=quant_periodo_tec)
        df_media_movel = pd.DataFrame(media_movel.dropna(), columns=['media_movel'])
        self.indicadores = pd.concat([df_rsi,df_media_movel,df_macd, df_sinal_macd], axis=1)
        """df_para_ia = df_para_ia.join(df_macd)
        df_para_ia = df_para_ia.join(banda_sup_boll)
        
        df_para_ia = df_para_ia.join(media_movel(entrada1))
        
        df_para_ia = df_para_ia.join(banda_inf_boll)
        df_para_ia = df_para_ia.join(indice_forca_relativa(entrada1))"""

        #pd.concat([df_rsi,df_upper,df_media_movel, df_lower, df_macd, df_sinal_macd], axis=1)
        return pd.concat([df_macd, df_sinal_macd, df_upper, df_media_movel, df_lower, df_rsi], axis=1)

    #modelo de rede neural atual -  3
    def modeloRedeNeural3(self, dados = None , n_predicoes = None, quant_unidades_conjunto = None, porcen_treinamento = 0.9, epoch = 20):
        if dados is None and self.historico is None: 
            dados = self.buscarDadosTecnicos()['Close']
        elif dados is None and self.historico is not None:
            dados = self.historico['Close']
        elif dados is not None:
            print('Dados já existentes')
        else:
            return "Houve alguma falha" 
        if n_predicoes is None: n_predicoes = self.n_predicoes
        
        if quant_unidades_conjunto is None: quant_unidades_conjunto = self.quant_conjunto_tec   
        if self.conjunto_dados is None:
            conjunto = self.criarConjunto(dataframe=dados)
        else:
            conjunto = self.conjunto_dados
        #dataframe dados conjuntos
        df_x = pd.DataFrame(np.array(conjunto))
        #dataframe dados y
        #df_y = pd.DataFrame(df_x[len(conjunto[0])-1].to_list(),columns=['saida'])
        if self.indicadores is None: 
            df_indicadores = self.calcularIndicadores(dados = dados) 
        else: 
            df_indicadores = self.indicadores
        
        df_indicadores['n_linha'] = df_indicadores.index
        
        df_indicadores = df_indicadores.reset_index(drop = 'index')
        
        df_para_ia = pd.concat([df_x, df_indicadores.drop('n_linha', axis='columns')], axis = 1)
        #df_para_ia = df_indicadores.drop('n_linha', axis='columns')
        escalador = MinMaxScaler(feature_range=(0,1))
        escalador_saida = MinMaxScaler(feature_range=(0,1))
        #dados_entrada_ia = df_para_ia.to_numpy()
        quant_dados_treinamento = round(len(df_para_ia) * porcen_treinamento)
        dados_treinamento_x = escalador.fit_transform(np.array(df_para_ia[:quant_dados_treinamento]))

        #dados_treinamento_x = dados_entrada_ia_escalados[:quant_dados_treinamento]
        
        #para_pred = df_y.to_numpy()

        #para_pred_escalado = escalador.transform(para_pred)
        df_y = pd.DataFrame(df_x[len(conjunto[0])-1].to_list(),columns=['saida'])

        dados_treinamento_y = escalador_saida.fit_transform(np.array(df_y[:quant_dados_treinamento]))
        
        #dados_treinamento_y = df_y[:quant_dados_treinamento]

        dados_treinamento_x = dados_treinamento_x.reshape(dados_treinamento_x.shape[0],dados_treinamento_x.shape[1],1)
        quant_colun_treino = dados_treinamento_x.shape[1]
        
        dados_teste_x = escalador.transform(np.array(df_para_ia[quant_dados_treinamento:]))

        dados_teste_x = dados_teste_x.reshape(dados_teste_x.shape[0],dados_teste_x.shape[1],1)
        modelo = Sequential()

        modelo.add(LSTM(quant_colun_treino*3, return_sequences=True, input_shape = (dados_treinamento_x.shape[1], 1)))
        #modelo.add(Dropout(0.2))
        modelo.add(LSTM(quant_colun_treino*6, return_sequences=False))
        #modelo.add(Dropout(0.2))
        modelo.add(Dense(quant_colun_treino*3))
        #modelo.add(Dropout(0.2))
        modelo.add(Dense(quant_colun_treino*2))
        #modelo.add(Dropout(0.2))
        modelo.add(Dense(round(quant_colun_treino)))
        #modelo.add(Dropout(0.2))
        modelo.add(Dense(1))

        modelo.compile(optimizer="adam", loss="mean_squared_error")

        early_stop = EarlyStopping(monitor='loss', patience=10, verbose=1, mode='min')

        registro = modelo.fit(dados_treinamento_x, dados_treinamento_y, batch_size=1,epochs=epoch, callbacks=[early_stop])


        
        predicoes = modelo.predict(dados_teste_x)

        predicoes = escalador_saida.inverse_transform(predicoes)


        teste_y = df_y[quant_dados_treinamento: ]
        teste_y = teste_y.to_numpy()

        remo_erro_quadra = mean_squared_error(teste_y, predicoes)
        print(f'\ntemos um erro quadratico medio com os dados de treino de: {remo_erro_quadra}\n')
        #treinamento = dados.iloc[:quant_dados_treinamento,:]
    
        df_teste = pd.DataFrame({"Close":dados.iloc[quant_dados_treinamento + (quant_unidades_conjunto-1) : ],"predicoes":predicoes.reshape(len(predicoes))})
        para_14_predicoes = dados.to_list()

        previsoes_futuras = []

        for _ in range(n_predicoes):
            # Obtenha o último conjunto de dados
            conjunto = self.criarConjunto(para_14_predicoes)

            # Transforme o conjunto de dados em um array numpy
            x = np.array(conjunto, dtype=object)

            # Crie o DataFrame
            df_x = pd.DataFrame(x)

            # Calcule os indicadores
            df_indicadores = self.calcularIndicadores(para_14_predicoes)
            df_indicadores['n_linha'] = df_indicadores.index
            df_indicadores = df_indicadores.reset_index(drop='index')
            df_para_ia = pd.concat([df_x, df_indicadores.drop('n_linha', axis='columns')], axis=1)
            #df_para_ia = df_indicadores.drop('n_linha', axis='columns')
            # Obtenha a última semana
            ultima_semana = df_para_ia.tail(1)
            ultima_semana_escalada = escalador.transform(np.array(ultima_semana))
            teste_x = ultima_semana_escalada
            teste_x = teste_x.reshape(teste_x.shape[0], teste_x.shape[1], 1)

            # Faça previsões para n_predicoes pontos de dados
            previsoes = modelo.predict(teste_x)
            previsoes = escalador_saida.inverse_transform(previsoes)

            # Adicione as previsões à lista de previsões futuras
            previsoes_futuras.extend(previsoes)
            #print(previsoes)
            # Atualize o conjunto de previsões para a próxima iteração
            para_14_predicoes.append(previsoes[0][0])

        """for a in range(n_predicoes):
            
            conjunto = criarConjunto(para_14_predicoes)
            
            x = np.array(conjunto)
            #criamos o dataframe
            df_x = pd.DataFrame(x)
            df_indicadores = calcularIndicadores(para_14_predicoes)
            df_indicadores['n_linha'] = df_indicadores.index
            df_indicadores = df_indicadores.reset_index(drop = 'index')
            df_para_ia = pd.concat([df_x, df_indicadores.drop('n_linha', axis = 'columns')], axis=1)

            ultima_semana = df_para_ia.tail(1)#.values.reshape(-1,1)
            
            
            ultima_semana_escalado = escalador.transform(np.array(ultima_semana))

            teste_x = ultima_semana_escalado
        
            teste_x = teste_x.reshape(teste_x.shape[0], teste_x.shape[1],1)
            
            previsao_amanha = modelo.predict(teste_x)
            previsao_amanha = escalador_saida.inverse_transform(previsao_amanha)
            
            para_14_predicoes.append(float(previsao_amanha[0]))"""

        df_resultado = pd.DataFrame(para_14_predicoes[len(self.historico):],columns=['resultado'])
        data = self.ult_data
        data += timedelta(days=7)
        lista_datas = []
        for a in range(n_predicoes):
            lista_datas.append(data) 
            data += timedelta(days=7)
            

        datas = pd.DataFrame(lista_datas, columns=['data'])
        df_resultado = df_resultado.join(datas)
        
        self.df_resultado = df_resultado
        self.dados_processados_ia = df_para_ia
        self.modelo = modelo
        self.erro_medio_quadratico = remo_erro_quadra
        
        return df_resultado

In [117]:
primeiro = ExecucaoTherasAnaliseTecnica(empresa_= 'petr4')

In [118]:
primeiro.dados_processados_ia

In [119]:
primeiro.modeloRedeNeural3(epoch=200)

ja possui dados
Epoch 1/200
Epoch 2/200
Epoch 3/200
Epoch 4/200
Epoch 5/200
Epoch 6/200
Epoch 7/200
Epoch 8/200
Epoch 9/200
Epoch 10/200
Epoch 11/200
Epoch 12/200
Epoch 13/200
Epoch 14/200
Epoch 15/200
Epoch 16/200
Epoch 17/200
Epoch 18/200
Epoch 19/200
Epoch 20/200
Epoch 21/200
Epoch 22/200
Epoch 23/200
Epoch 24/200
Epoch 25/200
Epoch 26/200
Epoch 27/200
Epoch 28/200
Epoch 29/200
Epoch 30/200
Epoch 31/200
Epoch 32/200
Epoch 33/200
Epoch 34/200
Epoch 35/200
Epoch 36/200
Epoch 37/200
Epoch 37: early stopping

temos um erro quadratico medio com os dados de treino de: 4.9557404785399

ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados
ja possui dados


Unnamed: 0,resultado,data
0,21.103022,2023-07-03
1,18.948915,2023-07-10
2,17.90361,2023-07-17
3,17.647781,2023-07-24
4,17.562466,2023-07-31
5,17.406408,2023-08-07
6,16.725988,2023-08-14
7,15.791558,2023-08-21
8,14.902817,2023-08-28
9,13.907048,2023-09-04


In [120]:
def criarGrafico(resultado_):
    acao = yf.Ticker(f'{primeiro.empresa}.sa')
    resultado = resultado_
    historico = acao.history(period = 'max', interval= '1wk')
    diferenca = historico['Close'][-14] - resultado['resultado'][0]
    print(diferenca)
    if(diferenca < 0):
        df_ajustado = pd.DataFrame(resultado['resultado'] - diferenca)
    else:
        df_ajustado = pd.DataFrame(resultado['resultado'] + diferenca)

    figura_ano = px.scatter(title='vendo grafico')
    figura_ano.add_scatter(x=resultado['data'], y=df_ajustado['resultado'], name = 'predicao')
    figura_ano.add_scatter(x=resultado['data'], y=historico['Close'][-14:], name = 'real')

    return figura_ano.show()

In [121]:
criarGrafico(resultado_ = primeiro.df_resultado)

7.31376838684082
