대회: https://dacon.io/competitions/official/235701/overview/description


참고: https://maihon.oopy.io/competition/dacon-keypoints


HRNet: https://github.com/leoxiaobin/deep-high-resolution-net.pytorch

UDPpose: https://github.com/HuangJunJie2017/UDP-Pose

In [None]:
!pip install -U git+https://github.com/albu/albumentations

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting git+https://github.com/albu/albumentations
  Cloning https://github.com/albu/albumentations to /tmp/pip-req-build-4ew97ul_
  Running command git clone --filter=blob:none --quiet https://github.com/albu/albumentations /tmp/pip-req-build-4ew97ul_
  Resolved https://github.com/albu/albumentations to commit cb372736a0b7da362e5c2e23f3cce4304ddac402
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: albumentations
  Building wheel for albumentations (setup.py) ... [?25l[?25hdone
  Created wheel for albumentations: filename=albumentations-1.3.0-py3-none-any.whl size=125638 sha256=1875e167eb705dbd3c3cd9203280a5283be79a942658768033f50110f5a9dc02
  Stored in directory: /tmp/pip-ephem-wheel-cache-yx7p4tc7/wheels/c5/ca/df/fae131e2d3a8174cd8668f10bf0591fa158f0824214d3017bc
Successfully built albumentations
Installing collected packages: albumentat

In [None]:
import os
import cv2
import random
import pandas as pd
import numpy as np
from tqdm import tqdm
import albumentations as A
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold, train_test_split
from typing import Tuple, List, Sequence, Callable

import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms as tfms
from torch.utils.data import DataLoader, Dataset

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
main_dir = '/content/drive/MyDrive/방학 CV분반 KUBIG CONTEST/임채명/kubigcontestdata'
train_img_path = os.path.join(main_dir, 'train_imgs')
test_img_path = os.path.join(main_dir, 'test_imgs')
meta_info_dir = os.path.join(main_dir, 'train_df.csv') # csv에서는 잘못된 데이터 삭제되어 있음 

In [None]:
train_df = pd.read_csv(meta_info_dir)
train_df = train_df.reset_index(drop=True)

- Seed 고정 https://dacon.io/codeshare/2363
- torch.use_deterministic_algorithms(True)
https://jh-bk.tistory.com/19

In [None]:
def seed_everything(seed=16):
    random.seed(seed)
    np.random.seed(seed)
    os.environ["PYTHONHASHSEED"] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    torch.use_deterministic_algorithms(True)

    import imgaug
    imgaug.random.seed(seed)

seed_everything(16)

In [None]:
def show_image(cfg, image, keypoints, factor=None):
    if keypoints.shape[-1] == 3:
      keypoints = keypoints[:, :2].astype(np.int)

    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    colors = cfg.joint_colors

    if factor is not None:
      keypoints[:, 0] = keypoints[:, 0] * factor[0]
      keypoints[:, 1] = keypoints[:, 1] * factor[1]

    x1, y1 = int(min(keypoints[:, 0])), int(min(keypoints[:, 1]))
    x2, y2 = int(max(keypoints[:, 0])), int(max(keypoints[:, 1]))
    cv2.rectangle(image, (x1, y1), (x2, y2), (255, 100, 91), thickness=3)

    for i, keypoint in enumerate(keypoints):
        cv2.circle(
            image, 
            tuple(keypoint), 
            3, colors.get(i), thickness=2, lineType=cv2.FILLED)

        cv2.putText(
            image, 
            f'{i}: {cfg.joints_name[i]}', 
            tuple(keypoint), 
            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 0), 1)

    for i, pair in enumerate(cfg.joint_pair):
        cv2.line(
            image, 
            tuple(keypoints[pair[0]]), 
            tuple(keypoints[pair[1]]),
            colors.get(pair[0]), 2, lineType=cv2.LINE_AA)

    fig, ax = plt.subplots(dpi=200)
    ax.imshow(image)
    ax.axis('off')
    plt.show()

- 테스트 환경 구축을 위한 Config 만들기


In [None]:
from typing import List

class SingleModelConfig:
  def __init__(self,
               
               input_size: List[int] = [512, 512],
               kpd: float = 4.0,
               epochs: int = 150,
               sigma: float = 3.0,
               num_joints: int = 24,
               batch_size: int = 16,
               random_seed: int = 2021,
               test_ratio: float = 0.15,
               learning_rate: float = 1e-3,

               save_folder: str = 'result',
               main_dir: str = main_dir,
               loss_type: str = "MSE",
               target_type: str = "gaussian",
               post_processing: str = "dark",

               debug: bool = False,
               shift: bool = False,
               startify: bool = False,
               init_training: bool = False,
               startify_with_dir: bool = True,
    ):

    self.main_dir = main_dir
    self.save_folder = os.path.join(main_dir, save_folder)
    if not os.path.exists(self.save_folder) and self.save_folder != '':
      os.makedirs(self.save_folder, exist_ok=True)

    self.epochs = epochs
    self.seed = random_seed
    self.lr = learning_rate
    self.loss_type = loss_type
    self.num_joints = num_joints
    self.batch_size = batch_size
    self.test_ratio = test_ratio
    self.init_training = init_training

    self.kpd = kpd
    self.sigma = sigma
    self.shift = shift
    self.debug = debug
    self.startify = startify
    self.target_type = target_type
    self.image_size = np.array(input_size)
    self.output_size = self.image_size//4
    self.post_processing = post_processing
    self.startify_with_dir = startify_with_dir


    self.joints_name = {
          0: 'nose', 1: 'left_eye', 2: 'right_eye', 3: 'left_ear', 4: 'right_ear',
          5: 'left_shoulder', 6: 'right_shoulder', 7: 'left_elbow', 8: 'right_elbow',
          9: 'left_wrist', 10: 'right_wrist', 11: 'left_hip', 12: 'right_hip',
          13: 'left_knee', 14: 'right_knee', 15: 'left_ankle', 16: 'right_ankle',
          17: 'neck', 18: 'left_palm', 19: 'right_palm', 20: 'back_spine', 21: 'waist_spine',
          22: 'left_instep', 23: 'right_instep'
    }

    self.joint_pair = [
          (0, 1), (0, 2), (2, 4), (1, 3), (6, 8), (8, 10),
          (5, 7), (7, 9), (11, 13), (13, 15), (12, 14), 
          (14, 16), (5, 6), (15, 22), (16, 23), (11, 21),
          (21, 12), (20, 21), (5, 20), (6, 20), (17, 6), (17, 5)
    ]

    self.flip_pair = [
          (1, 2), (3, 4), (5, 6), (7, 8),
          (9, 10), (11, 12), (13, 14), (15, 16),
          (18, 19), (22, 23)
    ]

    
    cmap = plt.get_cmap("rainbow")
    colors = [cmap(i) for i in np.linspace(0, 1,  self.num_joints + 2)]
    colors = [(c[2] * 255, c[1] * 255, c[0] * 255) for c in colors]
    self.joint_colors = {k: colors[k] for k in range(self.num_joints)}

- top down 방식으로 pose estimation 진행하기 위해선 사람의 위치를 먼저 찾아줘야함.
- train image는 keypoint가 주어지기 때문에 위치를 파악하기 위해 별도의 모델을 사용할 필요X
- 아래 코드에서는 keypoint를 바탕으로 사람의 중심을 정의하고 이를 바탕으로 affine transformation하는 방식을 사용

In [None]:
# https://github.com/leoxiaobin/deep-high-resolution-net.pytorch/blob/master/lib/utils/transforms.py

def get_affine_transform(center,
                         scale,
                         rot,
                         output_size,
                         shift=np.array([0, 0], dtype=np.float32),
                         inv=0,):
    if not isinstance(scale, np.ndarray) and not isinstance(scale, list):
        print(scale)
        scale = np.array([scale, scale])

    src_w = scale[0]
    dst_w = output_size[0]
    dst_h = output_size[1]

    rot_rad = np.pi * rot / 180
    src_dir = get_dir([0, src_w * -0.5], rot_rad)
    dst_dir = np.array([0, dst_w * -0.5], np.float32)

    src = np.zeros((3, 2), dtype=np.float32)
    dst = np.zeros((3, 2), dtype=np.float32)
    src[0, :] = center + scale * shift
    src[1, :] = center + src_dir + scale * shift
    dst[0, :] = [dst_w * 0.5, dst_h * 0.5]
    dst[1, :] = np.array([dst_w * 0.5, dst_h * 0.5]) + dst_dir

    src[2:, :] = get_3rd_point(src[0, :], src[1, :])
    dst[2:, :] = get_3rd_point(dst[0, :], dst[1, :])

    if inv:
        trans = cv2.getAffineTransform(np.float32(dst), np.float32(src))
    else:
        trans = cv2.getAffineTransform(np.float32(src), np.float32(dst))

    return trans

# 실제 어파인 변환 수행하는 부분
# t는 위의 get_affine_transform 함수를 통해 trans 매트릭스 구한다.
def affine_transform(pt, t):
    new_pt = np.array([pt[0], pt[1], 1.]).T
    new_pt = np.dot(t, new_pt)
    return new_pt[:2]


def get_dir(src_point, rot_rad):
    """ 
        Transformation Matrix 
        x = x * cosΘ - y * sinΘ
        y = x * cosΘ + y * sinΘ
        [ cosΘ   sinΘ   0]
        [ -sinΘ  cosΘ   0]
        [  0       0    0]
    """

    sn, cs = np.sin(rot_rad), np.cos(rot_rad)

    src_result = [0, 0]
    src_result[0] = src_point[0] * cs - src_point[1] * sn
    src_result[1] = src_point[0] * sn + src_point[1] * cs

    return src_result


def get_3rd_point(a, b):
    direct = a - b
    return b + np.array([-direct[1], direct[0]], dtype=np.float32)

UDP 방식으로 data preprocessing

In [None]:
class DaconKeypointsDataset(Dataset):
    def __init__(
        self,
        cfg: SingleModelConfig,
        image_dir: str, 
        label_df: pd.DataFrame, 
        transforms: Sequence = None,
        mode: str = 'train'
    ) -> None:
        self.image_dir = image_dir
        self.df = label_df
        self.transforms = transforms

        self.mode = mode
        self.kpd = cfg.kpd
        self.debug = cfg.debug
        self.shift = cfg.shift
        self.num_joints = cfg.num_joints
        self.flip_pairs = cfg.flip_pair
        self.image_size = cfg.image_size
        self.heatmap_size = cfg.output_size
        self.sigma = cfg.sigma
        self.target_type = cfg.target_type



    def __len__(self) -> int:
        return self.df.shape[0]
    
    def __getitem__(self, index: int):
        image_id = self.df.iloc[index, 0]

        labels = np.array([1])
        keypoints = self.df.iloc[index, 1:].values.reshape(-1, 2).astype(np.float32)
        keypoints = np.concatenate([keypoints,  np.ones((24, 1))], axis=1)

        # define bbox
        xmin = np.min(keypoints[:, 0])
        xmax = np.max(keypoints[:, 0])
        width = xmax - xmin if xmax > xmin else 20
        center = (xmin + xmax)/2.
        xmin = int(center - width/2.*1.2)
        xmax = int(center + width/2.*1.2)

        ymin = np.min(keypoints[:, 1])
        ymax = np.max(keypoints[:, 1])
        height = ymax - ymin if ymax > ymin else 20
        center = (ymin + ymax)/2.
        ymin = int(center - height/2.*1.2)
        ymax = int(center + height/2.*1.2)
        

        x, y, w, h = xmin, ymin, xmax-xmin, ymax-ymin
        aspect_ratio = self.image_size[1] / self.image_size[0]
        centre = np.array([x+w*.5, y+h*.5])
        if w > aspect_ratio * h:
            h = w * 1.0 / aspect_ratio
        # if w < aspect_ratio * h:
        elif w < aspect_ratio * h:
            w = h * aspect_ratio
        
        scale = np.array([w, h]) * 1.25
        rotation = 0

        image = cv2.imread(os.path.join(self.image_dir, image_id), cv2.COLOR_BGR2RGB)
          
        # if it's train mode
        if self.mode == 'train':
            scale_factor = 0.3
            rotate_factor = 45
            scale = scale * np.clip(np.random.randn()*scale_factor+1,1-scale_factor, 1+scale_factor)
            rotation = np.clip(np.random.randn()*rotate_factor, -rotate_factor*2, rotate_factor*2) if random.random() <= 0.5 else 0
            

            # lr flipping
            if np.random.random() <= 0.5:
              image = np.flip(image, 1)
              centre[0] = image.shape[1] - 1 - centre[0]

              keypoints[:, 0] = image.shape[1] - 1 - keypoints[:, 0]
              for (q, w) in self.flip_pairs:
                  keypoints_q, keypoints_w = keypoints[q, :].copy(), keypoints[w, :].copy()
                  keypoints[w, :], keypoints[q, :] = keypoints_q, keypoints_w
            

        trans = get_affine_transform(centre, scale, rotation, (self.image_size[1], self.image_size[0]))
        cropped_image = cv2.warpAffine(image, trans, (self.image_size[1], self.image_size[0]), flags=cv2.INTER_LINEAR)
        for j in range(self.num_joints):
            if keypoints[j, 2] > 0:
                keypoints[j, :2] = affine_transform(keypoints[j, :2], trans)
                keypoints[j, 2] *= ((keypoints[j, 0] >= 0) & (keypoints[j, 0] < self.image_size[1]) \
                                  & (keypoints[j, 1] >= 0) & (keypoints[j, 1] < self.image_size[0]))
        
        target, target_weight = self.generate_target(keypoints[:, :2], keypoints[:, 2])
        target = torch.from_numpy(target)
        target_weight = torch.from_numpy(target_weight)

        if self.transforms is not None:
            cropped_image = self.transforms(image=cropped_image)['image']

        # random horizontal & vertical shifting
        if self.mode=='train' and self.shift and np.random.random() <= 0.5:
              cropped_image, keypoints = self.shift_images(cropped_image, keypoints)


        if self.debug:
          show_image(cropped_image, keypoints)


          target_heatmap = self.render_gaussian_heatmap(keypoints[:, :2], output_shape=self.heatmap_size)
          visualize_heatmap = target_heatmap #* 255.
          visualize_heatmap = visualize_heatmap.astype('uint8')[0]
          visualize_heatmap = np.max(visualize_heatmap, axis=2)
          visualize_heatmap = cv2.applyColorMap(visualize_heatmap, cv2.COLORMAP_JET)
          fig, ax = plt.subplots(dpi=200)
          ax.imshow(visualize_heatmap)
          ax.axis('off')
          plt.show()


        sample = {
                  'image': torch.from_numpy(cropped_image).float().permute(2, 0, 1),
                  'keypoints': torch.from_numpy(keypoints).float(),
                  'target': target,
                  'target_weight': target_weight
                 }
        return sample
    
    def shift_images(self, image, keypoints, max_v=25, max_h=25):
        shift_v = np.random.randint(low=-max_v, high=max_v, size=1)
        shift_h = np.random.randint(low=-max_h, high=max_h, size=1)

        m = np.array([
            [1, 0, shift_h],
            [0, 1, shift_v]
        ]).astype(np.float32)
        
        rows, cols = image.shape[:-1]
        image = cv2.warpAffine(image, m, (cols, rows))

        for j in range(len(keypoints)):
            if keypoints[j, 2] > 0:
                  keypoints[j, :2] = affine_transform(keypoints[j, :2], m)
                  keypoints[j, 2] *= ((keypoints[j, 0] >= 0) & (keypoints[j, 0] < self.image_size[1]) \
                                    & (keypoints[j, 1] >= 0) & (keypoints[j, 1] < self.image_size[0]))
        return image, keypoints

    # https://github.com/leoxiaobin/deep-high-resolution-net.pytorch/blob/master/lib/dataset/JointsDataset.py
    # heatmap과 offset에 대해 설명해놓은 글 참고: https://ivdevlog.tistory.com/2
    
    def generate_target(self, joints, joints_vis):
        '''
        :param joints:  [num_joints, 3]
        :param joints_vis: [num_joints, 3]
        :return: target, target_weight(1: visible, 0: invisible)
        '''
        target_weight = np.ones((self.num_joints, 1), dtype=np.float32)
        target_weight[:, 0] = joints_vis

        
        target = np.zeros((self.num_joints,
                          self.heatmap_size[0],
                          self.heatmap_size[1]),
                         dtype=np.float32)
        tmp_size = self.sigma * 3

        for joint_id in range(self.num_joints):
          feat_stride = self.image_size / self.heatmap_size
          mu_x = int(joints[joint_id][0] / feat_stride[0] + 0.5)
          mu_y = int(joints[joint_id][1] / feat_stride[1] + 0.5)
          # Check that any part of the gaussian is in-bounds
          ul = [int(mu_x - tmp_size), int(mu_y - tmp_size)]
          br = [int(mu_x + tmp_size + 1), int(mu_y + tmp_size + 1)]
          if ul[0] >= self.heatmap_size[1] or ul[1] >= self.heatmap_size[0] or br[0] < 0 or br[1] < 0:
            target_weight[joint_id] = 0
            continue

        # # Generate gaussian
        size = 2 * tmp_size + 1
        x = np.arange(0, size, 1, np.float32)
        y = x[:, np.newaxis]
        x0 = y0 = size // 2
        # The gaussian is not normalized, we want the center value to equal 1
        g = np.exp(- ((x - x0) ** 2 + (y - y0) ** 2) / (2 * self.sigma ** 2))

        # Usable gaussian range
        g_x = max(0, -ul[0]), min(br[0], self.heatmap_size[1]) - ul[0]
        g_y = max(0, -ul[1]), min(br[1], self.heatmap_size[0]) - ul[1]
        # Image range
        img_x = max(0, ul[0]), min(br[0], self.heatmap_size[1])
        img_y = max(0, ul[1]), min(br[1], self.heatmap_size[0])

        v = target_weight[joint_id]
        if v > 0.5:
          target[joint_id][img_y[0]:img_y[1], img_x[0]:img_x[1]] = g[g_y[0]:g_y[1], g_x[0]:g_x[1]]

        return target, target_weight

- 최종 예측을 위한 함수 정의
  - dark-pose https://github.com/ilovepose/DarkPose/blob/master/lib/core/inference.py
  - get_max_preds
  - get_final_preds

In [None]:
# https://github.com/leoxiaobin/deep-high-resolution-net.pytorch/blob/ba50a82dce412df97f088c572d86d7977753bf74/lib/core/inference.py#L18:5

from numpy.linalg import LinAlgError

def get_max_preds(batch_heatmaps):
    '''
    get predictions from score maps
    heatmaps: numpy.ndarray([batch_size, num_joints, height, width])
    '''
    assert isinstance(batch_heatmaps, np.ndarray), \
        'batch_heatmaps should be numpy.ndarray'
    assert batch_heatmaps.ndim == 4, 'batch_images should be 4-ndim'

    batch_size = batch_heatmaps.shape[0]
    num_joints = batch_heatmaps.shape[1]
    width = batch_heatmaps.shape[3]
    heatmaps_reshaped = batch_heatmaps.reshape((batch_size, num_joints, -1))
    idx = np.argmax(heatmaps_reshaped, 2)
    maxvals = np.amax(heatmaps_reshaped, 2)

    maxvals = maxvals.reshape((batch_size, num_joints, 1))
    idx = idx.reshape((batch_size, num_joints, 1))

    preds = np.tile(idx, (1, 1, 2)).astype(np.float32)

    preds[:, :, 0] = (preds[:, :, 0]) % width
    preds[:, :, 1] = np.floor((preds[:, :, 1]) / width)

    pred_mask = np.tile(np.greater(maxvals, 0.0), (1, 1, 2))
    pred_mask = pred_mask.astype(np.float32)

    preds *= pred_mask
    return preds, maxvals

def dark_post_processing(coords,batch_heatmaps):
    '''
    DARK post-pocessing
    :param coords: batchsize*num_kps*2
    :param batch_heatmaps:batchsize*num_kps*high*width
    :return:
    '''

    shape_pad = list(batch_heatmaps.shape)
    shape_pad[2] = shape_pad[2] + 2
    shape_pad[3] = shape_pad[3] + 2

    for i in range(shape_pad[0]):
        for j in range(shape_pad[1]):
            mapij=batch_heatmaps[i,j,:,:]
            maxori = np.max(mapij)
            mapij= cv2.GaussianBlur(mapij,(7, 7), 0)
            max = np.max(mapij)
            min = np.min(mapij)
            mapij = (mapij-min)/(max-min) * maxori
            batch_heatmaps[i, j, :, :]= mapij
    batch_heatmaps = np.clip(batch_heatmaps,0.001,50)
    batch_heatmaps = np.log(batch_heatmaps)
    batch_heatmaps_pad = np.zeros(shape_pad,dtype=float)
    batch_heatmaps_pad[:, :, 1:-1,1:-1] = batch_heatmaps
    batch_heatmaps_pad[:, :, 1:-1, -1] = batch_heatmaps[:, :, :,-1]
    batch_heatmaps_pad[:, :, -1, 1:-1] = batch_heatmaps[:, :, -1, :]
    batch_heatmaps_pad[:, :, 1:-1, 0] = batch_heatmaps[:, :, :, 0]
    batch_heatmaps_pad[:, :, 0, 1:-1] = batch_heatmaps[:, :, 0, :]
    batch_heatmaps_pad[:, :, -1, -1] = batch_heatmaps[:, :, -1 , -1]
    batch_heatmaps_pad[:, :, 0, 0] = batch_heatmaps[:, :, 0, 0]
    batch_heatmaps_pad[:, :, 0, -1] = batch_heatmaps[:, :, 0, -1]
    batch_heatmaps_pad[:, :, -1, 0] = batch_heatmaps[:, :, -1, 0]
    I = np.zeros((shape_pad[0],shape_pad[1]))
    Ix1 = np.zeros((shape_pad[0], shape_pad[1]))
    Iy1 = np.zeros((shape_pad[0], shape_pad[1]))
    Ix1y1 = np.zeros((shape_pad[0],shape_pad[1]))
    Ix1_y1_ = np.zeros((shape_pad[0], shape_pad[1]))
    Ix1_ = np.zeros((shape_pad[0], shape_pad[1]))
    Iy1_ = np.zeros((shape_pad[0], shape_pad[1]))
    coords = coords.astype(np.int32)
    for i in range(shape_pad[0]):
        for j in range(shape_pad[1]):
            I[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1]+1, coords[i, j, 0]+1]
            Ix1[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1]+1, coords[i, j, 0] + 2]
            Ix1_[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1]+1, coords[i, j, 0] ]
            Iy1[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1] + 2, coords[i, j, 0]+1]
            Iy1_[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1] , coords[i, j, 0]+1]
            Ix1y1[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1] + 2, coords[i, j, 0] + 2]
            Ix1_y1_[i, j] = batch_heatmaps_pad[i, j, coords[i, j, 1], coords[i, j, 0]]
    dx = 0.5 * (Ix1 -  Ix1_)
    dy = 0.5 * (Iy1 - Iy1_)
    D = np.zeros((shape_pad[0],shape_pad[1],2))
    D[:,:,0]=dx
    D[:,:,1]=dy
    D.reshape((shape_pad[0],shape_pad[1],2,1))
    dxx = Ix1 - 2*I + Ix1_
    dyy = Iy1 - 2*I + Iy1_
    dxy = 0.5*(Ix1y1- Ix1 -Iy1 + I + I -Ix1_-Iy1_+Ix1_y1_)
    hessian = np.zeros((shape_pad[0],shape_pad[1],2,2))
    hessian[:, :, 0, 0] = dxx
    hessian[:, :, 1, 0] = dxy
    hessian[:, :, 0, 1] = dxy
    hessian[:, :, 1, 1] = dyy
    inv_hessian = np.zeros(hessian.shape)
    # hessian_test = np.zeros(hessian.shape)
    for i in range(shape_pad[0]):
        for j in range(shape_pad[1]):
            hessian_tmp = hessian[i,j,:,:]
            try:
                inv_hessian[i,j,:,:] = np.linalg.inv(hessian_tmp)
            except LinAlgError:
                inv_hessian[i, j, :, :] = np.zeros((2,2))
            # hessian_test[i,j,:,:] = np.matmul(hessian[i,j,:,:],inv_hessian[i,j,:,:])
            # print( hessian_test[i,j,:,:])
    res = np.zeros(coords.shape)
    coords = coords.astype(np.float)
    for i in range(shape_pad[0]):
        for j in range(shape_pad[1]):
            D_tmp = D[i,j,:]
            D_tmp = D_tmp[:,np.newaxis]
            shift = np.matmul(inv_hessian[i,j,:,:],D_tmp)
            # print(shift.shape)
            res_tmp = coords[i, j, :] -  shift.reshape((-1))
            res[i,j,:] = res_tmp
    return res


def get_final_preds(cfg, batch_heatmaps):
    heatmap_height = batch_heatmaps.shape[2]
    heatmap_width = batch_heatmaps.shape[3]
    if cfg.target_type == 'gaussian':
        coords, maxvals = get_max_preds(batch_heatmaps)
        if cfg.post_processing == "dark":
            coords = dark_post_processing(coords,batch_heatmaps)
    
    preds = coords.copy()
    preds[:,:, 0] = preds[:,:, 0] / (heatmap_width - 1.0) * (4 * heatmap_width - 1.0)
    preds[:,:, 1] = preds[:,:, 1] / (heatmap_height - 1.0) * (4 * heatmap_height - 1.0)

    return preds

In [None]:
# https://github.com/leoxiaobin/deep-high-resolution-net.pytorch/blob/ba50a82dce412df97f088c572d86d7977753bf74/lib/core/evaluate.py#L41

def calc_dists(preds, target, normalize):
    preds = preds.astype(np.float32)
    target = target.astype(np.float32)
    dists = np.zeros((preds.shape[1], preds.shape[0]))
    for n in range(preds.shape[0]):
        for c in range(preds.shape[1]):
            if target[n, c, 0] > 1 and target[n, c, 1] > 1:
                normed_preds = preds[n, c, :] / normalize[n]
                normed_targets = target[n, c, :] / normalize[n]
                dists[c, n] = np.linalg.norm(normed_preds - normed_targets)
            else:
                dists[c, n] = -1
    return dists


def dist_acc(dists, thr=0.5):
    ''' Return percentage below threshold while ignoring values with a -1 '''
    dist_cal = np.not_equal(dists, -1)
    num_dist_cal = dist_cal.sum()
    if num_dist_cal > 0:
        return np.less(dists[dist_cal], thr).sum() * 1.0 / num_dist_cal
    else:
        return -1

def accuracy(output, target, hm_type='gaussian', thr=0.5):
    '''
    Calculate accuracy according to PCK,
    but uses ground truth heatmap rather than x,y locations
    First value to be returned is average accuracy across 'idxs',
    followed by individual accuracies
    '''
    idx = list(range(output.shape[1]))
    norm = 1.0
    if hm_type == 'gaussian':
        pred, _ = get_max_preds(output)
        target, _ = get_max_preds(target)
        h = output.shape[2]
        w = output.shape[3]
        norm = np.ones((pred.shape[0], 2)) * np.array([h, w]) / 10
    dists = calc_dists(pred, target, norm)

    acc = np.zeros((len(idx) + 1))
    avg_acc = 0
    cnt = 0

    for i in range(len(idx)):
        acc[i + 1] = dist_acc(dists[idx[i]])
        if acc[i + 1] >= 0:
            avg_acc = avg_acc + acc[i + 1]
            cnt += 1

    avg_acc = avg_acc / cnt if cnt != 0 else 0
    if cnt != 0:
        acc[0] = avg_acc
    return acc, avg_acc, cnt, pred

- Loss 함수 정의
  - JointsRMSELoss
  - HeatmapMSELoss
  - HeatmapOHKMMSELoss

In [None]:
class JointsRMSELoss(nn.Module):
    def __init__(self, use_target_weight=True):
        super(JointsRMSELoss, self).__init__()
        self.use_target_weight = use_target_weight
        self.criterion = nn.MSELoss(reduction='none')

    def forward(self, pred, target):
        target_coord = target[:, :, :2]
        target_weight = target[:, :, 2].unsqueeze(-1)

        loss = self.criterion(pred, target_coord)
        if self.use_target_weight:
          loss *= target_weight
          
        loss = torch.sqrt(torch.mean(torch.mean(loss, dim=0)))
        return loss


class HeatmapMSELoss(nn.Module):
    def __init__(self, use_target_weight=True):
        super(HeatmapMSELoss, self).__init__()
        self.criterion = nn.MSELoss(reduction='mean')
        self.use_target_weight = use_target_weight

    def forward(self, output, target, target_weight):
        batch_size = output.size(0)
        num_joints = output.size(1)
        heatmaps_pred = output.reshape((batch_size, num_joints, -1)).split(1, 1)
        heatmaps_gt = target.reshape((batch_size, num_joints, -1)).split(1, 1)

        loss = 0

        for idx in range(num_joints):
            heatmap_pred = heatmaps_pred[idx].squeeze()
            heatmap_gt = heatmaps_gt[idx].squeeze()

            if self.use_target_weight:
                loss += 0.5 * self.criterion(
                    heatmap_pred.mul(target_weight[:, idx]),
                    heatmap_gt.mul(target_weight[:, idx])
                )
            else:
                loss += 0.5 * self.criterion(heatmap_pred, heatmap_gt)

        return loss / num_joints


class HeatmapOHKMMSELoss(nn.Module):
    def __init__(self, use_target_weight=True, topk=8):
        super(HeatmapOHKMMSELoss, self).__init__()
        self.criterion = nn.MSELoss(reduction='none')
        self.use_target_weight = use_target_weight
        self.topk = topk

    def ohkm(self, loss):
        ohkm_loss = 0.
        for i in range(loss.size()[0]):
            sub_loss = loss[i]
            topk_val, topk_idx = torch.topk(
                sub_loss, k=self.topk, dim=0, sorted=False
            )
            tmp_loss = torch.gather(sub_loss, 0, topk_idx)
            ohkm_loss += torch.sum(tmp_loss) / self.topk
        ohkm_loss /= loss.size()[0]
        return ohkm_loss

    def forward(self, output, target, target_weight):
        batch_size = output.size(0)
        num_joints = output.size(1)
        heatmaps_pred = output.reshape((batch_size, num_joints, -1)).split(1, 1)
        heatmaps_gt = target.reshape((batch_size, num_joints, -1)).split(1, 1)

        loss = []
        for idx in range(num_joints):
            heatmap_pred = heatmaps_pred[idx].squeeze()
            heatmap_gt = heatmaps_gt[idx].squeeze()
            if self.use_target_weight:
                loss.append(0.5 * self.criterion(
                    heatmap_pred.mul(target_weight[:, idx]),
                    heatmap_gt.mul(target_weight[:, idx])
                ))
            else:
                loss.append(
                    0.5 * self.criterion(heatmap_pred, heatmap_gt)
                )

        loss = [l.sum(dim=1).unsqueeze(dim=1) for l in loss]
        loss = torch.cat(loss, dim=1)

        return self.ohkm(loss)

모델(HRNEt) 정의

In [None]:
import torch
import torch.nn as nn


BN_MOMENTUM = 0.1


def conv3x3(in_planes, out_planes, stride=1):
    """3x3 convolution with padding"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)


class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(inplanes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes, momentum=BN_MOMENTUM)
        self.conv3 = nn.Conv2d(planes, planes * self.expansion, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(planes * self.expansion, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        residual = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            residual = self.downsample(x)

        out += residual
        out = self.relu(out)

        return out


class HighResolutionModule(nn.Module):
    def __init__(self, num_branches, blocks, num_blocks, num_inchannels, num_channels, multi_scale_output=True):
        super(HighResolutionModule, self).__init__()
        self._check_branches(num_branches, blocks, num_blocks, num_inchannels, num_channels)

        self.num_inchannels = num_inchannels
        self.num_branches = num_branches

        self.multi_scale_output = multi_scale_output

        self.branches = self._make_branches(num_branches, blocks, num_blocks, num_channels)
        self.fuse_layers = self._make_fuse_layers()
        self.relu = nn.ReLU(True)

    def _check_branches(self, num_branches, blocks, num_blocks, num_inchannels, num_channels):
        if num_branches != len(num_blocks):
            error_msg = "NUM_BRANCHES({}) <> NUM_BLOCKS({})".format(num_branches, len(num_blocks))
            raise ValueError(error_msg)

        if num_branches != len(num_channels):
            error_msg = "NUM_BRANCHES({}) <> NUM_CHANNELS({})".format(num_branches, len(num_channels))
            raise ValueError(error_msg)

        if num_branches != len(num_inchannels):
            error_msg = "NUM_BRANCHES({}) <> NUM_INCHANNELS({})".format(num_branches, len(num_inchannels))
            raise ValueError(error_msg)

    def _make_one_branch(self, branch_index, block, num_blocks, num_channels, stride=1):
        downsample = None
        if stride != 1 or self.num_inchannels[branch_index] != num_channels[branch_index] * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(
                    self.num_inchannels[branch_index],
                    num_channels[branch_index] * block.expansion,
                    kernel_size=1,
                    stride=stride,
                    bias=False,
                ),
                nn.BatchNorm2d(num_channels[branch_index] * block.expansion, momentum=BN_MOMENTUM),
            )

        layers = []
        layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index], stride, downsample))
        self.num_inchannels[branch_index] = num_channels[branch_index] * block.expansion
        for i in range(1, num_blocks[branch_index]):
            layers.append(block(self.num_inchannels[branch_index], num_channels[branch_index]))

        return nn.Sequential(*layers)

    def _make_branches(self, num_branches, block, num_blocks, num_channels):
        branches = []

        for i in range(num_branches):
            branches.append(self._make_one_branch(i, block, num_blocks, num_channels))

        return nn.ModuleList(branches)

    def _make_fuse_layers(self):
        if self.num_branches == 1:
            return None

        num_branches = self.num_branches
        num_inchannels = self.num_inchannels
        fuse_layers = []
        for i in range(num_branches if self.multi_scale_output else 1):
            fuse_layer = []
            for j in range(num_branches):
                if j > i:
                    fuse_layer.append(
                        nn.Sequential(
                            nn.Conv2d(num_inchannels[j], num_inchannels[i], 1, 1, 0, bias=False),
                            nn.BatchNorm2d(num_inchannels[i]),
                            nn.Upsample(scale_factor=2 ** (j - i), mode="nearest"),
                        )
                    )
                elif j == i:
                    fuse_layer.append(None)
                else:
                    conv3x3s = []
                    for k in range(i - j):
                        if k == i - j - 1:
                            num_outchannels_conv3x3 = num_inchannels[i]
                            conv3x3s.append(
                                nn.Sequential(
                                    nn.Conv2d(num_inchannels[j], num_outchannels_conv3x3, 3, 2, 1, bias=False),
                                    nn.BatchNorm2d(num_outchannels_conv3x3),
                                )
                            )
                        else:
                            num_outchannels_conv3x3 = num_inchannels[j]
                            conv3x3s.append(
                                nn.Sequential(
                                    nn.Conv2d(num_inchannels[j], num_outchannels_conv3x3, 3, 2, 1, bias=False),
                                    nn.BatchNorm2d(num_outchannels_conv3x3),
                                    nn.ReLU(True),
                                )
                            )
                    fuse_layer.append(nn.Sequential(*conv3x3s))
            fuse_layers.append(nn.ModuleList(fuse_layer))

        return nn.ModuleList(fuse_layers)

    def get_num_inchannels(self):
        return self.num_inchannels

    def forward(self, x):
        if self.num_branches == 1:
            return [self.branches[0](x[0])]

        for i in range(self.num_branches):
            x[i] = self.branches[i](x[i])

        x_fuse = []

        for i in range(len(self.fuse_layers)):
            y = x[0] if i == 0 else self.fuse_layers[i][0](x[0])
            for j in range(1, self.num_branches):
                if i == j:
                    y = y + x[j]
                else:
                    y = y + self.fuse_layers[i][j](x[j])
            x_fuse.append(self.relu(y))

        return x_fuse


class PoseHighResolutionNet(nn.Module):
    def __init__(self, width=32, num_keypoints=17):
        assert width in [32, 48], f"PoseHighResolutionNet width must be in [32, 48] not {width}"
        self.width = width

        block = BasicBlock
        num_modules = [1, 4, 3]
        num_branches = [2, 3, 4]
        num_inchannels = [
            [2 ** i * width * block.expansion for i in range(2)],
            [2 ** i * width * block.expansion for i in range(3)],
            [2 ** i * width * block.expansion for i in range(4)],
        ]
        self.pre_stage_channels = [256]

        self.inplanes = 64
        super(PoseHighResolutionNet, self).__init__()

        # stem net
        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.conv2 = nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(64, momentum=BN_MOMENTUM)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(Bottleneck, 64, 4)

        self.transition1 = self._make_transition_layer(num_inchannels[0])
        self.stage2 = self._make_stage(block, num_modules[0], num_branches[0], num_inchannels[0])
        self.transition2 = self._make_transition_layer(num_inchannels[1])
        self.stage3 = self._make_stage(block, num_modules[1], num_branches[1], num_inchannels[1])
        self.transition3 = self._make_transition_layer(num_inchannels[2])
        self.stage4 = self._make_stage(block, num_modules[2], num_branches[2], num_inchannels[2], multi_scale_output=False)

        self.final_layer = nn.Conv2d(self.pre_stage_channels[0], num_keypoints, 1)

        self.init_weights()
        self.num_branches = num_branches

        self.finetune_step = 3

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.layer1(x)

        x_list = []
        for i in range(self.num_branches[0]):
            if self.transition1[i] is not None:
                x_list.append(self.transition1[i](x))
            else:
                x_list.append(x)
        y_list = self.stage2(x_list)
        x = y_list[-1]

        x_list = []
        for i in range(self.num_branches[1]):
            if self.transition2[i] is not None:
                x_list.append(self.transition2[i](x))
            else:
                x_list.append(y_list[i])
        y_list = self.stage3(x_list)
        x = y_list[-1]

        x_list = []
        for i in range(self.num_branches[2]):
            if self.transition3[i] is not None:
                x_list.append(self.transition3[i](x))
            else:
                x_list.append(y_list[i])
        y_list = self.stage4(x_list)
        x = y_list[0]

        x = self.final_layer(x)

        return x

    def _make_transition_layer(self, num_channels_cur_layer):
        num_channels_pre_layer = self.pre_stage_channels
        num_branches_pre = len(num_channels_pre_layer)
        num_branches_cur = len(num_channels_cur_layer)

        transition_layers = []
        for i in range(num_branches_cur):
            if i < num_branches_pre:
                if num_channels_cur_layer[i] != num_channels_pre_layer[i]:
                    transition_layers.append(
                        nn.Sequential(
                            nn.Conv2d(num_channels_pre_layer[i], num_channels_cur_layer[i], 3, 1, 1, bias=False),
                            nn.BatchNorm2d(num_channels_cur_layer[i]),
                            nn.ReLU(inplace=True),
                        )
                    )
                else:
                    transition_layers.append(None)
            else:
                conv3x3s = []
                for j in range(i + 1 - num_branches_pre):
                    inchannels = num_channels_pre_layer[-1]
                    outchannels = num_channels_cur_layer[i] if j == i - num_branches_pre else inchannels
                    conv3x3s.append(
                        nn.Sequential(
                            nn.Conv2d(inchannels, outchannels, 3, 2, 1, bias=False),
                            nn.BatchNorm2d(outchannels),
                            nn.ReLU(inplace=True),
                        )
                    )
                transition_layers.append(nn.Sequential(*conv3x3s))

        return nn.ModuleList(transition_layers)

    def _make_layer(self, block, planes, blocks, stride=1):
        downsample = None
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                nn.Conv2d(self.inplanes, planes * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(planes * block.expansion, momentum=BN_MOMENTUM),
            )

        layers = []
        layers.append(block(self.inplanes, planes, stride, downsample))
        self.inplanes = planes * block.expansion
        for i in range(1, blocks):
            layers.append(block(self.inplanes, planes))

        return nn.Sequential(*layers)

    def _make_stage(self, block, num_module, num_branch, num_inchannels, multi_scale_output=True):
        modules = []
        for i in range(num_module):
            # multi_scale_output is only used last module
            if not multi_scale_output and i == num_module - 1:
                reset_multi_scale_output = False
            else:
                reset_multi_scale_output = True

            modules.append(
                HighResolutionModule(
                    num_branch,
                    block,
                    [4 for _ in range(num_branch)],
                    num_inchannels,
                    [2 ** i * self.width for i in range(num_branch)],
                    reset_multi_scale_output,
                )
            )
            num_inchannels = modules[-1].get_num_inchannels()

        self.pre_stage_channels = num_inchannels
        return nn.Sequential(*modules)

    def init_weights(self):
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ["bias"]:
                        nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.ConvTranspose2d):
                nn.init.normal_(m.weight, std=0.001)
                for name, _ in m.named_parameters():
                    if name in ["bias"]:
                        nn.init.constant_(m.bias, 0)

    def freeze_step1(self):
        for p in self.parameters():
            p.requires_grad_(False)
        self.final_layer.requires_grad_(True)
        self.finetune_step = 1

    def freeze_step2(self):
        for p in self.parameters():
            p.requires_grad_(True)
        self.final_layer.requires_grad_(False)
        self.finetune_step = 2

    def freeze_step3(self):
        for p in self.parameters():
            p.requires_grad_(True)
        self.finetune_step = 3

- 훈련 & 검증 method 정의

In [None]:
def calc_coord_loss(pred, gt):
    batch_size = gt.size(0)
    valid_mask = gt[:, :, -1].view(batch_size, -1, 1)
    gt = gt[:, :, :2]
    return torch.mean(torch.sum(torch.abs(pred-gt) * valid_mask, dim=-1))

def train(cfg, train_tfms=None, valid_tfms=None):
  # for reporduction
  seed = cfg.seed
  torch.cuda.empty_cache()
  seed_everything(2021)

  # device type
  device = 'cuda' if torch.cuda.is_available() else 'cpu'

  model = PoseHighResolutionNet(48)
  model.load_state_dict(torch.load('/content/drive/MyDrive/방학 CV분반 KUBIG CONTEST/임채명/pose_hrnet_w48_384x288.pth')) 

  final_layer = nn.Conv2d(48, 24, 1)

  with torch.no_grad():
    final_layer.weight[:17] = model.final_layer.weight
    final_layer.bias[:17] = model.final_layer.bias

    final_layer.weight[17] = model.final_layer.weight[[0, 5, 6]].clone().mean(0)
    final_layer.bias[17] = model.final_layer.bias[[0, 5, 6]].clone().mean(0)
    
    final_layer.weight[18] = model.final_layer.weight[9].clone()
    final_layer.bias[18] = model.final_layer.bias[9].clone()
    
    final_layer.weight[19] = model.final_layer.weight[10].clone()
    final_layer.bias[19] = model.final_layer.bias[10].clone()
    
    final_layer.weight[20] = model.final_layer.weight[[5, 6, 11, 12]].clone().mean(0)
    final_layer.bias[20] = model.final_layer.bias[[5, 6, 11, 12]].clone().mean(0)
    
    final_layer.weight[21] = torch.cat(
        (
            model.final_layer.weight[[11, 12]].clone() * 1 / 3,
            model.final_layer.weight[[5, 6]].clone() * 6 / 1,
        )
    ).mean(0)
    final_layer.bias[21] = torch.cat(
        (
            model.final_layer.bias[[11, 12]].clone() * 1 / 3,
            model.final_layer.bias[[5, 6]].clone() * 6 / 1,
        )
    ).mean(0)
    final_layer.weight[22] = model.final_layer.weight[15].clone()
    final_layer.bias[22] = model.final_layer.bias[15].clone()

    final_layer.weight[23] = model.final_layer.weight[16].clone()
    final_layer.bias[23] = model.final_layer.bias[16].clone()
        
  model.final_layer = final_layer

  model = model.to(device)
 

  # define criterions
  #if cfg.target_type == "gaussian":
  #  if cfg.loss_type == "MSE":
  #    main_criterion = HeatmapMSELoss(True)
  #  elif cfg.loss_type == "OHKMMSE":
  #    main_criterion = HeatmapOHKMMSELoss(True)

  main_criterion = nn.CrossEntropyLoss()
  rmse_criterion = JointsRMSELoss()

  # define optimizer and scheduler
  optimizer = torch.optim.Adam(model.parameters(), lr=cfg.lr)

  # data read and add sector column for startify
  total_df = pd.read_csv(meta_info_dir)
  if not cfg.startify_with_dir:
    def making_sector_label(image_name):
        sector_name = image_name.split('-')[0]
        return sector_name
  else:
    def making_sector_label(image_name):
        pose = image_name.split('-')
        cam_dir = pose[4].split('_')[1]
        sector_name = pose[0] + cam_dir
        return sector_name

  print(total_df.describe())
  total_df['sector'] = total_df.apply(
        lambda x: making_sector_label(x['image']), axis=1
  )

  columns = total_df.columns.tolist()
  columns = columns[-1:] + columns[:-1]
  total_df = total_df[columns]


  # data prepare
  if cfg.startify:
    train_df, valid_df = train_test_split(total_df.iloc[:, 1:], test_size=cfg.test_ratio, random_state=seed, stratify=total_df.iloc[:, 0])
  else:
    train_df, valid_df = train_test_split(total_df.iloc[:, 1:], test_size=cfg.test_ratio, random_state=seed)
  

  train_ds = DaconKeypointsDataset(cfg, train_img_path, train_df, train_tfms, mode='train')
  valid_ds  = DaconKeypointsDataset(cfg, train_img_path, valid_df, valid_tfms, mode='valid')
  train_dl = DataLoader(train_ds, batch_size=cfg.batch_size, shuffle=True)
  valid_dl  = DataLoader(valid_ds, batch_size=cfg.batch_size, shuffle=False)

  print("Train Transformation:\n", train_tfms, "\n")
  print("Valid Transformation:\n", valid_tfms, "\n")


  best_loss = float('INF')
  for epoch in range(cfg.epochs):
      ################
      #    Train     #
      ################
      with tqdm(train_dl, total=train_dl.__len__(), unit="batch") as train_bar:
          train_acc_list = []
          train_rmse_list = []
          train_heatmap_list = []
          train_coord_list = []
          train_offset_list = []
          train_total_list = []

          for sample in train_bar:
              train_bar.set_description(f"Train Epoch {epoch+1}")

              optimizer.zero_grad()
              images, targ_coords = sample['image'].to(device), sample['keypoints'].to(device)
              target, target_weight = sample['target'].to(device), sample['target_weight'].to(device)

              model.train()
              with torch.set_grad_enabled(True):
                  preds = model(images)
                  loss = main_criterion(preds, target)
#                  loss = main_criterion(preds, target, target_weight)

                  heatmap_height = preds.shape[2]
                  heatmap_width = preds.shape[3]
                  pred_coords, _ = get_max_preds(preds.detach().cpu().numpy())
                  pred_coords[:, :, 0] = pred_coords[:, :, 0] / (heatmap_width - 1.0) * (4 * heatmap_width - 1.0)
                  pred_coords[:, :, 1] = pred_coords[:, :, 1] / (heatmap_height - 1.0) * (4 * heatmap_height - 1.0)

                  pred_coords = torch.tensor(pred_coords).float().to(device)
                  coord_loss  = calc_coord_loss(pred_coords, targ_coords)

                  rmse_loss = rmse_criterion(pred_coords, targ_coords)
                  _, avg_acc, cnt, pred = accuracy(preds.detach().cpu().numpy()[:, ::3, :, :],
                                                   target.detach().cpu().numpy()[:, ::3, :, :])
                  
                  loss.backward()
                  optimizer.step()

                  train_rmse_list.append(rmse_loss.item())
                  train_total_list.append(loss.item())
                  train_coord_list.append(coord_loss.item())
                  train_acc_list.append(avg_acc)
              train_acc = np.mean(train_acc_list)
              train_rmse = np.mean(train_rmse_list)
              train_coord = np.mean(train_coord_list)
              train_total = np.mean(train_total_list)

              train_bar.set_postfix(coord_loss = train_coord,
                                    rmse_loss = train_rmse,
                                    total_loss = train_total,
                                    train_acc  = train_acc)
      
      ################
      #    Valid     #
      ################
      with tqdm(valid_dl, total=valid_dl.__len__(), unit="batch") as valid_bar:
          valid_acc_list = []
          valid_rmse_list = []
          valid_heatmap_list = []
          valid_coord_list = []
          valid_offset_list = []
          valid_total_list = []
          for sample in valid_bar:
              valid_bar.set_description(f"Valid Epoch {epoch+1}")

              images, targ_coords = sample['image'].to(device), sample['keypoints'].to(device)
              target, target_weight = sample['target'].to(device), sample['target_weight'].to(device)

              model.eval()
              with torch.no_grad():
                  preds = model(images)
                  loss = main_criterion(preds, target)
#                  loss = main_criterion(preds, target, target_weight)
                  
                  pred_coords = get_final_preds(cfg, preds.detach().cpu().numpy())
                  pred_coords = torch.tensor(pred_coords).float().to(device)
                  coord_loss  = calc_coord_loss(pred_coords, targ_coords)

                  rmse_loss = rmse_criterion(pred_coords, targ_coords)
                  _, avg_acc, cnt, pred = accuracy(preds.detach().cpu().numpy()[:, ::3, :, :],
                                                   target.detach().cpu().numpy()[:, ::3, :, :])
                  
                  valid_rmse_list.append(rmse_loss.item())
                  valid_total_list.append(loss.item())
                  valid_coord_list.append(coord_loss.item())
                  valid_acc_list.append(avg_acc)
              valid_acc = np.mean(valid_acc_list)
              valid_rmse = np.mean(valid_rmse_list)
              valid_coord = np.mean(valid_coord_list)
              valid_total = np.mean(valid_total_list)
              valid_bar.set_postfix(coord_loss = valid_coord,
                                      rmse_loss = valid_rmse,
                                      total_loss = valid_total,
                                      valid_acc  = valid_acc)

      if best_loss > valid_total:
          best_model = model
          save_dir = os.path.join(main_dir, cfg.save_folder)
          save_name = f'best_model.pth'
          torch.save(model.state_dict(), os.path.join(save_dir, save_name))
          print(f"Valid Loss: {valid_total:.8f}\nBest Model saved.")
          best_loss = valid_total
  return best_model

In [None]:
n_tfms = A.Compose([
        A.GaussNoise(p=0.5),

        A.OneOf([
            A.MotionBlur(p=1.0, blur_limit=15),
            A.Blur(p=1.0),
            A.ImageCompression(p=1.0),
            A.GaussianBlur(p=1.0),
        ], p=0.7),

        A.OneOf([
            A.ChannelShuffle(p=1.0),
            A.HueSaturationValue(p=1.0),
            A.RGBShift(p=1.0),
        ], p=0.5),

        A.RandomBrightnessContrast(p=0.6),
        A.RandomContrast(p=0.6),
        A.RandomGamma(p=0.6),
        A.CLAHE(p=0.5),

        A.Normalize(p=1.0),
      ])

valid_tfms = A.Normalize(p=1.0)

cfg = SingleModelConfig(
    epochs=15,
    input_size=[512, 512], 
    learning_rate=1e-3,
    sigma=3.0,
    batch_size=8,

    shift = True,
    startify=True,
    init_training=True, 
    startify_with_dir=True,

    loss_type = "MSE",
    target_type = "gaussian",
    save_folder='/content/drive/MyDrive/방학 CV분반 KUBIG CONTEST/임채명/kubigcontestdata/result'
    )

best_model = train(cfg, train_tfms=train_tfms, valid_tfms=valid_tfms)



            nose_x       nose_y   left_eye_x   left_eye_y  right_eye_x  \
count  4094.000000  4094.000000  4094.000000  4094.000000  4094.000000   
mean    926.762676   467.327242   927.786692   460.544454   923.517024   
std     172.927768   138.818331   177.920833   143.639188   178.603650   
min     599.537515   236.000000   604.328953   228.000000   592.879610   
25%     779.939805   344.961634   779.221413   333.560038   772.511865   
50%     933.491697   467.266649   944.083046   462.029150   923.000307   
75%    1071.240718   579.000000  1064.550604   569.087374  1069.999213   
max    1312.405257   907.121424  1315.772361   893.906490  1332.979992   

       right_eye_y   left_ear_x   left_ear_y  right_ear_x  right_ear_y  ...  \
count  4094.000000  4094.000000  4094.000000  4094.000000  4094.000000  ...   
mean    460.885430   930.411870   465.535287   924.564418   466.156267  ...   
std     143.745995   167.722248   144.639208   168.536510   145.285203  ...   
min     228.00000

  m = np.array([
Train Epoch 1: 100%|██████████| 435/435 [29:46<00:00,  4.11s/batch, coord_loss=246, rmse_loss=173, total_loss=0.000382, train_acc=0]
Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  coords = coords.astype(np.float)
Valid Epoch 1: 100%|██████████| 77/77 [03:28<00:00,  2.70s/batch, coord_loss=445, rmse_loss=359, total_loss=1.48e-6, valid_acc=0]


Valid Loss: 0.00000148
Best Model saved.


Train Epoch 2: 100%|██████████| 435/435 [16:03<00:00,  2.22s/batch, coord_loss=444, rmse_loss=246, total_loss=5.1e-6, train_acc=0]
Valid Epoch 2: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=436, rmse_loss=241, total_loss=4.87e-7, valid_acc=0]


Valid Loss: 0.00000049
Best Model saved.


Train Epoch 3: 100%|██████████| 435/435 [15:59<00:00,  2.21s/batch, coord_loss=454, rmse_loss=249, total_loss=5.72e-7, train_acc=0]
Valid Epoch 3: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=457, rmse_loss=285, total_loss=6.64e-9, valid_acc=0]


Valid Loss: 0.00000001
Best Model saved.


Train Epoch 4: 100%|██████████| 435/435 [16:02<00:00,  2.21s/batch, coord_loss=456, rmse_loss=250, total_loss=5e-8, train_acc=0]
Valid Epoch 4: 100%|██████████| 77/77 [01:02<00:00,  1.24batch/s, coord_loss=451, rmse_loss=246, total_loss=4.09e-9, valid_acc=0]


Valid Loss: 0.00000000
Best Model saved.


Train Epoch 5: 100%|██████████| 435/435 [15:59<00:00,  2.21s/batch, coord_loss=456, rmse_loss=250, total_loss=7.14e-8, train_acc=0]
Valid Epoch 5: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=450, rmse_loss=244, total_loss=8.75e-9, valid_acc=0]
Train Epoch 6: 100%|██████████| 435/435 [15:57<00:00,  2.20s/batch, coord_loss=455, rmse_loss=250, total_loss=1.98e-8, train_acc=0]
Valid Epoch 6: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=450, rmse_loss=245, total_loss=4.67e-9, valid_acc=0]
Train Epoch 7: 100%|██████████| 435/435 [15:57<00:00,  2.20s/batch, coord_loss=457, rmse_loss=251, total_loss=6.08e-8, train_acc=0]
Valid Epoch 7: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=453, rmse_loss=247, total_loss=5.47e-9, valid_acc=0]
Train Epoch 8: 100%|██████████| 435/435 [15:56<00:00,  2.20s/batch, coord_loss=457, rmse_loss=251, total_loss=1.86e-8, train_acc=0]
Valid Epoch 8: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=453, 

Valid Loss: 0.00000000
Best Model saved.


Train Epoch 14: 100%|██████████| 435/435 [15:58<00:00,  2.20s/batch, coord_loss=474, rmse_loss=256, total_loss=3.05e-9, train_acc=0]
Valid Epoch 14: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=469, rmse_loss=251, total_loss=1.91e-9, valid_acc=0]


Valid Loss: 0.00000000
Best Model saved.


Train Epoch 15: 100%|██████████| 435/435 [15:58<00:00,  2.20s/batch, coord_loss=475, rmse_loss=257, total_loss=2.09e-9, train_acc=0]
Valid Epoch 15: 100%|██████████| 77/77 [01:01<00:00,  1.25batch/s, coord_loss=469, rmse_loss=251, total_loss=1.47e-9, valid_acc=0]


Valid Loss: 0.00000000
Best Model saved.







TEST: YOLOv5 써서 detection 먼저 수행

In [None]:
!git clone https://github.com/ultralytics/yolov5

%cd yolov5 
!pip install -r requirements.txt

Cloning into 'yolov5'...
remote: Enumerating objects: 15274, done.[K
remote: Counting objects: 100% (38/38), done.[K
remote: Compressing objects: 100% (33/33), done.[K
remote: Total 15274 (delta 11), reused 23 (delta 5), pack-reused 15236[K
Receiving objects: 100% (15274/15274), 14.17 MiB | 37.79 MiB/s, done.
Resolving deltas: 100% (10467/10467), done.
/content/yolov5
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting gitpython>=3.1.30
  Downloading GitPython-3.1.31-py3-none-any.whl (184 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m184.3/184.3 KB[0m [31m4.9 MB/s[0m eta [36m0:00:00[0m
Collecting thop>=0.1.1
  Downloading thop-0.1.1.post2209072238-py3-none-any.whl (15 kB)
Collecting setuptools>=65.5.1
  Downloading setuptools-67.4.0-py3-none-any.whl (1.1 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.1/1.1 MB[0m [31m27.3 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting gitdb<5

In [None]:
test_df = pd.read_csv(os.path.join(main_dir, 'sample_submission.csv'))
yolo_v5 = torch.hub.load('ultralytics/yolov5', 'yolov5x', pretrained=True).cuda()
yolo_v5.eval()

Downloading: "https://github.com/ultralytics/yolov5/zipball/master" to /root/.cache/torch/hub/master.zip
[31m[1mrequirements:[0m YOLOv5 requirement "setuptools>=65.5.1" not found, attempting AutoUpdate...
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/

[31m[1mrequirements:[0m 1 package updated per /root/.cache/torch/hub/ultralytics_yolov5_master/requirements.txt
[31m[1mrequirements:[0m ⚠️ [1mRestart runtime or rerun command for updates to take effect[0m

YOLOv5 🚀 2023-2-27 Python-3.8.10 torch-1.13.1+cu116 CUDA:0 (Tesla T4, 15102MiB)

Downloading https://github.com/ultralytics/yolov5/releases/download/v7.0/yolov5x.pt to yolov5x.pt...


  0%|          | 0.00/166M [00:00<?, ?B/s]


Fusing layers... 
YOLOv5x summary: 444 layers, 86705005 parameters, 0 gradients
Adding AutoShape... 


AutoShape(
  (model): DetectMultiBackend(
    (model): DetectionModel(
      (model): Sequential(
        (0): Conv(
          (conv): Conv2d(3, 80, kernel_size=(6, 6), stride=(2, 2), padding=(2, 2))
          (act): SiLU(inplace=True)
        )
        (1): Conv(
          (conv): Conv2d(80, 160, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
          (act): SiLU(inplace=True)
        )
        (2): C3(
          (cv1): Conv(
            (conv): Conv2d(160, 80, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (cv2): Conv(
            (conv): Conv2d(160, 80, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (cv3): Conv(
            (conv): Conv2d(160, 160, kernel_size=(1, 1), stride=(1, 1))
            (act): SiLU(inplace=True)
          )
          (m): Sequential(
            (0): Bottleneck(
              (cv1): Conv(
                (conv): Conv2d(80, 80, kernel_size=(1, 1), stride=(1, 1

In [None]:
from PIL import Image
import torch
from torchvision import transforms
convert_tensor = transforms.ToTensor()

test_data = {'path': [], 'x1': [], 'y1': [], 'x2': [], 'y2': []}

total_test_imgs = []
for i in range(len(test_df)):
  total_test_imgs.append(os.path.join(test_img_path, test_df.iloc[i, 0]))

w, h = 900, 900
offset = np.array([w//2, h//2])

for idx, path in tqdm(enumerate(total_test_imgs), total=len(total_test_imgs)):
  w, h = 900, 900
  offset = np.array([w//2, h//2])

  img = cv2.imread(path)[:, :, ::-1]
  centre = np.array(img.shape[:-1])//2
  x1,y1 = centre - offset
  x2,y2 = centre + offset

  with torch.no_grad():
    cropped_img = img[x1:x2, y1:y2, :]
    results = yolo_v5([cropped_img])

  cropped_img = cv2.cvtColor(cropped_img, cv2.COLOR_BGR2RGB)
  try:
    for i in range(len(results.xyxy[0])):
      xyxy = results.xyxy[0][i].detach().cpu().numpy()
      cropped_centre = np.array([(xyxy[0]+xyxy[2])/2, (xyxy[1]+xyxy[3])/2], dtype=np.float32)
      box_w = (xyxy[2]-xyxy[0])/2 * 1.2
      box_h = (xyxy[3]-xyxy[1])/2 * 1.2

      new_x1 = np.clip(int(cropped_centre[0] - box_w), 0, img.shape[1])
      new_x2 = np.clip(int(cropped_centre[0] + box_w), 0, img.shape[1])
      new_y1 = np.clip(int(cropped_centre[1] - box_h), 0, img.shape[0])
      new_y2 = np.clip(int(cropped_centre[1] + box_h), 0, img.shape[0])

      if int(xyxy[-1]) == 0:
        new_x1 += y1
        new_x2 += y1
        new_y1 += x1
        new_y2 += x1

        test_data['path'].append(path)
        test_data['x1'].append(new_x1)
        test_data['y1'].append(new_y1)
        test_data['x2'].append(new_x2)
        test_data['y2'].append(new_y2)
        
  except Exception as e:
    print("Skip")


test_df = pd.DataFrame(data=test_data)
test_df.to_csv(os.path.join(main_dir, 'test_bbox.csv'), index=False)

100%|██████████| 1600/1600 [11:30<00:00,  2.32it/s]


In [None]:
from typing import List

class SingleModelTestConfig:
  def __init__(self,
               input_size: List[int] = [512, 512],
               num_joints: int = 24,
               kpd: float = 4.0,
               main_dir: str = main_dir,
               target_type: str = "gaussian",
               post_processing: str = "dark",
    ):

    self.main_dir = main_dir
    self.image_size = np.array(input_size)
    self.num_joints = num_joints
    self.kpd = kpd
    self.target_type = target_type
    self.post_processing = post_processing

    self.joints_name = {
          0: 'nose', 1: 'left_eye', 2: 'right_eye', 3: 'left_ear', 4: 'right_ear',
          5: 'left_shoulder', 6: 'right_shoulder', 7: 'left_elbow', 8: 'right_elbow',
          9: 'left_wrist', 10: 'right_wrist', 11: 'left_hip', 12: 'right_hip',
          13: 'left_knee', 14: 'right_knee', 15: 'left_ankle', 16: 'right_ankle',
          17: 'neck', 18: 'left_palm', 19: 'right_palm', 20: 'back_spine', 21: 'waist_spine',
          22: 'left_instep', 23: 'right_instep'
    }

    self.joint_pair = [
          (0, 1), (0, 2), (2, 4), (1, 3), (6, 8), (8, 10),
          (5, 7), (7, 9), (11, 13), (13, 15), (12, 14), 
          (14, 16), (5, 6), (15, 22), (16, 23), (11, 21),
          (21, 12), (20, 21), (5, 20), (6, 20), (17, 6), (17, 5)
    ]

    self.flip_pair = [
          (1, 2), (3, 4), (5, 6), (7, 8),
          (9, 10), (11, 12), (13, 14), (15, 16),
          (18, 19), (22, 23)
    ]

    cmap = plt.get_cmap("rainbow")
    colors = [cmap(i) for i in np.linspace(0, 1,  self.num_joints + 2)]
    colors = [(c[2] * 255, c[1] * 255, c[0] * 255) for c in colors]
    self.joint_colors = {k: colors[k] for k in range(self.num_joints)}

- bbox의 중점을 기준으로
  - 앉아 있는 경우
  - 누워 있는 경우
  - 서 있는 경우
  - 위 세가지 경우로 나누어서 잘라낼 영역의 중점을 계산하고 affine transformation 해줬음

In [None]:
class DaconKeypointsBBoxTestDataset(Dataset):
    def __init__(
        self, image_size,
        submission_df, transforms=None,
    ) -> None:
        self.df = submission_df
        self.image_size = image_size
        self.transforms = transforms

    def __len__(self) -> int:
        return self.df.shape[0]
    
    def __getitem__(self, index: int):
        image_path = self.df.iloc[index, 0]

        img_name = image_path.split('/')[-1]
        if img_name[:3] in ["649", "650", "665", "666"]:
          offset_h = 380
          offset_w = int(offset_h*1.333)
        elif img_name[:3] in ["785", "786"]:
          offset_h = 220
          offset_w = int(offset_h*1.333)
        else:
          offset_w = 300
          offset_h = int(offset_w*1.333)

        image = cv2.imread(image_path, cv2.COLOR_BGR2RGB)
        image_centre = np.array(image.shape[:-1])//2

        x1, y1, x2, y2 = self.df.iloc[index, 1:]
        bbox_centre = np.array([
                          (x1+x2)//2,
                          (y1+y2)//2
                      ])
        
        cropped_y2 = np.clip(bbox_centre[1]+offset_h, 0, image.shape[0])
        cropped_y1 = np.clip(bbox_centre[1]-offset_h, 0, image.shape[0])
        cropped_x2 = np.clip(bbox_centre[0]+offset_w, 0, image.shape[1])
        cropped_x1 = np.clip(bbox_centre[0]-offset_w, 0, image.shape[1])
        
        x, y, w, h = cropped_x1, cropped_y1, cropped_x2-cropped_x1, cropped_y2-cropped_y1
        aspect_ratio = self.image_size[1] / self.image_size[0]
        centre = np.array([x+w*.5, y+h*.5])
        if w > aspect_ratio * h:
            h = w * 1.0 / aspect_ratio
        elif w < aspect_ratio * h:
            w = h * aspect_ratio

        image_centre = np.array([cropped_y1, cropped_x1])
        scale = np.array([w, h])
        rotation = 0

        trans = get_affine_transform(centre, scale, rotation, (self.image_size[1], self.image_size[0]))
        cropped_img = cv2.warpAffine(image, trans, (self.image_size[1], self.image_size[0]), flags=cv2.INTER_LINEAR)
        cropped_img_shape = np.array([h, w])


        if self.transforms:
          transposed_img = self.transforms(image=cropped_img)['image']

        
        sample = {
                  'transposed_img': torch.from_numpy(transposed_img).float().permute(2, 0, 1),
                  'centre': torch.from_numpy(centre).float(),
                  'scale': torch.from_numpy(scale).float()
                 }
        return sample

In [None]:
def flip_back(output_flipped, matched_parts):
    '''
    ouput_flipped: numpy.ndarray(batch_size, num_joints, height, width)
    '''
    assert output_flipped.ndim == 4,\
        'output_flipped should be [batch_size, num_joints, height, width]'

    output_flipped = output_flipped[:, :, :, ::-1]

    for pair in matched_parts:
        tmp = output_flipped[:, pair[0], :, :].copy()
        output_flipped[:, pair[0], :, :] = output_flipped[:, pair[1], :, :]
        output_flipped[:, pair[1], :, :] = tmp

    return output_flipped

- 예측 함수 새로 정의

In [None]:
def transform_preds(coords, center, scale, output_size):
    target_coords = np.zeros(coords.shape)
    trans = get_affine_transform(center, scale, 0, output_size, inv=1)
    for p in range(coords.shape[0]):
        target_coords[p, 0:2] = affine_transform(coords[p, 0:2], trans)
    return target_coords


def get_final_preds2(cfg, batch_heatmaps, center, scale):
    heatmap_height = batch_heatmaps.shape[2]
    heatmap_width = batch_heatmaps.shape[3]
    if cfg.target_type == 'gaussian':
        coords, maxvals = get_max_preds(batch_heatmaps)
        if cfg.post_processing == "dark":
            coords = dark_post_processing(coords,batch_heatmaps)

    preds = coords.copy()
    preds_in_input_space = preds.copy()
    preds_in_input_space[:,:, 0] = preds_in_input_space[:,:, 0] / (heatmap_width - 1.0) * (4 * heatmap_width - 1.0)
    preds_in_input_space[:,:, 1] = preds_in_input_space[:,:, 1] / (heatmap_height - 1.0) * (4 * heatmap_height - 1.0)
    # Transform back
    for i in range(coords.shape[0]):
        preds[i] = transform_preds(
            coords[i], center[i], scale[i], [heatmap_width, heatmap_height]
        )

    return preds, preds_in_input_space

In [24]:
def bbox_test(cfg, filp_test=False, debug=False):
  global main_dir
  flip_pair = [
          (1, 2), (3, 4), (5, 6), (7, 8),
          (9, 10), (11, 12), (13, 14), (15, 16),
          (18, 19), (22, 23)
    ]

  seed_everything(2021)
  
  predictions = []
  test_tfms = A.Normalize()
  
  device = 'cuda' if torch.cuda.is_available() else 'cpu'

  model = PoseHighResolutionNet(48)
  model.final_layer = nn.Conv2d(48, 24, 1)
  bestmodel_path = '/content/drive/MyDrive/방학 CV분반 KUBIG CONTEST/임채명/kubigcontestdata/result/best_model.pth'
  model.load_state_dict(torch.load(bestmodel_path))

  model = model.to(device)

  submission_path = os.path.join(main_dir, 'test_bbox.csv')
  submission = pd.read_csv(submission_path)
  test_ds = DaconKeypointsBBoxTestDataset(cfg.image_size, submission, test_tfms)
  test_dl = DataLoader(test_ds, batch_size=32, shuffle=False)
  
  img_num = 1
  save_folder = os.path.join(main_dir, "debug/test_data")
  if not os.path.exists(save_folder): os.makedirs(save_folder, exist_ok=True)

  model.eval()
  with tqdm(test_dl, total=test_dl.__len__(), unit="batch") as test_bar:
        for sample in test_bar:
            images = sample['transposed_img'].to(device)
            scale = sample['scale'].detach().cpu().numpy()
            center = sample['centre'].detach().cpu().numpy()

            with torch.no_grad():
              preds = model(images)
              if filp_test:
                inp_flip = images.clone().flip(3)
                flip_preds = model(inp_flip)
                flip_preds = flip_back(flip_preds.cpu().numpy(), flip_pair)
                flip_preds = torch.from_numpy(flip_preds.copy()).to(device)
                preds = (preds + flip_preds)*0.5

              heatmap_height = preds.shape[2]
              heatmap_width = preds.shape[3]
              pred_coords, pred_coords_input_space = get_final_preds2(cfg, preds.detach().cpu().numpy(), center, scale)
              pred_coords = pred_coords.astype(np.float32)
              predictions.extend(pred_coords)
  return np.array(predictions)

In [25]:
cfg = SingleModelTestConfig(input_size=[512, 512], target_type='gaussian')
predictions = bbox_test(cfg, filp_test=True, debug=False)

Deprecated in NumPy 1.20; for more details and guidance: https://numpy.org/devdocs/release/1.20.0-notes.html#deprecations
  coords = coords.astype(np.float)
100%|██████████| 50/50 [04:08<00:00,  4.97s/batch]


In [26]:
preds = []
for prediction in predictions:
  row = []
  
  for x,y in zip(prediction[:, 0], prediction[:, 1]):
    row.append(x)
    row.append(y)
  preds.append(row)
preds = np.array(preds)

In [27]:
submission_path = os.path.join(main_dir, 'sample_submission.csv')
submission = pd.read_csv(submission_path)

In [28]:
save_path = os.path.join(main_dir, 'final_submissions.csv')
submission.iloc[:, 1:] = preds
submission.to_csv(save_path, index=False)

해볼만한 시도
- 전처리, 후처리 다르게
- 테스트시 사용하는 detection 모델 다르게 (YOLOv5 말고 다른 걸로)




1. epoch 바꿔보기 -> epoch 5, 15, 30 중에 15가 가장 높은 결과
2. loss를 cross entropy로 바꿔보기 -> 성능 오히려 떨어짐..
3. learning rate scheduler 사용해보기
4. input size 512x512 (1등 코드 참고)