In [11]:
import math
from typing import Optional

import onnx
import onnxruntime

In [12]:
import torch
import torch.nn as nn
import pytorch_lightning as pl


class LSTMRegressor(nn.Module):
    """
    LSTM encoder followed by a linear head that produces one value per sequence.

    This is a plain ``nn.Module``; the PyTorch Lightning training loop lives in
    ``PricePredictor``.
    """
    def __init__(self,
        batch_size: int,
        dropout_rate: float,
        hidden_size: int,
        number_of_features: int,
        number_of_layers: int,
        run_on_gpu: bool,
    ):
        """
        Args:
            batch_size: Stored on the instance only; not used by any layer.
            dropout_rate: Dropout applied between stacked LSTM layers (has no
                effect when ``number_of_layers`` is 1).
            hidden_size: Size of the LSTM hidden state.
            number_of_features: Number of input features per time step.
            number_of_layers: Number of stacked LSTM layers.
            run_on_gpu: If True, ``flatten_parameters()`` is called before every
                forward pass.
        """
        super().__init__()
        self.batch_size = batch_size
        self.dropout_rate = dropout_rate
        self.hidden_size = hidden_size
        self.n_features = number_of_features
        self.number_of_layers = number_of_layers
        self.run_on_gpu = run_on_gpu

        # nn.LSTM only applies dropout *between* layers and emits a UserWarning when
        # dropout > 0 is combined with a single layer. Passing 0.0 in that case keeps
        # the output identical while silencing the warning.
        lstm_dropout = self.dropout_rate if self.number_of_layers > 1 else 0.0

        self.lstm = nn.LSTM(
            batch_first=True,
            dropout=lstm_dropout,
            hidden_size=self.hidden_size,
            input_size=self.n_features,
            num_layers=self.number_of_layers,
        )

        self.regressor = nn.Linear(self.hidden_size, 1)


    def forward(self, x):
        """
        Forward pass through the model.

        Args:
            x: Input batch. Because the LSTM uses ``batch_first=True``, this is
                expected to be (batch_size, sequence_length, number_of_features).

        Returns:
            Tensor of shape (batch_size, 1): the linear head applied to the final
            hidden state of the last LSTM layer (the per-step outputs are discarded).
        """
        if self.run_on_gpu:
            # Re-compact cuDNN weights, which may have become non-contiguous.
            self.lstm.flatten_parameters()
        _, (hidden, _) = self.lstm(x)
        # hidden: (num_layers, batch, hidden_size) -> keep the top layer only.
        out = hidden[-1]
        return self.regressor(out)


class PricePredictor(pl.LightningModule):
    """
    PyTorch Lightning wrapper that trains an ``LSTMRegressor`` with AdamW.

    The train/validation/test steps share one implementation and log the loss
    under ``"train/loss"``, ``"valid/loss"`` and ``"test/loss"`` respectively.
    """
    def __init__(self,
        batch_size: int,
        dropout_rate: float,
        hidden_size: int,
        learning_rate: float,
        number_of_features: int,
        number_of_layers: int,
        run_on_gpu: bool,
        criterion: Optional[nn.Module] = None,
    ):
        """
        Args:
            batch_size, dropout_rate, hidden_size, number_of_features,
            number_of_layers, run_on_gpu: Forwarded to ``LSTMRegressor``.
            learning_rate: Learning rate for the AdamW optimizer.
            criterion: Loss module called as ``criterion(output, labels.unsqueeze(1))``.
                When omitted (``None``), a fresh ``nn.MSELoss()`` is created for
                this instance.
        """
        super().__init__()
        self.model = LSTMRegressor(
            batch_size, dropout_rate, hidden_size, number_of_features, number_of_layers, run_on_gpu,
        )
        self.learning_rate = learning_rate
        # Build the default per instance rather than sharing a single module that
        # was created once when the class was defined.
        self.criterion = criterion if criterion is not None else nn.MSELoss()
        # Record constructor arguments as hparams so load_from_checkpoint can
        # rebuild the module without extra arguments.
        self.save_hyperparameters()


    def forward(self, x, labels=None):
        """
        Run the regressor and, when ``labels`` are given, also compute the loss.

        Returns:
            ``(loss, output)`` if ``labels`` is not None, otherwise ``output``.
        """
        output = self.model(x)
        if labels is not None:
            # Labels arrive as (batch,); match the model's (batch, 1) output.
            loss = self.criterion(output, labels.unsqueeze(dim=1))
            return loss, output
        return output


    def _shared_step(self, batch, stage: str):
        """Compute the loss for one batch and log it as ``"<stage>/loss"``."""
        sequences, labels = batch
        loss, _ = self(sequences, labels)
        self.log(f"{stage}/loss", loss, on_step=True, on_epoch=True)
        return {"loss": loss}


    def training_step(self, batch, batch_idx):
        return self._shared_step(batch, "train")


    def validation_step(self, batch, batch_idx):
        return self._shared_step(batch, "valid")


    def test_step(self, batch, batch_idx):
        return self._shared_step(batch, "test")


    def configure_optimizers(self):
        """Optimise all parameters with AdamW at ``self.learning_rate``."""
        return torch.optim.AdamW(self.parameters(), lr=self.learning_rate)

In [13]:
import pandas as pd
import pytorch_lightning as pl
import torch

from torch.utils.data import DataLoader, Dataset
from typing import List, Tuple


class CryptoDataset(Dataset):
    """
    Map-style dataset over pre-built ``(window, label)`` pairs for the LSTM model.

    Item ``i`` is ``(features, target)``: ``features`` is a ``torch.Tensor`` built
    from the window's values (default float dtype), and ``target`` is a float32
    scalar tensor.
    """
    def __init__(self, sequences: List[Tuple[pd.DataFrame, float]]):
        # The caller's list is kept by reference; conversion happens lazily per item.
        self.sequences = sequences


    def __len__(self):
        return len(self.sequences)


    def __getitem__(self, index: int):
        window, target = self.sequences[index]
        features = torch.Tensor(window.to_numpy())
        target_tensor = torch.tensor(target).float()
        return features, target_tensor


class LSTMDataLoader(pl.LightningDataModule):
    """
    Lightning data module serving pre-built train/validation/test sequences.

    Batches are never shuffled. The test loader reuses ``val_batch_size`` and
    ``val_workers``.
    """
    def __init__(self,
        train_sequences: List[Tuple[pd.DataFrame, float]],
        val_sequences: List[Tuple[pd.DataFrame, float]],
        test_sequences: List[Tuple[pd.DataFrame, float]],
        train_batch_size: int,
        val_batch_size: int,
        train_workers: int = 2,
        val_workers: int = 1,
    ):
        """
        Args:
            train_sequences, val_sequences, test_sequences: ``(window, label)``
                pairs, e.g. as produced by ``create_sequences``.
            train_batch_size: Batch size of the training loader.
            val_batch_size: Batch size of the validation and test loaders.
            train_workers: ``num_workers`` for the training loader.
            val_workers: ``num_workers`` for the validation and test loaders.
        """
        super().__init__()
        self.train_sequences = train_sequences
        self.val_sequences = val_sequences
        self.test_sequences = test_sequences
        self.train_batch_size = train_batch_size
        self.val_batch_size = val_batch_size
        self.train_workers = train_workers
        self.val_workers = val_workers
        self.test_workers = val_workers


    def setup(self, stage: Optional[str] = None):
        """
        Wrap each sequence list in a ``CryptoDataset``.

        ``stage`` is accepted to match the Lightning hook signature but is ignored:
        all three datasets are (re)built on every call.
        """
        self.train_dataset = CryptoDataset(self.train_sequences)
        self.val_dataset = CryptoDataset(self.val_sequences)
        self.test_dataset = CryptoDataset(self.test_sequences)


    def _make_loader(self, dataset: Dataset, batch_size: int, num_workers: int) -> DataLoader:
        """Build an unshuffled DataLoader; shared by the train/val/test hooks."""
        # shuffle=False keeps batches in the order the sequences were provided.
        return DataLoader(
            dataset,
            batch_size=batch_size,
            shuffle=False,
            num_workers=num_workers,
        )


    def train_dataloader(self):
        return self._make_loader(self.train_dataset, self.train_batch_size, self.train_workers)


    def val_dataloader(self):
        return self._make_loader(self.val_dataset, self.val_batch_size, self.val_workers)


    def test_dataloader(self):
        return self._make_loader(self.test_dataset, self.val_batch_size, self.test_workers)


def create_sequences(
    input_data: pd.DataFrame, 
    target_column: str, 
    sequence_length: int
    ) -> List[Tuple[pd.DataFrame, float]]:
    """
    Create sliding windows of ``sequence_length`` consecutive rows, each paired
    with the ``target_column`` value of the row immediately after the window.

    Args:
        input_data: Frame to window; rows are taken in their positional order and
            every column is kept in the window.
        target_column: Column whose next-row value becomes the label.
        sequence_length: Number of rows per window; must be >= 1.

    Returns:
        ``len(input_data) - sequence_length`` ``(window, label)`` tuples, or an
        empty list when the frame is not longer than ``sequence_length``.

    Raises:
        ValueError: If ``sequence_length`` is smaller than 1.
        KeyError: If ``target_column`` is not a column of ``input_data``.
    """
    if sequence_length < 1:
        raise ValueError(f"sequence_length must be >= 1, got {sequence_length}")

    # Pull the label column once instead of materialising a whole row per window.
    targets = input_data[target_column].to_numpy()

    return [
        # .iloc makes the row slice positional regardless of the index type.
        (input_data.iloc[start:start + sequence_length], targets[start + sequence_length])
        for start in range(len(input_data) - sequence_length)
    ]


def split_train_and_val_sequences(
    sequences: List[Tuple[pd.DataFrame, float]],
    val_size: float,
) -> Tuple[List[Tuple[pd.DataFrame, float]], List[Tuple[pd.DataFrame, float]]]:
    """
    Split sequences positionally into training and validation sets (no shuffling).

    The first ``ceil(len(sequences) * (1 - val_size))`` items go to training and
    the remainder to validation, preserving order.

    Args:
        sequences: ``(window, label)`` pairs; any 2-item sequence is accepted and
            re-packed as a tuple.
        val_size: Fraction of items reserved for validation, in [0, 1].

    Returns:
        ``(train_sequences, val_sequences)``.

    Raises:
        ValueError: If ``val_size`` is outside [0, 1] (or NaN).
    """
    if not 0.0 <= val_size <= 1.0:
        raise ValueError(f"val_size must be in [0, 1], got {val_size}")

    pairs = [(sequence, label) for sequence, label in sequences]
    # Same count as "append to train while len(train) < n * (1 - val_size)":
    # that loop stops at the smallest integer >= n * (1 - val_size).
    n_train = math.ceil(len(pairs) * (1 - val_size))
    return pairs[:n_train], pairs[n_train:]

In [14]:
# Windowing / split settings and the checkpoint to compare against ONNX.
SEQUENCE_LENGTH = 60      # rows per input window
VAL_SIZE = 0.2            # fraction of training windows held out for validation
TARGET_COLUMN = "close"
CHECKPOINT_PATH = "../data/06_models/epoch=4-step=7854.ckpt"

train_df = pd.read_csv("../data/05_model_input/scaled_train_data.csv")
test_df = pd.read_csv("../data/05_model_input/scaled_test_data.csv")

test_sequences = create_sequences(test_df, TARGET_COLUMN, SEQUENCE_LENGTH)
train_sequences = create_sequences(train_df, TARGET_COLUMN, SEQUENCE_LENGTH)
train_sequences, val_sequences = split_train_and_val_sequences(train_sequences, VAL_SIZE)

data = LSTMDataLoader(
    train_sequences=train_sequences, 
    val_sequences=val_sequences, 
    test_sequences=test_sequences, 
    train_batch_size=2, 
    val_batch_size=1,
)
data.setup()

# Map weights to CPU so a checkpoint holding CUDA tensors still loads on a CPU-only
# machine; the ONNX session below is compared on the CPU execution provider.
model = PricePredictor.load_from_checkpoint(CHECKPOINT_PATH, map_location="cpu")
model.eval()  # inference mode for dropout etc., so the comparison is deterministic

# Reference input: the first training batch (the train loader does not shuffle).
X, y = next(iter(data.train_dataloader()))
with torch.no_grad():
    torch_out = model(X)

In [15]:
# Batch shape: (batch, sequence_length, number_of_features).
X.size()

torch.Size([2, 60, 9])

In [16]:
path_onnx_model = "../data/06_models/model.onnx"

# Validate the exported graph against the ONNX spec before running it.
onnx_model = onnx.load(path_onnx_model)
onnx.checker.check_model(onnx_model)

# Request the CPU provider explicitly: since ONNX Runtime 1.9, builds that enable
# more than one execution provider (e.g. onnxruntime-gpu) raise a ValueError when
# `providers` is omitted.
ort_session = onnxruntime.InferenceSession(path_onnx_model, providers=["CPUExecutionProvider"])
ort_session.get_providers()


['CPUExecutionProvider']

In [17]:
def to_numpy(tensor):
    """Convert ``tensor`` to a NumPy array on the host, dropping autograd history."""
    # detach() is safe on tensors that do not require grad, so one path covers both cases.
    host_tensor = tensor.detach().cpu()
    return host_tensor.numpy()

In [18]:
# Feed the same batch to ONNX Runtime as {input_name: ndarray}; output_names=None
# asks for every model output.
ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(X)}
ort_outs = ort_session.run(output_names=None, input_feed=ort_inputs)

In [19]:
import numpy as np

# Raises an AssertionError if PyTorch and ONNX Runtime disagree beyond tolerance.
np.testing.assert_allclose(
    to_numpy(torch_out),
    ort_outs[0],
    rtol=1e-03,
    atol=1e-05,
)
print("All good!")

All good!


In [20]:
# PyTorch output for the reference batch (compare with `ort_outs`).
torch_out

tensor([[-0.6997],
        [-0.7026]])

In [21]:
# ONNX Runtime outputs for the same batch: a list with one array per model output.
ort_outs

[array([[-0.6996894 ],
        [-0.70255405]], dtype=float32)]