# Notebook to build the PP-PicoDet model with basic PyTorch building blocks
- https://arxiv.org/abs/2111.00902

## Note: this notebook follows the new version, PicoDet v2
- Architecture: https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/architectures/picodet.py
- LCNet backbone (https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/backbones/lcnet.py)
- LCPAN neck (https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/necks/lc_pan.py)
- PicoHeadV2 head (https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/heads/pico_head.py)

In [6]:
import torch
import torch.nn as nn

# Backbone PPLCNET (https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/backbones/lcnet.py)

In [7]:
# utils 
# Layer table for the LCNet backbone stages. Each row configures one
# DepthWiseSeparable layer and reads, in order:
#   [kernel_size, in_channels, out_channels, stride, use_se]
# Channel counts are multiplied by the model `scale` (and rounded with
# make_divisible) where the table is consumed.
NET_CONFIG = {
    "blocks2": [
        [3, 16, 32, 1, False],
    ],
    "blocks3": [
        [3, 32, 64, 2, False],
        [3, 64, 64, 1, False],
    ],
    "blocks4": [
        [3, 64, 128, 2, False],
        [3, 128, 128, 1, False],
    ],
    "blocks5": [
        [3, 128, 256, 2, False],
        [5, 256, 256, 1, False],
        [5, 256, 256, 1, False],
        [5, 256, 256, 1, False],
        [5, 256, 256, 1, False],
        [5, 256, 256, 1, False],
    ],
    "blocks6": [
        [5, 256, 512, 2, True],
        [5, 512, 512, 1, True],
    ],
}



def make_divisible(v, divisor=8, min_value=None):
    """Round ``v`` to a multiple of ``divisor`` without shrinking it too much.

    Args:
        v: Value to round (typically a scaled channel count).
        divisor: The result is a multiple of this number (unless
            ``min_value`` wins, see below).
        min_value: Lower bound for the result; defaults to ``divisor``.

    Returns:
        ``v`` rounded to the nearest multiple of ``divisor`` (ties go up),
        clamped from below by ``min_value``. If that result is more than 10%
        smaller than ``v``, one extra ``divisor`` is added.
    """
    lower_bound = divisor if min_value is None else min_value
    nearest_multiple = int(v + divisor / 2) // divisor * divisor
    rounded = nearest_multiple if nearest_multiple > lower_bound else lower_bound
    # Rounding down must not drop more than 10% of the requested value.
    return rounded + divisor if rounded < 0.9 * v else rounded

class ConvBNLayer(nn.Module):
    """Bias-free Conv2d followed by BatchNorm2d and Hardswish.

    Args:
        num_channels (int): Input channels.
        filter_size (int): Square kernel size; padding is
            ``(filter_size - 1) // 2``.
        num_filters (int): Output channels.
        stride (int): Convolution stride.
        num_groups (int): Convolution groups (set to the channel count for a
            depthwise convolution).
    """

    def __init__(self, num_channels, filter_size, num_filters, stride, num_groups=1):
        super().__init__()
        half_kernel = (filter_size - 1) // 2

        # The convolution carries no bias; BatchNorm follows it directly.
        self.conv = nn.Conv2d(
            num_channels,
            num_filters,
            filter_size,
            stride=stride,
            padding=half_kernel,
            groups=num_groups,
            bias=False,
        )
        # Note from the original author: at inference this BN can be fused
        # into the convolution.
        self.bn = nn.BatchNorm2d(num_filters)
        self.hardswish = nn.Hardswish()

    def forward(self, x):
        """Apply conv -> batch norm -> hardswish to ``x``."""
        return self.hardswish(self.bn(self.conv(x)))

class SEModule(nn.Module):
    """Squeeze-and-excitation style channel gate.

    The input is globally average-pooled, passed through two 1x1 convolutions
    (ReLU in between, Hardsigmoid at the end) and the resulting per-channel
    gate is multiplied back onto the input.

    Args:
        channel (int): Number of input (and output) channels.
        reduction (int): The hidden 1x1 conv has ``channel // reduction``
            channels.
    """

    def __init__(self, channel, reduction=4):
        super().__init__()
        hidden = channel // reduction

        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.conv1 = nn.Conv2d(channel, hidden, kernel_size=1, stride=1, padding=0)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(hidden, channel, kernel_size=1, stride=1, padding=0)
        self.hardsigmoid = nn.Hardsigmoid()

    def forward(self, x):
        """Return ``x`` scaled channel-wise by the learned gate."""
        gate = self.avg_pool(x)
        gate = self.relu(self.conv1(gate))
        gate = self.hardsigmoid(self.conv2(gate))
        # The gate has a 1x1 spatial size and broadcasts over the input.
        return gate * x
        
        
class DepthWiseSeparable(nn.Module):
    """Depthwise conv, optional SE gate, then a 1x1 pointwise conv.

    Args:
        num_channels (int): Input channels (also the depthwise conv width).
        num_filters (int): Output channels produced by the pointwise conv.
        stride (int): Stride of the depthwise conv.
        dw_size (int): Kernel size of the depthwise conv.
        use_se (bool): Insert an SEModule between the two convolutions.
    """

    def __init__(self, num_channels, num_filters, stride, dw_size=3, use_se=False):
        super().__init__()
        self.use_se = use_se

        # One group per channel makes this convolution depthwise.
        self.dw_conv = ConvBNLayer(
            num_channels=num_channels,
            filter_size=dw_size,
            num_filters=num_channels,
            stride=stride,
            num_groups=num_channels,
        )
        # The `se` attribute only exists when the gate is requested.
        if use_se:
            self.se = SEModule(num_channels)
        self.pw_conv = ConvBNLayer(
            num_channels=num_channels,
            filter_size=1,
            num_filters=num_filters,
            stride=1,
        )

    def forward(self, x):
        """Run depthwise conv -> (SE) -> pointwise conv on ``x``."""
        features = self.dw_conv(x)
        if self.use_se:
            features = self.se(features)
        return self.pw_conv(features)
            
        
        

# Smoke test: stem convolution followed by the "blocks2" stage at scale 1.
test_scale = 1
conv_bn = ConvBNLayer(
    num_channels=3,
    filter_size=3,
    num_filters=make_divisible(16 * test_scale),
    stride=2,
)
# Built to check that construction works; not used in the forward pass below.
se_module = SEModule(make_divisible(16 * test_scale))
blocks2 = nn.Sequential(
    *[
        DepthWiseSeparable(
            num_channels=make_divisible(in_c * test_scale),
            num_filters=make_divisible(out_c * test_scale),
            dw_size=k,
            stride=s,
            use_se=se,
        )
        for k, in_c, out_c, s, se in NET_CONFIG["blocks2"]
    ]
)

test_input = torch.randn(1, 3, 320, 320)
test_out = blocks2(conv_bn(test_input))
print(test_out.size())

torch.Size([1, 32, 160, 160])


In [8]:
class LCNet(nn.Module):
    """LCNet backbone assembled from the stages listed in ``NET_CONFIG``.

    The network is a stride-2 stem convolution followed by the stages
    ``blocks2`` .. ``blocks6``. The outputs of ``blocks3`` .. ``blocks6`` are
    candidate feature maps with ids 2, 3, 4 and 5 respectively;
    ``feature_maps`` selects which of them ``forward`` returns.

    Args:
        scale (float): Channel width multiplier. Every channel count from
            ``NET_CONFIG`` is multiplied by it and rounded with
            ``make_divisible``.
        feature_maps (Sequence[int]): Ids (2..5) of the stage outputs to
            return. A private copy is stored, so the caller's sequence is
            never shared between instances.
    """

    # Stages whose outputs can be returned, in feature-map id order (2..5).
    _OUTPUT_STAGES = ("blocks3", "blocks4", "blocks5", "blocks6")

    def __init__(self, scale=1.0, feature_maps=(3, 4, 5)):
        super().__init__()
        self.scale = scale
        # Copy instead of aliasing: the original stored the (mutable) default
        # list itself, so every instance shared one list object.
        self.feature_maps = list(feature_maps)

        self.conv1 = ConvBNLayer(num_channels=3,
                                 filter_size=3,
                                 num_filters=make_divisible(16 * scale),
                                 stride=2)
        # Stages are created in network order so parameter initialisation
        # consumes the RNG in the same sequence as before.
        self.blocks2 = self._make_stage("blocks2", scale)
        self.blocks3 = self._make_stage("blocks3", scale)
        self.blocks4 = self._make_stage("blocks4", scale)
        self.blocks5 = self._make_stage("blocks5", scale)
        self.blocks6 = self._make_stage("blocks6", scale)

        # Output width of a stage = out_c (column 2) of its last layer.
        stage_widths = [make_divisible(NET_CONFIG[name][-1][2] * scale)
                        for name in self._OUTPUT_STAGES]
        self._out_channels = [
            ch for idx, ch in enumerate(stage_widths)
            if idx + 2 in self.feature_maps]

    @staticmethod
    def _make_stage(name, scale):
        """Build the ``nn.Sequential`` for one ``NET_CONFIG`` stage."""
        return nn.Sequential(*[
            DepthWiseSeparable(num_channels=make_divisible(in_c * scale),
                               num_filters=make_divisible(out_c * scale),
                               dw_size=k,
                               stride=s,
                               use_se=se)
            for k, in_c, out_c, s, se in NET_CONFIG[name]])

    def forward(self, inputs):
        """Run the backbone.

        Args:
            inputs (dict): Must contain the input image batch under the key
                ``'image'``.

        Returns:
            list[Tensor]: Outputs of the selected stages, ordered from the
            shallowest to the deepest stage.
        """
        x = inputs['image']
        x = self.blocks2(self.conv1(x))

        outs = []
        for stage in (self.blocks3, self.blocks4, self.blocks5, self.blocks6):
            x = stage(x)
            outs.append(x)
        # Position i in `outs` corresponds to feature-map id i + 2.
        return [o for i, o in enumerate(outs) if i + 2 in self.feature_maps]

# Run the default backbone on a random 320x320 image and print the size of
# every returned feature map.
backbone = LCNet()
inp_t = torch.randn(1, 3, 320, 320)
output = backbone(dict(image=inp_t))
for t in output:
    print(t.shape)

torch.Size([1, 128, 40, 40])
torch.Size([1, 256, 20, 20])
torch.Size([1, 512, 10, 10])


# Detector Neck LCPAN (https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/necks/lc_pan.py)

In [43]:
class ConvBNLayerPAN(nn.Module):
    """Bias-free Conv2d + BatchNorm2d with an optional activation.

    PaddleDetection has two modules named ConvBNLayer; this one is called
    ConvBNLayerPAN to keep it apart from the backbone's ConvBNLayer.

    Args:
        in_channel (int): Input channels.
        out_channel (int): Output channels.
        kernel_size (int): Square kernel size; padding is
            ``(kernel_size - 1) // 2``.
        stride (int): Convolution stride.
        groups (int): Convolution groups.
        act (str | None): ``"hard_swish"`` selects Hardswish, any other
            non-empty value selects LeakyReLU, and a falsy value (``None`` or
            ``""``) disables the activation.
    """
    def __init__(self,
             in_channel=96,
             out_channel=96,
             kernel_size=3,
             stride=1,
             groups=1,
             act='leaky_relu'):
        super(ConvBNLayerPAN, self).__init__()
        self.conv = nn.Conv2d(
            in_channels=in_channel,
            out_channels=out_channel,
            kernel_size=kernel_size,
            groups=groups,
            padding=(kernel_size - 1) // 2,
            stride=stride,
            bias=False)
        self.bn = nn.BatchNorm2d(out_channel)

        self.has_act = bool(act)
        if not self.has_act:
            # No activation requested: keep the attribute but make it a no-op.
            self.act = nn.Identity()
        elif act == "hard_swish":
            self.act = nn.Hardswish()
        else:
            self.act = nn.LeakyReLU()

    def forward(self, x):
        """Apply conv -> batch norm -> (activation) to ``x``."""
        x = self.bn(self.conv(x))
        # Gate on the flag: an nn.Module is always truthy, so testing
        # `self.act` itself would apply the activation even for act=None.
        if self.has_act:
            x = self.act(x)
        return x
    
class Channel_T(nn.Module):
    """Project every input feature map to a common channel count.

    One 1x1 ConvBNLayerPAN is created per entry of ``in_channels``.

    Args:
        in_channels (list[int]): Channel count of each input feature map.
        out_channels (int): Channel count every map is projected to.
        act (str): Activation name forwarded to ConvBNLayerPAN.
    """

    def __init__(self,
                 in_channels=[116, 232, 464],
                 out_channels=96,
                 act="leaky_relu"):
        super(Channel_T, self).__init__()
        projections = [
            ConvBNLayerPAN(width, out_channels, 1, act=act)
            for width in in_channels
        ]
        self.convs = nn.ModuleList(projections)

    def forward(self, x):
        """Apply the i-th projection to the i-th feature map in ``x``."""
        projected = []
        for level in range(len(x)):
            projected.append(self.convs[level](x[level]))
        return projected
            
class DPModule(nn.Module):
    """
    Depth-wise and point-wise module.

    Computes ``dwconv -> bn1 -> act -> pwconv -> bn2 -> (act)``.

     Args:
        in_channel (int): The input channels of this Module.
        out_channel (int): The output channels of this Module.
        kernel_size (int): The conv2d kernel size of this Module.
        stride (int): The conv2d's stride of this Module.
        act (str): The activation function of this Module,
                   Now support `leaky_relu` and `hard_swish`. A falsy value
                   (``None`` or ``""``) disables both activations.
        use_act_in_out (bool): Whether the activation is also applied after
                   the point-wise branch.
    """
    def __init__(self,
             in_channel=96,
             out_channel=96,
             kernel_size=3,
             stride=1,
             act='leaky_relu',
             use_act_in_out=True):
        super(DPModule, self).__init__()
        self.use_act = bool(act)
        self.use_act_in_out = use_act_in_out
        # groups=out_channel: depthwise when in_channel == out_channel.
        # NOTE(review): other combinations only work when both channel counts
        # are divisible by out_channel - confirm this is intended.
        self.dwconv = nn.Conv2d(
            in_channels=in_channel,
            out_channels=out_channel,
            kernel_size=kernel_size,
            groups=out_channel,
            padding=(kernel_size - 1) // 2,
            stride=stride,
            bias=False)
        self.bn1 = nn.BatchNorm2d(out_channel)
        self.pwconv = nn.Conv2d(
            in_channels=out_channel,
            out_channels=out_channel,
            kernel_size=1,
            groups=1,
            padding=0,
            bias=False)
        self.bn2 = nn.BatchNorm2d(out_channel)
        if not self.use_act:
            # Without an activation the output stage must be a no-op as well;
            # previously it fell back to LeakyReLU even for act=None.
            self.act_func = nn.Identity()
        elif act == "hard_swish":
            self.act_func = nn.Hardswish()
        else:
            self.act_func = nn.LeakyReLU()

    def forward(self, x):
        """Apply the depth-wise then the point-wise branch to ``x``."""
        x = self.bn1(self.dwconv(x))
        if self.use_act:
            x = self.act_func(x)
        x = self.bn2(self.pwconv(x))
        if self.use_act_in_out:
            x = self.act_func(x)
        return x
            
# Channel_T smoke test: three inputs with the default channel counts must all
# come out with 96 channels and unchanged spatial size.
channel_t = Channel_T(act='hard_swish')
inp = [torch.randn(1, c, 255, 255) for c in [116, 232, 464]]
out = channel_t(inp)
for t in out:
    print(t.shape)

# DPModule smoke test: the default configuration keeps the input shape.
dp_module = DPModule()
inp = torch.randn(1, 96, 25, 25)
out = dp_module(inp)
print(out.shape)

torch.Size([1, 96, 255, 255])
torch.Size([1, 96, 255, 255])
torch.Size([1, 96, 255, 255])
torch.Size([1, 96, 25, 25])


In [59]:
class LCPAN(nn.Module):
    """Path Aggregation Network with LCNet module.

    Inputs are first projected to ``out_channels`` by ``Channel_T``, then
    fused in a top-down pass (upsample + concat) and a bottom-up pass
    (strided conv + concat). Every fusion step is a pair of
    ``DepthWiseSeparable`` layers.

    Args:
        in_channels (List[int]): Number of input channels per scale.
        out_channels (int): Number of output channels (used at each scale)
        kernel_size (int): The conv2d kernel size of this Module.
        num_features (int): Number of output features of the LCPAN module.
            With 4, one extra stride-2 level is appended to the outputs.
        use_depthwise (bool): Whether to use DPModule (depthwise separable
            convolution) for the downsampling convs; otherwise
            ConvBNLayerPAN is used. Default: True
        act (str): Activation name passed to the projection and
            downsampling convs.
        spatial_scales (Sequence[float]): Scale of each pyramid level relative
            to the input image. Its length sets the number of levels. It is
            copied, so neither the default nor a caller's list is modified.
    """
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=5,
                 num_features=3,
                 use_depthwise=True,
                 act='hard_swish',
                 spatial_scales=(0.125, 0.0625, 0.03125)):
        super(LCPAN, self).__init__()
        self.conv_t = Channel_T(in_channels, out_channels, act=act)
        # Private copy: the num_features == 4 branch appends to this list and
        # must not grow a shared default or the caller's object.
        spatial_scales = list(spatial_scales)
        # After Channel_T every level carries `out_channels` channels.
        in_channels = [out_channels] * len(spatial_scales)
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.spatial_scales = spatial_scales
        self.num_features = num_features
        # The fallback must be the PAN conv block: the backbone ConvBNLayer
        # has a different signature and accepts neither `kernel_size` nor
        # `act` keywords.
        conv_func = DPModule if use_depthwise else ConvBNLayerPAN

        # Layers of one fusion block: k, in_c, out_c, stride, use_se.
        # The input is a concatenation of two `out_channels` maps.
        fusion_cfg = [
            [kernel_size, out_channels * 2, out_channels * 2, 1, False],
            [kernel_size, out_channels * 2, out_channels, 1, False],
        ]

        if self.num_features == 4:
            self.first_top_conv = conv_func(
                in_channels[0], in_channels[0], kernel_size, stride=2, act=act)
            self.second_top_conv = conv_func(
                in_channels[0], in_channels[0], kernel_size, stride=2, act=act)
            self.spatial_scales.append(self.spatial_scales[-1] / 2)

        self.upsample = nn.Upsample(scale_factor=2, mode='nearest')
        self.top_down_blocks = nn.ModuleList()
        for _ in range(len(in_channels) - 1, 0, -1):
            self.top_down_blocks.append(self._make_fusion_block(fusion_cfg))

        self.downsamples = nn.ModuleList()
        self.bottom_up_blocks = nn.ModuleList()

        for idx in range(len(in_channels) - 1):
            self.downsamples.append(conv_func(in_channels[idx],
                                              in_channels[idx],
                                              kernel_size=kernel_size,
                                              stride=2,
                                              act=act))
            self.bottom_up_blocks.append(self._make_fusion_block(fusion_cfg))

    @staticmethod
    def _make_fusion_block(layer_cfg):
        """Build one fusion block as a Sequential of DepthWiseSeparable layers."""
        return nn.Sequential(*[
            DepthWiseSeparable(num_channels=in_c,
                               num_filters=out_c,
                               dw_size=k,
                               stride=s,
                               use_se=se)
            for k, in_c, out_c, s, se in layer_cfg
        ])

    def forward(self, inputs):
        """
        Args:
            inputs (tuple[Tensor]): input features, one per pyramid level,
                ordered from the highest to the lowest resolution.
        Returns:
            tuple[Tensor]: LCPAN features, in the same order (plus one extra
                coarser level when ``num_features == 4``).
        """

        assert len(inputs) == len(self.in_channels)
        inputs = self.conv_t(inputs)

        # top-down path: start at the coarsest level and merge it into each
        # finer level in turn.
        inner_outs = [inputs[-1]]
        for idx in range(len(self.in_channels) - 1, 0, -1):
            feat_high = inner_outs[0]
            feat_low = inputs[idx - 1]

            upsample_feat = self.upsample(feat_high)

            inner_out = self.top_down_blocks[len(self.in_channels) - 1 - idx](
                torch.cat((upsample_feat, feat_low), dim=1))
            inner_outs.insert(0, inner_out)

        # bottom-up path: start at the finest level and merge it into each
        # coarser level in turn.
        outs = [inner_outs[0]]
        for idx in range(len(self.in_channels) - 1):
            feat_low = outs[-1]
            feat_high = inner_outs[idx + 1]
            downsample_feat = self.downsamples[idx](feat_low)
            out = self.bottom_up_blocks[idx](
                torch.cat((downsample_feat, feat_high), dim=1))

            outs.append(out)

        if self.num_features == 4:
            # Extra level: sum of stride-2 convs over the coarsest projected
            # input and the coarsest fused output.
            top_features = self.first_top_conv(inputs[-1])
            top_features = top_features + self.second_top_conv(outs[-1])
            outs.append(top_features)
        return tuple(outs)
# Construct a neck for three feature maps with 128/256/512 channels to check
# that the constructor runs.
lcpan = LCPAN(
    in_channels=[128, 256, 512],
    out_channels=96,
)

# Test backbone plus LCPAN

In [62]:
# Model configuration.
scale = 0.75
feature_maps = [3, 4, 5]
out_channels = 96

backbone = LCNet(scale=scale, feature_maps=feature_maps)
# Probe the backbone once with a random image to read the channel count of
# each returned feature map.
outputs = backbone({'image': torch.randn(1, 3, 320, 320)})
in_channels = [c.shape[1] for c in outputs]

# Feed the backbone features through the neck and print the output sizes.
neck = LCPAN(in_channels=in_channels, out_channels=out_channels)
outputs = neck(outputs)
for out in outputs:
    print(out.shape)

torch.Size([1, 96, 40, 40])
torch.Size([1, 96, 20, 20])
torch.Size([1, 96, 10, 10])


# PicoHeadV2: https://github.com/PaddlePaddle/PaddleDetection/blob/release/2.4/ppdet/modeling/heads/pico_head.py