In [1]:
import os
from typing import Tuple, List, Sequence, Callable, Dict  #  함수에 넘겨줄 파라미터들에 대해 타입들을 알려줌. 함수를 만들 때 사용자 입장에서 사용하기 좀 더 편하게 하는 라이브러리.

import cv2
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import json

import torch
from torch import Tensor
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.detection import keypointrcnn_resnet50_fpn
# from torchvision.ops import MultiScaleRoIAlign
from torchvision.transforms import functional as F

# Data Augmentation. 데이터 증대 시 필요한 라이브러리
import albumentations as A
from albumentations.pytorch import ToTensorV2

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Check if image file and label files label files in each directory has been sorted
'''
프로그램을 작성할 때 이미지와 레이블에 해당하는 두 가지 파일을 입력값으로 넣도록 작성했습니다.
두 가지 파일은 각각 다른 폴더에 저장되어 있는데, 이 파일들은 폴더의 index 형식으로 불러들여집니다.
따라서 각 폴더 내 데이터들의 순서가 정렬되어있지 않은 경우 이미지에 엉뚱한 정답지를 붙여 학습을 진행하는 경우가 발생합니다.
ex) 1번째 이미지에 5번째 이미지의 레이블값이 정답지로 주어짐.
이는 모델 학습에 치명적이므로 이미지 파일과 레이블 파일이 정렬되어있는지 확인하는 코드입니다.
대부분은 정렬되어있습니다. (파일 이름을 맞춰 준 경우) -> Just in case
'''
count = 0
for index in range(len(os.listdir('./data/images'))):
    image_id = os.listdir('./data/images')[index]
    label_id = os.listdir('./data/annotations')[index]
    if image_id.replace('.jpg', '') == label_id.replace('.json', ''):
        count += 1

print(count)

IndexError: list index out of range

In [None]:
class KeypointDataset(Dataset):
    def __init__(
        self,
        image_path: os.PathLike,
        label_path: os.PathLike,
        transform: Sequence[Callable] = None,
        demo:bool = False # Use demo=True if you need transformed and original images (for example, for visualization purposes)
    ) -> None:
        self.image_path = image_path
        self.label_path = label_path
        self.transform = transform
        self.demo = demo
    '''
    KeypointDataset 객체 : 학습에 필요한 입력 데이터를 반환하는 클래스 입니다.
    이미지와 레이블의 경로를 읽어들여 외부에서 요청한 index 에 해당하는 이미지와 레이블을 각각 로드합니다.
    각 이미지와 레이블은 학습 시 사용할 수 있는 tensor 데이터 형태로 변환 후 반환하게 됩니다.
    target 구성 시 하나의 이미지 파일에 여러 target들을 만들어 원하는 부분을 학습하도록 할 수 있습니다.

    def __getitem__ ()
        output          :   image(tensor), targets(tensor) -> 한 장의 이미지만 출력함
        input parameter -
            image_path  :   os.PathLike 
            label_path  :   os.PathLike
            transform   :   Sequence[Callable]  (default = None)
            demo        :   bool                (default = False)
            
            image_path는 불러들일 이미지의 경로입니다.
            label_path는 불러들일 레이블의 경로입니다.
            transfrom은 이미지 증대 시 어떻게 이미지를 transform 해 줄지에 대한 정보를 갖고 있습니다.
            demo는 transform 된 이미지와 되지 않은 원본 이미지를 반환합니다.
                * 오로지 시각화 목적으로 사용합니다.
                * 학습시에 demo 값을 True 로 둘 경우 -> 학습시엔 항상 demo = False
                학습 입력 데이터는 한 번에 하나의 이미지와 레이블을 짝 지어서 입력받는 반면,
                데이터가 transformed, original 각 이미지와 레이블이 구성되어 전달받기 때문에 에러를 발생시킵니다.
    '''
    
    def __len__(self) -> int:
        # 이미지 파일이 저장된 경로내에 저장되어 있는 모든 파일의 숫자를 반환합니다.
        return len(os.listdir(self.image_path))
        
    def __getitem__(self, index:int) -> Tuple[Tensor, Dict]:
        
        image_id = os.listdir(self.image_path)[index]  #  폴더 내 index 번째 이미지 파일 읽기
        label_id = os.listdir(self.label_path)[index]  #  폴더 내 index 번째 레이블 파일 읽기

        with open(os.path.join(self.label_path, label_id)) as f:
            label_data = json.load(f)
            bboxes_original = label_data['bboxes']
            bboxes_labels_original = [1,2]  # 0 is for background.
            keypoints_original = label_data['keypoints']

        img_original = cv2.imread(os.path.join(self.image_path, image_id), cv2.IMREAD_GRAYSCALE)
        
        if self.transform:   # augmentation process
            keypoints_original_flattened = [el[0:2] for kp in keypoints_original for el in kp]
            
            # Apply augmentations
            transformed = self.transform(image=img_original, bboxes=bboxes_original, bboxes_labels=bboxes_labels_original, keypoints=keypoints_original_flattened)
            img = transformed['image']
            bboxes = transformed['bboxes']
            
            keypoints_transformed_unflattened = np.reshape(np.array(transformed['keypoints']), (-1,2,2)).tolist()

            # Converting transformed keypoints from [x, y]-format to [x,y,visibility]-format by appending original visibilities to transformed coordinates of keypoints
            keypoints = []
            for o_idx, obj in enumerate(keypoints_transformed_unflattened): # Iterating over objects
                obj_keypoints = []
                for k_idx, kp in enumerate(obj): # Iterating over keypoints in each object
                    # kp - coordinates of keypoint
                    # keypoints_original[o_idx][k_idx][2] - original visibility of keypoint
                    obj_keypoints.append(kp + [keypoints_original[o_idx][k_idx][2]])
                keypoints.append(obj_keypoints)
        
        else:
            print('transfrom doesn"t work')
            img, bboxes, keypoints = img_original, bboxes_original, keypoints_original        
        
        # 모든 정보를 학습에 사용할 수 있는 tensor 형태로 데이터 타입을 변환합니다.        
        # target들은 key value 값을 갖고 있는 dictionary 형태로 변환합니다.
        # boxes      :  boundary 박스에 해당하는 꼭지점의 xy 좌표 정보를 갖고 있음
        #               boundary 박스를 그리는 두 개의 꼭지점 왼쪽 상단, 오른쪽 하단 [ [x1, y1, x2, y2], [x1, y1, x2, y2] ]
        # labels     :  박스에 대한 레이블. femoral 인지 tibial 인지 구분할 수 있도록
        # area       :  boundary 박스의 면적
        # keypoints  :  박스 안에서 관심있는 부분인 keypoints 들의 좌표
        #               각 keypoints [ [ [ x1, y1] [ x2, y2 ] ], [ [ x1, y1] [ x2, y2] ] ] - 첫 번째는 femoral, 두 전째는 tibial   
        bboxes = torch.as_tensor(bboxes, dtype=torch.float32)       
        target = {}
        target["boxes"] = bboxes
        target["labels"] = torch.as_tensor([1,2], dtype=torch.int64)
        target["image_id"] = torch.tensor([index])
        target["area"] = (bboxes[:, 3] - bboxes[:, 1]) * (bboxes[:, 2] - bboxes[:, 0])
        target["iscrowd"] = torch.zeros(len(bboxes), dtype=torch.int64)
        target["keypoints"] = torch.as_tensor(keypoints, dtype=torch.float32)        
        img = F.to_tensor(img)

        bboxes_original = torch.as_tensor(bboxes_original, dtype=torch.float32)
        target_original = {}
        target_original["boxes"] = bboxes_original
        target_original["labels"] = torch.as_tensor([1,2], dtype=torch.int64)
        target_original["image_id"] = torch.tensor([index])
        target_original["area"] = (bboxes_original[:, 3] - bboxes_original[:, 1]) * (bboxes_original[:, 2] - bboxes_original[:, 0])
        target_original["iscrowd"] = torch.zeros(len(bboxes_original), dtype=torch.int64)
        target_original["keypoints"] = torch.as_tensor(keypoints_original, dtype=torch.float32)        
        img_original = F.to_tensor(img_original)

        if self.demo == True:
            return img, target, img_original, target_original
        else:
            return img, target

In [None]:
'''
데이터 증대를 위해 이미지를 어떻게 transform 할 지 sequence를 정의해줍니다.
데이터 증대 : 원본 이미지를 영상처리 기법을 통해 회전시키거나 밝기, 대비등을 조정하여 다른 형태의 이미지로 구성하는 기법
             활용 시 하나의 이미지로 여러개의 이미지를 만들 수 있습니다.
             이는 이미지 데이터가 부족할 때 학습 성능을 높이기 위해서 활용할 수 있으며, 과적합에 따른 성능 저하를 방지하는데 중요한 기법 중 하나입니다.
'''
def train_transform():
    return A.Compose([
        A.Sequential([
            A.RandomRotate90(p=1), # Random rotation of an image by 90 degrees zero or more times
            A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, brightness_by_max=True, always_apply=False, p=1), # Random change of brightness & contrast
        ], p=1)
    ],
    # 이미지를 회전시킴에 따라 keypoints와 boundary box도 회전되어야 합니다.
    keypoint_params=A.KeypointParams(format='xy'), # More about keypoint formats used in albumentations library read at https://albumentations.ai/docs/getting_started/keypoints_augmentation/
    bbox_params=A.BboxParams(format='pascal_voc', label_fields=['bboxes_labels']) # Bboxes should have labels, read more at https://albumentations.ai/docs/getting_started/bounding_boxes_augmentation/
    )

def collate_fn(batch: torch.Tensor) -> Tuple:
    return tuple(zip(*batch))

In [None]:
# 이미지와 레이블에 해당하는 경로를 지정하고 데이터들이 잘 구성되어 들어오는지 확인합니다.
image_path = './data/images'
label_path = './data/annotations'

dataset = KeypointDataset(image_path, label_path, transform=train_transform(), demo=True) # demo for only visualization
data_loader = DataLoader(dataset, batch_size=1, shuffle=True, collate_fn=collate_fn)

iterator = iter(data_loader)
batch = next(iterator)

print("Original targets:\n", batch[3], "\n\n")
print("Transformed targets:\n", batch[1])

Original targets:
 ({'boxes': tensor([[3796., 1310., 4742., 5198.],
        [3877., 4990., 4777., 8198.]]), 'labels': tensor([1, 2]), 'image_id': tensor([136]), 'area': tensor([3678048., 2887200.]), 'iscrowd': tensor([0, 0]), 'keypoints': tensor([[[4.5000e+03, 1.5430e+03, 1.0000e+00],
         [4.3140e+03, 4.9890e+03, 1.0000e+00]],

        [[4.3030e+03, 5.1520e+03, 1.0000e+00],
         [4.4550e+03, 8.0100e+03, 1.0000e+00]]])},) 


Transformed targets:
 ({'boxes': tensor([[3802., 3796., 7690., 4742.],
        [ 802., 3877., 4010., 4777.]]), 'labels': tensor([1, 2]), 'image_id': tensor([136]), 'area': tensor([3678048., 2887200.]), 'iscrowd': tensor([0, 0]), 'keypoints': tensor([[[7.4560e+03, 4.5000e+03, 1.0000e+00],
         [4.0100e+03, 4.3140e+03, 1.0000e+00]],

        [[3.8470e+03, 4.3030e+03, 1.0000e+00],
         [9.8900e+02, 4.4550e+03, 1.0000e+00]]])},)


In [1]:
def get_model(num_keypoints, num_objects, weights_path=None):
    '''
    num_objects : int   |   예측하고자 하는 객체의 숫자를 지정합니다.(boundary box 숫자)
                            * 여기서 이미지의 배경에 해당하는 객체도 포함시켜야 합니다.
                            * 따라서 찾고자 하는 객체가 2개라면, 3을 입력시켜야 합니다.
    num_keypoints : int |   예측하고자 하는 keypoints의 숫자를 지정합니다.
    weight_path : str   |   학습된 모델의 weight 파일이 있을 경우, 저장된 weight 값을 불러옵니다.
    '''
    anchor_generator = AnchorGenerator(sizes=(32, 64, 128, 256, 512), aspect_ratios=(0.25, 0.5, 0.75, 1.0, 2.0, 3.0, 4.0))
    model = keypointrcnn_resnet50_fpn(
                                    # weights=False,            # Is deprecated since 0.13, default = None (Annotated cause it occurs warnings)
                                    # weights_backbone=True,    # Is deprecated since 0.13, default = ResNet50_Weights.IMAGENET1K_V1 (Annotated cause it occurs warnings)
                                    num_keypoints=num_keypoints,
                                    num_classes=num_objects,    # Background is the first class, objects are other classes
                                    rpn_anchor_generator=anchor_generator
                                )

    if weights_path:
        state_dict = torch.load(weights_path)
        model.load_state_dict(state_dict)        
        
    return model

In [None]:
def train(data_loader, device='cuda:0'):
    '''
    원하는 학습 모델을 불러 학습을 진행합니다.
    각각 optimizer와 epochs를 커스텀하여 학습을 진행합니다.
    '''
    model = get_model(num_keypoints=2, num_objects=3)
    model.to(device)
    optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9, weight_decay=5e-4)
    num_epochs = 40
    hist_loss = []

    for epoch in range(num_epochs):
        print("number of epoch : ",epoch)
        model.train()
        for i, (images, targets) in enumerate(data_loader):
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
            optimizer.zero_grad()
            losses = model(images, targets)
    
            loss = sum(loss for loss in losses.values())
            loss.backward()
            optimizer.step()

            if (i+1) % 10 == 0:
                print(f'| epoch: {epoch} | loss: {loss.item():.4f}', end=' | ')
                for k, v in losses.items():
                    print(f'{k[5:]}: {v.item():.4f}', end=' | ')
                print()
                hist_loss.append(round(loss.item(), 4))
    
    return model, hist_loss

In [None]:
# GPU 활용하여 학습 진행
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
print(torch.cuda.is_available())

dataset = KeypointDataset(image_path, label_path, transform=train_transform())
data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=collate_fn)

trained_model, hist = train(data_loader=data_loader, device=device)

True
number of epoch :  0
| epoch: 0 | loss: 9.6470 | classifier: 0.7943 | box_reg: 0.0249 | keypoint: 8.1219 | objectness: 0.6946 | rpn_box_reg: 0.0113 | 
| epoch: 0 | loss: 9.1625 | classifier: 0.4059 | box_reg: 0.0631 | keypoint: 7.9837 | objectness: 0.6918 | rpn_box_reg: 0.0180 | 
| epoch: 0 | loss: 8.8122 | classifier: 0.2274 | box_reg: 0.0806 | keypoint: 7.8035 | objectness: 0.6877 | rpn_box_reg: 0.0130 | 
| epoch: 0 | loss: 8.3316 | classifier: 0.1395 | box_reg: 0.0442 | keypoint: 7.4511 | objectness: 0.6813 | rpn_box_reg: 0.0155 | 
| epoch: 0 | loss: 7.7973 | classifier: 0.1600 | box_reg: 0.0722 | keypoint: 6.8905 | objectness: 0.6594 | rpn_box_reg: 0.0152 | 
| epoch: 0 | loss: 7.4239 | classifier: 0.1504 | box_reg: 0.0606 | keypoint: 6.5909 | objectness: 0.6079 | rpn_box_reg: 0.0141 | 
number of epoch :  1
| epoch: 1 | loss: 6.3217 | classifier: 0.1292 | box_reg: 0.0520 | keypoint: 5.6212 | objectness: 0.5079 | rpn_box_reg: 0.0114 | 
| epoch: 1 | loss: 6.6220 | classifier: 0.0

IndexError: list index out of range

In [None]:
# 학습된 모델의 weight 값들을 저장합니다.
model_path = './weights'
model_name = 'xray_rcnnkeypoints_weight_exp04.pth'

torch.save(trained_model.state_dict(), os.path.join(model_path, model_name))

NameError: name 'trained_model' is not defined

In [None]:
# torch가 잡아둔 GPU 메모리 제거
torch.cuda.empty_cache()

In [None]:
# epoch 마다 학습이 진행된 상황(loss) 시각화
plt.plot(list(range(len(hist))), hist)

NameError: name 'hist' is not defined