# picodet from scratch

In [1]:
import sys
from pathlib import Path
from typing import Sequence, Optional, Union
sys.path.insert(0, "..")

import numpy as np
import torch
import torch.nn as nn
from pydantic.dataclasses import dataclass
from pydantic import Field
from mmcv.cnn import ConvModule
from mmdet.models.utils import make_divisible
from mmcv.runner import BaseModule

from src.esnet import ESNet
from src.csppan import ChannelEqualiser, DarknetBottleneck, CSPLayer, CSPPAN

## Backbone: ESNet

In [2]:
# Build the backbone with its default arguments and print `out_ixs`
# (presumably the indices of the layers whose outputs are returned -- confirm
# against src/esnet.py).
esnet = ESNet()
print(esnet.out_ixs)

[2, 9, 12]




In [3]:
# Push one random 3-channel 320x320 float32 batch through the backbone and
# list the shape of every feature map it returns.
# (The former bare `esnet.stage_out_channels` expression was dropped: it was
# not the last statement of the cell, so its value was silently discarded.)
test_input = torch.from_numpy(
    np.random.rand(1, 3, 320, 320).astype(np.float32)
)
test_outputs = esnet(test_input)
print([feature_map.shape for feature_map in test_outputs])

[torch.Size([1, 96, 40, 40]), torch.Size([1, 192, 20, 20]), torch.Size([1, 384, 10, 10])]


TODO: Factor inverted residual blocks into this codebase

## Neck: CSPPAN

In [4]:
# Run the three backbone feature maps (96 / 192 / 384 channels) through the
# channel equaliser; every output should come back with 128 channels.
c = ChannelEqualiser([96, 192, 384], 128)
channel_eq_outputs = c(test_outputs)
# The loop variable gets its own name so it does not reuse the module name `c`.
print([equalised.shape for equalised in channel_eq_outputs])

[torch.Size([1, 128, 40, 40]), torch.Size([1, 128, 20, 20]), torch.Size([1, 128, 10, 10])]


In [5]:
# Smoke-test a single DarknetBottleneck on a fresh random 32-channel 100x100
# input. Note that this rebinds `test_input`, replacing the 320x320 batch.
test_input = torch.from_numpy(
    np.random.rand(1, 32, 100, 100).astype(np.float32)
)
dbb = DarknetBottleneck(in_channels=32, out_channels=32)
dbb_output = dbb(test_input)
print(dbb_output.shape)

torch.Size([1, 32, 100, 100])


In [6]:
# Smoke-test a CSPLayer built with (32, 32) on the current `test_input`
# (the 32-channel 100x100 tensor when the cells are run in order) and print
# the output shape.
cspl = CSPLayer(32, 32)
cspl_output = cspl(test_input)
print(cspl_output.shape)

torch.Size([1, 32, 100, 100])


In [7]:
# Build the neck on top of the three backbone feature maps and print the shape
# of each level it returns.
csppan = CSPPAN(
    in_channels=[96, 192, 384],
    out_channels=96,
    num_csp_blocks=1,
    squeeze_ratio=1,
    norm_cfg={"type": "BN", "requires_grad": True},
    act_cfg={"type": "HSwish"},
)
csppan_outputs = csppan(test_outputs)
print([level.shape for level in csppan_outputs])

[torch.Size([1, 96, 40, 40]), torch.Size([1, 96, 20, 20]), torch.Size([1, 96, 10, 10]), torch.Size([1, 96, 5, 5])]


## Head: PicoDetHead

In [8]:
from mmdet.models.dense_heads import AnchorFreeHead

In [9]:
# Names of every callable attribute exposed by AnchorFreeHead, for
# reference while writing the custom head.
AFH_callable = [
    name
    for name in dir(AnchorFreeHead)
    if callable(getattr(AnchorFreeHead, name))
]

In [82]:
from mmcv.cnn import DepthwiseSeparableConvModule, ConvModule

@dataclass(eq=False)
class PicoDetHead(BaseModule):
    """
    Head of PP-PicoDet (v1)

    GFL: generalised focal loss
    VFL: varifocal loss (classification loss)
    DFL: distribution focal loss (localisation loss)

    The configuration is declared as pydantic dataclass fields and the layers
    are built from it in ``__post_init_post_parse__``. For every entry in
    ``strides`` the head owns one tower of ``stacked_convs`` conv blocks and a
    1x1 output conv. With ``share_cls_reg`` enabled, a single tower and output
    conv per level produce the class and box-distribution channels together,
    and the separate regression lists stay empty.

    Only layer construction lives here; no ``forward`` is defined yet.

    ``eq=False`` keeps the identity-based ``__eq__`` / ``__hash__``. A
    dataclass-generated ``__eq__`` would set ``__hash__`` to ``None``, and an
    unhashable module breaks ``parameters()`` / ``named_modules()``, which
    track visited modules in a set.
    """
    num_classes: int = Field(
        description="Number of categories excluding the background category"
    )
    in_channels: int = Field(
        description="Number of channels in the input feature map"
    )
    feat_channels: int = Field(
        default=96,
        description="Number of hidden channels in stacking convs"
    )
    stacked_convs: int = Field(
        default=2,
        description="Number of stacked convolutions in the head"
    )
    # Variable-length tuple: one stride per feature level. (`tuple[int]` would
    # be validated as a tuple of exactly one int.)
    strides: tuple[int, ...] = Field(
        default=(8, 16, 32, 64),
        description="Downsample factor of each feature map"
    )
    use_depthwise: bool = Field(
        default=True,
        description="Enable depthwise-separable convolutions"
    )
    kernel_size: int = Field(default=5, description="Kernel size of conv layers")
    share_cls_reg: bool = Field(
        default=True,
        description="Flag to share weights between regression and classification branches"
    )
    sigmoid_classifier: bool = Field(
        default=True,
        description="Whether sigmoid loss will be used. This will reduce output channels by 1."
    )
    # Each of the 4 box regression targets is predicted as a distribution over
    # the reg_max + 1 values {0, ..., reg_max}, hence 4 * (reg_max + 1)
    # regression channels.
    reg_max: int = Field(
        default=7,
        description="Max value of integral set {0, ..., reg_max} in DFL setting."
    )
    conv_cfg: Optional[dict] = Field(
        default=None,
        description="Config dict for 2D convolution layer."
    )
    norm_cfg: dict = Field(
        default_factory=lambda: dict(type='BN', requires_grad=True),
        description="Config dict for normalisation layer."
    )
    act_cfg: dict = Field(
        default_factory=lambda: dict(type='HSwish'),
        description="Config dict for activation layer."
    )
    init_cfg: Optional[dict] = Field(
        default=None, description="Weight initialisation config dict"
    )

    def __post_init__(self):
        """No-op hook; the layers are built in ``__post_init_post_parse__``."""

    @property
    def ConvModule(self):
        """Conv block class used for the towers, chosen by ``use_depthwise``."""
        if self.use_depthwise:
            return DepthwiseSeparableConvModule
        return ConvModule

    @property
    def output_channels_classification(self) -> int:
        """Classification channels: ``num_classes``, plus one when the
        classifier is not sigmoid-based."""
        return self.num_classes + int(not self.sigmoid_classifier)

    def __post_init_post_parse__(self):
        """Initialise the module machinery and build all layers.

        Runs after pydantic has validated the fields, so the configuration
        attributes can be read safely here.
        """
        super().__init__(self.init_cfg)
        num_levels = len(self.strides)
        # Separate regression layers exist only when weights are not shared.
        num_reg_levels = 0 if self.share_cls_reg else num_levels

        # conv towers interpreting the neck outputs - classification
        self.cls_convs = nn.ModuleList(
            [self._build_convs() for _ in range(num_levels)]
        )
        # regression towers (empty when sharing the classification towers)
        self.reg_convs = nn.ModuleList(
            [self._build_convs() for _ in range(num_reg_levels)]
        )

        # 4 box targets, each a distribution over reg_max + 1 values
        self.output_channels_regression = 4 * (self.reg_max + 1)
        # When weights are shared, the classification output conv predicts the
        # class channels and the regression channels together (to be split
        # downstream), so its width is C + 4 * (reg_max + 1).
        self.gfl_classification_conv_out_channels = self.output_channels_classification
        if self.share_cls_reg:
            self.gfl_classification_conv_out_channels += self.output_channels_regression

        # 1x1 output convs - classification (plus regression when shared)
        self.gfl_classification_convs = nn.ModuleList([
            nn.Conv2d(
                in_channels=self.feat_channels,
                out_channels=self.gfl_classification_conv_out_channels,
                kernel_size=1,
                padding=0
            )
            for _ in range(num_levels)
        ])
        # 1x1 output convs - regression (empty when shared)
        self.gfl_regression_convs = nn.ModuleList([
            nn.Conv2d(
                in_channels=self.feat_channels,
                out_channels=self.output_channels_regression,
                kernel_size=1,
                padding=0
            )
            for _ in range(num_reg_levels)
        ])

    def _build_convs(self):
        """Create one tower: a ModuleList of ``stacked_convs`` conv blocks.

        The first block maps ``in_channels`` to ``feat_channels``; the
        remaining blocks keep ``feat_channels``. Stride is 1 and padding is
        ``(kernel_size - 1) // 2``, which preserves the spatial size for odd
        kernel sizes.
        """
        conv_cls = self.ConvModule
        return nn.ModuleList([
            conv_cls(
                in_channels=self.in_channels if i == 0 else self.feat_channels,
                out_channels=self.feat_channels,
                kernel_size=self.kernel_size,
                stride=1,
                padding=(self.kernel_size - 1) // 2,
                conv_cfg=self.conv_cfg,
                act_cfg=self.act_cfg,
                norm_cfg=self.norm_cfg,
                # a conv bias is only added when no norm layer follows
                bias=self.norm_cfg is None
            )
            for i in range(self.stacked_convs)
        ])
            

In [83]:
# Instantiate the head for 80 classes on 96-channel neck outputs; every other
# field keeps its default.
h = PicoDetHead(num_classes=80, in_channels=96)

In [84]:
# Display the head; the output below is its field-by-field repr.
h

PicoDetHead(num_classes=80, in_channels=96, feat_channels=96, stacked_convs=2, strides=(8, 16, 32, 64), use_depthwise=True, kernel_size=5, share_cls_reg=True, sigmoid_classifier=True, reg_max=7, conv_cfg=None, norm_cfg={'type': 'BN', 'requires_grad': True}, act_cfg={'type': 'HSwish'}, init_cfg=None)