In [1]:
import tensorflow as tf
import tensorflow_hub as hub
import numpy as np
import cv2
import matplotlib.pyplot as plt
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import json




In [2]:
def show_images(images, titles):
    n = len(images)
    plt.figure(figsize=(10,10))
    for i in range(n):
        plt.subplot(1,n,i+1)
        plt.imshow(images[i], cmap='gray')
        plt.title(titles[i])
        plt.axis('off')
    plt.show()

In [3]:
coco = COCO('../coco2014/annotations/person_keypoints_train2014.json')


loading annotations into memory...
Done (t=5.00s)
creating index...
index created!


In [11]:
movenet = hub.load("https://www.kaggle.com/models/google/movenet/TensorFlow2/singlepose-lightning/4")













In [5]:
def is_full_body(ann, min_keypoints=17):
    keypoints = ann['keypoints']
    num_visible = sum(1 for i in range(0, len(keypoints)) if keypoints[i] > 0)  # Count visible keypoints
    if num_visible < min_keypoints:
        return False  # Not all keypoints are visible
    
    bbox = ann['bbox']
    bbox_aspect_ratio = bbox[3] / bbox[2]  
    if bbox_aspect_ratio < 1.5:  # Threshold for a standing full-body shot
        return False
    
    return True

In [13]:
image_dir = "../coco2014/images/train2014"

human_images_anns = []

human_image_ids = set()
for ann in coco.anns.values():
    if ann['category_id'] == 1 and ann['iscrowd'] == 0 and is_full_body(ann,17) == True: #human, not a crowd, displays full body
        human_image_ids.add(ann['image_id'])
        human_images_anns.append(ann)
count = 0
human_image_paths = []
for img_id in human_image_ids:
    if count < 1000:
        img_info = coco.loadImgs(img_id)[0]
        img_path = f"{image_dir}/{img_info['file_name']}"
        human_image_paths.append(img_path)
        count+=1

print(len(human_image_paths))

1000


In [None]:
keypoint_names = ['nose', 'left_eye', 'right_eye', 'left_ear', 'right_ear', 'left_shoulder', 'right_shoulder',
                  'left_elbow', 'right_elbow', 'left_wrist', 'right_wrist', 'left_hip', 'right_hip',
                  'left_knee', 'right_knee', 'left_ankle', 'right_ankle']

connections = [(0, 1), (0, 2), (1, 3), (2, 4), (0, 5), (0, 6), (5, 7), (7, 9), (6, 8), (8, 10),
               (5, 6), (5, 11), (6, 12), (11, 12), (11, 13), (13, 15), (12, 14), (14, 16)]

def detect_pose_static(image_path, ann):
    image = cv2.imread(image_path)

    bbox = [a["bbox"] for a in ann]
    bbox = (bbox[0])
    if (len(bbox) >= 4):
        x_min, y_min, width, height = bbox[:4]
        x_min = int(x_min)
        y_min = int(y_min)
        height = int(height)
        width = int(width)
        
        # raises value error if bounding box dimensions exceed image dimensions 
        if x_min < 0 or y_min < 0 or (x_min+width) > image.shape[1] or (y_min+height) > image.shape[0]:
            print(image_path)
            raise ValueError(f"Invalid bounding box: {x_min}, {y_min}, {x_min+width}, {y_min+height}")
        
    # defaults to full image size in cases where bounding box does not meet requisite length 
    else:
        x_min, y_min = 0,0
        x_max, y_max = image.shape[1], image.shape[0]

    # convert BGR image to HSI and split h, s, and i channels 
    hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)

    # Equalize intensity channel's histogram and merge channels back into singular image 
    v = cv2.equalizeHist(v)
    hsv_image = cv2.merge([h, s, v])

    # Converting from HSI to RGB, which MoveNet requires 
    final_image = cv2.cvtColor(hsv_image, cv2.COLOR_HSV2RGB)

    # Cropping Image to first bounding box dimensions found in annotation for ease of prediction 
    cropped_image = final_image[y_min:y_min+height, x_min:x_min+width,:]   

    #resizing image to match dimensions expected by MoveNet, followed by predicting keypoints 
    image_resized = tf.image.resize_with_pad(tf.expand_dims(cropped_image, axis=0), 192, 192) #192 for lightning
    image_np = image_resized.numpy().astype(np.int32)
    outputs = movenet.signatures["serving_default"](tf.constant(image_np))
    keypoints = outputs['output_0'].numpy()

    for i in range (0,17):
        keypoints[0][0][i][1] = keypoints[0][0][i][1] * width + x_min
        keypoints[0][0][i][0] = keypoints[0][0][i][0] * height + y_min

    # Repeating with the un-equalized image for later comparison 
    cropped_image = image[y_min:y_min+height, x_min:x_min+width,:]    
    original_image_resized = tf.image.resize_with_pad(tf.expand_dims(image, axis=0), 192, 192)
    original_image_np = original_image_resized.numpy().astype(np.int32)
    outputs = movenet.signatures["serving_default"](tf.constant(original_image_np))
    original_keypoints = outputs['output_0'].numpy()
    for i in range (0,17):
        original_keypoints[0][0][i][1] = original_keypoints[0][0][i][1] * width + x_min
        original_keypoints[0][0][i][0] = original_keypoints[0][0][i][0] * height + y_min

    return keypoints,original_keypoints

def visualize_pose_static(image_path, keypoints, original_keypoints):
    image = cv2.imread(image_path)
    image_original = cv2.imread(image_path)
    keypoints = np.array(keypoints)
    original_keypoints = np.array(original_keypoints)
    if keypoints.shape == (1, 1, 17, 3):
        keypoints = keypoints[0, 0]
        # Visualizing each keypoint 
        for kp in keypoints:
            x = int(kp[1] * image.shape[1])
            y = int(kp[0] * image.shape[0])
            cv2.circle(image, (x, y), 12, (255, 0, 0), -1)  # Increase thickness and change color to blue
        
        # Visualizing lines between keypoints 
        for connection in connections:
            start_point = (int(keypoints[connection[0], 1] * image.shape[1]),
                           int(keypoints[connection[0], 0] * image.shape[0]))
            end_point = (int(keypoints[connection[1], 1] * image.shape[1]),
                         int(keypoints[connection[1], 0] * image.shape[0]))
            cv2.line(image, start_point, end_point, (0, 0, 255), 8)  # Increase thickness and change color to red
        
        # repeat with Unequalized image's predicted keypoints 
        original_keypoints = original_keypoints[0, 0]
        for kp in original_keypoints:
            x = int(kp[1] * image_original.shape[1])
            y = int(kp[0] * image_original.shape[0])
            cv2.circle(image_original, (x, y), 12, (255, 0, 0), -1)  # Increase thickness and change color to blue
        for connection in connections:
            start_point = (int(original_keypoints[connection[0], 1] * image_original.shape[1]),
                           int(original_keypoints[connection[0], 0] * image_original.shape[0]))
            end_point = (int(original_keypoints[connection[1], 1] * image_original.shape[1]),
                         int(original_keypoints[connection[1], 0] * image_original.shape[0]))
            cv2.line(image_original, start_point, end_point, (0, 0, 255), 8)  # Increase thickness and change color to red
        show_images([image_original,image],["Predicted Annotations without Processing","Predicted Annotations after Processing"])
    else:
        print("Unexpected shape of keypoints array:", keypoints.shape)

In [None]:
def format_coco_results(human_image_paths, predictions, coco):
    results = []
    
    for image_path, pred in zip(human_image_paths, predictions):
        img_id = int(image_path.split("_")[-1].split(".")[0])  # Extract COCO image ID
        keypoints = pred[0, 0, :, :]  
        
        # Convert normalized keypoints to absolute image coordinates
        image = cv2.imread(image_path)
        formatted_keypoints = []
        for kp in keypoints:
            x, y, confidence = float(kp[1]), float(kp[0]), float(kp[2])
            formatted_keypoints.extend([x, y, 2 if confidence > 0.5 else 0])  # Use confidence threshold
        
        results.append({
            "image_id": img_id,
            "category_id": 1,  # Category for 'person'
            "keypoints": formatted_keypoints,
            "num_keypoints": len(formatted_keypoints)/3, # undivided length is 51 since each keypoint entry has x, y, and confidence 
            "score": float(keypoints[:, 2].mean()) 
        })
    return results

# Adding default values to any annotations that erroneously don't have any keypoints listed in their annotations 
for ann in coco.anns.values():
    if 'num_keypoints' not in ann:
        ann['num_keypoints'] = 17
    if 'keypoints' not in ann: 
        ann['keypoints'] = []

# Detect poses and format results
predictions_equalized = []
predictions_unequalized = []
for static_image_path in human_image_paths:
    annotation = [ann for ann in coco.anns.values() if ann["image_id"] == int(static_image_path.split("_")[-1].split(".")[0])]
    static_keypoints_equalized, static_keypoints_unequalized = detect_pose_static(static_image_path,annotation)
    predictions_equalized.append(static_keypoints_equalized)
    predictions_unequalized.append(static_keypoints_unequalized)

# Read predicted keypoints and add to json files 
coco_results = format_coco_results(human_image_paths, predictions_equalized, coco)
results_path_equalized = "pose_results_equalized.json"
with open(results_path_equalized, 'w') as f:
    json.dump(coco_results, f, indent=2)

# Load results and evaluate (unreliable since predictions are only based on the first bounding box, of which there can be multiple)
coco_dt = coco.loadRes(results_path_equalized)
coco_eval = COCOeval(coco, coco_dt, "keypoints")
coco_eval.evaluate()
coco_eval.accumulate()
print("Evaluation for Equalized Images:")
coco_eval.summarize()

# Repeat for unequalized images 
coco_results = format_coco_results(human_image_paths, predictions_unequalized, coco)
results_path_unequalized = "pose_results_unequalized.json"
with open(results_path_unequalized, 'w') as f:
    json.dump(coco_results, f, indent=2)

coco_dt = coco.loadRes(results_path_unequalized)
coco_eval = COCOeval(coco, coco_dt, "keypoints")
coco_eval.evaluate()
coco_eval.accumulate()
print("Evaluation for Unequalized Images:")
coco_eval.summarize()

Loading and preparing results...
DONE (t=0.07s)
creating index...
index created!
Running per image evaluation...
Evaluate annotation type *keypoints*
DONE (t=7.39s).
Accumulating evaluation results...
DONE (t=0.41s).
Evaluation for Equalized Images:
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.005
 Average Precision  (AP) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.010
 Average Precision  (AP) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.002
 Average Precision  (AP) @[ IoU=0.50:0.95 | area=medium | maxDets= 20 ] = 0.005
 Average Precision  (AP) @[ IoU=0.50:0.95 | area= large | maxDets= 20 ] = 0.007
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=   all | maxDets= 20 ] = 0.003
 Average Recall     (AR) @[ IoU=0.50      | area=   all | maxDets= 20 ] = 0.008
 Average Recall     (AR) @[ IoU=0.75      | area=   all | maxDets= 20 ] = 0.001
 Average Recall     (AR) @[ IoU=0.50:0.95 | area=medium | maxDets= 20 ] = 0.002
 Average Recall     (AR) @[ Io

In [15]:
def min_max_normalize(lst):
    arr = np.array(lst)
    return (arr - arr.min()) / (arr.max() - arr.min())

def calculateMSE(MSE_list, human_images_anns,results_path):
    with open(results_path, 'r') as f:
        results = json.load(f)
    for ann in human_images_anns: 
        count = 0
        kp = ann['keypoints']
        for i in range(0,len(kp)-1,2):
            x_err = kp[i] - results[count]["keypoints"][i]
            y_err = kp[i+1] - results[count]["keypoints"][i+1]
            MSE_list.append(x_err**2 + y_err**2)
            count+=1  
    MSE_list_normalized = min_max_normalize(MSE_list)
    return np.mean(MSE_list), np.mean(MSE_list_normalized)

MSE_list_equalized = []
MSE_list_unequalized = []

print("Average Mean Squared Error for Unequalized Images:")
avg_mse_unequalized, normalized_avg_mse_unequalized = calculateMSE(MSE_list_unequalized,human_images_anns,results_path_unequalized)
print("Raw Values:", avg_mse_unequalized, "\nNormalized Values:",normalized_avg_mse_unequalized)
print("Average Mean Squared Error for Equalized Images:")
avg_mse_equalized, normalized_avg_mse_equalized = calculateMSE(MSE_list_equalized,human_images_anns,results_path_equalized)
print("Raw Values:", avg_mse_equalized, "\nNormalized Values:",normalized_avg_mse_equalized)


Average Mean Squared Error for Unequalized Images:
Raw Values: 71858.72607175894 
Normalized Values: 0.12378443110708555
Average Mean Squared Error for Equalized Images:
Raw Values: 69068.60728587248 
Normalized Values: 0.1201227833378025


In [16]:
def calculateMAE(MAE_list, human_images_anns,results_path):
    with open(results_path, 'r') as f:
        results = json.load(f)
    for ann in human_images_anns: 
        count = 0
        kp = ann['keypoints']
        for i in range(0,len(kp)-1,2):
            x_err = kp[i] - results[count]["keypoints"][i]
            y_err = kp[i+1] - results[count]["keypoints"][i+1]
            MAE_list.append(abs(x_err) + abs(y_err))
            count+=1  
    MAE_list_normalized = min_max_normalize(MAE_list)
    return np.mean(MAE_list), np.mean(MAE_list_normalized)

MAE_list_equalized = []
MAE_list_unequalized = []

print("Average Mean Absolute Error for Unequalized Images:")
avg_mae_unequalized, normalized_avg_mae_unequalized = calculateMAE(MAE_list_unequalized,human_images_anns,results_path_unequalized)
print("Raw Values:", avg_mae_unequalized, "\nNormalized Values:",normalized_avg_mae_unequalized)
print("Average Mean Absolute Error for Equalized Images:")
avg_mae_equalized, normalized_avg_mae_equalized = calculateMAE(MAE_list_equalized,human_images_anns,results_path_equalized)
print("Raw Values:", avg_mae_equalized, "\nNormalized Values:",normalized_avg_mae_equalized)

Average Mean Absolute Error for Unequalized Images:
Raw Values: 255.5487069720829 
Normalized Values: 0.23715765727483196
Average Mean Absolute Error for Equalized Images:
Raw Values: 246.68827476774555 
Normalized Values: 0.2300411523552192


In [None]:
def get_equalized_histogram(human_image_paths):
    example_image = cv2.imread(human_image_paths[0])
    hsv = cv2.cvtColor(example_image, cv2.COLOR_BGR2HSV)
    h, s, v = cv2.split(hsv)
    equalized_v = cv2.equalizeHist(v)
    fig, ax = plt.subplots(nrows=1, ncols=2)    
    ax[0].hist(v.flatten(), 255)
    ax[1].hist(equalized_v.flatten(),255)
    ax[0].set_title("Unequalized Example Histogram")
    ax[1].set_title("Equalized Example Histogram")

    for a in ax:
        a.set_xlim([0,255])
        a.set_ylim([0,5000])
    plt.show()    

get_equalized_histogram(human_image_paths)


In [None]:
for static_image_path in human_image_paths:
    print(static_image_path)
    static_keypoints,original_static_keypoints = detect_pose_static(static_image_path)
    visualize_pose_static(static_image_path, static_keypoints, original_static_keypoints)