In [None]:
import yfinance as yf
import numpy as np
import pandas as pd
from keras.models import Sequential, load_model
from keras.layers import LSTM, Dense
from keras.optimizers import Adam, SGD, RMSprop
from sklearn.preprocessing import MinMaxScaler

In [None]:
# Ticker of the stock used to train and test the models
ticker = "PETR4.SA"

# Download the historical prices from Yahoo Finance
data = yf.download(ticker, start="2010-01-01", end="2022-12-31")

In [None]:
# Preprocess the data: scale the "Close" column to the [0, 1] range.
# NOTE(review): the scaler is fitted on the full series, i.e. including the rows
# that the next cell assigns to the test split — confirm this is intended.
scaler = MinMaxScaler(feature_range=(0, 1))
data_scaled = scaler.fit_transform(data["Close"].values.reshape(-1, 1))

In [None]:
# Split the scaled data chronologically: the first 80% of the rows are used for
# training and the remaining rows for testing
train_size = int(len(data_scaled) * 0.8)
test_size = len(data_scaled) - train_size
train_data = data_scaled[0:train_size, :]
test_data = data_scaled[train_size:len(data_scaled), :]

In [None]:
# Builds the input windows and the target values for the LSTM network
def create_dataset(dataset, time_steps=1):
    """Slice a series into sliding input windows and next-step targets.

    Args:
        dataset: 2-D array indexed as ``dataset[row, 0]``; only column 0 is read.
        time_steps: Number of consecutive rows in each input window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays. ``X[k]`` holds rows
        ``k .. k + time_steps - 1`` and ``y[k]`` holds row ``k + time_steps``.
        ``X`` has shape ``(n, time_steps)`` and ``y`` has shape ``(n,)`` with
        ``n = max(len(dataset) - time_steps, 0)``.
    """
    # Every row from index `time_steps` onwards is a valid target, including the
    # very last one. The previous bound (`- time_steps - 1`) dropped that final
    # sample, which shifted "last N rows" alignments by one step.
    n_samples = len(dataset) - time_steps
    if n_samples <= 0:
        # Keep the 2-D shape so callers can still read `X.shape[1]`.
        return np.empty((0, time_steps)), np.empty((0,))

    dataX = [dataset[i:i + time_steps, 0] for i in range(n_samples)]
    dataY = [dataset[i + time_steps, 0] for i in range(n_samples)]

    return np.array(dataX), np.array(dataY)

In [None]:
# Size of the time window used for the input data
time_steps = 5

# Build the LSTM input and output data based on the chosen time window
X_train, y_train = create_dataset(train_data, time_steps)
X_test, y_test = create_dataset(test_data, time_steps)

# Reshape the inputs to (samples, time steps, 1) so they are compatible with the LSTM
X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

In [None]:
# Hyper-parameter grid for the different models to be trained

models = []
loss_functions = ["mean_squared_error", "mean_absolute_error", "mean_squared_logarithmic_error"]
learning_rates = [0.1, 0.01, 0.001]
optimizers = ["adam", "sgd", "rmsprop"]

def get_optimizer(name: str, rate: float):
    match name:
        case 'adam':
            return Adam(learning_rate=rate)
        case 'sgd':
            return SGD(learning_rate=rate)
        case 'rmsprop':
            return RMSprop(learning_rate=rate)

# One model per (loss function, learning rate, optimizer) combination.
# The loss function is the outermost loop, so consecutive groups of
# len(learning_rates) * len(optimizers) models share the same loss.
for loss_func in loss_functions:
    for rate in learning_rates:
        for opt in optimizers:
            # Two stacked LSTM layers followed by a single-unit regression head
            model = Sequential([
                LSTM(units=64, return_sequences=True, input_shape=(X_train.shape[1], 1)),
                LSTM(units=128),
                Dense(1),
            ])

            model.compile(
                loss=loss_func,
                optimizer=get_optimizer(opt, rate),
                metrics=['mean_squared_error', 'mean_absolute_error', "mean_squared_logarithmic_error"],
            )
            models.append(model)

In [None]:
# Train the models
histories = []
epochs = [1, 1]
batch_sizes = [16, 32, 64]

model_configs = []

for i, model in enumerate(models):
    # The same model instance is passed to fit() once per (epochs, batch size)
    # pair, i.e. len(epochs) * len(batch_sizes) times.
    # NOTE(review): presumably each fit() continues from the weights left by the
    # previous call, so these are cumulative training rounds — confirm.
    for epoch in epochs:
        for size in batch_sizes:
            history = model.fit(X_train, y_train, epochs=epoch, batch_size=size, validation_data=(X_test, y_test), verbose=1)
            # One history dict per fit() call, so `histories` ends up with several
            # consecutive entries per model (not one entry per model).
            histories.append(history.history)
    # NOTE(review): `size` and `epoch` here are the values left behind by the
    # inner loops (their last elements), not a record of every round — confirm.
    # `i//9` maps the model index to its loss function (9 models per loss).
    model_configs.append({'id': i,'batch_size': size, 'epochs': epoch, 'loss': loss_functions[i//9]})

In [None]:
# Training metrics: one line per recorded fit() call, using the last epoch's values
for i, history in enumerate(histories):
    summary = ", ".join([
        f"Train loss: {history['loss'][-1]:.4f}",
        f"Validation loss: {history['val_loss'][-1]:.4f}",
        f"Train MAE: {history['mean_absolute_error'][-1]:.4f}",
        f"Validation MAE: {history['val_mean_absolute_error'][-1]:.4f}",
        f"MSE: {history['mean_squared_error'][-1]:.4f}",
        f"Validation MSE: {history['val_mean_squared_error'][-1]:.4f}",
    ])
    print(summary)

In [None]:
# Predictions on the test data, mapped back through the scaler's inverse transform
predictions = [scaler.inverse_transform(model.predict(X_test)) for model in models]

In [None]:
# `histories` holds one entry per fit() call, stored consecutively per model, so
# the metrics of model i are at the END of its group rather than at index i.
fits_per_model = max(len(histories) // len(predictions), 1) if predictions else 1

for i, predicted_price in enumerate(predictions):
    # Metrics of the last fit() call recorded for model i
    final_history = histories[(i + 1) * fits_per_model - 1]

    # Build a dataframe with the dates and the predicted and actual prices
    dates = data.index[len(data.index)-len(predicted_price):]

    predicted_df = pd.DataFrame(predicted_price, index=dates, columns=["Predicted Price"])
    actual_df = pd.DataFrame(data["Close"].values[len(data.index)-len(predicted_price):], index=dates, columns=["Actual Price"])
    result_df = pd.concat([actual_df, predicted_df], axis=1)

    # Every metric in the title now comes from the same history entry (the MSE
    # values previously came from a variable left over by an earlier cell).
    title = (
        f"Model {i+1}: loss function={loss_functions[i//9]}, \n"
        f"Train loss: {final_history['loss'][-1]:.4f}, "
        f"Validation loss: {final_history['val_loss'][-1]:.4f}, "
        f"Train MAE: {final_history['mean_absolute_error'][-1]:.4f}, "
        f"Validation MAE: {final_history['val_mean_absolute_error'][-1]:.4f}, "
        f"MSE: {final_history['mean_squared_error'][-1]:.4f}, "
        f"Validation MSE: {final_history['val_mean_squared_error'][-1]:.4f}"
    )

    # Plot the predicted and actual prices
    result_df.plot(figsize=(10,5), title=title)

In [None]:
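# Uncomment the lines below to save the trained models.
# NOTE: the cells that follow load these files from models/, so the models must
# have been saved at least once before running them.

# Saving the models
# for i, model in enumerate(models):
#     model.save(f"models/model_{i+1}_{loss_functions[i//9]}.h5")

# Using the saved models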

In [None]:
# Load the 27 saved models (file names encode the 1-based index and the loss function)
saved_models = [
    load_model(f"models/model_{i+1}_{loss_functions[i//9]}.h5")
    for i in range(27)
]

# Predictions with the loaded models, mapped back through the scaler's inverse transform
predictions = []
for model in saved_models:
    predictions.append(scaler.inverse_transform(model.predict(X_test)))

In [None]:
for i, predicted_price in enumerate(predictions):
    # Pair the predictions with the trailing rows of `data`
    start = len(data.index) - len(predicted_price)
    dates = data.index[start:]

    # Dataframe with the actual and the predicted prices side by side
    result_df = pd.concat(
        [
            pd.DataFrame(data["Close"].values[start:], index=dates, columns=["Actual Price"]),
            pd.DataFrame(predicted_price, index=dates, columns=["Predicted Price"]),
        ],
        axis=1,
    )

    # Plot the predicted and actual prices
    result_df.plot(figsize=(10, 5), title=f"Model {i+1}: loss function={loss_functions[i//9]}")

In [None]:
# Evaluation results of each loaded model on the test data (one evaluate() result per model)
histories = [model.evaluate(X_test, y_test) for model in saved_models]

In [None]:
# evaluate() returns the loss followed by the compiled metrics in compile order.
# The models in this notebook are compiled with
# [mean_squared_error, mean_absolute_error, mean_squared_logarithmic_error],
# so the entries are [loss, MSE, MAE, MSLE].
# NOTE(review): assumes the saved models were compiled with that same metric list.
for i, history in enumerate(histories):
    loss, mse, mae, msle = history[:4]
    print(f"MODEL {i+1}: Validation Loss: {loss:.4f}, Validation MAE: {mae:.4f}, Validation MSLE: {msle:.4f}, Validation MSE: {mse:.4f}")

In [None]:
# Find the best of the 27 evaluated models.
# Each entry of `histories` is the list returned by evaluate(); lists compare
# element by element, so the winner is the entry with the lowest loss (index 0).
# NOTE(review): the models use different loss functions, so index 0 is not the
# same quantity for every model — confirm this ranking is the intended one.
best_model_index = min(range(len(histories)), key=lambda idx: histories[idx])
best_history = histories[best_model_index]

# Record the file name of the winning model; `with` closes the file even on error,
# and exactly one name is written.
with open("models/best_model.txt", "w") as log_file:
    log_file.write(f"model_{best_model_index+1}_{loss_functions[best_model_index//9]}.h5")

# evaluate() order is [loss, MSE, MAE, MSLE], following the compiled metric list
best_loss, best_mse, best_mae, best_msle = best_history[:4]
print(f"MODEL {best_model_index + 1}: Validation Loss: {best_loss:.4f}, Validation MAE: {best_mae:.4f}, Validation MSLE: {best_msle:.4f}, Validation MSE: {best_mse:.4f}")
print(f"Best config: {model_configs[best_model_index]}")
best_optimizer_config = saved_models[best_model_index].get_compile_config()['optimizer']
print(f"Optimizer: {best_optimizer_config['class_name']}")
print(f"Learning rate: {best_optimizer_config['config']['learning_rate']:.3f}")


# Using the predictions of the best model

In [None]:
# Final results
def show_results(profit_history, dates, title, graph):
    """Turn per-step profits into a cumulative-profit dataframe and optionally plot it.

    Args:
        profit_history: Iterable of per-step profit values.
        dates: Index for the resulting dataframe (one label per profit value).
        title: Name of the single column; also appended to the plot title.
        graph: When truthy, the cumulative series is plotted.

    Returns:
        A one-column ``pandas.DataFrame`` holding the running total of the profits.
    """
    running_total = []
    for step_profit in profit_history:
        # The first entry is taken as-is; every later one extends the previous total
        running_total.append(step_profit if not running_total else running_total[-1] + step_profit)

    results = pd.DataFrame(running_total, index=dates, columns=[title])

    # Plot the accumulated profit
    if graph:
        results.plot(figsize=(10, 5), title="Lucro: " + title)
    return results

In [None]:
# Profit obtained by trading on the neural network's predictions
def calc_model_profit(dataset, predictions, initial_money, graph, title='Modelo 25'):
    """Simulate daily trades driven by the model's predicted prices.

    The last ``len(predictions)`` rows of ``dataset`` are paired with
    ``predictions``. For each day after the first, the previous close is the
    entry price, and the direction is chosen by comparing it with that day's
    predicted price.

    Args:
        dataset: DataFrame with a ``"Close"`` column.
        predictions: Sequence whose items expose ``.item()`` (one predicted price each).
        initial_money: Starting capital used to size the position.
        graph: When truthy, the cumulative profit is plotted.
        title: Column / plot label forwarded to ``show_results``.

    Returns:
        The dataframe produced by ``show_results`` (cumulative profit per date).
    """
    current_money = initial_money

    offset = len(dataset.index) - len(predictions)
    real_values = dataset["Close"].values[offset:]
    dates = dataset.index[offset:]

    # Per-day profit
    profit_history = []
    for i, value in enumerate(real_values):
        if i == 0:
            # No previous close to enter on, so no trade on the first day
            profit_history.append(0)
            continue

        day_before_price = real_values[i-1]  # entry price
        predicted_price = predictions[i].item()  # predicted price

        # NOTE(review): the payoff rules below are kept exactly as they were. The
        # two directions are not symmetric (e.g. a close at or below the predicted
        # price yields 0 in the first branch, while the final `else` credits
        # `predicted_price - day_before_price` even when the close is below the
        # entry) — confirm the intended strategy.
        final_value = 0
        if day_before_price > predicted_price:
            # Prediction is below the entry price
            if value > predicted_price and value < day_before_price:
                final_value = day_before_price - predicted_price
            elif value > predicted_price and value > day_before_price:
                final_value = day_before_price - value
        else:
            # Prediction is at or above the entry price
            if value < predicted_price and value > day_before_price:
                final_value = value - day_before_price
            else:
                final_value = predicted_price - day_before_price

        # Number of shares affordable with the current capital, limited to 100
        # shares to simulate risk management. The cap is applied to the quantity
        # computed for THIS day, and a negative balance buys nothing.
        quantity = min(max(current_money // day_before_price, 0), 100)

        # Scale the per-share result by the number of shares traded that day
        profit = final_value * quantity
        current_money += profit
        profit_history.append(profit)

    return show_results(profit_history, dates, title, graph)

In [None]:
# Profit of the best model
model_25_result = calc_model_profit(dataset=data, predictions=predictions[best_model_index], initial_money=500, graph=True)

In [None]:
def calc_technical_profit(dataset, predictions, initial_money, graph):
    """Simulate a rule-based baseline strategy over the prediction window.

    The last ``len(predictions)`` rows of ``dataset`` are used. Each day the open
    is compared with the previous close: a lower open earns ``close - open`` per
    share, a higher open earns ``open - close`` per share, and an equal open
    earns nothing.

    Args:
        dataset: DataFrame with ``"Open"`` and ``"Close"`` columns.
        predictions: Only its length is used, to select the same window as the model run.
        initial_money: Starting capital used to size the position.
        graph: When truthy, the cumulative profit is plotted.

    Returns:
        The dataframe produced by ``show_results`` (cumulative profit per date).
    """
    current_money = initial_money

    offset = len(dataset.index) - len(predictions)
    window = dataset.iloc[offset:]
    dates = window.index
    # Plain arrays give positional access; integer `[]` lookups on a
    # label-indexed Series are deprecated in recent pandas.
    open_prices = window["Open"].values
    close_prices = window["Close"].values

    technical_profit_history = []
    for i, open_value in enumerate(open_prices):
        today_close = close_prices[i]  # today's close
        # Reference price: previous close, or the open itself on the first day
        # (which makes the first day a no-trade day)
        day_before_price = open_value if i == 0 else close_prices[i-1]

        # Number of shares affordable with the current capital, limited to 100
        # shares to simulate risk management (same sizing rule as the model run).
        quantity = min(max(current_money // day_before_price, 0), 100)

        final_value = 0
        if open_value < day_before_price:
            final_value = today_close - open_value
        elif open_value > day_before_price:
            final_value = open_value - today_close

        profit = final_value * quantity
        current_money += profit
        technical_profit_history.append(profit)

    return show_results(technical_profit_history, dates, 'Análise Técnica', graph)

In [None]:
# Profit of the rule-based strategy over the same window as the best model's predictions
technical_result = calc_technical_profit(dataset=data, predictions=predictions[best_model_index], initial_money=500, graph=True)

In [None]:
# Plot both cumulative-profit curves on the same axes for comparison
result = pd.concat([technical_result, model_25_result], axis=1)
result.plot(figsize=(10,5),title=f"Lucro: {ticker}", xlabel="Data", ylabel="Lucro(R$)")

In [None]:
# Tickers of other stocks on which the best model is evaluated
tickers = ['ITUB4.SA', 'FLRY3.SA']

# Use the model selected as best explicitly. The bare `model` variable is just
# whatever an earlier loop left behind (the last loaded model), not the best one.
best_model = saved_models[best_model_index]

for ticker in tickers:
    # Download the historical prices from Yahoo Finance
    data = yf.download(ticker, start="2010-01-01", end="2022-12-31")

    # Preprocess the data (the scaler is refitted for each ticker)
    scaler = MinMaxScaler(feature_range=(0, 1))
    data_scaled = scaler.fit_transform(data["Close"].values.reshape(-1, 1))

    train_size = int(len(data_scaled) * 0.8)
    test_size = len(data_scaled) - train_size
    train_data = data_scaled[0:train_size, :]
    test_data = data_scaled[train_size:len(data_scaled), :]

    # Size of the time window used for the input data
    time_steps = 5

    # Build the LSTM input and output data based on the chosen time window
    X_train, y_train = create_dataset(train_data, time_steps)
    X_test, y_test = create_dataset(test_data, time_steps)

    # Reshape the inputs so that they are compatible with the LSTM
    X_train = np.reshape(X_train, (X_train.shape[0], X_train.shape[1], 1))
    X_test = np.reshape(X_test, (X_test.shape[0], X_test.shape[1], 1))

    # Predict with the best model and map the output back through the scaler
    y_pred = best_model.predict(X_test)
    predictions = scaler.inverse_transform(y_pred)

    dates = data.index[len(data.index)-len(predictions):]

    predicted_df = pd.DataFrame(predictions, index=dates, columns=["Preço Previsto"])
    actual_df = pd.DataFrame(data["Close"].values[len(data.index)-len(predictions):], index=dates, columns=["Preço real"])
    result_df = pd.concat([actual_df, predicted_df], axis=1)

    # Plot the predicted and actual prices, labelled with the model actually used
    result_df.plot(figsize=(10,5),title=f"Modelo {best_model_index + 1} - {ticker}", xlabel="Data", ylabel="Preço(R$)")

    # Profit of the best model
    model_25_result = calc_model_profit(dataset=data, predictions=predictions, initial_money=500, graph=False)
    # Profit of the rule-based strategy
    technical_result = calc_technical_profit(dataset=data, predictions=predictions, initial_money=500, graph=False)

    result = pd.concat([technical_result, model_25_result], axis=1)
    result.plot(figsize=(10,5),title=f"Lucro: {ticker}", xlabel="Data", ylabel="Lucro(R$)")