In [1]:
!pip install pycocotools



In [9]:
import os
import json
import numpy as np
import tensorflow as tf
from PIL import Image
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval

import tensorflow as tf
from tensorflow.keras.layers import Add, Concatenate, Lambda
from tensorflow.keras.layers import Input, Conv2D, ReLU, MaxPool2D
from tensorflow.keras.layers import Conv2DTranspose, BatchNormalization, ReLU
from tensorflow.keras.layers import UpSampling2D, ZeroPadding2D
from tensorflow.keras.layers import BatchNormalization

from tensorflow.keras.applications import ResNet50 

import ray

import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# Paths
PROJECT_PATH = os.getenv('HOME') + '/aiffel/mpii'
IMAGE_PATH = os.path.join(PROJECT_PATH, 'images')
MODEL_PATH = os.path.join(PROJECT_PATH, 'model')
TFRECORD_PATH = os.path.join(PROJECT_PATH, 'tfrecords_mpii')
VALID_JSON = os.path.join(PROJECT_PATH, 'mpii_human_pose_v1_u12_2', 'validation.json')
WEIGHTS_PATH_HOURGLASS = os.path.join(MODEL_PATH, 'model-epoch-9-loss-1.1228.h5')
WEIGHTS_PATH_SIMPLE = os.path.join(MODEL_PATH, 'model_simplebaseline-epoch-9-loss-0.2786.h5')

In [10]:
def BottleneckBlock(inputs, filters, strides=1, downsample=False, name=None):
    identity = inputs
    if downsample:
        identity = Conv2D(
            filters=filters,
            kernel_size=1,
            strides=strides,
            padding='same',
            kernel_initializer='he_normal')(inputs)

    x = BatchNormalization(momentum=0.9)(inputs)
    x = ReLU()(x)
    x = Conv2D(
        filters=filters // 2,
        kernel_size=1,
        strides=1,
        padding='same',
        kernel_initializer='he_normal')(x)

    x = BatchNormalization(momentum=0.9)(x)
    x = ReLU()(x)
    x = Conv2D(
        filters=filters // 2,
        kernel_size=3,
        strides=strides,
        padding='same',
        kernel_initializer='he_normal')(x)

    x = BatchNormalization(momentum=0.9)(x)
    x = ReLU()(x)
    x = Conv2D(
        filters=filters,
        kernel_size=1,
        strides=1,
        padding='same',
        kernel_initializer='he_normal')(x)

    x = Add()([identity, x])
    return x

In [11]:
def HourglassModule(inputs, order, filters, num_residual):
    
    up1 = BottleneckBlock(inputs, filters, downsample=False)
    for i in range(num_residual):
        up1 = BottleneckBlock(up1, filters, downsample=False)

    low1 = MaxPool2D(pool_size=2, strides=2)(inputs)
    for i in range(num_residual):
        low1 = BottleneckBlock(low1, filters, downsample=False)

    low2 = low1
    if order > 1:
        low2 = HourglassModule(low1, order - 1, filters, num_residual)
    else:
        for i in range(num_residual):
            low2 = BottleneckBlock(low2, filters, downsample=False)

    low3 = low2
    for i in range(num_residual):
        low3 = BottleneckBlock(low3, filters, downsample=False)

    up2 = UpSampling2D(size=2)(low3)

    return up2 + up1

In [12]:
def LinearLayer(inputs, filters):
    x = Conv2D(
        filters=filters,
        kernel_size=1,
        strides=1,
        padding='same',
        kernel_initializer='he_normal')(inputs)
    x = BatchNormalization(momentum=0.9)(x)
    x = ReLU()(x)
    return x

In [13]:
def StackedHourglassNetwork(
        input_shape=(256, 256, 3), 
        num_stack=4, 
        num_residual=1,
        num_heatmap=16):
    
    inputs = Input(shape=input_shape)

    x = Conv2D(
        filters=64,
        kernel_size=7,
        strides=2,
        padding='same',
        kernel_initializer='he_normal')(inputs)
    x = BatchNormalization(momentum=0.9)(x)
    x = ReLU()(x)
    x = BottleneckBlock(x, 128, downsample=True)
    x = MaxPool2D(pool_size=2, strides=2)(x)
    x = BottleneckBlock(x, 128, downsample=False)
    x = BottleneckBlock(x, 256, downsample=True)

    ys = []
    for i in range(num_stack):
        x = HourglassModule(x, order=4, filters=256, num_residual=num_residual)
        for i in range(num_residual):
            x = BottleneckBlock(x, 256, downsample=False)

        x = LinearLayer(x, 256)

        y = Conv2D(
            filters=num_heatmap,
            kernel_size=1,
            strides=1,
            padding='same',
            kernel_initializer='he_normal')(x)
        ys.append(y)

        if i < num_stack - 1:
            y_intermediate_1 = Conv2D(filters=256, kernel_size=1, strides=1)(x)
            y_intermediate_2 = Conv2D(filters=256, kernel_size=1, strides=1)(y)
            x = Add()([y_intermediate_1, y_intermediate_2])

    return tf.keras.Model(inputs, ys, name='stacked_hourglass')

In [14]:
def SimpleBaseline(input_shape=(256, 256, 3), num_heatmap=16):
    inputs = Input(shape=input_shape)

    # Backbone: ResNet50 without top layers
    backbone = ResNet50(weights='imagenet', include_top=False, input_tensor=inputs)

    x = backbone.output

    # Deconvolution layers to upsample
    x = Conv2DTranspose(filters=256, kernel_size=4, strides=2, padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)

    x = Conv2DTranspose(filters=256, kernel_size=4, strides=2, padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)

    x = Conv2DTranspose(filters=256, kernel_size=4, strides=2, padding='same', use_bias=False)(x)
    x = BatchNormalization()(x)
    x = ReLU()(x)

    # Final convolution to get the heatmap
    outputs = Conv2D(filters=num_heatmap, kernel_size=1, strides=1, padding='same', activation=None)(x)

    model = tf.keras.Model(inputs=inputs, outputs=outputs, name='simple_baseline')

    return model

In [15]:
# Load the model and weights
num_heatmap = 16
IMAGE_SHAPE = (256, 256, 3)
model_hourglass = StackedHourglassNetwork(IMAGE_SHAPE, 4, 1, num_heatmap)
model_hourglass.load_weights(WEIGHTS_PATH_HOURGLASS)
model_simple = SimpleBaseline()
model_simple.load_weights(WEIGHTS_PATH_SIMPLE)

# Prepare the validation dataset
with open(VALID_JSON) as val_json:
    val_annos = json.load(val_json)

In [16]:
# Function to preprocess images
def preprocess_image(image_path):
    image = Image.open(image_path).convert('RGB')
    image = image.resize((256, 256))
    image = np.array(image)
    image = image.astype(np.float32) / 255.0  # Normalize to [0,1]
    return image

In [33]:
# Function to extract keypoints from heatmap
def extract_keypoints_from_heatmap(heatmaps):
    num_heatmap = heatmaps.shape[-1]
    keypoints = []
    for i in range(num_heatmap):
        heatmap = heatmaps[:, :, i]  # heatmap shape: (64, 64)
        y, x = np.unravel_index(np.argmax(heatmap), heatmap.shape)
        keypoints.append((x, y))
    return keypoints

In [34]:
# Function to convert keypoints to COCO format
def convert_keypoints_to_coco_format(keypoints, image_id):
    keypoints_coco = []
    scores = []
    for x, y in keypoints:
        keypoints_coco.extend([x, y, 2])  # x, y, visibility
        scores.append(1.0)  # Placeholder score
    annotation = {
        'image_id': image_id,
        'category_id': 1,  # Person category
        'keypoints': keypoints_coco,
        'score': np.mean(scores)
    }
    return annotation

In [56]:
# Mapping from MPII to COCO keypoints (Note: Adjust as necessary)
# Since MPII has 16 keypoints and COCO has 17, we'll map accordingly
MPII_to_COCO = {
    0: 16,  # Right ankle -> Right ankle
    1: 14,  # Right knee -> Right knee
    2: 12,  # Right hip -> Right hip
    3: 11,  # Left hip -> Left hip
    4: 13,  # Left knee -> Left knee
    5: 15,  # Left ankle -> Left ankle
    6: None, # Pelvis (No COCO equivalent)
    7: 0,   # Thorax -> Nose (Approximation)
    8: 5,   # Upper neck -> Left shoulder (Approximation)
    9: 6,   # Head top -> Right shoulder (Approximation)
    10: 10, # Right wrist -> Right wrist
    11: 8,  # Right elbow -> Right elbow
    12: 6,  # Right shoulder -> Right shoulder
    13: 5,  # Left shoulder -> Left shoulder
    14: 7,  # Left elbow -> Left elbow
    15: 9   # Left wrist -> Left wrist
}

In [57]:
# Prepare ground truth annotations in COCO format
def prepare_ground_truth_coco(val_annos):
    gt_annotations = []
    images = []
    ann_id = 1
    img_id = 1
    for anno in val_annos:
        filename = anno['image']
        filepath = os.path.join(IMAGE_PATH, filename)
        image = Image.open(filepath)
        width, height = image.size

        keypoints = [0] * 51  # 17 keypoints * 3
        num_keypoints = 0
        for mpii_idx in range(16):
            x, y = anno['joints'][mpii_idx]
            v = anno['joints_vis'][mpii_idx]
            coco_idx = MPII_to_COCO.get(mpii_idx)
            if coco_idx is not None:
                keypoints[coco_idx * 3] = x
                keypoints[coco_idx * 3 + 1] = y
                keypoints[coco_idx * 3 + 2] = v
                if v > 0:
                    num_keypoints += 1
        # Missing keypoints remain as zeros with visibility 0

        gt_annotation = {
            'id': ann_id,
            'image_id': img_id,
            'category_id': 1,
            'keypoints': keypoints,
            'num_keypoints': num_keypoints,
            'bbox': [0, 0, width, height],
            'area': width * height,
            'iscrowd': 0
        }
        gt_annotations.append(gt_annotation)

        image_info = {
            'id': img_id,
            'file_name': filename,
            'width': width,
            'height': height
        }
        images.append(image_info)

        ann_id += 1
        img_id += 1

    categories = [{
        'id': 1,
        'name': 'person',
        'supercategory': 'person',
        'keypoints': [
            'nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear',
            'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
            'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
            'left_knee', 'right_knee', 'left_ankle', 'right_ankle'
        ],
        'skeleton': [
            [16, 14], [14, 12], [12, 6], [6, 5], [5, 13], [13, 15],
            [6, 8], [8, 10], [6, 7], [7, 9], [2, 3], [1, 2],
            [1, 3], [2, 4], [3, 5]
        ]
    }]
    gt_coco = {
        'images': images,
        'annotations': gt_annotations,
        'categories': categories
    }
    return gt_coco

In [58]:
gt_coco = prepare_ground_truth_coco(val_annos)

In [59]:
# Save ground truth annotations to a JSON file
gt_coco_path = os.path.join(PROJECT_PATH, 'gt_coco.json')
with open(gt_coco_path, 'w') as f:
    json.dump(gt_coco, f)

In [41]:
# 예측 및 mAP 계산 코드
# Prepare predictions
predictions = []
img_id = 1
for anno in val_annos:
    filename = anno['image']
    filepath = os.path.join(IMAGE_PATH, filename)
    image = preprocess_image(filepath)

    # Run inference
    inputs = np.expand_dims(image, axis=0)  # inputs shape: (1, 256, 256, 3)
    outputs = model_hourglass.predict(inputs)  # outputs is a list of tensors
    heatmaps = outputs[-1]  # 마지막 스택의 출력 사용
    heatmaps = np.squeeze(heatmaps, axis=0)  # heatmaps shape: (64, 64, 16)
    print("Heatmaps shape after squeezing:", heatmaps.shape)

    # Extract keypoints
    keypoints = extract_keypoints_from_heatmap(heatmaps)

    # Map keypoints to COCO format
    keypoints_coco = [0] * 51  # 17 keypoints * 3
    for mpii_idx, (x, y) in enumerate(keypoints):
        coco_idx = MPII_to_COCO.get(mpii_idx)
        if coco_idx is not None:
            x_orig = x / heatmaps.shape[1] * image.shape[1]
            y_orig = y / heatmaps.shape[0] * image.shape[0]
            keypoints_coco[coco_idx * 3] = x_orig
            keypoints_coco[coco_idx * 3 + 1] = y_orig
            keypoints_coco[coco_idx * 3 + 2] = 2  # Visibility flag

    prediction = {
        'image_id': img_id,
        'category_id': 1,
        'keypoints': keypoints_coco,
        'score': 1.0  # Placeholder score
    }
    predictions.append(prediction)
    img_id += 1
    
# Save predictions to a JSON file
predictions_path = os.path.join(PROJECT_PATH, 'predictions.json')
with open(predictions_path, 'w') as f:
    json.dump(predictions, f)

Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps s

In [62]:
# 예측 및 mAP 계산 코드
# Prepare predictions
predictions = []
img_id = 1
for anno in val_annos:
    filename = anno['image']
    filepath = os.path.join(IMAGE_PATH, filename)
    image = preprocess_image(filepath)

    # Run inference
    inputs = np.expand_dims(image, axis=0)  # inputs shape: (1, 256, 256, 3)
    outputs = model_simple.predict(inputs)  # outputs is a list of tensors
    heatmaps = outputs[0]  # 마지막 스택의 출력 사용
    print("Heatmaps shape after squeezing:", heatmaps.shape)

    # Extract keypoints
    keypoints = extract_keypoints_from_heatmap(heatmaps)

    # Map keypoints to COCO format
    keypoints_coco = [0] * 51  # 17 keypoints * 3
    for mpii_idx, (x, y) in enumerate(keypoints):
        coco_idx = MPII_to_COCO.get(mpii_idx)
        if coco_idx is not None:
            x_orig = x / heatmaps.shape[1] * image.shape[1]
            y_orig = y / heatmaps.shape[0] * image.shape[0]
            keypoints_coco[coco_idx * 3] = x_orig
            keypoints_coco[coco_idx * 3 + 1] = y_orig
            keypoints_coco[coco_idx * 3 + 2] = 2  # Visibility flag

    prediction = {
        'image_id': img_id,
        'category_id': 1,
        'keypoints': keypoints_coco,
        'score': 1.0  # Placeholder score
    }
    predictions.append(prediction)
    img_id += 1
    
# Save predictions to a JSON file
predictions_path_simple = os.path.join(PROJECT_PATH, 'predictions_simple.json')
with open(predictions_path_simple, 'w') as f:
    json.dump(predictions, f)

Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps shape after squeezing: (64, 64, 16)
Heatmaps s

In [60]:
# Load ground truth and predictions using COCO API
coco_gt = COCO(gt_coco_path)
coco_dt = coco_gt.loadRes(predictions_path)

# Evaluate using COCO API
coco_eval = COCOeval(coco_gt, coco_dt, 'keypoints')
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()

loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
Loading and preparing results...
DONE (t=0.10s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *keypoints*
DONE (t=0.85s).
Accumulating evaluation results...
DONE (t=0.06s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets= 20 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets= 20 ] = 0.000
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.003
 Average Recall     (AR) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.011
 Average Recall     (AR) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.001
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | m

In [63]:
# Load ground truth and predictions using COCO API
coco_gt = COCO(gt_coco_path)
coco_dt = coco_gt.loadRes(predictions_path_simple)

# Evaluate using COCO API
coco_eval = COCOeval(coco_gt, coco_dt, 'keypoints')
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()

loading annotations into memory...
Done (t=0.03s)
creating index...
index created!
Loading and preparing results...
DONE (t=0.10s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *keypoints*
DONE (t=0.58s).
Accumulating evaluation results...
DONE (t=0.06s).
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets= 20 ] = -1.000
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets= 20 ] = 0.000
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.004
 Average Recall     (AR) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.013
 Average Recall     (AR) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.001
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | m