In [1]:
# Live Human Pose Estimation with OpenVINO™

# This notebook demonstrates live pose estimation with OpenVINO, using the OpenPose human-pose-estimation-0001 model from Open Model Zoo.

# Imports
%pip install -q "openvino>=2023.1.0" opencv-python websockets

import collections
import sys
import time
from pathlib import Path
import urllib.request
import cv2
import numpy as np
from IPython.display import display, Image, clear_output
from numpy.lib.stride_tricks import as_strided
import openvino as ov
import asyncio
import websockets
import json

from decoder import OpenPoseDecoder

# Download utility script
# Fetch the shared helper module into the current working directory.
# NOTE(review): this runs on every execution and imports code straight from the
# `main` branch of the upstream repository - pin a commit if reproducibility matters.
urllib.request.urlretrieve(
    "https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/main/notebooks/utils/notebook_utils.py",
    "notebook_utils.py",
)

# "../utils" is added as an extra search location; the file downloaded above
# lives in the working directory instead.
sys.path.extend(["../utils"])
import notebook_utils as utils

# Download the model
# Local layout: model/intel/<model_name>/<precision>/<model_name>.xml (+ .bin)
base_model_dir = Path("model")
model_name = "human-pose-estimation-0001"
precision = "FP16-INT8"
model_path = base_model_dir / "intel" / model_name / precision / f"{model_name}.xml"

model_url_dir = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/2022.1/models_bin/3/{model_name}/{precision}/"

# The IR model consists of an .xml file and a .bin file.  Check each file on its
# own so that a missing .bin (for example after an interrupted download) is
# fetched again even when the .xml is already present.
for model_file in (model_path, model_path.with_suffix(".bin")):
    if not model_file.exists():
        utils.download_file(model_url_dir + model_file.name, model_file.name, model_file.parent)

# Initialize OpenVINO Runtime
# Read the downloaded model and compile it once; the compiled model is reused by
# the functions defined in the later cells.
core = ov.Core()
model = core.read_model(model_path)
# device_name="AUTO" and the LATENCY performance hint are passed straight to OpenVINO.
compiled_model = core.compile_model(model=model, device_name="AUTO", config={"PERFORMANCE_HINT": "LATENCY"})

# Keep handles to the first input port and to all output ports of the compiled model
input_layer = compiled_model.input(0)
# NOTE(review): output_layers is not referenced by the other cells visible here.
output_layers = compiled_model.outputs
# The unpacking requires a 4-element input shape; elements 2 and 3 are used as the
# (height, width) that frames are resized to before inference.
height, width = list(input_layer.shape)[2:]

# Initialize decoder (OpenPoseDecoder comes from the local `decoder` module)
decoder = OpenPoseDecoder()



[notice] A new release of pip is available: 24.1.1 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


Note: you may need to restart the kernel to use updated packages.


In [None]:
# 2D pooling in numpy
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    Pool a 2D array with a square sliding window.

    Parameters:
        A: 2D input array.
        kernel_size: side length of the square pooling window.
        stride: step between neighbouring windows along both axes.
        padding: pad width handed to ``np.pad`` (constant mode, i.e. zeros are
            added around ``A`` before pooling).
        pool_mode: ``"max"`` for max pooling or ``"avg"`` for average pooling.

    Returns:
        2D array of shape
        ``((H - kernel_size) // stride + 1, (W - kernel_size) // stride + 1)``
        where ``H`` and ``W`` are the dimensions of the padded input.

    Raises:
        ValueError: if ``pool_mode`` is neither ``"max"`` nor ``"avg"``.
    """
    reducers = {"max": np.max, "avg": np.mean}
    if pool_mode not in reducers:
        # Previously an unknown mode fell through and silently returned None.
        raise ValueError(f"Unsupported pool_mode {pool_mode!r}; expected 'max' or 'avg'")

    A = np.pad(A, padding, mode="constant")
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    # 4D view (out_rows, out_cols, kernel_rows, kernel_cols) over the padded
    # array; no data is copied.  The view is read-only because its windows
    # overlap in memory.
    windows = as_strided(
        A,
        shape=output_shape + (kernel_size, kernel_size),
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides,
        writeable=False,
    )
    # Reduce over the two window axes; the result already has output_shape.
    return reducers[pool_mode](windows, axis=(2, 3))

# Non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    """
    Suppress every heatmap value that is not equal to its pooled counterpart.

    Positions where ``heatmaps`` equals ``pooled_heatmaps`` keep their value;
    all other positions are multiplied by ``False`` and therefore become zero.
    """
    keep_mask = heatmaps == pooled_heatmaps
    return heatmaps * keep_mask

# Get poses from results
def process_results(img, pafs, heatmaps):
    """
    Decode network outputs into poses expressed in the pixel coordinates of ``img``.

    Parameters:
        img: image whose ``shape[0]`` / ``shape[1]`` define the target height / width.
        pafs: part-affinity-field output of the network, forwarded to the decoder.
        heatmaps: keypoint heatmap output of the network; only ``heatmaps[0]``
            (the first batch item) is pooled.

    Returns:
        ``(poses, scores)`` as produced by the module-level ``decoder``, with the
        first two values of every keypoint rescaled to ``img``.
    """
    # 3x3 max filter with stride 1 and 1-pixel padding keeps each channel's size.
    max_filtered_channels = [
        pool2d(channel, kernel_size=3, stride=1, padding=1, pool_mode="max")
        for channel in heatmaps[0]
    ]
    # Wrap in one more list to restore the leading batch axis.
    pooled_heatmaps = np.array([max_filtered_channels])
    nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)

    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)

    # Map coordinates from the resolution of model output 0 to the image resolution.
    model_output_dims = list(compiled_model.output(index=0).partial_shape)
    scale_x = img.shape[1] / model_output_dims[3].get_length()
    scale_y = img.shape[0] / model_output_dims[2].get_length()
    poses[:, :, :2] *= (scale_x, scale_y)
    return poses, scores

# Draw pose overlays on the image
# One color per keypoint index; draw_poses passes these 3-tuples to the OpenCV
# drawing calls.
colors = (
    (255, 0, 0),    # 0
    (255, 0, 255),  # 1
    (170, 0, 255),  # 2
    (255, 0, 85),   # 3
    (255, 0, 170),  # 4
    (85, 255, 0),   # 5
    (255, 170, 0),  # 6
    (0, 255, 0),    # 7
    (255, 255, 0),  # 8
    (0, 255, 85),   # 9
    (170, 255, 0),  # 10
    (0, 85, 255),   # 11
    (0, 255, 170),  # 12
    (0, 0, 255),    # 13
    (0, 255, 255),  # 14
    (85, 0, 255),   # 15
    (0, 170, 255),  # 16
)

# Pairs of keypoint indices that are joined by a limb line.
default_skeleton = (
    (15, 13),  # left ankle - left knee
    (13, 11),  # left knee - left hip
    (16, 14),  # right ankle - right knee
    (14, 12),  # right knee - right hip
    (11, 12),  # left hip - right hip
    (5, 11),   # left shoulder - left hip
    (6, 12),   # right shoulder - right hip
    (5, 6),    # left shoulder - right shoulder
    (5, 7),    # left shoulder - left elbow
    (6, 8),    # right shoulder - right elbow
    (7, 9),    # left elbow - left wrist
    (8, 10),   # right elbow - right wrist
    (1, 2),    # left eye - right eye
    (0, 1),    # nose - left eye
    (0, 2),    # nose - right eye
    (1, 3),    # left eye - left ear
    (2, 4),    # right eye - right ear
    (3, 5),    # left ear - left shoulder
    (4, 6),    # right ear - right shoulder
)

# Korean display names, indexed by keypoint number.  They are printed and used
# as JSON keys at runtime, so the strings themselves must not be altered.
keypoint_names = [
    "코",            # 0: nose
    "왼쪽 눈",       # 1: left eye
    "오른쪽 눈",     # 2: right eye
    "왼쪽 귀",       # 3: left ear
    "오른쪽 귀",     # 4: right ear
    "왼쪽 어깨",     # 5: left shoulder
    "오른쪽 어깨",   # 6: right shoulder
    "왼쪽 팔꿈치",   # 7: left elbow
    "오른쪽 팔꿈치", # 8: right elbow
    "왼쪽 손목",     # 9: left wrist
    "오른쪽 손목",   # 10: right wrist
    "왼쪽 엉덩이",   # 11: left hip
    "오른쪽 엉덩이", # 12: right hip
    "왼쪽 무릎",     # 13: left knee
    "오른쪽 무릎",   # 14: right knee
    "왼쪽 발목",     # 15: left ankle
    "오른쪽 발목",   # 16: right ankle
]

def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    """
    Draw keypoints and limbs of every pose onto ``img`` (modified in place).

    Parameters:
        img: image to draw on; it is also the returned object.
        poses: array of poses; for each pose, columns 0-1 of a keypoint row are
            its x/y position and column 2 is its score.
        point_score_threshold: keypoints with a score not above this value are
            skipped, as are limbs touching such a keypoint.
        skeleton: iterable of ``(i, j)`` keypoint index pairs to connect.

    Returns:
        ``img``, untouched when ``poses`` is empty, otherwise blended 40/60
        with a copy that carries the limb lines.
    """
    if poses.size == 0:
        return img

    # Limbs are drawn on a separate copy so they can be blended semi-transparently.
    limb_layer = np.copy(img)
    for person in poses:
        xy = person[:, :2].astype(np.int32)
        confidence = person[:, 2]

        for joint_idx, (position, score) in enumerate(zip(xy, confidence)):
            if not score > point_score_threshold:
                continue
            cv2.circle(img, tuple(position), 1, colors[joint_idx], 2)

        for start, end in skeleton:
            if not (confidence[start] > point_score_threshold and confidence[end] > point_score_threshold):
                continue
            cv2.line(limb_layer, tuple(xy[start]), tuple(xy[end]), color=colors[end], thickness=4)

    cv2.addWeighted(img, 0.4, limb_layer, 0.6, 0, dst=img)
    return img


In [None]:
import collections
import numpy as np
import cv2
import time
import json
import asyncio
import websockets
from IPython.display import display, Image, clear_output

def _to_json_builtin(value):
    """``json.dumps`` fallback: turn numpy scalars/arrays into built-in Python values."""
    if hasattr(value, "tolist"):
        # numpy scalars and arrays both expose tolist(), which yields plain Python objects.
        return value.tolist()
    raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")


async def send_keypoints(uri, keypoints_data):
    """
    Send ``keypoints_data`` as one JSON text message over a new websocket connection.

    The connection attempt is repeated once per second, without an upper limit,
    while it fails with ``websockets.ConnectionClosedError`` or
    ``ConnectionRefusedError``.  Any other exception propagates to the caller.

    Parameters:
        uri: websocket URI to connect to.
        keypoints_data: JSON-serializable payload; numpy integers/floats/arrays
            inside it are converted to built-in values.

    Raises:
        TypeError: if the payload contains a value that cannot be serialized.
    """
    # Serialize once, before connecting: the payload does not change between
    # retries, and an unserializable payload fails before any connection is opened.
    # Without the `default` hook, json.dumps raises TypeError on numpy integers.
    message = json.dumps(keypoints_data, default=_to_json_builtin)
    while True:
        try:
            async with websockets.connect(uri) as websocket:
                await websocket.send(message)
                return
        except (websockets.ConnectionClosedError, ConnectionRefusedError):
            print("Connection failed, retrying in 1 second...")
            await asyncio.sleep(1)

def _run_coroutine_blocking(coro):
    """
    Run ``coro`` to completion from synchronous code and return its result.

    ``asyncio.run`` raises ``RuntimeError`` when the calling thread already has a
    running event loop (for example when called from a notebook cell whose kernel
    drives its own loop).  In that case the coroutine is executed by a fresh event
    loop on a helper thread while this function blocks until it has finished.

    NOTE: if the wait is interrupted (e.g. KeyboardInterrupt) the helper thread is
    a daemon and keeps running in the background until the coroutine ends.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run can be used directly.
        return asyncio.run(coro)

    import threading  # local import: not part of the notebook's import cells

    outcome = {}

    def _drive():
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # handed back to the calling thread below
            outcome["error"] = exc

    worker = threading.Thread(target=_drive, daemon=True)
    worker.start()
    worker.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("value")


def _collect_relative_keypoints(poses, show_keypoint_names):
    """
    Print every pose's keypoints relative to keypoint 11 and return them.

    Returns a list with one dict per pose that maps each entry of
    ``keypoint_names`` to an ``(x, y)`` tuple of built-in ints.  The list is empty
    (and "No poses detected." is printed) when ``poses`` is empty.
    """
    keypoints_data = []
    if poses.size == 0:
        print("No poses detected.")
        return keypoints_data

    for pose in poses:
        keypoints = pose[:, :2].astype(int)
        center_x, center_y = keypoints[11]  # assuming 11 is the hip keypoint
        keypoints_dict = {}
        if show_keypoint_names:
            print(f"중심점 (엉덩이) 가정 위치: ({center_x}, {center_y})")
            print("상대 키포인트 (이름, x, y):")
        for idx, keypoint in enumerate(keypoints):
            # Convert to built-in int: numpy integers cannot be serialized by json.dumps.
            relative_x = int(keypoint[0] - center_x)
            relative_y = int(keypoint[1] - center_y)
            keypoints_dict[keypoint_names[idx]] = (relative_x, relative_y)
            if show_keypoint_names:
                print(f"{keypoint_names[idx]}: ({relative_x}, {relative_y})")
            else:
                print(f"({relative_x}, {relative_y})")
        keypoints_data.append(keypoints_dict)
    return keypoints_data


def run_pose_estimation(source=0, flip=False, use_popup=False, skip_first_frames=0, show_keypoint_names=True, websocket_uri="ws://10.10.10.1:12345"):
    """
    Run pose estimation on a video source, render the skeletons on a black
    canvas and send hip-relative keypoints over a websocket about once per second.

    Parameters:
        source: forwarded to ``utils.VideoPlayer`` (camera id or video file path).
        flip: forwarded to ``utils.VideoPlayer``.
        use_popup: show frames in an OpenCV window (ESC exits) instead of
            displaying them inline in the notebook.
        skip_first_frames: forwarded to ``utils.VideoPlayer``.
        show_keypoint_names: print keypoint names next to the relative coordinates.
        websocket_uri: URI the keypoint data is sent to via ``send_keypoints``.

    The loop ends when the source is exhausted, ESC is pressed in popup mode, or
    on ``KeyboardInterrupt``; a ``RuntimeError`` is printed instead of raised.
    In inline mode every rendered frame is also written to "../static/ai.png".
    """
    pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
    heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
    player = None
    try:
        player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

        # Rolling window of the most recent 200 inference times.
        processing_times = collections.deque(maxlen=200)
        last_print_time = time.time()

        while True:
            frame = player.next()
            if frame is None:
                print("Source ended")
                break

            # Downscale frames whose largest dimension exceeds 1280.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

            # Allocate the canvas AFTER the resize: poses are scaled to this
            # frame's size, so the canvas must have the same dimensions.
            black_background = np.zeros(frame.shape, dtype=np.uint8)

            # Resize to the network input size and reorder HWC -> NCHW.
            input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
            input_img = input_img.transpose((2, 0, 1))[np.newaxis, ...]

            start_time = time.time()
            results = compiled_model([input_img])
            stop_time = time.time()

            pafs = results[pafs_output_key]
            heatmaps = results[heatmaps_output_key]
            poses, scores = process_results(frame, pafs, heatmaps)

            black_background = draw_poses(black_background, poses, 0.1)

            processing_times.append(stop_time - start_time)

            # Mean inference time in milliseconds and the matching FPS.
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(black_background, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
                        cv2.FONT_HERSHEY_COMPLEX, 1, (0, 0, 255), 1, cv2.LINE_AA)

            # Report and transmit keypoints at most once per second.
            current_time = time.time()
            if current_time - last_print_time >= 1:
                keypoints_data = _collect_relative_keypoints(poses, show_keypoint_names)
                # Blocks until the data has been sent.  A plain asyncio.run() here
                # would raise RuntimeError inside an already running event loop.
                _run_coroutine_blocking(send_keypoints(websocket_uri, keypoints_data))
                last_print_time = current_time

            if use_popup:
                cv2.imshow(title, black_background)
                key = cv2.waitKey(1)
                if key == 27:  # ESC
                    break
            else:
                _, encoded_img = cv2.imencode(".jpg", black_background, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                img_display = Image(data=encoded_img)
                clear_output(wait=True)
                display(img_display)

                cv2.imwrite("../static/ai.png", black_background)

    except KeyboardInterrupt:
        print("Interrupted")
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()

# Run Live Pose Estimation
USE_WEBCAM = False
cam_id = 0
video_file = "./y2mate.com - Just Dance 2017 PC Unlimited Rasputin 4K_480p.mp4"

if USE_WEBCAM:
    # Live camera: start from the first frame.
    source = cam_id
    additional_options = {}
else:
    # Video file: skip the first 500 frames.
    source = video_file
    additional_options = {"skip_first_frames": 500}

run_pose_estimation(
    source=source,
    flip=isinstance(source, int),  # flip is enabled only for integer (camera) sources
    use_popup=False,
    show_keypoint_names=True,
    websocket_uri="ws://10.10.10.1:12345",
    **additional_options,
)
