In [1]:
import os
from pathlib import Path

import pandas as pd
import numpy as np
import math

from PIL import Image

import torch

root_dir = Path('C:/Users/USER/Desktop/project/test')
model_dir = root_dir / 'model'
data_dir = root_dir / 'data'

debug_dir = root_dir / 'debug'

if not debug_dir.exists():
    os.mkdir(debug_dir)

bbox_path = 'C:/Users/USER/Desktop/project/test/data/oidv6-train-annotations-bbox.csv'

face_id = '/m/0dzct'
img_size = 640

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(device)

cuda


In [2]:
print('Loading Data...')
df = pd.read_csv(bbox_path)
print(f'{len(df)} Images find!\nDone!')

Loading Data...
14610229 Images find!
Done!


In [3]:
list(df)

['ImageID',
 'Source',
 'LabelName',
 'Confidence',
 'XMin',
 'XMax',
 'YMin',
 'YMax',
 'IsOccluded',
 'IsTruncated',
 'IsGroupOf',
 'IsDepiction',
 'IsInside',
 'XClick1X',
 'XClick2X',
 'XClick3X',
 'XClick4X',
 'XClick1Y',
 'XClick2Y',
 'XClick3Y',
 'XClick4Y']

In [4]:
train_df = df[['ImageID', 'XMin', 'XMax', 'YMin', 'YMax', 'LabelName']]
train_df = train_df[train_df['LabelName'] == face_id]

In [58]:
from torch.utils.data import Dataset

class DataSet(Dataset):
    def __init__(self, train_df, ths=0.046, DEBUG_MODE=False, is_train=False) -> None:
        i = 0

        self._train_set = []
        self._ths = ths
        self._DEBUG = DEBUG_MODE

        for img, face, debug_img in Image_loader(train_df, self._ths):
            if len(face) == 0:
                continue
            
            if DEBUG_MODE:
                debug_img.save(str(debug_dir / f"test img {i}.jpg"))
            
            train_data, train_label = Data_Pretreatment(img, face)
            
            print(len(train_data), train_label)
            for idx in range(len(train_data)):
                #self._train_set.append([train_data[idx], list(map(lambda x: int(x * img_size), [box for box in train_label[idx]]))])
                self._train_set.append([train_data[idx], train_label[idx]])

            i += 1
            if i == 10:
                break
        
        self._data_len = len(self._train_set)
        self._indices = np.arange(self._data_len, dtype=np.uint)

    def __len__(self):
        return self._data_len

    def __getitem__(self, idx):
        '''
        데이터 셋을 리턴
        train_set: [train_data, train_label]
        train_data: (image_size, image_size, 3)
        train_label: (xmax, xmin, ymax, ymin, anchor_size)
        '''
        return self._train_set[idx]
            

In [59]:
def Image_loader(df, ths):
    import copy
    iamge_dir = Path('C:/Users/USER/Desktop/project/test/data/[target_dir/train]')
    
    last_id = ''
    faces = []
    for idx, image in enumerate(df.values):
        Id, xmin, xmax, ymin, ymax, _ = image
        
        if last_id == '':
            img = Image.open(str(iamge_dir / (Id + '.jpg')))
            w, h = img.size
            debug_img = copy.deepcopy(img)
            last_id = Id
        
        if last_id != Id or idx == len(df.values) - 1:
            yield img.resize((img_size, img_size), Image.LANCZOS), faces, debug_img
            faces = []
            img = Image.open(str(iamge_dir / (Id + '.jpg')))
            w, h = img.size
            debug_img = copy.deepcopy(img)

        last_id = Id        
        if xmax - xmin < ths or ymax - ymin < ths:
            continue
        
        debug_img = draw_face_line(debug_img, [w, h, xmax, xmin, ymax, ymin])
        faces.append([xmax, xmin, ymax, ymin])
        

def draw_face_line(img, cord):
    from PIL import ImageDraw
    w, h, xmax, xmin, ymax, ymin = cord
    draw = ImageDraw.Draw(img)
    draw.rectangle(((int(w * xmin), int(h * ymin)), (int(w * xmax), int(h * ymax))), outline='green')
    return img

In [93]:
def Data_Pretreatment(img, face):
    new_img, new_label = argument(img, face)
    new_label = rate2cord(new_label)
    return new_img, new_label
    
def argument(img, faces):
    from torchvision import transforms
    new_image = [np.array(img)]
    new_labels = [faces]

    # crop_size_list = [0.3, 0.45, 0.6, 0.8, 1.0]
    # ths = torch.randint(0, 5)
    # if ths != 4:
    #     crop_ratio = crop_size_list(ths)
    #     crop_w, crop_h = img_size * crop_ratio, img_size * crop_ratio
    #     image = np.array(transforms.RandomCrop((crop_h, crop_w))(img))
    #     new_image.append(image)
    #     for i in range(face_count):
    #         face = np.array(faces[i])
    #         new_face = face * crop_ratio
    #         boxes.append(new_face)

    image = transforms.RandomHorizontalFlip(0.5)(img)
    if image != img:
        new_image.append(np.array(image))
        new_faces = []
        for face in faces:
            face = np.array(face)
            new_face = [1 - face[1], 1 - face[0], face[2], face[3]]
            new_faces.append(new_face)
        new_labels.append(new_faces)
    return new_image, new_labels

def rate2cord(labels):
    new_labels = []
    for label in labels:
        labels =[]
        for l in label:
            labels.append(list(map(lambda x: int(x * img_size), l)))
        new_labels.append(labels)

    return new_labels

In [94]:
'''
Random crop test
from torchvision import transforms
img = Image.open('C:/Users/USER/Desktop/project/test/data/[target_dir/train]/000004f4400f6ec5.jpg')
w, h = img.size
img = transforms.RandomCrop((round(h * 0.8), round(w * 0.8)))(img)
img.save(str(debug_dir / f"test.jpg"))
'''

'\nRandom crop test\nfrom torchvision import transforms\nimg = Image.open(\'C:/Users/USER/Desktop/project/test/data/[target_dir/train]/000004f4400f6ec5.jpg\')\nw, h = img.size\nimg = transforms.RandomCrop((round(h * 0.8), round(w * 0.8)))(img)\nimg.save(str(debug_dir / f"test.jpg"))\n'

In [95]:
train_data = DataSet(train_df, DEBUG_MODE=True)

[[[318, 275, 362, 198], [437, 364, 296, 139]]]
1 [[[318, 275, 362, 198], [437, 364, 296, 139]]]
[[[414, 230, 586, 238]], [[410, 226, 586, 238]]]
2 [[[414, 230, 586, 238]], [[410, 226, 586, 238]]]
[[[297, 208, 270, 146], [472, 380, 262, 142]], [[432, 342, 270, 146], [259, 167, 262, 142]]]
2 [[[297, 208, 270, 146], [472, 380, 262, 142]], [[432, 342, 270, 146], [259, 167, 262, 142]]]
[[[424, 244, 350, 148]]]
1 [[[424, 244, 350, 148]]]
[[[145, 114, 262, 186], [241, 206, 267, 208], [330, 292, 240, 152]], [[526, 494, 262, 186], [433, 398, 267, 208], [347, 309, 240, 152]]]
2 [[[145, 114, 262, 186], [241, 206, 267, 208], [330, 292, 240, 152]], [[526, 494, 262, 186], [433, 398, 267, 208], [347, 309, 240, 152]]]
[[[274, 188, 363, 170], [537, 459, 459, 314]]]
1 [[[274, 188, 363, 170], [537, 459, 459, 314]]]
[[[166, 116, 312, 219], [256, 201, 261, 154], [479, 400, 240, 91]]]
1 [[[166, 116, 312, 219], [256, 201, 261, 154], [479, 400, 240, 91]]]
[[[162, 115, 339, 202], [266, 229, 287, 147], [326, 25

In [96]:
train_data[0]

[array([[[34, 39, 39],
         [35, 38, 39],
         [36, 36, 39],
         ...,
         [ 7,  5,  6],
         [ 6,  4,  5],
         [ 7,  5,  6]],
 
        [[33, 38, 41],
         [36, 38, 40],
         [35, 39, 42],
         ...,
         [ 6,  4,  5],
         [ 6,  6,  6],
         [ 6,  6,  6]],
 
        [[33, 37, 40],
         [35, 37, 41],
         [33, 38, 41],
         ...,
         [ 5,  5,  5],
         [ 6,  6,  6],
         [ 6,  6,  6]],
 
        ...,
 
        [[36, 29, 23],
         [40, 33, 29],
         [38, 33, 30],
         ...,
         [10,  8,  9],
         [11,  9,  9],
         [12, 11,  9]],
 
        [[31, 26, 20],
         [36, 29, 23],
         [35, 30, 26],
         ...,
         [10,  8,  9],
         [11,  9,  9],
         [14, 13, 11]],
 
        [[29, 23, 17],
         [32, 25, 19],
         [32, 27, 24],
         ...,
         [ 9,  7,  8],
         [11,  9,  9],
         [16, 15, 13]]], dtype=uint8),
 [[318, 275, 362, 198], [437, 364, 296, 13

In [97]:
class encoder:
    def __init__(self) -> None:
        self.base_scale = 2 ** (4/3)
        anchor_scale_step = 2 ** (1/3)
        self.base_anchor = [self.base_scale * pow(2, i) for i in range(2, 8)]
        self.num_fms = 6

        self.aspect_ratios = [1/2., 1/1., 2/1.]
        self.anchor_scale = [self.base_scale * (anchor_scale_step ** i) for i in range(3)]
        self.anchor_list = self._get_anchor_wh()

    def _get_anchor_wh(self):
        anchors = []

        for anchor in self.base_anchor:
            for ar in self.aspect_ratios:
                h = anchor / ar
                w = ar * h
                for scale in self.anchor_scale:
                    anchors.append([w * scale, h * scale])
        
        return torch.Tensor(anchors).view(self.num_fms, -1, 2)


    def _get_anchor_boxes(self, input_size):
        boxes = []
        fm_sizes = [(input_size / i).ceil() for i in self.base_anchor]

        for i in range(self.num_fms):
            fm_size = fm_sizes[i]
            grid_size = input_size / fm_size
            fm_w, fm_h = int(fm_size[0]), int(fm_size[1])
            xy = self._meshgrid(fm_w, fm_h) + 0.5
            xy = (xy * grid_size).view(fm_h, fm_w, 1, 2).expand(fm_h, fm_w, 9, 2)
            wh = self.anchor_list[i].view(1, 1, 9, 2).expand(fm_h, fm_w, 9, 2)
            box = torch.cat([xy, wh], 3)
            boxes.append(box.view(-1, 4))
        return torch.cat(boxes, 0)
            
    
    def _get_iou(self, boxes):
        """
        box별 각 anchor들의 iou를 구함
        """
        bbox = []
        for box in boxes:
            box_wh = np.array([box[2] * 2, box[3] * 2])
            ious = []
            for fms in range(self.num_fms):
                fms_iou = []
                for anchor in self.anchor_list[fms]:
                    intersect_wh = np.maximum(np.minimum(box_wh, anchor), 0.0)
                    intersect_area = intersect_wh[0] * intersect_wh[1]
                    box_area = box_wh[0] * box_wh[1]
                    anchor_area = anchor[0] * anchor[1]
                    iou = intersect_area / (box_area + anchor_area - intersect_area)
                    fms_iou.append(iou)
                ious.append(fms_iou)
            bbox.append(ious)

        return torch.Tensor(bbox)
    
    def _meshgrid(self, x, y, row_major=True):
        a = torch.arange(0, x)
        b = torch.arange(0, y)
        xx = a.repeat(y).view(-1 ,1)
        yy = b.view(-1, 1).repeat(1, x).view(-1, 1)
        return torch.cat([xx, yy], 1) if row_major else torch.cat([yy, xx])

    def encode(self, boxes, labels, input_size):
        input_size = torch.Tensor([input_size, input_size]) if isinstance(input_size, int) else torch.Tensor(input_size)
        anchor_boxes = self._get_anchor_boxes(input_size)
        boxes = self._change_box_order(boxes, 'xxyy2xywh')

        ious = self._get_iou(boxes)
        best_ious, best_ids = ious.max(1)
        print(best_ids, boxes)
        boxes = boxes[best_ids]
    
        loc_xy = (boxes[:,:2] - anchor_boxes[:,:2]) / anchor_boxes[:,2:]
        loc_wh = torch.log(boxes[:,2:]/anchor_boxes[:,2:])
        loc_targets = torch.cat([loc_xy, loc_wh], 1)

        # class 할당
        cls_targets = 1 + labels[best_ids[idx]]
        cls_targets[best_ious<0.35] = 0 # iou < 0.35 anchor는 negative
        return loc_targets, cls_targets

    def decode(self, loc_preds, cls_preds, input_size):
        cls_thresh = 0.5
        nms_thresh = 0.5

        input_size = torch.Tensor([input_size,input_size]) if isinstance(input_size, int) else torch.Tensor(input_size)
        anchor_boxes = self._get_anchor_boxes(input_size) # 앵커 박스 생성

        loc_xy = loc_preds[:,:2] # 결과값 offset 추출
        loc_wh = loc_preds[:,2:]

        xy = loc_xy * anchor_boxes[:,2:] + anchor_boxes[:,:2] # offset + anchor
        wh = loc_wh.exp() * anchor_boxes[:,2:]
        boxes = torch.cat([xy-wh/2, xy+wh/2], 1)

        score, labels = cls_preds.sigmoid().max(1)
        ids = score > cls_thresh
        ids = ids.nonzero().squeeze()
        keep = self._box_nms(boxes[ids], score[ids], threshold=nms_thresh) # nms
        return boxes[ids][keep], labels[ids][keep]
    
    def _change_box_order(self, boxes, order):
        assert order in ['xxyy2xywh','xywh2xxyy']

        if order == 'xxyy2xywh':
            x = torch.div((boxes[:,0] + boxes[:,1]), 2, rounding_mode='trunc')
            y = torch.div((boxes[:,2] + boxes[:,3]), 2, rounding_mode='trunc')
            w = boxes[:,0] - boxes[:,1]
            h = boxes[:,2] - boxes[:,3]
            return torch.cat([x.view(-1, 1), y.view(-1, 1), w.view(-1, 1), h.view(-1, 1)], 1)
        else:
            x1 = boxes[:,0] + torch.div(boxes[:,2], 2, rounding_mode='trunc')
            x2 = boxes[:,0] - torch.div(boxes[:,2], 2, rounding_mode='trunc')
            y1 = boxes[:,1] + torch.div(boxes[:,3], 2, rounding_mode='trunc')
            y2 = boxes[:,1] - torch.div(boxes[:,3], 2, rounding_mode='trunc')
            return torch.cat([x1.view(-1, 1), x2.view(-1, 1), y1.view(-1, 1), y2.view(-1, 1)], 1)

    def _box_nms(self, bboxes, scores, threshold=0.5, mode='union'):
        x1 = bboxes[:,0]
        y1 = bboxes[:,1]
        x2 = bboxes[:,2]
        y2 = bboxes[:,3]

        areas = (x2 - x1 + 1) * (y2 - y1 + 1)
        _, order = scores.sort(0, descending=True) # confidence 순 정렬
        keep = []
        while order.numel() > 0:
            if order.numel() == 1:
                keep.append(order.data)
                break
            i = order[0] # confidence 가장 높은 anchor 추출
            keep.append(i) # 최종 detection에 저장

            xx1 = x1[order[1:]].clamp(min=x1[i])
            yy1 = y1[order[1:]].clamp(min=y1[i])
            xx2 = x2[order[1:]].clamp(max=x2[i])
            yy2 = y2[order[1:]].clamp(max=y2[i])

            w = (xx2-xx1+1).clamp(min=0)
            h = (yy2-yy1+1).clamp(min=0)
            inter = w*h

            if mode == 'union':
                ovr = inter / (areas[i] + areas[order[1:]] - inter)
            elif mode == 'min':
                ovr = inter / areas[order[1:]].clamp(max=areas[i])
            else:
                raise TypeError('Unknown nms mode: %s.' % mode)

            ids = (ovr<=threshold).nonzero().squeeze()
            if ids.numel() == 0:
                break
            order = order[ids+1]
        return torch.LongTensor(keep)

In [98]:
class DataLoader:
    def __init__(self, df, batch_size) -> None:
        self._df = np.array(df, dtype='object')
        self._batch_size = batch_size
        
        self.encoder = encoder()
        self._imgs = [img for img in self._df[:, 0]]
        self._boxes = torch.stack([torch.Tensor(box) for box in self._df[:, 1]])

        self._loc_targets = []
        self._cls_targets = []

        loc_target, cls_target = self.encoder.encode(self._boxes, [1 for _ in range(self.data_size())], input_size=(img_size, img_size))
        self._loc_targets.append(loc_target)
        self._cls_targets.append(cls_target)
        

    def next(self, idx):
        start_idx = idx * self._batch_size
        end_idx = min((idx + 1) * self._batch_size, self.len())
        
        return self._imgs[start_idx:end_idx], torch.stack(self._loc_targets[start_idx:end_idx]), torch.stack(self._cls_targets[start_idx:end_idx])

    def data_size(self):
        return len(self._df)

    def iter(self):
        from math import ceil
        return ceil(self.len() / self._batch_size)



In [99]:
a = torch.Tensor([[296.], [343.], [400.], [322.], [252.], [426.], [334.], [129.], [223.], [311.], [231.]])
b = torch.Tensor([[280.], [280.], [217.], [412.], [208.], [202.], [249.], [224.], [237.], [196.], [266.]])
c = torch.Tensor([[ 43.,  42.,  73., 184.,  89.,  92., 180.,  31.,  35.,  38.,  86.]])
d = torch.Tensor([[164., 164., 157., 348., 124., 120., 202.,  76.,  59.,  88., 193.]])

torch.cat((c.view(-1, 1), d.view(-1, 1)), 1)


tensor([[ 43., 164.],
        [ 42., 164.],
        [ 73., 157.],
        [184., 348.],
        [ 89., 124.],
        [ 92., 120.],
        [180., 202.],
        [ 31.,  76.],
        [ 35.,  59.],
        [ 38.,  88.],
        [ 86., 193.]])

In [100]:
batch = 8

train_data_loader = DataLoader(train_data, batch)

RuntimeError: stack expects each tensor to be equal size, but got [2, 4] at entry 0 and [1, 4] at entry 1

In [None]:
for idx in range(train_data_loader.iter() + 1):
    x, y = train_data_loader.next(idx)
    print(x, y)
    break

NameError: name 'train_data_loader' is not defined

In [None]:
from torch import nn

In [None]:
class Face_detect_model(nn.Module):
    def __init__(self, backbone='resnet', conf=None) -> None:
        super().__init__()
        if conf:
            self._class_num = conf.class_num
            self._class_label = conf.class_label
        else:
            self._class_label = ['face']
            self._class_num = len(self._class_label)

        self._anchors = [4, 8, 16, 32, 64, 128]
        def get_anchor(eps):
            from math import pow
            return [[int(pow(2, i)) * eps for _ in range(2)] for i in range(2, 8)]

        eps = 2 ** (4/3)
        anchor_list = get_anchor(eps)
        self.num_anchors = len(anchor_list)

        if backbone == 'resnet':
            self._backbone = FPN(num_blocks=[3,4,6,3])
        else:
            pass
        self.reg_head = self._make_head(self.num_anchors * 4, iou=True)
        self.cls_head = self._make_head(self.num_anchors * self._class_num)

        self.loc_head = self._loc_head(self.num_anchors * 4) # 바운딩 박스 좌표 예측
        self.iou_head = self._iou_head()
    
    def foward(self, x):
        fms = self._backbone(x)
        loc_preds = []
        cls_preds = []
        iou_preds = []
        for fm in fms: 
            reg_pred = self.reg_head(fm)
            cls_pred = self.cls_head(fm)
            loc_pred = self.loc_head(reg_pred)
            iou_pred = self.iou_head(reg_pred)

            loc_pred = loc_pred.permute(0,2,3,1).contiguous().view(x.size(0),-1,4)  # [N, 9*4,H,W] -> [N,H,W, 9*4] -> [N,H*W*9, 4]
            cls_pred = cls_pred.permute(0,2,3,1).contiguous().view(x.size(0),-1,self.num_classes) # [N,9,H,W] -> [N,H,W,9*20] -> [N,H*W*9,20]
            iou_pred = self._iou_aware(iou_pred)

            loc_preds.append(loc_pred)
            cls_preds.append(cls_pred)
            iou_preds.append(iou_pred)
        return torch.cat(loc_preds,1), torch.cat(cls_preds,1), torch.cat(iou_preds,1)

    def _make_head(self, out_channels, iou=False): # 예측을 수행하는 Layer 생성
        layers = []
        for _ in range(4):
            layers.append(nn.Conv2d(256,256,3, stride=1, padding=1))
            layers.append(nn.ReLU())
        if iou:
            return nn.Sequential(*layers)
        layers.append(nn.Conv2d(256, out_channels, 3, stride=1, padding=1)) # (batch,9*4,H,W) or (batch,9,H,W) 

        return nn.Sequential(*layers)
    
    def _loc_head(self, out_channels):
        layers = []
        layers.append(nn.Conv2d(256, out_channels, 3, stride=1, padding=1))
        return nn.Sequential(*layers)

    def _iou_head(self):
        layers = []
        layers.append(nn.Conv2d(256, 3, 3, stride=1, padding=1)) # (batch, 9, H, W)
        layers.append(nn.Sigmoid())# (batch, 9, H, W)
        return nn.Sequential(*layers)

    def freeze_bn(self): # pre-trained model을 사용하므로, BN freeze
        for layer in self.modules():
            if isinstance(layer, nn.BatchNorm2d):
                layer.eval()

In [None]:
class FPN(nn.Module):
    def __init__(self, num_blocks):
        super(FPN, self).__init__()
        self.in_channels = 64

        self.conv1 = nn.Conv2d(3, 64, 7, stride=2, padding=3, bias=False) # 640x640
        self.bn1 = nn.BatchNorm2d(64)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(3, stride=2, padding=1) # 320x320

        self.layer1 = self._make_layer(64, num_blocks[0], stride=1)  # c2, 320x320
        self.layer2 = self._make_layer(128, num_blocks[1], stride=2)  # c3 160x160
        self.layer3 = self._make_layer(256, num_blocks[2], stride=2, DCN=True) # c4 80x80
        self.layer4 = self._make_layer(512, num_blocks[3], stride=2, DCN=True) # c5
        self.conv6 = nn.Conv2d(2048, 256, 3, stride=2, padding=1)    # p6
        self.conv7 = nn.Sequential(                                  # p7
            nn.ReLU(),
            nn.Conv2d(256, 256, 3, stride=2, padding=1)
        )

        # Lateral layers
        self.lateral_1 = nn.Conv2d(2048, 256, 1, stride=1, padding=0)
        self.lateral_2 = nn.Conv2d(1024, 256, 1, stride=1, padding=0)
        self.lateral_3 = nn.Conv2d(512, 256, 1, stride=1, padding=0)
        self.lateral_4 = nn.Conv2d(256, 256, 1, stride=1, padding=0)

        # Top-down layers
        self.top_down_1 = nn.Conv2d(256, 256, 3, stride=1, padding=1)
        self.top_down_2 = nn.Conv2d(256, 256, 3, stride=1, padding=1)
        self.top_down_3 = nn.Conv2d(256, 256, 3, stride=1, padding=1)

        self.upsample_1 = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=False)
        self.upsample_2 = nn.Upsample(size=(75,75), mode='bilinear', align_corners=False)

    def forward(self, x):
        # Feature extractor(ResNet)
        c1 = self.relu(self.bn1(self.conv1(x)))
        c1 = self.maxpool(c1)
        c2 = self.layer1(c1)
        c3 = self.layer2(c2)
        c4 = self.layer3(c3)
        c5 = self.layer4(c4)

        # FPN
        p6 = self.conv6(c5)
        p7 = self.conv7(p6)
        p5 = self.lateral_1(c5)
        p4 = self.top_down_1(self.upsample_1(p5) + self.lateral_2(c4))
        p3 = self.top_down_2(self.upsample_2(p4) + self.lateral_3(c3))
        p2 = self.top_down_3(self.upsample_2(p3) + self.lateral_4(c2))

        return p2, p3, p4, p5, p6, p7

    def _make_layer(self, inner_channels, num_block, stride, DCN=False):
        strides = [stride] + [1] * (num_block-1)
        layers = []
        for stride in strides:
            layers.append(BottleNeck(self.in_channels, inner_channels, stride=stride, DCN=DCN))
            self.in_channels = inner_channels*BottleNeck.expension
        return nn.Sequential(*layers)

In [None]:
class BottleNeck(nn.Module):
    expension = 4
    def __init__(self, in_channels, out_channels, stride=1, DCN=False):
        super().__init__()
        
        if DCN:
            self.residual_function = nn.Sequential(
                DeformableConv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                DeformableConv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                DeformableConv2d(out_channels, out_channels * BottleNeck.expension, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(out_channels * BottleNeck.expension),
            )
        else:
            self.residual_function = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.Conv2d(out_channels, out_channels * BottleNeck.expension, kernel_size=1, stride=1, bias=False),
                nn.BatchNorm2d(out_channels * BottleNeck.expension),
            )

        self.shortcut = nn.Sequential()

        self.relu = nn.ReLU()

        if stride != 1 or in_channels != out_channels * BottleNeck.expension:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels * BottleNeck.expension, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels * BottleNeck.expension)
            )
            
    def forward(self, x):
        x = self.residual_function(x) + self.shortcut(x)
        x = self.relu(x)
        return x

In [None]:
import torchvision.ops

class DeformableConv2d(nn.Module):
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size=3,
                 stride=1,
                 padding=1,
                 bias=False):

        super(DeformableConv2d, self).__init__()
        
        assert type(kernel_size) == tuple or type(kernel_size) == int

        kernel_size = kernel_size if type(kernel_size) == tuple else (kernel_size, kernel_size)
        self.stride = stride if type(stride) == tuple else (stride, stride)
        self.padding = padding
        
        self.offset_conv = nn.Conv2d(in_channels, 
                                     2 * kernel_size[0] * kernel_size[1],
                                     kernel_size=kernel_size, 
                                     stride=stride,
                                     padding=self.padding, 
                                     bias=True)

        nn.init.constant_(self.offset_conv.weight, 0.)
        nn.init.constant_(self.offset_conv.bias, 0.)
        
        self.modulator_conv = nn.Conv2d(in_channels, 
                                     1 * kernel_size[0] * kernel_size[1],
                                     kernel_size=kernel_size, 
                                     stride=stride,
                                     padding=self.padding, 
                                     bias=True)

        nn.init.constant_(self.modulator_conv.weight, 0.)
        nn.init.constant_(self.modulator_conv.bias, 0.)
        
        self.regular_conv = nn.Conv2d(in_channels=in_channels,
                                      out_channels=out_channels,
                                      kernel_size=kernel_size,
                                      stride=stride,
                                      padding=self.padding,
                                      bias=bias)

    def forward(self, x):
        #h, w = x.shape[2:]
        #max_offset = max(h, w)/4.

        offset = self.offset_conv(x)#.clamp(-max_offset, max_offset)
        modulator = 2. * torch.sigmoid(self.modulator_conv(x))
        
        x = torchvision.ops.deform_conv2d(input=x, 
                                          offset=offset, 
                                          weight=self.regular_conv.weight, 
                                          bias=self.regular_conv.bias, 
                                          padding=self.padding,
                                          mask=modulator,
                                          stride=self.stride,
                                          )
        return x

In [None]:
from easydict import EasyDict
conf = EasyDict({})

conf.class_label = ['humen_face']
conf.class_num = len(conf.class_label)
conf.iou_ths = 0.5

In [None]:
model = Face_detect_model(conf=conf).to(device)

NVIDIA GeForce RTX 3070 Laptop GPU with CUDA capability sm_86 is not compatible with the current PyTorch installation.
The current PyTorch install supports CUDA capabilities sm_37 sm_50 sm_60 sm_61 sm_70 sm_75 compute_37.
If you want to use the NVIDIA GeForce RTX 3070 Laptop GPU GPU with PyTorch, please check the instructions at https://pytorch.org/get-started/locally/



In [None]:
print(model)

Face_detect_model(
  (_backbone): FPN(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU()
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BottleNeck(
        (residual_function): Sequential(
          (0): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (2): ReLU()
          (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (4): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
          (5): ReLU()
          (6): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (7): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
   

In [None]:
class DIoULoss(nn.Module):
    def __init__(self) -> None:
        super().__init__()

    def get_metrics(self, iou_preds, p, alpha=0.5):
        '''
        (y_true&y_pred_iou, cls_preds)
        '''
        size = len(iou_preds) if len(iou_preds) == len(p) else 0

        scores = []
        for i in range(size):
            score = (iou_preds[i] ** (1 - alpha)) * p[i] ** alpha
            scores.append(score)
        
        return sum(scores) / len(scores)

    def get_p(self, box1, box2):
        w = h = img_size
        p = 0
        for i in range(self.batch_size):
            points = self.get_cords((box1[i], box2[i]), w, h)
            p += np.linalg.norm(points[0] - points[1])

        return p

    def get_IoU(self, box1, box2):
        # box = (x1, y1, x2, y2)
        # box = (x1, x2, y1, y2)
        iou = 0
        for i in range(self.batch_size):
            box1_area = (box1[i, 1] - box1[i, 0] + 1) * (box1[i, 3] - box1[i, 2] + 1)
            box2_area = (box2[i, 1] - box2[i, 0] + 1) * (box2[i, 3] - box2[i, 2] + 1)

            # obtain x1, y1, x2, y2 of the intersection
            x1 = max(box1[i, 0], box2[i, 0])
            x2 = max(box1[i, 1], box2[i, 1])
            y1 = min(box1[i, 2], box2[i, 2])
            y2 = min(box1[i, 3], box2[i, 3])

            # compute the width and height of the intersection
            w = max(0, x2 - x1 + 1)
            h = max(0, y2 - y1 + 1)

            inter = w * h
            iou += inter / (box1_area + box2_area - inter)
        return iou

    def get_cords(self, boxs, w, h):
        '''
        input xmin, xmax, ymin, ymax (float)
        return (x, y) center cordinate
        '''
        cords = []
        for box in boxs:
            box_cord = np.array((int(((box[0] + box[1]) * w) / 2), int(((box[2] + box[3]) * h) / 2)))
            cords.append(box_cord)
        
        return cords
    
    def foward(self, loc_preds, loc_targets):
        ''' 
        Distance-IoU Loss
        '''
        self.batch_size = loc_preds.size()

        loss = 1 - (self.get_IoU(loc_targets, loc_preds) / self.batch_size)
        loss += self.get_p(loc_targets, loc_preds) / self.batch_size

        return loss

In [None]:
lr_up_iter = 500
lr_down_epoch = 30
lr_down_epoch_end = 630

conf.lr = 3.75e-3
conf.mm = 0.9
conf.wd = 5e-4

conf.lr_warmup = 10. / lr_up_iter

conf.lr_down_ratio = 20. / (lr_down_epoch_end // lr_down_epoch)

In [None]:
class Warmup(torch.optim.lr_scheduler.LambdaLR):
    def __init__(self, optimizer, warmup_steps, last_epoch) -> None:

        def lr_lambda(step):
            if step < warmup_steps:
                return float(step) / float(max(1.0, warmup_steps))
            return 1.
        
        super().__init__(optimizer, lr_lambda, last_epoch=last_epoch)

In [None]:
loss_func = DIoULoss()


opt = torch.optim.SGD(lr = conf.lr, momentum=conf.mm, weight_decay=conf.wd)
lr_scheduler = torch.optim.lr_scheduler.MultiStepLR(
    opt, 
    [i for i in range(0, conf.lr_down_epoch_end, conf.lr_down_epoch)],
    conf.lr_down_ratio
    )
lr_warmup = Warmup(opt, conf.lr_warmup, lr_up_iter)

In [None]:
def train(model, params):
    pass

In [None]:
import os

iter = int(input('target iterlation: '))
save_dir = ""

try:
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
except:
    print("Can't make save dir")

model = Face_detect_model().to(device)

if iter == 0:
    while True:
        pass
else:
    for i in range(iter):
        model, history = train(model, conf)
        lr_warmup.step()