### Importação das Bibliotecas

In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score, make_scorer
from scikeras.wrappers import KerasRegressor

from keras.models import Sequential
from keras.layers import Input, LSTM, Dropout, Dense
from keras.callbacks import EarlyStopping
from keras.metrics import RootMeanSquaredError

In [None]:
def create_sequences_mult(X, y, seq_len=14):
    """Turn aligned feature/target series into sliding-window samples.

    Each sample is the ``seq_len`` consecutive rows of ``X`` starting at some
    offset, paired with the value of ``y`` at the position immediately after
    that window.

    Args:
        X: Indexable, sliceable sequence of feature rows.
        y: Indexable sequence of targets, aligned with ``X``.
        seq_len: Number of consecutive rows in each window.

    Returns:
        A tuple ``(windows, targets)`` of NumPy arrays with one entry per
        window. Both are empty when ``len(X) <= seq_len``.
    """
    starts = range(len(X) - seq_len)
    windows = [X[start:start + seq_len] for start in starts]
    targets = [y[start + seq_len] for start in starts]
    return np.array(windows), np.array(targets)

In [None]:
def create_lstm_model(seq_len=14, n_features=1, neurons=100, drop=0.2, optimizer='Adam'):
    """Build and compile a stacked two-layer LSTM regressor.

    Args:
        seq_len: Number of timesteps in each input window.
        n_features: Number of features per timestep.
        neurons: Units in the first LSTM layer; the second uses ``neurons // 2``.
        drop: Dropout rate applied after each LSTM layer.
        optimizer: Optimizer passed to ``compile``.

    Returns:
        The compiled ``Sequential`` model (MSE loss, RMSE tracked as a metric).
    """
    layers = [
        Input(shape=(seq_len, n_features)),
        # The first LSTM returns the whole sequence so the second LSTM can consume it.
        LSTM(neurons, return_sequences=True),
        Dropout(drop),
        # The second LSTM is half as wide and returns only its last output.
        LSTM(neurons // 2, return_sequences=False),
        Dropout(drop),
        # Single linear unit: one regression output per window.
        Dense(1),
    ]
    network = Sequential(layers)
    network.compile(
        loss='mse',
        optimizer=optimizer,
        metrics=[RootMeanSquaredError()],
    )
    return network


# Hyper-parameter grid handed to GridSearchCV. The ``model__`` / ``fit__``
# prefixes are the routing prefixes used with the KerasRegressor wrapper.
#
# Wider alternative grid (currently disabled, kept for reference):
#   model__neurons:  [16, 32, 64]
#   model__drop:     [0.2, 0.25, 0.3]
#   fit__batch_size: [16, 32]
#   fit__epochs:     [50, 100]
param_grid = {
    'model__neurons': [40, 60],
    'model__drop': [0.1, 0.2],
    'model__optimizer': ['Adam'],
    'fit__batch_size': [16],
    'fit__epochs': [100],
}

In [None]:
# One result list per dataset variant; each training loop appends to its own list.
models_lstm_all, models_lstm_cropped, models_lstm_crypto_all = [], [], []

# Window lengths to try (14 is currently left out of the search).
seq_len_options = [7, 30]
# Numbers of TimeSeriesSplit folds to try.
split_options = [5, 10]

# Event marked with a vertical line on the prediction plots.
event_label = "Trump Elected"
event_date = pd.Timestamp('2024-11-05')

### Modelo com dados completos

In [None]:
# Load the full BTC-USD dataset, using the parsed 'Date' column as the index.
data = pd.read_csv(
    '../data/processed/BTC-USD_all.csv',
    parse_dates=True,
    index_col='Date',
)
# Preview the first five rows.
data.head(5)

In [None]:
# Predictors are the four columns below; the target is 'Close',
# kept as a one-column DataFrame.
feature_columns = ['High', 'Low', 'Open', 'Volume']
X_mult = data[feature_columns]
y_mult = data[['Close']]

In [None]:
# Grid-search one LSTM per (window length, number of CV folds) on the full dataset.
for seq_len in seq_len_options:
    print(f"===== JANELA DE {seq_len} =====")

    # Build the sliding windows, then split chronologically:
    # first 80% of the windows for training, the remainder for testing.
    X_seq, y_seq = create_sequences_mult(X_mult.values, y_mult.values, seq_len)
    train_size = int(0.8 * len(X_seq))
    X_train_raw, y_train_raw = X_seq[:train_size], y_seq[:train_size]
    X_test_raw, y_test_raw = X_seq[train_size:], y_seq[train_size:]

    # X is 3D (samples, timesteps, features), so it is flattened to 2D for scaling.
    X_train_reshaped = X_train_raw.reshape(-1, X_train_raw.shape[2])
    X_test_reshaped = X_test_raw.reshape(-1, X_test_raw.shape[2])

    # Both scalers are fitted on the training portion only.
    scaler_X_all = MinMaxScaler(feature_range=(0, 1)).fit(X_train_reshaped)
    scaler_y_all = MinMaxScaler(feature_range=(0, 1)).fit(y_train_raw)

    # Scale, then restore the original 3D window shape for X.
    X_train_scaled = scaler_X_all.transform(X_train_reshaped).reshape(X_train_raw.shape)
    X_test_scaled = scaler_X_all.transform(X_test_reshaped).reshape(X_test_raw.shape)
    y_train_scaled = scaler_y_all.transform(y_train_raw)
    y_test_scaled = scaler_y_all.transform(y_test_raw)

    for split in split_options:
        print(f"_____ SPLIT DE {split} _____")

        tscv = TimeSeriesSplit(n_splits=split)
        early_stopping = EarlyStopping(
            monitor='val_root_mean_squared_error',
            patience=10,
            restore_best_weights=True,
        )
        model = KerasRegressor(
            model=create_lstm_model,
            verbose=0,
            seq_len=seq_len,
            n_features=X_train_scaled.shape[2],
        )
        grid = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=tscv,
            refit=True,
            scoring="neg_root_mean_squared_error",
            n_jobs=-1,
        )

        # callbacks / validation_split are forwarded to the estimator's fit.
        grid_result_all = grid.fit(
            X_train_scaled,
            y_train_scaled,
            callbacks=[early_stopping],
            validation_split=0.2,
        )

        # The scorer is the negated RMSE, so the sign is flipped before storing.
        models_lstm_all.append({
            "model": grid_result_all.best_estimator_,
            "params": grid_result_all.best_params_,
            "rmse": -grid_result_all.best_score_,
            "n_splits": split,
            "seq_len": seq_len,
            "X_test": X_test_scaled,
            "y_test": y_test_scaled,
            "type": "Multivariado",
            "algorithm": "LSTM",
            "scaler_X": scaler_X_all,
            "scaler_y": scaler_y_all,
        })

        print(f"Menor RMSE: {-grid_result_all.best_score_:.3f} usando: {grid_result_all.best_params_}")

#### Resultados


In [None]:
# Score every full-data model on its own held-out test windows (scaled units).
for entry in models_lstm_all:
    y_pred = entry["model"].predict(entry["X_test"])
    entry["rmse_test"] = np.sqrt(mean_squared_error(entry["y_test"], y_pred))

# Rank ascending by test RMSE, so index 0 holds the lowest test error.
lstm_all_ordered_by_test = list(models_lstm_all)
lstm_all_ordered_by_test.sort(key=lambda entry: entry["rmse_test"])
lstm_all_ordered_by_test

In [None]:
# Evaluate the first (lowest test RMSE) entry of lstm_all_ordered_by_test.
best_entry = lstm_all_ordered_by_test[0]
y_pred_scaled = best_entry["model"].predict(best_entry["X_test"])

# Errors measured in the scaled target space.
mae = mean_absolute_error(best_entry["y_test"], y_pred_scaled)
rmse = np.sqrt(mean_squared_error(best_entry["y_test"], y_pred_scaled))

print(f"MAE normalizado: {mae}")
print(f"RMSE normalizado: {rmse}")

# Undo the target scaling with the scaler stored alongside the model.
target_scaler = best_entry["scaler_y"]
y_pred = target_scaler.inverse_transform(y_pred_scaled)
y_test = target_scaler.inverse_transform(best_entry["y_test"])

In [None]:
# Error metrics computed on the inverse-scaled predictions and targets.
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
# Mean absolute percentage error, expressed in percent.
mape = 100 * np.abs((y_test - y_pred) / y_test).mean()

print(f"MAE real: {mae}")
print(f"RMSE real: {rmse}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.2f}")

In [None]:
# Put the predictions at the tail of a NaN-filled array as long as the data,
# so they line up with the last dates when drawn against the full index.
y_pred_plot = np.empty_like(data['Close'])
y_pred_plot[:] = np.nan
y_pred_plot[-len(y_pred):] = y_pred.ravel()

fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index, y_pred_plot, color="red", marker=",", label='Predicted Close')
ax.plot(data.index, data['Close'].to_numpy(), color="black", marker=",", label='Total True Close')
ax.set_title('Close: total real vs predito')
ax.legend()
plt.show()

In [None]:
# Zoom in on the test period: predictions vs. true values over the last dates,
# with the event date marked by a dashed vertical line and a text label.
fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index[-len(y_pred):], y_pred, color="red", marker=",", label='Predicted Close')
ax.plot(data.index[-len(y_test):], y_test, color="black", marker=",", label='True Close')
ax.axvline(x=event_date, color='blue', linestyle='--', label=event_label)
# The label is anchored at the highest 'Close' value in the data.
ax.text(event_date, max(data['Close']), event_label, color='blue')
ax.set_title('Close: real vs predito')
ax.legend()
ax.grid(True, linestyle="--")
plt.show()

### Modelo com dados cortados (o período inicial, até a moeda se estabilizar, foi removido)

In [None]:
# Load the cropped BTC-USD dataset, using the parsed 'Date' column as the index.
data = pd.read_csv(
    '../data/processed/BTC-USD_cropped.csv',
    parse_dates=True,
    index_col='Date',
)
# Preview the first five rows.
data.head(5)

In [None]:
# Predictors are the four columns below; the target is 'Close',
# kept as a one-column DataFrame.
feature_columns = ['High', 'Low', 'Open', 'Volume']
X_mult = data[feature_columns]
y_mult = data[['Close']]

In [None]:
# Grid-search one LSTM per (window length, number of CV folds) on the cropped dataset.
for seq_len in seq_len_options:
    print(f"===== JANELA DE {seq_len} =====")

    # Build the sliding windows, then split chronologically:
    # first 80% of the windows for training, the remainder for testing.
    X_seq, y_seq = create_sequences_mult(X_mult.values, y_mult.values, seq_len)
    train_size = int(0.8 * len(X_seq))
    X_train_raw, y_train_raw = X_seq[:train_size], y_seq[:train_size]
    X_test_raw, y_test_raw = X_seq[train_size:], y_seq[train_size:]

    # X is 3D (samples, timesteps, features), so it is flattened to 2D for scaling.
    X_train_reshaped = X_train_raw.reshape(-1, X_train_raw.shape[2])
    X_test_reshaped = X_test_raw.reshape(-1, X_test_raw.shape[2])

    # Both scalers are fitted on the training portion only.
    scaler_X_cropped = MinMaxScaler(feature_range=(0, 1)).fit(X_train_reshaped)
    scaler_y_cropped = MinMaxScaler(feature_range=(0, 1)).fit(y_train_raw)

    # Scale, then restore the original 3D window shape for X.
    X_train_scaled = scaler_X_cropped.transform(X_train_reshaped).reshape(X_train_raw.shape)
    X_test_scaled = scaler_X_cropped.transform(X_test_reshaped).reshape(X_test_raw.shape)
    y_train_scaled = scaler_y_cropped.transform(y_train_raw)
    y_test_scaled = scaler_y_cropped.transform(y_test_raw)

    for split in split_options:
        print(f"_____ SPLIT DE {split} _____")

        tscv = TimeSeriesSplit(n_splits=split)
        early_stopping = EarlyStopping(
            monitor='val_root_mean_squared_error',
            patience=10,
            restore_best_weights=True,
        )
        model = KerasRegressor(
            model=create_lstm_model,
            verbose=0,
            seq_len=seq_len,
            n_features=X_train_scaled.shape[2],
        )
        grid = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=tscv,
            refit=True,
            scoring="neg_root_mean_squared_error",
            n_jobs=-1,
        )

        # callbacks / validation_split are forwarded to the estimator's fit.
        grid_result_cropped = grid.fit(
            X_train_scaled,
            y_train_scaled,
            callbacks=[early_stopping],
            validation_split=0.2,
        )

        # The scorer is the negated RMSE, so the sign is flipped before storing.
        models_lstm_cropped.append({
            "model": grid_result_cropped.best_estimator_,
            "params": grid_result_cropped.best_params_,
            "rmse": -grid_result_cropped.best_score_,
            "n_splits": split,
            "seq_len": seq_len,
            "X_test": X_test_scaled,
            "y_test": y_test_scaled,
            "type": "Multivariado",
            "algorithm": "LSTM",
            "scaler_X": scaler_X_cropped,
            "scaler_y": scaler_y_cropped,
        })

        print(f"Menor RMSE: {-grid_result_cropped.best_score_:.3f} usando: {grid_result_cropped.best_params_}")

#### Resultados


In [None]:

# Score every cropped-data model on its own held-out test windows (scaled units).
for entry in models_lstm_cropped:
    y_pred = entry["model"].predict(entry["X_test"])
    entry["rmse_test"] = np.sqrt(mean_squared_error(entry["y_test"], y_pred))

# Rank ascending by test RMSE, so index 0 holds the lowest test error.
lstm_cropped_ordered_by_test = list(models_lstm_cropped)
lstm_cropped_ordered_by_test.sort(key=lambda entry: entry["rmse_test"])
lstm_cropped_ordered_by_test

In [None]:
# Evaluate the first (lowest test RMSE) entry of lstm_cropped_ordered_by_test.
best_entry = lstm_cropped_ordered_by_test[0]
y_pred_scaled = best_entry["model"].predict(best_entry["X_test"])

# Errors measured in the scaled target space.
mae = mean_absolute_error(best_entry["y_test"], y_pred_scaled)
rmse = np.sqrt(mean_squared_error(best_entry["y_test"], y_pred_scaled))

print(f"MAE normalizado: {mae}")
print(f"RMSE normalizado: {rmse}")

# Undo the target scaling with the scaler stored alongside the model.
target_scaler = best_entry["scaler_y"]
y_pred = target_scaler.inverse_transform(y_pred_scaled)
y_test = target_scaler.inverse_transform(best_entry["y_test"])

In [None]:
# Error metrics computed on the inverse-scaled predictions and targets.
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
# Mean absolute percentage error, expressed in percent.
mape = 100 * np.abs((y_test - y_pred) / y_test).mean()

print(f"MAE real: {mae}")
print(f"RMSE real: {rmse}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.2f}")

In [None]:
# Put the predictions at the tail of a NaN-filled array as long as the data,
# so they line up with the last dates when drawn against the full index.
y_pred_plot = np.empty_like(data['Close'])
y_pred_plot[:] = np.nan
y_pred_plot[-len(y_pred):] = y_pred.ravel()

fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index, y_pred_plot, color="red", marker=",", label='Predicted Close')
ax.plot(data.index, data['Close'].to_numpy(), color="black", marker=",", label='Total True Close')
ax.set_title('Close: total real vs predito')
ax.legend()
plt.show()

In [None]:

# Zoom in on the test period: predictions vs. true values over the last dates,
# with the event date marked by a dashed vertical line and a text label.
fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index[-len(y_pred):], y_pred, color="red", marker=",", label='Predicted Close')
ax.plot(data.index[-len(y_test):], y_test, color="black", marker=",", label='True Close')
ax.axvline(x=event_date, color='blue', linestyle='--', label=event_label)
# The label is anchored at the highest 'Close' value in the data.
ax.text(event_date, max(data['Close']), event_label, color='blue')
ax.set_title('Close: real vs predito')
ax.legend()
ax.grid(True, linestyle="--")
plt.show()

Em 5 de novembro de 2024, Donald Trump foi eleito presidente.

### Modelo com dados completos e a tendência das palavras

In [None]:
# Load the full BTC-USD dataset that also carries the 'crypto' column,
# using the parsed 'Date' column as the index.
data = pd.read_csv(
    '../data/processed/BTC-USD_crypto_all.csv',
    parse_dates=True,
    index_col='Date',
)
# Preview the first five rows.
data.head(5)

In [None]:
# Predictors are the price/volume columns plus 'crypto'; the target is 'Close',
# kept as a one-column DataFrame.
feature_columns = ['High', 'Low', 'Open', 'Volume', 'crypto']
X_mult = data[feature_columns]
y_mult = data[['Close']]

In [None]:
# Grid-search one LSTM per (window length, number of CV folds) on the dataset
# that includes the 'crypto' feature.
for seq_len in seq_len_options:
    print(f"===== JANELA DE {seq_len} =====")

    # Build the sliding windows, then split chronologically:
    # first 80% of the windows for training, the remainder for testing.
    X_seq, y_seq = create_sequences_mult(X_mult.values, y_mult.values, seq_len)
    train_size = int(0.8 * len(X_seq))
    X_train_raw, y_train_raw = X_seq[:train_size], y_seq[:train_size]
    X_test_raw, y_test_raw = X_seq[train_size:], y_seq[train_size:]

    # X is 3D (samples, timesteps, features), so it is flattened to 2D for scaling.
    X_train_reshaped = X_train_raw.reshape(-1, X_train_raw.shape[2])
    X_test_reshaped = X_test_raw.reshape(-1, X_test_raw.shape[2])

    # Both scalers are fitted on the training portion only.
    scaler_X_crypto_all = MinMaxScaler(feature_range=(0, 1)).fit(X_train_reshaped)
    scaler_y_crypto_all = MinMaxScaler(feature_range=(0, 1)).fit(y_train_raw)

    # Scale, then restore the original 3D window shape for X.
    X_train_scaled = scaler_X_crypto_all.transform(X_train_reshaped).reshape(X_train_raw.shape)
    X_test_scaled = scaler_X_crypto_all.transform(X_test_reshaped).reshape(X_test_raw.shape)
    y_train_scaled = scaler_y_crypto_all.transform(y_train_raw)
    y_test_scaled = scaler_y_crypto_all.transform(y_test_raw)

    for split in split_options:
        print(f"_____ SPLIT DE {split} _____")

        tscv = TimeSeriesSplit(n_splits=split)
        early_stopping = EarlyStopping(
            monitor='val_root_mean_squared_error',
            patience=10,
            restore_best_weights=True,
        )
        model = KerasRegressor(
            model=create_lstm_model,
            verbose=0,
            seq_len=seq_len,
            n_features=X_train_scaled.shape[2],
        )
        grid = GridSearchCV(
            estimator=model,
            param_grid=param_grid,
            cv=tscv,
            refit=True,
            scoring="neg_root_mean_squared_error",
            n_jobs=-1,
        )

        # callbacks / validation_split are forwarded to the estimator's fit.
        grid_result_crypto_all = grid.fit(
            X_train_scaled,
            y_train_scaled,
            callbacks=[early_stopping],
            validation_split=0.2,
        )

        # The scorer is the negated RMSE, so the sign is flipped before storing.
        models_lstm_crypto_all.append({
            "model": grid_result_crypto_all.best_estimator_,
            "params": grid_result_crypto_all.best_params_,
            "rmse": -grid_result_crypto_all.best_score_,
            "n_splits": split,
            "seq_len": seq_len,
            "X_test": X_test_scaled,
            "y_test": y_test_scaled,
            "type": "Multivariado",
            "algorithm": "LSTM",
            "scaler_X": scaler_X_crypto_all,
            "scaler_y": scaler_y_crypto_all,
        })

        print(f"Menor RMSE: {-grid_result_crypto_all.best_score_:.3f} usando: {grid_result_crypto_all.best_params_}")

#### Resultados

In [None]:

# Score every model trained in this section on its own held-out test windows
# (scaled units). The list must be models_lstm_crypto_all: iterating
# models_lstm_cropped here would re-rank the cropped-data models and make
# every result below describe the wrong experiment.
for m in models_lstm_crypto_all:
    y_pred = m["model"].predict(m["X_test"])
    rmse_test = np.sqrt(mean_squared_error(m["y_test"], y_pred))
    m["rmse_test"] = rmse_test

# Rank ascending by test RMSE, so index 0 holds the lowest test error.
lstm_crypto_ordered_by_test = sorted(models_lstm_crypto_all, key=lambda x: x["rmse_test"])
lstm_crypto_ordered_by_test

In [None]:
# Evaluate the first (lowest test RMSE) entry of lstm_crypto_ordered_by_test.
best_entry = lstm_crypto_ordered_by_test[0]
y_pred_scaled = best_entry["model"].predict(best_entry["X_test"])

# Errors measured in the scaled target space.
mae = mean_absolute_error(best_entry["y_test"], y_pred_scaled)
rmse = np.sqrt(mean_squared_error(best_entry["y_test"], y_pred_scaled))

print(f"MAE normalizado: {mae}")
print(f"RMSE normalizado: {rmse}")

# Undo the target scaling with the scaler stored alongside the model.
target_scaler = best_entry["scaler_y"]
y_pred = target_scaler.inverse_transform(y_pred_scaled)
y_test = target_scaler.inverse_transform(best_entry["y_test"])

In [None]:
# Error metrics computed on the inverse-scaled predictions and targets.
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
# Mean absolute percentage error, expressed in percent.
mape = 100 * np.abs((y_test - y_pred) / y_test).mean()

print(f"MAE real: {mae}")
print(f"RMSE real: {rmse}")
print(f"MAPE: {mape:.2f}%")
print(f"R²: {r2:.2f}")

In [None]:
# Put the predictions at the tail of a NaN-filled array as long as the data,
# so they line up with the last dates when drawn against the full index.
y_pred_plot = np.empty_like(data['Close'])
y_pred_plot[:] = np.nan
y_pred_plot[-len(y_pred):] = y_pred.ravel()

fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index, y_pred_plot, color="red", marker=",", label='Predicted Close')
ax.plot(data.index, data['Close'].to_numpy(), color="black", marker=",", label='Total True Close')
ax.set_title('Close: total real vs predito')
ax.legend()
plt.show()

In [None]:

# Zoom in on the test period: predictions vs. true values over the last dates,
# with the event date marked by a dashed vertical line and a text label.
fig, ax = plt.subplots(figsize=(20, 8))
ax.plot(data.index[-len(y_pred):], y_pred, color="red", marker=",", label='Predicted Close')
ax.plot(data.index[-len(y_test):], y_test, color="black", marker=",", label='True Close')
ax.axvline(x=event_date, color='blue', linestyle='--', label=event_label)
# The label is anchored at the highest 'Close' value in the data.
ax.text(event_date, max(data['Close']), event_label, color='blue')
ax.set_title('Close: real vs predito')
ax.legend()
ax.grid(True, linestyle="--")
plt.show()