# Live Human Pose Estimation with OpenVINO™ - Video

Pose estimation predicts the 2D position and orientation of each person in an image or a video. Skeletons consisting of 18 predefined key points (joints) and 19 connections between them (limbs) are visualized as an overlay on the images or video.

In [1]:
#%pip install -q "openvino>=2023.1.0"
#%pip install pytube

## Import libraries

In [2]:
# Import the necessary libraries to utilize OpenVINO.
import collections
import sys
import time
from pathlib import Path

import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
import openvino as ov

from decoder import OpenPoseDecoder
import utils.notebook_utils as utils

## The model

### Download the model

In [3]:
# A directory where the model will be downloaded.
base_model_dir = Path("model")

# The name of the model from Open Model Zoo.
model_name = "human-pose-estimation-0001"
# Selected precision (FP32, FP16, FP16-INT8).
precision = "FP16-INT8"

model_path = base_model_dir / f"{model_name}.xml"

if not model_path.exists():
    model_url_dir = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/2022.1/models_bin/3/{model_name}/{precision}/"
    utils.download_file(model_url_dir + model_name + '.xml', model_path.name, model_path.parent)
    utils.download_file(model_url_dir + model_name + '.bin', model_path.with_suffix('.bin').name, model_path.parent)

### Load the model

In [4]:
# Select the device to use the product or service
import ipywidgets as widgets

core = ov.Core()

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

device

Dropdown(description='Device:', index=2, options=('CPU', 'GPU', 'AUTO'), value='AUTO')

In [5]:
# Initialize OpenVINO Runtime
core = ov.Core()
# Read the network from a file.
model = core.read_model(model_path)
# Let the AUTO device decide where to load the model (you can use CPU, GPU as well).
compiled_model = core.compile_model(model=model, device_name=device.value, config={"PERFORMANCE_HINT": "LATENCY"})

# Get the input and output names of nodes.
input_layer = compiled_model.input(0)
output_layers = compiled_model.outputs

# Get the input size.
height, width = list(input_layer.shape)[2:]

In [6]:
input_layer.any_name, [o.any_name for o in output_layers]

('data', ['Mconv7_stage2_L1', 'Mconv7_stage2_L2'])

## Processing

### OpenPose Decoder
To transform the raw results from the neural network into pose estimations, you need OpenPose Decoder. It is provided in the Open Model Zoo and compatible with the human-pose-estimation-0001 model.

If you choose a model other than human-pose-estimation-0001 you will need another decoder (for example, AssociativeEmbeddingDecoder), which is available in the demos section of Open Model Zoo.o.

In [7]:
decoder = OpenPoseDecoder()

### Process Results
A bunch of useful functions to transform results into poses.

First, pool the heatmap. Since pooling is not available in numpy, use a simple method to do it directly with numpy. Then, use non-maximum suppression to get the keypoints from the heatmap. After that, decode poses by using the decoder. Since the input image is bigger than the network outputs, you need to multiply all pose coordinates by a scaling factor.

In [8]:
# 2D pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    2D Pooling

    Parameters:
        A: input 2D array
        kernel_size: int, the size of the window
        stride: int, the stride of the window
        padding: int, implicit zero paddings on both sides of the input
        pool_mode: string, 'max' or 'avg'
    """
    # Padding
    A = np.pad(A, padding, mode="constant")

    # Window view of A
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    kernel_size = (kernel_size, kernel_size)
    A_w = as_strided(
        A,
        shape=output_shape + kernel_size,
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides
    )
    A_w = A_w.reshape(-1, *kernel_size)

    # Return the result of pooling.
    if pool_mode == "max":
        return A_w.max(axis=(1, 2)).reshape(output_shape)
    elif pool_mode == "avg":
        return A_w.mean(axis=(1, 2)).reshape(output_shape)

In [9]:
# non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    return heatmaps * (heatmaps == pooled_heatmaps)

In [10]:
# Get poses from results.
def process_results(img, pafs, heatmaps):
    # This processing comes from
    # https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py
    pooled_heatmaps = np.array(
        [[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in heatmaps[0]]]
    )
    nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)

    # Decode poses.
    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)
    output_shape = list(compiled_model.output(index=0).partial_shape)
    output_scale = img.shape[1] / output_shape[3].get_length(), img.shape[0] / output_shape[2].get_length()
    # Multiply coordinates by a scaling factor.
    poses[:, :, :2] *= output_scale
    return poses, scores

## Draw Pose Overlays
Draw pose overlays on the image to visualize estimated poses. Joints are drawn as circles and limbs are drawn as lines. The code is based on the Human Pose Estimation Demo from Open Model Zoo.

In [11]:
colors = ((255, 0, 0), (255, 0, 255), (170, 0, 255), (255, 0, 85), (255, 0, 170), (85, 255, 0),
          (255, 170, 0), (0, 255, 0), (255, 255, 0), (0, 255, 85), (170, 255, 0), (0, 85, 255),
          (0, 255, 170), (0, 0, 255), (0, 255, 255), (85, 0, 255), (0, 170, 255))

default_skeleton = ((15, 13), (13, 11), (16, 14), (14, 12), (11, 12), (5, 11), (6, 12), (5, 6), (5, 7),
                    (6, 8), (7, 9), (8, 10), (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (3, 5), (4, 6))


def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    if poses.size == 0:
        return img

    img_limbs = np.copy(img)
    for pose in poses:
        points = pose[:, :2].astype(np.int32)
        points_scores = pose[:, 2]
        # Draw joints.
        for i, (p, v) in enumerate(zip(points, points_scores)):
            if v > point_score_threshold:
                cv2.circle(img, tuple(p), 1, colors[i], 2)
        # Draw limbs.
        for i, j in skeleton:
            if points_scores[i] > point_score_threshold and points_scores[j] > point_score_threshold:
                cv2.line(img_limbs, tuple(points[i]), tuple(points[j]), color=colors[j], thickness=4)
    cv2.addWeighted(img, 0.4, img_limbs, 0.6, 0, dst=img)
    return img

## Main Processing Function
Run pose estimation on the specified source. Either a webcam or a video file.

In [12]:
# Main processing function to run pose estimation.
outputExists = False
def run_pose_estimation(source, flip=False, use_popup=False, skip_first_frames=0, save=False):
    pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
    heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
    player = None
    try:
        # Create a video player to play with target fps.
        player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        # Start capturing.
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

        processing_times = collections.deque()

        if save:
            file_path = os.path.join(os.path.abspath(os.getcwd()),app.config['UPLOAD_FOLDER'],"output.mp4")
            if os.path.exists(file_path):
                os.remove(file_path)
            
            global outputExists
            outputExists = False
            
        while True:
            # Grab the frame.
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # If the frame is larger than full HD, reduce size to improve the performance.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

            # Resize the image and change dims to fit neural network input.
            # (see https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/human-pose-estimation-0001)
            input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
            # Create a batch of images (size = 1).
            input_img = input_img.transpose((2,0,1))[np.newaxis, ...]

            # Measure processing time.
            start_time = time.time()
            # Get results.
            results = compiled_model([input_img])
            stop_time = time.time()

            pafs = results[pafs_output_key]
            heatmaps = results[heatmaps_output_key]
            # Get poses from network results.
            poses, scores = process_results(frame, pafs, heatmaps)

            # Draw poses on a frame.
            frame = draw_poses(frame, poses, 0.1)

            processing_times.append(stop_time - start_time)
            # Use processing times from last 200 frames.
            if len(processing_times) > 200:
                processing_times.popleft()

            _, f_width = frame.shape[:2]
            # mean processing time [ms]
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(frame, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
                        cv2.FONT_HERSHEY_COMPLEX, f_width / 1000, (0, 0, 255), 1, cv2.LINE_AA)

            if save:
                if not os.path.exists(file_path):
                    fps = 20.0
                    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
                    size = (int(frame.shape[1]), int(frame.shape[0]))
                    out = cv2.VideoWriter(file_path, fourcc, fps, size)
                    
                out.write(frame)
            # Use this workaround if there is flickering.
            if use_popup:
                cv2.imshow(title, frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # Encode numpy array to jpg.
                _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                # Create an IPython image.
                # i = display.Image(data=encoded_img)
                # # Display the image in this notebook.
                # display.clear_output(wait=True)
                # display.display(i)
                encoded_img = encoded_img.tobytes()
                yield (b'--frame\r\n'
                       b'Content-Type: image/jpeg\r\n\r\n' + encoded_img + b'\r\n')
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # any different error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # Stop capturing.
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()
            
    if save:
        out.release()
        outputExists = True

    # return render_template('show_video_inference.html')

## Run


### Run Live Pose Estimation
Use a webcam as the video input. By default, the primary webcam is set with source=0. If you have multiple webcams, each one will be assigned a consecutive number starting at 0. Set flip=True when using a front-facing camera. Some web browsers, especially Mozilla Firefox, may cause flickering. If you experience flickering, set use_popup=True.

NOTE: To use this notebook with a webcam, you need to run the notebook on a computer with a webcam. If you run the notebook on a server (for example, Binder), the webcam will not work. Popup mode may not work if you run this notebook on a remote computer (for example, Binder).

If you do not have a webcam, you can still run this demo with a video file. Any format supported by OpenCV will work. You can skip first N frames to fast forward video.

Run the pose estimation:

<img src="https://flask.palletsprojects.com/en/3.0.x/_images/flask-horizontal.png">
## Flask

### Flask configuration
<a href="https://flask.palletsprojects.com/en/3.0.x/config/"> Configuration Basics </a>

In [13]:
# Import the necessary libraries to utilize Flask.
from flask import Flask, render_template, session, send_file, Response, send_from_directory, request
from flask_wtf import FlaskForm
from wtforms import FileField, SubmitField
from wtforms.validators import InputRequired

# Import the necessary libraries to utilize URL.
from urllib.parse import urlparse
from urllib.request import urlretrieve
from pytube import YouTube

import os
from functools import wraps

In [14]:
app = Flask(__name__)
app.config['SECRET_KEY'] = 'random_numbers_and_letters'
app.config['UPLOAD_FOLDER'] = 'static\data'

### Flask Routing
<a href="https://flask.palletsprojects.com/en/3.0.x/quickstart/#routing"> Routing </a>

In [15]:
def inputready(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        path = os.path.join(os.path.abspath(os.getcwd()), app.config['UPLOAD_FOLDER'],"input.mp4")
        if os.path.exists(path):
            os.remove(path)
        return func(*args, **kwargs)
    return wrapper

In [16]:
class UploadFileForm(FlaskForm):
    file = FileField("File", validators = [InputRequired()])
    submit = SubmitField("Submit")

In [17]:
@app.route('/', methods=['GET', 'POST'])
@inputready
def index():
    form = UploadFileForm() 
    
    if form.validate_on_submit(): 
        file = form.file.data
        file_type = str(os.path.splitext(file.filename)[1])
        session['file_type'] = file_type
         
        if file_type in {'.mp4'}: 
            file_path = os.path.join(os.path.abspath(os.getcwd()), app.config['UPLOAD_FOLDER'],"input.mp4") 
            file.save(file_path)
            return inference_video()
        
    return render_template('index.html', form = form)

In [18]:
@app.route("/inference_url", methods=['GET', 'POST'])
@inputready
def inference_url():    
    url = request.form['url']
    path = os.path.join(os.path.abspath(os.getcwd()), app.config['UPLOAD_FOLDER'])
    
    if "youtube" in urlparse(url).netloc:
        download_video_from_youtube(url, path)
    else:
        urlretrieve(url, os.path.join(path, 'input.mp4'))

    return inference_video()

In [19]:
def download_video_from_youtube(link, path):
    yt = YouTube(link)
    video = yt.streams.get_highest_resolution()

    video.download(output_path=path, filename='input.mp4')

In [20]:
@app.route("/inference_video")
def inference_video():    
    return render_template('show_video_inference.html')

In [21]:
@app.route('/run_inference_video')
def run_inference_video():
    video_file = os.path.join(os.path.abspath(os.getcwd()), app.config['UPLOAD_FOLDER'],"input.mp4")
    return Response(run_pose_estimation(source=video_file, save=True), mimetype='multipart/x-mixed-replace; boundary=frame')

In [22]:
@app.route("/download_exists")
def download_exists():
    global outputExists
    if outputExists:
        return "OK", 200
    return "not yet", 403

In [23]:
@app.route("/download_output")
def download_output():
    global outputExists
    if outputExists:
        return send_from_directory(directory=app.config['UPLOAD_FOLDER'], path="output.mp4", as_attachment=True)
    return "not yet", 403

In [None]:
if __name__=='__main__':
    app.run(host='0.0.0.0', port=8000)

 * Serving Flask app '__main__'
 * Debug mode: off


 * Running on all addresses (0.0.0.0)
 * Running on http://127.0.0.1:8000
 * Running on http://10.2.27.215:8000
Press CTRL+C to quit
