In [1]:
import numpy as np
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
import pytorch_lightning as pl

import os
from sklearn.preprocessing import StandardScaler
import tensorflow as tf

In [2]:
DATA_DIR = 'gs://time_series_datasets'
LOCAL_CACHE_DIR = './data_loader/dataset/'

# modularise the code 
class TSFDataLoader:
  """Generate data loader from raw data."""

  def __init__(
      self, data_dir, data,seq_len, pred_len, feature_type, target='OT'
  ):
    self.data_dir = data_dir
    self.data = data
    #self.batch_size = batch_size
    self.seq_len = seq_len
    self.pred_len = pred_len
    self.feature_type = feature_type
    self.target = target
    #self.target_slice = slice(0, None)

    self.train_df, self.val_df, self.test_df = self._read_data()

  def _read_data(self):
    """Load raw data and split datasets."""

    # copy data from cloud storage if not exists
    LOCAL_CACHE_DIR = self.data_dir
    if not os.path.isdir(LOCAL_CACHE_DIR):
      os.mkdir(LOCAL_CACHE_DIR)

    file_name = self.data + '.csv'
    cache_filepath = os.path.join(LOCAL_CACHE_DIR, file_name)
    if not os.path.isfile(cache_filepath):
      tf.io.gfile.copy(
          os.path.join(DATA_DIR, file_name), cache_filepath, overwrite=True
      )

    df_raw = pd.read_csv(cache_filepath)

    # S: univariate-univariate, M: multivariate-multivariate, MS:
    # multivariate-univariate
    df = df_raw.set_index('date')
    if self.feature_type == 'S':
      df = df[[self.target]]
    # elif self.feature_type == 'MS':
    # target_idx = df.columns.get_loc(self.target)
    # self.target_slice = slice(target_idx, target_idx + 1)

    # split train/valid/test
    n = len(df)
    if self.data.startswith('ETTm'):
      train_end = 12 * 30 * 24 * 4
      val_end = train_end + 4 * 30 * 24 * 4
      test_end = val_end + 4 * 30 * 24 * 4
    elif self.data.startswith('ETTh'):
      train_end = 12 * 30 * 24
      val_end = train_end + 4 * 30 * 24
      test_end = val_end + 4 * 30 * 24
    else:
      train_end = int(n * 0.7)
      val_end = n - int(n * 0.2)
      test_end = n
    train_df = df[:train_end]
    val_df = df[train_end - self.seq_len : val_end]
    test_df = df[val_end - self.seq_len : test_end]

    # standardize by training set
    self.scaler = StandardScaler()
    self.scaler.fit(train_df.values)

    def scale_df(df, scaler):
      data = scaler.transform(df.values)
      return pd.DataFrame(data, index=df.index, columns=df.columns)

    train_df = scale_df(train_df, self.scaler)
    val_df = scale_df(val_df, self.scaler)
    test_df = scale_df(test_df, self.scaler)
    #self.n_feature = self.train_df.shape[-1]

    return train_df, val_df, test_df 

In [3]:
#DATA_DIR = 'gs://time_series_datasets'
#LOCAL_CACHE_DIR = './data_loader/dataset/'

class DataExtractor:
    def __init__(self, df, row_length=432, tail_length=96):
        self.data = self.extract_contiguous_rows_with_stride(df, row_length, tail_length)


    def extract_contiguous_rows_with_stride(self, df, row_length=432, tail_length=96):
        num_rows = len(df)
        num_chunks = num_rows - row_length + 1

        contiguous_rows = []
        last_four_rows = []
        indices = []

        for i in range(num_chunks):
            chunk = df.iloc[i:i+row_length].values
            contiguous_rows.append(chunk[:row_length-tail_length])
            last_four_rows.append(chunk[-tail_length:])
            indices.append(i)  # Adding the index

        data = {
            "inputs": np.array(contiguous_rows),
            "labels": np.array(last_four_rows),
            "indices": np.array(indices)
        }

        return data

    def __len__(self):
        return len(self.data["indices"])

    def __getitem__(self, index):
        idx = self.data["indices"][index]
        return {
            "inputs": self.data["inputs"][idx],
            "labels": self.data["labels"][idx]
        }

In [4]:
class CustomDataset(Dataset):
    def __init__(self, data_extractor):
        self.data_extractor = data_extractor

    def __len__(self):
        return len(self.data_extractor)

    def __getitem__(self, index):
        data = self.data_extractor[index]
        inputs = torch.tensor(data["inputs"], dtype=torch.float32)
        labels = torch.tensor(data["labels"], dtype=torch.float32)
        return inputs, labels

In [5]:
class CustomDataModule(pl.LightningDataModule):
    def __init__(self, train_df, val_df, test_df, batch_size=32):
        super(CustomDataModule, self).__init__()
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df
        self.batch_size = batch_size

    def setup(self, stage=None):
        # Initialize datasets
        self.train_dataset = CustomDataset(self.train_df)
        self.val_dataset = CustomDataset(self.val_df)
        self.test_dataset = CustomDataset(self.test_df)

    def train_dataloader(self):
        return DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)

    def test_dataloader(self):
        return DataLoader(self.test_dataset, batch_size=self.batch_size, shuffle=False)


In [6]:
# Example usage:
# Creating a sample train, validation and test DataFrame from the weather data
ts_weather = TSFDataLoader(data_dir='./data_loader/dataset/', data='weather', seq_len=432, pred_len=96, feature_type='MS', target='OT')
train_df, val_df, test_df = ts_weather._read_data()

In [7]:
train_df.shape, val_df.shape, test_df.shape

((36887, 21), (5702, 21), (10971, 21))

In [8]:
# Creating a DataExtractor instance
train_data = DataExtractor(train_df)
val_data = DataExtractor(val_df)
test_dat = DataExtractor(test_df)

In [9]:
# Creating a LightningDataModule instance
#data_module = CustomDataModule(data_extractor)
data_module_01= CustomDataModule(train_data, val_data, test_dat, batch_size=32)

In [10]:
import torch.nn as nn

In [101]:
class ResBlock(nn.Module):
    def __init__(self, input_dim, norm_type, activation, dropout, ff_dim):
        super(ResBlock, self).__init__()
        self.input_dim = input_dim
        self.norm = nn.LayerNorm(input_dim) if norm_type == 'L' else nn.BatchNorm1d(input_dim)
        self.temporal_linear = nn.Sequential(
            nn.Linear(input_dim, 336),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout)
        )
        self.feature_linear = nn.Sequential(
            nn.Linear(input_dim, ff_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(dropout),
            nn.Linear(ff_dim, input_dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        print(x.size(1), x.size(2),self.input_dim) #21, 336, 21
        assert x.size(-1) == self.input_dim, f"Input tensor last dimension {x.size(-1)} does not match expected input features {self.input_dim}"
        res = x
        x = self.norm(x)
        print(x.size(-1), x.size(1))
        x = self.temporal_linear(x.transpose(1, 2)).transpose(1, 2)
        x = x + res
        x = self.norm(x)
        x = self.feature_linear(x)
        x = x + res
        return x
        
# Lambda layer for custom operations
class Lambda(nn.Module):
    def __init__(self, func):
        super(Lambda, self).__init__()
        self.func = func

    def forward(self, x):
        return self.func(x)

In [102]:
class RevNorm(nn.Module):
    """
    Reversible Instance Normalization.
    """

    def __init__(self, axis, eps=1e-5, affine=True):
        """
        Constructor for RevNorm.

        Args:
            axis (int): Axis or axes along which to compute mean and variance.
            eps (float): Small constant to avoid division by zero.
            affine (bool): If True, learnable affine parameters are applied.
        """
        super().__init__()
        self.axis = axis
        self.eps = eps
        self.affine = affine

        if self.affine:
            self.affine_weight = nn.Parameter(torch.ones(1))
            self.affine_bias = nn.Parameter(torch.zeros(1))

    def forward(self, x, mode, target_slice=None):
        """
        Forward pass of the RevNorm layer.

        Args:
            x (torch.Tensor): Input tensor.
            mode (str): 'norm' for normalization, 'denorm' for denormalization.
            target_slice (int): Target slice index for denormalization.

        Returns:
            torch.Tensor: Normalized or denormalized tensor.
        """
        if mode == 'norm':
            self._get_statistics(x)
            x = self._normalize(x)
        elif mode == 'denorm':
            x = self._denormalize(x, target_slice)
        else:
            raise NotImplementedError
        return x

    def _get_statistics(self, x):
        """
        Calculate mean and standard deviation of the input tensor.

        Args:
            x (torch.Tensor): Input tensor.
        """
        self.mean = torch.mean(x, dim=self.axis, keepdim=True).detach()
        self.stdev = torch.sqrt(torch.var(x, dim=self.axis, keepdim=True) + self.eps).detach()

    def _normalize(self, x):
        """
        Normalize the input tensor.

        Args:
            x (torch.Tensor): Input tensor.

        Returns:
            torch.Tensor: Normalized tensor.
        """
        x = x - self.mean
        x = x / self.stdev
        if self.affine:
            x = x * self.affine_weight
            x = x + self.affine_bias
        return x

    def _denormalize(self, x, target_slice=None):
        """
        Denormalize the input te nsor.

        Args:
            x (torch.Tensor): Input tensor.
            target_slice (int): Target slice index for denormalization.

        Returns:
            torch.Tensor: Denormalized tensor.
        """
        if self.affine:
            x = x - self.affine_bias
            x = x / self.affine_weight
        x = x * self.stdev[:, :, target_slice]
        x = x + self.mean[:, :, target_slice]
        return x

class RevNormLightning(pl.LightningModule):
    """
    PyTorch Lightning module for RevNorm.
    """

    def __init__(self, axis, eps=1e-5, affine=True):
        """
        Constructor for RevNormLightning.

        Args:
            axis (int): Axis or axes along which to compute mean and variance.
            eps (float): Small constant to avoid division by zero.
            affine (bool): If True, learnable affine parameters are applied.
        """
        super(RevNormLightning, self).__init__()
        self.revnorm = RevNorm(axis, eps, affine)

    def forward(self, x, mode, target_slice=None):
        """
        Forward pass of the RevNormLightning model.

        Args:
            x (torch.Tensor): Input tensor.
            mode (str): 'norm' for normalization, 'denorm' for denormalization.
            target_slice (int): Target slice index for denormalization.

        Returns:
            torch.Tensor: Normalized or denormalized tensor.
        """
        return self.revnorm(x, mode, target_slice)

    def configure_optimizers(self):
        """
        Configure optimizer for training.
        """
        return torch.optim.Adam(self.parameters(), lr=0.001)



In [103]:
#developing the whole pipeline: 
import torch
import torch.nn as nn
import torch.nn.functional as F
import pytorch_lightning as pl
import torch.optim as optim
    
class TSMixer(pl.LightningModule):
    """
        Constructor for TSMixer.
        
        Args:
            input_shape (tuple): Input tensor shape.
            pred_len (int): Length of the prediction.
            norm_type (str): Type of normalization ('L' for LayerNorm, 'B' for BatchNorm).
            activation (nn.Module): Activation function.
            n_block (int): Number of ResBlocks in TSMixer.
            dropout (float): Dropout probability.
            ff_dim (int): Feature dimension.
            target_slice (int): Target slice index.
            rev_norm_inst(bool): Flag for reverse normalisation
    """
    
    def __init__(self, input_shape, pred_len, norm_type, activation, n_block, dropout, ff_dim, rev_norm_inst=False, target_slice=None):
        super(TSMixer, self).__init__()
        self.input_shape = input_shape
        self.pred_len = pred_len
        self.target_slice = target_slice
        self.activation = activation
        self.n_block = n_block

        self.rev_norm_inst = rev_norm_inst
        if self.rev_norm_inst:
              self.rev_norm = RevNormLightning(axis=-2)

        layers = []
        for _ in range(self.n_block):
            layers.append(ResBlock(self.input_shape[-1], norm_type, activation, dropout, ff_dim))
        self.blocks = nn.Sequential(*layers)

        self.output_layer = nn.Sequential(
            nn.Linear(self.input_shape[-1], self.pred_len),
            nn.ReLU(inplace=True)
        )

    def forward(self, x):
        """
        Forward pass of the TSMixer model.
        
        Args:
            x (torch.Tensor): Input tensor.
            mode (str): 'norm' for training, 'denorm' for inference.
        
        Returns:
            torch.Tensor: Output tensor after passing through the TSMixer.
        
        """
        
        if self.rev_norm_inst:
               x = self.rev_norm(x, mode='norm')
        x = self.blocks(x)
        if self.target_slice:
            x = x[:, :, self.target_slice]
        x = x.transpose(1, 2) # [Batch, Channel, Input Length]
        x = self.output_layer(x) # [Batch, Channel, Output Length]
        x = x.transpose(1, 2) # [Batch, Output Length, Channel]
        outputs = x
        if self.rev_norm_inst:
          outputs = self.rev_norm(x, 'denorm', self.target_slice)
        return outputs
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        target_ = y.unsqueeze(dim=-1)
        loss = nn.MSELoss()(y_hat, target_)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        target_ = y.unsqueeze(dim=-1)
        loss = nn.MSELoss()(y_hat, target_)
        #loss = nn.MSELoss()(y_hat, y)
        return loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=0.001)
        return optimizer

In [104]:
model = TSMixer(
                input_shape=(336,21),
                pred_len=4,
                norm_type='L',
                activation='relu',
                dropout=0.9,
                n_block=2,
                ff_dim=64,
                target_slice=slice(20,21,None)
            )

In [105]:
model

TSMixer(
  (blocks): Sequential(
    (0): ResBlock(
      (norm): LayerNorm((21,), eps=1e-05, elementwise_affine=True)
      (temporal_linear): Sequential(
        (0): Linear(in_features=21, out_features=336, bias=True)
        (1): ReLU(inplace=True)
        (2): Dropout(p=0.9, inplace=False)
      )
      (feature_linear): Sequential(
        (0): Linear(in_features=21, out_features=64, bias=True)
        (1): ReLU(inplace=True)
        (2): Dropout(p=0.9, inplace=False)
        (3): Linear(in_features=64, out_features=21, bias=True)
        (4): Dropout(p=0.9, inplace=False)
      )
    )
    (1): ResBlock(
      (norm): LayerNorm((21,), eps=1e-05, elementwise_affine=True)
      (temporal_linear): Sequential(
        (0): Linear(in_features=21, out_features=336, bias=True)
        (1): ReLU(inplace=True)
        (2): Dropout(p=0.9, inplace=False)
      )
      (feature_linear): Sequential(
        (0): Linear(in_features=21, out_features=64, bias=True)
        (1): ReLU(inplace=Tru

In [106]:
# Training the model using PyTorch Lightning Trainer
trainer = pl.Trainer(max_epochs=10, accelerator="mps")  # Use gpus=0 if you don't have a GPU
trainer.fit(model, datamodule=data_module_01)

GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name         | Type       | Params
--------------------------------------------
0 | blocks       | Sequential | 20.4 K
1 | output_layer | Sequential | 88    
--------------------------------------------
20.5 K    Trainable params
0         Non-trainable params
20.5 K    Total params
0.082     Total estimated model params size (MB)


Sanity Checking DataLoader 0:   0%|                                                                                                | 0/2 [00:00<?, ?it/s]336 21 21
21 336


RuntimeError: The size of tensor a (336) must match the size of tensor b (21) at non-singleton dimension 2

In [None]:
import torch.nn.functional as F

class SimpleMLP(pl.LightningModule):
    def __init__(self, input_dim, output_dim):
        super(SimpleMLP, self).__init__()
        self.input_dim = input_dim
        self.output_dim = output_dim
        self.fc1 = torch.nn.Linear(input_dim, input_dim)
        self.fc2 = torch.nn.Linear(input_dim, output_dim)
        

    def forward(self, x):
        x= F.relu(self.fc1(x))
        x= self.fc2(x)
        return x

    def training_step(self, batch, batch_idx):
        contiguous_rows, last_four_rows = batch
        contiguous_rows = contiguous_rows.view(contiguous_rows.size(0), -1)
        last_four_rows = last_four_rows.view(last_four_rows.size(0), -1)
        #nputs = torch.cat((contiguous_rows, last_four_rows), dim=1)
        outputs = self(contiguous_rows)
        loss = torch.nn.functional.mse_loss(outputs, last_four_rows)
        return loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=0.001)

# Example usage:
# Creating a sample DataFrame with 25 rows and 10 columns
data = np.random.rand(25, 10)
df = pd.DataFrame(data)

# Creating a DataExtractor instance
data_extractor = DataExtractor(df)

# Creating a LightningDataModule instance
data_module = CustomDataModule(data_extractor)

# Creating a SimpleMLP model
input_dim = 100 #10 * 10 * 2  # Input dimension after concatenating contiguous_rows and last_four_rows
output_dim = 40 # 10 * 4  # Output dimension (last_four_rows)
model = SimpleMLP(input_dim, output_dim)

# Training the model using PyTorch Lightning Trainer
trainer = pl.Trainer(max_epochs=10, accelerator="mps")  # Use gpus=0 if you don't have a GPU
trainer.fit(model, datamodule=data_module)

In [None]:
model

In [None]:
model.fc1.weight.shape

In [None]:
model.fc2.weight.shape

In [None]:
data_extractor[1]