#Детекция с использованием SSD
Задача: создать и запрограмировать модель для детекции изображений. Модель может быть не одна.

План:
1.     Библиотеки
2.     Загрузка и предобработка данных
3.     (Опционально) Можно создать свой датасет для тренировки.

Модель:

Вход - изображение

Выход - классы (с уверенностью), ббоксы. Идут последовательно

Процесс обучения:
Функции ошибки для классификации подаем элементы у которых IoU >= 0.5 и максимальные на их класс. Остальным должен соответствовать пустота.
Функции ошибки для ббоксов должны подаваться те кто подавались на классы.

Требование к модели:
* Модель должна сохранять лучшие веса.
* Модель предсказывает не точное местоположение в пикселях, а относительное.
* При обучении идет обработка одновременно нескольких изображений.
* Функция ошибки будет встроена в модель.


Фишки которые хочу добавить:


# Библиотеки

In [None]:
import torch
from torch import nn
import torchvision
from torchsummary import summary
from torchvision import transforms as T
from torch.nn import functional as F
import torch.autograd.profiler as profiler


In [None]:
CLASS_COUNT = 21
IMG_SIZE = 224
BATCH_SIZE = 100
BBOX_COUNT = 6
EPOCH_COUNT = 10
NUM2CLASS_DICT = {0: 'aeroplane',
    1: 'bicycle',
    2: 'bird',
    3: 'boat',
    4: 'bottle',
    5: 'bus',
    6: 'car',
    7: 'cat',
    8: 'chair',
    9: 'cow',
    10: 'diningtable',
    11: 'dog',
    12: 'horse',
    13: 'motorbike',
    14: 'person',
    15: 'pottedplant',
    16: 'sheep',
    17: 'sofa',
    18: 'train',
    19: 'tvmonitor',
    20: 'background'}
CLASS2NUM_DICT = dict([(x[1], x[0]) for x in NUM2CLASS_DICT.items()])
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


# Модель

In [None]:
class ScaleKnowler():
    def __init__(self) -> None:

        smax = 0.9
        smin = 0.2
        m = 6
        k = torch.arange(m) + 1
        self.sk = smin + (smax - smin) * (k - 1) / (m - 1)

    def getSk(self, k):
        """Нумерация идет от 0 до 5"""
        if k > 5:
            return 1
        return self.sk[k].item()

class AbsBboxKnowler():
    def __init__(self):
        self.ar = torch.tensor([1,2,3,1/2,1/3])
        self.scale = ScaleKnowler()
    def getBboxCount(self):
        return self.ar.shape[0] + 1
    def getAbsBbox(self, k):
        # TODO: еще не проверял
        sk = self.scale.getSk(k)
        skP = self.scale.getSk(k+1)
        w = sk * torch.sqrt(self.ar)
        h = sk / torch.sqrt(self.ar)
        whP = torch.sqrt(torch.tensor([sk * skP]))
        w, h = torch.cat([w, whP]), torch.cat([h, whP])
        return w, h


class SSDClassificator(nn.Module):
    def __init__(self, inChannel, classCount=CLASS_COUNT):
        super().__init__()
        self.layer = nn.Conv2d(inChannel, classCount,3,1,1)
    def forward(self, x):
        return self.layer(x)

class SSDBBoxPredictor(nn.Module):
    def __init__(self, inChannel, classCount=CLASS_COUNT):
        super().__init__()
        # dx, dy, cx, cy, 6 bboxes
        # Структура вывода:
        # (d,c)(1), (d,c)(2), (d,c)(3), (d,c)(4) - class 0
        # (d,c)(1), (d,c)(2), (d,c)(3), (d,c)(4) - class 1
        self.layer = nn.Conv2d(inChannel, 6*4*classCount,3,1,1)
        self.a = nn.Tanh()
    def forward(self, x):
        x = self.layer(x)
        x = self.a(x)
        return x


In [None]:
class SSD(nn.Module):
    def __init__(self):
        super().__init__()
        # 3x224x224 -> 512x28x28
        self.backbone = torchvision.models.vgg16(weights=torchvision.models.VGG16_Weights.DEFAULT).features[:23]

        # 512x28x28 -> 1024x13x13
        self.l2 = nn.Sequential(
                nn.Conv2d(512,1024,3,2),
                nn.ReLU()
        )
        # 512x13x13 -> 1024x13x13
        self.l3 = nn.Sequential(
                nn.Conv2d(1024,1024,1),
                nn.ReLU()
            )
        # 1024x13x13 -> 512x6x6
        self.l4 = nn.Sequential(
                nn.Conv2d(1024,256,1),
                nn.ReLU(),
                nn.Conv2d(256,512,3,2),
                nn.ReLU()
        )
        # 512x6x6 -> 256x3x3
        self.l5 = nn.Sequential(
                nn.Conv2d(512,128,1),
                nn.ReLU(),
                nn.Conv2d(128,256,3,2,1),
                nn.ReLU()
        )
        # 256x3x3 -> 256x1x1
        self.l6 = nn.Sequential(
                nn.Conv2d(256,128,1,1),
                nn.ReLU(),
                nn.Conv2d(128,256,3),
                nn.ReLU()
        )
        self.detectors = nn.ModuleList([
                SSDBBoxPredictor(512),
                SSDBBoxPredictor(1024),
                SSDBBoxPredictor(1024),
                SSDBBoxPredictor(512),
                SSDBBoxPredictor(256),
                SSDBBoxPredictor(256)
        ])
        self.classifier = nn.ModuleList([
                SSDClassificator(512),
                SSDClassificator(1024),
                SSDClassificator(1024),
                SSDClassificator(512),
                SSDClassificator(256),
                SSDClassificator(256)
        ])
    def forward(self, x):
        prList = []
        x = self.backbone(x)
        prList.append(x)
        x = self.l2(x)
        prList.append(x)
        x = self.l3(x)
        prList.append(x)
        x = self.l4(x)
        prList.append(x)
        x = self.l5(x)
        prList.append(x)
        x = self.l6(x)
        prList.append(x)

        classPred, bboxPred = [self.classifier[i](prList[i]) for i in range(6)], [self.detectors[i](prList[i]) for i in range(6)]

        return classPred, bboxPred



# Функции утилиты

In [None]:
@ torch.no_grad
def selectBBoxByClass(classList, bboxList):
    """
    Функция выбират только те ббоксы, которые соответствуют предсказаному классу.
    Форма тензора в остальном остается той же
    """
    res = []

    for classPr, bboxPr in zip(classList, bboxList):
        bboxNum = classPr.argmax(1).unsqueeze(1) * BBOX_COUNT * 4
        indices = torch.arange(BBOX_COUNT * 4).view(1, BBOX_COUNT * 4, 1, 1).expand(bboxPr.shape[0], BBOX_COUNT * 4, bboxPr.shape[-2], bboxPr.shape[-1]).to(device)
        indices = bboxNum + indices
        result = torch.gather(bboxPr, 1, indices)
        res.append(result)
    return res

def classList2ClassTensor(classList):
    res = []
    for pred in classList:
        r = pred.view(pred.shape[0],21,-1).permute(0,2,1)
        res.append(r)
    res = torch.cat(res, 1)
    res = res.repeat_interleave(BBOX_COUNT,1)
    return res

def bboxList2BboxTensor(bboxList):
    absBboxKnowler = AbsBboxKnowler()
    bboxCount = absBboxKnowler.getBboxCount()
    res = []
    for i in range(len(bboxList)):
        data = bboxList[i]
        x = torch.cat([(torch.arange(data.shape[-1]) + 0.5).view(1,-1)] * data.shape[-1]) / data.shape[-1]
        x = x.to(device)
        y = x.T
        for i in range(bboxCount):
            w, h = absBboxKnowler.getAbsBbox(i)
            w, h = w.to(device), h.to(device)
            # dx, dy, cx, cy
            data[:,::4] += x
            data[:,1::4] +=y
            for j in range(4):
                data[:,int(2+4*j)] *= w[j]
                data[:,int(3+4*j)] *= h[j]
        res.append(data.view(data.shape[0], 24, -1).permute(0,2,1).reshape(data.shape[0], -1, 4))
    return torch.cat(res,1)

def convertModelResult(classPred, bboxPred):
    """
    Функция принимает списки предсказаний модели, и преабразует в два 2/3 мерных тензора.
    """
    # TODO: Проверенно, но не идеально
    return classList2ClassTensor(classPred), bboxList2BboxTensor(selectBBoxByClass(classPred, bboxPred))



# Работа с данными

In [None]:
def taregetPreProcessing(x):
    """
    В результате мы должны получать такие данные:
    - кординаты ббоксов в формате cxcywh / IMG_SIZE
    - метку класса

    """
    l = x['annotation']['object']
    # l = trainDataset[0][1]['annotation']['object']

    cord = [[int(y) for y in x['bndbox'].values()] for x in l]
    cord = torch.tensor(cord)
    cord = torchvision.ops.box_convert(cord, 'xyxy', 'cxcywh') / IMG_SIZE

    lbl = [CLASS2NUM_DICT[x['name']] for x in l]
    lbl = torch.tensor(lbl).view(1,-1).T
    return lbl, cord


def collate_fn(batch):
    imgs = []
    cord = []
    lbl = []
    objCount = []

    for element in batch:
        imgs.append(element[0])
        cord.append(element[1][1])
        lbl.append(element[1][0])
        objCount.append(element[1][0].shape[0])

    imgs = torch.stack(imgs, 0)
    cord = torch.cat(cord)
    lbl = torch.cat(lbl).view(-1)
    objCount = torch.tensor(objCount)

    return imgs, cord, lbl, objCount



In [None]:
imgTransforms = T.Compose([
        T.Resize((IMG_SIZE, IMG_SIZE)),
        T.ToTensor()
])
trainDataset = torchvision.datasets.VOCDetection(
        root='data',
        image_set='train',
        # download=True,
        transform=imgTransforms,
        target_transform=taregetPreProcessing
)
testDataset = torchvision.datasets.VOCDetection(
        root='data',
        image_set='trainval',
        # download=True,
        transform=imgTransforms,
        target_transform=taregetPreProcessing
)

trainLoader = torch.utils.data.DataLoader(trainDataset, batch_size=BATCH_SIZE, collate_fn=collate_fn, shuffle=True)
testLoader = torch.utils.data.DataLoader(testDataset, batch_size=BATCH_SIZE, collate_fn=collate_fn)


# Обучение

In [None]:
# Инициализируем модель
model = SSD().to(device)
optimizer = torch.optim.Adam(model.parameters(),lr=0.0001)


# Костыли
# torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
# torch.autograd.set_detect_anomaly(True)

In [None]:
%env CUDA_LAUNCH_BLOCKING=1
%env TORCH_USE_CUDA_DSA=1
# %%export TORCH_USE_CUDA_DSA=1

env: CUDA_LAUNCH_BLOCKING=1
env: TORCH_USE_CUDA_DSA=1


In [None]:
# model = torch.load('model.pth', weight_only)

In [None]:
def calculateLoss(classTensor, bboxTensor, bboxs, lbls, objCount):
        loss = torch.tensor(0, device=device)
        for iter in range(len(classTensor)):
            classPr, bboxPr = classTensor[iter], bboxTensor[iter]
            classPr, bboxPr = classPr.to(device), bboxPr.to(device)
            # Получаем истинные значения ббоксов и меток классов для конкретного изображения
            objCountAtImg = objCount[iter].item()
            objCountBefore = objCount[:iter].sum().item()
            bboxAtImg = bboxs[objCountBefore:objCountBefore+objCountAtImg]
            lblAtImg = lbls[objCountBefore:objCountBefore+objCountAtImg]

            # Здесь пройдут основные вычисления
            with torch.no_grad():
                iou = torchvision.ops.box_iou(torchvision.ops.box_convert(bboxPr*IMG_SIZE, 'cxcywh', 'xyxy'), torchvision.ops.box_convert(bboxAtImg*IMG_SIZE, 'cxcywh', 'xyxy'))
                iouMax = iou.argmax(0)
                iouBig = torch.where((iou> 0.5).sum(1) >= 1)[0]
                iouBig = iouBig[~torch.isin(iouBig, iouMax)]
                iouAll = torch.cat([iouMax, iouBig])
                mask = torch.ones(len(iou)).bool()
                mask[iouAll] = False
                negClassTarget = torch.tensor([CLASS_COUNT - 1] * (classPr[mask].shape[0])).to(device)


            loss = loss + F.cross_entropy(classPr[iouMax], lblAtImg) # Добавляем ошибку максимальных IoU по классификации
            loss = loss + F.mse_loss(bboxPr[iouMax], bboxAtImg) # Добавляем ошибку максимальных IoU по расстоянию
            loss = loss + F.cross_entropy(classPr[mask], negClassTarget)# Добавляем ошибку по негативному классу
            if len(iouBig) != 0:
                iouBigAM = iou[iouBig].argmax(1)
                loss = loss + F.cross_entropy(classPr[iouBig], lblAtImg[iouBigAM]) # Добавляем ошибку IoU >=0.5 по классу
                loss = loss + F.mse_loss(bboxPr[iouBig], bboxAtImg[iouBigAM]) # Добавляем ошибку IoU >=0.5 по расстоянию
        return loss

@torch.no_grad
def calculateValLoss(model, testLoader):
    model.eval()
    loss = torch.tensor(0).to(device)
    for batchNum, (imgs, bboxs, lbls, objCount) in enumerate(testLoader):
        imgs, bboxs, lbls, objCount = imgs.to(device), bboxs.to(device), lbls.to(device), objCount
        classTensor, bboxTensor = model(imgs)
        classTensor, bboxTensor = convertModelResult(classTensor, bboxTensor)
        loss = loss + calculateLoss(classTensor, bboxTensor, bboxs, lbls, objCount)
    return loss

def trainIteration(model, trainLoader, optimizer):
    model.train()
    for batchNum, (imgs, bboxs, lbls, objCount) in enumerate(trainLoader):
        model.train()
        imgs, bboxs, lbls, objCount = imgs.to(device), bboxs.to(device), lbls.to(device), objCount
        classTensor, bboxTensor = model(imgs)
        classTensor, bboxTensor = convertModelResult(classTensor, bboxTensor)
        loss = calculateLoss(classTensor, bboxTensor, bboxs, lbls, objCount)
        print(f'Bathc: {batchNum+1}/{len(trainLoader)}, loss: {loss}')

        loss.backward()
        optimizer.step()
        torch.save(model.state_dict(),'model.pth')


for ep in range(EPOCH_COUNT):
  print(f"Epoch: {ep+1}\n______________________")
  trainIteration(model, trainLoader, optimizer)
  valLoss = calculateValLoss(model, testLoader)
  print(f'val loss: {valLoss/len(testLoader)}')


Epoch: 1
______________________
Bathc: 1/58, loss: 707.025146484375
Bathc: 2/58, loss: 649.7816772460938
Bathc: 3/58, loss: 617.2135620117188
Bathc: 4/58, loss: 587.9247436523438
Bathc: 5/58, loss: 539.531494140625
Bathc: 6/58, loss: 515.249755859375
Bathc: 7/58, loss: 560.9061279296875
Bathc: 8/58, loss: 598.77978515625
Bathc: 9/58, loss: 714.049072265625
Bathc: 10/58, loss: 596.368896484375
Bathc: 11/58, loss: 532.8328247070312
Bathc: 12/58, loss: 489.95654296875
Bathc: 13/58, loss: 453.1678771972656
Bathc: 14/58, loss: 463.7268371582031
Bathc: 15/58, loss: 492.1435546875
Bathc: 16/58, loss: 500.94635009765625
Bathc: 17/58, loss: 523.8145141601562
Bathc: 18/58, loss: 521.1709594726562
Bathc: 19/58, loss: 494.1229248046875
Bathc: 20/58, loss: 488.3055725097656
Bathc: 21/58, loss: 454.9127502441406
Bathc: 22/58, loss: 476.13006591796875
Bathc: 23/58, loss: 541.3934936523438
Bathc: 24/58, loss: 617.6058959960938
Bathc: 25/58, loss: 721.648681640625
Bathc: 26/58, loss: 689.8566284179688


KeyboardInterrupt: 

In [None]:
# iou shape - 7008 x 2
#