## A notebook for exploring video posedata

**Intended use:** the user selects a video that is accompanied by already extracted posedata in a .json file. The notebook provides visualizations that summarize the quality and content of the poses extracted across all frames of the video, as well as armature plots of the detected poses in a selected frame. These can be viewed separately from the source video and even animated.

Note that at present, this only works with .json output files generated via the OpenPifPaf command-line tools.

In [None]:
from datetime import datetime, timedelta
from pathlib import Path
from time import sleep

from bokeh.io import output_notebook
from bokeh.layouts import column, row
from bokeh.models import (
    DatetimeTickFormatter,
    Range1d,
    LinearAxis,
    Slider,
    Div,
    TapTool,
    CrosshairTool,
    Button,
    Toggle,
)
from bokeh.models.widgets.inputs import Select
from bokeh.plotting import figure, show
from bokeh.themes import Theme
import cv2
from IPython.display import HTML, display
from ipywidgets import Dropdown, Layout
import jsonlines
import numpy as np
from PIL import Image, ImageDraw


### Build and display the video/posedata selector widget

**Important:** for a video to appear in the dropdown menu, the video and its posedata output file must be present at the path specified in `source_data_folder`, which is by default the folder containing this notebook. The names of the matched video and posedata files should be identical, other than that the posedata file will have .openpifpaf.json appended to the name of the video file.

In [None]:
# Folder that is scanned for videos and their posedata files (the current working directory)
source_data_folder = Path.cwd()


def get_available_videos(data_folder):
    """Return the names of the videos in `data_folder` that have matching posedata.

    A video is listed only when a file named `<video file name>.openpifpaf.json`
    sits next to it, because that is exactly the path the posedata is later
    loaded from. Matching on the full file name (rather than on the part before
    the first dot) avoids listing videos whose posedata file does not exist.

    Args:
        data_folder: Directory to scan, as a `str` or `pathlib.Path`.

    Returns:
        Alphabetically sorted list of video file names (names only, no directory).
    """
    video_suffixes = {".avi", ".mp4", ".mov", ".mkv", ".webm"}
    data_folder = Path(data_folder)

    return sorted(
        p.name
        for p in data_folder.glob("*")
        if p.suffix.lower() in video_suffixes
        # Skip files that have "openpifpaf" in their own name
        and "openpifpaf" not in p.name
        and p.with_name(p.name + ".openpifpaf.json").is_file()
    )


# Instructions shown above the dropdown. The <style> rule gives ipywidgets labels a
# minimum width (presumably so the dropdown's description is not cut off).
select_msg = (
    "<style>.widget-label { min-width: 20ex !important; }</style>"
    "<body><p>Please select the video to explore from the dropdown list. To be available in the list, "
    "the pose estimation output .json file and the original video file must share the same name, except "
    f"for the .openpifpaf.json extension, and be stored in {source_data_folder}/.</p></body>"
)

display(HTML(select_msg))

# Dropdown offering only the videos in source_data_folder that have matching posedata
video_selector = Dropdown(
    options=get_available_videos(source_data_folder),
    description="Video to explore:",
    disabled=False,
    layout=Layout(width="60%", height="40px"),
)

# Leaving the widget as the cell's last expression renders it in the cell output
video_selector


### Collect video and per-frame pose metadata for the selected video
Run the cell above and select a video from its drop-down menu before running the cell below.

In [None]:
video_file = f"{source_data_folder}/{video_selector.value}"

pose_file = f"{source_data_folder}/{video_selector.value}.openpifpaf.json"

print("Video file:", video_file)
print("Posedata file:", pose_file)

# Video path without its final extension; used as the chart title in the explorer app
video_name = ".".join(video_file.split(".")[:-1])

# Read the basic video properties, then release the capture handle straight away
cap = cv2.VideoCapture(video_file)
try:
    if not cap.isOpened():
        raise ValueError(f"Could not open video file: {video_file}")
    video_frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    video_fps = cap.get(cv2.CAP_PROP_FPS)
    video_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    video_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
finally:
    cap.release()

# The frame rate is used as a divisor below, so fail early with a clear message
if video_fps <= 0:
    raise ValueError(f"Video reports an invalid frame rate ({video_fps}): {video_file}")

print("Video FPS:", video_fps)

print("Processing video and JSON files, please wait...")

pose_data = []

# Per-frame pose data: frame, seconds, num_poses, avg_pose_conf, avg_coords_per_pose
pose_series = {
    "frame": [],
    "seconds": [],
    "timestamp": [],
    "num_poses": [],
    "avg_score": [],
    "avg_coords_per_pose": [],
}

# Timestamps are offsets from 1900-01-01, the default date datetime.strptime() assigns to
# time-only strings; the chart's tap handler converts clicks back using the same origin.
timestamp_origin = datetime(1900, 1, 1)

# The context manager closes the posedata file once every frame has been read
with jsonlines.open(pose_file) as pose_json:

    for frame in pose_json:

        pose_data.append(frame)

        # Frame output is numbered from 1 in the JSON
        seconds = float(frame["frame"] - 1) / video_fps

        num_poses = len(frame["predictions"])
        pose_series["num_poses"].append(num_poses)

        pose_series["frame"].append(frame["frame"] - 1)
        pose_series["seconds"].append(seconds)

        # Construct a timestamp that can be used with Bokeh's DatetimeTickFormatter
        pose_series["timestamp"].append(timestamp_origin + timedelta(seconds=seconds))

        pose_scores = []
        pose_coords_counts = []
        avg_score = 0  # NaN for empty frames?
        avg_coords_per_pose = 0

        for pose in frame["predictions"]:

            # TODO: Do something with the bbox? The avg ratio of bbox area to full screen can
            # indicate closeup vs. long shot (could also run monoloco to get this kind of info + more)

            pose_scores.append(pose["score"])

            # Keypoints are flat (x, y, confidence) triples. Mostly ignore the confidence value,
            # unless it's 0 (coord not found); these seem to be averaged into the full pose
            # "score" already.
            pose_coords = sum(1 for conf in pose["keypoints"][2::3] if conf != 0)

            pose_coords_counts.append(pose_coords)

        if num_poses > 0:
            avg_score = sum(pose_scores) / num_poses
            avg_coords_per_pose = sum(pose_coords_counts) / num_poses

        pose_series["avg_score"].append(avg_score)
        pose_series["avg_coords_per_pose"].append(avg_coords_per_pose)

print("Total frames:", len(pose_series["frame"]))

if pose_series["timestamp"]:
    print("Duration:", pose_series["timestamp"][-1].time())
else:
    print("Warning: the posedata file contains no frames.")

print("Please run the next code cell to launch the explorer app.")


### Build and launch the explorer app

This displays an interactive chart visualization of the attributes of the posedata in the .json output file across the runtime of the video.

Clicking anywhere in the chart, moving the slider, or clicking the prev/next buttons will select a frame and draw the poses detected in that frame, with the option of displaying the actual image from the source video as the "background."

Please see the cell below the next if you are running this notebook in VS Code. Note also that the Jupyter server must be running on port 8888 for the explorer app to work in Jupyter/JupyterLab.

In [None]:
# The body part numberings and armature connectors for the 17-keypoint COCO pose format are defined in
# https://github.com/openpifpaf/openpifpaf/blob/main/src/openpifpaf/plugins/coco/constants.py
# Each tuple is one armature segment joining two keypoints. Note that the body part numbers in the
# connector (skeleton) definitions begin with 1, not 0, so the drawing code subtracts 1 before indexing.
OPP_COCO_SKELETON = [
    (16, 14),
    (14, 12),
    (17, 15),
    (15, 13),
    (12, 13),
    (6, 12),
    (7, 13),
    (6, 7),
    (6, 8),
    (7, 9),
    (8, 10),
    (9, 11),
    (2, 3),
    (1, 2),
    (1, 3),
    (2, 4),
    (3, 5),
    (4, 6),
    (5, 7),
]
# One line color per skeleton segment: entry i is used to draw OPP_COCO_SKELETON[i]
OPP_COCO_COLORS = [
    "orangered",
    "orange",
    "blue",
    "lightblue",
    "darkgreen",
    "red",
    "lightgreen",
    "pink",
    "plum",
    "purple",
    "brown",
    "saddlebrown",
    "mediumorchid",
    "gray",
    "salmon",
    "chartreuse",
    "lightgray",
    "darkturquoise",
    "goldenrod",
]

# Factor by which draw_frame() enlarges the image before drawing lines and shrinking it back
UPSCALE = 3  # See draw_frame()

# Default dimensions of the output visualizations
FIGURE_WIDTH = 950
FIGURE_HEIGHT = 500


def image_from_video_frame(video_file, frameno):
    """Grab the specified frame from the video and return it as an RGBA array.

    Args:
        video_file: Path of the video file, in a form accepted by `cv2.VideoCapture`.
        frameno: Zero-based index of the frame to read.

    Returns:
        The frame as a NumPy array with channels in R, G, B, A order.

    Raises:
        ValueError: If the frame cannot be read (e.g. bad path or index out of range).
    """
    cap = cv2.VideoCapture(video_file)
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frameno)
        ret, img = cap.read()
    finally:
        # Always give the file handle/decoder back, even if reading fails
        cap.release()

    if not ret:
        raise ValueError(f"Could not read frame {frameno} from {video_file}")

    # OpenCV decodes frames in BGR channel order, so convert from BGR (not RGB) to RGBA
    return np.asarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGBA))


def draw_frame(frame, bg_img=None):
    """Draw the poses in the specified frame, superimposing them on the frame image, if provided.

    Args:
        frame: One posedata record; its "predictions" list holds poses whose "keypoints"
            are flat (x, y, confidence) triples.
        bg_img: Optional PIL image to draw on top of. When omitted, a blank RGBA image
            is used instead.

    Returns:
        A PIL image of size (video_width, video_height) with the pose armatures drawn on it.
    """

    # The only way to get smooth(er) lines in the pose armatures via PIL ImageDraw is to upscale the entire
    # image by some factor, draw the lines, then downscale back to the original resolution while applying
    # Lanczos resampling, because ImageDraw doesn't do any native anti-aliasing.
    upscaled_size = (video_width * UPSCALE, video_height * UPSCALE)
    if bg_img is None:
        bg_img = Image.new("RGBA", upscaled_size)
    else:
        bg_img = bg_img.resize(upscaled_size)

    drawing = ImageDraw.Draw(bg_img)

    for pose_prediction in frame["predictions"]:
        keypoints = pose_prediction["keypoints"]
        # Regroup the flat keypoint list into one (x, y, confidence) chunk per body part
        pose_coords = np.array_split(keypoints, len(keypoints) // 3)

        for i, seg in enumerate(OPP_COCO_SKELETON):
            # Skeleton body part numbers are 1-based
            start = pose_coords[seg[0] - 1]
            end = pose_coords[seg[1] - 1]

            # A confidence of 0 means the keypoint was not detected, so the segment can't be drawn
            if start[2] == 0 or end[2] == 0:
                continue

            # Scale every coordinate *before* truncating to int so all four are treated alike
            shape = [
                (int(start[0] * UPSCALE), int(start[1] * UPSCALE)),
                (int(end[0] * UPSCALE), int(end[1] * UPSCALE)),
            ]
            drawing.line(shape, fill=OPP_COCO_COLORS[i], width=2 * UPSCALE)

    return bg_img.resize(
        (video_width, video_height), resample=Image.Resampling.LANCZOS
    )


def pil_to_bokeh_image(pil_img):
    """Convert an RGBA image into the 2D uint32 array passed to Bokeh's `image_rgba`.

    The Bokeh interactive notebook tools will only display image data if it's formatted
    in a particular way: each pixel's four 8-bit channels packed into one 32-bit value,
    with the rows flipped vertically relative to the PIL image.

    Args:
        pil_img: An RGBA PIL image (or any array-like of shape (height, width, 4)).

    Returns:
        A new `np.uint32` array of shape (height, width).

    Raises:
        ValueError: If the image does not have exactly four channels.
    """
    rgba = np.asarray(pil_img, dtype=np.uint8)
    if rgba.ndim != 3 or rgba.shape[2] != 4:
        raise ValueError(
            f"Expected an RGBA image of shape (height, width, 4), got {rgba.shape}"
        )

    # Flip top-to-bottom and copy into a fresh C-contiguous buffer in a single step,
    # instead of assigning every channel of every pixel in a Python loop
    flipped = np.array(rgba[::-1], dtype=np.uint8, order="C")

    # Reinterpret each group of four uint8 channel bytes as one uint32 pixel value
    return flipped.view(np.uint32).reshape(flipped.shape[:2])


def bkapp(doc):
    """Define and run the Bokeh interactive notebook (Python + Javascript) application.

    Builds a time-series chart of the per-frame pose statistics, a frame slider with
    prev/next buttons and a background toggle, and a second figure in which the poses
    of the selected frame are drawn. Everything is added to `doc` as a single column.

    Args:
        doc: The Bokeh document that the layout is attached to.
    """

    # default=0 keeps the chart constructible even when no frames were loaded
    max_y = max(
        pose_series["avg_coords_per_pose"] + pose_series["num_poses"], default=0
    )

    # This is the main interactive chart figure
    p = figure(
        width=FIGURE_WIDTH,
        height=FIGURE_HEIGHT,
        title=video_name,
        min_border=10,
        y_range=(0, max_y + 1),
        tools="save,box_zoom,pan,reset",
    )
    # Format the X axis as hour-minute-second timecodes
    p.x_range = Range1d(min(pose_series["timestamp"]), max(pose_series["timestamp"]))
    p.xaxis.axis_label = "Time"
    time_formatter = DatetimeTickFormatter(
        hourmin="%H:%M:%S",
        minutes="%H:%M:%S",
        minsec="%H:%M:%S",
        seconds="%Ss",
        milliseconds="%3Nms",
    )
    p.line(
        pose_series["timestamp"],
        pose_series["num_poses"],
        legend_label="Poses per frame",
        color="blue",
        alpha=0.6,
        line_width=2,
    )
    p.line(
        pose_series["timestamp"],
        pose_series["avg_coords_per_pose"],
        legend_label="Coords per pose",
        color="red",
        alpha=0.6,
        line_width=2,
    )
    # The left Y axis corresponds to counts of poses and coordinates
    p.yaxis.axis_label = "Poses or Coords"
    p.extra_y_ranges = {"avg_score": Range1d(0, 1)}
    p.line(
        pose_series["timestamp"],
        pose_series["avg_score"],
        y_range_name="avg_score",
        legend_label="Avg pose score",
        color="green",
        alpha=0.4,
        line_width=2,
    )
    # The right Y axis corresponds to the average pose score (from 0 to 1)
    p.add_layout(
        LinearAxis(y_range_name="avg_score", axis_label="Avg Pose Score"), "right"
    )
    p.xaxis.formatter = time_formatter
    p.xaxis.ticker.desired_num_ticks = 10
    p.legend.click_policy = "hide"

    def tap_callback(event):
        """When the chart is clicked, move the slider to the appropriate frame"""
        # event.x is a timestamp in milliseconds since the Unix epoch, so it needs to be
        # converted to a frameno. The chart's timestamps count from 1900-01-01, which is
        # before the epoch; plain datetime arithmetic handles such negative offsets on
        # every platform, unlike the deprecated datetime.utcfromtimestamp().
        start_dt = datetime(1900, 1, 1)
        dt = datetime(1970, 1, 1) + timedelta(milliseconds=event.x)
        t_delta = dt - start_dt
        clicked_frame = round(t_delta.total_seconds() * video_fps)
        # Clicks outside the plotted data (e.g. after panning) must not push the slider out of range
        slider.value = min(max(clicked_frame, slider.start), slider.end)

    tap = TapTool()
    crosshair = CrosshairTool()

    def get_frame_info(fn):
        """Summarize the pose statistics of frame number `fn` for display below the slider"""
        return f"Frame info: {pose_series['num_poses'][fn]} detected poses, {pose_series['avg_coords_per_pose'][fn]} avg coords/pose, {pose_series['avg_score'][fn]} avg pose score"

    info_div = Div(text=get_frame_info(0))

    p.add_tools(tap, crosshair)
    p.on_event("tap", tap_callback)

    # This is the second figure, where the poses in the selected frame are drawn
    frame_p = figure(
        x_range=(0, video_width),
        y_range=(0, video_height),
        width=FIGURE_WIDTH,
        height=int(FIGURE_WIDTH / video_width * video_height),
        title="Poses in selected frame",
        tools="save",
    )
    # Add an invisible glyph to suppress the "figure has no renderers" warning
    frame_p.circle(0, 0, size=0, alpha=0.0)

    def background_toggle_handler(event):
        """When the image underlay is toggled on or off, prompt the slider to redraw the frame"""
        slider_callback(None, slider.value, slider.value)

    background_switch = Toggle(label="show background", active=False)
    background_switch.on_click(background_toggle_handler)

    def slider_callback(attr, old, new):
        """When the slider moves, draw the poses in the new frame and show the background if desired"""
        frame_p.renderers = []

        pil_bg = None
        if background_switch.active:
            cap = cv2.VideoCapture(video_file)
            try:
                cap.set(cv2.CAP_PROP_POS_FRAMES, new)
                ret, bgr_img = cap.read()
            finally:
                # A capture is opened on every redraw, so it has to be released every time
                cap.release()
            # If the frame can't be read, fall back to drawing the poses without a background
            if ret:
                # OpenCV frames are BGR; convert to RGBA for PIL
                pil_bg = Image.fromarray(cv2.cvtColor(bgr_img, cv2.COLOR_BGR2RGBA))

        img = pil_to_bokeh_image(draw_frame(pose_data[new], pil_bg))
        frame_p.image_rgba(image=[img], x=0, y=0, dw=img.shape[1], dh=img.shape[0])
        info_div.text = get_frame_info(new)

    slider = Slider(
        start=0, end=len(pose_data) - 1, value=0, step=1, title="Selected frame"
    )
    slider.on_change("value", slider_callback)

    def prev_handler(event):
        """Step back one frame, stopping at the first frame"""
        slider.value = max(0, slider.value - 1)

    def next_handler(event):
        """Step forward one frame, stopping at the last frame"""
        slider.value = min(slider.value + 1, len(pose_data) - 1)

    prev_button = Button(label="prev")
    prev_button.on_click(prev_handler)
    next_button = Button(label="next")
    next_button.on_click(next_handler)

    control_row = row(children=[prev_button, next_button, background_switch])

    layout_column = column(p, slider, info_div, control_row, frame_p)

    doc.add_root(layout_column)


# Set Bokeh up to render its output inline in the notebook
output_notebook()

# Launch bkapp as an embedded app; notebook_url is the address at which the Jupyter server is reached
show(bkapp, notebook_url="localhost:8888")


**Running the notebook in VS Code:** As of late 2022, if you are running this notebook in VS Code instead of Jupyter or JupyterLab, the above cell will not work (BokehJS will load, but no figures will appear) without the following workaround:

Take note of the error message that appears when you try to run the cell above, particularly the long alphanumeric string suggested as a value for `BOKEH_ALLOW_WS_ORIGIN`. Copy this string, then uncomment the last two lines in the cell below, paste the alphanumeric string in place of the `INSERT_BOKEH_ALLOW_WS_ORIGIN_VALUE_HERE` text, run the cell, then try running the cell above to launch the explorer app again. It should work now.

In [None]:
# If you are following the steps above to run the explorer app in VS Code,
# uncomment the following lines (remove the '#'s) before running this cell:
#import os
#os.environ["BOKEH_ALLOW_WS_ORIGIN"] = "INSERT_BOKEH_ALLOW_WS_ORIGIN_VALUE_HERE"

### Demo of frame-by-frame pose drawing

The cell below uses a different viz library to draw the poses in each successive frame on an HTML canvas, at the same frame rate as the source video.

Note that this drawing library (`ipycanvas`) doesn't play well with the Bokeh interactive application above, which is why the somewhat clunkier PIL ImageDraw library is used to draw the poses there instead.

In [None]:
from ipycanvas import Canvas, hold_canvas

# Canvas with the same pixel dimensions as the video, so keypoint coordinates can be drawn unscaled
canvas = Canvas(width=video_width, height=video_height, sync_image_data=True)

display(canvas)


def draw_frame_on_canvas(frame, canvas):
    """Stroke the armature of every pose in `frame` onto `canvas`.

    Each pose's flat keypoint list is regrouped into (x, y, confidence) chunks, and
    every skeleton segment whose two endpoints both have a non-zero confidence is
    drawn as a 2-pixel line in that segment's color.
    """
    for prediction in frame["predictions"]:
        flat_keypoints = prediction["keypoints"]
        points = np.array_split(flat_keypoints, len(flat_keypoints) // 3)

        for segment_no, (joint_a, joint_b) in enumerate(OPP_COCO_SKELETON):
            # Skeleton body part numbers start at 1, hence the -1 when indexing
            start = points[joint_a - 1]
            if start[2] == 0:
                continue
            end = points[joint_b - 1]
            if end[2] == 0:
                continue

            canvas.stroke_style = OPP_COCO_COLORS[segment_no]
            canvas.line_width = 2
            canvas.stroke_line(start[0], start[1], end[0], end[1])


# This will "animate" all of the detected poses starting from the beginning of the video
for frame in pose_data:

    with hold_canvas():

        canvas.clear()

        draw_frame_on_canvas(frame, canvas)

        sleep(1 / video_fps)
