### **Long Short Term Memory**

In [None]:
#Initially we download the zipped data

import os 
if not os.path.isfile('data.zip'):
    !wget https://github.com/LeonardoBerti07/Deep-Learning-Algorithms-for-financial-time-serie-modeling-/blob/main/Dataset/data.zip
    print('data downloaded.')
else:
    print('data already existed.')

In [None]:
#Load packages
from faulthandler import dump_traceback
import time
import datetime
from unicodedata import name
import torch
from torch.utils import data
import matplotlib.pyplot as plt
import numpy as np
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets
from torchvision import transforms  
from torch import nn
import random
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix





### **Data**
The dataset in the `Dataset` folder is the LOBSTER dataset, zipped and normalized. I combined the data of the 5 stocks that are available for free. I used the version with 10 levels, so there are 40 columns: for every level there is a quadruple with the ask and bid prices and their associated volumes. For more information, please refer to the thesis.

I used 70% to do the training, 15% to do the validation and 15% for the testing.

In [None]:
# please change the data_path to your local path

num_classes = 3   # number of label values produced by labeling()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

data_path =  "/data.npy"

dec = np.load(data_path)

# Contiguous split in row order: first 70% train, next 15% validation, rest test.
train_size = int(0.70 * dec.shape[0])
val_size = int(0.15 * dec.shape[0])
# The test split is whatever remains after train + validation. Because int()
# truncates, that remainder is generally not equal to val_size, so it is
# measured here instead of being assumed.
test_size = dec.shape[0] - train_size - val_size

dec_train = dec[:train_size]
dec_val = dec[train_size:train_size + val_size]
dec_test = dec[train_size + val_size:]

In [None]:
T = 50   # horizon: number of rows in each of the "past" and "future" windows

def labeling(X, T):
  """Label every window start by the relative change of the mean mid-price.

  For each start index ``i`` the mid-price ``(ask + bid) / 2`` is averaged over
  the "past" window ``X[i:i+T]`` and over the "future" window ``X[i+T:i+2*T]``.
  Column 0 is used as the ask price and column 2 as the bid price. The
  relative change ``(future - past) / past`` is compared with a fixed
  threshold ``alpha``:

    * ``0`` -- change above ``+alpha``
    * ``1`` -- change below ``-alpha``
    * ``2`` -- change within ``[-alpha, +alpha]``

  Side effects: prints the number of rows and shows a histogram of the labels.

  Args:
    X: 2-D array of shape (N, D) with at least 3 columns.
    T: window length in rows.

  Returns:
    Float array of length ``N - 2*T + 1`` holding the labels.

  Raises:
    ValueError: if ``X`` is not 2-D or has fewer than ``2*T - 1`` rows.
  """
  N, _ = X.shape   # unpacking also rejects anything that is not 2-D
  print(N)

  n_labels = N - 2*T + 1
  if n_labels < 0:
    raise ValueError(
        "labeling needs at least 2*T - 1 = {} rows, got {}".format(2*T - 1, N))

  Y = np.zeros(n_labels)
  alpha = 0.00072   # relative-change threshold separating the three classes

  for i in range(n_labels):
    # Mean mid-price over the past window [i, i+T) ...
    past_mid = (X[i:i+T, :1] + X[i:i+T, 2:3]) / 2
    m_minus = np.sum(past_mid) / T
    # ... and over the future window [i+T, i+2T).
    future_mid = (X[i+T:i+2*T, :1] + X[i+T:i+2*T, 2:3]) / 2
    m_plus = np.sum(future_mid) / T

    # Compute the relative change once and reuse it for both comparisons.
    change = (m_plus - m_minus) / m_minus
    if change < -alpha:
      label = 1
    elif change > alpha:
      label = 0
    else:
      label = 2
    Y[i] = label

  plt.hist(Y)
  plt.show()

  return Y

In [None]:
#Create the Dataset

class Dataset(data.Dataset):
    """Sliding-window view over a 2-D feature array, paired with per-window labels."""

    def __init__(self, x, y, num_classes, T):
        """Wrap the numpy inputs as tensors and precompute the number of samples.

        x is the (rows, features) array, y the label array, T the window length.
        """
        self.T = T
        self.num_classes = num_classes

        # One sample per window start i in [0, rows - 2*T].
        self.length = x.shape[0] - 2*T + 1

        # Features are stored with a singleton middle axis: (rows, 1, features).
        self.x = torch.from_numpy(x).unsqueeze(1)
        self.y = torch.from_numpy(y)

    def __len__(self):
        """Number of windows exposed by the dataset."""
        return self.length

    def __getitem__(self, i):
        """Return the T-row window starting at row i together with label i."""
        # Indexing the singleton axis gives the (T, features) window directly.
        window = self.x[i:i + self.T, 0, :]
        target = self.y[i]
        return {"sequence": window.float(), "label": target.float()}


In [None]:
#Create the DataLoader
class PriceDataModule():
    """Builds the train / validation / test datasets and their DataLoaders.

    Args:
        train_sequences: feature array for the training split.
        val_sequences: feature array for the validation split.
        test_sequences: feature array for the test split.
        num_workers: worker processes used by every DataLoader.
        batch_size: batch size used by every DataLoader.
    """
    def __init__(self, train_sequences, val_sequences, test_sequences, num_workers=1, batch_size = 8):
        self.train_sequences = train_sequences
        self.test_sequences = test_sequences
        self.val_sequences = val_sequences
        self.batch_size = batch_size
        self.num_workers = num_workers

    def setup(self, stage=None):
        """Create the three datasets from the sequences given to the constructor.

        The labels (y_train, y_val, y_test), the window length T and num_classes
        are read from the notebook's global namespace, so the labeling cell must
        have run first. Reading T / num_classes from the same globals that
        labeling() used keeps dataset length and label count in sync.
        `stage` is accepted but not used.
        """
        self.train_dataset = Dataset(self.train_sequences, y_train, num_classes=num_classes, T=T)
        self.test_dataset = Dataset(self.test_sequences, y_test, num_classes=num_classes, T=T)
        self.val_dataset = Dataset(self.val_sequences, y_val, num_classes=num_classes, T=T)

    def train_dataloader(self):
        """Shuffled loader over the training dataset."""
        # Use the batch size this module was configured with, not a notebook global.
        return DataLoader(dataset=self.train_dataset, batch_size=self.batch_size, shuffle=True, num_workers=self.num_workers)

    def val_dataloader(self):
        """Sequential (unshuffled) loader over the validation dataset."""
        return DataLoader(dataset=self.val_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_workers)

    def test_dataloader(self):
        """Sequential (unshuffled) loader over the test dataset."""
        return DataLoader(dataset=self.test_dataset, batch_size=self.batch_size, shuffle=False, num_workers=self.num_workers)

# Label every split (each call also prints the row count and shows a histogram).
y_val = labeling(dec_val, T)
y_test = labeling(dec_test, T)
y_train = labeling(dec_train, T)

# batch_size is otherwise only assigned in the Hyperparameters cell further down,
# so on a fresh kernel (Restart & Run All) it would be undefined here.
# Keep this value equal to the one in the Hyperparameters cell.
batch_size = 64

data_module = PriceDataModule(dec_train, dec_val, dec_test, 2, batch_size)
data_module.setup()
train_dataloader = data_module.train_dataloader()
test_dataloader = data_module.test_dataloader()
val_dataloader = data_module.val_dataloader()

### **Model Architecture**
The architecture has a single LSTM layer with a hidden size of 64. It is kept deliberately simple because, with more layers or a larger hidden size, the model tends to overfit. As for the hyperparameters, the dropout is set to 0.4 in the hyperparameter cell below (to limit overfitting as much as possible), the learning rate is 0.00005, and the batch size is 64. The model has a total of 27,587 parameters.

In [None]:
#Create the LSTM model
class myLSTM(nn.Module):
    """LSTM sequence classifier: stacked LSTM followed by one linear layer.

    Args:
        n_features: number of features per time step.
        num_classes: number of output logits.
        dropout: dropout probability between stacked LSTM layers.
        n_hidden: LSTM hidden size.
        n_layers: number of stacked LSTM layers.
    """
    def __init__(self, n_features, num_classes, dropout, n_hidden, n_layers):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size = n_features,
            hidden_size = n_hidden,
            batch_first = True,
            num_layers = n_layers, # Stack LSTMs
            # nn.LSTM applies dropout only between stacked layers, so with a
            # single layer it has no effect and PyTorch emits a UserWarning.
            # Passing 0.0 in that case keeps the behavior and drops the warning.
            dropout = dropout if n_layers > 1 else 0.0,
        )
        # Maps the final hidden state of the top layer to the class logits.
        self.classifier = nn.Linear(n_hidden, num_classes)

    def forward(self, x):
        """Return class logits of shape (batch_size, num_classes).

        x has shape (batch_size, seq_len, n_features) because the LSTM is
        built with batch_first=True.
        """
        self.lstm.flatten_parameters()

        # The per-time-step outputs are not needed. `hidden` has shape
        # (num_layers, batch_size, n_hidden) and holds the last time step of
        # every layer, so hidden[-1] is the top layer's final state.
        _, (hidden, _) = self.lstm(x)
        out = hidden[-1]

        return self.classifier(out)

In [None]:
#Hyperparameters

# -- optimisation --
epochs = 50
lr = 0.00005
dropout = 0.4

# -- input shape --
sequence_length = 50   # only reported in the hyperparameter printout; same value as T
batch_size = 64
num_features = 40

# -- hidden shape --
num_layer = 1
hidden_size = 64

# Build the network, move it to the selected device and make sure it is float32.
model = myLSTM(
    n_features=num_features,
    num_classes=num_classes,
    dropout=dropout,
    n_hidden=hidden_size,
    n_layers=num_layer,
)
model = model.to(device)
model.float()

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

### **Model Training**

In [None]:
#Training the model
def trainingLoop(train_dataloader, model, loss_fn, optimizer):
    """Run one training epoch and report the mean per-batch loss.

    Args:
        train_dataloader: iterable of batches; each batch is a mapping with a
            "sequence" tensor (model input) and a "label" tensor (class ids).
        model: the network to train; it is switched to training mode.
        loss_fn: callable taking (outputs, labels) and returning a scalar loss.
        optimizer: optimizer stepped once per batch.

    Returns:
        The mean of the per-batch losses as a float, or NaN when the loader
        yields no batches.
    """
    # Send the inputs to wherever the model lives instead of relying on a
    # notebook-level `device` variable.
    device = next(model.parameters()).device
    model.train()

    cont = 0
    train_loss = 0.0
    for batch in train_dataloader:         # iterate over the batches of the dataset
        sequences = batch["sequence"].to(device)
        # The dataset yields float labels; the loss expects integer class ids.
        labels = batch["label"].long().to(device)

        #forward pass
        outputs = model(sequences)            #we do the prediction
        loss = loss_fn(outputs, labels)     #compute the error
        train_loss += loss.item()

        #backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        cont += 1

    # Average over the batches; an empty loader gives NaN rather than a
    # ZeroDivisionError.
    train_loss = train_loss / cont if cont else float("nan")
    print("trainining loss   ->   " + str(train_loss))
    return train_loss

### **Model Validation**

In [None]:
#Validation
def valLoop(test_dataloader, model, loss_fn, test_size):
    """Evaluate the model on a loader and return the mean per-batch loss.

    The value returned is the same mean that is printed, so it does not grow
    with the number of batches.

    Args:
        test_dataloader: iterable of batches; each batch is a mapping with a
            "sequence" tensor and a "label" tensor.
        model: the network to evaluate. It is put in eval mode for the pass
            and its previous train/eval mode is restored afterwards.
        loss_fn: callable taking (outputs, labels) and returning a scalar loss.
        test_size: unused; kept so existing calls keep working.

    Returns:
        Mean of the per-batch losses as a float, or NaN for an empty loader.
    """
    device = next(model.parameters()).device

    # Evaluate in eval mode, then hand the model back in the mode it came in,
    # so a following training epoch is not affected.
    was_training = model.training
    model.eval()

    test_loss = 0.0
    cont = 0
    try:
        with torch.no_grad():
            for batch in test_dataloader:         #we scroll through the various batches of the data set
                sequences = batch["sequence"].to(device)
                # The dataset yields float labels; the loss expects integer class ids.
                labels = batch["label"].long().to(device)

                outputs = model(sequences)            #we do the prediction
                test_loss += loss_fn(outputs, labels).item()
                cont += 1
    finally:
        model.train(was_training)

    mean_loss = test_loss / cont if cont else float("nan")
    print("validation loss   ->   " + str(mean_loss))
    return mean_loss

In [None]:
# Report the run configuration before training starts.
print("------- List Hyper Parameters -------")
print("epochs   ->   " + str(epochs))
print("learningRate   ->   " + str(lr))
print("dropout   ->   " + str(dropout))
print("training range   ->   " + str(train_size))
print("number of layer   ->    " + str(num_layer))
print("sequence length    ->     " + str(sequence_length))
print("hidden size    ->    " + str(hidden_size))

# Start from +inf so the first epoch always produces a checkpoint. A finite
# sentinel such as 99999 can be smaller than the first validation loss (valLoop
# may report a loss summed over many batches), and then nothing would be saved.
best_test_loss = float("inf")   # best validation loss seen so far
best_val_loss = float("inf")
for e in range(epochs):
    print("------------Start of Epoch {}/{}------------".format(e, epochs-1))

    #training
    trainingLoop(train_dataloader, model, loss_fn, optimizer)

    #validation
    val_loss = valLoop(val_dataloader, model, loss_fn, val_size)
    if val_loss < best_test_loss:   #we save the best model
        # NOTE(review): this writes to the filesystem root; the final test cell
        # loads from the same path, so change both together if you move it.
        torch.save(model, '/best_model_LSTM')
        best_test_loss = val_loss
        best_test_epoch = e
        print('model saved')
    print("------------End of Epoch {}/{}------------".format(e, epochs-1))
    print()



### **Model Testing**

In [None]:
#Testing 
def testLoop(test_dataloader, model, loss_fn, test_size):
    true = [0, 0, 0]
    denom = [0, 0, 0, 0, 0, 0]
    num_batches = len(test_dataloader)
    test_loss = 0
    correct = 0
    total = 0
    cont = 0
    all_targets = []
    all_predictions = []

    with torch.no_grad():      
        for batch in test_dataloader:         #we scroll through the various batches of the data set
            
            labels = batch["label"]
            sequences = batch["sequence"].to(device)

            labels = labels.type(torch.LongTensor)
            labels = labels.to(device)
            outputs = model(sequences)            #we do the prediction
            predicted = outputs.argmax(1)
            
            test_loss += loss_fn(outputs, labels).item()       
            correct += (outputs.argmax(1) == labels).type(torch.float).sum().item()     #we count the correct ones
            total += labels.shape[0]
            
            cont += 1
            all_targets.append(labels.cpu().numpy())
            all_predictions.append(predicted.cpu().numpy())
    all_targets = np.concatenate(all_targets)    
    all_predictions = np.concatenate(all_predictions)  
    print('accuracy_score:', accuracy_score(all_targets, all_predictions))
    print(classification_report(all_targets, all_predictions, digits=4))
    b 

    return test_loss / cont

In [None]:
# Reload the best checkpoint written by the training loop.
# The checkpoint is a fully pickled nn.Module, so weights_only=False is needed on
# PyTorch versions where torch.load defaults to weights_only=True.
# map_location makes the load work even if the checkpoint was saved on another device.
# Unpickling executes arbitrary code: only load checkpoints you created yourself.
model = torch.load('/best_model_LSTM', map_location=device, weights_only=False)

#final test
testLoop(test_dataloader, model, loss_fn, test_size)