In [17]:
import torch
import torchvision.transforms as transforms

import os
import cv2
import math
import random
import numpy as np
import matplotlib.pyplot as plt
import xml.etree.ElementTree as ET

from PIL import Image, ImageDraw

# Encoder

In [18]:
def meshgrid(x, y, row_major=True):
    a = torch.arange(0, x)
    b = torch.arange(0, y)
    xx = a.repeat(y).view(-1, 1)
    yy = b.view(-1, 1).repeat(1, x).view(-1, 1)
    return torch.cat([xx, yy], 1) if row_major else torch.cat([yy, xx], 1)

def change_box_order(boxes, order):
    assert order in ['xyxy2xywh', 'xywh2xyxy']
    a = boxes[:, :2]
    b = boxes[:, 2:]
    if order == 'xyxy2xywh':
        return torch.cat([(a + b) / 2, b - a + 1], 1)
    return torch.cat([a - b / 2, a + b / 2], 1)

def box_iou(box1, box2, order='xyxy'):
    if order == 'xywh':
        box1 = change_box_order(box1, 'xywh2xyxy')
        box2 = change_box_order(box2, 'xywh2xyxy')

    N = box1.size(0)
    M = box2.size(0)

    lt = torch.max(box1[:, None, :2], box2[:, :2])
    rb = torch.min(box1[:, None, 2:], box2[:, 2:])

    wh = (rb - lt + 1).clamp(min=0)
    inter = wh[:, :, 0] * wh[:, :, 1]

    area1 = (box1[:, 2] - box1[:, 0] + 1) * (box1[:, 3] - box1[:, 1] + 1)
    area2 = (box2[:, 2] - box2[:, 0] + 1) * (box2[:, 3] - box2[:, 1] + 1)
    iou = inter / (area1[:, None] + area2 - inter)

    # print(area1)
    # print(area1.shape)
    # print(area2)
    # print(area2.shape)
    # print(f"iou: {iou}")
    return iou

def box_nms(bboxes, scores, threshold=0.5, mode='union'):
    x1 = bboxes[:, 0]
    y1 = bboxes[:, 1]
    x2 = bboxes[:, 2]
    y2 = bboxes[:, 3]

    areas = (x2 - x1 + 1) * (y2 - y1 + 1)
    _, order = scores.sort(0, descending=True)
    keep = []
    sco = []

    while order.numel() > 0:
        i = order[0] if order.numel() > 1 else order.item()
        keep.append(i)
        sco.append(scores[i])

        if order.numel() == 1:
            break

        xx1 = x1[order[1:]].clamp(min=x1[i])
        yy1 = y1[order[1:]].clamp(min=y1[i])
        xx2 = x2[order[1:]].clamp(max=x2[i])
        yy2 = y2[order[1:]].clamp(max=y2[i])

        w = (xx2 - xx1 + 1).clamp(min=0)
        h = (yy2 - yy1 + 1).clamp(min=0)
        inter = w * h

        if mode == 'union':
            ovr = inter / (areas[i] + areas[order[1:]] - inter)
        elif mode == 'min':
            ovr = inter / areas[order[1:]].clamp(max=areas[i])
        else:
            raise TypeError(f"Unknow nms mode: {mode}.")

        ids = (ovr <= threshold).nonzero().squeeze()
        if ids.numel() == 0:
            break
        order = order[ids + 1]
    return torch.LongTensor(keep), torch.Tensor(sco)

In [30]:
class DataEncoder:
    def __init__(self, input_scales):
        self.input_area = input_scales[0] * input_scales[1]
        self.anchor_areas = [32*32, 64*64, 128*128, 256*256, 512*512] # p3 -> p7
        self.aspect_ratios = (0.5, 1.0, 2.0)
        self.scale_ratios = (1.0, pow(2.0, 1.0/3.0), pow(2.0, 2.0/3.0))
        self.anchor_wh = self._get_anchor_wh()

    def _get_anchor_wh(self):
        anchor_wh = []
        for s in self.anchor_areas:
            for ar in self.aspect_ratios:
                h = math.sqrt(s / ar)
                w = ar * h
                for sr in self.scale_ratios:
                    anchor_h = h * sr
                    anchor_w = w * sr
                    anchor_wh.append([anchor_w, anchor_h])
        num_fms = len(self.anchor_areas)
        return torch.Tensor(anchor_wh).view(num_fms, -1, 2)

    def _get_anchor_boxes(self, input_size):
        num_fms = len(self.anchor_areas)
        fm_sizes = [(input_size / pow(2., i+3)).ceil() for i in range(num_fms)]

        boxes = []
        for i in range(num_fms):
            fm_size = fm_sizes[i]
            grid_size = input_size / fm_size
            fm_w, fm_h = int(fm_size[0]), int(fm_size[1])
            xy = meshgrid(fm_w, fm_h) + 0.5
            xy = (xy * grid_size).view(fm_h, fm_w, 1, 2).expand(fm_h, fm_w, 9, 2)
            wh = self.anchor_wh[i].view(1, 1, 9, 2).expand(fm_h, fm_w, 9, 2)
            box = torch.cat([xy, wh], 3)
            boxes.append(box.view(-1, 4))
        return torch.cat(boxes, 0)

    def encode(self, boxes, labels, input_size):
        input_size = torch.Tensor([input_size, input_size]) if isinstance(input_size, int) else torch.Tensor(input_size)
        anchor_boxes = self._get_anchor_boxes(input_size)
        boxes = change_box_order(boxes, 'xyxy2xywh')

        ious = box_iou(anchor_boxes, boxes, 'xywh')
        max_ious, max_ids = ious.max(1)
        boxes = boxes[max_ids]

        loc_xy = (boxes[:, :2] - anchor_boxes[:, :2]) / anchor_boxes[:, 2:]
        loc_wh = torch.log(boxes[:, 2:] / anchor_boxes[:, 2:])
        loc_targets = torch.cat([loc_xy, loc_wh], 1)
        cls_targets = 1 + labels[max_ids]

        cls_targets[max_ious < 0.4] = 0
        ignore = (max_ious >= 0.4) & (max_ious < 0.5)
        cls_targets[ignore] = -1
        return loc_targets, cls_targets

    def decode(self, loc_preds, cls_preds, input_size):
        CLS_THRESH = 0.3
        NMS_THRESH = 0.3

        input_size = torch.Tensor([input_size, input_size]) if isinstance(input_size, int) else torch.Tensor(input_size)
        anchor_boxes = self._get_anchor_boxes(input_size)

        loc_xy = loc_preds[:, :2]
        loc_wh = loc_preds[:, 2:]

        xy = loc_xy * anchor_boxes[:, 2:] + anchor_boxes[:, :2]
        wh = loc_wh.exp() * anchor_boxes[:, 2:]
        boxes = torch.cat([xy - wh / 2, xy + wh / 2], 1)

        score, labels = cls_preds.sigmoid().max(1)
        ids = score > CLS_THRESH
        ids = ids.nonzero().squeeze()
        if not ids.size():
            return torch.tensor([0, 0, 0, 0]), torch.tensor([0]), torch.tensor([0]), False
        if ids.size()[0] == 0:
            return torch.tensor([0, 0, 0, 0]), torch.tensor([0]), torch.tensor([0]), False
        keep, sco = box_nms(boxes[ids], score[ids], threshold=NMS_THRESH)
        return boxes[ids][keep], labels[ids][keep], sco, True

# Define model

## RetinaNet

In [31]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

### FPN

In [32]:
class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion * planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion * planes)

        self.downsample = nn.Sequential()
        if stride != 1 or in_planes != self.expansion * planes:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion * planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion * planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))

        out += self.downsample(x)
        out = F.relu(out)
        
        return out

In [33]:
class FPN(nn.Module):
    def __init__(self, block, num_blocks):
        super(FPN, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(64)

        # Bottom-up layers
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.conv6 = nn.Conv2d(2048, 256, kernel_size=3, stride=2, padding=1)
        self.conv7 = nn.Conv2d(256, 256, kernel_size=3, stride=2, padding=1)

        # Lateral layers
        self.latlayer1 = nn.Conv2d(2048, 256, kernel_size=1, stride=1, padding=0)
        self.latlayer2 = nn.Conv2d(1024, 256, kernel_size=1, stride=1, padding=0)
        self.latlayer3 = nn.Conv2d(512, 256, kernel_size=1, stride=1, padding=0)

        # Top-down layers
        self.toplayer1 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)
        self.toplayer2 = nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def _upsample_add(self, x, y):
        _, _, H, W = y.size()
        return F.upsample(x, size=(H, W), mode='bilinear', align_corners=True) + y

    def forward(self, x):
        # Bottom-up
        c1 = F.relu(self.bn1(self.conv1(x)))
        c1 = F.max_pool2d(c1, kernel_size=3, stride=2, padding=1)

        c2 = self.layer1(c1)
        c3 = self.layer2(c2)
        c4 = self.layer3(c3)
        c5 = self.layer4(c4)

        p6 = self.conv6(c5)
        p7 = self.conv7(F.relu(p6))

        # Top_down
        p5 = self.latlayer1(c5)
        p4 = self._upsample_add(p5, self.latlayer2(c4))
        p4 = self.toplayer1(p4)
        p3 = self._upsample_add(p4, self.latlayer3(c3))
        p3 = self.toplayer2(p3)
        return p3, p4, p5, p6, p7

def FPN50():
    return FPN(Bottleneck, [3, 4, 6, 3])

def FPN101():
    return FPN(Bottleneck, [2, 4, 23, 3])

### RetinaNet Model

In [34]:
OPTIMIZER = 'sgd'
OPTIM_BASE_LR = 0.001
OPTIM_MOMENTUM = 0.9
OPTIM_ALPHA = 0.5
OPTIM_EPS = 1e-8
OPTIM_WEIGHT_DECAY = 0.0005
OPTIM_BETA = (0.9, 0.999)

In [35]:
class RetinaNet(nn.Module):
    num_anchors = 9
    
    def __init__(self, nets='FPN50', num_classes=1):
        super(RetinaNet, self).__init__()
        self.fpn = nets()
        self.num_classes = num_classes
        self.loc_head = self._make_head(self.num_anchors * 4)
        self.cls_head = self._make_head(self.num_anchors * self.num_classes)

    def forward(self, x):
        fms = self.fpn(x)
        loc_preds = []
        cls_preds = []
        for fm in fms:
            loc_pred = self.loc_head(fm)
            cls_pred = self.cls_head(fm)
            loc_pred = loc_pred.permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, 4)
            cls_pred = cls_pred.permute(0, 2, 3, 1).contiguous().view(x.size(0), -1, self.num_classes)
            loc_preds.append(loc_pred)
            cls_preds.append(cls_pred)

        return torch.cat(loc_preds, 1), torch.cat(cls_preds, 1)

    def _make_head(self, out_planes):
        layers = []
        for _ in range(4):
            layers.append(nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1))
            layers.append(nn.ReLU(True))
        layers.append(nn.Conv2d(256, out_planes, kernel_size=3, stride=1, padding=1))
        return nn.Sequential(*layers)

    def freeze_bn(self):
        for layer in self.modules():
            if isinstance(layer, nn.BatchNorm2d):
                layer.eval()

In [36]:
def create(conv_body, num_classes):
    networks_map = {
        'ResNet50_FPN': FPN50,
        'ResNet101_FPN': FPN101
    }

    model = RetinaNet(networks_map[conv_body], num_classes)
    return model

def config_optimizer(param):
    print(f"using {OPTIMIZER}: base_learning_rate = {OPTIM_BASE_LR}, momentum = {OPTIM_MOMENTUM}, weight_decay = {OPTIM_WEIGHT_DECAY}")
    if OPTIMIZER == 'sgd':
        optimizer = optim.SGD(param, lr=OPTIM_BASE_LR, momentum=OPTIM_MOMENTUM, weight_decay=OPTIM_WEIGHT_DECAY)
    elif OPTIMIZER == 'rmsprop':
        optimizer = optim.RMSprop(param, lr=OPTIM_BASE_LR, momentum=OPTIM_MOMENTUM, alpha=OPTIM_ALPHA, eps=OPTIM_EPS, weight_decay=OPTIM_WEIGHT_DECAY)
    elif OPTIMIZER == 'adam':
        optimizer = optim.Adam(param, lr=OPTIM_BASE_LR, betas=OPTIM_BETA, eps=OPTIM_EPS, weight_decay=OPTIM_WEIGHT_DECAY)
    else:
        AssertionError('optimizer can not be recognized.')
    return optimizer

# Test

In [37]:
import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
import torch.nn.init as init

In [38]:
MODEL_CONV_BODY = 'ResNet50_FPN'
MODEL_NUM_CLASSES = 1
MODEL_CHECKPOINT_DIR = 'D:/For Me/3_Dataset/Shrimp/checkpoint'

TEST_DATASET = 'D:/ForME/3_Dataset/Shrimp/test'
TEST_SCALE = (600, 600)

category = ['shrimp']

In [39]:
def resize(img, boxes, size, max_size=1000):
    ## **warning
    w, h = img.size
    if isinstance(size, int):
        size_min = min(w, h)
        size_max = max(w, h)
        # print(f"size min : {size_min}")
        # print(f"size max : {size_max}")
        sw = sh = float(size) / size_min
        if sw * size_max > max_size:
            sw = sh = float(max_size) / size_max
        ow = int(w * sw + 0.5)
        oh = int(h * sh + 0.5)
    else:
        ow, oh = size
        sw = float(ow) / w
        sh = float(oh) / h
    return img.resize((ow, oh), Image.BILINEAR), boxes * torch.Tensor([sw, sh, sw, sh])
    
def denoise(img):
    # convert PIL image to cv2 image
    img = np.array(img)

    # denoise
    img = cv2.fastNlMeansDenoisingColored(img, None, 10, 10, 7, 21)

    # convert cv2 image to PIL image
    img = Image.fromarray(img)
    return img

def segmentation(img):
    # convert PIL image to cv2 image
    img = np.array(img)

    # init input for kmean
    twoDimage = img.reshape((-1, 3))
    twoDimage = np.float32(twoDimage)

    # init parameter for kmean
    criteria = (cv2.TERM_CRITERIA_EPS + cv2.TERM_CRITERIA_MAX_ITER, 10, 1.0)
    k = 2
    attempts = 10

    # kmean
    ret, label, center = cv2.kmeans(twoDimage, k, None, criteria, attempts, cv2.KMEANS_PP_CENTERS)
    center = np.uint8(center)
    res = center[label.flatten()]
    result_image = res.reshape((img.shape))
    
    # convert cv2 image to PIL image
    result_image = Image.fromarray(result_image)

    return result_image

def preprocess_block(img, test_scales, get_segmentation):
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.485,0.456,0.406), (0.229,0.224,0.225))])

    img = denoise(img)
    img = img.resize(test_scales)
    if get_segmentation:
        img = segmentation(img)
    
    plt.imshow(img)
    plt.show()

    img = transform(img)
    return img

In [40]:
def test_model(display=True):
    model = create(MODEL_CONV_BODY, MODEL_NUM_CLASSES)
    try:
        # fpn50_b1_600_93_0.483.pkl
        checkpoint = torch.load(os.path.join(MODEL_CHECKPOINT_DIR, 'fpn50_b1_600_93_0.483.pkl'))
        model.load_state_dict(checkpoint['net'])
        print('init complete')
    except:
        print(f"not checkpoint")

    if not torch.cuda.is_available():
        raise print(f"You could use GPU for train model")

    model.cuda()
    model.eval()

    img_dir = os.path.join(TEST_DATASET, 'imgs')
    img_list = os.path.join(TEST_DATASET, 'image_list.txt')

    with open(img_list, 'r') as lst : 
        img_list = lst.readlines()
    img_nums = len(img_list)
    print(img_nums)

    test_scales = TEST_SCALE
    dic = {}
    for i in range(1):
        dic[str(i)] = []

    for im in range(img_nums):
        if im % 10 == 0:
            print(f"{im} imgs were processed, total {img_nums}")

        img = Image.open(os.path.join(img_dir, img_list[im].strip() + '.jpg')).convert('RGB')

        img_size = img.size
        
        x = preprocess_block(img, test_scales, False)

        x = x.cuda()
        x = x.unsqueeze(0) # ** why use unsqueeze
        x = torch.autograd.Variable(x)
        loc_preds, cls_preds = model(x)
        print(cls_preds.shape)

        loc_preds = loc_preds.data.squeeze().type(torch.FloatTensor)
        cls_preds = cls_preds.data.squeeze(0).type(torch.FloatTensor)

        print(f"decode: {cls_preds.shape}")

        encoder = DataEncoder(test_scales)
        boxes, labels, sco, is_found = encoder.decode(loc_preds, cls_preds, test_scales)
        print(is_found)
        if is_found:
            img, boxes = resize(img, boxes, img_size)
            img_c = img.copy()
            img_c = np.array(img_c)

            boxes = boxes.ceil()
            xmin = boxes[:, 0].clamp(min=1)
            ymin = boxes[:, 1].clamp(min=1)
            xmax = boxes[:, 2].clamp(max=img_size[0] - 1)
            ymax = boxes[:, 3].clamp(max=img_size[1] - 1)

            nums = len(boxes)
            for i in range(nums):
                dic[str(labels[i].item())].append([img_list[im].strip(), sco[i].item(), xmin[i].item(), ymin[i].item(), xmax[i].item(), ymax[i].item()])

            if display:
                for i in range(nums):
                    cv2.rectangle(img_c, (int(xmin[i]), int(ymin[i])), (int(xmax[i]), int(ymax[i])), (255, 0, 0), 2)
                
                plt.figure(figsize=(10, 8))
                plt.imshow(img_c)
                plt.show()

In [41]:
test_model()

not checkpoint
You could use GPU for train model


TypeError: exceptions must derive from BaseException