In [1]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F

## General Blocks

In [4]:
class BasicConv2d(nn.Module):
    """Conv2d -> BatchNorm2d -> ReLU, applied in that order.

    The convolution is created without a bias term; it is followed by
    batch normalisation (eps=0.001) and an in-place ReLU.

    Args:
        in_planes: number of input channels of the convolution.
        out_planes: number of output channels of the convolution.
        kernel_size: forwarded to ``nn.Conv2d``.
        stride: forwarded to ``nn.Conv2d``.
        padding: forwarded to ``nn.Conv2d`` (default 0).
    """

    def __init__(self, in_planes, out_planes, kernel_size, stride, padding=0):
        super().__init__()
        conv_kwargs = dict(kernel_size=kernel_size, stride=stride, padding=padding, bias=False)
        self.conv = nn.Conv2d(in_planes, out_planes, **conv_kwargs)
        self.bn = nn.BatchNorm2d(out_planes, eps=0.001)
        self.relu = nn.ReLU(inplace=True)

    def forward(self, x):
        """Return ``relu(bn(conv(x)))``."""
        return self.relu(self.bn(self.conv(x)))

---

## MobileNet - Depthwise separable convolution

#### example

![](assets/mobilenet.png)

In [5]:
class DepthwiseSeparable(nn.Module):
    """Depthwise-separable convolution: a per-channel 3x3 conv followed by a 1x1 conv.

    Each of the two stages is Conv2d -> BatchNorm2d -> ReLU6.

    Args:
        nin: number of input channels.
        nout: number of output channels produced by the 1x1 stage.
    """

    def __init__(self, nin, nout):
        super().__init__()
        # groups=nin gives every input channel its own 3x3 filter.
        self.depthwise = nn.Sequential(
            nn.Conv2d(nin, nin, kernel_size=3, padding=1, groups=nin),
            nn.BatchNorm2d(nin),
            nn.ReLU6(inplace=True),
        )
        # The 1x1 convolution mixes channels and sets the output width.
        self.pointwise = nn.Sequential(
            nn.Conv2d(nin, nout, kernel_size=1),
            nn.BatchNorm2d(nout),
            nn.ReLU6(inplace=True),
        )

    def forward(self, x):
        """Apply the depthwise stage, then the pointwise stage."""
        return self.pointwise(self.depthwise(x))
    
class General(nn.Module):
    """Baseline for comparison with DepthwiseSeparable: an ordinary 3x3 conv then a 1x1 conv.

    The first stage is stored under the attribute name ``depthwise`` but its
    convolution uses the default ``groups`` of ``nn.Conv2d``, i.e. it is a
    regular (non-grouped) 3x3 convolution. Each stage is
    Conv2d -> BatchNorm2d -> ReLU6.

    Args:
        nin: number of input channels.
        nout: number of output channels produced by the 1x1 stage.
    """

    def __init__(self, nin, nout):
        super().__init__()
        spatial_stage = [
            nn.Conv2d(nin, nin, kernel_size=3, padding=1),
            nn.BatchNorm2d(nin),
            nn.ReLU6(inplace=True),
        ]
        self.depthwise = nn.Sequential(*spatial_stage)

        channel_stage = [
            nn.Conv2d(nin, nout, kernel_size=1),
            nn.BatchNorm2d(nout),
            nn.ReLU6(inplace=True),
        ]
        self.pointwise = nn.Sequential(*channel_stage)

    def forward(self, x):
        """Apply the 3x3 stage, then the 1x1 stage."""
        hidden = self.depthwise(x)
        return self.pointwise(hidden)

In [6]:
def count_trainable_params(module: nn.Module) -> int:
    """Return the total number of elements over all parameters with ``requires_grad=True``."""
    return sum(p.numel() for p in module.parameters() if p.requires_grad)


# Same channel configuration for both variants so the counts are directly comparable.
model = DepthwiseSeparable(32, 32)
params = count_trainable_params(model)
print(f"DepthwiseSeparable params : {params}")

model = General(32, 32)
params = count_trainable_params(model)
print(f"General params : {params}")

DepthwiseSeparable params : 1504
General params : 10432


#### v2

![](assets/inverted.png)

![](assets/inverted_detail.png)

#### practice

In [34]:
class InvertedResidual(nn.Module):
    """Inverted residual block: 1x1 expansion -> 3x3 depthwise -> 1x1 projection.

    No normalisation layers are used. The expansion and depthwise stages are
    followed by ReLU6; the projection has no activation.

    With ``stride == 1`` a shortcut is added to the output: the input itself
    when ``nin == nout``, otherwise a bias-free 1x1 convolution of the input.
    With any other stride no shortcut is created or added.

    Args:
        nin: number of input channels.
        nout: number of output channels.
        stride: stride of the 3x3 depthwise convolution.
        expansion_ratio: the hidden width is ``expansion_ratio * nin``.
    """

    def __init__(self, nin, nout, stride, expansion_ratio=4):
        super().__init__()
        hidden = expansion_ratio * nin
        self.stride = stride

        self.expansion = nn.Sequential(
            nn.Conv2d(nin, hidden, kernel_size=(1, 1), padding=0, groups=1),
            nn.ReLU6(inplace=True),
        )
        # groups=hidden: one 3x3 filter per expanded channel; this conv carries the stride.
        self.depthwise = nn.Sequential(
            nn.Conv2d(hidden, hidden, kernel_size=(3, 3), stride=stride, padding=1, groups=hidden),
            nn.ReLU6(inplace=True),
        )
        self.pointwise = nn.Conv2d(hidden, nout, kernel_size=(1, 1), padding=0, groups=1)

        # `map` only exists for stride 1; forward() checks the same condition.
        if stride == 1:
            self.map = (
                nn.Identity()
                if nin == nout
                else nn.Conv2d(nin, nout, kernel_size=(1, 1), stride=stride, padding=0, bias=False)
            )

    def forward(self, x):
        """Run the three stages and, for stride 1, add the shortcut in place."""
        out = self.pointwise(self.depthwise(self.expansion(x)))
        if self.stride != 1:
            return out
        out += self.map(x)
        return out

In [35]:
# Random input for the shape check below: 4 samples, 32 channels, 32x32 spatial grid.
batch_size = 4
features = torch.randn(batch_size, 32, 32, 32)

In [33]:
# Positional arguments: nin=32, nout=32, stride=2, expansion_ratio=4.
# With stride != 1 the block adds no shortcut, so only the conv path is exercised here.
model = InvertedResidual(32, 32, 2, 4)
output = model(features)
print(f"Input shape : {features.shape}\nOutput shape : {output.shape}")

Input shape : torch.Size([4, 32, 32, 32])
Output shape : torch.Size([4, 32, 16, 16])


---

## SqueezeExcitation

![](assets/se.png)

#### example

In [9]:
def _make_divisible(v: float, divisor: int, min_value=None) -> int:
    """
    This function is taken from the original tf repo.
    It ensures that all layers have a channel number that is divisible by 8
    It can be seen here:
    https://github.com/tensorflow/models/blob/master/research/slim/nets/mobilenet/mobilenet.py
    """
    if min_value is None:
        min_value = divisor
    new_v = max(min_value, int(v + divisor / 2) // divisor * divisor)
    # Make sure that round down does not go down by more than 10%.
    if new_v < 0.9 * v:
        new_v += divisor
    return new_v

class SqueezeExcitation(nn.Module):
    """Squeeze-and-excitation channel gate.

    Original note: implemented as described at Figure 4 of the MobileNetV3 paper.
    The gate computed here uses a plain ``torch.sigmoid``.

    Args:
        input_channels: number of channels of the tensor to be gated.
        squeeze_factor: ``input_channels // squeeze_factor`` is passed through
            ``_make_divisible(..., 8)`` to obtain the bottleneck width.
    """

    def __init__(self, input_channels: int, squeeze_factor: int = 4):
        super().__init__()
        reduced = input_channels // squeeze_factor
        squeeze_channels = _make_divisible(reduced, 8)
        # 1x1 convolutions act on the 1x1 pooled map, i.e. as per-channel linear layers.
        self.fc1 = nn.Conv2d(input_channels, squeeze_channels, 1)
        self.relu = nn.ReLU(inplace=True)
        self.fc2 = nn.Conv2d(squeeze_channels, input_channels, 1)

    def _scale(self, input: torch.Tensor, inplace: bool) -> torch.Tensor:
        """Return the per-channel gate in (0, 1); ``inplace`` is accepted but not used."""
        pooled = F.adaptive_avg_pool2d(input, 1)
        hidden = self.relu(self.fc1(pooled))
        return torch.sigmoid(self.fc2(hidden))

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Multiply ``input`` by its channel gate (broadcast over the pooled dimensions)."""
        return self._scale(input, True) * input
    
    
class SEResidualBlockV2(nn.Module):
    """Bottleneck residual block (1x1 -> 3x3 -> 1x1) with optional squeeze-and-excitation.

    The two first convolutions are ``BasicConv2d`` blocks working at
    ``out_planes // 4`` channels; the last one is a plain ``nn.Conv2d`` that
    restores ``out_planes`` channels. The shortcut is the identity when the
    block keeps both channel count and resolution, otherwise a bias-free
    strided 1x1 convolution.

    Args:
        in_planes: number of input channels.
        out_planes: number of output channels.
        stride: stride of the 3x3 convolution and of the shortcut projection.
        se: if True, gate the residual branch with ``SqueezeExcitation``
            before adding the shortcut.
    """

    def __init__(self, in_planes: int, out_planes: int, stride: int = 1, se=True):
        super().__init__()
        self.se = se
        bottleneck = out_planes // 4

        self.conv1 = BasicConv2d(in_planes, bottleneck, (1, 1), 1, 0)  # reduce channels
        self.conv2 = BasicConv2d(bottleneck, bottleneck, (3, 3), stride=stride, padding=1)  # carries the stride
        self.conv3 = nn.Conv2d(bottleneck, out_planes, (1, 1), 1, 0)  # restore channels

        if self.se:
            self.squeeze_excitation = SqueezeExcitation(out_planes)

        shape_preserved = in_planes == out_planes and stride == 1
        if shape_preserved:
            self.map = nn.Identity()
        else:
            self.map = nn.Conv2d(in_planes, out_planes, kernel_size=(1, 1), stride=stride, padding=0, bias=False)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``branch(x) + shortcut(x)``; the branch is SE-gated when enabled."""
        shortcut = self.map(x)
        branch = self.conv3(self.conv2(self.conv1(x)))
        if self.se:
            branch = self.squeeze_excitation(branch)
        branch += shortcut
        return branch

In [10]:
# in_planes=64, out_planes=64, default stride=1 -> the shortcut is nn.Identity().
model = SEResidualBlockV2(64, 64, se=True)

In [11]:
# Shape check: feed a random 4x64x32x32 batch through the block built in the previous cell.
batch_size = 4
features = torch.randn(batch_size, 64, 32, 32)
output = model(features)
print(f"Input shape : {features.shape}\nOutput shape : {output.shape}")

Input shape : torch.Size([4, 64, 32, 32])
Output shape : torch.Size([4, 64, 32, 32])


---

## EfficientNet

![](assets/swish.png)

#### examples

![](assets/silu.png)

In [12]:
class SiLU(nn.Module):
    """SiLU activation, also known as the swish function: ``x * sigmoid(x)``.

    Export-friendly version of ``nn.SiLU()``.
    """

    def forward(self, x):
        gate = torch.sigmoid(x)
        return x * gate

#### memory-efficient version

In [13]:
sigmoid = torch.nn.Sigmoid()


class SwishCustomized(torch.autograd.Function):
    """Swish (``x * sigmoid(x)``) with a hand-written backward pass.

    Only the input is saved for backward; the sigmoid is recomputed there
    instead of being kept from the forward pass.
    """

    @staticmethod
    def forward(ctx, i):
        """Return ``i * sigmoid(i)`` and stash ``i`` for the backward pass."""
        ctx.save_for_backward(i)
        return i * sigmoid(i)

    @staticmethod
    def backward(ctx, grad_output):
        """Return ``grad_output * d/di [i * sigmoid(i)]``."""
        # `saved_tensors` is the supported accessor; `saved_variables` is deprecated.
        (i,) = ctx.saved_tensors
        sigmoid_i = sigmoid(i)
        # d/di [i * s(i)] = s(i) + i * s(i) * (1 - s(i)) = s(i) * (1 + i * (1 - s(i)))
        return grad_output * (sigmoid_i * (1 + i * (1 - sigmoid_i)))


swish = SwishCustomized.apply


class Swish(nn.Module):
    """Module wrapper around the custom-autograd ``swish`` function."""

    def forward(self, x):
        return swish(x)


---

In [14]:
class SqueezeExcitation(nn.Module):
    """Squeeze-and-excitation channel gate (re-declared in this cell).

    NOTE: this redefines the ``SqueezeExcitation`` class from the earlier
    SqueezeExcitation section and shadows it from here on; the only difference
    is that the ReLU here is not in-place.

    Original note: implemented as described at Figure 4 of the MobileNetV3 paper.
    The gate computed here uses a plain ``torch.sigmoid``.

    Args:
        input_channels: number of channels of the tensor to be gated.
        squeeze_factor: ``input_channels // squeeze_factor`` is passed through
            ``_make_divisible(..., 8)`` to obtain the bottleneck width.
    """

    def __init__(self, input_channels: int, squeeze_factor: int = 4):
        super().__init__()
        squeeze_channels = _make_divisible(input_channels // squeeze_factor, 8)
        self.fc1 = nn.Conv2d(input_channels, squeeze_channels, 1)
        self.relu = nn.ReLU()
        self.fc2 = nn.Conv2d(squeeze_channels, input_channels, 1)

    def _scale(self, input: torch.Tensor, inplace: bool) -> torch.Tensor:
        """Return the per-channel gate in (0, 1); ``inplace`` is accepted but not used."""
        gate = F.adaptive_avg_pool2d(input, 1)
        for layer in (self.fc1, self.relu, self.fc2):
            gate = layer(gate)
        return torch.sigmoid(gate)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Multiply ``input`` by its channel gate."""
        gate = self._scale(input, True)
        return gate * input
    
class MBConv(nn.Module):
    """MBConv block: 1x1 expansion -> 3x3 depthwise -> optional SE -> 1x1 projection.

    Expansion and depthwise stages are Conv2d -> BatchNorm2d -> SiLU; the
    projection is Conv2d -> BatchNorm2d without activation.

    With ``stride == 1`` a shortcut is added to the output: the input itself
    when ``nin == nout``, otherwise a bias-free 1x1 convolution of the input.
    With any other stride no shortcut is created or added.

    Args:
        nin: number of input channels.
        nout: number of output channels.
        stride: stride of the 3x3 depthwise convolution.
        expansion_ratio: the hidden width is ``expansion_ratio * nout``.
        se: if True, apply ``SqueezeExcitation`` to the expanded features
            (after the depthwise stage, before the projection).
    """

    def __init__(self, nin, nout, stride, expansion_ratio=4, se=True):
        super(MBConv, self).__init__()
        # NOTE(review): the hidden width is derived from nout here, whereas
        # InvertedResidual in this notebook derives it from nin — confirm which is intended.
        expand_ = expansion_ratio * nout
        self.stride = stride
        self.se = se
        self.expansion = nn.Sequential(nn.Conv2d(nin, expand_, kernel_size=1),
                                       nn.BatchNorm2d(expand_), SiLU())

        # Depthwise means one filter per channel of the tensor being convolved, which has
        # expand_ channels at this point. groups=nin was only a grouped convolution and
        # raised whenever expand_ was not a multiple of nin.
        self.depthwise = nn.Sequential(nn.Conv2d(expand_, expand_, kernel_size=3, padding=1,
                                                 stride=stride, groups=expand_),
                                       nn.BatchNorm2d(expand_), SiLU())

        self.pointwise = nn.Sequential(nn.Conv2d(expand_, nout, kernel_size=1),
                                       nn.BatchNorm2d(nout))

        if self.se:
            self.squeeze_excitation = SqueezeExcitation(expand_)

        # `map` only exists for stride 1; forward() checks the same condition.
        if stride == 1:
            if (nin != nout):
                self.map = nn.Conv2d(nin, nout, kernel_size=(1, 1), stride=stride, padding=0, bias=False)
            else:
                self.map = nn.Identity()

    def forward(self, x):
        """Run the block and, for stride 1, add the shortcut in place."""
        out = self.expansion(x)
        out = self.depthwise(out)

        if self.se:
            out = self.squeeze_excitation(out)

        out = self.pointwise(out)

        if self.stride == 1:
            identity = self.map(x)
            out += identity
        return out
    

In [15]:
# Optional side demo, disabled by default: trainable-parameter counts of a
# 1x1 Conv2d(10, 10) and a Linear(10, 10).
demo = False
if demo:
    for model in (nn.Conv2d(10, 10, 1), nn.Linear(10, 10)):
        model_parameters = filter(lambda p: p.requires_grad, model.parameters())
        params = sum([np.prod(p.size()) for p in model_parameters])
        print(f"params : {params}")

In [16]:
# Positional arguments: nin=64, nout=64, stride=1, expansion_ratio=6.
model = MBConv(64, 64, 1, 6, se=True)

In [17]:
# Shape check: feed a random 4x64x32x32 batch through the MBConv block built in the previous cell.
batch_size = 4
features = torch.randn(batch_size, 64, 32, 32)
output = model(features)
print(f"Input shape : {features.shape}\nOutput shape : {output.shape}")

Input shape : torch.Size([4, 64, 32, 32])
Output shape : torch.Size([4, 64, 32, 32])


#### practice
* Build MobileNetV3 Block
* Replace sigmoid / swish with h-sigmoid / h-swish

![](assets/hswish.png)

![](assets/h-swish.png)

In [18]:
class HSigmoid(nn.Module):
    """Hard sigmoid: ``relu6(x + 3) / 6``."""

    def forward(self, x):
        # x + 3 is a fresh tensor, so the in-place ReLU6 does not modify the caller's x.
        shifted = x + 3
        return F.relu6(shifted, inplace=True) / 6
            
class HSwish(nn.Module):
    """Hard swish: ``x * HSigmoid()(x)``."""

    def __init__(self):
        super(HSwish, self).__init__()
        self.hsigmoid = HSigmoid()

    def forward(self, x):
        gate = self.hsigmoid(x)
        return x * gate
        

---

## GhostNet

#### examples

![](assets/ghost.png)

In [19]:
import math
class GhostModule(nn.Module):
    """Ghost module: a primary convolution plus cheap depthwise "ghost" features.

    ``ceil(oup / ratio)`` channels come from an ordinary convolution; a
    depthwise convolution over those channels generates ``ratio - 1`` further
    maps per primary channel. Both sets are concatenated along the channel
    dimension and truncated to exactly ``oup`` channels.

    Args:
        inp: number of input channels.
        oup: number of output channels.
        kernel_size: kernel size of the primary convolution (padding is ``kernel_size // 2``).
        ratio: total-to-primary channel ratio.
        dw_size: kernel size of the cheap depthwise convolution (padding is ``dw_size // 2``).
        stride: stride of the primary convolution.
        relu: if True each stage ends with an in-place ReLU, otherwise with an empty ``nn.Sequential``.
    """

    def __init__(self, inp, oup, kernel_size=1, ratio=2, dw_size=3, stride=1, relu=True):
        super().__init__()
        self.oup = oup
        primary_channels = math.ceil(oup / ratio)
        ghost_channels = primary_channels * (ratio - 1)

        primary_layers = [
            nn.Conv2d(inp, primary_channels, kernel_size, stride, kernel_size // 2, bias=False),
            nn.BatchNorm2d(primary_channels),
            nn.ReLU(inplace=True) if relu else nn.Sequential(),
        ]
        self.primary_conv = nn.Sequential(*primary_layers)

        # dw_size x dw_size depthwise convolution (groups = number of primary channels), stride 1.
        cheap_layers = [
            nn.Conv2d(primary_channels, ghost_channels, dw_size, 1, dw_size // 2,
                      groups=primary_channels, bias=False),
            nn.BatchNorm2d(ghost_channels),
            nn.ReLU(inplace=True) if relu else nn.Sequential(),
        ]
        self.cheap_operation = nn.Sequential(*cheap_layers)

    def forward(self, x):
        """Return primary and ghost features concatenated on dim 1, cut to ``oup`` channels."""
        primary = self.primary_conv(x)
        ghosts = self.cheap_operation(primary)
        stacked = torch.cat([primary, ghosts], dim=1)
        return stacked[:, :self.oup, :, :]

In [20]:
# Positional arguments: inp=64, oup=64, kernel_size=3; ratio, dw_size, stride and relu keep their defaults.
model = GhostModule(64, 64, 3)

In [21]:
# Shape check: feed a random 4x64x32x32 batch through the GhostModule built in the previous cell.
batch_size = 4
features = torch.randn(batch_size, 64, 32, 32)
output = model(features)
print(f"Input shape : {features.shape}\nOutput shape : {output.shape}")

Input shape : torch.Size([4, 64, 32, 32])
Output shape : torch.Size([4, 64, 32, 32])


## NFNet

### Can we remove batch normalization layer?

#### Advantage of BN
* Batch normalization downscales the residual branch.
* Batch normalization allows efficient large-batch training.
* Batch normalization smooths the loss landscape.
* Batch normalization avoids elimination singularities.
* Batch normalization eliminates mean-shift.
#### Disadvantage of BN
* Batch normalization is an expensive computational primitive.
* Batch normalization introduces a discrepancy between the behaviour of the model during training and at inference time.
* Batch normalization breaks the independence between training examples in the minibatch.

### Towards Removing Batch Normalization

#### Adaptive Gradient Clipping
* Replaces this benefit of BN: batch normalization allows efficient large-batch training.

![](assets/AGC.png)

In [22]:
## source : https://github.com/vballoli/nfnets-pytorch
def unitwise_norm(x: torch.Tensor):
    """Return the L2 norm of ``x`` reduced according to its number of dimensions.

    * 0-D or 1-D: reduce over dim 0, result is a scalar.
    * 2-D or 3-D: reduce over dim 0, keeping that dimension with size 1.
    * 4-D: reduce over dims 1, 2 and 3, keeping them with size 1.

    Raises:
        ValueError: if ``x`` has more than 4 dimensions.
    """
    rank = x.ndim
    if rank > 4:
        raise ValueError('Wrong input dimensions')

    if rank <= 1:
        reduce_dims, keep = 0, False
    elif rank == 4:
        reduce_dims, keep = [1, 2, 3], True
    else:  # rank is 2 or 3
        reduce_dims, keep = 0, True

    squared_sum = torch.sum(x**2, dim=reduce_dims, keepdim=keep)
    return squared_sum ** 0.5

def example():
    """Illustrative excerpt of an adaptive-gradient-clipping step for one parameter ``p``.

    NOTE(review): ``p`` and ``self`` are neither arguments nor locals of this
    function, so calling it as written raises ``NameError`` unless such names
    exist as globals. It reads like the body of an optimizer method and is
    presumably meant for reading only — confirm before reuse.
    """
    # Unit-wise parameter norm, floored at 0.001.
    param_norm = torch.max(unitwise_norm(p.detach()), torch.tensor(0.001).to(p.device)) ## ||W(l,i)||*F
    # Unit-wise gradient norm.
    grad_norm = unitwise_norm(p.grad.detach()) ## ||G(l,i)||*F
    # Largest gradient norm allowed for each unit.
    max_norm = param_norm * self.clipping

    # True for the units whose gradient norm exceeds the allowed maximum.
    trigger = grad_norm > max_norm ## ||G(l,i)||*F > ||W(l,i)||*F * self.clipping

    # Rescale the gradient to max_norm; the 1e-6 floor guards the division against a zero gradient norm.
    clipped_grad = p.grad * (max_norm / torch.max(grad_norm,torch.tensor(1e-6).to(grad_norm.device)))
    # Write the clipped values back into p.grad only where `trigger` is set.
    p.grad.detach().data.copy_(torch.where(trigger, clipped_grad, p.grad))

#### Scaled Weight Standardization
* In addition, Brock et al. (2021) prevent the emergence of a **mean-shift** in the hidden activations by introducing Scaled Weight Standardization

![](assets/sws.png)

In [23]:
class ScaledStdConv2d(nn.Conv2d):
    """Conv2d layer with Scaled Weight Standardization.

    On every forward pass the stored weight is standardised per output
    channel, multiplied by ``gamma / sqrt(fan_in)`` and, if enabled, by a
    learnable per-output-channel gain before being used in the convolution.

    Paper: `Characterizing signal propagation to close the performance gap in unnormalized ResNets` -
        https://arxiv.org/abs/2101.08692
    Adapted from timm: https://github.com/rwightman/pytorch-image-models/blob/4ea593196414684d2074cbb81d762f3847738484/timm/models/layers/std_conv.py

    Args:
        gain: if True, create a learnable gain of shape ``(out_channels, 1, 1, 1)`` initialised to ones.
        gamma: constant multiplier folded into the weight scale.
        eps: added to the standard deviation (squared when ``use_layernorm`` is set).
        use_layernorm: standardise with ``F.layer_norm`` instead of explicit mean/std.
        Remaining arguments are forwarded to ``nn.Conv2d``.
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride=1, padding=0, dilation=1, groups=1,
                 bias=True, gain=True, gamma=1.0, eps=1e-5, use_layernorm=False):
        super().__init__(
            in_channels, out_channels, kernel_size, stride=stride,
            padding=padding, dilation=dilation, groups=groups, bias=bias)
        if gain:
            self.gain = nn.Parameter(torch.ones(self.out_channels, 1, 1, 1))
        else:
            self.gain = None
        # fan-in = number of weights feeding one output channel; scale = gamma / sqrt(fan-in)
        fan_in = self.weight[0].numel()
        self.scale = gamma * fan_in ** -0.5
        self.eps = eps ** 2 if use_layernorm else eps
        # experimental, slightly faster/less GPU memory use
        self.use_layernorm = use_layernorm

    def get_weight(self):
        """Return the standardised, scaled (and optionally gained) weight tensor."""
        if self.use_layernorm:
            normalized_shape = self.weight.shape[1:]
            weight = self.scale * F.layer_norm(self.weight, normalized_shape, eps=self.eps)
        else:
            per_filter = [1, 2, 3]  # every dimension except the output-channel one
            mean = torch.mean(self.weight, dim=per_filter, keepdim=True)
            std = torch.std(self.weight, dim=per_filter, keepdim=True, unbiased=False)
            weight = self.scale * (self.weight - mean) / (std + self.eps)
        if self.gain is None:
            return weight
        return weight * self.gain

    def forward(self, x):
        """Convolve ``x`` with the standardised weight instead of the raw one."""
        weight = self.get_weight()
        return F.conv2d(x, weight, self.bias, self.stride, self.padding, self.dilation, self.groups)
    

#### Normalizer-Free Residual Block
* Replaces this benefit of BN: batch normalization downscales the residual branch.

![](assets/free.png)

In [24]:
from typing import Optional
from functools import partial
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1, base_conv: nn.Conv2d = ScaledStdConv2d) -> nn.Conv2d:
    """Build a bias-free 3x3 convolution with padding.

    The padding equals ``dilation``. ``base_conv`` is the convolution class
    to instantiate (``ScaledStdConv2d`` by default).
    """
    conv_kwargs = dict(
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        bias=False,
        dilation=dilation,
    )
    return base_conv(in_planes, out_planes, **conv_kwargs)

class BasicBlock(nn.Module):
    """Residual block of two 3x3 convolutions without normalisation layers.

    ``forward`` computes ``alpha * conv2(act(conv1(beta * x))) + shortcut``
    where ``shortcut`` is ``downsample(x)`` when a downsample module is
    given and ``x`` otherwise. The activation is ``nn.LeakyReLU``.

    Args:
        inplanes: number of input channels.
        planes: number of output channels of both convolutions.
        stride: stride of the first convolution.
        downsample: optional module applied to ``x`` to form the shortcut.
        groups: must be 1.
        base_width: must be 64.
        dilation: must not exceed 1.
        alpha: multiplier applied to the residual branch output.
        beta: multiplier applied to the branch input.
        base_conv: convolution class handed to ``conv3x3``.

    Raises:
        ValueError: if ``groups != 1`` or ``base_width != 64``.
        NotImplementedError: if ``dilation > 1``.
    """

    expansion: int = 1

    def __init__(
        self,
        inplanes: int,
        planes: int,
        stride: int = 1,
        downsample: Optional[nn.Module] = None,
        groups: int = 1,
        base_width: int = 64,
        dilation: int = 1,
        alpha: float = 0.2,
        beta: float = 1.0,
        base_conv: nn.Conv2d = ScaledStdConv2d
    ) -> None:
        super().__init__()
        unsupported_width = groups != 1 or base_width != 64
        if unsupported_width:
            raise ValueError(
                'BasicBlock only supports groups=1 and base_width=64')
        if dilation > 1:
            raise NotImplementedError(
                "Dilation > 1 not supported in BasicBlock")

        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        self.conv1 = conv3x3(inplanes, planes, stride, base_conv=base_conv)
        self.act = nn.LeakyReLU()
        self.conv2 = conv3x3(planes, planes, base_conv=base_conv)

        self.downsample = downsample
        self.stride = stride
        self.alpha = alpha
        self.beta = beta

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the scaled residual branch plus the shortcut."""
        branch = self.conv1(x * self.beta)
        branch = self.conv2(self.act(branch))

        shortcut = x if self.downsample is None else self.downsample(x)

        branch *= self.alpha
        branch += shortcut
        return branch

In [25]:
# inplanes=3, planes=3; stride 1, no downsample module, base_conv=ScaledStdConv2d by default.
model = BasicBlock(3, 3)

In [26]:
# Shape check on a single random 3-channel 32x32 input.
output = model(torch.randn(1, 3, 32, 32))
print(output.shape)

torch.Size([1, 3, 32, 32])


## EfficientNetV2
* Combination of training-aware neural architecture search (NAS) and scaling to improve both training speed and parameter efficiency.
* Depthwise convolutions are slow in **early layers** but effective in later stages.
* Gradually increase the learning difficulty with larger image sizes and stronger regularization.

#### examples

![](assets/fused.png)

In [27]:
class SqueezeExcitation(nn.Module):
    """Squeeze-and-excitation channel gate (re-declared in this cell).

    NOTE: this is another redefinition of the ``SqueezeExcitation`` class that
    already appears earlier in the notebook; it shadows the previous one. The
    ReLU here is not in-place.

    Original note: implemented as described at Figure 4 of the MobileNetV3 paper.
    The gate computed here uses a plain ``torch.sigmoid``.

    Args:
        input_channels: number of channels of the tensor to be gated.
        squeeze_factor: ``input_channels // squeeze_factor`` is passed through
            ``_make_divisible(..., 8)`` to obtain the bottleneck width.
    """

    def __init__(self, input_channels: int, squeeze_factor: int = 4):
        super().__init__()
        bottleneck = _make_divisible(input_channels // squeeze_factor, 8)
        self.fc1 = nn.Conv2d(input_channels, bottleneck, 1)
        self.relu = nn.ReLU()
        self.fc2 = nn.Conv2d(bottleneck, input_channels, 1)

    def _scale(self, input: torch.Tensor, inplace: bool) -> torch.Tensor:
        """Return the per-channel gate in (0, 1); ``inplace`` is accepted but not used."""
        squeezed = F.adaptive_avg_pool2d(input, 1)
        excited = self.fc2(self.relu(self.fc1(squeezed)))
        return torch.sigmoid(excited)

    def forward(self, input: torch.Tensor) -> torch.Tensor:
        """Multiply ``input`` by its channel gate."""
        channel_gate = self._scale(input, True)
        return channel_gate * input
    
class FusedMBConv(nn.Module):
    """Fused-MBConv block: a single 3x3 expansion conv -> optional SE -> 1x1 projection.

    The expansion stage is Conv2d -> BatchNorm2d -> SiLU; the projection is
    Conv2d -> BatchNorm2d without activation.

    With ``stride == 1`` a shortcut is added to the output: the input itself
    when ``nin == nout``, otherwise a bias-free 1x1 convolution of the input.
    With any other stride no shortcut is created or added.

    Args:
        nin: number of input channels.
        nout: number of output channels.
        stride: stride of the fused 3x3 convolution.
        expansion_ratio: the hidden width is ``expansion_ratio * nout``.
        se: if True, apply ``SqueezeExcitation`` to the expanded features
            before the projection.
    """

    def __init__(self, nin, nout, stride, expansion_ratio=4, se=True):
        super(FusedMBConv, self).__init__()
        expand_ = expansion_ratio * nout
        self.stride = stride
        self.se = se
        # The fused 3x3 conv is the only spatial convolution in the block, so it has to
        # carry the stride; otherwise stride != 1 would only disable the shortcut and
        # never downsample.
        self.expansion = nn.Sequential(nn.Conv2d(nin, expand_, kernel_size=3, stride=stride, padding=1),
                                       nn.BatchNorm2d(expand_), SiLU())

        if self.se:
            self.squeeze_excitation = SqueezeExcitation(expand_)

        self.pointwise = nn.Sequential(nn.Conv2d(expand_, nout, kernel_size=1),
                                       nn.BatchNorm2d(nout))

        # `map` only exists for stride 1; forward() checks the same condition.
        if stride == 1:
            if (nin != nout):
                self.map = nn.Conv2d(nin, nout, kernel_size=(1, 1), stride=stride, padding=0, bias=False)
            else:
                self.map = nn.Identity()

    def forward(self, x):
        """Run the block and, for stride 1, add the shortcut in place."""
        out = self.expansion(x)

        if self.se:
            out = self.squeeze_excitation(out)

        out = self.pointwise(out)

        if self.stride == 1:
            identity = self.map(x)
            out += identity
        return out
    

In [28]:
# Positional arguments: nin=64, nout=64, stride=1; expansion_ratio=4 and se=True are the defaults.
model = FusedMBConv(64, 64, 1)

In [29]:
# Shape check: feed a random 4x64x32x32 batch through the FusedMBConv block built in the previous cell.
batch_size = 4
features = torch.randn(batch_size, 64, 32, 32)
output = model(features)
print(f"Input shape : {features.shape}\nOutput shape : {output.shape}")

Input shape : torch.Size([4, 64, 32, 32])
Output shape : torch.Size([4, 64, 32, 32])


---

## Practice : Train your model on Cifar10
* reference : https://github.com/jeff52415/Tips-for-improving-your-neural-network-pytorch