In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD, AdamW
import lightning as pl
from torch.utils.data import DataLoader,random_split
from torchvision import datasets, transforms
from lightning.pytorch.callbacks import ModelCheckpoint
from torch.optim.lr_scheduler import ReduceLROnPlateau, StepLR, CosineAnnealingLR
from lightning.pytorch.loggers.tensorboard import TensorBoardLogger
from lightning.pytorch.callbacks.early_stopping import EarlyStopping
from torchmetrics import Accuracy
import torchvision.models as models
from lightning.pytorch import Callback

# Optimizer and scheduler factory class
### *This class bundles several optimizers and learning-rate schedulers so that they can easily be combined and used in the model.*

In [2]:
class OptimizerSchedulerFactory:
    """Create an optimizer and an optional learning-rate scheduler by name.

    Supported optimizer names: ``"adam"``, ``"sgd"`` (momentum 0.9), ``"adamw"``.
    Supported scheduler names: ``None``, ``"reduce_on_plateau"``, ``"step_lr"``,
    ``"cosine_annealing"``. Names are matched case-insensitively.

    Args:
        model_parameters: Iterable of parameters (or parameter-group dicts) to
            optimize, e.g. ``model.parameters()``.
        optimizer_name: Name of the optimizer to build.
        lr: Learning rate passed to the optimizer.
        scheduler_name: Name of the scheduler to build, or ``None`` for no scheduler.
    """

    def __init__(self, model_parameters, optimizer_name="adam", lr=1e-4, scheduler_name=None):
        # ``model.parameters()`` is a one-shot generator. Materialize it once so that
        # get_optimizer() can be called more than once without the second optimizer
        # failing with "optimizer got an empty parameter list".
        # A bare tensor is passed through untouched so the optimizer still rejects it.
        if isinstance(model_parameters, torch.Tensor):
            self.model_parameters = model_parameters
        else:
            self.model_parameters = list(model_parameters)
        self.optimizer_name = optimizer_name.lower()
        self.lr = lr
        self.scheduler_name = scheduler_name.lower() if scheduler_name else None

    def get_optimizer(self):
        """Return a new optimizer over the stored parameters.

        Raises:
            ValueError: If ``optimizer_name`` is not one of the supported names.
        """
        if self.optimizer_name == "adam":
            optimizer = Adam(self.model_parameters, lr=self.lr)
        elif self.optimizer_name == "sgd":
            optimizer = SGD(self.model_parameters, lr=self.lr, momentum=0.9)
        elif self.optimizer_name == "adamw":
            optimizer = AdamW(self.model_parameters, lr=self.lr)
        else:
            raise ValueError(f"Unsupported optimizer: {self.optimizer_name}")
        return optimizer

    def get_scheduler(self, optimizer):
        """Return a scheduler bound to ``optimizer``, or ``None`` if none was requested.

        Raises:
            ValueError: If ``scheduler_name`` is not one of the supported names.
        """
        if self.scheduler_name is None:
            return None
        elif self.scheduler_name == "reduce_on_plateau":
            scheduler = ReduceLROnPlateau(optimizer, mode="min", factor=0.1, patience=5)
        elif self.scheduler_name == "step_lr":
            scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
        elif self.scheduler_name == "cosine_annealing":
            scheduler = CosineAnnealingLR(optimizer, T_max=50)
        else:
            raise ValueError(f"Unsupported scheduler: {self.scheduler_name}")
        return scheduler

    def get_optimizer_scheduler(self):
        """Return ``(optimizer, scheduler)``; ``scheduler`` is ``None`` when not configured."""
        optimizer = self.get_optimizer()
        scheduler = self.get_scheduler(optimizer)
        return optimizer, scheduler


# Model
### *Here I implement the model based on its structure in the article.*

In [7]:
class SEBlock(nn.Module):
    """Channel-wise gating: rescale each channel of a 4-D input by a learned weight.

    The input is globally average-pooled to one value per channel, passed through
    a two-layer bottleneck (Linear -> Mish -> Linear -> Sigmoid), and the resulting
    per-channel weights in (0, 1) multiply the original input.

    Args:
        channels: Number of channels of the input tensor.
        reduction_ratio: Divisor for the bottleneck width (``channels // reduction_ratio``).
    """

    def __init__(self, channels, reduction_ratio):
        super().__init__()
        bottleneck = channels // reduction_ratio
        self.global_pooling = nn.AdaptiveAvgPool2d(1)
        self.fc1 = nn.Linear(channels, bottleneck)
        self.activation = nn.Mish()
        self.fc2 = nn.Linear(bottleneck, channels)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return ``x`` scaled per channel; the output has the same shape as ``x``."""
        n, c, _, _ = x.size()
        # Squeeze: one scalar per (sample, channel).
        squeezed = self.global_pooling(x).view(n, c)
        # Excite: bottleneck MLP followed by a sigmoid gate.
        gate = self.sigmoid(self.fc2(self.activation(self.fc1(squeezed))))
        # Broadcast the gate over the two spatial axes.
        return x * gate.view(n, c, 1, 1)


# class SeparableConvBlock(nn.Module):
#     def __init__(self, in_channels, out_channels, kernel_size, activation,padding):
#         super(SeparableConvBlock, self).__init__()
#         self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding=padding)
#         self.activation = activation

#     def forward(self, x):
#         x = self.conv(x)
#         return self.activation(x)
class SeparableConvBlock(nn.Module):
    """Depthwise convolution followed by a 1x1 pointwise convolution.

    The supplied activation is applied after the depthwise stage and again
    after the pointwise stage.

    Args:
        in_channels: Channels of the input tensor.
        out_channels: Channels produced by the pointwise convolution.
        kernel_size: Kernel size of the depthwise convolution.
        activation: Callable applied after each convolution.
        padding: Padding argument forwarded to the depthwise convolution.
    """

    def __init__(self, in_channels, out_channels, kernel_size, activation, padding):
        super().__init__()
        # groups == in_channels gives one spatial filter per input channel.
        self.depthwise = nn.Conv2d(
            in_channels,
            in_channels,
            kernel_size=kernel_size,
            padding=padding,
            groups=in_channels,
        )
        # The 1x1 convolution mixes channels and sets the output width.
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.activation = activation

    def forward(self, x):
        """Apply depthwise conv -> activation -> pointwise conv -> activation."""
        spatial = self.activation(self.depthwise(x))
        return self.activation(self.pointwise(spatial))



class MultiScaleModel(pl.LightningModule):
    """Four parallel two-stage separable-conv branches fused into one feature map.

    Each branch applies two ``SeparableConvBlock`` layers with 'same' padding and
    ends with ``f`` channels. The four outputs are concatenated on the channel
    axis (``4 * f`` channels), gated by an ``SEBlock``, max-pooled with kernel
    ``(p1, p2)`` and projected back to ``f`` channels by a 1x1 separable block.

    Args:
        f: Base number of filters; also the number of output channels.
        k: Kernel extent along the first spatial axis.
        p1: Max-pool kernel size along the first spatial axis.
        p2: Max-pool kernel size along the second spatial axis.
        input_channels: Number of channels of the input tensor.
    """

    def __init__(self, f, k, p1, p2, input_channels):
        super().__init__()

        # One activation module shared by every separable block.
        self.mish = nn.Mish()

        widened = int(1.5 * f)
        narrow_kernel, wide_kernel = (k, 1), (k, 2)

        def make_block(c_in, c_out, kernel):
            return SeparableConvBlock(c_in, c_out, kernel, self.mish, padding='same')

        # Branch 0: (k, 2) then (k, 1), widening to 1.5 * f in between.
        self.block_01 = make_block(input_channels, widened, wide_kernel)
        self.block_02 = make_block(widened, f, narrow_kernel)

        # Branch 1: (k, 1) then (k, 2), widening to 1.5 * f in between.
        self.block_11 = make_block(input_channels, widened, narrow_kernel)
        self.block_12 = make_block(widened, f, wide_kernel)

        # Branch 2: two (k, 2) blocks at width f.
        self.block_21 = make_block(input_channels, f, wide_kernel)
        self.block_22 = make_block(f, f, wide_kernel)

        # Branch 3: two (k, 1) blocks at width f.
        self.block_31 = make_block(input_channels, f, narrow_kernel)
        self.block_32 = make_block(f, f, narrow_kernel)

        self.se_block = SEBlock(4 * f, reduction_ratio=16)
        self.max_pool = nn.MaxPool2d(kernel_size=(p1, p2))

        self.final_conv = make_block(4 * f, f, (1, 1))

    def forward(self, x):
        """Run the four branches on ``x`` and return the fused, pooled, projected map."""
        branches = (
            (self.block_01, self.block_02),
            (self.block_11, self.block_12),
            (self.block_21, self.block_22),
            (self.block_31, self.block_32),
        )
        branch_outputs = [second(first(x)) for first, second in branches]

        # Stack branch outputs along the channel axis.
        fused = torch.cat(branch_outputs, dim=1)

        return self.final_conv(self.max_pool(self.se_block(fused)))


class FinalAMCModel(pl.LightningModule):
    """Classifier built from two multi-scale stages and a small dense head.

    Pipeline: add a channel axis -> ``MultiScaleModel`` (64 filters) ->
    ``MultiScaleModel`` (32 filters) -> max-pool (8, 2) -> 1x1 separable conv to
    16 channels -> flatten -> Linear/Dropout/Mish -> Linear to ``num_classes``.
    ``forward`` returns raw logits; ``predict`` returns softmax probabilities.

    Args:
        input_shape: ``(height, width)`` of one sample, without batch or channel
            axes. Used to size the first dense layer.
        num_classes: Number of output classes.
        optimizer_name: Optimizer name understood by ``OptimizerSchedulerFactory``.
        scheduler_name: Scheduler name understood by ``OptimizerSchedulerFactory``,
            or ``None`` to train without a scheduler.
        lr: Learning rate for the optimizer.

    Raises:
        ValueError: If ``input_shape`` does not have two entries or is too small
            to survive the three pooling stages.
    """

    def __init__(self, input_shape=(1024, 2), num_classes=24, optimizer_name="adam",
                 scheduler_name="reduce_on_plateau", lr=1e-4):
        super(FinalAMCModel, self).__init__()

        if len(input_shape) != 2:
            raise ValueError(f"input_shape must be (height, width), got {input_shape}")

        # Kept so configure_optimizers() can honour the constructor arguments.
        self.input_shape = tuple(input_shape)
        self.optimizer_name = optimizer_name
        self.scheduler_name = scheduler_name
        self.lr = lr

        self.multiscale_1 = MultiScaleModel(f=64, k=9, p1=4, p2=1, input_channels=1)

        self.multiscale_2 = MultiScaleModel(f=32, k=5, p1=4, p2=1, input_channels=64)

        self.max_pooling = nn.MaxPool2d(kernel_size=(8, 2))

        self.separable_conv = SeparableConvBlock(in_channels=32, out_channels=16, kernel_size=(1, 1), activation=nn.Mish(), padding='same')

        self.flatten = nn.Flatten()

        # All convolutions use 'same' padding, so only the three max-pool layers
        # shrink the spatial size; each one floor-divides by its kernel size.
        pooled_h, pooled_w = self.input_shape
        for pool_h, pool_w in ((4, 1), (4, 1), (8, 2)):
            pooled_h, pooled_w = pooled_h // pool_h, pooled_w // pool_w
        flat_features = 16 * pooled_h * pooled_w  # 16 = separable_conv out_channels
        if flat_features <= 0:
            raise ValueError(
                f"input_shape {self.input_shape} is too small for the pooling stages"
            )

        # For the default (1024, 2) input this is 16 * 8 * 1 = 128 features.
        self.dense_1 = nn.Sequential(
            nn.Linear(flat_features, 128),
            nn.Dropout(p=0.25),
            nn.Mish()
        )

        self.dense_2 = nn.Linear(128, num_classes)

        self.softmax = nn.Softmax(dim=1)

    def forward(self, x):
        """Return class logits for a batch ``x``; a channel axis is inserted at dim 1."""
        x = x.unsqueeze(1)

        x = self.multiscale_1(x)
        x = self.multiscale_2(x)
        x = self.max_pooling(x)
        x = self.separable_conv(x)

        x = self.flatten(x)
        x = self.dense_1(x)
        x = self.dense_2(x)

        return x

    def predict(self, x):
        """Return softmax class probabilities for ``x``.

        This does not switch the module to eval mode or disable gradients;
        callers should do so themselves if required.
        """
        logits = self.forward(x)
        probabilities = self.softmax(logits)
        return probabilities

    def training_step(self, batch, batch_idx):
        """Compute and log the cross-entropy loss for one training batch."""
        inputs, targets = batch
        outputs = self(inputs)
        loss = F.cross_entropy(outputs, targets)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def _shared_eval_step(self, batch, stage):
        """Compute and log loss and accuracy under ``<stage>_loss`` / ``<stage>_acc``."""
        inputs, targets = batch
        outputs = self(inputs)
        loss = F.cross_entropy(outputs, targets)
        acc = (outputs.argmax(dim=1) == targets).float().mean()
        self.log(f'{stage}_loss', loss, on_epoch=True, prog_bar=True)
        self.log(f'{stage}_acc', acc, on_epoch=True, prog_bar=True)
        return {f"{stage}_loss": loss, f"{stage}_acc": acc}

    def validation_step(self, batch, batch_idx):
        """Log and return ``val_loss`` and ``val_acc`` for one validation batch."""
        return self._shared_eval_step(batch, "val")

    def test_step(self, batch, batch_idx):
        """Log and return ``test_loss`` and ``test_acc`` for one test batch."""
        return self._shared_eval_step(batch, "test")

    def configure_optimizers(self):
        """Build the optimizer and scheduler selected in the constructor.

        Returns the bare optimizer when no scheduler is configured, otherwise a
        Lightning optimizer/scheduler dict. ReduceLROnPlateau additionally gets
        ``monitor="val_loss"`` because it steps on a metric.
        """
        factory = OptimizerSchedulerFactory(
            self.parameters(),
            optimizer_name=self.optimizer_name,
            lr=self.lr,
            scheduler_name=self.scheduler_name,
        )
        optimizer, scheduler = factory.get_optimizer_scheduler()
        if scheduler is None:
            return optimizer

        scheduler_config = {"scheduler": scheduler}
        if isinstance(scheduler, ReduceLROnPlateau):
            scheduler_config["monitor"] = "val_loss"
        return {
            "optimizer": optimizer,
            "lr_scheduler": scheduler_config,
        }


# Testing the model and debugging
### *To ensure that the model works properly, I generate synthetic data with shape (1024, 2) and then test the model on it.*

In [9]:
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import lightning as pl
import numpy as np

# Synthetic Dataset
class SyntheticDataset(Dataset):
    """In-memory dataset of standard-normal samples with random integer labels.

    Args:
        num_samples: Number of (sample, label) pairs to generate.
        input_shape: Shape of one sample; its first two entries are used.
        num_classes: Labels are drawn from ``[0, num_classes)``.
    """

    def __init__(self, num_samples=1000, input_shape=(1024, 2), num_classes=24):
        self.num_samples, self.input_shape, self.num_classes = (
            num_samples,
            input_shape,
            num_classes,
        )
        # Everything is generated up front: data first, then labels.
        sample_dims = (num_samples, input_shape[0], input_shape[1])
        self.data = torch.randn(sample_dims, dtype=torch.float32)
        self.labels = torch.randint(0, num_classes, (num_samples,), dtype=torch.long)

    def __len__(self):
        """Return the number of generated samples."""
        return self.num_samples

    def __getitem__(self, idx):
        """Return the ``(sample, label)`` pair stored at ``idx``."""
        sample, label = self.data[idx], self.labels[idx]
        return sample, label


# DataModule for PyTorch Lightning
class SyntheticDataModule(pl.LightningDataModule):
    """Lightning data module serving an 80/20 train/validation split of synthetic data.

    Args:
        batch_size: Batch size used by both dataloaders.
        num_samples: Total number of synthetic samples to generate.
        num_classes: Number of label classes in the synthetic data.
    """

    def __init__(self, batch_size=32, num_samples=1000, num_classes=24):
        super().__init__()
        self.batch_size, self.num_samples, self.num_classes = (
            batch_size,
            num_samples,
            num_classes,
        )

    def setup(self, stage=None):
        """Generate a fresh ``SyntheticDataset`` and randomly split it 80/20."""
        full_dataset = SyntheticDataset(
            num_samples=self.num_samples, num_classes=self.num_classes
        )
        total = len(full_dataset)
        n_train = int(0.8 * total)
        # The validation split takes the remainder so the sizes always sum to total.
        split_sizes = [n_train, total - n_train]
        self.train_dataset, self.val_dataset = random_split(full_dataset, split_sizes)

    def train_dataloader(self):
        """Return a shuffled loader over the training split."""
        loader = DataLoader(self.train_dataset, batch_size=self.batch_size, shuffle=True)
        return loader

    def val_dataloader(self):
        """Return an unshuffled loader over the validation split."""
        loader = DataLoader(self.val_dataset, batch_size=self.batch_size, shuffle=False)
        return loader


# Model Training Setup
if __name__ == "__main__":
    # Smoke test: train the model briefly on random data to check that the
    # forward/backward pass, logging and callbacks all run end to end.

    # Parameters for the synthetic run
    batch_size = 32
    num_samples = 1000
    num_classes = 24

    # Instantiate the model for (1024, 2) inputs
    model = FinalAMCModel(input_shape=(1024, 2), num_classes=num_classes,optimizer_name="adam", scheduler_name="reduce_on_plateau")

    # Create the DataModule that generates and splits the synthetic data
    data_module = SyntheticDataModule(batch_size=batch_size, num_samples=num_samples, num_classes=num_classes)

    # Training config: keep only the checkpoint with the lowest val_loss,
    # written to ./checkpoints/best_model
    checkpoint_callback = ModelCheckpoint(
        monitor="val_loss", mode="min", save_top_k=1, dirpath="./checkpoints", filename="best_model"
    )
    # Stop early once val_loss has not improved for 5 validation checks
    early_stopping = EarlyStopping(monitor="val_loss", patience=5, mode="min")

    trainer = pl.Trainer(
        max_epochs=10,
        callbacks=[checkpoint_callback, early_stopping],
        log_every_n_steps=10
    )

    # Train the model (also runs validation via the data module's val_dataloader)
    trainer.fit(model, data_module)


GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name           | Type               | Params | Mode 
--------------------------------------------------------------
0 | multiscale_1   | MultiScaleModel    | 51.5 K | train
1 | multiscale_2   | MultiScaleModel    | 25.8 K | train
2 | max_pooling    | MaxPool2d          | 0      | train
3 | separable_conv | SeparableConvBlock | 592    | train
4 | flatten        | Flatten            | 0      | train
5 | dense_1        | Sequential         | 16.5 K | train
6 | dense_2        | Linear             | 3.1 K  | train
7 | softmax        | Softmax            | 0      | train
--------------------------------------------------------------
97.5 K    Trainable params
0         Non-trainable params
97.5 K    Total params
0.390     Total estimated model params size (MB)
84        Modules in train mode
0         Modules in eval mode


Epoch 0:   4%|▍         | 1/25 [00:02<00:48,  0.50it/s, v_num=10, train_loss_step=3.190]


Detected KeyboardInterrupt, attempting graceful shutdown ...


NameError: name 'exit' is not defined