In [104]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import torchvision.transforms as transforms
import torchvision.transforms.functional as FT


import os
import numpy as np
import pandas as pd
from tqdm import tqdm

# 이미지 불러오기 및 그리기
from PIL import Image
import matplotlib.pyplot as plt
import matplotlib.patches as patches

In [105]:
# ipynb 파일 import 하기.
import import_ipynb

# utils.ipynb에 있는 함수 가지고 오기.
from utils import (
    non_max_suppression,
    mean_average_precision,
    intersection_over_union,
    cellboxes_to_boxes,
    get_bboxes,
    plot_image,
    save_checkpoint,
    load_checkpoint,
)

## 변수 선언

In [113]:
# Dataset locations; every path is built relative to the notebook directory.
ori_dir = './'
img_dir = ori_dir + 'image/'
label_dir = ori_dir + 'label_txt/'

# CSV listing the image file names and their label file names.
train_csv = ori_dir + 'png_txt.csv'

# Side length (pixels) of the square network input. It has to be 448 here:
# the transform pipeline resizes to 448x448, and the conv stack downsamples
# by 64, giving the 7x7 feature map the fully connected head is sized for.
img_size = 448
S = 7   # the image is split into an S x S grid of cells
C = 4   # number of object classes

# Class names, in class-id order.
classes = ["AC", "FL", "HC", "HUM"]

In [114]:
# Error encountered:
#   TypeError: __call__() takes 2 positional arguments but 3 were given. self.
#   https://stackoverflow.com/questions/62341052/typeerror-call-takes-2-positional-arguments-but-3-were-given-to-train-ra
#
# Fix:
#   Running the transform through the Compose class below resolved it.
class Compose(object):
    """Chain image transforms while carrying the bounding boxes along unchanged."""

    def __init__(self, transforms):
        # Callables applied to the image, in list order.
        self.transforms = transforms

    def __call__(self, img, bboxes):
        """Return ``(transformed_img, bboxes)``; only ``img`` is passed to the transforms."""
        for apply_step in self.transforms:
            img = apply_step(img)

        return img, bboxes

# Image pipeline: resize to 448x448, then convert the image to a tensor.
transform = Compose(
    [
        transforms.Resize((448, 448)),
        transforms.ToTensor(),
    ]
)

In [107]:
# Seed PyTorch's global RNG so runs are repeatable.
seed = 123
torch.manual_seed(seed)

# Hyperparameters etc.
LEARNING_RATE = 2e-5
# `is_available` must be *called*: the bare function object is always truthy,
# which selected "cuda" even on machines without a GPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE = 16 # 64 in original paper but I don't have that much vram, grad accum?
WEIGHT_DECAY = 0
EPOCHS = 3
NUM_WORKERS = 2
PIN_MEMORY = True
LOAD_MODEL = False
LOAD_MODEL_FILE = "overfit.pth.tar"

# bbox 좌표 시각화

In [46]:
import cv2

# Path of the per-image annotation table.
info_csv = ori_dir + 'result_center.csv'

# df: image / label file-name pairs; info: one annotation row per image.
df, info = pd.read_csv(train_csv), pd.read_csv(info_csv)

In [47]:
# Preview the first rows of the image / label file-name table.
df.head()

Unnamed: 0,img_png,img_txt
0,20151103_E0000056_I0004613.png,20151103_E0000056_I0004613.txt
1,20151103_E0000057_I0004695.png,20151103_E0000057_I0004695.txt
2,20151103_E0000057_I0004696.png,20151103_E0000057_I0004696.txt
3,20151103_E0000058_I0004790.png,20151103_E0000058_I0004790.txt
4,20151103_E0000059_I0004888.png,20151103_E0000059_I0004888.txt


In [48]:
# Preview the first rows of the annotation table (class, centre and size columns).
info.head()

Unnamed: 0,img_name,class,center_x,center_y,w,h
0,20151103_E0000056_I0004613.png,0,0.4629,0.4824,0.2559,0.5312
1,20151103_E0000057_I0004695.png,0,0.562,0.415,0.2393,0.4785
2,20151103_E0000057_I0004696.png,0,0.5474,0.3975,0.2275,0.4785
3,20151103_E0000058_I0004790.png,0,0.4785,0.6377,0.1719,0.3574
4,20151103_E0000059_I0004888.png,0,0.4883,0.5078,0.1934,0.3867


# bbox가 이미지에서 잘 위치한지 확인

In [8]:
def draw_bbox(img_file, boxes):
    """Display an image with one bounding box and its class id drawn on it.

    Args:
        img_file: Path of the image, read with ``cv2.imread``.
        boxes: ``(class_id, x, y, w, h)`` where ``x, y`` is the box centre and
            ``w, h`` its size, all given as fractions of the image width/height.

    Raises:
        FileNotFoundError: If OpenCV cannot read ``img_file``.

    Side effects:
        Opens an OpenCV window and blocks until a key is pressed.
    """
    # cv2.imread signals failure by returning None instead of raising.
    image = cv2.imread(img_file)
    if image is None:
        raise FileNotFoundError(f"Could not read image: {img_file}")

    # Scale by the real image size rather than a fixed 1024x512.
    img_h, img_w = image.shape[:2]

    # Convert the centre/size fractions into pixel corner coordinates.
    c, x, y, w, h = boxes
    x1, y1 = int((x - w / 2) * img_w), int((y - h / 2) * img_h)
    x2, y2 = int((x + w / 2) * img_w), int((y + h / 2) * img_h)

    # Draw the box outline.
    cv2.rectangle(image, (x1, y1), (x2, y2), (0, 0, 255), 2)
    # To inspect the centre point instead:
    #cv2.line(image, (x, y), (x, y), (0, 0, 255), 3)
    # Write the class id at the box's top-right corner.
    cv2.putText(image, str(c), (x2, y1), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255,0), 2)

    # Show the annotated image until a key is pressed.
    cv2.imshow('Bounding Boxes', image)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

In [9]:
# Sanity check: draw the annotation of row 0 on the first listed image.
img_file = img_dir + df['img_png'][0]
boxes = tuple(
    info[column][0] for column in ('class', 'center_x', 'center_y', 'w', 'h')
)

draw_bbox(img_file, boxes)

# Dataset 불러오기

In [115]:
class VOCDataset(torch.utils.data.Dataset):
    """Images plus YOLO-style labels encoded as an ``S x S`` grid target.

    The CSV's first column is the image file name and its second column the
    label file name. Each label line reads ``class x y width height``; the
    encoding below treats x/y/width/height as fractions of the image size.

    Per-cell target layout (last dimension, length ``C + 5 * B``):
        [0:C]       one-hot class
        [C]         objectness (1 when the cell holds an object)
        [C+1:C+5]   x_cell, y_cell, width_cell, height_cell
        the rest    left at zero
    """

    def __init__(
        self, csv_file, img_dir, label_dir, S=7, B=2, C=4, transform=None,
    ):
        """
        Args:
            csv_file: CSV listing image and label file names.
            img_dir: Directory containing the images.
            label_dir: Directory containing the label text files.
            S: Grid size (cells per side).
            B: Boxes per cell; only affects the target's last dimension.
            C: Number of classes.
            transform: Optional callable ``(image, boxes) -> (image, boxes)``.
        """
        self.annotations = pd.read_csv(csv_file)
        self.img_dir = img_dir
        self.label_dir = label_dir
        self.transform = transform
        self.S = S
        self.B = B
        self.C = C

    def __len__(self):
        """Number of rows in the annotation CSV."""
        return len(self.annotations)

    @staticmethod
    def _parse_number(token):
        """Convert a label token to int when it is integral, otherwise to float."""
        value = float(token)
        # int(value), not int(token): int("1.0") raises ValueError.
        return int(value) if value.is_integer() else value

    def __getitem__(self, index):
        """Return ``(image, label_matrix)`` for row ``index``.

        ``label_matrix`` has shape ``(S, S, C + 5 * B)``. Only the first object
        that lands in a given cell is encoded; later ones are ignored.
        """
        # Read every object listed in this sample's label file.
        label_path = os.path.join(self.label_dir, self.annotations.iloc[index, 1])
        boxes = []

        with open(label_path) as f:
            for line in f:
                fields = line.split()
                if not fields:
                    # Tolerate blank lines (e.g. a trailing empty line).
                    continue
                # Field order in the file: class, x, y, width, height.
                class_label, x, y, width, height = (
                    self._parse_number(token) for token in fields
                )
                boxes.append([class_label, x, y, width, height])

        # Load the image file.
        img_path = os.path.join(self.img_dir, self.annotations.iloc[index, 0])
        image = Image.open(img_path)

        # Boxes become a tensor so the transform receives tensors for both.
        boxes = torch.tensor(boxes)

        if self.transform:
            image, boxes = self.transform(image, boxes)

        # ======================================================
        # ENCODING
        # Split the image into an S x S grid; every cell gets C + 5 * B values.
        label_matrix = torch.zeros((self.S, self.S, self.C + 5 * self.B))
        obj_idx = self.C          # objectness sits right after the class scores
        last_cell = self.S - 1
        for box in boxes:
            class_label, x, y, width, height = box.tolist()
            class_label = int(class_label)

            # Row (i) and column (j) of the cell that contains the box centre.
            # Clamp so a centre at exactly 1.0 stays inside the grid.
            i = min(int(self.S * y), last_cell)
            j = min(int(self.S * x), last_cell)
            # Centre position relative to that cell's top-left corner.
            x_cell, y_cell = self.S * x - j, self.S * y - i

            # Box size expressed in cell units:
            #   width_pixels / cell_pixels
            #   = (width * image_width) / (image_width / S) = width * S
            width_cell, height_cell = (
                width * self.S,
                height * self.S,
            )

            # Each cell holds at most one object: only fill it while empty.
            if label_matrix[i, j, obj_idx] == 0:
                label_matrix[i, j, obj_idx] = 1

                # Box coordinates relative to the cell; size capped at S - 1.
                box_coordinates = torch.tensor([
                    x_cell, y_cell,
                    min(width_cell, self.S - 1),
                    min(height_cell, self.S - 1)
                ])
                label_matrix[i, j, obj_idx + 1:obj_idx + 5] = box_coordinates

                # One-hot encode the class.
                label_matrix[i, j, class_label] = 1

        return image, label_matrix

# yolov1 network architecture

In [116]:
""" 
Information about architecture config:
Tuple is structured : (kernel_size, filters, stride, padding) 
"M" is simply maxpooling with stride 2x2 and kernel 2x2
List is structured by tuples and lastly int with number of repeats
"""

architecture_config = [
    # Stem: wide 7x7 convolution, then pooling.
    (7, 64, 2, 3), "M",
    (3, 192, 1, 1), "M",

    # Alternating 1x1 / 3x3 convolutions, then pooling.
    (1, 128, 1, 0), (3, 256, 1, 1),
    (1, 256, 1, 0), (3, 512, 1, 1),
    "M",

    # The 1x1 -> 3x3 pair is repeated 4 times.
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0), (3, 1024, 1, 1),
    "M",

    # The 1x1 -> 3x3 pair is repeated 2 times.
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),   # stride 2

    # Final two 3x3 convolutions.
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]

### CNN block

In [117]:
class CNNBlock(nn.Module):
    """Bias-free Conv2d followed by BatchNorm2d and LeakyReLU(0.1)."""

    def __init__(self, in_channels, out_channels, **kwargs):
        """Extra keyword arguments (kernel_size, stride, ...) go straight to ``nn.Conv2d``."""
        super().__init__()
        # The convolution is built without a bias term.
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leakyrelu = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply convolution, batch normalisation and activation in that order."""
        features = self.conv(x)
        normalised = self.batchnorm(features)
        return self.leakyrelu(normalised)

# Yolov1 모델

In [118]:
class Yolov1(nn.Module):
    """YOLOv1 network: a conv backbone built from ``architecture_config`` plus an FC head."""

    def __init__(self, in_channels=3, **kwargs):
        """
        Args:
            in_channels: Channel count of the input images.
            **kwargs: ``split_size``, ``num_boxes`` and ``num_classes``,
                forwarded to ``_create_fcs``.
        """
        super().__init__()
        self.architecture = architecture_config
        self.in_channels = in_channels
        self.darknet = self._create_conv_layers(self.architecture)
        self.fcs = self._create_fcs(**kwargs)

    def forward(self, x):
        """Run the backbone, flatten per sample, then apply the FC head."""
        features = self.darknet(x)
        flat = torch.flatten(features, start_dim=1)
        return self.fcs(flat)

    def _create_conv_layers(self, architecture):
        """Translate the architecture description into an ``nn.Sequential``.

        Entry kinds:
            tuple  -> one CNNBlock ``(kernel_size, filters, stride, padding)``
            str    -> a 2x2 max-pool with stride 2
            list   -> ``[conv_a, conv_b, repeats]``: the pair repeated ``repeats`` times
        """
        modules = []
        channels = self.in_channels

        for entry in architecture:
            if isinstance(entry, tuple):
                # Single convolution, e.g. (7, 64, 2, 3).
                kernel, filters, stride, padding = entry[:4]
                modules.append(
                    CNNBlock(
                        channels, filters,
                        kernel_size=kernel, stride=stride, padding=padding,
                    )
                )
                channels = filters

            elif isinstance(entry, str):
                # "M": max pooling.
                modules.append(nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2)))

            elif isinstance(entry, list):
                # Repeated pair, e.g. [(1, 256, 1, 0), (3, 512, 1, 1), 4].
                first, second, repeats = entry[:3]

                for _ in range(repeats):
                    modules.append(
                        CNNBlock(
                            channels,
                            first[1],
                            kernel_size=first[0],
                            stride=first[2],
                            padding=first[3],
                        )
                    )
                    modules.append(
                        CNNBlock(
                            first[1],
                            second[1],
                            kernel_size=second[0],
                            stride=second[2],
                            padding=second[3],
                        )
                    )
                    # The next repetition consumes the second conv's output.
                    channels = second[1]

        return nn.Sequential(*modules)

    def _create_fcs(self, split_size, num_boxes, num_classes):
        """Build the head mapping ``1024 * S * S`` features to ``S * S * (C + B * 5)`` outputs."""
        S, B, C = split_size, num_boxes, num_classes

        # In original paper this should be
        # nn.Linear(1024*S*S, 4096),
        # nn.LeakyReLU(0.1),
        # nn.Linear(4096, S*S*(B*5+C))

        head = [
            nn.Flatten(),
            nn.Linear(1024 * S * S, 496),
            nn.Dropout(0.0),
            nn.LeakyReLU(0.1),
            nn.Linear(496, S * S * (C + B * 5)),
        ]
        return nn.Sequential(*head)

In [119]:
# Instantiate the network: 7x7 grid, 2 boxes per cell, 4 classes.
model = Yolov1(
    split_size=7,
    num_boxes=2,
    num_classes=4,
)

In [14]:
# Display the module tree to check the layers that were built.
model

Yolov1(
  (darknet): Sequential(
    (0): CNNBlock(
      (conv): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (batchnorm): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (leakyrelu): LeakyReLU(negative_slope=0.1)
    )
    (1): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (2): CNNBlock(
      (conv): Conv2d(64, 192, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (batchnorm): BatchNorm2d(192, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (leakyrelu): LeakyReLU(negative_slope=0.1)
    )
    (3): MaxPool2d(kernel_size=(2, 2), stride=(2, 2), padding=0, dilation=1, ceil_mode=False)
    (4): CNNBlock(
      (conv): Conv2d(192, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (batchnorm): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (leakyrelu): LeakyReLU(negative_slope=0.1)
  

# LOSS

In [120]:
class YoloLoss(nn.Module):
    """YOLOv1 sum-squared-error loss.

    Per-cell layout of the last dimension (length ``C + 5 * B``), as indexed
    by ``forward``:

        predictions
            [0:C]        class scores
            [C]          box-1 confidence
            [C+1:C+5]    box-1 (x, y, w, h)
            [C+5]        box-2 confidence
            [C+6:C+10]   box-2 (x, y, w, h)
        target
            [0:C]        one-hot class
            [C]          objectness (1 = object present, 0 = empty cell)
            [C+1:C+5]    box (x, y, w, h)

    The indexing handles exactly two predicted boxes per cell (B = 2).
    """

    def __init__(self, S=7, B=2, C=4):
        """
        Args:
            S: Grid size (cells per side; 7 in the paper).
            B: Boxes predicted per cell (2 in the paper).
            C: Number of classes (4 by default here).
        """
        super(YoloLoss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")

        self.S = S
        self.B = B
        self.C = C

        # These are from Yolo paper, signifying how much we should
        # pay loss for no object (noobj) and the box coordinates (coord)
        self.lambda_noobj = 0.5
        self.lambda_coord = 5

    def forward(self, predictions, target):
        """Compute the total loss for one batch.

        Args:
            predictions: Raw model output; reshaped here to
                ``(batch, S, S, C + 5 * B)``.
            target: Ground-truth tensor of shape ``(batch, S, S, C + 5 * B)``.

        Returns:
            Scalar tensor:
            ``lambda_coord * box + object + lambda_noobj * no_object + class``.
        """
        C = self.C
        predictions = predictions.reshape(-1, self.S, self.S, C + self.B * 5)

        # Named views of the slices used throughout.
        conf_1, box_1 = predictions[..., C:C + 1], predictions[..., C + 1:C + 5]
        conf_2, box_2 = predictions[..., C + 5:C + 6], predictions[..., C + 6:C + 10]
        target_conf = target[..., C:C + 1]
        target_box = target[..., C + 1:C + 5]

        # IoU of each predicted box with the target box
        # (intersection_over_union comes from utils).
        iou_b1 = intersection_over_union(box_1, target_box)
        iou_b2 = intersection_over_union(box_2, target_box)
        ious = torch.cat([iou_b1.unsqueeze(0), iou_b2.unsqueeze(0)], dim=0)

        # bestbox is 0 where box 1 has the higher IoU and 1 where box 2 does.
        iou_maxes, bestbox = torch.max(ious, dim=0)
        # Objectness flag of every cell, kept 4-D for broadcasting.
        exists_box = target_conf  # in paper this is Iobj_i

        # ======================== #
        #   FOR BOX COORDINATES    #
        # ======================== #

        # Keep only the responsible box, and zero out cells without an object.
        box_predictions = exists_box * (
            bestbox * box_2 + (1 - bestbox) * box_1
        )
        box_targets = exists_box * target_box

        # Compare square roots of width/height. The sign is reapplied because
        # predicted sizes may be negative; 1e-6 keeps the sqrt gradient finite.
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])

        box_loss = self.mse(
            torch.flatten(box_predictions, end_dim=-2),
            torch.flatten(box_targets, end_dim=-2),
        )

        # ==================== #
        #   FOR OBJECT LOSS    #
        # ==================== #

        # Confidence of the responsible box.
        pred_box = bestbox * conf_2 + (1 - bestbox) * conf_1
        object_loss = self.mse(
            torch.flatten(exists_box * pred_box),
            torch.flatten(exists_box * target_conf),
        )

        # ======================= #
        #   FOR NO OBJECT LOSS    #
        # ======================= #

        # Both confidences are pushed towards the target in cells without an
        # object, lowering confidence where nothing is present.
        no_object_loss = self.mse(
            torch.flatten((1 - exists_box) * conf_1, start_dim=1),
            torch.flatten((1 - exists_box) * target_conf, start_dim=1),
        )
        no_object_loss += self.mse(
            torch.flatten((1 - exists_box) * conf_2, start_dim=1),
            torch.flatten((1 - exists_box) * target_conf, start_dim=1),
        )

        # ================== #
        #   FOR CLASS LOSS   #
        # ================== #

        # Class scores against the one-hot target, for object cells only.
        class_loss = self.mse(
            torch.flatten(exists_box * predictions[..., :C], end_dim=-2),
            torch.flatten(exists_box * target[..., :C], end_dim=-2),
        )

        loss = (
            self.lambda_coord * box_loss  # first two rows in paper
            + object_loss  # third row in paper
            + self.lambda_noobj * no_object_loss  # forth row
            + class_loss  # fifth row
        )

        return loss

# train

In [121]:
def train_fn(train_loader, model, optimizer, loss_fn):
    """Train ``model`` for one pass over ``train_loader``.

    Args:
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        model: Network to optimise.
        optimizer: Optimiser that updates ``model``'s parameters.
        loss_fn: Callable ``(outputs, targets) -> scalar loss tensor``.

    Returns:
        The mean batch loss as a float, or ``None`` when the loader produced
        no batches.
    """
    loop = tqdm(train_loader, leave=True)
    batch_losses = []

    for x, y in loop:
        x, y = x.to(DEVICE), y.to(DEVICE)
        loss = loss_fn(model(x), y)
        batch_losses.append(loss.item())

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # update progress bar
        loop.set_postfix(loss=loss.item())

    # An empty loader (e.g. fewer samples than one batch with drop_last=True)
    # would otherwise divide by zero.
    if not batch_losses:
        print("Mean loss was not computed: the loader produced no batches")
        return None

    mean_loss = sum(batch_losses) / len(batch_losses)
    print(f"Mean loss was {mean_loss}")
    return mean_loss

In [122]:
# Training dataset built from the CSV, image and label locations.
train_dataset = VOCDataset(
    csv_file=train_csv,
    img_dir=img_dir,
    label_dir=label_dir,
    transform=transform,
)

# Shuffled batches; an incomplete final batch is dropped.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    drop_last=True,
    num_workers=0,
    pin_memory=PIN_MEMORY,
)

### train loader에 값이 제대로 있는지 확인

In [123]:
# Number of batches the loader yields per epoch.
len(train_loader)

12

In [124]:
# Pull a single batch to check the image tensor shape.
dataiter = iter(train_loader)
# Use the built-in next(): DataLoader iterators implement __next__, and the
# old `.next()` method is not available on current PyTorch versions.
images, labels = next(dataiter)
images.size()

torch.Size([16, 3, 448, 448])

# model 불러오기

In [125]:
# Network, optimiser and loss used for training.
model = Yolov1(split_size=7, num_boxes=2, num_classes=4).to(DEVICE)
optimizer = optim.Adam(
    model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY
)
loss_fn = YoloLoss()

if LOAD_MODEL:
    # map_location lets a checkpoint saved on another device (e.g. a GPU)
    # be restored onto the device used in this session.
    load_checkpoint(
        torch.load(LOAD_MODEL_FILE, map_location=DEVICE), model, optimizer
    )

# 훈련 시작

In [126]:
# Single-epoch run: evaluate mAP on the training set, then train for one pass.
# NOTE(review): the loop count is the literal 1, not the EPOCHS constant — confirm intent.
for epoch in tqdm(range(1)):
    print('epoch : ', epoch + 1)

    # Collect predicted and ground-truth boxes over the whole loader (utils helper).
    pred_boxes, target_boxes = get_bboxes(
        train_loader, model, iou_threshold=0.3, threshold=0.3
    )
    print('pred_boxes : ', pred_boxes)
    print('target_boxes : ', target_boxes)

    # mAP at IoU 0.5 with boxes given in midpoint format.
    mean_avg_prec = mean_average_precision(
        pred_boxes, target_boxes, iou_threshold=0.5, box_format="midpoint"
    )
    print(f"Train mAP: {mean_avg_prec}")

    # One optimisation pass over the training loader.
    train_fn(train_loader, model, optimizer, loss_fn)

  0%|                                                                                            | 0/1 [00:00<?, ?it/s]

epoch :  1
for문 시작


  0%|                                                                                            | 0/1 [00:11<?, ?it/s]


RuntimeError: shape '[16, 7, 7, 30]' is invalid for input of size 10976

### 에러내용 : RuntimeError: shape '[16, 7, 7, 30]' is invalid for input of size 10976
<br>
발생이유<br><br>
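모델 출력의 원소 수는 10976 = 16 x 7 x 7 x 14 (C + 5B = 4 + 5*2 = 14)인데, 이를 [16, 7, 7, 30]으로 reshape하려고 해서 발생한다. (에러가 train_fn 실행 전에 났으므로 utils 쪽 get_bboxes 단계에서 발생한 것으로 보임 — 확인 필요)<br>
30은 원본 YOLOv1(VOC, C=20, B=2)의 C + 5B 값이므로, reshape 크기와 모델 출력 크기(C + 5B)를 서로 일치시켜야 한다.<br><br>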

## B:5, C:5로 변경해서 하기!

In [None]:
# NOTE(review): no `main` function is defined in the visible notebook; unless it
# is defined elsewhere, running this cell raises NameError — confirm or remove.
if __name__ == "__main__":
    main()