# Train Image Classification Model

In [1]:
!pip install calflops lightning pyts==0.12.0
!pip install --no-deps tsai==0.3.9

Collecting calflops
  Downloading calflops-0.3.2-py3-none-any.whl.metadata (28 kB)
Collecting lightning
  Downloading lightning-2.5.1-py3-none-any.whl.metadata (39 kB)
Collecting pyts==0.12.0
  Downloading pyts-0.12.0-py3-none-any.whl.metadata (10 kB)
Downloading pyts-0.12.0-py3-none-any.whl (2.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.5/2.5 MB[0m [31m29.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading calflops-0.3.2-py3-none-any.whl (29 kB)
Downloading lightning-2.5.1-py3-none-any.whl (818 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m818.9/818.9 kB[0m [31m35.7 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyts, lightning, calflops
Successfully installed calflops-0.3.2 lightning-2.5.1 pyts-0.12.0
Collecting tsai==0.3.9
  Downloading tsai-0.3.9-py3-none-any.whl.metadata (16 kB)
Downloading tsai-0.3.9-py3-none-any.whl (324 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m324.3/324.

In [2]:
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, WeightedRandomSampler
from calflops import calculate_flops
import lightning as L
from lightning.pytorch.loggers import TensorBoardLogger
import math
from fastai.layers import ConvLayer, MaxPool, AdaptiveAvgPool, Flatten, SimpleSelfAttention, SEModule, NormType, AvgPool
from fastcore.basics import store_attr
from fastcore.meta import delegates
from typing import Literal
from lightning.pytorch.callbacks import EarlyStopping, ModelCheckpoint, LearningRateMonitor
from torch.utils.data import Dataset


class BaseLightningModel(L.LightningModule):
    def __init__(self, config=None):
        super().__init__()
        self.config = config
        self._val_loss = 0.0
        self._test_loss = 0.0
        self._val_batches = 0
        self._test_batches = 0

    def training_step(self, batch, batch_idx):
        inputs, target = batch
        output = self(inputs)
        loss = self.criterion(output, target)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        inputs, target = batch
        output = self(inputs)
        self._val_loss += self.val_criterion(output, target).item()
        self._val_batches += 1

    def test_step(self, batch, batch_idx):
        inputs, target = batch
        output = self(inputs)
        self._test_loss += self.val_criterion(output, target).item()
        self._test_batches += 1

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        if isinstance(batch, list) or isinstance(batch, tuple):
            batch, y = batch
            return self(batch), y
        return self(batch)

    def configure_optimizers(self):
        if self.config.optimizer == "adam":
            optimizer = torch.optim.Adam(self.parameters(), lr=self.config.lr)
        elif self.config.optimizer == "adamw":
            optimizer = torch.optim.AdamW(self.parameters(), lr=self.config.lr)
        else:
            raise ValueError(f"Unknown optimizer: {self.config.optimizer}")
        if self.config.use_one_cycle:
            scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=self.lr, pct_start=0.25,
                                                            total_steps=self.trainer.estimated_stepping_batches)
            opt_config = {
                "optimizer": optimizer,
                "lr_scheduler": {
                    "scheduler": scheduler,
                    "interval": "step",
                    "frequency": 1,
                }
            }
            return opt_config
        return optimizer

    def configure_callbacks(self):
        callbacks = []
        early_stop = EarlyStopping(
            monitor="val_loss",
            patience=self.config.patience,
            min_delta=self.config.min_delta,
            verbose=True
        )
        callbacks.append(early_stop)
        if self.config.modelOutput is not None:
            checkpoint = ModelCheckpoint(
                dirpath=self.config.modelOutput,
                filename="best_{epoch:02d}-{val_loss:.3f}",
                monitor="val_loss",
                save_top_k=1,
                verbose=True,
                save_last=True
            )
            callbacks.append(checkpoint)
        lr_monitor = LearningRateMonitor(logging_interval="step")
        callbacks.append(lr_monitor)
        return callbacks

    def on_validation_epoch_end(self):
        """Compute and log validation metrics at the end of the validation epoch."""
        val_loss = self._val_loss / self._val_batches
        self.log("val_loss", val_loss)
        self._val_loss = 0.0
        self._val_batches = 0

    def on_test_epoch_end(self):
        """Compute and log test metrics at the end of the test epoch."""
        test_loss = self._test_loss / self._test_batches
        self.log("test_loss", test_loss)
        self._test_loss = 0.0
        self._test_batches = 0

    @staticmethod
    def get_act(activation: Literal["relu", "leakyrelu", "mish", "silu", "hardswish", "gelu", "celu", "elu"] = "relu"):
        activation = activation.lower()
        if activation == "relu":
            return torch.nn.ReLU
        elif activation == "leakyrelu":
            return torch.nn.LeakyReLU
        elif activation == "mish":
            return torch.nn.Mish
        elif activation == "silu":  # Swish
            return torch.nn.SiLU
        elif activation == "hardswish":
            return torch.nn.Hardswish
        elif activation == "gelu":
            return torch.nn.GELU
        elif activation == "celu":
            return torch.nn.CELU
        elif activation == "elu":
            return torch.nn.ELU
        else:
            raise ValueError(f"Unknown activation function: {activation}")


class BaseClassifier(BaseLightningModel):
    def __init__(self, criterion, val_criterion, optimizer, lr, patience, min_delta, checkpoint_dir, use_one_cycle):
        super().__init__()
        self.criterion = criterion if criterion is not None else nn.BCEWithLogitsLoss()
        self.val_criterion = val_criterion if val_criterion is not None else criterion
        self.optimizer_name = optimizer
        self.lr = lr
        self.patience = patience
        self.min_delta = min_delta
        self.checkpoint_dir = checkpoint_dir
        self.use_one_cycle = use_one_cycle

        # self.save_hyperparameters()

    def configure_optimizers(self):
        if self.optimizer_name == "adam":
            optimizer = torch.optim.Adam(self.parameters(), lr=self.lr)
        elif self.optimizer_name == "adamw":
            optimizer = torch.optim.AdamW(self.parameters(), lr=self.lr)
        else:
            raise ValueError(f"Unknown optimizer: {self.optimizer_name}")
        if self.use_one_cycle:
            scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=self.lr, pct_start=0.25,
                                                            total_steps=self.trainer.estimated_stepping_batches)
            opt_config = {
                "optimizer": optimizer,
                "lr_scheduler": {
                    "scheduler": scheduler,
                    "interval": "step",
                    "frequency": 1,
                }
            }
            return opt_config
        return optimizer

    def configure_callbacks(self):
        callbacks = []
        early_stop = EarlyStopping(
            monitor="val_loss",
            patience=self.patience,
            min_delta=self.min_delta,
            verbose=True
        )
        callbacks.append(early_stop)
        if self.checkpoint_dir is not None:
            checkpoint = ModelCheckpoint(
                dirpath=self.checkpoint_dir,
                filename="best_{epoch:02d}-{val_loss:.3f}",
                monitor="val_loss",
                save_top_k=1,
                verbose=True,
                save_last=True
            )
            callbacks.append(checkpoint)
        lr_monitor = LearningRateMonitor(logging_interval="step")
        callbacks.append(lr_monitor)
        return callbacks


class GAFTransform(nn.Module):
    """Transforms a batch of times eries data to a Gramian Angular Field (GAF)."""
    def __init__(self, method: Literal["summation", "difference"] = "summation", eps: float = 1e-6):
        super().__init__()
        assert method in ["summation", "difference"], "Method must be either 'summation' or 'difference'"
        self.method = method
        self.eps = eps

    def forward(self, x):
        """
        Expects x to be of shape (batch_size, channels, seq_len)
        Returns image tensor of shape (batch_size, channels, seq_len, seq_len)
        """
        x_cos = self.min_max_scale(x)  # Min-Max scaling to range [-1, 1]

        # Calculate GAF
        x_sin = (1 - x_cos ** 2) ** 0.5
        if self.method == "summation":
            gaf = torch.einsum("bci,bcj->bcij", x_cos, x_cos) - torch.einsum("bci,bcj->bcij", x_sin, x_sin)
        else:
            gaf = torch.einsum("bci,bcj->bcij", x_sin, x_cos) - torch.einsum("bci,bcj->bcij", x_cos, x_sin)

        gaf = gaf / 2 + 0.5  # Scale images to range [0, 1]
        return gaf

    def min_max_scale(self, x):
        """Min-Max scaling each sequence to range [-1, 1]"""
        mins = x.min(dim=-1, keepdim=True).values
        maxs = x.max(dim=-1, keepdim=True).values
        return 2 * (x - mins) / (maxs - mins + self.eps) - 1


class TSDataset(Dataset):
    def __init__(
            self,
            X: np.ndarray,
            y: np.ndarray = None,
            window_size: int = 1,
            stride: int = 1,
    ):

        self.window_size = window_size
        self.stride = stride
        self.number_of_windows = ((len(X) - window_size) // stride) + 1
        self.X = torch.tensor(X, dtype=torch.float32)
        self.y = torch.tensor(y, dtype=torch.float32).squeeze() if y is not None else None

    def __len__(self):
        return self.number_of_windows

    def __getitem__(self, idx):
        # get window
        start_idx = idx * self.stride
        end_idx = start_idx + self.window_size

        # apply instance normalization
        window = self.X[start_idx:end_idx]  # (window_size, n_features)

        # change to channels first format
        window = window.permute(1, 0)

        # get label and return
        if self.y is not None:
            return window, self.y[end_idx - 1]
        return window


class ResBlock(nn.Module):
    """Resnet block from `ni` to `nh` with `stride`"""

    @delegates(ConvLayer.__init__)
    def __init__(self, expansion, ni, nf, stride=1, groups=1, reduction=None, nh1=None, nh2=None, dw=False, g2=1,
                 sa=False, sym=False, norm_type="Batch", act_cls=nn.ReLU, ndim=2, ks=3,
                 pool=AvgPool, pool_first=True, **kwargs):
        super().__init__()
        norm2 = (NormType.BatchZero if norm_type == "Batch" else
                 NormType.InstanceZero if norm_type == "Instance" else norm_type)
        if nh2 is None:
            nh2 = nf
        if nh1 is None:
            nh1 = nh2
        nf, ni = nf * expansion, ni * expansion
        k0 = dict(norm_type=norm_type, act_cls=act_cls, ndim=ndim, **kwargs)
        k1 = dict(norm_type=norm2, act_cls=None, ndim=ndim, **kwargs)
        convpath = [ConvLayer(ni, nh2, ks, stride=stride, groups=ni if dw else groups, **k0),
                    ConvLayer(nh2, nf, ks, groups=g2, **k1)
                    ] if expansion == 1 else [
            ConvLayer(ni, nh1, 1, **k0),
            ConvLayer(nh1, nh2, ks, stride=stride, groups=nh1 if dw else groups, **k0),
            ConvLayer(nh2, nf, 1, groups=g2, **k1)]
        if reduction:
            convpath.append(SEModule(nf, reduction=reduction, act_cls=act_cls))
        if sa:
            convpath.append(SimpleSelfAttention(nf, ks=1, sym=sym))
        self.convpath = nn.Sequential(*convpath)
        idpath = []
        if ni != nf:
            idpath.append(ConvLayer(ni, nf, 1, act_cls=None, ndim=ndim, **kwargs))
        if stride != 1:
            idpath.insert((1, 0)[pool_first], pool(stride, ndim=ndim, ceil_mode=True))
        self.idpath = nn.Sequential(*idpath)
        self.act = nn.ReLU() if act_cls is None else act_cls()

    def forward(self, x):
        return self.act(self.convpath(x) + self.idpath(x))


def SEBlock(expansion, ni, nf, groups=1, reduction=16, stride=1, **kwargs):
    return ResBlock(expansion, ni, nf, stride=stride, groups=groups, reduction=reduction, nh1=nf * 2,
                    nh2=nf * expansion, **kwargs)


def SEResNeXtBlock(expansion, ni, nf, groups=32, reduction=16, stride=1, base_width=4, **kwargs):
    w = math.floor(nf * (base_width / 64)) * groups
    return ResBlock(expansion, ni, nf, stride=stride, groups=groups, reduction=reduction, nh2=w, **kwargs)


def SeparableBlock(expansion, ni, nf, reduction=16, stride=1, base_width=4, **kwargs):
    return ResBlock(expansion, ni, nf, stride=stride, reduction=reduction, nh2=nf * 2, dw=True, **kwargs)


def init_cnn(m):
    if getattr(m, 'bias', None) is not None:
        nn.init.constant_(m.bias, 0)
    if isinstance(m, (nn.Conv1d, nn.Conv2d, nn.Conv3d, nn.Linear)):
        nn.init.kaiming_normal_(m.weight)
    for l in m.children():
        init_cnn(l)


class XResNet(BaseClassifier):
    @delegates(ResBlock)
    def __init__(self, block: str, expansion: int, layers: list, p=0.0, c_in=6, n_out=6, stem_szs=(32, 32, 64),
                 widen=1.0, sa=False, act_cls="relu", ndim=2, ks=3, stride=2, criterion=None, val_criterion=None,
                 optimizer="adam", lr=1e-3, patience=5, min_delta=0.0, checkpoint_dir=None, use_one_cycle=False, **kwargs):
        super().__init__(criterion, val_criterion, optimizer, lr, patience, min_delta, checkpoint_dir, use_one_cycle)
        store_attr('expansion,ndim,ks')
        self.act_cls = self.get_act(act_cls)
        self.block = self._get_block(block)
        if ks % 2 == 0:
            raise Exception('kernel size has to be odd!')

        self.transform = GAFTransform("summation")

        # Create stem layers
        stem_szs = [c_in, *stem_szs]
        self.stem = nn.ModuleList([
            ConvLayer(stem_szs[i], stem_szs[i + 1], ks=ks, stride=stride if i == 0 else 1,
                      act_cls=self.act_cls, ndim=ndim)
            for i in range(3)
        ])

        # Create maxpool layer
        self.maxpool = MaxPool(ks=ks, stride=stride, padding=ks // 2, ndim=ndim)

        # Create blocks
        layer_sizes = [64, 128, 256, 512][:len(layers)]
        block_szs = [int(o * widen) for o in layer_sizes + [256] * (len(layers) - 4)]
        block_szs = [64 // expansion] + block_szs
        self.blocks = nn.ModuleList(self._make_blocks(layers, block_szs, sa, stride, **kwargs))

        # Create head layers
        self.adaptive_pool = AdaptiveAvgPool(sz=1, ndim=ndim)
        self.flatten = Flatten()
        self.dropout = nn.Dropout(p)
        self.fc = nn.Linear(block_szs[-1] * expansion, n_out)

        init_cnn(self)

    def _make_blocks(self, layers, block_szs, sa, stride, **kwargs):
        return [self._make_layer(ni=block_szs[i], nf=block_szs[i + 1], blocks=l,
                                 stride=1 if i == 0 else stride, sa=sa and i == len(layers) - 4, **kwargs)
                for i, l in enumerate(layers)]

    def _make_layer(self, ni, nf, blocks, stride, sa, **kwargs):
        return nn.Sequential(
            *[self.block(self.expansion, ni if i == 0 else nf, nf, stride=stride if i == 0 else 1,
                         sa=sa and i == (blocks - 1), act_cls=self.act_cls, ndim=self.ndim, ks=self.ks, **kwargs)
              for i in range(blocks)])

    def _get_block(self, block_name: str = "resblock"):
        block_name = block_name.lower()
        if block_name == "resblock":
            return ResBlock
        elif block_name == "seblock":
            return SEBlock
        elif block_name == "seresnextblock":
            return SEResNeXtBlock
        elif block_name == "separableblock":
            return SeparableBlock
        else:
            raise ValueError(f"Block {block_name} not recognized")

    def forward(self, x):
        # Convert to GAF
        x = self.transform(x)

        # Apply stem layers
        for stem_layer in self.stem:
            x = stem_layer(x)

        # Apply maxpool
        x = self.maxpool(x)

        # Apply blocks
        for block in self.blocks:
            x = block(x)

        # Apply head
        x = self.adaptive_pool(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.fc(x)

        return x


def get_weighted_sampler(dataset: TSDataset, num_samples: int):
    global_labels = dataset.y.numpy().max(axis=1).astype(int)
    unique, counts = np.unique(global_labels, return_counts=True)
    class_weights = 1.0 / counts
    sample_weights = class_weights[global_labels]
    sample_weights = sample_weights[dataset.window_size - 1::dataset.stride]
    return WeightedRandomSampler(sample_weights, num_samples, replacement=True)


def get_class_weights(labels: np.ndarray):
    pos_counts = labels.sum(axis=0)
    neg_counts = len(labels) - pos_counts
    class_weights = neg_counts / pos_counts
    print("Class weights:", class_weights)
    device = "cuda" if torch.cuda.is_available() else "cpu"
    return torch.tensor(class_weights, dtype=torch.float32, device=device)

In [3]:
# num_samples_per_epoch = 2_097_152  # 2048 steps
epochs = 10
batch_size = 1024
accumulate_grad_batches = 2048 // batch_size  # virtual batch size of 2048
split = 264_960
file_dir = "/kaggle/working"

seed = 0
in_channels = 6
out_channels = 6
eval_steps = 1024  # val_check_interval

In [4]:
df_train = pd.read_csv("/kaggle/input/esa-mission-1-train-dataset/84_months.train.csv")

target_channels = [f"channel_{i}" for i in range(41, 47)]
label_cols = ["is_anomaly_" + col for col in target_channels]

train_array = df_train[target_channels].iloc[:-split].values.astype(np.float32)
val_array = df_train[target_channels].iloc[-split:].values.astype(np.float32)
train_labels = df_train[label_cols].iloc[:-split].values.astype(np.float32).clip(0., 1.)
val_labels = df_train[label_cols].iloc[-split:].values.astype(np.float32).clip(0., 1.)

class_weights = get_class_weights(train_labels)

train_array.shape, train_labels.shape, val_array.shape, val_labels.shape

Class weights: [75.98615178 76.00953507 75.47281678 75.46128577 76.02624612 75.9719617 ]


((7099201, 6), (7099201, 6), (264960, 6), (264960, 6))

In [5]:
# sample model configuration
window_size = 88
block = "resblock"
expansion = 4
layers = [1] * 2
p = 0.1
stem_szs_0 = 32
stem_szs_1 = 8
widen = 0.25
sa = True
act_cls = "leakyrelu"
ks = 7
stride = 3

# set seed
L.seed_everything(seed=seed, verbose=False)

# setup datasets
train_dataset = TSDataset(train_array, train_labels, window_size=window_size)
val_dataset = TSDataset(val_array, val_labels, window_size=window_size)

# train_sampler = get_weighted_sampler(train_dataset, num_samples=num_samples_per_epoch)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, drop_last=False, sampler=None)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, drop_last=False)

# initialize model
model = XResNet(
    block=block,
    expansion=expansion,
    layers=layers,
    p=p,
    c_in=in_channels,
    n_out=out_channels,
    stem_szs=(stem_szs_0, stem_szs_1, 64),
    widen=widen,
    sa=sa,
    act_cls=act_cls,
    ndim=2,
    ks=ks,
    stride=stride,
    # fixed BaseClassifier params
    criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights),
    val_criterion=nn.BCEWithLogitsLoss(pos_weight=class_weights),
    optimizer="adam",
    lr=1e-4,
    patience=40,
    min_delta=0.,
    checkpoint_dir=file_dir,
    use_one_cycle=True
).cuda()

# calculate MACs and model parameters
flops, macs, num_params = calculate_flops(
    model=model,
    input_shape=(1, in_channels, window_size),
    print_results=True,
    print_detailed=False,
    output_as_string=False,
    include_backPropagation=False,
)

# Setup trainer
# torch.set_float32_matmul_precision("high")

trainer = L.Trainer(
    accelerator="gpu",
    max_epochs=epochs,
    max_time="00:05:00:00",
    log_every_n_steps=16,
    accumulate_grad_batches=accumulate_grad_batches,
    logger=TensorBoardLogger(save_dir=file_dir, name="lightning_logs"),
    val_check_interval=eval_steps,
    enable_model_summary=True,
    enable_progress_bar=False,
)


trainer.fit(model, train_dataloaders=train_loader, val_dataloaders=val_loader)

INFO: You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
INFO: GPU available: True (cuda), used: True
INFO: TPU available: False, using: 0 TPU cores
INFO: HPU available: False, using: 0 HPUs
INFO: The following callbacks returned in `LightningModule.configure_callbacks` will override existing callbacks passed to Trainer: ModelCheckpoint



------------------------------------- Calculate Flops Results -------------------------------------
Notations:
number of parameters (Params), number of multiply-accumulate operations(MACs),
number of floating-point operations (FLOPs), floating-point operations per second (FLOPS),
fwd FLOPs (model forward propagation FLOPs), bwd FLOPs (model backward propagation FLOPs),
default model backpropagation takes 2.00 times as much computation as forward propagation.

Total Training Params:                                                  127.86 K
fwd MACs:                                                               45 MMACs
fwd FLOPs:                                                              90.48 MFLOPS
fwd+bwd MACs:                                                           135 MMACs
fwd+bwd FLOPs:                                                          271.45 MFLOPS
---------------------------------------------------------------------------------------------------


/usr/local/lib/python3.10/dist-packages/lightning/pytorch/callbacks/model_checkpoint.py:654: Checkpoint directory /kaggle/working exists and is not empty.
INFO: LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
INFO: Loading `train_dataloader` to estimate number of stepping batches.
/usr/local/lib/python3.10/dist-packages/lightning/pytorch/trainer/connectors/data_connector.py:425: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=3` in the `DataLoader` to improve performance.
INFO: 
  | Name          | Type              | Params | Mode
-----------------------------------------------------------
0 | criterion     | BCEWithLogitsLoss | 0      | eval
1 | val_criterion | BCEWithLogitsLoss | 0      | eval
2 | transform     | GAFTransform      | 0      | eval
3 | stem          | ModuleList        | 47.2 K | eval
4 | maxpool       | MaxPool2d         | 0      | eval
5 | blocks        | ModuleList  