# 1. 라이브러리 임포트

In [1]:
! pip install albumentations==0.4.6
import torch,gc
import torch.nn as nn
import torchvision
import torch.optim as optim
import torchvision.transforms as transforms
from torch import Tensor
from torch.utils.data import DataLoader, Dataset
from torchvision.models.detection.backbone_utils import resnet_fpn_backbone
from torchvision.ops import MultiScaleRoIAlign
from torchvision.models.detection import KeypointRCNN
import matplotlib.pyplot as plt
import cv2
import numpy as np
import os
import pandas as pd
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm.notebook import tqdm
from typing import Tuple, List, Sequence, Callable, Dict



In [2]:
# Drop unreachable Python objects and release cached CUDA memory before
# anything new is allocated in this session.
gc.collect()
torch.cuda.empty_cache()

# Run on the first GPU when CUDA is available, otherwise on the CPU.
if torch.cuda.is_available():
    device = 'cuda:0'
else:
    device = 'cpu'

# 2. 코랩 연결 부분

In [3]:
from google.colab import drive

# Every data path in this notebook is resolved relative to this Drive folder.
root_dir = '/content/drive/MyDrive/'

# (Re)mount Google Drive inside the Colab VM.
drive.mount('/content/drive', force_remount=True)

Mounted at /content/drive


In [4]:
# --- Experiment configuration ---------------------------------------------
# When True, the backbone weights are frozen (see set_parameter_requires_grad).
feature_extracting = True
# Despite the name, this is the number of predicted coordinates per image
# (x and y for each keypoint); it is used when rescaling predictions.
num_classes = 48
learning_rate = 1e-4
batch_size = 4
num_epochs = 1000

# Data locations, relative to root_dir.
test_dir = 'data/test_imgs'
train_dir = "data/train_imgs"
train_df_csv = "data/train_df.csv"

# os.listdir returns entries in arbitrary order; sort them so the inference
# order (and therefore the submission row order) is reproducible across runs.
test_imgs = sorted(os.listdir(os.path.join(root_dir, test_dir)))

# 3. 함수 정의 부분

In [5]:
def collate_fn(batch):
    """Transpose a list of samples into one tuple per sample field.

    Passed to the training DataLoader so that a batch of ``(image, target)``
    samples arrives as ``(images, targets)`` tuples instead of being stacked.

    Args:
        batch: sequence of equally long sample tuples.

    Returns:
        A tuple of tuples, one inner tuple per field; ``()`` for an empty batch.
    """
    fields = zip(*batch)
    return tuple(fields)

In [6]:
def _read_rgb_image(path):
    """Read an image file with OpenCV and return it in RGB channel order.

    ``cv2.imread`` yields BGR data and returns ``None`` (instead of raising)
    when the file is missing or unreadable, so both cases are handled here.
    Training and inference share this reader so they always agree on the
    channel order fed to the model.

    Args:
        path: path of the image file.

    Returns:
        The decoded 3-channel image as an ndarray in RGB order.

    Raises:
        FileNotFoundError: if OpenCV could not read ``path``.
    """
    image = cv2.imread(path, cv2.IMREAD_COLOR)
    if image is None:
        raise FileNotFoundError(f'could not read image: {path}')
    # The colour conversion must be an explicit step: a cv2.COLOR_* code is
    # not a valid imread flag and does not reorder channels when passed there.
    return cv2.cvtColor(image, cv2.COLOR_BGR2RGB)


class KeypointDataset(Dataset):
    """Training dataset yielding an image and its single-instance target.

    The label CSV is expected to hold the image file name in the first column
    followed by flattened ``x, y`` keypoint coordinates.

    Args:
        data_dir: directory containing the training images.
        label_path: path of the label CSV.
        phase: key selecting the pipeline inside ``transforms``.
        transforms: mapping ``phase -> callable`` invoked with ``image``,
            ``bboxes``, ``labels`` and ``keypoints`` keyword arguments, or
            ``None`` to skip augmentation.
    """

    def __init__(self, data_dir, label_path, phase, transforms):
        self.data_dir = data_dir
        self.df = pd.read_csv(label_path)
        self.transforms = transforms
        self.phase = phase

    def __len__(self) -> int:
        return self.df.shape[0]

    @staticmethod
    def _bbox_from_keypoints(keypoints):
        """Return the tight ``[[x1, y1, x2, y2]]`` box around ``keypoints``."""
        x1, y1 = keypoints.min(axis=0)
        x2, y2 = keypoints.max(axis=0)
        return np.array([[x1, y1, x2, y2]], dtype=np.int64)

    def __getitem__(self, index) -> Tuple[Tensor, Dict]:
        """Load, augment and package the sample at ``index``.

        Returns:
            ``(image, target)`` where ``image`` is the transformed image
            divided by 255 and ``target`` holds ``labels``, ``boxes`` and
            ``keypoints``; the keypoints have shape ``(1, K, 3)`` with a
            constant visibility flag of 1 appended to every ``(x, y)`` pair.

        Raises:
            FileNotFoundError: if the image cannot be read.
            ValueError: if the transform returned a different number of
                keypoints than the CSV row contains.
        """
        image_id = self.df.iloc[index, 0]
        labels = np.array([1])
        keypoints = self.df.iloc[index, 1:].values.reshape(-1, 2).astype(np.int64)
        num_keypoints = keypoints.shape[0]
        boxes = self._bbox_from_keypoints(keypoints)

        image = _read_rgb_image(os.path.join(self.data_dir, image_id))

        targets = {
            'image': image,
            'bboxes': boxes,
            'labels': labels,
            'keypoints': keypoints
        }

        if self.transforms is not None:
            targets = self.transforms[self.phase](**targets)

        image = targets['image']
        image = image / 255.0

        keypoints = np.asarray(targets['keypoints'], dtype=np.float32).reshape(-1, 2)
        if keypoints.shape[0] != num_keypoints:
            # A pipeline that discards keypoints leaving the frame would
            # otherwise yield a target with the wrong number of keypoints.
            raise ValueError(
                f'expected {num_keypoints} keypoints after transform for '
                f'{image_id}, got {keypoints.shape[0]}'
            )
        visibility = np.ones((num_keypoints, 1), dtype=np.float32)

        targets = {
            'labels': torch.as_tensor(targets['labels'], dtype=torch.int64),
            'boxes': torch.as_tensor(targets['bboxes'], dtype=torch.float32),
            'keypoints': torch.as_tensor(
                np.concatenate([keypoints, visibility], axis=1)[np.newaxis],
                dtype=torch.float32
            )
        }

        return image, targets


class TestDataset(Dataset):
    """Inference dataset yielding ``(filename, image)`` pairs without labels.

    Args:
        data_dir: directory containing the test images.
        imgs: sequence of image file names inside ``data_dir``.
        phase: key selecting the pipeline inside ``transforms``.
        transforms: optional mapping ``phase -> callable`` invoked with an
            ``image`` keyword argument.
    """

    def __init__(self, data_dir, imgs, phase, transforms=None):
        self.data_dir = data_dir
        self.imgs = imgs
        self.phase = phase
        self.transforms = transforms

    def __getitem__(self, idx):
        """Return the file name and the transformed image divided by 255.

        Raises:
            FileNotFoundError: if the image cannot be read.
        """
        filename = self.imgs[idx]
        img = _read_rgb_image(os.path.join(self.data_dir, filename))

        if self.transforms:
            img = self.transforms[self.phase](image=img)['image']

        img = img / 255.0
        return filename, img

    def __len__(self):
        return len(self.imgs)
  


In [8]:
def get_model() -> nn.Module:
    """Build the Keypoint R-CNN used for training and inference.

    The detector uses a ResNet-101 FPN backbone created with
    ``pretrained=True``, two classes and 24 keypoints per instance. Box and
    keypoint heads each get their own RoI pooler over FPN levels '0'..'3'.

    Returns:
        The assembled ``KeypointRCNN`` module (not yet moved to any device).
    """
    def make_pooler(output_size):
        # Both poolers differ only in the resolution of the pooled features.
        return MultiScaleRoIAlign(
            featmap_names=['0', '1', '2', '3'],
            output_size=output_size,
            sampling_ratio=2,
        )

    return KeypointRCNN(
        resnet_fpn_backbone('resnet101', pretrained=True),
        num_classes=2,
        num_keypoints=24,
        box_roi_pool=make_pooler(7),
        keypoint_roi_pool=make_pooler(14),
    )

In [None]:
def set_parameter_requires_grad(model, feature_extracting):
    """Freeze the backbone of ``model`` when ``feature_extracting`` is truthy.

    Args:
        model: module exposing a ``backbone`` sub-module.
        feature_extracting: if falsy, the model is left untouched.
    """
    if not feature_extracting:
        return
    # Parameters with requires_grad=False receive no gradient, so they are
    # excluded from training.
    for weight in model.backbone.parameters():
        weight.requires_grad_(False)


In [9]:
# Augmentation pipelines, keyed by phase.
#
# * 'train' and 'val' are called with image, bboxes, labels and keypoints, so
#   both declare bbox_params and keypoint_params; without bbox_params the
#   boxes would not be converted consistently with the resized image.
# * remove_invisible=False keeps keypoints that a rotation or flip moves
#   outside the frame, so every sample keeps the same number of keypoints.
# * 'test' only receives an image.
A_transforms = {
    'train':
        A.Compose([
            A.Resize(224, 224, always_apply=True),
            A.Rotate(limit=40, p=0.9),
            A.OneOf([A.HorizontalFlip(p=1),
                     A.RandomRotate90(p=1),
                     A.VerticalFlip(p=1)
            ], p=0.5),
            A.OneOf([A.MotionBlur(p=1),
                     A.GaussNoise(p=1)
            ], p=0.5),
            #A.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ToTensorV2()
        ],  bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']),
            keypoint_params=A.KeypointParams(format='xy', remove_invisible=False)),

    'val':
        A.Compose([
            A.Resize(224, 224, always_apply=True),
            #A.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ToTensorV2()
        ],  bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']),
            keypoint_params=A.KeypointParams(format='xy', remove_invisible=False)),

    'test':
        A.Compose([
            A.Resize(224, 224, always_apply=True),
            #A.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
            ToTensorV2()
        ])
}

In [10]:
# Training data: augmented samples, batched as (images, targets) tuples.
dataset = KeypointDataset(
    data_dir=os.path.join(root_dir, train_dir),
    label_path=os.path.join(root_dir, train_df_csv),
    phase="train",
    transforms=A_transforms,
)
train_loader = DataLoader(
    dataset=dataset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=2,
    collate_fn=collate_fn,
)

# Test data: file order is preserved so predictions line up with file names.
test_data = TestDataset(
    data_dir=os.path.join(root_dir, test_dir),
    imgs=test_imgs,
    phase='test',
    transforms=A_transforms,
)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

# 4. 모델 초기화

In [11]:
model = get_model()
# Move the model to the device chosen at start-up. A hard-coded .cuda() call
# would crash on CPU-only hosts even though `device` falls back to 'cpu'.
model.to(device)
set_parameter_requires_grad(model, feature_extracting)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
#optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9, weight_decay=5e-4)
# Multiply the learning rate by `factor` when the monitored loss has not
# improved for `patience` epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=5, verbose=True)


## 4.1 모델 로드

In [12]:
# 4.1 model load


# 5. train / save

In [13]:
# Lowest mean epoch loss seen so far. It must be initialised once, outside
# the epoch loop; resetting it every epoch would make every epoch look like
# a new best and overwrite the checkpoint each time.
min_loss = float('inf')

for epoch in range(num_epochs):
    model.train()
    losses = []
    loop = tqdm(train_loader)
    for i, (images, targets) in enumerate(loop):
        images = [image.to(device) for image in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]
        optimizer.zero_grad()
        # NOTE(review): only the keypoint loss is optimised; the other
        # entries of the loss dict returned by the model are ignored.
        # Confirm this is intended.
        loss = model(images, targets)['loss_keypoint']
        loss.backward()
        optimizer.step()
        # Store a plain float: keeping the tensor would retain each batch's
        # autograd graph for the whole epoch.
        losses.append(loss.item())
        if (i+1) % 10 == 0:
            print(f'| epoch: {epoch} | loss: {loss.item():.4f}')
            print()
    mean_loss = sum(losses) / len(losses)
    scheduler.step(mean_loss)
    print(optimizer.param_groups[0]['lr'])
    if mean_loss < min_loss:
        min_loss = mean_loss
        # Checkpoint the best model so far, recording the epoch-mean loss
        # that earned it the "best" status.
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': mean_loss},
            os.path.join(root_dir,'data/best_model.pt'))
## 7 7 6

HBox(children=(FloatProgress(value=0.0, max=1049.0), HTML(value='')))

KeyboardInterrupt: ignored

# 6. test

In [None]:
# Inference
model.eval()
all_predictions = []
files = []
with torch.no_grad():
    loop = tqdm(test_loader)
    for filenames, inputs in loop:
        # pred holds one result dict per image in the batch.
        pred = model(inputs.to(device))
        for filename, x in zip(filenames, pred):
            keypoints = x['keypoints']
            if keypoints.shape[0] == 0:
                # No detection for this image: indexing [0] would raise
                # IndexError, so emit a zero row to keep one prediction
                # per file.
                print(f'no detection for {filename}; writing zeros')
                prediction = np.zeros(num_classes, dtype=np.float32)
            else:
                # Keep the (x, y) columns of the first returned detection
                # and flatten them to x0, y0, x1, y1, ...
                prediction = keypoints[0][:, :2].reshape(-1).detach().cpu().numpy()
            all_predictions.append(prediction)
        files.extend(filenames)

# 7. 파일 저장

In [None]:
# Predictions are expressed in the coordinate frame of the resized network
# input, so they must be scaled back to the original image resolution.
# The input size must match the A.Resize(224, 224) used by the pipelines.
input_w, input_h = 224, 224
orig_w, orig_h = 1920, 1080

# reshape keeps the array 2-D even when there are no predictions at all.
all_predictions = np.array(all_predictions, dtype=np.float32).reshape(-1, num_classes)
all_predictions[:, 0:num_classes:2] *= orig_w / input_w   # x coordinates
all_predictions[:, 1:num_classes:2] *= orig_h / input_h   # y coordinates

# Reuse the sample submission's column layout: first the image name, then
# one column per predicted coordinate.
df_sub = pd.read_csv(os.path.join(root_dir,'data/sample_submission.csv'))
df = pd.DataFrame(all_predictions, columns=df_sub.columns[1:])
df.insert(0, 'image', files)
df.head()

In [None]:
from datetime import datetime

# Tag the submission file with the current month and day (MM-DD).
now = datetime.now()
timeday = now.strftime('%m-%d')
submission_path = os.path.join(root_dir,f'data/submission_{timeday}.csv')
df.to_csv(submission_path, index=False)

In [None]:
# Inspect one training sample
train_image, target = dataset[0]
np.array(train_image).shape
# (x, y) columns of the sample's keypoints, without the visibility flag.
target_array = np.array(target['keypoints'][0][:, :2])
# Move the channel axis last for matplotlib.
plt.imshow(np.array(train_image).transpose(1, 2, 0))

In [None]:
# Inspect one test sample
filename, test_img = test_data[0]
# Move the channel axis last for matplotlib.
plt.imshow(np.array(test_img).transpose(1, 2, 0))

In [None]:
# def draw_keypoints(
#     image: np.ndarray,
#     keypoints: np.ndarray,
#     edges: List[Tuple[int, int]] = None,
#     keypoint_names: Dict[int, str] = None, 
#     boxes: bool = True,
#     dpi: int = 200
# ) -> None:
#     """
#     Args:
#         image (ndarray): [H, W, C]
#         keypoints (ndarray): [N, 3]
#         edges (List(Tuple(int, int))): 
#     """
#     np.random.seed(42)
#     colors = {k: tuple(map(int, np.random.randint(0, 255, 3))) for k in range(24)}

#     if boxes:
#         x1, y1 = min(keypoints[:, 0]), min(keypoints[:, 1])
#         x2, y2 = max(keypoints[:, 0]), max(keypoints[:, 1])
#         cv2.rectangle(image, (x1, y1), (x2, y2), (255, 100, 91), thickness=3)

#     for i, keypoint in enumerate(keypoints):
#         cv2.circle(
#             image, 
#             tuple(keypoint), 
#             3,(255,0,0), thickness=3, lineType=cv2.FILLED)

#         if keypoint_names is not None:
#             cv2.putText(
#                 image, 
#                 f'{i}: {keypoint_names[i]}', 
#                 tuple(keypoint), 
#                 cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

#     if edges is not None:
#         for i, edge in enumerate(edges):
#             cv2.line(
#                 image, 
#                 tuple(keypoints[edge[0]]), 
#                 tuple(keypoints[edge[1]]),
#                 colors.get(edge[0]), 3, lineType=cv2.LINE_AA)

#     fig, ax = plt.subplots(dpi=dpi)
#     ax.imshow(image)
#     ax.axis('off')
#     plt.show()
#     keypoints = target_array
# keypoint_names = {
#     0: 'nose',
#     1: 'left_eye',
#     2: 'right_eye',
#     3: 'left_ear', 
#     4: 'right_ear', 
#     5: 'left_shoulder', 
#     6: 'right_shoulder',
#     7: 'left_elbow', 
#     8: 'right_elbow',
#     9: 'left_wrist', 
#     10: 'right_wrist',
#     11: 'left_hip', 
#     12: 'right_hip',
#     13: 'left_knee', 
#     14: 'right_knee',
#     15: 'left_ankle', 
#     16: 'right_ankle',
#     17: 'neck', 
#     18: 'left_palm', 
#     19: 'right_palm', 
#     20: 'spine2(back)',
#     21: 'spine1(waist)', 
#     22: 'left_instep',
#     23: 'right_instep'
# }

# edges = [
#     (0, 1), (0, 2), (2, 4), (1, 3), (6, 8), (8, 10), (9, 18),
#     (10, 19), (5, 7), (7, 9), (11, 13), (13, 15), (12, 14),
#     (14, 16), (15, 22), (16, 23), (20, 21), (5, 6), (5, 11),
#     (6, 12), (11, 12), (17, 20), (20, 21), 
# ]
# draw_keypoints(np.array(train_image).transpose(1,2,0), keypoints, edges, keypoint_names, boxes=False, dpi=400)