<a href="https://colab.research.google.com/github/cepdnaclk/e19-co544-Bitcoin-Cost-Forecast-System/blob/main/Models/Multivariate_LSTM_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import Data

In [2]:
import yfinance as yf
import pandas as pd

# Define the ticker simbol for Bitcoin
ticker = 'BTC-USD'

# Get historical market data
hist = yf.Ticker(ticker).history(period="max")


# Making the 'Date' as the index
hist.index = pd.to_datetime(hist.index)

# Drop the "Dividends" column and "Stock Splits" column
hist.drop(columns = ['Dividends', 'Stock Splits'], inplace = True)

# Print the data
print(hist.tail())

                                   Open          High           Low  \
Date                                                                  
2024-06-19 00:00:00+00:00  65146.660156  65695.351562  64693.300781   
2024-06-20 00:00:00+00:00  64960.296875  66438.960938  64547.847656   
2024-06-21 00:00:00+00:00  64837.988281  65007.546875  63378.894531   
2024-06-22 00:00:00+00:00  64113.863281  64475.468750  63929.757812   
2024-06-24 00:00:00+00:00  63173.351562  63264.613281  62647.945312   

                                  Close       Volume  
Date                                                  
2024-06-19 00:00:00+00:00  64960.296875  21103423504  
2024-06-20 00:00:00+00:00  64828.656250  25641109124  
2024-06-21 00:00:00+00:00  64096.199219  26188171739  
2024-06-22 00:00:00+00:00  64252.578125   9858198793  
2024-06-24 00:00:00+00:00  62817.820312  14938336256  


# Set Inputs and Outputs

In [3]:
X, y = hist.drop(columns = ['Close']), hist.Close.values
X.shape, y.shape

((3568, 4), (3568,))

## Standardize Features

In [4]:
# Import the StandardScaler and MinMaxScaler classes from the sklearn.preprocessing module
from sklearn.preprocessing import StandardScaler, MinMaxScaler

# Create an instance of thee MinMaxScaler, which scales the data to a specified range (default is 0 to 1)
mm = MinMaxScaler()

# Creating an instance of the standardScaler, which standardizes the data by removing the mean and scaling to unit variance
ss = StandardScaler()

# Applying the StandardScaler to the feature matrix X to standardize the features
# fit_transform() first fits the scaler to the data (calculating the mean and standard deviation) and then transforms the data
X_trans = ss.fit_transform(X)

# Reshaping the target variable y to be a 2D array with one column, as required by the MinMaxScaler
# fit_transform() first fits the scaler to the data (calculating the min and max values) and then transforms the data
y_trans = mm.fit_transform(y.reshape(-1, 1))

## Split a multivariate sequence past, future samples (X abd y)

In [5]:
import numpy as np

# split a multivariate sequence past, future samples (X and y)
def split_sequences(input_sequences, output_sequence, n_steps_in, n_steps_out):

    X, y = list(), list() # instantiate X and y

    for i in range(len(input_sequences)):

        # find the end of the input, output sequence
        end_ix = i + n_steps_in
        out_end_ix = end_ix + n_steps_out - 1

        # check if we are beyond the dataset
        if out_end_ix > len(input_sequences): break

        # gather input and output of the pattern
        seq_x, seq_y = input_sequences[i:end_ix], output_sequence[end_ix-1:out_end_ix, -1]
        X.append(seq_x), y.append(seq_y)

    return np.array(X), np.array(y)

X_ss, y_mm = split_sequences(X_trans, y_trans, 2, 1)
print(X_ss.shape, y_mm.shape)

(3567, 2, 4) (3567, 1)


## Check the y_mm sample

In [6]:
print("y_mm[0]:", y_mm[0])
print("y_trans[99:100].squeeze(1):", y_trans[99:100].squeeze(1))

assert y_mm[0].all() == y_trans[99:100].squeeze(1).all()

y_mm[0]

y_mm[0]: [0.00337886]
y_trans[99:100].squeeze(1): [0.00193271]


array([0.00337886])

In [7]:
y_trans[99:100].squeeze(1)

array([0.00193271])

## Split Training & Test Sets

In [8]:
total_samples = len(X)
train_test_cutoff = round(0.90 * total_samples)

X_train = X_ss[:-101]
X_test = X_ss[-101:]

y_train = y_mm[:-101]
y_test = y_mm[-101:]

print("Training Shape:", X_train.shape, y_train.shape)
print("Testing Shape:", X_test.shape, y_test.shape)

Training Shape: (3466, 2, 4) (3466, 1)
Testing Shape: (101, 2, 4) (101, 1)


## Convert Data Into Tensors

In [9]:
import torch

# convert to pytorch tensors
X_train_tensors = torch.Tensor(X_train).requires_grad_(True)
X_test_tensors = torch.Tensor(X_test).requires_grad_(True)

y_train_tensors = torch.Tensor(y_train).requires_grad_(True)
y_test_tensors = torch.Tensor(y_test).requires_grad_(True)


## Reshaping to rows, timestamps, features

In [10]:
# reshaping to rows, timestamps, features
X_train_tensors_final = torch.reshape(X_train_tensors,
                                      (X_train_tensors.shape[0], 2,
                                       X_train_tensors.shape[2]))
X_test_tensors_final = torch.reshape(X_test_tensors,
                                     (X_test_tensors.shape[0], 2,
                                      X_test_tensors.shape[2]))

print("Training Shape:", X_train_tensors_final.shape, y_train_tensors.shape)
print("Testing Shape:", X_test_tensors_final.shape, y_test_tensors.shape)

Training Shape: torch.Size([3466, 2, 4]) torch.Size([3466, 1])
Testing Shape: torch.Size([101, 2, 4]) torch.Size([101, 1])


In [11]:
X_check, y_check = split_sequences(X, y.reshape(-1, 1), 2, 1)
X_check[-1][0:4]

X.iloc[-149:-145]

Unnamed: 0_level_0,Open,High,Low,Volume
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
2024-01-27 00:00:00+00:00,41815.625,42195.632812,41431.28125,11422941934
2024-01-28 00:00:00+00:00,42126.125,42797.175781,41696.910156,16858971687
2024-01-29 00:00:00+00:00,42030.914062,43305.867188,41818.332031,20668476578
2024-01-30 00:00:00+00:00,43300.226562,43838.945312,42711.371094,23842814518


In [12]:
y_check[-1]

array([62817.8203125])

In [13]:
hist.Close.values[-1:]

array([62817.8203125])

# LSTM Model

In [14]:
import torch.nn as nn

class LSTM(nn.Module):

    def __init__(self, num_classes, input_size, hidden_size, num_layers):
        super().__init__()
        self.num_classes = num_classes # output size
        self.num_layers = num_layers # number of recurrent layers in the lstm
        self.input_size = input_size # input size
        self.hidden_size = hidden_size # neurons in each lstm layer
        # LSTM model
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True, dropout=0.2) # lstm
        self.fc_1 =  nn.Linear(hidden_size, 128) # fully connected
        self.fc_2 = nn.Linear(128, num_classes) # fully connected last layer
        self.relu = nn.ReLU()

    def forward(self,x):
        # hidden state
        h_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # cell state
        c_0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(x.device)
        # propagate input through LSTM
        output, (hn, cn) = self.lstm(x, (h_0, c_0)) # (input, hidden, and internal state)
        hn = hn.view(-1, self.hidden_size) # reshaping the data for Dense layer next
        out = self.relu(hn)
        out = self.fc_1(out) # first dense
        out = self.relu(out) # relu
        out = self.fc_2(out) # final output
        return out

# Training

In [15]:
def training_loop(n_epochs, lstm, optimiser, loss_fn, X_train, y_train,
                  X_test, y_test):
    for epoch in range(n_epochs):
        lstm.train()
        outputs = lstm.forward(X_train) # forward pass
        optimiser.zero_grad() # calculate the gradient, manually setting to 0
        # obtain the loss function
        loss = loss_fn(outputs, y_train)
        loss.backward() # calculates the loss of the loss function
        optimiser.step() # improve from loss, i.e backprop
        # test loss
        lstm.eval()
        test_preds = lstm(X_test)
        test_loss = loss_fn(test_preds, y_test)
        if epoch % 100 == 0:
            print("Epoch: %d, train loss: %1.5f, test loss: %1.5f" % (epoch,
                                                                      loss.item(),
                                                                      test_loss.item()))

# Instance of a LSTM Model

In [16]:
import warnings
warnings.filterwarnings('ignore')

n_epochs = 1000 # 1000 epochs
learning_rate = 0.001 # 0.001 lr

input_size = 4 # number of features
hidden_size = 2 # number of features in hidden state
num_layers = 1 # number of stacked lstm layers

num_classes = 1 # number of output classes

lstm = LSTM(num_classes,
              input_size,
              hidden_size,
              num_layers)

# Training Loop (MSE as the loss function)

In [17]:
loss_fn = torch.nn.MSELoss()    # mean-squared error for regression
optimiser = torch.optim.Adam(lstm.parameters(), lr=learning_rate)


training_loop(n_epochs=n_epochs,
              lstm=lstm,
              optimiser=optimiser,
              loss_fn=loss_fn,
              X_train=X_train_tensors_final,
              y_train=y_train_tensors,
              X_test=X_test_tensors_final,
              y_test=y_test_tensors)



Epoch: 0, train loss: 0.05240, test loss: 0.45301
Epoch: 100, train loss: 0.00219, test loss: 0.00864
Epoch: 200, train loss: 0.00110, test loss: 0.00311
Epoch: 300, train loss: 0.00053, test loss: 0.00252
Epoch: 400, train loss: 0.00029, test loss: 0.00321
Epoch: 500, train loss: 0.00021, test loss: 0.00361
Epoch: 600, train loss: 0.00017, test loss: 0.00314
Epoch: 700, train loss: 0.00015, test loss: 0.00263
Epoch: 800, train loss: 0.00014, test loss: 0.00221
Epoch: 900, train loss: 0.00012, test loss: 0.00183


In [18]:
input_data = [
    [465.864014, 468.174011, 452.421997, 21056800],  # Day 1
    [456.859985, 456.859985, 413.104004, 34483200]]  # Day 2

  # Assuming 'scaler' is already fitted to the training data
input_data_scaled = ss.transform(input_data)  # Scale the data
input_tensor = torch.Tensor(input_data_scaled).unsqueeze(0)  # Convert to tensor and add batch dimension

# Make prediction with the LSTM model
lstm.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient computation
    prediction_tensor = lstm(input_tensor)  # Make the prediction

# Reshape the prediction to match the number of features
prediction_reshaped = prediction_tensor.numpy().reshape(-1, 1)

# Reverse the scaling of the prediction
prediction_reshaped = mm.inverse_transform(prediction_reshaped)

# Since we're only interested in the 'Close' price, select the relevant column
predicted_close_price = prediction_reshaped[0][0]

print(f"Predicted Close price: {predicted_close_price}")


Predicted Close price: 445.9823303222656


# Prediction

In [None]:
# import matplotlib.pyplot as plt

# df_X_ss = ss.transform(hist.drop(columns=['Close'])) # old transformers
# df_y_mm = mm.transform(hist.Close.values.reshape(-1, 1)) # old transformers
# # split the sequence
# df_X_ss, df_y_mm = split_sequences(df_X_ss, df_y_mm, 100, 1)
# # converting to tensors
# df_X_ss = torch.Tensor(df_X_ss)
# df_y_mm = torch.Tensor(df_y_mm)
# # reshaping the dataset
# df_X_ss = torch.reshape(df_X_ss, (df_X_ss.shape[0], 100, df_X_ss.shape[2]))

# train_predict = lstm(df_X_ss) # forward pass
# data_predict = train_predict.data.numpy() # numpy conversion
# dataY_plot = df_y_mm.data.numpy()

# data_predict = mm.inverse_transform(data_predict) # reverse transformation

# dataY_plot = mm.inverse_transform(dataY_plot)
# true, preds = [], []
# for i in range(len(dataY_plot)):
#     true.append(dataY_plot[i][0])
# for i in range(len(data_predict)):
#     preds.append(data_predict[i][0])


# plt.figure(figsize=(10,6)) #plotting
# plt.axvline(x=train_test_cutoff, c='r', linestyle='--') # size of the training set

# plt.plot(true, label='Actual Data') # actual plot
# plt.plot(preds, label='Predicted Data') # predicted plot
# plt.title('Time-Series Prediction')
# plt.legend()
# plt.savefig("whole_plot.png", dpi=300)
# plt.show()

# print(preds[-1])

# Save as a pickle file

In [19]:
import pickle

# Save the LSTM model
with open('lstm_model.pkl', 'wb') as f:
    pickle.dump(lstm, f)

In [20]:
loaded_model = pickle.load(open('lstm_model.pkl', 'rb'))

In [21]:
input_data = [
    [465.864014, 468.174011, 452.421997, 21056800],  # Day 1
    [456.859985, 456.859985, 413.104004, 34483200]]  # Day 2

  # Assuming 'scaler' is already fitted to the training data
input_data_scaled = ss.transform(input_data)  # Scale the data
input_tensor = torch.Tensor(input_data_scaled).unsqueeze(0)  # Convert to tensor and add batch dimension

# Make prediction with the LSTM model
loaded_model.eval()  # Set the model to evaluation mode
with torch.no_grad():  # Disable gradient computation
    prediction_tensor = loaded_model(input_tensor)  # Make the prediction

# Reshape the prediction to match the number of features
prediction_reshaped = prediction_tensor.numpy().reshape(-1, 1)

# Reverse the scaling of the prediction
prediction_reshaped = mm.inverse_transform(prediction_reshaped)

# Since we're only interested in the 'Close' price, select the relevant column
predicted_close_price = prediction_reshaped[0][0]

print(f"Predicted Close price: {predicted_close_price}")


Predicted Close price: 445.9823303222656


# Model Evaluation

## RMSE

In [None]:
import torch

# Function to evaluate the model
def evaluate_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Turn off gradients for validation, saves memory and computations
        predictions = model(X_test)
        # Calculate the RMSE loss
        mse = torch.mean((predictions - y_test)**2)  # Calculate mean squared error
        rmse = torch.sqrt(mse)  # Calculate the square root of the MSE to get RMSE
    return rmse.item()  # Return RMSE value

# Call the evaluate_model function
test_rmse = evaluate_model(lstm, X_test_tensors_final, y_test_tensors)
print(f"Test RMSE: {test_rmse}")


Test RMSE: 0.022388571873307228


## Test Loss

In [None]:
import torch

# Function to evaluate the model
def evaluate_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Turn off gradients for validation, saves memory and computations
        predictions = model(X_test)
        # Calculate the loss
        loss_fn = torch.nn.MSELoss()
        loss = loss_fn(predictions, y_test)
    return loss.item()

# Call the evaluate_model function
test_loss = evaluate_model(lstm, X_test_tensors_final, y_test_tensors)
print(f"Test Loss: {test_loss}")


Test Loss: 0.0005012481124140322


## MAE

In [None]:
import torch
from torch.nn.functional import l1_loss

# 'predictions' and 'true_values' are PyTorch tensors of the same shape
predictions = lstm(X_test_tensors_final)
true_values = y_test_tensors

# Calculate MAE
mae = l1_loss(predictions, true_values, reduction='mean').item()
print(f"Mean Absolute Error: {mae}")



Mean Absolute Error: 0.017877081409096718


## R-squared

In [None]:
from sklearn.metrics import r2_score

# lstm and X_test_tensors_final are already defined and lstm is trained
lstm.eval()  # Set the model to evaluation mode
y_pred = lstm(X_test_tensors_final)
y_pred_numpy = y_pred.data.numpy()

# y_test_tensors is the actual values tensor and mm is the MinMaxScaler
y_test_numpy = y_test_tensors.data.numpy()

# Inverse transform the scaled data to original scale
y_pred_rescaled = mm.inverse_transform(y_pred_numpy)
y_test_rescaled = mm.inverse_transform(y_test_numpy)

# Calculate R-squared
r_squared = r2_score(y_test_rescaled, y_pred_rescaled)
print(f"R-squared: {r_squared}")




R-squared: 0.902178113624038


## MAPE

In [None]:
import torch

# Function to calculate MAPE
def mean_absolute_percentage_error(y_true, y_pred, eps=1e-8):
    # Avoid division by zero
    y_true, y_pred = torch.tensor(y_true), torch.tensor(y_pred)
    mape = torch.mean(torch.abs((y_true - y_pred) / torch.clamp(y_true, min=eps)))
    return mape

# Function to evaluate the model
def evaluate_model(model, X_test, y_test):
    model.eval()  # Set the model to evaluation mode
    with torch.no_grad():  # Turn off gradients for validation, saves memory and computations
        predictions = model(X_test)
        # Calculate the MAPE
        mape = mean_absolute_percentage_error(y_test, predictions)
    return mape.item()

# Call the evaluate_model function
test_mape = evaluate_model(lstm, X_test_tensors_final, y_test_tensors)
print(f"Test MAPE: {test_mape}")


Test MAPE: 0.019540103152394295
