In [44]:
import pandas as pd
import numpy as np
import yfinance as yfin
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt
from tensorflow.keras.models import Sequential
from tensorflow.keras.models import Model
from tensorflow.keras.layers import LSTM, Dropout, Dense, Input, Concatenate
import requests

In [45]:
def getTaxaJuros(data_inicio, data_fim):
    url = "https://api.bcb.gov.br/dados/serie/bcdata.sgs.11/dados?formato=json"
    response = requests.get(url)
    
    if response.status_code == 200:
        dados = response.json()
        df = pd.DataFrame(dados)
        df['data'] = pd.to_datetime(df['data'], dayfirst=True)
        df['valor'] = df['valor'].astype(float)
        
        data_inicio = pd.to_datetime(data_inicio, dayfirst=True)
        data_fim = pd.to_datetime(data_fim, dayfirst=True)
        
        df_filtrado = df[(df['data'] >= data_inicio) & (df['data'] <= data_fim)]
        
        return df_filtrado
    else:
        print("Erro ao acessar a API:", response.status_code)
        return None

In [46]:
def carregarDados(ticker, data_inicio, data_fim):
    df = yfin.download(tickers=ticker, start=data_inicio, end=data_fim)
    return df

In [47]:
def normalizar(df):
    scaler = MinMaxScaler()
    scaled_data = scaler.fit_transform(df['Adj Close'].values.reshape(-1, 1))
    return scaled_data, scaler

In [48]:
def prepararDados(df, days):
    x_train = []
    y_train = []
    for i in range(days, len(df)):
        x_train.append(df[i-days:i])
        y_train.append(df[i])
    return np.array(x_train), np.array(y_train)


In [49]:
def prepararDadosAdicionais(df, days):
    x_add_train = []
    for i in range(days, len(df)):
        x_add_train.append(df[i-days:i])
    return np.array(x_add_train)

In [50]:
def modeloLSTM(input_shape, additional_input_shape):
    main_input = Input(shape=input_shape)
    x = LSTM(units=50, return_sequences=True)(main_input)
    x = Dropout(0.2)(x)
    x = LSTM(units=50, return_sequences=True)(x)
    x = Dropout(0.2)(x)
    x = LSTM(units=50)(x)
    x = Dropout(0.2)(x)
    
    additional_input = Input(shape=additional_input_shape)
    
    concatenated = Concatenate()([x, additional_input])
    output = Dense(units=1)(concatenated)
    
    model = Model(inputs=[main_input, additional_input], outputs=output)
    return model


In [51]:
def treinarModelo(model, x_train, x_add_train, y_train, epochs=25, batch_size=32):
    model.compile(optimizer='adam', loss='mean_squared_error')
  
    model.fit([x_train, x_add_train], y_train, epochs=epochs, batch_size=batch_size, verbose=1)
    return model


In [52]:
def avaliarTreino(model, x_train, df, scaler, days):
    predicted_train = model.predict(x_train)
    predicted_train = scaler.inverse_transform(predicted_train)

    actual_train = df['Adj Close'].values
    mae_train = mean_absolute_error(actual_train[days:], predicted_train)
    mse_train = mean_squared_error(actual_train[days:], predicted_train)
    rmse_train = np.sqrt(mse_train)
    r2_train = r2_score(actual_train[days:], predicted_train)

    return mae_train, mse_train, rmse_train, r2_train, actual_train, predicted_train

In [53]:
def prepararDadosTeste(total_dataset, df_teste, scaler, days):
    model_inputs = total_dataset[len(total_dataset) - len(df_teste) - days:].values
    model_inputs = model_inputs.reshape(-1, 1)
    model_inputs = scaler.transform(model_inputs)
    
    x_test = []
    for i in range(days, len(model_inputs)):
        x_test.append(model_inputs[i-days:i, 0])

    x_test = np.array(x_test)
    return np.reshape(x_test, (x_test.shape[0], x_test.shape[1], 1))


In [54]:
def avaliarTeste(model, x_test, df_teste, scaler, days):
    predicted_test = model.predict(x_test)
    predicted_test = scaler.inverse_transform(predicted_test)
    
    actual_test = df_teste['Adj Close'].values
    
    predicted_test = predicted_test[:-1]
    actual_test = actual_test[:len(predicted_test)]
    
    mae = mean_absolute_error(actual_test, predicted_test)
    mse = mean_squared_error(actual_test, predicted_test)
    rmse = np.sqrt(mse)
    r2 = r2_score(actual_test, predicted_test)
    
    return mae, mse, rmse, r2, actual_test, predicted_test

In [55]:
def plotarResultados(df_treino, predicted_train, df_teste, predicted_test, days, ticker):
    plt.figure(figsize=(14, 7)) 

    plt.plot(df_treino['Adj Close'].values, color='blue', label=f'Dados de Treinamento da {ticker}')
    
    plt.plot(range(days, days + len(predicted_train)), predicted_train, color='red', label=f'Previsão do Treinamento {ticker}')
    
    plt.plot(range(len(df_treino), len(df_treino) + len(df_teste)), df_teste['Adj Close'].values, color='black', label=f'Preços Reais da {ticker}')
    
    plt.plot(range(len(df_treino), len(df_treino) + len(predicted_test)), predicted_test, color='green', label=f'Previsão do Teste da {ticker}')

    plt.title(f'{ticker} - Janela: {days} dias')
    plt.xlabel('Tempo')
    plt.ylabel(f'Preço de {ticker}')
    
    plt.legend()
  
    plt.savefig(f'prediction_plot_{days}_days.png')
    plt.show()


In [None]:
def executarModelo(ticket, data, addData, max_days=21):
    all_results = [] 
    
    scaled_data, scaler = normalizar(data)
    scaled_addData = scaler.fit_transform(addData['valor'].values.reshape(-1, 1))


    for days in range(1, max_days):
        x_train, y_train = prepararDados(scaled_data, days)
        x_add_train = prepararDadosAdicionais(scaled_addData, days)

        x_add_train = x_add_train[:len(x_train)]

        x_train = np.reshape(x_train, (x_train.shape[0], x_train.shape[1], 1))
        x_add_train = np.reshape(x_add_train, (x_add_train.shape[0], x_add_train.shape[1], 1))

        model = modeloLSTM((x_train.shape[1], 1), (x_add_train.shape[1], 1))
        model = treinarModelo(model, [x_train, x_add_train], y_train, epochs=25, batch_size=32)

        mae_train, mse_train, rmse_train, r2_train = avaliarTreino(model, x_train, data, scaler, days)

        df_teste = carregarDados(ticket, '2021-01-01', '2024-01-01')
        total_dataset = pd.concat((data['Adj Close'], df_teste['Adj Close']), axis=0)
        x_test = prepararDadosTeste(total_dataset, df_teste, scaler, days)

        mae, mse, rmse, r2, actual_test, predicted_test = avaliarTeste(model, x_test, df_teste, scaler, days)

        plotarResultados(data, scaler.inverse_transform(y_train.reshape(-1, 1)), df_teste, predicted_test, days, ticket)

        all_results.append({
            'Days': days,
            'MAE_Train': mae_train,
            'MSE_Train': mse_train,
            'RMSE_Train': rmse_train,
            'R²_Train': r2_train,
            'MAE_Test': mae,
            'MSE_Test': mse,
            'RMSE_Test': rmse,
            'R²_Test': r2
        })

    all_results_df = pd.DataFrame(all_results)
    all_results_df.to_csv('model_performance_all_days.csv', index=False)


In [57]:
ticket = 'PETR4.SA'
data = carregarDados(ticket, '2005-01-01', '2021-01-01')
taxaJuros = getTaxaJuros('2005-01-01', '2021-01-01').reset_index(drop=True)
taxaJuros
executarModelo(ticket, data, taxaJuros, max_days=21)




[*********************100%%**********************]  1 of 1 completed


TypeError: treinarModelo() missing 1 required positional argument: 'y_train'