## Setup

In [None]:
! pip install yfinance

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting yfinance
  Downloading yfinance-0.2.12-py2.py3-none-any.whl (59 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m59.2/59.2 KB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting frozendict>=2.3.4
  Downloading frozendict-2.3.5-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (112 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.8/112.8 KB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting cryptography>=3.3.2
  Downloading cryptography-39.0.2-cp36-abi3-manylinux_2_28_x86_64.whl (4.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m4.2/4.2 MB[0m [31m34.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting html5lib>=1.1
  Downloading html5lib-1.1-py2.py3-none-any.whl (112 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m112.2/112.2 KB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25

## Libraries

In [None]:
# estrutura de dados
import numpy as np
import pandas as pd

# gráficos
import matplotlib.pyplot as plt
import matplotlib as mpl

# dados
import yfinance as yf

# modelo
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, LSTM, Dropout
from tensorflow.keras import initializers
from keras import callbacks
# métricas
from sklearn.metrics import (
    mean_absolute_error, 
    mean_squared_error, 
    r2_score, 
    mean_absolute_percentage_error
)

# escalonadores
from sklearn.preprocessing import *

import warnings
# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

# Seed the NumPy and TensorFlow global random generators.
np.random.seed(0)
tf.random.set_seed(0)

## Functions

In [None]:
def _make_windows(values, window_size):
  """Stack every run of `window_size` consecutive rows that has one more row after it.

  Returns an array of shape (len(values) - window_size, window_size, n_columns);
  the first axis has length 0 when `values` has `window_size` rows or fewer.
  """
  n_windows = max(len(values) - window_size, 0)
  if n_windows == 0:
    return np.empty((0, window_size, values.shape[1]), dtype=values.dtype)
  return np.stack([values[start:start + window_size] for start in range(n_windows)])


def get_train_test_split_scal(
    df,
    target,
    train_size,
    window_size,
    normalize=False,
    scaler=None
):
  """Split a price frame chronologically and build sliding-window model inputs.

  The 'Adj Close' and 'Volume' columns are left out of the features (they are
  skipped silently if absent) and the caller's `df` is not modified. The first
  ceil(len(df) * train_size) rows form the training split; the test split
  starts `window_size` rows earlier so the first test target has a full window.

  When `normalize` is true, each remaining column gets its own `scaler()`
  instance, fitted on the training rows only and then applied to the test rows.
  Fitting on the test rows would leak test-period statistics into the inputs.
  Two module-level names are (re)assigned in that case:
    * `scaler_high_test`: the fitted scaler of the `target` column (used by
      `get_pred` to undo the scaling);
    * `y_test_normalized`: the scaled test targets, shape (n_test, 1).

  Args:
    df: DataFrame with one row per time step; must contain `target`.
    target: name of the column to predict.
    train_size: fraction of rows assigned to the training split.
    window_size: number of consecutive rows in each input window.
    normalize: whether to scale the feature columns.
    scaler: zero-argument callable returning an object with `fit_transform`
      and `transform` (e.g. a scikit-learn scaler class); required when
      `normalize` is true.

  Returns:
    (x_train, y_train, x_test, y_test). The x arrays have shape
    (n_windows, window_size, n_features). y_train is the target of the row that
    follows each training window (scaled when `normalize` is true). y_test is
    always the raw, unscaled target of the test rows.

  Raises:
    ValueError: if `normalize` is true but no `scaler` is given, or if the
      training split is shorter than `window_size`.
  """
  global scaler_high_test, y_test_normalized

  # drop() returns a new frame, so the caller's df keeps all of its columns.
  features = df.drop(columns=['Adj Close', 'Volume'], errors='ignore')
  training_data_len = int(np.ceil(len(features) * train_size))
  if training_data_len < window_size:
    # A negative slice start would silently wrap around to the end of the frame.
    raise ValueError(
        f"training split has {training_data_len} rows, fewer than window_size={window_size}"
    )

  train_data = features.iloc[:training_data_len].copy()
  test_data = features.iloc[training_data_len - window_size:].copy()
  y_test = features[target].iloc[training_data_len:].to_numpy().copy()

  if normalize:
    if scaler is None:
      raise ValueError("normalize=True requires a scaler class or factory")
    target_scaler = None
    for column in features.columns:
      column_scaler = scaler()
      train_values = train_data[column].to_numpy().reshape(-1, 1)
      test_values = test_data[column].to_numpy().reshape(-1, 1)
      train_data[column] = np.asarray(column_scaler.fit_transform(train_values)).ravel()
      test_data[column] = np.asarray(column_scaler.transform(test_values)).ravel()
      if column == target:
        target_scaler = column_scaler
    scaler_high_test = target_scaler
    y_test_normalized = np.asarray(target_scaler.transform(y_test.reshape(-1, 1)))

  x_train = _make_windows(train_data.to_numpy(), window_size)
  # Target of the row right after each training window (positional, not label-based).
  y_train = train_data[target].to_numpy()[window_size:].copy()
  x_test = _make_windows(test_data.to_numpy(), window_size)

  return x_train, y_train, x_test, y_test

def get_pred(
    model,
    x_test,
    normalize=False,
    shuffle=True,
    batch=0
):
  """Run the model on `x_test` and optionally undo the target scaling.

  Args:
    model: object exposing `predict(x, batch_size=...)`.
    x_test: input windows passed to `model.predict`.
    normalize: when true, predictions are mapped back to the original scale
      with the module-level `scaler_high_test` (assigned by
      `get_train_test_split_scal` when it normalizes). Defaults to False so the
      function can be called for unscaled experiments.
    shuffle: when false, `batch` is forwarded as `batch_size`; this is how the
      notebook drives models built with a fixed batch size.
    batch: batch size used when `shuffle` is false. A falsy value (the default
      0) falls back to the model's own default instead of being forwarded.

  Returns:
    The predictions returned by `model.predict`, inverse-transformed when
    `normalize` is true.
  """
  if not shuffle and batch:
    y_pred = model.predict(x_test, batch_size=batch)
  else:
    y_pred = model.predict(x_test)
  if normalize:
    y_pred = scaler_high_test.inverse_transform(y_pred)
  return y_pred

def get_compare(
    df,
    target,
    train_size,
    window_size,
    y_pred
):
  """Assemble actual values, model predictions and two naive baselines for the test rows.

  The test rows are those after the first ceil(len(df) * train_size) rows.

  Args:
    df: DataFrame containing the `target` column.
    target: name of the column that was predicted.
    train_size: fraction of rows that formed the training split.
    window_size: unused; kept so existing callers keep working.
    y_pred: model predictions for the test rows, either flat or shaped (n, 1).

  Returns:
    DataFrame indexed like the test rows with columns, in this order:
    `target`, 'Predictions', 'Random Walk' (value of the previous row) and
    'MA7' (mean of the current row and the up to six rows before it).
  """
  data = df[target].copy()
  training_data_len = int(np.ceil(len(df) * train_size))

  validation = data.iloc[training_data_len:].to_frame()
  # model.predict returns a column vector; flatten it to one value per row.
  validation['Predictions'] = np.asarray(y_pred).reshape(-1)
  validation['Random Walk'] = data.shift(1).iloc[training_data_len:].to_numpy()
  # min_periods=1 averages whatever history exists, so a short training split
  # no longer wraps around to rows at the end of the series.
  # NOTE(review): the 7-row window ends on the row being scored, so this
  # baseline sees the actual value it is compared against (Random Walk only
  # uses the previous row). Confirm this is intended.
  moving_average = data.rolling(window=7, min_periods=1).mean()
  validation['MA7'] = moving_average.iloc[training_data_len:].to_numpy()

  return validation

def get_scores(
    df_validation,
    y_test='High',
    y_pred='Predictions',
    random_walk='Random Walk',
    ma7='MA7'
):
  """Score the model and both baselines against the actual values.

  Columns are selected by the names given in the arguments, so each result row
  is labelled with the forecast it was really computed from, regardless of the
  column order of `df_validation`.

  Args:
    df_validation: DataFrame holding the actual values and the three forecasts.
    y_test: name of the column with the actual values.
    y_pred: name of the column with the model predictions.
    random_walk: name of the column with the random-walk baseline.
    ma7: name of the column with the moving-average baseline.

  Returns:
    DataFrame with rows 'Modelo', 'Random Walk', 'Moving Average 7' and columns
    'MAE', 'MAPE', 'MSE', 'RMSE', 'R²'. Every cell is a string formatted with
    two decimals; MAPE is expressed in percent.
  """
  actual = df_validation[y_test]

  rows = []
  for column in (y_pred, random_walk, ma7):
    forecast = df_validation[column]
    mae_test = mean_absolute_error(actual, forecast)
    mse_test = mean_squared_error(actual, forecast)
    rmse_test = mse_test ** 0.5
    r2_test = r2_score(actual, forecast)
    mape_test = 100 * mean_absolute_percentage_error(actual, forecast)
    rows.append([f'{mae_test:.2f}', f'{mape_test:.2f}', f'{mse_test:.2f}', f'{rmse_test:.2f}', f'{r2_test:.2f}'])

  df_metrics = pd.DataFrame(rows, index=['Modelo', 'Random Walk', 'Moving Average 7'], columns=['MAE', 'MAPE', 'MSE', 'RMSE', 'R²'])
  return df_metrics

def show_save_results(setup_name, ticker, df_metrics):
  """Print the metrics table under a header line and write it to a CSV file.

  The CSV is written to the current working directory; its name embeds
  `setup_name` and `ticker`.
  """
  header = f"Resultados {setup_name} para {ticker}"
  csv_name = f'Comparação de métricas entre o modelo{setup_name}, média móvel de 7 dias e random walk para a série {ticker}.csv'

  print(header)
  print(df_metrics)
  df_metrics.to_csv(csv_name)

def save_fig(
    setup_name,
    df,
    target,
    train_size,
    epochs,
    historic,
    ticker,
    validation,
    loss=False,
    full=False,
    predictions=False,
    interval=False,
    i_start=0,
    i_end=0
):
  """Draw, save (PNG, current working directory) and show the requested figures.

  Each enabled flag produces its own 16x6 figure, which is written to disk
  before being shown and is closed afterwards.

  Args:
    setup_name: experiment label used in titles and file names.
    df: full price DataFrame; `df[target]` is drawn by the `full` plot.
    target: name of the predicted column.
    train_size: unused; kept so existing callers keep working.
    epochs: unused; the loss curve is drawn over the number of epochs actually
      recorded in `historic.history`. Kept so existing callers keep working.
    historic: object whose `history` dict holds 'loss' and 'val_loss' lists.
    ticker: ticker symbol used in titles and file names.
    validation: DataFrame with the `target`, 'Predictions', 'Random Walk' and
      'MA7' columns.
    loss: plot training and validation loss per epoch.
    full: plot the whole series with the test values and predictions on top.
    predictions: plot test values against predictions.
    interval: plot rows `i_start:i_end` of `validation`, baselines included.
    i_start, i_end: positional bounds of the `interval` plot.
  """
  # Newer matplotlib releases ship this style as 'seaborn-v0_8-darkgrid' and no
  # longer accept the old name; use whichever one is installed.
  for style_name in ('seaborn-v0_8-darkgrid', 'seaborn-darkgrid'):
    if style_name in plt.style.available:
      mpl.style.use(style_name)
      break
  plt.style.use("ggplot")

  def _new_figure():
    # One figure per plot so several flags in one call do not share axes.
    return plt.figure(figsize=(16,6))

  def _save_show_close(fig, file_name):
    # Save before showing: some backends discard the canvas once it is shown.
    fig.savefig(file_name, format='png')
    plt.show()
    plt.close(fig)

  if loss:
    fig = _new_figure()
    train_loss = historic.history["loss"]
    val_loss = historic.history['val_loss']
    # Use the recorded length: training may have run fewer epochs than requested.
    plt.plot(np.arange(len(train_loss)), train_loss, label="train_loss", color='#ADBF97')
    plt.plot(np.arange(len(val_loss)), val_loss, label="val_loss", color='blue')
    plt.title(f"Perda de treinamento da série {ticker}")
    plt.xlabel("Amostra")
    plt.ylabel("Perda")
    plt.legend(prop={'size':16})
    _save_show_close(fig, f"Perda de treinamento do modelo{setup_name} da série {ticker}.png")

  if full:
    fig = _new_figure()
    plt.title(f'Previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}')
    plt.xlabel('Data')
    plt.ylabel('Preço de fechamento (R$)')
    plt.plot(df[target], color='#ADBF97')
    plt.plot(validation[target], color='blue')
    plt.plot(validation[['Predictions']], color='orange')
    plt.legend(['Train', 'Test', 'Predictions'], loc='best', prop={'size':16})
    _save_show_close(fig, f'Grafico de treino e previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}.png')

  if predictions:
    fig = _new_figure()
    plt.title(f'Previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}')
    plt.xlabel('Data')
    plt.ylabel('Preço de alta (R$)')
    plt.plot(validation[target], color='blue')
    plt.plot(validation[['Predictions']], color='orange')
    plt.legend(['Valor real', 'Predições'], loc='best', prop={'size':16})
    _save_show_close(fig, f'Grafico de previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}.png')

  if interval:
    fig = _new_figure()
    window = validation.iloc[i_start:i_end]
    plt.title(f'Previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}')
    plt.xlabel('Data')
    plt.ylabel('Preço de alta (R$)')
    plt.plot(window[[target]], color='blue')
    plt.plot(window[['Predictions']], color='orange')
    plt.plot(window['Random Walk'], color='red')
    plt.plot(window['MA7'], color='green')
    plt.legend(['Valor real', 'Predições', 'Random Walk', 'Moving average 7'], loc='best', prop={'size':16})
    _save_show_close(fig, f'Grafico de {i_end-i_start} dias da previsão do modelo {setup_name} em R$ para o preço de {target} da série {ticker}.png')

def full_experiment(
    experiment_name,
    ticker,
    start_date,
    end_date,
    custom_model,
    shuffle,
    target='High',
    train_size=0.7,
    window_size=3,
    normalize=False,
    scaler=MinMaxScaler,
    epochs=300,
    batch=100,
    loss=False,
    full=False,
    predictions=False,
    interval=False,
    i_start=0,
    i_end=0
):
  """Download a price series, train a model on it, score it and save the figures.

  Steps: download with yfinance, split/scale with `get_train_test_split_scal`,
  build the model with `custom_model`, fit it (the test windows are used as
  validation data), predict the test rows, compare against the baselines with
  `get_compare` / `get_scores`, then print and save the results and figures.

  The fit history is also stored in the module-level name `historic`.

  Args:
    experiment_name: label used in printed output and file names.
    ticker: symbol passed to `yf.download`.
    start_date, end_date: download range passed to `yf.download`.
    custom_model: callable returning a compiled model. It is called as
      `custom_model(x_train)` when `shuffle` is true and as
      `custom_model(x_train, batch)` when it is false.
    shuffle: whether training samples are shuffled. False selects the
      fixed-batch (stateful) code path for building, fitting and predicting.
    target: column to predict.
    train_size: fraction of rows used for training.
    window_size: number of rows per input window.
    normalize: whether features and targets are scaled.
    scaler: scaler class or factory used when `normalize` is true.
    epochs: maximum number of training epochs; training stops earlier once
      'val_loss' has not improved for 5 consecutive epochs.
    batch: batch size for fitting (and for predicting when `shuffle` is false).
    loss, full, predictions, interval: which figures `save_fig` should produce.
    i_start, i_end: positional bounds for the `interval` figure.

  Returns:
    `experiment_name`.

  Raises:
    ValueError: if the download returns no rows.
  """
  global historic

  # Load data
  df = yf.download(ticker, start=start_date, end=end_date)
  if df.empty:
    raise ValueError(f"No data returned for {ticker} between {start_date} and {end_date}")

  # Split train and test (and scale when requested). All arguments are passed
  # explicitly so the unscaled path works too.
  x_train, y_train, x_test, y_test = get_train_test_split_scal(
      df, target, train_size, window_size, normalize, scaler
  )

  # Create model
  if shuffle:
    model = custom_model(x_train)
  else:
    model = custom_model(x_train, batch)

  # Train model. Validation targets must be on the same scale as the training targets.
  early_stopping = callbacks.EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
  y_val = y_test_normalized if normalize else y_test
  historic = model.fit(
      x_train,
      y_train,
      epochs=epochs,
      batch_size=batch,
      validation_data=(x_test, y_val),
      verbose=1,
      shuffle=bool(shuffle),
      callbacks=[early_stopping]
  )
  # Early stopping can end training before `epochs`; the loss plot needs the real count.
  epochs_run = len(historic.history['loss'])

  # Predict the test rows (scaled back to prices when normalize is true)
  y_pred = get_pred(model, x_test, normalize, shuffle=shuffle, batch=batch)

  # Compare predictions, baselines and real data
  validation = get_compare(df, target, train_size, window_size, y_pred)
  df_metrics = get_scores(validation, y_test=target)

  # Show and save results
  show_save_results(experiment_name, ticker, df_metrics)
  if loss:
    save_fig(experiment_name, df, target, train_size, epochs_run, historic, ticker, validation, loss=True)
  if full:
    save_fig(experiment_name, df, target, train_size, epochs, historic, ticker, validation, full=True)
  if predictions:
    save_fig(experiment_name, df, target, train_size, epochs, historic, ticker, validation, predictions=True)
  if interval:
    save_fig(experiment_name, df, target, train_size, epochs, historic, ticker, validation, interval=True, i_start=i_start, i_end=i_end)
  return experiment_name

In [None]:
# Setup 1_0 (LSTM)
def LSTM_setup1_0(x_train):
  """Build and compile a stacked two-layer LSTM regressor.

  The input shape is taken from axes 1 and 2 of `x_train`. Two LSTM layers of
  100 units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  window_shape = (x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, input_shape=window_shape),
      LSTM(100, return_sequences=False),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup1_0",
  custom_model=LSTM_setup1_0,
  # data
  ticker='ABEV3.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  # NOTE(review): the other setups visible in this notebook use batch=2; confirm 100 is intended here.
  shuffle=True,
  epochs=300,
  batch=100,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 2_0 (LSTM)
def LSTM_setup2_0(x_train, batch):
  """Build and compile a stateful stacked two-layer LSTM regressor.

  The first layer is declared with a fixed batch input shape of
  (batch, x_train.shape[1], x_train.shape[2]). Two stateful LSTM layers of 100
  units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  fixed_input_shape = (batch, x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, batch_input_shape=fixed_input_shape, stateful=True),
      LSTM(100, return_sequences=False, stateful=True),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup2_0",
  custom_model=LSTM_setup2_0,
  # data
  ticker='ABEV3.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=False,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 1_1 (LSTM)
def LSTM_setup1_1(x_train):
  """Build and compile a stacked two-layer LSTM regressor.

  The input shape is taken from axes 1 and 2 of `x_train`. Two LSTM layers of
  100 units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  window_shape = (x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, input_shape=window_shape),
      LSTM(100, return_sequences=False),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup1_1",
  custom_model=LSTM_setup1_1,
  # data
  ticker='BBAS3.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=True,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 2_1 (LSTM)
def LSTM_setup2_1(x_train, batch):
  """Build and compile a stateful stacked two-layer LSTM regressor.

  The first layer is declared with a fixed batch input shape of
  (batch, x_train.shape[1], x_train.shape[2]). Two stateful LSTM layers of 100
  units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  fixed_input_shape = (batch, x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, batch_input_shape=fixed_input_shape, stateful=True),
      LSTM(100, return_sequences=False, stateful=True),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup2_1",
  custom_model=LSTM_setup2_1,
  # data
  ticker='BBAS3.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=False,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 1_2 (LSTM)
def LSTM_setup1_2(x_train):
  """Build and compile a stacked two-layer LSTM regressor.

  The input shape is taken from axes 1 and 2 of `x_train`. Two LSTM layers of
  100 units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  window_shape = (x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, input_shape=window_shape),
      LSTM(100, return_sequences=False),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup1_2",
  custom_model=LSTM_setup1_2,
  # data
  ticker='BBDC4.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=True,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 2_2 (LSTM)
def LSTM_setup2_2(x_train, batch):
  """Build and compile a stateful stacked two-layer LSTM regressor.

  The first layer is declared with a fixed batch input shape of
  (batch, x_train.shape[1], x_train.shape[2]). Two stateful LSTM layers of 100
  units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  fixed_input_shape = (batch, x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, batch_input_shape=fixed_input_shape, stateful=True),
      LSTM(100, return_sequences=False, stateful=True),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup2_2",
  custom_model=LSTM_setup2_2,
  # data
  ticker='BBDC4.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=False,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 1_3 (LSTM)
def LSTM_setup1_3(x_train):
  """Build and compile a stacked two-layer LSTM regressor.

  The input shape is taken from axes 1 and 2 of `x_train`. Two LSTM layers of
  100 units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  window_shape = (x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, input_shape=window_shape),
      LSTM(100, return_sequences=False),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup1_3",
  custom_model=LSTM_setup1_3,
  # data
  ticker='TAEE11.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=True,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 2_3 (LSTM)
def LSTM_setup2_3(x_train, batch):
  """Build and compile a stateful stacked two-layer LSTM regressor.

  The first layer is declared with a fixed batch input shape of
  (batch, x_train.shape[1], x_train.shape[2]). Two stateful LSTM layers of 100
  units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  fixed_input_shape = (batch, x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, batch_input_shape=fixed_input_shape, stateful=True),
      LSTM(100, return_sequences=False, stateful=True),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup2_3",
  custom_model=LSTM_setup2_3,
  # data
  ticker='TAEE11.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=False,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)
# executed

In [None]:
# Setup 1_4 (LSTM)
def LSTM_setup1_4(x_train):
  """Build and compile a stacked two-layer LSTM regressor.

  The input shape is taken from axes 1 and 2 of `x_train`. Two LSTM layers of
  100 units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  window_shape = (x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, input_shape=window_shape),
      LSTM(100, return_sequences=False),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup1_4",
  custom_model=LSTM_setup1_4,
  # data
  ticker='CMIG4.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=True,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)

In [None]:
# Setup 2_4 (LSTM)
def LSTM_setup2_4(x_train, batch):
  """Build and compile a stateful stacked two-layer LSTM regressor.

  The first layer is declared with a fixed batch input shape of
  (batch, x_train.shape[1], x_train.shape[2]). Two stateful LSTM layers of 100
  units feed Dense(25) and Dense(1); the model is compiled with Adam,
  mean-squared-error loss and MAE as an extra metric.
  """
  fixed_input_shape = (batch, x_train.shape[1], x_train.shape[2])
  network = Sequential([
      LSTM(100, return_sequences=True, batch_input_shape=fixed_input_shape, stateful=True),
      LSTM(100, return_sequences=False, stateful=True),
      Dense(25),
      Dense(1),
  ])
  network.compile(optimizer='adam', loss='mean_squared_error', metrics=['mae'])
  return network

results = full_experiment(
  # experiment and model
  experiment_name="LSTM_setup2_4",
  custom_model=LSTM_setup2_4,
  # data
  ticker='CMIG4.SA',
  start_date='2008-01-01',
  end_date='2022-12-31',
  target='High',
  train_size=0.7,
  window_size=3,
  normalize=True,
  scaler=MinMaxScaler,
  # training
  shuffle=False,
  epochs=300,
  batch=2,
  # figures
  loss=True,
  full=True,
  predictions=True,
  interval=True,
  i_start=15,
  i_end=150,
)