In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
cd /content/drive/Shareddrives/Data/Internship_study_2/Detection/YOLOv3

/content/drive/Shareddrives/Data/Internship_study_2/Detection/YOLOv3


In [3]:
from __future__ import division

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import numpy as np

from utils import *

In [4]:
# parse_cfg 함수 정의하기, 구성 파일의 경로를 입력으로 받습니다.
def parse_cfg(cfgfile):
    '''
    configuration 파일을 입력으로 받습니다.
    
    blocks의 list를 반환합니다. 각 blocks는 신경망에서 구축되어지는 block을 의미합니다.
    block는 list안에 dictionary로 나타냅니다.
    '''
    # cfg 파일 전처리
    file = open(cfgfile, 'r')
    lines = file.read().split('\n')               # lines를 list로 저장합니다.
    lines = [x for x in lines if len(x) > 0]      # 빈 lines를 삭제합니다.
    lines = [x for x in lines if x[0] != '#']     # 주석을 삭제합니다.
    lines = [x.rstrip().lstrip() for x in lines]  # 공백을 제거합니다.

    # blocks를 얻기 위해 결과 list를 반복합니다.
    block = {}
    blocks = []

    for line in lines:
        if line[0] == '[':              # 새로운 block의 시작을 표시합니다.
            if len(block) != 0:         # block이 비어있지 않으면, 이전 block의 값을 저장합니다.
                blocks.append(block)    # 이것을 blocks list에 추가합니다.
                block = {}              # block을 초기화 합니다.
            block['type'] = line[1:-1].rstrip()
        else:
            key, value = line.split('=')
            block[key.rstrip()] = value.lstrip()
    blocks.append(block)

    return blocks

In [5]:
# EmptyLayer를 정의합니다.
# EmptyLayer는 residual unit의 shortcut과 FPN의 lateral connection 용도로 사용합니다.
class EmptyLayer(nn.Module):
    def __init__(self):
        super().__init__()

class DetectionLayer(nn.Module):
	def __init__(self,anchors):
		super(DetectionLayer, self).__init__()
		self.anchors = anchors

# nn.Module class를 사용하여 layers에 대한 module인 create_modules 함수를 정의합니다.
# 입력은 parse_cfg 함수에서 반환된 blocks를 취합니다.
def create_modules(blocks):
    net_info = blocks[0]    # 입력과 전처리에 대한 정보를 저장합니다.
    module_list = nn.ModuleList()
    prev_filters = 3    # input image : RGB 3 channels
    output_filters = []

    for index, x in enumerate(blocks[1:]):
        module = nn.Sequential()
        
        # block의 type을 확인합니다.
        # block에 대한 새로운 module을 생성합니다.
        # module_list에 append 합니다.

        # conv layer
        if (x['type'] == 'convolutional'):
            # layer에 대한 정보를 얻습니다.
            try:
                batch_normalize = int(x['batch_normalize'])
                bias = False
            except:
                batch_normalize = 0
                bias = True
			
            filters = int(x['filters'])
            padding = int(x['pad'])
            kernel_size = int(x['size'])
            stride = int(x['stride'])
            activation = str(x['activation'])
			
            if padding:
                pad = (kernel_size - 1) // 2
            else:
                pad = 0
			
            # convolutional layer를 추가합니다.
            try:
                conv = nn.Conv2d(prev_filters, filters, kernel_size, stride, pad, bias = bias)
            except TypeError:
                print(index)

            module.add_module('conv_{0}'.format(index), conv)
            
            # Batch Norm Layer를 추가합니다.
            if batch_normalize:
                bn = nn.BatchNorm2d(filters)
                module.add_module('batch_norm_{0}'.format(index), bn)
                
            # activation을 확인합니다.
            # YOLO에서 Leaky ReLU 또는 Linear 입니다.
            if activation == 'leaky':
                activn = nn.LeakyReLU(0.1, inplace = True)
                module.add_module('leaky_{0}'.format(index), activn)
		
        # upsampling layer
        # Bilinear2dUpsampling을 사용합니다.
        elif (x['type'] == 'upsample'):
            stride = int(x['stride'])
            upsample = nn.Upsample(scale_factor=stride, mode='bilinear')
            #upsample = nn.Upsample(scale_factor = 2, mode = 'bilinear')
            module.add_module('upsample_{}'.format(index), upsample)

        # route layer
        elif (x['type'] == 'route'):
            #x['layers'] = x['layers'].split(',')
            #x['layers'] = x['layers'][0].split(',')
            x['layers'] = [int(x) for x in x['layers'].split(',')]

            # route 시작
            start = int(x['layers'][0])
            # 1개만 존재하면 종료
            try:
                end = int(x['layers'][1])
            except:
                end = 0
            # 양수인 경우
            if start > 0:
                start = start - index
            if end > 0:
                end = end - index
            route = EmptyLayer()
            module.add_module('route_{0}'.format(index), route)
            # 음수 인 경우
            if end < 0:
                filters = output_filters[index + start] + output_filters[index + end]
            else:
                filters = output_filters[index + start]
            
        # skip connection에 해당하는 shortcut
        elif x['type'] == 'shortcut':
            shortcut = EmptyLayer()
            module.add_module('shortcut_{}'.format(index), shortcut)

        # YOLO는 detection layer입니다.
        elif x['type'] == 'yolo':
            mask = x['mask'].split(',')
            mask = [int(x) for x in mask]
            
            anchors = x['anchors'].split(',')
            anchors = [int(a) for a in anchors]
            anchors = [(anchors[i], anchors[i+1]) for i in range(0, len(anchors),2)]
            anchors = [anchors[i] for i in mask]
            
            detection = DetectionLayer(anchors)
            module.add_module('Detection_{}'.format(index), detection)

        module_list.append(module)
        prev_filters = filters
        output_filters.append(filters)
  
    return (net_info, module_list)

In [6]:
####################### test ############################
blocks = parse_cfg('cfg/yolov3.cfg')
print(create_modules(blocks))
#########################################################

({'type': 'net', 'batch': '64', 'subdivisions': '16', 'width': '608', 'height': '608', 'channels': '3', 'momentum': '0.9', 'decay': '0.0005', 'angle': '0', 'saturation': '1.5', 'exposure': '1.5', 'hue': '.1', 'learning_rate': '0.001', 'burn_in': '1000', 'max_batches': '500200', 'policy': 'steps', 'steps': '400000,450000', 'scales': '.1,.1'}, ModuleList(
  (0): Sequential(
    (conv_0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (batch_norm_0): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (leaky_0): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (1): Sequential(
    (conv_1): Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
    (batch_norm_1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (leaky_1): LeakyReLU(negative_slope=0.1, inplace=True)
  )
  (2): Sequential(
    (conv_2): Conv2d(64, 32, kernel_size=(1, 1), stride=(1, 1), bias=False)
  

# **Define Neural Network**

In [7]:
def predict_transform_ttt(prediction, inp_dim, anchors, num_classes, CUDA = True):
    inp_dim = 416   # yolov3.cfg file의 [net]의 width, height를 입력크기(416, 416)에 맞춰 바꿔주어야 한다.
    batch_size = prediction.size(0)
    stride = inp_dim // prediction.size(2)
    grid_size = inp_dim // stride
    bbox_attrs = 5 + num_classes
    num_anchors = len(anchors)
    
    prediction = prediction.view(batch_size, bbox_attrs * num_anchors, grid_size * grid_size)
    prediction = prediction.transpose(1,2).contiguous()
    prediction = prediction.view(batch_size, grid_size * grid_size * num_anchors, bbox_attrs)
    anchors = [(a[0]/stride, a[1]/stride) for a in anchors] # a[0]: [net] height, a[1]: [net] width

    # prediction[:, :, 0:5] = x, y, W, H, c(obj conf)
    # 중심 x,y 좌표와 object confidence를 SIgmoid 합니다.
    prediction[:,:,0] = torch.sigmoid(prediction[:,:,0])
    prediction[:,:,1] = torch.sigmoid(prediction[:,:,1])
    prediction[:,:,4] = torch.sigmoid(prediction[:,:,4])

    # 높이와 넓이를 log space 변환합니다.
    anchors = torch.FloatTensor(anchors)
    
    if CUDA:
        anchors = anchors.cuda()
        prediction = prediction.cuda()
        
    anchors = anchors.repeat(grid_size * grid_size, 1).unsqueeze(0)
    prediction[:,:,2:4] = torch.exp(prediction[:,:,2:4] * anchors)

    # class score에 sigmoid activation을 적용합니다.
    prediction[:,:,5: 5 + num_classes] = torch.sigmoid((prediction[:,:,5 : 5 + num_classes]))

    # detection map을 입력 이미지의 크기로 resize 합니다.
    # if input.shape = (416, 416) and detection_map.shape = (13, 13), then stride = 32
    prediction[:,:,:4] *= stride

    return prediction

In [20]:
class Darknet(nn.Module):
    def __init__(self, cfgfile):
        super(Darknet, self).__init__()
        self.blocks = parse_cfg(cfgfile)
        self.net_info, self.module_list = create_modules(self.blocks)

    def forward(self, x, CUDA):
        modules = self.blocks[1:]   # self.blocks[0]은 net block
        outputs = {}    # route layer에 대한 출력값을 저장합니다.
        write = 0   # collector는 초기화되지 않았다.
        '''
        하지만 우리는 empty tensor를 초기화 할 수 없으므로, non-empty(다른 형태의) tensor를 이것으로 연결해야 합니다. 
        그래서 첫 번째 detection map을 얻을 때 까지 collector(detection을 지닌 tensor)의 초기화를 지연시킵니다.
        그리고나서 연속적인 detection을 얻을 때, 이것을 map으로 연결시킵니다.

        write flag는 첫 번째 detection을 얻었는지 아닌지를 나타내는데 사용됩니다. 
        만약 write = 1이면, collector는 초기화 된 것을 의미하고 detection map을 이것으로 연결 시킬 수 있습니다.
        '''

        for i, module in enumerate(modules):
            module_type = (module['type'])

            if module_type == 'convolutional' or module_type == 'upsample': # 순전파 작동
                x = self.module_list[i](x)

            elif module_type == 'route':
                layers = module['layers']
                layers = [int(a) for a in layers]
                
                if (layers[0]) > 0:
                    layers[0] = layers[0] - i   # route layer 앞의 layer와의 연결

                if len(layers) == 1:    # [route] block의 layers = x
                    x = outputs[i + (layers[0])]
                else:                   # [route] block의 layers = x, y
                    if (layers[1]) > 0: 
                        layers[1] = layers[1] - i   # route layer 앞의 layer와의 연결
                        
                    map1 = outputs[i + layers[0]]
                    map2 = outputs[i + layers[1]]
                
                    x = torch.cat((map1, map2), 1)

            elif module_type == 'shortcut':
                from_ = int(module['from']) # yolov3.cfg 의 from은 모두 -3임.
                x = outputs[i-1] + outputs[i+from_]

            elif module_type == 'yolo':
                anchors = self.module_list[i][0].anchors

                # 입력 차원, 클래스의 수를 얻습니다.
                inp_dim = int(self.net_info['height'])
                num_classes = int(module['classes'])

                # util.py에 있는 predict_transform 함수를 이용하여 변환
                x = x.data
                x = predict_transform_ttt(x, inp_dim, anchors, num_classes, CUDA)

                if not write:    # collector가 초기화 되지 않은 경우
                    detections = x
                    write = 1
                else:
                    detections = torch.cat((detections, x), 1)

            outputs[i] = x

        return detections
    
    def load_weights(self, weightfile):
        
        # Open the weights file
        fp = open(weightfile, "rb")

        # The first 4 values are header information 
        # 1. Major version number
        # 2. Minor Version Number
        # 3. Subversion number 
        # 4. IMages seen 
        header = np.fromfile(fp, dtype = np.int32, count = 5)
        self.header = torch.from_numpy(header)
        self.seen = self.header[3]
        
        # The rest of the values are the weights
        # Let's load them up
        weights = np.fromfile(fp, dtype = np.float32)
        
        ptr = 0 # weight배열에서 현재 위치를 추적
        for i in range(len(self.module_list)):
            module_type = self.blocks[i + 1]["type"]
            
            if module_type == "convolutional":
                model = self.module_list[i]
                try:
                    batch_normalize = int(self.blocks[i+1]["batch_normalize"])
                except:
                    batch_normalize = 0
                
                conv = model[0]
                
                if (batch_normalize):
                    bn = model[1]
                    
                    #Get the number of weights of Batch Norm Layer
                    num_bn_biases = bn.bias.numel()
                    
                    #Load the weights
                    bn_biases = torch.from_numpy(weights[ptr:ptr + num_bn_biases])
                    ptr += num_bn_biases
                    
                    bn_weights = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    bn_running_mean = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    bn_running_var = torch.from_numpy(weights[ptr: ptr + num_bn_biases])
                    ptr  += num_bn_biases
                    
                    #Cast the loaded weights into dims of model weights. 
                    bn_biases = bn_biases.view_as(bn.bias.data)
                    bn_weights = bn_weights.view_as(bn.weight.data)
                    bn_running_mean = bn_running_mean.view_as(bn.running_mean)
                    bn_running_var = bn_running_var.view_as(bn.running_var)

                    #Copy the data to model
                    bn.bias.data.copy_(bn_biases)
                    bn.weight.data.copy_(bn_weights)
                    bn.running_mean.copy_(bn_running_mean)
                    bn.running_var.copy_(bn_running_var)
                
                else:
                    #Number of biases
                    num_biases = conv.bias.numel()
                
                    #Load the weights
                    conv_biases = torch.from_numpy(weights[ptr: ptr + num_biases])
                    ptr = ptr + num_biases
                    
                    #reshape the loaded weights according to the dims of the model weights
                    conv_biases = conv_biases.view_as(conv.bias.data)
                    
                    #Finally copy the data
                    conv.bias.data.copy_(conv_biases)
                    
                #Let us load the weights for the Convolutional layers
                num_weights = conv.weight.numel()
                
                #Do the same as above for weights
                conv_weights = torch.from_numpy(weights[ptr:ptr+num_weights])
                ptr = ptr + num_weights

                conv_weights = conv_weights.view_as(conv.weight.data)
                conv.weight.data.copy_(conv_weights)

### Test Forward

In [9]:
def get_test_input():
    img = cv2.imread('dog-cycle-car.png')
    img = cv2.resize(img, (416,416))        # 입력 크기로 resize 합니다.
    img_ = img[:,:,::-1].transpose((2,0,1)) # BGR - > RGB, HxWxC -> CxHxW
    img_ = img_[np.newaxis,:,:,:]/255.0     # (배치를 위한) 0 채널 추가하고 normalize 합니다.
    img_ = torch.from_numpy(img_).float()   # 정수로 변환합니다.
    img_ = Variable(img_)                   # 변수로 변환합니다.
    return img_

In [10]:
model = Darknet("cfg/yolov3.cfg")
inp = get_test_input()
pred = model(inp, torch.cuda.is_available())
print(pred)
print(pred.shape)   # 10647 = (13x13 + 26x26 + 52x52) x 3 (각 grid별 3개의 anchor박스가 각 좌표별로 존재)

tensor([[[ 17.2335,  16.3838,  15.8209,  ...,   0.4711,   0.5091,   0.4777],
         [ 12.6092,  15.7899,  60.9297,  ...,   0.7389,   0.5342,   0.4830],
         [ 12.9106,  16.5587, 115.1949,  ...,   0.5300,   0.5721,   0.3856],
         ...,
         [  3.9679,   3.3897,  11.7663,  ...,   0.4462,   0.5517,   0.6189],
         [  3.3151,   3.7653,   5.1571,  ...,   0.4560,   0.5629,   0.4415],
         [  4.4027,   4.0126,   6.4384,  ...,   0.6028,   0.4403,   0.4782]]],
       device='cuda:0')
torch.Size([1, 10647, 85])


# **Load Weights**

In [21]:
model = Darknet("cfg/yolov3.cfg")
model.load_weights("yolov3.weights")