# BOTTOM UP PYRAMID

## Architecture 

The architecture is composed of 2 modules.
* features pyramid (backbone)
* classifier (head)
  
The features pyramid is build bottom up (BUP) meanning that the features are first extracted at the biggest scale (smaller image first).
The feature extracted at this scale are fed to the head to output a prediction.
The same procedure is repeated for the next smaller scale (bigger image) however the features extracted at the immediately bigger scale is reused via a residual connection.

In [None]:
import torch as th
import torch.nn as nn

class Head(nn.Module):
    
    def __init__(self, head_features, num_classes):
        super().__init__()
        self.w0 = nn.Conv2d(head_features, head_features, kernel_size=1)
        self.w1 = nn.Conv2d(head_features, head_features, kernel_size=1)
        self.wO = nn.Conv2d(head_features, num_classes, kernel_size=1)
        self.act = nn.LeakyReLU()
        
    def forward(self,x):
        x0 = self.act(self.w0(x)) + x
        x1 = self.act(self.w1(x0)) + x0
        y = self.w0(x1)
        return y

class BUP(nn.Module):
    
    def __init__(self, input_channels, num_classes, n_steps=4, patch_features=32, head_features=128, patch_size=7):
        
        super().__init__()
        conv0 = nn.Conv2d(input_channels, input_channels, kernel_size=3, padding=1, groups=input_channels) 
        conv1 = nn.Conv2d(input_channels, patch_features, kernel_size=3, padding=0) 
        batch_norm = nn.BatchNorm2d(input_channels)
        act = nn.LeakyReLU(inplace=False)
        self.conv_in = nn.Sequential(batch_norm, conv0, act, conv1)
        self.n_steps = n_steps
        patch_batch_norm = nn.BatchNorm2d(head_features)
        self.patcher = nn.Sequential( act, nn.Conv2d(patch_features, head_features, kernel_size=patch_size, stride=patch_size), act, patch_batch_norm)
        self.upsample = nn.UpsamplingNearest2d()
        self.head = Head(head_features, num_classes)
        
    def forward_scale_i(self, x, scale, f_res=None, p_res=None, y_res=None):
        x_i = x[:,:,::scale,::scale]
        f_i = self.conv_in(x_i)
        if f_res is not None: 
            self.upsample.size = f_i.shape[2:]
            f_i = f_i + self.upsample(f_res)
        p_i = self.patcher(f_i) 
        if p_res is not None:
            self.upsample.size = p_i.shape[2:]
            p_i  = p_i + self.upsample(p_res)
        y_i = self.head(p_i) 
        if y_res is not None:
            self.upsample.size = y_i.shape[2:]
            y_i = y_i + self.upsample(y_res)
        return f_i, p_i, y_i
    
    def forward(self,x,stop_step=1):
        n_steps = self.n_steps
        f_n, p_n, y_n = [], [], []
        for s in range(n_steps, stop_step-1, -1):
            x_i = x[:,:,::s,::s]
            if len(f_n) == 0:
                f_i, p_i, y_i = self.forward_scale_i(x_i, s)
                f_n, p_n, y_n = [f_i], [p_i], [y_i]
            else:
                f_i, p_i, y_i = self.forward_scale_i(x_i, s, f_res=f_n[-1], p_res=p_n[-1], y_res=y_n[-1])
                f_n += [f_i]
                p_n += [p_i]
                y_n += [y_i]
        return y_n     

In [34]:
import os
from torch import optim, nn, utils, Tensor
import pytorch_lightning as pl

# define the LightningModule
class ClassifierModule(pl.LightningModule):
    def __init__(self, *args,**kwargs):
        super().__init__()
        self.model = BUP(*args, **kwargs)
    
    def training_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, y = batch
        y_n = self.model(x)
        acc_loss = 0.0
        for y_i in y_n:
            loss = nn.functional.cross_entropy(y_i, y[:,None,None].expand_as(y_i[:,0]), reduce=False)
            acc_loss += nn.functional.adaptive_max_pool2d(loss,1)[:,0,0].mean()/len(y_n)
        # Logging to TensorBoard by default
        self.log("train_loss", acc_loss)
        return acc_loss
    
    def validating_step(self, batch, batch_idx):
        x, y = batch
        y_n = self.model(x, stop_step=self.model.n_steps)
        acc_loss = 0.0
        for y_i in y_n:
            loss = nn.functional.cross_entropy(y_i, y[:,None,None].expand_as(y_i[:,0]), reduce=False)
            acc_loss += nn.functional.adaptive_max_pool2d(loss,1)[:,0,0].mean()/len(y_n)
        # Logging to TensorBoard by default
        self.log("valid_loss", acc_loss)
        return acc_loss

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        return optimizer

In [24]:
from torch.utils.data import Dataset, DataLoader
import torchvision.datasets as ds
from torchvision import transforms, utils

train_dataset = ds.STL10('./datasets', split='train', download=True, transform=transforms.Compose([transforms.RandAugment(), transforms.ToTensor()]))
test_dataset = ds.STL10('./datasets', split='test', download=True, transform=transforms.Compose([transforms.ToTensor()]))

train_dataloader = DataLoader(train_dataset, batch_size=32)
valid_dataloader = DataLoader(test_dataset, batch_size=1) 

Files already downloaded and verified
Files already downloaded and verified


In [35]:
mod = ClassifierModule(3, 10, n_steps=3)
trainer = pl.Trainer( max_epochs=100, accelerator="gpu", devices=[0])
trainer.fit(model=mod, train_dataloaders=train_dataloader, val_dataloaders=valid_dataloader)



GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0,1,2,3]

  | Name  | Type | Params
-------------------------------
0 | model | BUP  | 236 K 
-------------------------------
236 K     Trainable params
0         Non-trainable params
236 K     Total params
0.945     Total estimated model params size (MB)


Epoch 27:  76%|███████▋  | 120/157 [00:07<00:02, 16.04it/s, loss=1.69, v_num=4]