In [2]:
from torch import nn
import torch.nn.functional as F

In [3]:
# ConV Mdule with changeable eps,momentum,kernel size
def autopad(k, p=None, d=1):  # pad to same output shape
    if d > 1:
        k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k]  # actual kernel-size
    if p is None:
        p = k // 2 if isinstance(k, int) else [x // 2 for x in k]  # auto-pad
    return p
class ConV(nn.Module):
    default_act = nn.SiLU()
    def __init__(self, in_channels, out_channels, kernel_size=(3,3), stride=1, padding=None, dilation=1, groups=1, eps=0.001, momentum=0.03, act=True):
        super(ConV, self).__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=stride, padding=autopad(kernel_size, padding, dilation), bias=False)
        self.bn = nn.BatchNorm2d(out_channels, eps=eps,momentum=momentum, affine=True, track_running_stats=True)
        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()

    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        return self.act(x)
    def forward_fuse(self, x):
        x = self.conv(x)
        return self.act(x)

In [4]:
# Bottleneck module with optional shortcut connection
class Bottleneck(nn.Module):

    def __init__(self, in_channels, out_channels, shortcut=True, groups=1, kernel_size=(3,3), expansion=0.5):
        super().__init__()
        c_ = int(out_channels * expansion)  # hidden channels
        self.conv1 = ConV(in_channels, c_, kernel_size=kernel_size[0], groups=groups, eps=0.001, momentum=0.03)
        self.conv2 = ConV(c_, out_channels, kernel_size=kernel_size[1], groups=groups, eps=0.001, momentum=0.03)
        self.add = shortcut and in_channels == out_channels

    def forward(self, x):
        if self.add:
            return x + self.conv2(self.conv1(x))
        else:
            return self.conv2(self.conv1(x))


In [5]:
# C2f module with multiple bottleneck blocks
import torch
class C2f(nn.Module):

    def __init__(self, in_channels, out_channels, num_bn_blocks=1, shortcut=False, groups=1, expansion=0.5):
        super().__init__()
        self.c = min(int(out_channels * expansion), int(in_channels*expansion))  # hidden channels
        self.conv1 = ConV(in_channels, 2 * self.c, 1,1)
        self.conv2 = ConV((2 + num_bn_blocks) * self.c, out_channels, 1)  # optional act=FReLU(c2)
        self.m = nn.ModuleList(Bottleneck(self.c, self.c, shortcut=shortcut, groups=groups, kernel_size=((3,3),(3,3)), expansion=1.0) for _ in range(num_bn_blocks))

    def forward(self, x):
        """Forward pass through C2f layer."""
        y = list(self.conv1(x).chunk(2, 1))
        y.extend(m(y[-1]) for m in self.m)
        return self.conv2(torch.cat(y, 1))

    def forward_split(self, x):
        """Forward pass using split() instead of chunk()."""
        y = self.conv1(x).split((self.c, self.c), 1)
        y = [y[0], y[1]]
        y.extend(m(y[-1]) for m in self.m)
        return self.conv2(torch.cat(y, 1))

In [6]:
class C3(nn.Module):

    def __init__(self, in_channels, out_channels, num_bn_blocks=1, shortcut=True, groups=1, expansion=0.5):
        super().__init__()
        c_ = int(out_channels * expansion)  # hidden channels
        self.cv1 = ConV(in_channels, c_, kernel_size=(1,1), stride=1)
        self.cv2 = ConV(in_channels, c_, kernel_size=(1,1), stride=1)
        self.cv3 = ConV(2 * c_, out_channels, kernel_size=(1,1))  # optional act=FReLU(c2)
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut=shortcut, groups=groups,expansion=1.0) for _ in range(num_bn_blocks)))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass through the CSP bottleneck with 3 convolutions."""
        return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1))


In [7]:

class C3k(C3):
    """C3k is a CSP bottleneck module with customizable kernel sizes for feature extraction in neural networks."""

    def __init__(self, in_channels, out_channels, num_bn_blocks, shortcut=True, groups=1, expansion=0.5, kernel_size=3):
        super().__init__(in_channels, out_channels, num_bn_blocks, shortcut, groups, expansion)
        c_ = int(out_channels * expansion)  # hidden channels
        self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut=shortcut, groups=groups, kernel_size=(kernel_size,kernel_size), expansion=1.0) for _ in range(num_bn_blocks)))


In [8]:
class C3k2(C2f):
    """Faster Implementation of CSP Bottleneck with 2 convolutions."""

    def __init__(self, in_channels, out_channels, num_blocks=1, c3k=False, expansion=0.5, groups=1, shortcut=True):
        super().__init__(in_channels, out_channels, num_blocks, shortcut=shortcut, groups=1, expansion=expansion)
        self.m = nn.ModuleList(C3k(self.c, self.c, 2, shortcut, groups) if c3k else Bottleneck(self.c, self.c, shortcut=shortcut, groups=groups) for _ in range(num_blocks))


In [9]:
class SPPF(nn.Module):
    """Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher."""

    def __init__(self, in_channels, out_channels, kernel_size=5):
        super().__init__()
        c_ = in_channels // 2  # hidden channels
        self.conv1 = ConV(in_channels, c_, kernel_size=1, stride=1)
        self.conv2 = ConV(c_ * 4, out_channels, 1, 1)
        self.m = nn.MaxPool2d(kernel_size=kernel_size, stride=1, padding=kernel_size // 2)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply sequential pooling operations to input and return concatenated feature maps."""
        y = [self.conv1(x)]
        y.extend(self.m(y[-1]) for _ in range(3))
        return self.conv2(torch.cat(y, 1))


In [10]:
class Attention(nn.Module):
    def __init__(self, dim, num_heads=8, attn_ratio=0.5, act=False,kernel_size=1,stride=1):
        super().__init__()
        self.num_heads = num_heads
        self.head_dim = dim // num_heads
        self.key_dim = int(self.head_dim * attn_ratio)
        self.scale = self.key_dim**-0.5
        nh_kd = self.key_dim * num_heads
        h = dim + nh_kd * 2
        self.qkv = ConV(dim, h, kernel_size=kernel_size,stride=stride, act=act)
        self.proj = ConV(dim, dim, kernel_size=kernel_size,stride=stride, act=act)
        self.pe = ConV(dim, dim, kernel_size=3, stride=1, groups=dim, act=act)

    def forward(self, x):
        B, C, H, W = x.shape
        N = H * W
        qkv = self.qkv(x)
        q, k, v = qkv.view(B, self.num_heads, self.key_dim * 2 + self.head_dim, N).split([self.key_dim, self.key_dim, self.head_dim], dim=2)
        attn = (q.transpose(-2, -1) @ k) * self.scale
        attn = attn.softmax(dim=-1)
        x = (v @ attn.transpose(-2, -1)).view(B, C, H, W) + self.pe(v.reshape(B, C, H, W))
        x = self.proj(x)
        return x



In [11]:
class PSABlock(nn.Module):
    def __init__(self, channels, attn_ratio=0.5, num_heads=4, shortcut=True,kernel_size=1,stride=1,act=False) -> None:
        super().__init__()

        self.attn = Attention(channels, attn_ratio=attn_ratio, num_heads=num_heads,kernel_size=kernel_size,stride=stride,act=act)
        self.ffn = nn.Sequential(ConV(channels, channels * 2, kernel_size=kernel_size,stride=stride), ConV(channels * 2, channels, kernel_size=kernel_size, stride=stride, act=act))
        self.add = shortcut

    def forward(self, x):
        x = x + self.attn(x) if self.add else self.attn(x)
        x = x + self.ffn(x) if self.add else self.ffn(x)
        return x


In [12]:
class C2PSA(nn.Module):
    def __init__(self, in_channels, out_channels, num_psa_blocks=1, expansion=0.5,kernel_size=1,stride=1,act=False):
        super().__init__()
        assert in_channels == out_channels
        self.c = int(in_channels * expansion)
        self.conv1 = ConV(in_channels, 2 * self.c, kernel_size=kernel_size, stride=stride)
        self.conv2 = ConV(2 * self.c, in_channels, kernel_size=kernel_size,stride=stride)

        self.m = nn.Sequential(*(PSABlock(self.c, attn_ratio=0.5, num_heads=self.c // 64,act=act) for _ in range(num_psa_blocks)))

    def forward(self, x):
        a, b = self.conv1(x).split((self.c, self.c), dim=1)
        b = self.m(b)
        return self.conv2(torch.cat((a, b), 1))


In [13]:
import math
class DWConV(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=1, stride=1, dilation=1, act=True):
        super().__init__()
        self.conv = ConV(in_channels, out_channels,kernel_size=kernel_size,stride=stride,dilation=dilation,groups=math.gcd(in_channels, out_channels),act=act)
    def forward(self, x):
        return self.conv(x)


In [175]:
from ultralytics import YOLO

# Load a pretrained YOLO11n model
model = YOLO("yolo11n.pt")
print(model)


YOLO(
  (model): DetectionModel(
    (model): Sequential(
      (0): Conv(
        (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (1): Conv(
        (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
        (act): SiLU(inplace=True)
      )
      (2): C3k2(
        (cv1): Conv(
          (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
          (act): SiLU(inplace=True)
        )
        (cv2): Conv(
          (conv): Conv2d(48, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_

In [15]:
class DFL(nn.Module):

    def __init__(self, in_channels=16):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, 1, 1, bias=False).requires_grad_(False)
        x = torch.arange(in_channels, dtype=torch.float)
        self.conv.weight.data[:] = nn.Parameter(x.view(1, in_channels, 1, 1))
        self.in_channels = in_channels

    def forward(self, x):
        b, _, a = x.shape  # batch, channels, anchors
        return self.conv(x.view(b, 4, self.in_channels, a).transpose(2, 1).softmax(1)).view(b, 4, a)

In [16]:
TORCH_1_10 = torch.__version__=="1.10.0"
def make_anchors(feats, strides, grid_cell_offset=0.5):
    """Generate anchors from features."""
    anchor_points, stride_tensor = [], []
    assert feats is not None
    dtype, device = feats[0].dtype, feats[0].device
    for i, stride in enumerate(strides):
        h, w = feats[i].shape[2:] if isinstance(feats, list) else (int(feats[i][0]), int(feats[i][1]))
        sx = torch.arange(end=w, device=device, dtype=dtype) + grid_cell_offset  # shift x
        sy = torch.arange(end=h, device=device, dtype=dtype) + grid_cell_offset  # shift y
        sy, sx = torch.meshgrid(sy, sx, indexing="ij") if TORCH_1_10 else torch.meshgrid(sy, sx)
        anchor_points.append(torch.stack((sx, sy), -1).view(-1, 2))
        stride_tensor.append(torch.full((h * w, 1), stride, dtype=dtype, device=device))
    return torch.cat(anchor_points), torch.cat(stride_tensor)


def dist2bbox(distance, anchor_points, xywh=True, dim=-1):
    """Transform distance(ltrb) to box(xywh or xyxy)."""
    lt, rb = distance.chunk(2, dim)
    x1y1 = anchor_points - lt
    x2y2 = anchor_points + rb
    if xywh:
        c_xy = (x1y1 + x2y2) / 2
        wh = x2y2 - x1y1
        return torch.cat((c_xy, wh), dim)  # xywh bbox
    return torch.cat((x1y1, x2y2), dim)  # xyxy bbox



In [17]:
import numpy as np
class RepConv(nn.Module):
    default_act = nn.SiLU()  # default activation

    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1, padding=1, groups=1, dilation=1, act=True, bn=False, deploy=False):
        super().__init__()
        assert kernel_size == 3 and padding == 1
        self.groups = groups
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity()

        self.bn = nn.BatchNorm2d(num_features=in_channels) if bn and out_channels == in_channels and stride == 1 else None
        self.conv1 = ConV(in_channels, out_channels, kernel_size, stride, padding=padding, groups=groups, act=False)
        self.conv2 = ConV(in_channels, out_channels, 1, stride, padding=(padding - kernel_size // 2), groups=groups, act=False)

    def forward_fuse(self, x):
        return self.act(self.conv(x))

    def forward(self, x):
        id_out = 0 if self.bn is None else self.bn(x)
        return self.act(self.conv1(x) + self.conv2(x) + id_out)

    def get_equivalent_kernel_bias(self):
        kernel3x3, bias3x3 = self._fuse_bn_tensor(self.conv1)
        kernel1x1, bias1x1 = self._fuse_bn_tensor(self.conv2)
        kernelid, biasid = self._fuse_bn_tensor(self.bn)
        return kernel3x3 + self._pad_1x1_to_3x3_tensor(kernel1x1) + kernelid, bias3x3 + bias1x1 + biasid

    @staticmethod
    def _pad_1x1_to_3x3_tensor(kernel1x1):
        if kernel1x1 is None:
            return 0
        else:
            return torch.nn.functional.pad(kernel1x1, [1, 1, 1, 1])

    def _fuse_bn_tensor(self, branch):
        if branch is None:
            return 0, 0
        if isinstance(branch, ConV):
            kernel = branch.conv.weight
            running_mean = branch.bn.running_mean
            running_var = branch.bn.running_var
            gamma = branch.bn.weight
            beta = branch.bn.bias
            eps = branch.bn.eps
        elif isinstance(branch, nn.BatchNorm2d):
            if not hasattr(self, "id_tensor"):
                input_dim = self.in_channels // self.groups
                kernel_value = np.zeros((self.in_channels, input_dim, 3, 3), dtype=np.float32)
                for i in range(self.in_channels):
                    kernel_value[i, i % input_dim, 1, 1] = 1
                self.id_tensor = torch.from_numpy(kernel_value).to(branch.weight.device)
            kernel = self.id_tensor
            running_mean = branch.running_mean
            running_var = branch.running_var
            gamma = branch.weight
            beta = branch.bias
            eps = branch.eps
        std = (running_var + eps).sqrt()
        t = (gamma / std).reshape(-1, 1, 1, 1)
        return kernel * t, beta - running_mean * gamma / std

    def fuse_convs(self):
        """Fuse convolutions for inference by creating a single equivalent convolution."""
        if hasattr(self, "conv"):
            return
        kernel, bias = self.get_equivalent_kernel_bias()
        self.conv = nn.Conv2d(
            in_channels=self.conv1.conv.in_channels,
            out_channels=self.conv1.conv.out_channels,
            kernel_size=self.conv1.conv.kernel_size,
            stride=self.conv1.conv.stride,
            padding=self.conv1.conv.padding,
            dilation=self.conv1.conv.dilation,
            groups=self.conv1.conv.groups,
            bias=True,
        ).requires_grad_(False)
        self.conv.weight.data = kernel
        self.conv.bias.data = bias
        for para in self.parameters():
            para.detach_()
        self.__delattr__("conv1")
        self.__delattr__("conv2")
        if hasattr(self, "nm"):
            self.__delattr__("nm")
        if hasattr(self, "bn"):
            self.__delattr__("bn")
        if hasattr(self, "id_tensor"):
            self.__delattr__("id_tensor")

In [18]:
import copy
import math
from typing import List, Optional, Tuple, Union

import torch
import torch.nn as nn
import torch.nn.functional as F



class Detect(nn.Module):

    dynamic = False  # force grid reconstruction
    export = False  # export mode
    format = None  # export format
    end2end = False  # end2end
    max_det = 300  # max_det
    shape = None
    anchors = torch.empty(0)  # init
    strides = torch.empty(0)  # init
    legacy = False  # backward compatibility for v3/v5/v8/v9 models
    xyxy = False  # xyxy or xywh output

    def __init__(self, nc=80, ch: Tuple = (), kernel_size=3, stride=1, padding=1):
        super().__init__()
        self.nc = nc  # number of classes
        self.nl = len(ch)  # number of detection layers
        self.reg_max = 16  # DFL channels (ch[0] // 16 to scale 4/8/12/16/20 for n/s/m/l/x)
        self.no = nc + self.reg_max * 4  # number of outputs per anchor
        self.stride = torch.zeros(self.nl)  # strides computed during build
        c2, c3 = max((16, ch[0] // 4, self.reg_max * 4)), max(ch[0], min(self.nc, 100))  # channels
        self.cv2 = nn.ModuleList(
            nn.Sequential(RepConv(x,x), ConV(x, c2, kernel_size=kernel_size), ConV(c2, c2, kernel_size=kernel_size), nn.Conv2d(c2, 4 * self.reg_max, kernel_size=1)) for x in ch
        )
        self.cv3 = (
            nn.ModuleList(nn.Sequential(RepConv(x,x), ConV(x, c3, kernel_size=kernel_size), ConV(c3, c3, kernel_size=kernel_size), nn.Conv2d(c3, self.nc, kernel_size=1)) for x in ch)
            if self.legacy
            else nn.ModuleList(
                nn.Sequential(
                    RepConv(x,x),
                    nn.Sequential(DWConV(x, x, kernel_size=kernel_size), ConV(x, c3, kernel_size=1)),
                    nn.Sequential(DWConV(c3, c3, kernel_size=kernel_size), ConV(c3, c3, kernel_size=1)),
                    nn.Conv2d(c3, self.nc, kernel_size=1),
                )
                for x in ch
            )
        )
        self.dfl = DFL(self.reg_max) if self.reg_max > 1 else nn.Identity()

        if self.end2end:
            self.one2one_cv2 = copy.deepcopy(self.cv2)
            self.one2one_cv3 = copy.deepcopy(self.cv3)

    def forward(self, x):
        """Concatenate and return predicted bounding boxes and class probabilities."""
        if self.end2end:
            return self.forward_end2end(x)

        for i in range(self.nl):
            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
        if self.training:  # Training path
            return x
        y = self._inference(x)
        return y if self.export else (y, x)

    def forward_end2end(self, x):
        x_detach = [xi.detach() for xi in x]
        one2one = [
            torch.cat((self.one2one_cv2[i](x_detach[i]), self.one2one_cv3[i](x_detach[i])), 1) for i in range(self.nl)
        ]
        for i in range(self.nl):
            x[i] = torch.cat((self.cv2[i](x[i]), self.cv3[i](x[i])), 1)
        if self.training:  # Training path
            return {"one2many": x, "one2one": one2one}

        y = self._inference(one2one)
        y = self.postprocess(y.permute(0, 2, 1), self.max_det, self.nc)
        return y if self.export else (y, {"one2many": x, "one2one": one2one})

    def _inference(self, x):
        # Inference path
        shape = x[0].shape  # BCHW
        x_cat = torch.cat([xi.view(shape[0], self.no, -1) for xi in x], 2)
        if self.format != "imx" and (self.dynamic or self.shape != shape):
            self.anchors, self.strides = (x.transpose(0, 1) for x in make_anchors(x, self.stride, 0.5))
            self.shape = shape

        if self.export and self.format in {"saved_model", "pb", "tflite", "edgetpu", "tfjs"}:  # avoid TF FlexSplitV ops
            box = x_cat[:, : self.reg_max * 4]
            cls = x_cat[:, self.reg_max * 4 :]
        else:
            box, cls = x_cat.split((self.reg_max * 4, self.nc), 1)

        if self.export and self.format in {"tflite", "edgetpu"}:
            # Precompute normalization factor to increase numerical stability
            # See https://github.com/ultralytics/ultralytics/issues/7371
            grid_h = shape[2]
            grid_w = shape[3]
            grid_size = torch.tensor([grid_w, grid_h, grid_w, grid_h], device=box.device).reshape(1, 4, 1)
            norm = self.strides / (self.stride[0] * grid_size)
            dbox = self.decode_bboxes(self.dfl(box) * norm, self.anchors.unsqueeze(0) * norm[:, :2])
        elif self.export and self.format == "imx":
            dbox = self.decode_bboxes(
                self.dfl(box) * self.strides, self.anchors.unsqueeze(0) * self.strides, xywh=False
            )
            return dbox.transpose(1, 2), cls.sigmoid().permute(0, 2, 1)
        else:
            dbox = self.decode_bboxes(self.dfl(box), self.anchors.unsqueeze(0)) * self.strides

        return torch.cat((dbox, cls.sigmoid()), 1)

    def bias_init(self):
        m = self  # self.model[-1]  # Detect() module
        for a, b, s in zip(m.cv2, m.cv3, m.stride):  # from
            a[-1].bias.data[:] = 1.0  # box
            b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)
        if self.end2end:
            for a, b, s in zip(m.one2one_cv2, m.one2one_cv3, m.stride):  # from
                a[-1].bias.data[:] = 1.0  # box
                b[-1].bias.data[: m.nc] = math.log(5 / m.nc / (640 / s) ** 2)  # cls (.01 objects, 80 classes, 640 img)

    def decode_bboxes(self, bboxes, anchors, xywh=True):
        """Decode bounding boxes from predictions."""
        return dist2bbox(bboxes, anchors, xywh=xywh and not (self.end2end or self.xyxy), dim=1)

    @staticmethod
    def postprocess(preds, max_det, nc=80):
        batch_size, anchors, _ = preds.shape  # i.e. shape(16,8400,84)
        boxes, scores = preds.split([4, nc], dim=-1)
        index = scores.amax(dim=-1).topk(min(max_det, anchors))[1].unsqueeze(-1)
        boxes = boxes.gather(dim=1, index=index.repeat(1, 1, 4))
        scores = scores.gather(dim=1, index=index.repeat(1, 1, nc))
        scores, index = scores.flatten(1).topk(min(max_det, anchors))
        i = torch.arange(batch_size)[..., None]  # batch indices
        return torch.cat([boxes[i, index // nc], scores[..., None], (index % nc)[..., None].float()], dim=-1)

In [19]:
class Concat(nn.Module):
    """
    Attributes:
        d (int): Dimension along which to concatenate tensors.
    """
    def __init__(self, dimension=1):
        super().__init__()
        self.d = dimension

    def forward(self, x):
        return torch.cat(x, self.d)

In [105]:
class DetectionModel(nn.Module):
    def __init__(self, args):
        super(DetectionModel, self).__init__()
        self.args = args
        self.nc = args.get('nc', 80)          # number of classes
        ch = args.get('ch', 3)                 # input channels
        use_repconv = args.get('use_repconv', True)  # example additional param

        self.layer0 = ConV(ch,16,kernel_size=3,stride=2,padding=1,act=True)
        self.layer1 = ConV(16,32,kernel_size=3,stride=2,padding=1,act=True)
        self.layer2 = C3k2(32,64,c3k=False)
        self.layer3 = ConV(64,64,kernel_size=3,stride=2,padding=1,act=True)
        self.layer4 = C3k2(64,128,c3k=False)
        self.layer5 = ConV(128,128,kernel_size=3,stride=2,padding=1,act=True)
        self.layer6 = C3k2(128,128,c3k=True)
        self.layer7 = ConV(128,256,kernel_size=3,stride=2,padding=1,act=True)
        self.layer8 = C3k2(256,256,c3k=True)
        self.layer9 = SPPF(256,256,kernel_size=5)
        self.layer10 = C2PSA(256,256,num_psa_blocks=1,expansion=0.5,kernel_size=1,stride=1,act=False)
        self.layer11 = nn.Upsample(scale_factor=2, mode='nearest')  # Upsample layer
        self.layer12 = Concat() #6,11 concat 
        self.layer13 = C3k2(384,128,c3k=False)
        self.layer14 = nn.Upsample(scale_factor=2, mode='nearest')  # Upsample layer
        self.layer15 = Concat() #4,14 concat
        self.layer16 = C3k2(256,64,c3k=False)
        self.repconv_small = RepConv(64, 64)
        self.layer17 = ConV(64,64,kernel_size=3,stride=2,padding=1,act=True)
        self.layer18 = Concat() #13,17 concat
        self.layer19 = C3k2(192,128,c3k=False)
        self.repconv_med = RepConv(128, 128)
        self.layer20 = ConV(128,128,kernel_size=3,stride=2,padding=1,act=True)
        self.layer21 = Concat() #10,10 concat
        self.layer22 = C3k2(384,256,c3k=True)
        self.repconv_large = RepConv(256, 256)
        self.layer23 = Detect(nc=self.nc, ch=(64, 128, 256), kernel_size=3, stride=1, padding=1)

        # Define the layers of the model
    def forward(self,x):
            x = self.layer0(x)
            x = self.layer1(x)
            x = self.layer2(x)
            x = self.layer3(x)
            x4 = self.layer4(x)
            x = self.layer5(x4)
            x6 = self.layer6(x)
            x = self.layer7(x6)
            x = self.layer8(x)
            x = self.layer9(x)
            x10 = self.layer10(x)
            x11 = self.layer11(x10)
            x = self.layer12([x11, x6])
            x13 = self.layer13(x)
            x14 = self.layer14(x13)
            x = self.layer15([x14, x4])
            x_small = self.layer16(x)
            #x_rep_s = self.repconv_small(x_small)
            x17 = self.layer17(x_small)
            x = self.layer18([x17, x13])
            x_med = self.layer19(x)
            #x_rep_m = self.repconv_med(x_med)
            x20 = self.layer20(x_med)
            x = self.layer21([x20, x10])
            x_large = self.layer22(x)
            #x_rep_l = self.repconv_large(x_large)
            x = self.layer23([x_small, x_med, x_large])
            return x

In [94]:
custom_model = DetectionModel(nc=80,ch=3)
print(custom_model)

DetectionModel(
  (layer0): ConV(
    (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU()
  )
  (layer1): ConV(
    (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU()
  )
  (layer2): C3k2(
    (conv1): ConV(
      (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU()
    )
    (conv2): ConV(
      (conv): Conv2d(48, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU()
    )
    (m): ModuleList(
      (0): Bottleneck(
        (conv1): ConV(
          (conv): Conv2d(16, 8

In [52]:
from torchvision import transforms
from PIL import Image

img = Image.open("th.jpeg")
transform = transforms.Compose([
    transforms.Resize((640, 640)),
    transforms.ToTensor(),
])
img = transform(img)
img = img.unsqueeze(0) # Add batch dimension
img.shape
print(img.shape)
print("Eval Mode:")
custom_model.eval()
output = custom_model(img) # output is now a tuple: (y_tensor, x_list_of_tensors)
print(len(output))
print(output[0].shape) # This accesses y_tensor.shape. This will SUCCEED.
                        # y_tensor's shape will be (1, N_total_anchors, 4 + nc)
                        # e.g., (1, 80*80 + 40*40 + 20*20, 84) = (1, 6400 + 1600 + 400, 84) = (1, 8400, 84)

for i, tensor in enumerate(output[1]): # Iterates through the tuple (y_tensor, x_list_of_tensors)
    print(f"Shape of output tensor {i}: {len(tensor)}, {tensor.shape}")

custom_model.train()
output = custom_model(img) # output is now a tuple: (y_tensor, x_list_of_tensors)
print("Train Mode:")
print(len(output))
print(output[0].shape) # This accesses y_tensor.shape. This will SUCCEED.
                        # y_tensor's shape will be (1, N_total_anchors, 4 + nc)
                        # e.g., (1, 80*80 + 40*40 + 20*20, 84) = (1, 6400 + 1600 + 400, 84) = (1, 8400, 84)

for i, tensor in enumerate(output): # Iterates through the tuple (y_tensor, x_list_of_tensors)
    print(f"Shape of output tensor {i}: {tensor.shape}")

torch.Size([1, 3, 640, 640])
Eval Mode:
2
torch.Size([1, 84, 8400])
Shape of output tensor 0: 1, torch.Size([1, 144, 80, 80])
Shape of output tensor 1: 1, torch.Size([1, 144, 40, 40])
Shape of output tensor 2: 1, torch.Size([1, 144, 20, 20])
Train Mode:
3
torch.Size([1, 144, 80, 80])
Shape of output tensor 0: torch.Size([1, 144, 80, 80])
Shape of output tensor 1: torch.Size([1, 144, 40, 40])
Shape of output tensor 2: torch.Size([1, 144, 20, 20])


In [135]:
print(type(hyp))
print(dir(hyp))


<class 'ultralytics.utils.IterableSimpleNamespace'>
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattr__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', 'agnostic_nms', 'amp', 'augment', 'auto_augment', 'batch', 'bgr', 'box', 'cache', 'cfg', 'classes', 'close_mosaic', 'cls', 'conf', 'copy_paste', 'copy_paste_mode', 'cos_lr', 'crop_fraction', 'data', 'degrees', 'deterministic', 'device', 'dfl', 'dnn', 'dropout', 'dynamic', 'embed', 'epochs', 'erasing', 'exist_ok', 'fliplr', 'flipud', 'format', 'fraction', 'freeze', 'get', 'half', 'hsv_h', 'hsv_s', 'hsv_v', 'imgsz', 'int8', 'iou', 'keras', 'kobj', 'label_smoothing', 'line_width', 'lr0', 'lrf', 'mask_ratio', 'max_det', 'mixup', 'mode', 'model', 'momentum', 'mosaic', 'multi

In [129]:
from ultralytics.data.dataset import YOLODataset
from ultralytics.utils import yaml_load
from ultralytics.utils import DEFAULT_CFG

hyp = DEFAULT_CFG

# Load config
data_cfg = yaml_load('coco8.yaml')
# Build dataset (for training)
trainset = YOLODataset(
    data=data_cfg,  # pass the full data dict, not just the path
    img_path=data_cfg['train'],
    imgsz=640,
    batch_size=16,
    augment=True,
    hyp=hyp,
    rect=False,
    cache=False,
    single_cls=False,
    stride=32,
    pad=0.0,
    prefix=''
)

valset = YOLODataset(
    data=data_cfg,  # pass the full data dict, not just the path
    img_path=data_cfg['val'],
    imgsz=640,
    batch_size=16,
    augment=False,
    hyp=hyp,
    rect=True,
    cache=False,
    single_cls=False,
    stride=32,
    pad=0.0,
    prefix=''
)
from torch.utils.data import DataLoader
def yolo_collate_fn(batch):
    images = []
    bboxes = []
    classes = []
    batch_indices = []

    for item in batch:
        images.append(item['img'])               # <- actual image tensor
        bboxes.append(item['bboxes'])
        classes.append(item['cls'])
        batch_indices.append(item['batch_idx'])

    images = torch.stack(images)
    bboxes = torch.cat(bboxes, dim=0)
    classes = torch.cat(classes, dim=0)
    batch_indices = torch.cat(batch_indices, dim=0)

    targets = {
        'bboxes': bboxes,
        'cls': classes,
        'batch_idx': batch_indices
    }

    return images, targets


train_loader = DataLoader(
    trainset,
    batch_size=16,
    shuffle=True,
    num_workers=4,
    collate_fn=yolo_collate_fn,
)

val_loader = DataLoader(
    valset,
    batch_size=16,
    shuffle=False,
    num_workers=4,
    collate_fn=yolo_collate_fn,
)



Scanning /home/ubuntu/Downloads/THP_YOLO/datasets/coco8/labels/train.cache... 4 images, 0 backgrounds, 0 corrupt: 100%|██████████| 4/4 [00:00<?, ?it/s]
Scanning /home/ubuntu/Downloads/THP_YOLO/datasets/coco8/labels/val.cache... 4 images, 0 backgrounds, 0 corrupt: 100%|██████████| 4/4 [00:00<?, ?it/s]


In [115]:
model.model = nn.ModuleList([model.layer23])

In [114]:
args = {
    'nc': 80,  # number of classes
    'ch': 3,   # input channels
    'use_repconv': True  # example additional param
}

model = DetectionModel(args)
print(model)


DetectionModel(
  (layer0): ConV(
    (conv): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(16, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU()
  )
  (layer1): ConV(
    (conv): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
    (act): SiLU()
  )
  (layer2): C3k2(
    (conv1): ConV(
      (conv): Conv2d(32, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(32, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU()
    )
    (conv2): ConV(
      (conv): Conv2d(48, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn): BatchNorm2d(64, eps=0.001, momentum=0.03, affine=True, track_running_stats=True)
      (act): SiLU()
    )
    (m): ModuleList(
      (0): Bottleneck(
        (conv1): ConV(
          (conv): Conv2d(16, 8

In [116]:
from ultralytics.utils.loss import v8DetectionLoss

criterion = v8DetectionLoss(model)


In [173]:
import torch
import torch.optim as optim
from ultralytics.utils.loss import v8DetectionLoss
from PIL import Image

criterion = v8DetectionLoss(model)
criterion.hyp = hyp
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=1e-3)
transform = transforms.Compose([
    transforms.Resize((640, 640)),
    transforms.ToTensor(),
])
best_loss = float('inf')  # initialize best loss as infinity
epochs = 3

for epoch in range(epochs):
    model.train()
    total_loss = 0
    for i, (images, targets) in enumerate(train_loader):
        # Apply transformations to images
        images = images.to(device).float()
        # targets should also be a tensor or list of tensors on device:
        # typically targets is (batch_index, class, x, y, w, h) or similar YOLO format
        # Adjust if needed
        targets = {k: v.to(device) for k, v in targets.items()}

        optimizer.zero_grad()
        outputs = model(images)  # model output

        # Calculate loss with yolo criterion
        loss, loss_items = criterion(outputs, targets)

        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        if i % 1 == 0:
            print(f"Epoch [{epoch+1}/{epochs}], Step [{i}/{len(train_loader)}], Loss: {loss.item():.4f}")
    avg_loss = total_loss / len(train_loader)
    print(f"Epoch [{epoch+1}/{epochs}] Average Loss: {avg_loss:.4f}")
    if avg_loss < best_loss:
        best_loss = avg_loss
        torch.save(model.state_dict(), "best.pt")
        print(f"Saved Best Model with Loss: {best_loss:.4f}")




Epoch [1/3], Step [0/1], Loss: 3533502.0000
Epoch [1/3] Average Loss: 3533502.0000
Saved Best Model with Loss: 3533502.0000
Epoch [2/3], Step [0/1], Loss: 3503244.7500
Epoch [2/3] Average Loss: 3503244.7500
Saved Best Model with Loss: 3503244.7500
Epoch [3/3], Step [0/1], Loss: 3442219.5000
Epoch [3/3] Average Loss: 3442219.5000
Saved Best Model with Loss: 3442219.5000


In [174]:
total_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {total_params}")


Total trainable parameters: 6296976


In [144]:
!pip install pycocotools

Defaulting to user installation because normal site-packages is not writeable
Collecting pycocotools
  Using cached pycocotools-2.0.8-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (427 kB)
Installing collected packages: pycocotools
Successfully installed pycocotools-2.0.8


In [147]:
!pip install torchmetrics

Defaulting to user installation because normal site-packages is not writeable
Collecting torchmetrics
  Downloading torchmetrics-1.7.1-py3-none-any.whl (961 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m961.5/961.5 KB[0m [31m3.3 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
Collecting lightning-utilities>=0.8.0
  Downloading lightning_utilities-0.14.3-py3-none-any.whl (28 kB)
Installing collected packages: lightning-utilities, torchmetrics
Successfully installed lightning-utilities-0.14.3 torchmetrics-1.7.1


In [170]:
import torchvision.ops as ops

def postprocess_yolo_outputs(pred, conf_thresh=0.25, iou_thresh=0.45):
    """
    pred: [batch, num_boxes, 5 + num_classes]
    returns: list of dicts with 'boxes', 'scores', 'labels'
    """
    batch_results = []

    for i in range(pred.shape[0]):
        p = pred[i]

        # Split predictions
        boxes = p[..., :4]   # x, y, w, h
        conf = p[..., 4:5]   # objectness
        cls_scores = p[..., 5:]  # class probs

        # Compute total score = objectness * class prob
        cls_conf, cls_ids = cls_scores.max(1)
        scores = conf.squeeze() * cls_conf

        # Filter by confidence threshold
        mask = scores > conf_thresh
        if mask.sum() == 0:
            batch_results.append({
                'boxes': torch.zeros((0, 4)),
                'scores': torch.zeros((0,)),
                'labels': torch.zeros((0,), dtype=torch.int64)
            })
            continue

        boxes = boxes[mask]
        scores = scores[mask]
        cls_ids = cls_ids[mask]

        # Convert (x, y, w, h) → (x1, y1, x2, y2)
        xyxy = torch.zeros_like(boxes)
        xyxy[:, 0] = boxes[:, 0] - boxes[:, 2] / 2  # x1
        xyxy[:, 1] = boxes[:, 1] - boxes[:, 3] / 2  # y1
        xyxy[:, 2] = boxes[:, 0] + boxes[:, 2] / 2  # x2
        xyxy[:, 3] = boxes[:, 1] + boxes[:, 3] / 2  # y2

        # Apply NMS
        keep = ops.nms(xyxy, scores, iou_thresh)

        batch_results.append({
            'boxes': xyxy[keep],
            'scores': scores[keep],
            'labels': cls_ids[keep]
        })

    return batch_results


In [172]:
import torch
from tqdm import tqdm
from torchmetrics.detection.mean_ap import MeanAveragePrecision

# Initialize mAP metric
map_metric = MeanAveragePrecision()

# Put your model in eval mode
model.eval()

with torch.no_grad():
    for images, targets in val_loader:
        images = images.to(device).float()
        raw_outputs = model(images)  # e.g., (batch_size, num_boxes, 85)
        # Check if it's a tuple, extract the right part
        if isinstance(raw_outputs, tuple):
            raw_outputs = raw_outputs[0]


        preds_batch = postprocess_yolo_outputs(raw_outputs, conf_thresh=0.25, iou_thresh=0.45)

        preds = []
        gts = []

        for i in range(len(images)):
            preds.append({
                'boxes': preds_batch[i]['boxes'],
                'scores': preds_batch[i]['scores'],
                'labels': preds_batch[i]['labels'].to(torch.int64)
            })

            gt_boxes = targets['bboxes'][targets['batch_idx'] == i].cpu()
            gt_scores = torch.ones(gt_boxes.shape[0], device=gt_boxes.device)  # Assuming all GT boxes have score 1
            gt_labels = targets['cls'][targets['batch_idx'] == i].cpu().squeeze(-1).to(torch.int64)


            gts.append({
                'boxes': gt_boxes,
                'scores': gt_scores,
                'labels': gt_labels
            })
        for i, (pred_item, gt_item) in enumerate(zip(preds, gts)):
            print(f"[Batch image {i}] pred labels shape: {pred_item['labels'].shape}")
            print(f"[Batch image {i}] gt labels shape: {gt_item['labels'].shape}")

        map_metric.update(preds, gts)
        #print(preds['labels'].shape, gts['labels'].shape)

# Final metrics

results = map_metric.compute()
print("mAP:", results['map'].item())
print("mAP@0.5:", results['map_50'].item())
print("mAP@0.75:", results['map_75'].item())




[Batch image 0] pred labels shape: torch.Size([1])
[Batch image 0] gt labels shape: torch.Size([17])
[Batch image 1] pred labels shape: torch.Size([1])
[Batch image 1] gt labels shape: torch.Size([0])
[Batch image 2] pred labels shape: torch.Size([1])
[Batch image 2] gt labels shape: torch.Size([0])
[Batch image 3] pred labels shape: torch.Size([1])
[Batch image 3] gt labels shape: torch.Size([0])
mAP: 0.0
mAP@0.5: 0.0
mAP@0.75: 0.0
