In [37]:
import pytorch_lightning as pl
# your favorite machine learning tracking tool
from pytorch_lightning.loggers import WandbLogger
import scipy.io
import skimage.io
from skimage.color import gray2rgb
import os
import math
import numpy as np
import random

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import random_split, DataLoader, TensorDataset, Dataset

from torchmetrics.classification import MulticlassF1Score

from torchvision import transforms
import wandb

In [38]:
def set_seed(seed: int = 42) -> None:
    np.random.seed(seed)
    random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    # When running on the CuDNN backend, two further options must be set
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    # Set a fixed value for the hash seed
    os.environ["PYTHONHASHSEED"] = str(seed)
    print(f"Random seed set as {seed}")

set_seed(42)

Random seed set as 42


In [39]:
class CarsDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, mat_file, image_dir, transform=None):
        """
        Args:
            csv_file (string): Path to the csv file with annotations.
            root_dir (string): Directory with all the images.
            transform (callable, optional): Optional transform to be applied
                on a sample.
        """
        labels = scipy.io.loadmat(mat_file)
        self.labels = labels['annotations'][0]
        self.image_dir = image_dir
        self.transform = transform

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img_name = os.path.join(self.image_dir,
                                self.labels[idx][5][0])
        image = skimage.io.imread(img_name)
        if len(image.shape) < 3:
            image = gray2rgb(image)

        label = self.labels[idx][4][0][0]
        
        sample = [image.copy(), label.copy()]

        if self.transform:
            sample[0] = self.transform(sample[0])
            sample[1] = torch.tensor(sample[1])

        return sample

In [40]:
class CarsDataModule(pl.LightningDataModule):
    def __init__(self, batch_size, train_mat, test_mat, train_data_dir: str = './', test_data_dir: str = './'):
        super().__init__()
        self.train_mat = train_mat
        self.test_mat = test_mat
        self.train_data_dir = train_data_dir
        self.test_data_dir = test_data_dir
        self.batch_size = batch_size
        
        self.num_classes = 197

        self.transform_train = transforms.Compose([
            transforms.ToTensor(),
            transforms.ColorJitter(brightness=.5, hue=.3),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(degrees=(-10, 10)),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
            transforms.Resize((250,250)),
            transforms.RandomCrop((224, 224))
        ])
        
        self.transform_test = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225)),
            transforms.Resize((224,224))
        ])
    
    def prepare_data(self):
        pass
    
    def setup(self, stage=None):
        # Assign train/val datasets for use in dataloaders
        if stage == 'fit' or stage is None:
            self.cars_train = CarsDataset(self.train_mat, self.train_data_dir, transform=self.transform_train)

        # Assign test dataset for use in dataloader(s)
        if stage == 'test' or stage is None:
            cars_full_test = CarsDataset(self.test_mat, self.test_data_dir, transform=self.transform_test)
            self.cars_test, self.cars_val = random_split(cars_full_test, [7041, 1000])
    
    def train_dataloader(self):
        return DataLoader(self.cars_train, batch_size=self.batch_size, shuffle=True)

    def val_dataloader(self):
        return DataLoader(self.cars_val, batch_size=self.batch_size)

    def test_dataloader(self):
        return DataLoader(self.cars_test, batch_size=self.batch_size)

In [41]:
class ImagePredictionLogger(pl.callbacks.Callback):
    def __init__(self, val_samples, num_samples=32):
        super().__init__()
        self.num_samples = num_samples
        self.val_imgs, self.val_labels = val_samples
    
    def on_validation_epoch_end(self, trainer, pl_module):
        # Bring the tensors to CPU
        val_imgs = self.val_imgs.to(device=pl_module.device)
        val_labels = self.val_labels.to(device=pl_module.device)
        # Get model prediction
        logits = pl_module(val_imgs)
        preds = torch.argmax(logits, -1)
        # Log the images as wandb Image
        trainer.logger.experiment.log({
            "examples":[wandb.Image(x, caption=f"Pred:{pred}, Label:{y}") 
                           for x, pred, y in zip(val_imgs[:self.num_samples], 
                                                 preds[:self.num_samples], 
                                                 val_labels[:self.num_samples])]
            })

1. SEBlock
2. DropSample
3. BatchNorm
4. Correct kernel sizes
5. Resudual connections

In [42]:
class ConvBnAct(nn.Module):
    """Layer grouping a convolution, batchnorm, and activation function"""
    def __init__(self, n_in, n_out, kernel_size=3, 
                 stride=1, padding=0, groups=1, bias=False,
                 bn=True, act=True):
        super().__init__()

        self.conv = nn.Conv2d(n_in, n_out, kernel_size=kernel_size,
                              stride=stride, padding=padding,
                              groups=groups, bias=bias)
        self.bn = nn.BatchNorm2d(n_out) if bn else nn.Identity()
        self.act = nn.SiLU() if act else nn.Identity()
  
    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.act(x)
        return x

class SEBlock(nn.Module):
    """Squeeze-and-excitation block"""
    def __init__(self, n_in, r=24):
        super().__init__()

        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(nn.Conv2d(n_in, n_in//r, kernel_size=1),
                                        nn.SiLU(),
                                        nn.Conv2d(n_in//r, n_in, kernel_size=1),
                                        nn.Sigmoid())

    def forward(self, x):
        y = self.squeeze(x)
        y = self.excitation(y)
        return x * y

class DropSample(nn.Module):
    """Drops each sample in x with probability p during training"""
    def __init__(self, p=0):
        super().__init__()

        self.p = p

    def forward(self, x):
        if (not self.p) or (not self.training):
            return x

        batch_size = len(x)
        random_tensor = torch.FloatTensor(batch_size, 1, 1, 1).uniform_()
        
        if x.device.type == 'cuda':
            random_tensor = random_tensor.cuda()
        bit_mask = self.p<random_tensor

        x = x.div(1-self.p)
        x = x * bit_mask
        return x

class MBConvN(nn.Module):
    """MBConv with an expansion factor of N, plus squeeze-and-excitation"""
    def __init__(self, n_in, n_out, expansion_factor,
                kernel_size=3, stride=1, r=24, p=0):
        super().__init__()

        padding = (kernel_size-1)//2
        expanded = expansion_factor*n_in
        self.skip_connection = (n_in == n_out) and (stride == 1)

        self.expand_pw = nn.Identity() if (expansion_factor == 1) else ConvBnAct(n_in, expanded, kernel_size=1)
        self.depthwise = ConvBnAct(expanded, expanded, kernel_size=kernel_size, 
                                    stride=stride, padding=padding, groups=expanded)
        self.se = SEBlock(expanded, r=r)
        self.reduce_pw = ConvBnAct(expanded, n_out, kernel_size=1,
                                    act=False)
        self.dropsample = DropSample(p)
  
    def forward(self, x):
        residual = x

        x = self.expand_pw(x)
        x = self.depthwise(x)
        x = self.se(x)
        x = self.reduce_pw(x)

        if self.skip_connection:
            x = self.dropsample(x)
            x = x + residual

        return x

class MBConv1(MBConvN):
    def __init__(self, n_in, n_out, kernel_size=3,
                stride=1, r=24, p=0):
        super().__init__(n_in, n_out, expansion_factor=1,
                            kernel_size=kernel_size, stride=stride,
                            r=r, p=p)
    
 
class MBConv6(MBConvN):
    def __init__(self, n_in, n_out, kernel_size=3,
                stride=1, r=24, p=0):
        super().__init__(n_in, n_out, expansion_factor=6,
                            kernel_size=kernel_size, stride=stride,
                            r=r, p=p)

In [43]:
class EfficientNet(pl.LightningModule):
    def __init__(self, w_factor=1, d_factor=1, num_classes=1000, learning_rate=1e-3):
        super().__init__()
        
        # log hyperparameters
        self.save_hyperparameters()
        base_widths = [(32, 16), (16, 24), (24, 40),
                       (40, 80), (80, 112), (112, 192),
                       (192, 320), (320, 1280)]
        base_depths = [1, 2, 2, 3, 3, 4, 1]

        scaled_widths = [(self.__scale_width(w[0], w_factor), self.__scale_width(w[1], w_factor)) 
                        for w in base_widths]
        scaled_depths = [math.ceil(d_factor*d) for d in base_depths]
        
        kernel_sizes = [3, 3, 5, 3, 5, 5, 3]
        strides = [1, 2, 2, 2, 1, 2, 1]
        ps = [0, 0.029, 0.057, 0.086, 0.114, 0.143, 0.171]

        self.stem = ConvBnAct(3, scaled_widths[0][0], stride=2, padding=1)
        
        stages = []
        for i in range(7):
            layer_type = MBConv1 if (i == 0) else MBConv6
            r = 4 if (i == 0) else 24
            stage = self.__create_stage(*scaled_widths[i], scaled_depths[i],
                                        layer_type, kernel_size=kernel_sizes[i], 
                                        stride=strides[i], r=r, p=ps[i])
            stages.append(stage)
        self.stages = nn.Sequential(*stages)

        self.pre_head = ConvBnAct(*scaled_widths[-1], kernel_size=1)

        self.head = nn.Sequential(nn.AdaptiveAvgPool2d(1),
                                nn.Flatten(),
                                nn.Linear(scaled_widths[-1][1], num_classes),
                                nn.Sigmoid())
        
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        self.loss = nn.CrossEntropyLoss()
        self.f1 = MulticlassF1Score(num_classes=num_classes)
    
    def __create_stage(self, n_in, n_out, num_layers, layer_type, 
                     kernel_size=3, stride=1, r=24, p=0):
        """Creates a Sequential consisting of [num_layers] layer_type"""
        layers = [layer_type(n_in, n_out, kernel_size=kernel_size,
                             stride=stride, r=r, p=p)]
        layers += [layer_type(n_out, n_out, kernel_size=kernel_size,
                              r=r, p=p) for _ in range(num_layers-1)]
        layers = nn.Sequential(*layers)
        return layers
    
    def __scale_width(self, w, w_factor):
        """Scales width given a scale factor"""
        w *= w_factor
        new_w = (int(w+4) // 8) * 8
        new_w = max(8, new_w)
        if new_w < 0.9*w:
            new_w += 8
        return int(new_w)
    
    def __feature_extractor(self, x):
        x = self.stem(x)
        x = self.stages(x)
        x = self.pre_head(x)
        return x
    
    def forward(self, x):
        x = self.__feature_extractor(x)
        x = self.head(x)
        return x
    
    def training_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.loss(y_hat, y)
        
        # training metrics
        preds = torch.argmax(y_hat, dim=1)
        f1_score = self.f1(preds, y)
        self.log('train_loss', loss, on_step=True, on_epoch=True, logger=True)
        self.log('train_F1', f1_score, on_step=True, on_epoch=True, logger=True)
        
        return loss
    
    def validation_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.loss(y_hat, y)

        # validation metrics
        preds = torch.argmax(y_hat, dim=1)
        f1_score = self.f1(preds, y)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_F1', f1_score, prog_bar=True)
        return loss
    
    def test_step(self, batch, batch_idx):
        x, y = batch
        y_hat = self(x)
        loss = self.loss(y_hat, y)
        
        # validation metrics
        preds = torch.argmax(y_hat, dim=1)

        f1_score = self.f1(preds, y)
        self.log('test_loss', loss, prog_bar=True)
        self.log('test_F1', f1_score, prog_bar=True)
        return loss
    
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        return optimizer

In [44]:
dm = CarsDataModule(train_mat='data/cars_train_annos.mat',
                    test_mat='data/cars_test_annos.mat',
                    train_data_dir='data/cars_train',
                    test_data_dir='data/cars_test',
                    batch_size=32)
# To access the x_dataloader we need to call prepare_data and setup.
dm.prepare_data()
dm.setup()

# Samples required by the custom ImagePredictionLogger callback to log image predictions.
val_samples = next(iter(dm.val_dataloader()))
val_imgs, val_labels = val_samples[0], val_samples[1]
val_imgs.shape, val_labels.shape

(torch.Size([32, 3, 224, 224]), torch.Size([32]))

In [45]:
model = EfficientNet(num_classes=dm.num_classes)

# Initialize wandb logger
wandb_logger = WandbLogger(project='wandb-lightning', job_type='train')

# Initialize Callbacks
early_stop_callback = pl.callbacks.EarlyStopping(monitor="val_loss")
checkpoint_callback = pl.callbacks.ModelCheckpoint()

# Initialize a trainer
trainer = pl.Trainer(max_epochs=40,
                     accelerator='gpu',
                     devices=1,
                     logger=wandb_logger,
                     callbacks=[ImagePredictionLogger(val_samples),
                                checkpoint_callback],
                     )

# Train the model ⚡🚅⚡
trainer.fit(model, dm)

# Evaluate the model on the held-out test set ⚡⚡
trainer.test(dataloaders=dm.test_dataloader())

# Close wandb run
wandb.finish()

  rank_zero_warn(
GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name     | Type              | Params
-----------------------------------------------
0 | stem     | ConvBnAct         | 928   
1 | stages   | Sequential        | 3.6 M 
2 | pre_head | ConvBnAct         | 412 K 
3 | head     | Sequential        | 252 K 
4 | loss     | CrossEntropyLoss  | 0     
5 | f1       | MulticlassF1Score | 0     
-----------------------------------------------
4.3 M     Trainable params
0         Non-trainable params
4.3 M     Total params
17.040    Total estimated model params size (MB)


Epoch 16:  32%|███▏      | 92/287 [04:22<09:15,  2.85s/it, loss=4.98, v_num=guix, val_loss=5.000, val_F1=0.000171] 