### **TransLOB** 
This is an implementation of the TransLOB model proposed in the paper *Transformers for Limit Order Books* by James Wallbridge.

In [1]:
# load packages

from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
from google.colab import drive
from sklearn.metrics import (
    ConfusionMatrixDisplay,
    accuracy_score,
    classification_report,
    confusion_matrix,
)
from tqdm import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils import data

from keras import layers
from keras.layers import Conv1D, BatchNormalization
import tensorflow as tf

# Use the first CUDA GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")



Mounted at /content/drive


### **Data**


In [None]:
# NOTE: point the paths below at your local, unzipped copy of the dataset.

# Training file: the first 80% of its columns are used for training and the
# remaining columns for validation.
dec_data = np.loadtxt('/published/BenchmarkDatasets/BenchmarkDatasets/NoAuction/1.NoAuction_Zscore/NoAuction_Zscore_Training/Train_Dst_NoAuction_ZScore_CF_7.txt')
split_col = int(dec_data.shape[1] * 0.8)
dec_train = dec_data[:, :split_col]
dec_val = dec_data[:, split_col:]

# Three test files, joined column-wise into one array.
dec_test1 = np.loadtxt('/published/BenchmarkDatasets/BenchmarkDatasets/NoAuction/1.NoAuction_Zscore/NoAuction_Zscore_Testing/Test_Dst_NoAuction_ZScore_CF_7.txt')
dec_test2 = np.loadtxt('/published/BenchmarkDatasets/BenchmarkDatasets/NoAuction/1.NoAuction_Zscore/NoAuction_Zscore_Testing/Test_Dst_NoAuction_ZScore_CF_8.txt')
dec_test3 = np.loadtxt('/published/BenchmarkDatasets/BenchmarkDatasets/NoAuction/1.NoAuction_Zscore/NoAuction_Zscore_Testing/Test_Dst_NoAuction_ZScore_CF_9.txt')
dec_test = np.hstack([dec_test1, dec_test2, dec_test3])

# Labels are read from row `-horizon` of each array.
horizon = 5         # if horizon = 5, then k = 10

# Take the label row, drop its first 99 entries and subtract 1.
# NOTE(review): 99 presumably equals the window length (100) minus one, and the
# "- 1" presumably moves the labels to a 0-based range -- confirm.
y_train = dec_train[-horizon].flatten()[99:] - 1
y_val = dec_val[-horizon].flatten()[99:] - 1
y_test = dec_test[-horizon].flatten()[99:] - 1

# Keep the first 40 rows as features and transpose, so that each row of the
# result corresponds to one column of the loaded file.
dec_train = dec_train[:40].T
dec_val = dec_val[:40].T
dec_test = dec_test[:40].T

print(dec_train.shape)



In [5]:
#Create the dataset

class Dataset(data.Dataset):
    """Sliding-window dataset over a 2-D feature array for PyTorch.

    Sample ``i`` is the window ``x[i:i + dim]`` returned with the feature axis
    first, i.e. with shape ``(x.shape[1], dim)``, together with ``y[i]``.

    Args:
        x: 2-D NumPy array; the window slides along its first axis.
        y: NumPy array of targets; ``y[i]`` is returned for sample ``i``.
        num_classes: stored on the instance; not otherwise used by this class.
        n: stored on the instance; not otherwise used by this class.
        dim: window length (number of consecutive rows of ``x`` per sample).
        tail_trim: amount subtracted from the number of available windows when
            computing the dataset length. When omitted, the notebook-level
            variable ``T`` is used, which is what this class did before the
            parameter existed.
    """
    def __init__(self, x, y, num_classes, n, dim, tail_trim=None):
        """Store the settings and convert ``x`` / ``y`` to tensors."""
        if tail_trim is None:
            # Backward-compatible default: fall back to the notebook global.
            tail_trim = T

        self.num_classes = num_classes
        self.dim = dim
        self.n = n

        self.length = x.shape[0] - tail_trim - self.dim + 1
        print(self.length)

        # Stored as (rows, 1, features).
        self.x = torch.unsqueeze(torch.from_numpy(x), 1)
        self.y = torch.from_numpy(y)

    def __len__(self):
        """Denotes the total number of samples"""
        return self.length

    def __getitem__(self, i):
        """Return ``(window, target)`` for sample ``i``; window is (features, dim)."""
        # Index the singleton axis explicitly instead of squeezing: a blanket
        # squeeze would also drop the window or feature axis when it has size 1
        # and make the transpose below fail.
        window = self.x[i:i + self.dim, 0, :]   # (dim, features)
        return window.permute(1, 0), self.y[i]

# Hyperparameters
batch_size, epochs = 32, 100
lr = 0.0001
num_classes = 3
T = 100      # read by Dataset when it computes its length
dim = 100    # window length handed to Dataset (also used as the model's seq_len)
n = 3

# Built in the order val, test, train: each constructor prints its length.
dataset_val = Dataset(x=dec_val, y=y_val, num_classes=num_classes, n=n, dim=dim)
dataset_test = Dataset(x=dec_test, y=y_test, num_classes=num_classes, n=n, dim=dim)
dataset_train = Dataset(x=dec_train, y=y_train, num_classes=num_classes, n=n, dim=dim)

# Only the training loader shuffles; validation and test keep the dataset order.
train_loader = data.DataLoader(dataset=dataset_train, batch_size=batch_size, shuffle=True)
val_loader = data.DataLoader(dataset=dataset_val, batch_size=batch_size, shuffle=False)
test_loader = data.DataLoader(dataset=dataset_test, batch_size=batch_size, shuffle=False)



50751
139388
203601


### **Model Architecture**
The model architecture is specified in the original paper.

In [9]:


def positional_encoding(x):
  """Append a linear position channel as an extra last feature of ``x``.

  Position ``j`` of a sequence of length ``n`` gets the value
  ``2 * j / (n - 1) - 1``, i.e. the values run linearly from -1 at the first
  position to +1 at the last one. A sequence of length 1 gets -1.

  Args:
    x: tensor indexed as (batch, sequence, features).

  Returns:
    ``x`` concatenated along axis 2 with the position channel, giving shape
    (batch, sequence, features + 1). The channel is float32 and is created on
    the same device as ``x``.
  """
  n_levels = x.shape[1]
  # max(..., 1) avoids 0 / 0 for a length-1 sequence.
  pos = torch.arange(n_levels, dtype=torch.float32, device=x.device) / max(n_levels - 1, 1)
  pos = (pos + pos) - 1
  # Broadcast the same ramp to every batch element without copying it.
  pos_final = pos.view(1, n_levels, 1).expand(x.shape[0], n_levels, 1)
  return torch.cat((x, pos_final), 2)

class transLOB(nn.Module):
    """Dilated-convolution + transformer-encoder classifier with 3 outputs.

    Args:
      in_c: the number of input channels for the first Conv1d layer in the CNN
      out_c: the number of output channels for all Conv1d layers in the CNN
      seq_len: the sequence length of the input data
      n_attlayers: the number of attention layers in the transformer encoder
      n_heads: the number of attention heads in the transformer encoder
      dim_linear: the number of neurons in the first linear layer (fc1)
      dim_feedforward: the number of neurons in the feed-forward layer of the transformer encoder layer
      dropout: the dropout rate for the Dropout layer applied after fc1

    The module is built on the default device; move it with ``.to(...)``.
    """
    def __init__(self, in_c, out_c, seq_len, n_attlayers, n_heads, dim_linear, dim_feedforward, dropout):
        super().__init__()

        # Five kernel-size-2 convolutions with dilation 1, 2, 4, 8, 16;
        # padding="same" keeps the sequence length unchanged.
        self.conv = nn.Sequential(
            nn.Conv1d(in_channels=in_c, out_channels=out_c, kernel_size=2, stride=1, padding="same"),
            nn.ReLU(),
            nn.Conv1d(in_channels=out_c, out_channels=out_c, kernel_size=2, dilation=2, padding="same"),
            nn.ReLU(),
            nn.Conv1d(in_channels=out_c, out_channels=out_c, kernel_size=2, dilation=4, padding="same"),
            nn.ReLU(),
            nn.Conv1d(in_channels=out_c, out_channels=out_c, kernel_size=2, dilation=8, padding="same"),
            nn.ReLU(),
            nn.Conv1d(in_channels=out_c, out_channels=out_c, kernel_size=2, dilation=16, padding="same"),
            nn.ReLU(),
        )

        # Honour the constructor argument (it used to be ignored in favour of 0.1).
        self.dropout = nn.Dropout(dropout)

        self.activation = nn.ReLU()

        # positional_encoding appends one channel to the conv features.
        d_model = out_c + 1
        # Dropout inside the encoder layer itself stays disabled (0.0).
        self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model, nhead=n_heads, dim_feedforward=dim_feedforward, dropout=0.0, batch_first=True)

        self.layer_norm = nn.LayerNorm([seq_len, out_c])

        self.transformer = nn.TransformerEncoder(self.encoder_layer, n_attlayers)

        self.fc1 = nn.Linear(seq_len*d_model, dim_linear)
        self.fc2 = nn.Linear(dim_linear, 3)

    def forward(self, x):
        """Return unnormalised class scores (logits) of shape (batch, 3).

        Args:
          x: tensor of shape (batch, in_c, seq_len).

        The scores are deliberately NOT passed through softmax:
        nn.CrossEntropyLoss, which this notebook trains with, expects raw
        logits and applies log-softmax itself. Argmax predictions are
        unaffected; use ``torch.softmax(logits, dim=1)`` for probabilities.
        """
        # Pass the input tensor through the convolutional layers
        x = self.conv(x)                    # (batch, out_c, seq_len)

        # Put the sequence axis before the channel axis
        x = x.permute(0, 2, 1)              # (batch, seq_len, out_c)

        # Normalize over the (seq_len, out_c) axes
        x = self.layer_norm(x)

        # Append the position channel
        x = positional_encoding(x)          # (batch, seq_len, out_c + 1)

        # Transformer encoder
        x = self.transformer(x)

        # Flatten everything except the batch axis
        x = torch.reshape(x, (x.shape[0], x.shape[1]*x.shape[2]))

        # fc1 -> ReLU -> dropout
        x = self.dropout(self.activation(self.fc1(x)))

        # Final linear layer produces the logits
        return self.fc2(x)

### **Model Training**

In [10]:
# Build the network (40 input channels, 14 conv channels, windows of `dim`
# steps) and move it to the selected device.
model = transLOB(
    in_c=40, out_c=14, seq_len=dim,
    n_attlayers=2, n_heads=3,
    dim_linear=64, dim_feedforward=60,
    dropout=0.1,
).to(device)

# Cross-entropy loss, optimised with Adam plus a small weight decay.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-5)

def batch_gd(model, criterion, optimizer, train_loader, test_loader, epochs,
             save_path='/best_model_transformer'):
    """Train ``model`` and checkpoint it whenever the held-out loss improves.

    Args:
        model: the network to train; batches are moved to the device of its
            first parameter.
        criterion: loss called as ``criterion(outputs, targets)``.
        optimizer: optimizer already bound to ``model``'s parameters.
        train_loader: iterable of ``(inputs, targets)`` training batches.
        test_loader: iterable of ``(inputs, targets)`` held-out batches,
            evaluated after every epoch.
        epochs: number of passes over ``train_loader``.
        save_path: where the whole model is written with ``torch.save`` each
            time the held-out loss reaches a new minimum.

    Returns:
        ``(train_losses, test_losses)``: two NumPy arrays of length ``epochs``
        holding the mean per-batch loss of each epoch.
    """
    # Follow the model's own device instead of relying on a notebook global.
    device = next(model.parameters()).device

    train_losses = np.zeros(epochs)
    test_losses = np.zeros(epochs)
    best_test_loss = np.inf
    best_test_epoch = 0

    for it in tqdm(range(epochs)):

        model.train()
        t0 = datetime.now()
        train_loss = []
        for inputs, targets in train_loader:
            # move data to the model's device
            inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)
            # zero the parameter gradients
            optimizer.zero_grad()
            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
            # Backward and optimize
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())
        # Mean of the per-batch losses; the weights change between batches, so
        # this is not the loss of the end-of-epoch model.
        train_loss = np.mean(train_loss)

        model.eval()
        test_loss = []
        # No gradients are needed for evaluation; skipping them saves memory.
        with torch.no_grad():
            for inputs, targets in test_loader:
                inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)
                outputs = model(inputs)
                loss = criterion(outputs, targets)
                test_loss.append(loss.item())
        test_loss = np.mean(test_loss)

        # Save losses
        train_losses[it] = train_loss
        test_losses[it] = test_loss

        # Keep the model with the lowest held-out loss seen so far.
        if test_loss < best_test_loss:
            torch.save(model, save_path)
            best_test_loss = test_loss
            best_test_epoch = it
            print('model saved')

        dt = datetime.now() - t0
        print(f'Epoch {it+1}/{epochs}, Train Loss: {train_loss:.4f}, \
          Validation Loss: {test_loss:.4f}, Duration: {dt}, Best Val Epoch: {best_test_epoch}')

    return train_losses, test_losses

In [11]:
# Train for `epochs` epochs, using the validation loader as the held-out set;
# the result is one loss value per epoch for each of the two loaders.
train_losses, val_losses = batch_gd(
    model=model,
    criterion=criterion,
    optimizer=optimizer,
    train_loader=train_loader,
    test_loader=val_loader,
    epochs=epochs,
)

### **Model Testing**

In [12]:
# Reload the checkpoint written by batch_gd (lowest validation loss).
# NOTE(review): the file is a pickled nn.Module, so torch.load executes pickle
# code -- only load checkpoints you produced yourself. Recent PyTorch releases
# default to weights_only=True, which rejects such files; pass
# weights_only=False there if loading fails.
model = torch.load('/best_model_transformer', map_location=device)
model.eval()   # make sure dropout is disabled while scoring

n_correct = 0.
n_total = 0.
all_targets = []
all_predictions = []

# Inference only: no gradients needed.
with torch.no_grad():
    for inputs, targets in test_loader:
        # Move to the selected device
        inputs, targets = inputs.to(device, dtype=torch.float), targets.to(device, dtype=torch.int64)

        # Forward pass
        outputs = model(inputs)

        # Predicted class = index of the largest output
        # (torch.max returns both max and argmax)
        _, predictions = torch.max(outputs, 1)

        # update counts
        n_correct += (predictions == targets).sum().item()
        n_total += targets.shape[0]

        all_targets.append(targets.cpu().numpy())
        all_predictions.append(predictions.cpu().numpy())

test_acc = n_correct / n_total
print(f"Test acc: {test_acc:.4f}")

all_targets = np.concatenate(all_targets)
all_predictions = np.concatenate(all_predictions)

print('accuracy_score:', accuracy_score(all_targets, all_predictions))
print(classification_report(all_targets, all_predictions, digits=4))

# Row-normalised confusion matrix: each row (true class) sums to 1.
c = confusion_matrix(all_targets, all_predictions, normalize="true")
disp = ConfusionMatrixDisplay(c)
disp.plot()
plt.show()




NameError: ignored