In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import pytorch_lightning as pl
import wandb
from pytorch_lightning.loggers import WandbLogger

torch.set_float32_matmul_precision('medium')
if torch.cuda.is_available():
    print(f"GPU is available: {torch.cuda.get_device_name(0)}")
else:
    print("GPU is not available.")

GPU is available: NVIDIA GeForce RTX 3060 Laptop GPU


In [7]:
class myCNN(pl.LightningModule):
    def __init__(self, input_channels=1, input_size=28, num_classes=10, filter_counts=[32], filter_sizes=[5], 
                 padding_sizes=[1], pooling_sizes=[2], pooling_strides=[2], pooling_paddings=[0], 
                 conv_activation='relu', learning_rate=0.001):
        super().__init__()
        self.save_hyperparameters()
        
        assert len(filter_counts) == len(filter_sizes) == len(padding_sizes) == \
            len(pooling_sizes) == len(pooling_strides) == len(pooling_paddings), \
            "All layer parameter lists must have the same length"
                
        self.conv_activation = {
            'relu': F.relu,
            'sigmoid': torch.sigmoid,
            'tanh': torch.tanh
        }.get(conv_activation.lower(), F.relu)
        
        self.convs = nn.ModuleList()
        self.pools = nn.ModuleList()
        
        current_channels = input_channels
        current_size = input_size
        
                
        for i, (filter_count, filter_size, padding_size, pooling_size, pooling_stride, pooling_padding) in \
            enumerate(zip(filter_counts, filter_sizes, padding_sizes, pooling_sizes, pooling_strides, pooling_paddings)):
            
            conv = nn.Conv2d(in_channels=input_channels,
                out_channels=filter_count,
                kernel_size=filter_size,
                stride=1,
                padding=padding_size
            )
            self.convs.append(conv)
            
            current_size = ( current_size + 2 * padding_size - filter_size ) // 1 + 1
            
            pool = nn.MaxPool2d(kernel_size=pooling_size, 
                stride=pooling_stride, 
                padding=pooling_padding
            )
            self.pools.append(pool)
            
            current_size = ( current_size + 2 * pooling_padding - pooling_size) // pooling_stride + 1
            
            current_channels = filter_count
            
            
        self.flat_size =  current_size * current_size * current_channels
        self.fc = nn.Linear(self.flat_size, num_classes)
        
        self.learning_rate = learning_rate
    
    def forward(self, x):
        
        for conv, pool in zip(self.convs, self.pools):
            x = self.conv_activation(conv(x))
            x = pool(x)
            
        x = x.view(-1, self.flat_size)
        x = self.fc(x)
        
        return x
        
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        self.log('train_loss', loss, prog_bar=True)
        self.log('train_acc', acc, prog_bar=True)
        
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        # Log metrics
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        # Log metrics
        self.log('test_loss', loss, prog_bar=True)
        self.log('test_acc', acc, prog_bar=True)
        
        return loss
    
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)
    

In [3]:
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,))
])

# Download and load the MNIST dataset
train_dataset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dataset = datasets.MNIST('data', train=False, transform=transform)

# Create data loaders
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=8, pin_memory=True)
val_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=8, pin_memory=True)

In [4]:
model = myCNN().to('cuda')

wandb_logger = WandbLogger(project="cnn-pytorch-lightning", log_model=True)

trainer = pl.Trainer(
        max_epochs=1,
        accelerator="gpu",  # Uses GPU if available
        logger=wandb_logger,
        log_every_n_steps=500,
        precision=16
    )

c:\Users\DELL\.conda\envs\DL\lib\site-packages\lightning_fabric\connector.py:571: `precision=16` is supported for historical reasons but its usage is discouraged. Please set your precision to 16-mixed instead!
Using 16bit Automatic Mixed Precision (AMP)
You are using the plain ModelCheckpoint callback. Consider using LitModelCheckpoint which with seamless uploading to Model registry.
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs


In [5]:
trainer.fit(model, train_loader, val_loader)

wandb: Using wandb-core as the SDK backend.  Please refer to https://wandb.me/wandb-core for more information.
wandb: Currently logged in as: bullseye2608 (bullseye2608-indian-institute-of-technology-madras) to https://api.wandb.ai. Use `wandb login --relogin` to force relogin


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name  | Type       | Params | Mode 
---------------------------------------------
0 | convs | ModuleList | 832    | train
1 | pools | ModuleList | 0      | train
2 | fc    | Linear     | 54.1 K | train
---------------------------------------------
54.9 K    Trainable params
0         Non-trainable params
54.9 K    Total params
0.220     Total estimated model params size (MB)
5         Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

c:\Users\DELL\.conda\envs\DL\lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'val_dataloader' to speed up the dataloader worker initialization.
c:\Users\DELL\.conda\envs\DL\lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'train_dataloader' to speed up the dataloader worker initialization.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=1` reached.


In [6]:
trainer.test(dataloaders=val_loader)
wandb.finish()

Restoring states from the checkpoint path at .\cnn-pytorch-lightning\bhwun64m\checkpoints\epoch=0-step=938.ckpt
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
Loaded model weights from the checkpoint at .\cnn-pytorch-lightning\bhwun64m\checkpoints\epoch=0-step=938.ckpt
c:\Users\DELL\.conda\envs\DL\lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:420: Consider setting `persistent_workers=True` in 'test_dataloader' to speed up the dataloader worker initialization.


Testing: |          | 0/? [00:00<?, ?it/s]

0,1
epoch,▁▁█
test_acc,▁
test_loss,▁
train_acc,▁
train_loss,▁
trainer/global_step,▁██
val_acc,▁
val_loss,▁

0,1
epoch,1.0
test_acc,0.9802
test_loss,0.06546
train_acc,0.98438
train_loss,0.05502
trainer/global_step,938.0
val_acc,0.9802
val_loss,0.06546


## Main


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
import torchvision.transforms as transforms
from torchvision.datasets import ImageFolder
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
import os
import argparse
from typing import List, Tuple, Callable, Union, Type

class FlexibleCNN(pl.LightningModule):
    def __init__(
        self,
        input_channels: int = 3,
        num_classes: int = 10,
        conv_filters: List[int] = [32, 64, 128, 256, 512],
        kernel_sizes: Union[int, List[int]] = 3,
        conv_activation: Union[str, Type[nn.Module]] = "relu",
        dense_neurons: int = 512,
        dense_activation: Union[str, Type[nn.Module]] = "relu",
        pooling_size: Union[int, List[int]] = 2,
        learning_rate: float = 0.001
    ):
        """
        Flexible CNN model with 5 conv-activation-maxpool blocks
        
        Args:
            input_channels: Number of input image channels (3 for RGB)
            num_classes: Number of output classes
            conv_filters: List of filter counts for each conv layer
            kernel_sizes: Kernel size for conv layers (int or list)
            conv_activation: Activation function for conv layers
            dense_neurons: Number of neurons in the dense layer
            dense_activation: Activation function for dense layer
            pooling_size: Max pooling size (int or list)
            learning_rate: Learning rate for optimizer
        """
        super().__init__()
        self.save_hyperparameters()
        
        # Convert activation strings to functions
        self.conv_activation = self._get_activation(conv_activation)
        self.dense_activation = self._get_activation(dense_activation)
        
        # Convert single value to lists if needed
        if isinstance(kernel_sizes, int):
            kernel_sizes = [kernel_sizes] * 5
        if isinstance(pooling_size, int):
            pooling_size = [pooling_size] * 5
            
        # Create 5 convolution blocks
        self.conv_blocks = nn.ModuleList()
        in_channels = input_channels
        
        for i in range(5):
            conv_block = nn.Sequential(
                nn.Conv2d(in_channels, conv_filters[i], kernel_size=kernel_sizes[i], padding=kernel_sizes[i]//2),
                self._get_activation_layer(self.conv_activation),
                nn.MaxPool2d(kernel_size=pooling_size[i], stride=pooling_size[i])
            )
            self.conv_blocks.append(conv_block)
            in_channels = conv_filters[i]
        
        # Calculate the size of the flattened features
        # Assuming input image size is 224x224 (common for iNaturalist)
        # Each pooling with size 2 reduces dimensions by half
        final_size = 224 // (2 ** 5)  # After 5 pooling layers
        self.flat_size = final_size * final_size * conv_filters[-1]
        
        # Dense layer
        self.fc1 = nn.Linear(self.flat_size, dense_neurons)
        
        # Output layer
        self.fc2 = nn.Linear(dense_neurons, num_classes)
        
        self.learning_rate = learning_rate
    
    def _get_activation(self, activation):
        """Convert activation name to function or return the provided activation"""
        if isinstance(activation, str):
            activation = activation.lower()
            if activation == 'relu':
                return F.relu
            elif activation == 'leaky_relu':
                return F.leaky_relu
            elif activation == 'elu':
                return F.elu
            elif activation == 'tanh':
                return F.tanh
            elif activation == 'sigmoid':
                return F.sigmoid
            else:
                raise ValueError(f"Unsupported activation: {activation}")
        return activation
    
    def _get_activation_layer(self, activation_fn):
        """Convert activation function to layer"""
        if activation_fn == F.relu:
            return nn.ReLU()
        elif activation_fn == F.leaky_relu:
            return nn.LeakyReLU()
        elif activation_fn == F.elu:
            return nn.ELU()
        elif activation_fn == F.tanh:
            return nn.Tanh()
        elif activation_fn == F.sigmoid:
            return nn.Sigmoid()
        else:
            # For custom activations, use a Lambda layer
            return nn.Identity()  # Placeholder, will use functional activation in forward
    
    def forward(self, x):
        # Apply all conv blocks
        for block in self.conv_blocks:
            x = block(x)
        
        # Flatten
        x = x.view(x.size(0), -1)
        
        # Apply dense layer with activation
        x = self.dense_activation(self.fc1(x))
        
        # Output layer
        x = self.fc2(x)
        
        return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        # Log metrics
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        # Log metrics
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = (preds == y).float().mean()
        
        # Log metrics
        self.log('test_loss', loss)
        self.log('test_acc', acc)
        
        return loss
    
    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.learning_rate)


class iNaturalistDataModule(pl.LightningDataModule):
    def __init__(
        self, 
        data_dir: str,
        batch_size: int = 64,
        num_workers: int = 8,
        image_size: int = 224
    ):
        super().__init__()
        self.train_data_dir = os.path.join(data_dir, 'train')
        self.test_data_dir = os.path.join(data_dir, 'val')
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.image_size = image_size
        
    def setup(self, stage=None):
        # Data transformations
        train_transforms = transforms.Compose([
            transforms.Resize((self.image_size, self.image_size)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        
        val_transforms = transforms.Compose([
            transforms.Resize((self.image_size, self.image_size)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
        ])
        
        # Load dataset
        dataset = ImageFolder(root=self.train_data_dir, transform=train_transforms)
        
        # Split dataset
        dataset_size = len(dataset)
        train_size = int(0.8 * dataset_size)
        val_size = dataset_size - train_size
        
        self.train_dataset, self.val_dataset = random_split(
            dataset, [train_size, val_size]
        )
        
        # Apply different transforms to validation set
        self.val_dataset.dataset = ImageFolder(root=self.train_data_dir, transform=val_transforms)
        
        self.test_dataset = ImageFolder(root=self.test_data_dir, transform=val_transforms)
        
    def train_dataloader(self):
        return DataLoader(
            self.train_dataset,
            batch_size=self.batch_size,
            shuffle=True,
            num_workers=self.num_workers,
            pin_memory=True
        )
    
    def val_dataloader(self):
        return DataLoader(
            self.val_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
            pin_memory=True
        )
    
    def test_dataloader(self):
        return DataLoader(
            self.test_dataset,
            batch_size=self.batch_size,
            shuffle=False,
            num_workers=self.num_workers,
            pin_memory=True
        )


In [None]:
data_directory = r'C:\Users\DELL\Desktop\Coding\Python\DL\Assignment 2\da6401_assignment2\data\inaturalist_12K'
data_module = iNaturalistDataModule(
        data_dir=data_directory
        batch_size=64
    )
data_module.setup()

Dataset ImageFolder
    Number of datapoints: 2000
    Root location: C:\Users\DELL\Desktop\Coding\Python\DL\Assignment 2\da6401_assignment2\data\inaturalist_12K\val
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=bilinear, max_size=None, antialias=True)
               ToTensor()
               Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
           )


In [58]:
print(len(data_module.test_dataloader()) * 63, len(data_module.test_dataloader()) * 64)

2016 2048


9984