In [24]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

import numpy as np

In [25]:
class EmptyLayer(nn.Module):
    """Parameter-free placeholder module.

    It defines no parameters, buffers or ``forward`` of its own; it only
    relies on the constructor inherited from ``nn.Module``. In this file it
    is registered for ``route`` and ``shortcut`` blocks so that those blocks
    still occupy a slot in the module list.
    """

class DetectionLayer(nn.Module):
    """Module that only carries the anchor boxes of one ``yolo`` block.

    Attributes:
        anchors: the object passed to the constructor, stored unchanged.
    """

    def __init__(self, anchors):
        """Initialise the base module, then remember ``anchors`` as given."""
        super(DetectionLayer, self).__init__()
        self.anchors = anchors

In [26]:
def read_cfg(filename):
    """Parse a Darknet-style ``.cfg`` file into a list of block dictionaries.

    Every ``[section]`` header starts a new dictionary whose ``'type'`` entry
    is the section name; each following ``key = value`` line is stored in
    that dictionary. Blank lines and lines starting with ``#`` are ignored.

    Values that parse as numbers are converted: whole numbers become ``int``
    and everything else numeric becomes ``float``. All other values (for
    example comma-separated lists such as ``'-1, 61'``) stay as stripped
    strings.

    Args:
        filename: path of the cfg file to read.

    Returns:
        A list of dictionaries, one per section, in file order.

    Raises:
        ValueError: if a non-header line contains no ``=``.
        IndexError: if an option line appears before any section header.
    """
    with open(filename, 'r') as f:
        stripped = [line.strip() for line in f]

    # Keep only meaningful lines: no blanks, no '#' comments.
    lines = [line for line in stripped if line and not line.startswith('#')]

    blocks = []
    for line in lines:
        if line.startswith('['):
            blocks.append({'type': line.strip('[]')})
            continue

        # Split on the FIRST '=' only, so values may themselves contain '='.
        key, separator, value = line.partition('=')
        if not separator:
            raise ValueError(f"Malformed cfg line (expected 'key=value'): {line!r}")
        key, value = key.strip(), value.strip()

        try:
            number = float(value)
        except ValueError:
            # Not numeric: keep the raw string.
            pass
        else:
            value = int(number) if number.is_integer() else number

        blocks[-1][key] = value

    return blocks

In [27]:
def create_modules(blocks: list[dict[str, str | int | float]]):
    net = blocks[0]
    del blocks[0]

    prev_filters = 3
    filters = 3

    out_filters = []

    modules = nn.ModuleList()
    for i, block in enumerate(blocks):
        module = nn.Sequential()
        block_type = block['type']

        if block_type == 'convolutional':
            batch_normalize = block.get('batch_normalize', 0)
            is_bias = not batch_normalize
            filters = block['filters']
            kernel_size = block['size']
            stride = block['stride']
            padding = block['pad']
            activation = block['activation']

            module.add_module(
                f"conv_{i}",
                nn.Conv2d(prev_filters, filters, kernel_size, stride, padding, bias=is_bias)
            )

            if batch_normalize:
                bn = nn.BatchNorm2d(filters)
                module.add_module(f"batch_norm_{i}", bn)

            if activation == 'leaky':
                activation = nn.LeakyReLU(0.1, inplace=True)
                module.add_module(f"leaky_{i}", activation)
        elif block_type == 'upsample':
            stride = block['stride']

            module.add_module(
                f"upsample_{i}",
                nn.Upsample(scale_factor=stride, mode='bilinear')
            )
        elif block_type == 'route':
            start, end = block['layers'], 0
            if isinstance(start, str):
                _split = map(int, start.split(','))
                start, end = _split

            if start > 0:
                start -= i
            if end > 0:
                end -= i

            module.add_module(f"route_{i}", EmptyLayer())

            if end < 0:
                filters = out_filters[i + start] + out_filters[i + end]
            else:
                filters = out_filters[i + start]
        elif block_type == 'shortcut':
            module.add_module(
                f"shortcut_{i}", EmptyLayer()
            )
        elif block_type == 'yolo':
            mask = list(map(int, block['mask'].split(',')))
            anchors = list(map(int, block['anchors'].split(',')))
            anchors = [(anchors[i], anchors[i + 1]) for i in range(0, len(anchors), 2)]
            anchors = [anchors[i] for i in mask]
            detection = DetectionLayer(anchors)

            module.add_module(f"detection_{i}", detection)

        modules.append(module)
        prev_filters = filters
        out_filters.append(prev_filters)

    return net, modules

In [None]:
def predict_transform(prediction, input_dim, anchors, num_classes, device = torch.device('cpu')):
    """Turn a raw detection feature map into a flat table of box predictions.

    The input of shape ``(batch, num_anchors * (5 + num_classes), grid, grid)``
    is rearranged to ``(batch, grid * grid * num_anchors, 5 + num_classes)``,
    with rows ordered cell by cell (row-major) and, within a cell, anchor by
    anchor. Each row is then decoded as:

        * columns 0-1: sigmoid plus the cell's (x, y) offset, times the stride;
        * columns 2-3: ``exp`` times the anchor size (anchors are given in
          input-image units);
        * column 4 and the class columns: sigmoid.

    Args:
        prediction: 4-D tensor as described above (square feature map).
        input_dim: side length of the network input image.
        anchors: sequence of ``(width, height)`` pairs.
        num_classes: number of class scores per box.
        device: device on which the grid offsets and anchor tensor are
            created; a ``torch.device``, a device string, or a bool
            (``True`` -> ``'cuda'``, ``False`` -> ``'cpu'``).

    Returns:
        Tensor of shape ``(batch, grid * grid * num_anchors, 5 + num_classes)``.
    """
    if isinstance(device, bool):
        device = torch.device('cuda' if device else 'cpu')

    batch_size = prediction.size(0)
    # Read the grid size from the tensor itself; deriving it from
    # input_dim // stride can disagree when input_dim is not a multiple.
    grid_size = prediction.size(2)
    stride = input_dim // grid_size
    bbox_attrs = 5 + num_classes
    num_anchors = len(anchors)
    num_cells = grid_size * grid_size

    prediction = prediction.view(batch_size, bbox_attrs * num_anchors, num_cells)
    prediction = prediction.transpose(1, 2).contiguous()
    prediction = prediction.view(batch_size, num_cells * num_anchors, bbox_attrs)

    # Box centre (x, y), objectness and class scores all go through a sigmoid.
    prediction[:, :, 0] = torch.sigmoid(prediction[:, :, 0])
    prediction[:, :, 1] = torch.sigmoid(prediction[:, :, 1])
    prediction[:, :, 4] = torch.sigmoid(prediction[:, :, 4])

    # Per-cell offsets in row-major order: x is the column index (varies
    # fastest), y is the row index.
    grid = torch.arange(grid_size, dtype=prediction.dtype, device=device)
    x_offset = grid.repeat(grid_size).view(-1, 1)
    y_offset = grid.repeat_interleave(grid_size).view(-1, 1)
    # Duplicate every cell's offset once per anchor -> (1, cells * anchors, 2).
    x_y_offset = torch.cat((x_offset, y_offset), 1).repeat(1, num_anchors).view(-1, 2).unsqueeze(0)

    prediction[:, :, :2] += x_y_offset

    # Express anchors in grid-cell units, then tile them to match the rows.
    scaled_anchors = [(a[0] / stride, a[1] / stride) for a in anchors]
    anchor_tensor = torch.tensor(scaled_anchors, dtype=prediction.dtype, device=device)
    anchor_tensor = anchor_tensor.repeat(num_cells, 1).unsqueeze(0)

    prediction[:, :, 2:4] = torch.exp(prediction[:, :, 2:4]) * anchor_tensor
    prediction[:, :, 5:5 + num_classes] = torch.sigmoid(prediction[:, :, 5:5 + num_classes])

    # Back from grid-cell units to input-image pixels.
    prediction[:, :, :4] *= stride

    return prediction

In [30]:
# Parse the YOLOv3 cfg into block dictionaries, then build the modules for it.
# Note: create_modules removes the leading [net] block from `blocks` in place,
# so after this cell `blocks` holds only the layer blocks.
blocks = read_cfg('detector/yolov3.cfg')
# Left as the bare last expression so the notebook displays the returned
# (net, modules) tuple.
create_modules(blocks)

({'type': 'net',
  'batch': 64,
  'subdivisions': 16,
  'width': 608,
  'height': 608,
  'channels': 3,
  'momentum': 0.9,
  'decay': 0.0005,
  'angle': 0,
  'saturation': 1.5,
  'exposure': 1.5,
  'hue': 0.1,
  'learning_rate': 0.001,
  'burn_in': 1000,
  'max_batches': 500200,
  'policy': 'steps',
  'steps': '400000,450000',
  'scales': '.1,.1'},
 ModuleList(
   (0): Sequential(
     (conv_0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
     (batch_norm_0): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (leaky_0): LeakyReLU(negative_slope=0.1, inplace=True)
   )
   (1): Sequential(
     (conv_1): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
     (batch_norm_1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
     (leaky_1): LeakyReLU(negative_slope=0.1, inplace=True)
   )
   (2): Sequential(
     (conv_2): Conv2d(64, 32, kernel_size=(1, 1), stride=(1

In [None]:
class Darknet(nn.Module):
    """Network assembled from a Darknet cfg file.

    Attributes:
        net: the ``[net]`` block (network hyper-parameters) of the cfg.
        blocks: the layer blocks only, so ``blocks[i]`` describes
            ``module_list[i]``.
        module_list: ``nn.ModuleList`` with one module per layer block. It is
            deliberately not named ``modules``: ``nn.Module.modules`` is a
            method, and it would shadow a submodule registered under that name.
    """

    def __init__(self, filename):
        """Read ``filename`` and build the modules it describes."""
        super().__init__()
        blocks = read_cfg(filename)
        self.net, self.module_list = create_modules(blocks)
        # Keep layer blocks only, whether or not create_modules already
        # removed the [net] block from the list.
        self.blocks = [block for block in blocks if block is not self.net]

    @staticmethod
    def _as_device(cuda):
        """Turn a bool flag, device string or ``torch.device`` into a device."""
        if isinstance(cuda, bool):
            return torch.device('cuda' if cuda else 'cpu')
        return torch.device(cuda)

    @staticmethod
    def _route_sources(block, index):
        """Return the absolute layer indices a ``route`` block reads from."""
        # `layers` is an int for a single entry, a comma-separated string
        # otherwise; str() handles both.
        layers = [int(v) for v in str(block['layers']).split(',')]
        # Negative entries are relative to the route layer, others absolute.
        return [layer if layer >= 0 else index + layer for layer in layers]

    def forward(self, x, cuda):
        """Run the network and collect the predictions of every ``yolo`` block.

        Args:
            x: input tensor fed to the first layer.
            cuda: device used for the helper tensors of the detection
                transform; a bool flag, a device string or a ``torch.device``.

        Returns:
            The outputs of ``predict_transform`` for all ``yolo`` blocks,
            concatenated along dimension 1, or ``None`` if the network has no
            ``yolo`` block.
        """
        device = self._as_device(cuda)
        # Output of every layer, needed later by route and shortcut blocks.
        outputs = {}
        detections = None

        for i, block in enumerate(self.blocks):
            block_type = block['type']

            if block_type in ('convolutional', 'upsample'):
                x = self.module_list[i](x)
            elif block_type == 'route':
                maps = [outputs[source] for source in self._route_sources(block, i)]
                # A single source is passed through; several are concatenated
                # along the channel dimension.
                x = maps[0] if len(maps) == 1 else torch.cat(maps, 1)
            elif block_type == 'shortcut':
                from_ = int(block['from'])
                x = outputs[i - 1] + outputs[i + from_]
            elif block_type == 'yolo':
                anchors = self.module_list[i][0].anchors
                # The input size lives in the [net] block, not in a layer block.
                inp_dim = int(self.net['height'])
                num_classes = int(block['classes'])

                x = predict_transform(x.data, inp_dim, anchors, num_classes, device)

                if detections is None:
                    detections = x
                else:
                    detections = torch.cat((detections, x), 1)

            outputs[i] = x

        return detections