# An example of a training pipeline, model, and loss function design

Since we need to detect people in real time, we searched for groundbreaking papers in that area. One of them is [YOLOv1](https://arxiv.org/pdf/1506.02640), which in 2015 was one of the fastest CNN models for real-time detection. We therefore decided to study its design and recreate its approach to achieve the same or even better performance.

In [16]:
import torch, sys, platform
import torch.nn as nn


## **Important note**

This .ipynb is only a structural reference: it may not run as-is, may take too long to train, and is missing some preparation steps. It will be updated in the near future.

In [17]:
def layer(input, output, kernel, stride=1, padding=None):
    """Build a Conv2d -> BatchNorm2d -> LeakyReLU(0.1) block.

    Args:
        input: Number of input channels. The name shadows the builtin inside
            this function; it is kept so keyword callers keep working.
        output: Number of output channels.
        kernel: Convolution kernel size, either an int or a tuple of ints.
        stride: Convolution stride.
        padding: Explicit padding. When None, half of the kernel size
            (floored) is used for every spatial dimension.

    Returns:
        An ``nn.Sequential`` holding the convolution (without bias, since the
        following batch norm has its own shift), the batch norm and the
        activation, in that order.
    """
    if padding is None:
        # `kernel // 2` only works for ints; handle tuple kernels per dimension.
        if isinstance(kernel, int):
            padding = kernel // 2
        else:
            padding = tuple(k // 2 for k in kernel)
    return nn.Sequential(
        nn.Conv2d(input, output, kernel, stride, padding, bias=False),
        nn.BatchNorm2d(output),
        nn.LeakyReLU(0.1, inplace=True),
    )

class YOLOv1(nn.Module):
    """YOLOv1-style detector: convolutional backbone plus a fully connected head.

    The backbone halves the spatial resolution six times (two stride-2
    convolutions and four 2x2 max-pools). The head expects a feature map of
    ``1024 x S x S``, e.g. a 448x448 input for ``S=7``.

    Args:
        S: Grid size; the output has S x S cells.
        B: Number of bounding boxes predicted per grid cell.
        C: Number of classes (1 is used here for person detection).
    """

    def __init__(self, S=7, B=2, C=1):
        super().__init__()
        self.S = S  # Grid size
        self.B = B  # Number of bounding boxes per grid cell
        self.C = C  # Number of classes, we will use 1 for person detection

        L = []
        L += [layer(3, 64, 7, 2, 3), nn.MaxPool2d(2, 2)]
        L += [layer(64, 192, 3), nn.MaxPool2d(2, 2)]
        L += [layer(192, 128, 1), layer(128, 256, 3),
              layer(256, 256, 1), layer(256, 512, 3), nn.MaxPool2d(2, 2)]
        # Build every repeated stage with fresh modules. Multiplying a list
        # (`[...] * 4`) would repeat the *same* module objects and make the
        # repeated stages share one set of weights.
        for _ in range(4):
            L += [layer(512, 256, 1), layer(256, 512, 3)]
        L += [layer(512, 512, 1), layer(512, 1024, 3), nn.MaxPool2d(2, 2)]
        for _ in range(2):
            L += [layer(1024, 512, 1), layer(512, 1024, 3)]
        L += [layer(1024, 1024, 3), layer(1024, 1024, 3, 2, 1)]
        L += [layer(1024, 1024, 3), layer(1024, 1024, 3)]
        self.backbone = nn.Sequential(*L)
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(1024 * S * S, 4096),
            nn.LeakyReLU(0.1, inplace=True),
            nn.Dropout(0.5),
            nn.Linear(4096, S * S * (C + B * 5)),
        )

    def forward(self, x):
        """Return raw predictions of shape ``[batch, S, S, C + 5*B]``."""
        x = self.backbone(x)
        x = self.fc(x)
        # [batch_size, S*S*(C+5*B)] -> [batch_size, S, S, C+5*B]
        # The network itself imposes no meaning on the last axis; YoloV1Loss
        # in this file reads it as B groups of (x, y, w, h, conf) followed by
        # the class logits.
        x = x.view(-1, self.S, self.S, self.C + self.B * 5)
        return x



In [18]:
def _iou_xywh(a, b, eps=1e-9):
    """Element-wise IoU of boxes given as (cx, cy, w, h) in the last axis.

    Args:
        a, b: Tensors of shape [..., 4] holding centre x, centre y, width and
            height. The caller is expected to pass both in the same
            coordinate system (the original note says absolute, in [0, 1]).
        eps: Small constant added to the union to avoid division by zero.

    Returns:
        Tensor of shape [...] with the intersection-over-union of each pair.
        Boxes with non-positive width or height contribute zero area.
    """
    def _corners(box):
        # Convert centre/size to (x1, y1, x2, y2).
        half_w = box[..., 2] / 2
        half_h = box[..., 3] / 2
        return (box[..., 0] - half_w, box[..., 1] - half_h,
                box[..., 0] + half_w, box[..., 1] + half_h)

    ax1, ay1, ax2, ay2 = _corners(a)
    bx1, by1, bx2, by2 = _corners(b)

    overlap_w = (torch.minimum(ax2, bx2) - torch.maximum(ax1, bx1)).clamp(min=0)
    overlap_h = (torch.minimum(ay2, by2) - torch.maximum(ay1, by1)).clamp(min=0)
    inter = overlap_w * overlap_h

    area_a = (ax2 - ax1).clamp(min=0) * (ay2 - ay1).clamp(min=0)
    area_b = (bx2 - bx1).clamp(min=0) * (by2 - by1).clamp(min=0)
    union = area_a + area_b - inter + eps
    return inter / union

class YoloV1Loss(nn.Module):
    """YOLOv1-style sum-of-squares loss for a single-class detector.

    Args:
        S: Grid size.
        B: Number of boxes predicted per cell.
        lambda_coord: Weight of the coordinate term.
        lambda_noobj: Weight of the confidence term for boxes that are not
            responsible for any object.
    """

    def __init__(self, S=7, B=2, lambda_coord=5.0, lambda_noobj=0.5):
        super().__init__()
        self.S, self.B = S, B
        self.lc = lambda_coord
        self.lno = lambda_noobj
        # Not used by forward(); kept so code that reads `.mse` keeps working.
        self.mse = nn.MSELoss(reduction='sum')

    def forward(self, pred, target):
        """Compute the loss for one batch.

        Args:
            pred: [N,S,S,B*5+1] raw logits; the first B*5 channels are B
                groups of (x, y, w, h, conf), the rest is the class logit.
            target: dict with
                'tconf': [N,S,S,1] object mask (expected to be 0/1),
                'txywh': [N,S,S,4] boxes as (cx, cy, w, h), expected to be
                    image-relative in [0,1],
                'tcls':  [N,S,S,1] class target.

        Returns:
            (total, logs): the batch-averaged loss tensor and a dict of
            Python floats with the individual terms, each divided by N.
        """
        N, S, B = pred.size(0), self.S, self.B
        device, dtype = pred.device, pred.dtype

        # split prediction
        pb = pred[..., :B * 5].reshape(N, S, S, B, 5)   # x,y,w,h,conf (raw)
        pcls = pred[..., B * 5:]                         # class logit(s)

        # activations
        px = pb[..., 0].sigmoid()
        py = pb[..., 1].sigmoid()
        pw = pb[..., 2].relu().pow(2)       # non-negative; raw output acts as sqrt(w)
        ph = pb[..., 3].relu().pow(2)
        pc = pb[..., 4].sigmoid()
        pcl = pcls.sigmoid()

        # cell offsets: gx is the column index, gy the row index, both [1,S,S,1]
        gy, gx = torch.meshgrid(torch.arange(S, device=device),
                                torch.arange(S, device=device), indexing='ij')
        gx = gx.view(1, S, S, 1).to(dtype)
        gy = gy.view(1, S, S, 1).to(dtype)

        # targets
        tconf = target['tconf'].to(device=device, dtype=dtype)   # [N,S,S,1]
        txywh = target['txywh'].to(device=device, dtype=dtype)   # [N,S,S,4]
        tcls = target['tcls'].to(device=device, dtype=dtype)     # [N,S,S,1]

        obj_mask = tconf                                         # [N,S,S,1]

        # Box assignment and the IoU confidence target are constants for the
        # optimizer, so no autograd graph is needed for them.
        with torch.no_grad():
            boxes_abs = torch.stack(
                [(gx + px) / S, (gy + py) / S, pw / S, ph / S], dim=-1
            )                                                    # [N,S,S,B,4]
            ious = _iou_xywh(boxes_abs,
                             txywh.unsqueeze(3).expand_as(boxes_abs))  # [N,S,S,B]
            _, argmax = ious.max(dim=3, keepdim=True)            # [N,S,S,1]
            # one-hot over B for the responsible box, only in object cells
            resp = torch.zeros_like(ious)
            resp.scatter_(3, argmax, 1.0)
            resp = resp * obj_mask                               # [N,S,S,B]
            conf_tgt_resp = ious * resp                          # [N,S,S,B]

        # ----- coordinate loss (only responsible box) -----
        pxr = (px * resp).sum(dim=3, keepdim=True)
        pyr = (py * resp).sum(dim=3, keepdim=True)
        pwr = (pw * resp).sum(dim=3, keepdim=True).clamp(min=1e-9)
        phr = (ph * resp).sum(dim=3, keepdim=True).clamp(min=1e-9)

        # Targets to cell-relative offsets and sqrt(w,h). Slicing with 0:1
        # keeps the trailing axis so everything stays [N,S,S,1]; dropping it
        # would mis-broadcast against the [1,S,S,1] grid.
        tx = txywh[..., 0:1] * S - gx
        ty = txywh[..., 1:2] * S - gy
        tw = (txywh[..., 2:3] * S).clamp(min=1e-9).sqrt()
        th = (txywh[..., 3:4] * S).clamp(min=1e-9).sqrt()

        coord_loss = self.lc * (
            ((pxr - tx) ** 2 + (pyr - ty) ** 2 +
             (pwr.sqrt() - tw) ** 2 + (phr.sqrt() - th) ** 2) * obj_mask
        ).sum()

        # ----- confidence loss -----
        # target conf for responsible box = IoU; for others and empty = 0
        conf_obj_loss = ((pc - conf_tgt_resp) ** 2 * resp).sum()
        conf_noobj_loss = self.lno * ((pc ** 2) * (1.0 - resp)).sum()
        conf_loss = conf_obj_loss + conf_noobj_loss

        # ----- class loss (single class) -----
        cls_loss = ((pcl - tcls) ** 2 * obj_mask).sum()

        total = (coord_loss + conf_loss + cls_loss) / N
        logs = {
            'loss': total.item(),
            'L_coord': coord_loss.item() / N,
            'L_conf_obj': conf_obj_loss.item() / N,
            'L_conf_noobj': conf_noobj_loss.item() / N,
            'L_cls': cls_loss.item() / N,
        }
        return total, logs




In [None]:
# Smoke test: one forward pass and one optimisation step on synthetic data.
model = YOLOv1(S=7, B=2, C=1)
# [batch,3,448,448] -> [batch,7,7,C+5*B]
x = torch.randn(1, 3, 448, 448)
y = model(x)  # [batch_size,7,7,11]
print(y.shape)

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=0.001,
)

loss_fn = YoloV1Loss(S=7, B=2, lambda_coord=5.0, lambda_noobj=0.5)

# Synthetic but well-formed targets: a binary object mask, box centres that
# lie inside their own grid cell, and positive sizes in [0,1].
grid_rows, grid_cols = torch.meshgrid(torch.arange(7), torch.arange(7), indexing='ij')
cell_origin = torch.stack([grid_cols, grid_rows], dim=-1).float().unsqueeze(0)  # [1,7,7,2]
centers = (cell_origin + torch.rand(1, 7, 7, 2)) / 7
sizes = torch.rand(1, 7, 7, 2)
has_object = (torch.rand(1, 7, 7, 1) > 0.5).float()
target_batch = {
    'tconf': has_object,                            # confidence / object mask
    'txywh': torch.cat([centers, sizes], dim=-1),   # [cx,cy,w,h]
    'tcls': has_object.clone(),                     # class (single class: 1 where an object exists)
}

optimizer.zero_grad()  # clear any gradients left from earlier steps
pred = model(x)  # [N,7,7,11]
loss, logs = loss_fn(pred, target_batch)
loss.backward()
optimizer.step()

torch.Size([1, 7, 7, 10])
