# Mount Drive

In [None]:
# Make Google Drive available inside the Colab runtime.
from google.colab import drive

GDRIVE_MOUNT_POINT = "/content/gdrive"
drive.mount(GDRIVE_MOUNT_POINT)

In [None]:
# Work from the project folder on Drive: ROOT_DIR below is Path("."), so the
# `data` directory is resolved relative to this folder.
%cd /content/gdrive/MyDrive/Colab Notebooks/AI4T

# Dependencies

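You may need to restart the kernel after running the installation cell below so that the newly installed and upgraded packages are picked up.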

In [None]:
# Lightning (training loop) and the Neptune client (experiment logging via NeptuneLogger).
%pip install pytorch_lightning neptune-client -qqq
# NOTE(review): presumably upgraded so that `numpy.lib.stride_tricks.sliding_window_view`
# (used by LOBDataset) is available — it needs NumPy >= 1.20. Confirm.
%pip install numpy --upgrade

In [None]:
from typing import *
import requests
import zipfile
from pathlib import Path

import numpy as np
from numpy.lib.stride_tricks import sliding_window_view

import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

import pytorch_lightning as pl
from pytorch_lightning.loggers import NeptuneLogger
import torchmetrics

# Dataset
Object used to load the data. Each order-book level is described by a tuple $(p_a, v_a, p_b, v_b)$, i.e. the price and volume on both the *ask* and the *bid* side.
For each sample we consider 10 levels, giving $10 \times 4 = 40$ values.

In [None]:
class LOBDataset(Dataset):
    def __init__(
        self,
        data: np.ndarray,
        input_idx: int = 40,
        label_idx: int = -5,
        window: int = 100,
        pred_horizon_idx: int = -1,
    ):
        """Dataset object for FI-2010 dataset.

        Every sample is a window of `window` consecutive rows of input features,
        paired with the label of the window's last row.

        Args:
            data (np.ndarray): 2-D array with one row per time step.
            input_idx (int, optional): Input data are the columns before `input_idx`. Defaults to 40.
            label_idx (int, optional): First column for labels; negative values count
                from the end, so -5 selects the last 5 columns. Defaults to -5.
            window (int, optional): Window size. Defaults to 100.
            pred_horizon_idx (int, optional): Label column (prediction horizon) to keep,
                indexed within the label columns. Defaults to -1.
        """
        super(LOBDataset, self).__init__()
        x, y = self._init_data(
            data=data,
            in_idx=input_idx,
            gt_idx=label_idx,
            win=window,
            ph_idx=pred_horizon_idx,
        )

        # `sliding_window_view` returns a read-only strided view of `data`; copying
        # gives the tensor its own contiguous, writable memory.
        self.x = torch.from_numpy(x.copy())
        self.y = torch.from_numpy(y)

    def _init_data(
        self, data: np.ndarray, in_idx: int, gt_idx: int, win: int, ph_idx: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Preprocess data.

        Args:
            data (np.ndarray): Input data, one row per time step.
            in_idx (int): Input data are the columns before `in_idx`.
            gt_idx (int): First column of ground truth we aim to consider (may be negative).
            win (int): Window size.
            ph_idx (int): Prediction horizon index, within the ground-truth columns.

        Returns:
            Tuple[np.ndarray, np.ndarray]: Windows of shape `(n - win + 1, win, n_inputs)`
            and their labels of shape `(n - win + 1,)`, where `n = len(data)`.
        """
        # Input data are the first `in_idx` (40 by default) columns, i.e. the first 10 levels,
        # each level being a 4-tuple of price and volume on the ask and bid sides.
        x = data[:, :in_idx]

        # Labels are the columns from `gt_idx` onwards (the last 5 with the default -5).
        # Each column is a different prediction horizon; we keep only `ph_idx`.
        # Possible values are:
        # - 1: Positive percentage change.
        # - 2: Stationary behavior.
        # - 3: Negative percentage change.
        # We shift them to start from 0 so they can be used directly as class indices.
        y = data[:, gt_idx:][:, ph_idx] - 1

        # Split the input data in windows of length `win` and align the labels with them.
        x_win, y_trim = self._slide_window(x=x, y=y, win=win)

        return x_win, y_trim

    def _slide_window(
        self, x: np.ndarray, y: np.ndarray, win: int
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Split data in windows.

        Args:
            x (np.ndarray): Input data, shape `(n, n_inputs)`.
            y (np.ndarray): Ground truth, shape `(n,)`.
            win (int): Window size.

        Returns:
            Tuple[np.ndarray, np.ndarray]: Windows of shape `(n - win + 1, win, n_inputs)`
            with their ground truth of shape `(n - win + 1,)`.
        """
        # `sliding_window_view` puts the window axis last: (n - win + 1, n_inputs, win);
        # transpose to (n - win + 1, win, n_inputs).
        x_win = sliding_window_view(x=x, window_shape=win, axis=0).transpose(0, 2, 1)
        # Window i covers rows [i, i + win) and is labelled with its last row, so the
        # first `win - 1` labels have no window.
        y_trim = y[win - 1 :]
        return x_win, y_trim

    def __len__(self) -> int:
        """Data length.

        Returns:
            int: Number of windows.
        """
        return self.x.shape[0]

    def __getitem__(self, item: int) -> List[torch.Tensor]:
        """Get item by index.

        Args:
            item (int): Index.

        Returns:
            List[torch.Tensor]: List with input window and label corresponding to the specified index.
        """
        return [self.x[item], self.y[item]]

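We use *PyTorch Lightning*, which lets us write cleaner code: the `LightningDataModule` below groups downloading, preprocessing and the train/val/test data loaders in one place.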

In [None]:
class LOBDataModule(pl.LightningDataModule):
    """Lightning data module for the FI-2010 limit order book dataset.

    `prepare_data` downloads, extracts and splits the raw files once; `setup`
    wraps the preprocessed splits in `LOBDataset` objects, which the
    `*_dataloader` methods then serve.
    """

    # Archive with the raw FI-2010 files, and the raw files expected inside it.
    DATA_URL = "https://raw.githubusercontent.com/zcakhaa/DeepLOB-Deep-Convolutional-Neural-Networks-for-Limit-Order-Books/master/data/data.zip"
    TRAIN_FILE = "Train_Dst_NoAuction_DecPre_CF_7.txt"
    TEST_FILES = (
        "Test_Dst_NoAuction_DecPre_CF_7.txt",
        "Test_Dst_NoAuction_DecPre_CF_8.txt",
        "Test_Dst_NoAuction_DecPre_CF_9.txt",
    )

    def __init__(
        self,
        data_dir: Path,
        batch_size: int,
        input_idx: int = 40,
        label_idx: int = -5,
        window: int = 100,
        pred_horizon_idx: int = -1,
    ):
        """Create the data module.

        Args:
            data_dir (Path): Directory holding `data.zip`, the raw files and the
                preprocessed splits. A string path is accepted too.
            batch_size (int): Batch size of every dataloader.
            input_idx (int, optional): Forwarded to `LOBDataset`. Defaults to 40.
            label_idx (int, optional): Forwarded to `LOBDataset`. Defaults to -5.
            window (int, optional): Forwarded to `LOBDataset`. Defaults to 100.
            pred_horizon_idx (int, optional): Forwarded to `LOBDataset`. Defaults to -1.
        """
        # Run the base-class initializer so the LightningDataModule internals exist.
        super().__init__()
        self.data_dir = Path(data_dir)
        self.batch_size = batch_size

        self.input_idx = input_idx
        self.label_idx = label_idx
        self.window = window
        self.pred_horizon_idx = pred_horizon_idx

    def prepare_data(self):
        """Download, extract and split the data, skipping steps already done."""
        splits = [self.data_dir / f"{name}.gz" for name in ("train", "val", "test")]
        if all(path.exists() for path in splits):
            return

        self.data_dir.mkdir(parents=True, exist_ok=True)
        data_file = self.data_dir / "data.zip"

        # * Download data.zip if necessary.
        if not data_file.exists():
            print(f"Downloading data from {self.DATA_URL}...")
            response = requests.get(self.DATA_URL, timeout=60)
            # Fail loudly instead of saving an HTTP error page as data.zip.
            response.raise_for_status()
            # Write to a temporary name first so an interrupted write never leaves a
            # truncated data.zip that later runs would trust.
            tmp_file = data_file.with_name(data_file.name + ".part")
            tmp_file.write_bytes(response.content)
            tmp_file.replace(data_file)

        # * Extract whenever raw files are missing, even if the archive already existed.
        raw_files = [self.TRAIN_FILE, *self.TEST_FILES]
        if not all((self.data_dir / f).exists() for f in raw_files):
            print(f"Inflating {data_file}...")
            with zipfile.ZipFile(data_file, "r") as zip_ref:
                zip_ref.extractall(self.data_dir)

        # * Data preprocessing.
        self._prepare_data()
        print()

    def _prepare_data(self):
        """Load the raw files, split them and save the splits.

        The raw training file is split 80-20 into train and val; the raw test files
        are concatenated into test. Splitting/concatenation happens along axis 1 and
        each split is transposed before saving, so the saved files have one row per
        time step (the layout `LOBDataset` reads).
        """
        print(f"Loading {self.TRAIN_FILE}...")
        train_val = np.loadtxt(self.data_dir / self.TRAIN_FILE)

        # Split into train and val according to an 80-20 ratio along axis 1.
        split_idx = int(np.floor(train_val.shape[1] * 0.8))
        train = train_val[:, :split_idx]
        val = train_val[:, split_idx:]

        test_parts = []
        for test_file in self.TEST_FILES:
            print(f"Loading {test_file}...")
            test_parts.append(np.loadtxt(self.data_dir / test_file))
        test = np.hstack(test_parts)

        # * Save data. A ".gz" suffix makes `np.savetxt` gzip-compress the output.
        for name, split in (("train", train), ("val", val), ("test", test)):
            path = self.data_dir / f"{name}.gz"
            print(f"Saving {path}...")
            np.savetxt(path, split.T)

    def _make_dataset(self, split: str) -> "LOBDataset":
        """Load the preprocessed `split` file and wrap it in a `LOBDataset`."""
        path = self.data_dir / f"{split}.gz"
        print(f"Loading {path}...")
        return LOBDataset(
            data=np.loadtxt(path),
            input_idx=self.input_idx,
            label_idx=self.label_idx,
            window=self.window,
            pred_horizon_idx=self.pred_horizon_idx,
        )

    def setup(self, stage: Optional[str] = None):
        """Setup datasets.

        Args:
            stage (Optional[str], optional): Stage in which we are e.g. "fit",
                "validate", "test". Defaults to None (all splits).
        """
        # Assign train split.
        if stage in (None, "fit"):
            self.train = self._make_dataset("train")
        # Assign val split (also needed by a standalone validation run).
        if stage in (None, "fit", "validate"):
            self.val = self._make_dataset("val")
        # Assign test split.
        if stage in (None, "test"):
            self.test = self._make_dataset("test")

    def _dataloader(self, dataset: Dataset, shuffle: bool) -> DataLoader:
        """Build a dataloader with the module's shared loading settings."""
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=shuffle,
            num_workers=2,
            pin_memory=True,
        )

    def train_dataloader(self) -> DataLoader:
        """Get train dataloader.

        Returns:
            DataLoader: Train dataloader (shuffled).
        """
        return self._dataloader(self.train, shuffle=True)

    def val_dataloader(self) -> DataLoader:
        """Get val dataloader.

        Returns:
            DataLoader: Val dataloader.
        """
        return self._dataloader(self.val, shuffle=False)

    def test_dataloader(self) -> DataLoader:
        """Get test dataloader.

        Returns:
            DataLoader: Test dataloader. Not shuffled: evaluation does not need it
            and a fixed order keeps per-step logs reproducible.
        """
        return self._dataloader(self.test, shuffle=False)

## Model

The model is a stack of 2D convolutions, so we need to add a channel dimension to each input window.
It is inspired by the one in the paper [Forecasting Stock Prices from the Limit Order Book using Convolutional Neural Networks](https://smallake.kr/wp-content/uploads/2021/02/2017_CBI_CNNLOB.pdf).

In [None]:
class Flatten(nn.Module):
    def __init__(self):
        """Flatten module: collapse every dimension except the first (batch) one."""
        super(Flatten, self).__init__()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass.

        Args:
            x (torch.Tensor): Input tensor of shape `(batch, *rest)`.

        Returns:
            torch.Tensor: Output tensor of shape `(batch, prod(rest))`.
        """
        batch_size = x.shape[0]
        # `reshape` (unlike `view`) also accepts non-contiguous inputs, e.g. after a
        # transpose. Computing the trailing size explicitly instead of passing -1 also
        # supports an empty batch, where -1 would be ambiguous.
        return x.reshape(batch_size, x.shape[1:].numel())

In [None]:
class Unsqueeze(nn.Module):
    """Insert a new size-1 axis into the input tensor."""

    def __init__(self, dim: int = -1):
        """Create the module.

        Args:
            dim (int, optional): Position of the new axis. Defaults to -1 (last).
        """
        super().__init__()
        self.dim = dim

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: `x` with an extra size-1 axis at position `self.dim`.
        """
        expanded = torch.unsqueeze(x, self.dim)
        return expanded

In [None]:
class Model(nn.Module):
    def __init__(self, num_classes: int = 3, window: int = 100, num_features: int = 40):
        """Neural network.

        Args:
            num_classes (int, optional): Number of classes. Defaults to 3.
            window (int, optional): Time steps per input window. Defaults to 100.
            num_features (int, optional): Features per time step. Defaults to 40.

        Raises:
            ValueError: If `window` or `num_features` is too small for the conv/pool stack.
        """
        super(Model, self).__init__()

        # Flattened size after the last pooling layer (32 channels); 4032 with the defaults.
        in_features = (
            32 * self._conv_output_size(window) * self._conv_output_size(num_features)
        )

        self.net = nn.Sequential(
            # (batch, window, num_features) -> (batch, 1, window, num_features).
            Unsqueeze(dim=1),
            #
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=4),
            nn.LeakyReLU(negative_slope=0.2),
            #
            nn.Conv2d(in_channels=16, out_channels=16, kernel_size=4),
            nn.MaxPool2d(kernel_size=2),
            nn.LeakyReLU(negative_slope=0.2),
            #
            nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3),
            nn.LeakyReLU(negative_slope=0.2),
            #
            nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3),
            nn.MaxPool2d(kernel_size=2),
            nn.LeakyReLU(negative_slope=0.2),
            #
            nn.Flatten(),
            #
            # NOTE(review): there is no activation between the two Linear layers, so
            # they compose to a single affine map. Kept as-is to preserve the module
            # layout (and therefore checkpoint compatibility).
            nn.Linear(in_features=in_features, out_features=32),
            nn.Linear(in_features=32, out_features=num_classes),
        )

    @staticmethod
    def _conv_output_size(size: int) -> int:
        """Size left along one spatial axis after the conv/pool stack of `__init__`.

        Args:
            size (int): Input size along that axis.

        Returns:
            int: Output size along that axis.

        Raises:
            ValueError: If the input is too small to leave at least one element.
        """
        # Two unpadded 4x4 convs (-3 each), then a 2x2 max-pool (floor halving).
        out = (size - 3 - 3) // 2
        # Two unpadded 3x3 convs (-2 each), then another 2x2 max-pool.
        out = (out - 2 - 2) // 2
        if out < 1:
            raise ValueError(f"Input size {size} is too small for the convolutional stack.")
        return out

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass.

        Args:
            x (torch.Tensor): Input tensor of shape `(batch, window, num_features)`.

        Returns:
            torch.Tensor: Output logits of shape `(batch, num_classes)`.
        """
        return self.net(x)

*PyTorch Lightning* allows us to define basic training, validation and test steps. 

In [None]:
class LitModel(pl.LightningModule):
    def __init__(
        self,
        lr: float,
        decay: bool = False,
        num_classes: int = 3,
        decay_t0: int = 10,
    ):
        """Neural network Lightning module.

        Args:
            lr (float): Learning rate of the Adam optimizer.
            decay (bool, optional): Whether to use a cosine-annealing learning-rate
                schedule with warm restarts. Defaults to False.
            num_classes (int, optional): Number of classes. Defaults to 3.
            decay_t0 (int, optional): `T_0` of the cosine schedule, i.e. scheduler
                steps before the first restart; only used when `decay` is True.
                Defaults to 10.
        """
        super(LitModel, self).__init__()
        self.save_hyperparameters()

        self.model = Model(num_classes=num_classes)
        self.criterion = nn.CrossEntropyLoss()

        # * Metrics.
        # `task` is required by recent torchmetrics. Macro averaging weights every
        # class equally; with micro averaging, multiclass precision, recall and F1
        # would all collapse to accuracy.
        avg_kwargs = dict(task="multiclass", num_classes=num_classes, average="macro")
        self.train_recall = torchmetrics.Recall(**avg_kwargs)
        self.val_recall = torchmetrics.Recall(**avg_kwargs)
        self.test_recall = torchmetrics.Recall(**avg_kwargs)

        self.train_precision = torchmetrics.Precision(**avg_kwargs)
        self.val_precision = torchmetrics.Precision(**avg_kwargs)
        self.test_precision = torchmetrics.Precision(**avg_kwargs)

        self.train_f1 = torchmetrics.F1Score(**avg_kwargs)
        self.val_f1 = torchmetrics.F1Score(**avg_kwargs)
        self.test_f1 = torchmetrics.F1Score(**avg_kwargs)

        kappa_kwargs = dict(task="multiclass", num_classes=num_classes)
        self.train_cohen = torchmetrics.CohenKappa(**kappa_kwargs)
        self.val_cohen = torchmetrics.CohenKappa(**kappa_kwargs)
        self.test_cohen = torchmetrics.CohenKappa(**kappa_kwargs)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Output logits.
        """
        return self.model(x)

    def step(self, batch: List[torch.Tensor]) -> List[torch.Tensor]:
        """Base step.

        Args:
            batch (List[torch.Tensor]): Input batch `[x, y]`.

        Returns:
            List[torch.Tensor]: Loss and predictions' indices.
        """
        x, y = batch
        x = x.to(torch.float)
        y = y.to(torch.long)

        logits = self(x)

        preds = torch.argmax(logits, dim=1)
        loss = self.criterion(logits, y)

        return [loss, preds]

    def _shared_step(self, batch: List[torch.Tensor], stage: str) -> torch.Tensor:
        """Compute the loss, update the metrics of `stage` and log everything.

        The metric objects themselves (not `.compute()` results) are logged, so
        Lightning computes epoch-level values and resets the metric state at the
        end of each epoch instead of accumulating it across epochs.

        Args:
            batch (List[torch.Tensor]): Input batch.
            stage (str): One of "train", "val", "test".

        Returns:
            torch.Tensor: Loss.
        """
        y = batch[1].to(torch.long)
        loss, preds = self.step(batch)

        metrics = {
            "recall": getattr(self, f"{stage}_recall"),
            "precision": getattr(self, f"{stage}_precision"),
            "f1": getattr(self, f"{stage}_f1"),
            "cohen": getattr(self, f"{stage}_cohen"),
        }
        # Calling a metric updates its accumulated state and caches the batch value
        # that Lightning uses for step-level logging.
        for metric in metrics.values():
            metric(preds, y)

        to_log = {f"{stage}/loss": loss}
        to_log.update({f"{stage}/{name}": metric for name, metric in metrics.items()})
        self.log_dict(
            to_log,
            prog_bar=True,
            on_step=True,
            on_epoch=True,
        )

        return loss

    def training_step(
        self, batch: List[torch.Tensor], batch_idx: Optional[int]
    ) -> torch.Tensor:
        """Base training step.

        Args:
            batch (List[torch.Tensor]): Input batch.
            batch_idx (Optional[int]): Input batch's index.

        Returns:
            torch.Tensor: Loss.
        """
        return self._shared_step(batch, stage="train")

    def validation_step(self, batch: List[torch.Tensor], batch_idx: Optional[int]):
        """Base validation step.

        Args:
            batch (List[torch.Tensor]): Input batch.
            batch_idx (Optional[int]): Input batch's index.
        """
        self._shared_step(batch, stage="val")

    def test_step(self, batch: List[torch.Tensor], batch_idx: Optional[int]):
        """Base test step.

        Args:
            batch (List[torch.Tensor]): Input batch.
            batch_idx (Optional[int]): Input batch's index.
        """
        self._shared_step(batch, stage="test")

    def configure_optimizers(
        self,
    ) -> Union[List[torch.optim.Optimizer], Tuple[List[torch.optim.Optimizer], List[Any]]]:
        """Configure optimizer and, if `decay` is set, the learning-rate scheduler.

        Returns:
            `[optimizer]`, or `([optimizer], [scheduler])` when `decay` is True.
        """
        opt = torch.optim.Adam(self.parameters(), lr=self.hparams.lr)

        if not self.hparams.decay:
            return [opt]

        # `T_0` is a required argument of CosineAnnealingWarmRestarts.
        scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
            opt, T_0=self.hparams.decay_t0
        )
        # Lightning reads a single flat list as a list of optimizers; schedulers go
        # in a second list.
        return [opt], [scheduler]


# Training

Here we define the actual training setup.

In [None]:
import os

pl.seed_everything(42)

ROOT_DIR = Path(".")

datamodule = LOBDataModule(data_dir=ROOT_DIR / "data", batch_size=32)
model = LitModel(lr=1e-3, decay=False)

# * Logger.
# Read the Neptune API token from the environment rather than hard-coding a
# secret in the notebook.
logger = NeptuneLogger(
    project="user/project",
    api_key=os.environ.get("NEPTUNE_API_TOKEN"),
    tags=["Conv"],
)

# * Trainer.
# `gpus=` is no longer accepted by recent Lightning; select hardware through
# `accelerator`/`devices` (all GPUs when CUDA is available, one CPU otherwise).
use_gpu = torch.cuda.is_available()
trainer = pl.Trainer(
    accelerator="gpu" if use_gpu else "cpu",
    devices=-1 if use_gpu else 1,
    max_epochs=30,
    logger=logger,
)

Training and validation.

In [None]:
# Train the model, with validation, on the splits provided by `datamodule`.
trainer.fit(model, datamodule=datamodule)

Test.

In [None]:
# Evaluate the trained model on the test split provided by `datamodule`.
trainer.test(model, datamodule=datamodule)