In [None]:
%load_ext autoreload
%autoreload 2
import multiprocessing as mp
import os
import time
import urllib.request

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import ujson
from matplotlib import collections as mc

from config import *
from openpose_helpers import *
from run_openpose import run_openpose

print(OUTPUT)
# OUTPUT is the root directory under which this notebook writes its files.
# To change it, edit OUTPUT in config.py.
# NOTE: when running on large datasets, make sure OUTPUT points to
# a place where you have space for millions of tiny files!

In [None]:
# Keep every artifact of this demo in its own subdirectory of OUTPUT.
# exist_ok=True lets this cell be re-run without failing when the
# directory is already there.
DEMO_OUTPUT = os.path.join(OUTPUT, 'openpose_demo')
os.makedirs(DEMO_OUTPUT, exist_ok=True)

In [None]:
# Download openpose demo video into demo directory
pos_vid_path = os.path.join(DEMO_OUTPUT, 'positive_test_video.avi')
neg_vid_path = os.path.join(DEMO_OUTPUT, 'negative_test_video.avi')

POS_VID_URL = ('https://github.com/CMU-Perceptual-Computing-Lab/openpose/'
               'raw/master/examples/media/video.avi')

# Download positive (people in every frame) example video
if not os.path.exists(pos_vid_path):
    print('Downloading positive video...')
    # Download to a temporary name and rename only on success. Otherwise a
    # failed or interrupted download would leave a partial file behind that
    # the os.path.exists() check above treats as "already downloaded".
    partial_path = pos_vid_path + '.part'
    try:
        urllib.request.urlretrieve(POS_VID_URL, partial_path)
        os.replace(partial_path, pos_vid_path)
    finally:
        # Only present here if the download or rename failed.
        if os.path.exists(partial_path):
            os.remove(partial_path)
else:
    print('Already downloaded positive video')

# Generate negative (no people) video with only black frames
if not os.path.exists(neg_vid_path):
    print('Generating negative video...')
    frame_size = (200, 200)   # (width, height) passed to VideoWriter
    fps = 15
    n_frames = 10
    # Final positional argument is isColor=False: frames are single-channel.
    out = cv2.VideoWriter(neg_vid_path, cv2.VideoWriter_fourcc(*'DIVX'),
                          fps, frame_size, False)
    if not out.isOpened():
        # Without this check every write() below would be silently dropped.
        out.release()
        if os.path.exists(neg_vid_path):
            os.remove(neg_vid_path)
        raise RuntimeError(
            'Could not open a video writer for %s '
            '(is the DIVX codec available?)' % neg_vid_path)
    try:
        # numpy shape is (height, width), the reverse of VideoWriter's size.
        black_frame = np.zeros(frame_size[::-1], dtype=np.uint8)
        for _ in range(n_frames):
            out.write(black_frame)
    finally:
        # Always finalize the file, even if a write fails.
        out.release()
else:
    print('Already generated negative video')

In [None]:
# Run openpose on both demo videos and condense each video's output.
# Raw output goes to op_output_dir, condensed output to condensed_output_dir.
op_output_dir = os.path.join(DEMO_OUTPUT, 'openpose_raw_json')
condensed_output_dir = os.path.join(DEMO_OUTPUT, 'openpose_condensed')

# Settings shared by both runs.
openpose_kwargs = dict(
    condensed_output_dir=condensed_output_dir,
    keypoint_scale=3,
    condense=True,
    overwrite=False,
)

run_openpose(pos_vid_path, op_output_dir, **openpose_kwargs)

run_openpose(neg_vid_path, op_output_dir, **openpose_kwargs)

In [None]:
# Sanity check Openpose by pulling a random frame and seeing if keypoints align with
# people's bodies

def get_frame(vid_path, frame):
    """
    Read a single frame from a video file.

    Arguments:
    vid_path -- path to the video to read from
    frame    -- index of the frame to seek to before reading

    Returns the image produced by cv2.VideoCapture.read(), or None if no
    frame could be read (e.g. the file is missing or the index is out of
    range).
    """
    cap = cv2.VideoCapture(vid_path)
    try:
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame)
        # read() returns (success_flag, image); image is None on failure.
        return cap.read()[1]
    finally:
        # Release the underlying file handle / decoder; previously leaked.
        cap.release()

def sanity_check_op_keypts(vid_path, vid_df_path, keypts_normalized=True,
                           frame_idx=None):
    """
    Given a vid-level JSON, plot one frame of the video with its openpose
    keypoints overlaid, to sanity-check that the keypoints appear sensible.

    Arguments:
    vid_path          -- path to the video to sanity check
    vid_df_path       -- path to the video's Openpose condensed
                         JSON dataframe to sanity check
    keypts_normalized -- True if keypoints in Openpose outputs are normalized
                         (i.e. x and y values in range [0, 1]), else False
    frame_idx         -- index of the frame to plot; if None (default), a
                         random index in [0, len(dataframe)) is drawn

    Raises:
    ValueError   -- if no frame_idx is given and the dataframe is empty
    RuntimeError -- if the requested frame cannot be read from the video
    """
    df = pd.read_json(vid_df_path)
    openpose_npy = recover_npy(df)
    if frame_idx is None:
        # NOTE(review): assumes the dataframe has one row per video frame,
        # so len(df) bounds the valid frame indices -- confirm.
        if len(df) == 0:
            raise ValueError('No rows in %s; cannot pick a frame' % vid_df_path)
        frame_idx = np.random.randint(0, len(df))

    img = get_frame(vid_path, frame_idx)
    if img is None:
        raise RuntimeError(
            'Could not read frame %d from %s' % (frame_idx, vid_path))
    # OpenCV decodes frames in BGR channel order, matplotlib displays RGB;
    # without this conversion the red and blue channels appear swapped.
    if img.ndim == 3 and img.shape[2] == 3:
        img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    plt.figure(figsize=(10, img.shape[0]/img.shape[1]*10))
    # Normalized keypoints are scaled back to pixel coordinates.
    x_factor = img.shape[1] if keypts_normalized else 1
    y_factor = img.shape[0] if keypts_normalized else 1

    # (start, end, color, legend label) for each keypoint group.
    keypoint_groups = (
        (NPY_FACE_START, NPY_FACE_END, 'C0', 'Face keypoints'),
        (NPY_POSE_START, NPY_POSE_END, 'C1', 'Pose keypoints'),
        (NPY_HAND_LEFT_START, NPY_HAND_LEFT_END, 'C2', 'Left hand keypoints'),
        (NPY_HAND_RIGHT_START, NPY_HAND_RIGHT_END, 'C3', 'Right hand keypoints'),
    )
    for person in openpose_npy[frame_idx]:
        # Row 0 of each person array is plotted as x, row 1 as y.
        for start, end, color, label in keypoint_groups:
            plt.scatter(person[0, start:end]*x_factor,
                        person[1, start:end]*y_factor,
                        c=color, s=8, label=label)

    # Every person adds the same four labels; keep one legend entry per label.
    handles, labels = plt.gca().get_legend_handles_labels()
    by_label = dict(zip(labels, handles))
    if by_label:
        # Skipped when nobody was detected, to avoid an empty legend box.
        plt.legend(list(by_label.values()), list(by_label.keys()))
    plt.imshow(img)

In [None]:
# Positive check: plot a random frame of the video that has people in it.
# Expected result: keypoints appear in sensible places on the people.
pos_df_path = os.path.join(condensed_output_dir, 'positive_test_video.json')
sanity_check_op_keypts(pos_vid_path, pos_df_path, keypts_normalized=True)

In [None]:
# Negative check: plot a random frame of the all-black video.
# Expected result: no keypoints appear (since the frame is black and no
# people are detected).
neg_df_path = os.path.join(condensed_output_dir, 'negative_test_video.json')
sanity_check_op_keypts(neg_vid_path, neg_df_path, keypts_normalized=True)