<a href="https://colab.research.google.com/github/LukeHaberkamp/2D-Pose-Estimation/blob/main/2D_Pose_Estimation_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Run the cell below to install and build OpenPose

In [None]:
import os,sys
from os.path import exists, join, basename, splitext

git_repo_url = 'https://github.com/CMU-Perceptual-Computing-Lab/openpose.git'
project_name = splitext(basename(git_repo_url))[0]
if not exists(project_name):
  # see: https://github.com/CMU-Perceptual-Computing-Lab/openpose/issues/949
  # install new CMake becaue of CUDA10
  !wget -q https://cmake.org/files/v3.13/cmake-3.13.0-Linux-x86_64.tar.gz
  !tar xfz cmake-3.13.0-Linux-x86_64.tar.gz --strip-components=1 -C /usr/local
  # clone openpose
  !git clone -q --depth 1 $git_repo_url
  !sed -i 's/execute_process(COMMAND git checkout master WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}\/3rdparty\/caffe)/execute_process(COMMAND git checkout f019d0dfe86f49d1140961f8c7dec22130c83154 WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}\/3rdparty\/caffe)/g' openpose/CMakeLists.txt
  # install system dependencies
  !apt-get -qq install -y libatlas-base-dev libprotobuf-dev libleveldb-dev libsnappy-dev libhdf5-serial-dev protobuf-compiler libgflags-dev libgoogle-glog-dev liblmdb-dev opencl-headers ocl-icd-opencl-dev libviennacl-dev 
  !pip install xlsxwriter
  # build openpose
  !cd openpose && rm -rf build || true && mkdir build && cd build && cmake .. && make -j`nproc`

  print("Openpose is ready!")

To analyze a video, run the cell below to upload a video into the virtual workspace for analysis.

---


Or if videos are saved in you Google Drive, uncomment the code which has been commented out to load and utilize them in the analysis.

In [None]:
from google.colab import files
uploaded = files.upload()
myVideo = next(iter(uploaded))

## Uncomment the lines below to access videos from your Google Drive
# from google.colab import drive
# drive.mount('/content/drive')
# myVideo = "/content/drive/MyDrive/myVideo.mp4"

Run the code below to apply the pose estimation model and perform kinematic analysis on your video

In [None]:
import pandas as pd
import numpy as np
import os
import cv2
import json
from scipy import signal, interpolate, stats
import matplotlib.pyplot as plt
from string import ascii_letters
import more_itertools as mit
from functools import reduce

if os.path.exists("/content/openpose/output/"):
  !rm -rf /content/openpose/output/

!ffmpeg -y -loglevel info -i $myVideo video.mp4 
!cd /content/openpose && ./build/examples/openpose/openpose.bin --video ../video.mp4 --number_people_max 1 --write_json ./output/ --display 0 --write_video ../openpose.avi
# convert the result into MP4
!ffmpeg -y -loglevel info -i openpose.avi -vf "drawtext=fontfile=Arial.ttf:text='Frame = %{frame_num}':start_number=1:x=w-tw-10:y=10:fontcolor=white:fontsize=30:" -c:a copy output.mp4

path_to_json = "/content/openpose/output/"

# video file
cap = cv2.VideoCapture("output.mp4")

# get frames/sec of the video file (1 FPS = 1 Hz)
fps = cap.get(cv2.CAP_PROP_FPS)

# obtain the time delta
dt = 1 / fps

# Import Json files, pos_json = position JSON
json_files = [pos_json for pos_json in os.listdir(path_to_json) if pos_json.endswith('.json')]

# obtain the time vector
time = []
for i in range(len(json_files)):
    t = dt * i
    time.append(t)

time = pd.Series(time)


def filter_data(og_df):
    # removes keypoints with a confidence score below 0.05
    df = og_df[og_df['acc'] > 0.05]
    # fill the orginal dataframe with nan as a placeholder
    og_df.loc[:] = np.nan
    final_df = og_df
    # remove outliers using Median Absolute Deviation
    MAD = stats.median_absolute_deviation(df)
    median = df.median()
    threshold = 3
    stat = np.abs((df - median) / MAD)
    df = df[(stat < threshold).any(1)]
    # set the cutoff frequency of the butterworth filter to 6 Hz
    fc = 6
    # create a fourth order filter
    filter_order = 4
    # set the scipy butterworth filter. Divide the filter order by 2 to account for a dual pass
    # set the critical frequency as the cutoff frequency divided by the Nyquist frequency
    b, a = signal.butter(filter_order/2, fc/(fps/2))
    # apply a zero-lag filter to the x and y coordinate data
    df['x'] = signal.filtfilt(b, a, df['x'])
    df['y'] = signal.filtfilt(b, a, df['y'])
    # copy filtered data over the nan placeholders
    og_df.iloc[df.index] = df
    # use interpolation to fill in the missing data
    og_df['x'].interpolate(method='linear', limit=2, inplace=True)
    og_df['y'].interpolate(method='linear', limit=2, inplace=True)
    return og_df

# function to calculate the distance between two points
def distance_eq(df2, df1):
    delta_y = np.square(df2['y'] - df1['y'])
    delta_x = np.square(df2['x'] - df1['x'])
    distance = np.sqrt(delta_y + delta_x)
    return distance

# function to determine the angle between 3 points using the cosine law
def cos_law(A, B, C):
    a = distance_eq(A, C)
    b = distance_eq(B, C)
    c = distance_eq(A, B)
    gamma = np.degrees(
        np.arccos((np.square(a)+np.square(b)-np.square(c))/(2*a*b)))
    gamma = 180 - gamma
    return gamma

# calculates the angle between a segment and the vertical of the global coordinate system
def calc_vertical_angle(prox, dist):
    delta_x = np.abs(prox['x'] - dist['x'])
    delta_y = -(prox['y'] - dist['y'])
    angle = np.degrees(np.arctan(delta_x/delta_y))
    return angle

# calculates the angle between the hip keypoints and the horizontal of the global coordinate system
# contralateral pelvic drop reveals a (+) angle
def calc_pd_angle(stance, swing):
    delta_x = np.abs(stance['x'] - swing['x'])
    delta_y = -(stance['y'] - swing['y'])
    angle = np.degrees(np.arctan(delta_y/delta_x))
    return angle


# calculates the angle between a segment and the horizontal of the global coordinate system
def calc_horizontal_angle(prox, dist):
    delta_x = prox['x'] - dist['x']
    delta_y = -(prox['y'] - dist['y'])
    angle = np.degrees(np.arctan(delta_y/delta_x))
    angle = angle.where(angle > 0, angle+180)
    return angle

# equation to determine the knee abduction angle (180 - knee frontal projection angle)
def calc_kneeABD(hip, knee, ank):
    thigh = 180 - calc_horizontal_angle(hip, knee)
    shank = calc_horizontal_angle(knee, ank)
    kfppa = thigh + shank
    knee_abd = 180 - kfppa
    return knee_abd


def __get_df(json_files, path_to_json):
    # initialize empty dataframes
    column_names = ['x', 'y', 'acc']
    neck_df = pd.DataFrame(columns=column_names)
    right_shoulder_df = pd.DataFrame(columns=column_names)
    left_shoulder_df = pd.DataFrame(columns=column_names)
    mid_hip_df = pd.DataFrame(columns=column_names)
    right_hip_df = pd.DataFrame(columns=column_names)
    left_hip_df = pd.DataFrame(columns=column_names)
    right_knee_df = pd.DataFrame(columns=column_names)
    left_knee_df = pd.DataFrame(columns=column_names)
    right_ankle_df = pd.DataFrame(columns=column_names)
    left_ankle_df = pd.DataFrame(columns=column_names)
    right_toe_df = pd.DataFrame(columns=column_names)
    left_toe_df = pd.DataFrame(columns=column_names)
    right_heel_df = pd.DataFrame(columns=column_names)
    left_heel_df = pd.DataFrame(columns=column_names)


    # Loop through all json files in output directory
    # Each file is a frame in the video
    for file in sorted(json_files):
        # extract the keypoint data from the json file
        ld = json.load(open(path_to_json + file))
        for dictionary in ld['people']:
            try:
                keypoints = dictionary['pose_keypoints_2d']
            except KeyError:
                pass
        # rearrange array to have 25 rows and 3 columns (x,y,acc)
        np_array = np.asarray(keypoints)
        temp = np.reshape(np_array, (25, 3))

        try:
            neck_df = neck_df.append({'x': temp[1, 0], 'y': temp[1, 1], 'acc': temp[1, 2]}, ignore_index=True)

            right_shoulder_df = right_shoulder_df.append({'x': temp[2, 0], 'y': temp[2, 1], 'acc': temp[2, 2]},
                                                         ignore_index=True)
            left_shoulder_df = left_shoulder_df.append({'x': temp[5, 0], 'y': temp[5, 1], 'acc': temp[5, 2]},
                                                       ignore_index=True)
            mid_hip_df = mid_hip_df.append({'x': temp[8, 0], 'y': temp[8, 1], 'acc': temp[8, 2]}, ignore_index=True)

            right_hip_df = right_hip_df.append({'x': temp[9, 0], 'y': temp[9, 1], 'acc': temp[9, 2]}, ignore_index=True)
            left_hip_df = left_hip_df.append({'x': temp[12, 0], 'y': temp[12, 1], 'acc': temp[12, 2]}, ignore_index=True)
            right_knee_df = right_knee_df.append({'x': temp[10, 0], 'y': temp[10, 1], 'acc': temp[10, 2]},
                                                 ignore_index=True)
            left_knee_df = left_knee_df.append({'x': temp[13, 0], 'y': temp[13, 1], 'acc': temp[13, 2]}, ignore_index=True)
            right_ankle_df = right_ankle_df.append({'x': temp[11, 0], 'y': temp[11, 1], 'acc': temp[11, 2]},
                                                   ignore_index=True)
            left_ankle_df = left_ankle_df.append({'x': temp[14, 0], 'y': temp[14, 1], 'acc': temp[14, 2]},
                                                 ignore_index=True)
            right_toe_df = right_toe_df.append({'x': temp[23, 0], 'y': temp[23, 1], 'acc': temp[23, 2]}, ignore_index=True)

            left_toe_df = left_toe_df.append({'x': temp[20, 0], 'y': temp[20, 1], 'acc': temp[20, 2]}, ignore_index=True)

            right_heel_df = right_heel_df.append({'x': temp[24, 0], 'y': temp[24, 1], 'acc': temp[24, 2]},
                                                 ignore_index=True)

            left_heel_df = left_heel_df.append({'x': temp[21, 0], 'y': temp[21, 1], 'acc': temp[21, 2]}, ignore_index=True)

        except:
            print('bad point set at: ', file)

    # remove outliers, filter, and interpolate the raw kinematic data
    right_shoulder_df = filter_data(right_shoulder_df)
    left_shoulder_df = filter_data(left_shoulder_df)
    mid_hip_df = filter_data(mid_hip_df)
    right_hip_df = filter_data(right_hip_df)
    left_hip_df = filter_data(left_hip_df)
    right_knee_df = filter_data(right_knee_df)
    left_knee_df = filter_data(left_knee_df)
    right_ankle_df = filter_data(right_ankle_df)
    left_ankle_df = filter_data(left_ankle_df)
    right_toe_df = filter_data(right_toe_df)
    left_toe_df = filter_data(left_toe_df)
    right_heel_df = filter_data(right_heel_df)
    left_heel_df = filter_data(left_heel_df)

    # use the proportion between the hip key points to determine the plane
    # in the sagittal plane the proportion is close to 1
    proportion_side = right_hip_df['x'].median() / left_hip_df['x'].median()
    # normalize the proportion based off the direction
    if proportion_side > 1:
        proportion_side = 1/proportion_side
    # use 0.8 as the cutoff between determining direction in the sagittal or frontal plane
    if proportion_side > 0.8:
        plane = "Sagittal"
    else:
        plane = "Frontal"

    if plane == "Sagittal":
          # determine the angle between the trunk and the vertical of the global coordinate system
          trunk_lean = calc_vertical_angle(neck_df, mid_hip_df)

          # use the cosine law to determine knee flexion
          right_knee_flexion = cos_law(right_hip_df, right_ankle_df, right_knee_df)
          
          # use the cosine law to determine hip flexion
          right_hip_flexion = cos_law(neck_df, right_knee_df, right_hip_df)
          
          # determine the ankle dorsiflexion angle, the angle between the foot and shank segment
          right_ankle_dorsiflexion = calc_horizontal_angle(right_heel_df, right_toe_df) - calc_horizontal_angle(right_knee_df, right_ankle_df) - 90

          # use the cosine law to determine knee flexion
          left_knee_flexion = cos_law(left_hip_df, left_ankle_df, left_knee_df)
          
          # use the cosine law to determine hip flexion
          left_hip_flexion = cos_law(neck_df, left_knee_df, left_hip_df)
                    
          # determine the ankle dorsiflexion angle, the angle between the foot and shank segment
          left_ankle_dorsiflexion = calc_horizontal_angle(left_heel_df, left_toe_df) - calc_horizontal_angle(left_knee_df, left_ankle_df) + 90
        
          # place the sagittal plane joint angle measurements into a singular dataframe
          data = {'Right Knee Flexion': right_knee_flexion,
                    'Right Hip Flexion': right_hip_flexion,
                    'Trunk Lean': trunk_lean,
                    'Right Ankle Dorsiflexion': right_ankle_dorsiflexion,
                    'Left Knee Flexion': left_knee_flexion,
                    'Left Hip Flexion': left_hip_flexion,
                    'Left Ankle Dorsiflexion': left_ankle_dorsiflexion}
            
          df = pd.DataFrame(data)
          return df

    else:
          # determine the hip keypoints and the horizontal of the global coordinate system
          # contralateral pelvic drop where the left hip < right hip reveals a (+) angle
          right_pelvic_drop = calc_pd_angle(right_hip_df, left_hip_df)
          
          # use the cosine law to determine hip adduction 
          right_hip_adduction = cos_law(left_hip_df, right_knee_df, right_hip_df) - 90

          # determine the knee abduction angle (180 - knee frontal projection angle)
          right_knee_abduction = calc_kneeABD(right_hip_df, right_knee_df, right_ankle_df)
          
          # determine the hip keypoints and the horizontal of the global coordinate system
          # contralateral pelvic drop where the right hip < left hip reveals a (+) angle
          left_pelvic_drop = calc_pd_angle(left_hip_df, right_hip_df)
         
          # use the cosine law to determine hip adduction 
          left_hip_adduction = cos_law(left_knee_df, right_hip_df, left_hip_df) - 90

          # determine the knee abduction angle (180 - knee frontal projection angle)
          left_knee_abduction = calc_kneeABD(left_hip_df, left_knee_df, left_ankle_df)

          # place the frontal plane joint angle measurements into a singular dataframe
          data = {'Right Contralateral Pelvic Drop': right_pelvic_drop,
                  'Right Hip Adduction': right_hip_adduction,
                  'Right Knee Abduction': right_knee_abduction,
                  'Left Contralateral Pelvic Drop': left_pelvic_drop,
                  'Left Hip Adduction': left_hip_adduction,
                  'Left Knee Abduction': left_knee_abduction}

          df = pd.DataFrame(data)
          return df

# obtain a dataframe with the relevant joint angle calculations
df = __get_df(json_files, path_to_json)

# function to export the results to excel 
def excel_export():
    # create a folder to place images of each joint angle vs. time graph
    if not os.path.exists('Vs. Time'):
        os.makedirs('Vs. Time')
    else:
        for f in os.listdir('Vs. Time/'):
            os.remove('Vs. Time/' + f)
    # graph the joint angle vs. time for each joint angle in the dataframe
    def makeGraph(xVar, folder, df, start_idx):
        col = list(df.columns)
        i = start_idx
        sizeofList = len(col)
        allowed = set(ascii_letters + ' ')
        while i < sizeofList:
            fig, ax = plt.subplots()
            plt.plot(time, df[col[i]])
            ax.set(xlabel=xVar, ylabel=col[i])
            img_title = ''.join(l for l in col[i] if l in allowed)
            ax.set_title(img_title + ' vs. ' + xVar)
            save_title = (str(i - start_idx) + '. ' + img_title)
            fig.savefig(folder + save_title + '.png')
            i += 1

    makeGraph('Time', 'Vs. Time/', df, 0)
    # initialize the excel and write the results to the file
    excel_filename = 'excelResults'
    writer = pd.ExcelWriter(excel_filename + '.xlsx', engine='xlsxwriter')
    workbook = writer.book
    worksheet1 = workbook.add_worksheet(name='Graphs')

    # insert images of the matplotlib graphs into excel
    image_row = 0
    image_col = 1
    filelist = os.listdir('Vs. Time/')
    for image in sorted(filelist):
        worksheet1.insert_image(image_row, image_col, 'Vs. Time/' + image)
        image_col += 11

    # export the joint angle dataframe to excel
    df.to_excel(writer, sheet_name='Raw Data', index=False)
    workbook.close()

# execute the export of the results to excel
excel_export()

print('calculations complete')

Run the cell below to download the video with the skeleton overlayed as well as an excel file with each joint angle graph and raw joint angle results. 

In [None]:
from google.colab import files
files.download('output.mp4') 
files.download('excelResults.xlsx')

To visualize the kinematics, select the joint angle you wish to analyze below. Ensure that the joint angle from the proper plane is selected.

In [None]:
sagittalJointAngle = "Trunk Lean" #@param ['Right Knee Flexion', 'Right Hip Flexion', 'Trunk Lean', 'Right Ankle Dorsiflexion', 'Left Knee Flexion', 'Left Hip Flexion','Left Ankle Dorsiflexion'] {type:"string"}

frontalJointAngle = "Right Knee Abduction" #@param ['Right Contralateral Pelvic Drop', 'Right Hip Adduction', 'Right Knee Abduction', 'Left Contralateral Pelvic Drop', 'Left Hip Adduction', 'Left Knee Abduction'] {type:"string"}

import plotly.graph_objects as go
if len(df.columns) == 7:
  fig = go.Figure([go.Scatter(x=time, y=df[sagittalJointAngle])])
  fig.update_layout(
    title=sagittalJointAngle + ' vs. Time',
    xaxis_title="Time (sec)",
    yaxis_title=sagittalJointAngle+ ' (degrees)',
  )
else:
  fig = go.Figure([go.Scatter(x=time, y=df[frontalJointAngle])])
  fig.update_layout(
    title=frontalJointAngle + ' vs. Time',
    xaxis_title="Time (sec)",
    yaxis_title=frontalJointAngle+ ' (degrees)',
  )

fig.show()