In [7]:
import os
import json
import numpy as np
import tensorflow as tf
from PIL import Image
from tqdm import tqdm  # 진행률 표시를 위한 라이브러리

import io, json, os, math

import tensorflow as tf
from tensorflow.keras.layers import Add, Concatenate, Lambda
from tensorflow.keras.layers import Input, Conv2D, ReLU, MaxPool2D
from tensorflow.keras.layers import UpSampling2D, ZeroPadding2D
from tensorflow.keras.layers import BatchNormalization
import ray

import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
# Path configuration.
# os.path.expanduser('~') resolves the home directory even when the HOME
# environment variable is unset; os.getenv('HOME') would return None in that
# case and the string concatenation would raise TypeError.
PROJECT_PATH = os.path.join(os.path.expanduser('~'), 'aiffel', 'mpii')
IMAGE_PATH = os.path.join(PROJECT_PATH, 'images')
MODEL_PATH = os.path.join(PROJECT_PATH, 'model')
VALID_JSON = os.path.join(PROJECT_PATH, 'mpii_human_pose_v1_u12_2', 'validation.json')
# Checkpoints: WEIGHTS_PATH_A is loaded into the stacked hourglass model,
# WEIGHTS_PATH into the simple baseline model.
WEIGHTS_PATH_A = os.path.join(MODEL_PATH, 'model-epoch-9-loss-1.1228.h5')
WEIGHTS_PATH = os.path.join(MODEL_PATH, 'model_simplebaseline-epoch-9-loss-0.2786.h5')

In [8]:
def BottleneckBlock(inputs, filters, strides=1, downsample=False, name=None):
    """Residual bottleneck block built from three BN -> ReLU -> Conv stages.

    The residual branch applies a 1x1 conv to ``filters // 2`` channels, a 3x3
    conv (carrying ``strides``) at ``filters // 2`` channels, and a 1x1 conv
    back to ``filters`` channels. Its result is added to the shortcut.

    Args:
        inputs: Input Keras tensor.
        filters: Number of output channels of the block.
        strides: Stride of the 3x3 conv and of the shortcut projection.
        downsample: When True the shortcut is a 1x1 conv projection of
            ``inputs`` to ``filters`` channels; otherwise ``inputs`` is used
            as the shortcut unchanged.
        name: Accepted but not used by this implementation.

    Returns:
        The Keras tensor produced by adding the shortcut and residual branch.
    """

    def _bn_relu_conv(tensor, out_channels, kernel, stride):
        # One pre-activation stage; layers are created in BN, ReLU, Conv order.
        tensor = BatchNormalization(momentum=0.9)(tensor)
        tensor = ReLU()(tensor)
        return Conv2D(
            filters=out_channels,
            kernel_size=kernel,
            strides=stride,
            padding='same',
            kernel_initializer='he_normal')(tensor)

    # The shortcut projection is created before the residual branch so the
    # layer creation order stays the same as before.
    if downsample:
        shortcut = Conv2D(
            filters=filters,
            kernel_size=1,
            strides=strides,
            padding='same',
            kernel_initializer='he_normal')(inputs)
    else:
        shortcut = inputs

    half_filters = filters // 2
    residual = _bn_relu_conv(inputs, half_filters, 1, 1)
    residual = _bn_relu_conv(residual, half_filters, 3, strides)
    residual = _bn_relu_conv(residual, filters, 1, 1)

    return Add()([shortcut, residual])

In [9]:
def HourglassModule(inputs, order, filters, num_residual):
    """Recursively build one hourglass module.

    A full-resolution skip branch is combined with a branch that is max-pooled
    by 2, processed (recursively while ``order > 1``) and upsampled by 2.

    Args:
        inputs: Input Keras tensor.
        order: Remaining recursion depth; recursion stops when it reaches 1.
        filters: Channel count passed to every BottleneckBlock.
        num_residual: Number of BottleneckBlocks in each repeated chain.

    Returns:
        The sum of the upsampled low-resolution branch and the skip branch.
    """

    def _residual_chain(tensor):
        # Apply `num_residual` bottleneck blocks in sequence.
        for _ in range(num_residual):
            tensor = BottleneckBlock(tensor, filters, downsample=False)
        return tensor

    # Skip branch at the input resolution: one block plus the residual chain.
    skip = _residual_chain(BottleneckBlock(inputs, filters, downsample=False))

    # Low-resolution branch.
    pooled = MaxPool2D(pool_size=2, strides=2)(inputs)
    pooled = _residual_chain(pooled)

    if order > 1:
        inner = HourglassModule(pooled, order - 1, filters, num_residual)
    else:
        inner = _residual_chain(pooled)

    inner = _residual_chain(inner)
    upsampled = UpSampling2D(size=2)(inner)

    return upsampled + skip

In [10]:
def LinearLayer(inputs, filters):
    """Apply a 1x1 convolution followed by batch normalization and ReLU.

    Args:
        inputs: Input Keras tensor.
        filters: Number of output channels of the 1x1 convolution.

    Returns:
        The activated Keras tensor.
    """
    projected = Conv2D(
        filters=filters,
        kernel_size=1,
        strides=1,
        padding='same',
        kernel_initializer='he_normal')(inputs)
    normalized = BatchNormalization(momentum=0.9)(projected)
    return ReLU()(normalized)

In [11]:
def StackedHourglassNetwork(
        input_shape=(256, 256, 3), 
        num_stack=4, 
        num_residual=1,
        num_heatmap=16):
    """Build a stacked hourglass network that emits one heatmap per stack.

    The stem (stride-2 7x7 conv, bottleneck blocks and one 2x2 max pool)
    reduces the spatial resolution by 4 before the hourglass stacks.

    Args:
        input_shape: Shape of a single input image (height, width, channels).
        num_stack: Number of hourglass modules stacked in sequence.
        num_residual: Number of bottleneck blocks per residual chain.
        num_heatmap: Number of heatmap channels predicted by every stack.

    Returns:
        A ``tf.keras.Model`` named ``'stacked_hourglass'`` whose outputs are a
        list with one heatmap tensor per stack, in stack order.
    """
    inputs = Input(shape=input_shape)

    x = Conv2D(
        filters=64,
        kernel_size=7,
        strides=2,
        padding='same',
        kernel_initializer='he_normal')(inputs)
    x = BatchNormalization(momentum=0.9)(x)
    x = ReLU()(x)
    x = BottleneckBlock(x, 128, downsample=True)
    x = MaxPool2D(pool_size=2, strides=2)(x)
    x = BottleneckBlock(x, 128, downsample=False)
    x = BottleneckBlock(x, 256, downsample=True)

    ys = []
    for stack_idx in range(num_stack):
        x = HourglassModule(x, order=4, filters=256, num_residual=num_residual)
        # The residual loop must not reuse the stack index: it previously
        # shadowed it, so the "last stack" check below tested the wrong value.
        for _ in range(num_residual):
            x = BottleneckBlock(x, 256, downsample=False)

        x = LinearLayer(x, 256)

        y = Conv2D(
            filters=num_heatmap,
            kernel_size=1,
            strides=1,
            padding='same',
            kernel_initializer='he_normal')(x)
        ys.append(y)

        # Feed features and predicted heatmaps into the next stack; nothing
        # follows the last stack, so no merge layers are created for it.
        if stack_idx < num_stack - 1:
            y_intermediate_1 = Conv2D(filters=256, kernel_size=1, strides=1)(x)
            y_intermediate_2 = Conv2D(filters=256, kernel_size=1, strides=1)(y)
            x = Add()([y_intermediate_1, y_intermediate_2])

    return tf.keras.Model(inputs, ys, name='stacked_hourglass')

In [12]:
def SimpleBaseline(input_shape=(256, 256, 3), num_heatmap=16):
    """Build a ResNet50 backbone followed by three deconvolution stages.

    Each stage is a stride-2 transposed convolution with 256 filters followed
    by batch normalization and ReLU, so the backbone feature map is upsampled
    by 8 overall before the final 1x1 convolution.

    Args:
        input_shape: Shape of a single input image (height, width, channels).
        num_heatmap: Number of heatmap channels produced by the final conv.

    Returns:
        A ``tf.keras.Model`` named ``'simple_baseline'`` with one heatmap output.
    """
    layers = tf.keras.layers
    image_input = tf.keras.Input(shape=input_shape)

    # Backbone: ResNet50 with ImageNet weights and without the classifier head.
    resnet = tf.keras.applications.ResNet50(weights='imagenet', include_top=False, input_tensor=image_input)
    features = resnet.output

    # Three identical upsampling stages.
    for _ in range(3):
        features = layers.Conv2DTranspose(filters=256, kernel_size=4, strides=2, padding='same', use_bias=False)(features)
        features = layers.BatchNormalization()(features)
        features = layers.ReLU()(features)

    # Linear 1x1 convolution that produces the heatmaps.
    heatmaps = layers.Conv2D(filters=num_heatmap, kernel_size=1, strides=1, padding='same', activation=None)(features)

    return tf.keras.Model(inputs=image_input, outputs=heatmaps, name='simple_baseline')

In [13]:
# Load the validation annotations.
# The encoding is given explicitly so the JSON is decoded as UTF-8 regardless
# of the platform's locale default.
with open(VALID_JSON, encoding='utf-8') as val_json:
    val_annos = json.load(val_json)

# Image preprocessing
def preprocess_image(image_path, target_size=(256, 256)):
    """Load an image as a float32 RGB array scaled to [0, 1].

    Args:
        image_path: Path of the image file to read.
        target_size: (width, height) the image is resized to; defaults to the
            256x256 size used so far.

    Returns:
        A float32 NumPy array of shape (height, width, 3) with values in [0, 1].
    """
    # The context manager releases the file handle as soon as the pixel data
    # has been converted; the converted/resized copy lives in memory.
    with Image.open(image_path) as source:
        resized = source.convert('RGB').resize(target_size)
    return np.array(resized).astype(np.float32) / 255.0  # Normalize to [0,1]

# Keypoint extraction
def extract_keypoints_from_heatmap(heatmaps):
    """Return the (x, y) location of the maximum of every heatmap channel.

    Args:
        heatmaps: Array indexed as [row, column, channel].

    Returns:
        A list with one (x, y) tuple per channel, where x is the column index
        and y is the row index of that channel's maximum.
    """

    def _peak_xy(channel_map):
        # argmax works on the flattened map; unravel_index restores (row, col).
        row, col = np.unravel_index(np.argmax(channel_map), channel_map.shape)
        return (col, row)

    channel_count = heatmaps.shape[-1]
    return [_peak_xy(heatmaps[:, :, channel]) for channel in range(channel_count)]

In [14]:
def compute_pckh(pred_keypoints, gt_keypoints, head_size, threshold=0.5):
    """Count keypoints whose head-normalized error is within ``threshold``.

    Args:
        pred_keypoints: Sequence of predicted (x, y) pairs, indexed like
            ``gt_keypoints``.
        gt_keypoints: Sequence of ground-truth (x, y, visibility) triples.
        head_size: Length used to normalize the Euclidean distance.
        threshold: Largest normalized distance that still counts as correct.

    Returns:
        A tuple ``(correct, total)`` where ``total`` is the number of keypoints
        whose visibility is not 0 and ``correct`` is how many of those are
        within the threshold.
    """
    hits = 0
    evaluated = 0

    for idx, gt_entry in enumerate(gt_keypoints):
        # Keypoints flagged as not visible are left out of the score.
        if gt_entry[2] == 0:
            continue

        pred_x, pred_y = pred_keypoints[idx]
        gt_x, gt_y, _visibility = gt_entry

        # Euclidean distance between prediction and ground truth.
        dx = pred_x - gt_x
        dy = pred_y - gt_y
        distance = np.sqrt(dx ** 2 + dy ** 2)

        # Normalize by the head size and compare against the threshold.
        if distance / head_size <= threshold:
            hits += 1
        evaluated += 1

    return hits, evaluated

In [15]:
# Build the stacked hourglass model and load its trained weights.
num_heatmap = 16
IMAGE_SHAPE = (256, 256, 3)
model = StackedHourglassNetwork(
    input_shape=IMAGE_SHAPE,
    num_stack=4,
    num_residual=1,
    num_heatmap=num_heatmap)
model.load_weights(WEIGHTS_PATH_A)

In [17]:
# Compute PCKh@0.5 over the whole validation set (stacked hourglass model).
total_correct_keypoints = 0
total_keypoints = 0

for anno in tqdm(val_annos):
    # Ground-truth keypoints as (x, y, visibility) triples.
    gt_joints = anno['joints']  # [[x0, y0], [x1, y1], ..., [x15, y15]]
    gt_vis = anno['joints_vis']  # [v0, v1, ..., v15]
    gt_keypoints = [(x, y, v) for (x, y), v in zip(gt_joints, gt_vis)]

    # Head size: distance between the head-top and upper-neck joints.
    head_top = np.array(gt_joints[9])  # Head top
    upper_neck = np.array(gt_joints[8])  # Upper neck
    head_size = np.linalg.norm(head_top - upper_neck)

    # A zero head size cannot normalize distances; skip the sample before
    # spending time on image loading and inference.
    if head_size == 0:
        continue

    filepath = os.path.join(IMAGE_PATH, anno['image'])
    image = preprocess_image(filepath)

    # Only the original size is needed here; the context manager closes the
    # file handle instead of leaking one per iteration.
    with Image.open(filepath) as original_image:
        original_width, original_height = original_image.size

    # Run inference on a batch of one image.
    # NOTE(review): predict() is called once per image; batching several
    # images per call would likely be faster.
    inputs = np.expand_dims(image, axis=0)
    outputs = model.predict(inputs)
    # The model returns one heatmap tensor per stack; evaluate the last stack
    # and drop its batch dimension.
    heatmaps = np.squeeze(outputs[-1], axis=0)

    # Extract the peak location of every heatmap channel.
    pred_keypoints = extract_keypoints_from_heatmap(heatmaps)

    # Rescale heatmap coordinates to the original image size.
    heatmap_height, heatmap_width = heatmaps.shape[:2]
    pred_keypoints_scaled = [
        (x / heatmap_width * original_width, y / heatmap_height * original_height)
        for (x, y) in pred_keypoints
    ]

    # Accumulate the per-image counts.
    correct_kps, total_kps = compute_pckh(pred_keypoints_scaled, gt_keypoints, head_size, threshold=0.5)
    total_correct_keypoints += correct_kps
    total_keypoints += total_kps

# Final PCKh over all evaluated keypoints.
pckh = total_correct_keypoints / total_keypoints if total_keypoints > 0 else 0
print(f'PCKh@0.5: {pckh * 100:.2f}%')

100%|██████████| 2958/2958 [06:12<00:00,  7.94it/s]

PCKh@0.5: 20.44%





In [3]:
# Build the simple baseline model and load its trained weights.
num_heatmap = 16
IMAGE_SHAPE = (256, 256, 3)
model = SimpleBaseline(input_shape=IMAGE_SHAPE, num_heatmap=num_heatmap)
model.load_weights(WEIGHTS_PATH)

In [6]:
# Compute PCKh@0.5 over the whole validation set (simple baseline model).
total_correct_keypoints = 0
total_keypoints = 0

for anno in tqdm(val_annos):
    # Ground-truth keypoints as (x, y, visibility) triples.
    gt_joints = anno['joints']  # [[x0, y0], [x1, y1], ..., [x15, y15]]
    gt_vis = anno['joints_vis']  # [v0, v1, ..., v15]
    gt_keypoints = [(x, y, v) for (x, y), v in zip(gt_joints, gt_vis)]

    # Head size: distance between the head-top and upper-neck joints.
    head_top = np.array(gt_joints[9])  # Head top
    upper_neck = np.array(gt_joints[8])  # Upper neck
    head_size = np.linalg.norm(head_top - upper_neck)

    # A zero head size cannot normalize distances; skip the sample before
    # spending time on image loading and inference.
    if head_size == 0:
        continue

    filepath = os.path.join(IMAGE_PATH, anno['image'])
    image = preprocess_image(filepath)

    # Only the original size is needed here; the context manager closes the
    # file handle instead of leaking one per iteration.
    with Image.open(filepath) as original_image:
        original_width, original_height = original_image.size

    # Run inference on a batch of one image.
    # NOTE(review): predict() is called once per image; batching several
    # images per call would likely be faster.
    inputs = np.expand_dims(image, axis=0)
    outputs = model.predict(inputs)
    # Index 0 selects the only sample of the batch, dropping the batch axis.
    heatmaps = outputs[0]

    # Extract the peak location of every heatmap channel.
    pred_keypoints = extract_keypoints_from_heatmap(heatmaps)

    # Rescale heatmap coordinates to the original image size.
    heatmap_height, heatmap_width = heatmaps.shape[:2]
    pred_keypoints_scaled = [
        (x / heatmap_width * original_width, y / heatmap_height * original_height)
        for (x, y) in pred_keypoints
    ]

    # Accumulate the per-image counts.
    correct_kps, total_kps = compute_pckh(pred_keypoints_scaled, gt_keypoints, head_size, threshold=0.5)
    total_correct_keypoints += correct_kps
    total_keypoints += total_kps

# Final PCKh over all evaluated keypoints.
pckh = total_correct_keypoints / total_keypoints if total_keypoints > 0 else 0
print(f'PCKh@0.5: {pckh * 100:.2f}%')

100%|██████████| 2958/2958 [04:54<00:00, 10.05it/s]

PCKh@0.5: 21.01%



