**Development and Verification of Postural Control Assessment Using Deep-Learning-Based Pose Estimators: Towards Clinical Applications**

In [None]:
# Import libraries
import glob
import cv2
import pathlib
import pickle
import csv
import numpy as np
import pandas as pd
from google.colab import drive

In [None]:
# Mount drive to access your drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


### Set Paths

In [None]:
# Path to the directory including the videos you want to analyze
INPUT_VIDEO_PATHS = glob.glob('/content/drive/MyDrive/Colab Notebooks/OTI2/videos/*.m[top][sv4]')
INPUT_VIDEO_PATHS = INPUT_VIDEO_PATHS[4:12]
# INPUT_VIDEO_PATHS = glob.glob('/content/drive/MyDrive/.../*.mp4')

# Participant's body orientation. 'R': Facing right side. 'L': Facing left side
BODY_ORIENTATIONS = ['R', 'R', 'R', 'R', 'L', 'L', 'L', 'L']

# Outstretched limbs. 'R': Right arm and left leg are extended. 'L': Left arm and right leg are extended
STRETCHED_LIMBS = ['L', 'R', 'L', 'R', 'R', 'L', 'L', 'R']

# Directory to output results
OUTPUT_DIR = '/content/drive/MyDrive/Colab Notebooks/OTI2/tmp'
# OUTPUT_DIR = '/content/drive/MyDrive/...'

In [None]:
# All lists must have the same number of elements
assert len(INPUT_VIDEO_PATHS) == len(BODY_ORIENTATIONS) == len(STRETCHED_LIMBS)

# Check if "BODY_ORIENTATIONS" and "STRETCHED_LIMBS" are what you intended
for PATH, BO, SL in zip(INPUT_VIDEO_PATHS, BODY_ORIENTATIONS, STRETCHED_LIMBS):
    print(PATH, BO, SL)

/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a03_Lt.mts R L
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a03_Rt.mts R R
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a04_Lt.mts R L
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a04_Rt.mts R R
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a05_Rt.mov L R
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a05_Lt.mov L L
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a06_Lt.mts L L
/content/drive/MyDrive/Colab Notebooks/OTI2/videos/a06_Rt.mts L R


### Install MediaPipe
Official Colab code: https://colab.research.google.com/drive/1uCuA6We9T5r0WljspEHWPHXCT_2bMKUy \\
Keypoint IDs: https://google.github.io/mediapipe/solutions/pose.html

In [None]:
! pip install mediapipe

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mediapipe
  Downloading mediapipe-0.8.10.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (32.9 MB)
[K     |████████████████████████████████| 32.9 MB 1.2 MB/s 
Installing collected packages: mediapipe
Successfully installed mediapipe-0.8.10.1


In [None]:
# You may change the parameters, such as `static_image_mode` and `min_detection_confidence`, during the initialization
# Run `help(mp_pose.Pose)` to get more informations about the parameters
import mediapipe as mp
mp_pose = mp.solutions.pose
mp_drawing = mp.solutions.drawing_utils 
mp_drawing_styles = mp.solutions.drawing_styles
# help(mp_pose.Pose)

### Calculate SPB and AG

In [None]:
# Calculate the angle in the range of 0-180 degrees
def tangent_angle(u: np.ndarray, v: np.ndarray):
    i = np.inner(u, v)
    n = np.linalg.norm(u) * np.linalg.norm(v)
    c = i / n
    return np.rad2deg(np.arccos(np.clip(c, -1.0, 1.0)))

In [None]:
# Initialize output csv file
csv_file = open(f'{OUTPUT_DIR}/PCA.csv', 'w')
csv_writer = csv.writer(csv_file)
csv_writer.writerow(['ID', 'Body direction', 'Outstretched limbs', 'Time', 'SPB1(whole)', 'SPB1(upper)', 'SPB1(lower)', 'SPB3(whole)', 'SPB3(upper)', 'SPB3(lower)', 'AG1(whole)', 'AG1(upper)', 'AG1(lower)', 'AG2(whole)', 'AG2(upper)', 'AG2(lower)'])

181

In [None]:
# Main loop ####################################################################
# It will take about 3 minutes for one minute video

# Threshold for the noise reduction
noise_threshold = 0.5

# Unit vector for computing AG
right_vec = np.array([1, 0]) - np.array([0, 0])
left_vec  = np.array([-1, 0]) - np.array([0, 0])

for PATH, BO, SL in zip(INPUT_VIDEO_PATHS, BODY_ORIENTATIONS, STRETCHED_LIMBS):
    
    # Input video
    video = cv2.VideoCapture(PATH)
    video_stem = pathlib.Path(PATH).stem
    rotate_direction = cv2.ROTATE_90_CLOCKWISE if BO == 'L' else cv2.ROTATE_90_COUNTERCLOCKWISE

    # Video writer settings
    fps = video.get(cv2.CAP_PROP_FPS)
    width = int(video.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(video.get(cv2.CAP_PROP_FRAME_HEIGHT))
    writer = cv2.VideoWriter(f'{OUTPUT_DIR}/{video_stem}_mp.mp4', cv2.VideoWriter_fourcc('m', 'p', '4', 'v'), int(fps), (width, height))

    # Run MediaPipe Pose and draw pose landmarks ###############################
    outputs = [video_stem, BO, SL, video.get(cv2.CAP_PROP_FRAME_COUNT) / fps]
    mp_results = []
    with mp_pose.Pose() as pose:
        while True:
            # Read a video frame
            ret, frm = video.read()
            if not ret: break

            # Rotate the frame so that the head comes upward
            frm = cv2.rotate(frm, rotate_direction)

            # Convert the BGR image to RGB and process it with MediaPipe Pose
            mp_result = pose.process(cv2.cvtColor(frm, cv2.COLOR_BGR2RGB))
            mp_results.append(mp_result.pose_landmarks)
            
            # Draw pose landmarks
            mp_drawing.draw_landmarks(frm, mp_result.pose_landmarks, mp_pose.POSE_CONNECTIONS, landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style())

            # Rotate in the opposite direction
            frm = cv2.rotate(frm, 2-rotate_direction)
            writer.write(frm)
    video.release()
    writer.release()

    # Noise reduction ##########################################################
    # Prep to read keypoint
    df_kpts = pd.DataFrame(index=range(len(mp_results)), columns=['shoulder-x', 'shoulder-y', 'elbow-x', 'elbow-y', 'wrist-x', 'wrist-y', 'hip-x', 'hip-y', 'knee-x', 'knee-y', 'ankle-x', 'ankle-y'], dtype=float)
    required_kpts = [12, 14, 16, 23, 25, 27] if SL == 'R' in video_stem else [11, 13, 15, 24, 26, 28]

    # Tidying keypoints
    for frm_idx, raw_kpt in enumerate(mp_results):
        for kpt_idx, kpt_id in enumerate(required_kpts):
            if raw_kpt.landmark[kpt_id].visibility >= noise_threshold:
                if BO == 'R':
                    df_kpts.iat[frm_idx, kpt_idx*2+0] = width - raw_kpt.landmark[kpt_id].y * width
                    df_kpts.iat[frm_idx, kpt_idx*2+1] = raw_kpt.landmark[kpt_id].x * height
                else:
                    df_kpts.iat[frm_idx, kpt_idx*2+0] = raw_kpt.landmark[kpt_id].y * width
                    df_kpts.iat[frm_idx, kpt_idx*2+1] = height - raw_kpt.landmark[kpt_id].x * height
    
    # Remove outliers
    for _ in range(5):
        outlier_idx = (df_kpts - df_kpts.ewm(span=fps).mean()).abs() > df_kpts.ewm(span=fps).std()
        df_kpts[outlier_idx] = np.nan
    
    # Linear interpolation
    df_kpts.interpolate(limit_direction="both", inplace=True, axis=0)

    # Calculate SPB ############################################################
    # Calculate body length
    body_lengths = np.full(len(df_kpts), np.nan, dtype=float)
    for idx in range(len(df_kpts)):
        p1 = np.array((df_kpts['shoulder-x'][idx], df_kpts['shoulder-y'][idx]))
        p2 = np.array((df_kpts['hip-x'][idx], df_kpts['hip-y'][idx]))
        body_lengths[idx] = np.linalg.norm(p2 - p1)
    body_length = body_lengths.mean()

    # Calculate distances
    dists = np.full((4, len(df_kpts)-1), np.nan, dtype=float)
    for kpt_type_idx, kpt_type in enumerate(['elbow', 'wrist', 'knee', 'ankle']):
        xs = df_kpts[kpt_type+'-x']
        ys = df_kpts[kpt_type+'-y']
        for idx in range(len(xs)-1):
            p1 = np.array((xs[idx], ys[idx]))
            p2 = np.array((xs[idx+1], ys[idx+1]))
            dists[kpt_type_idx, idx] = np.linalg.norm(p2 - p1) / body_length

    # SPB 1
    means = np.mean(dists, axis=1)
    spb1_u = (means[0] + means[1]) / 2.
    spb1_l = (means[2] + means[3]) / 2.
    spb1_w = (spb1_u + spb1_l) / 2.
    
    # SPB 3
    spb3_w = np.sum(dists, axis=0).max()
    spb3_u = np.sum(dists[:2], axis=0).max()
    spb3_l = np.sum(dists[2:], axis=0).max()

    outputs += [spb1_w, spb1_u, spb1_l, spb3_w, spb3_u, spb3_l]

    # Calculate AG #############################################################
    angles1 = np.full((2, len(df_kpts)), np.nan, dtype=float)
    angles2 = np.full((2, len(df_kpts)), np.nan, dtype=float)
    for frm_idx in range(len(df_kpts)):
        # AG1
        armvec = np.array([(df_kpts['wrist-x'][frm_idx]+df_kpts['elbow-x'][frm_idx]) / 2., 
                           (df_kpts['wrist-y'][frm_idx]+df_kpts['elbow-y'][frm_idx]) / 2.]) - \
                           np.array([df_kpts['shoulder-x'][frm_idx], df_kpts['shoulder-y'][frm_idx]])
        legvec = np.array([(df_kpts['ankle-x'][frm_idx]+df_kpts['knee-x'][frm_idx]) / 2., 
                           (df_kpts['ankle-y'][frm_idx]+df_kpts['knee-y'][frm_idx]) / 2.]) - \
                           np.array([df_kpts['hip-x'][frm_idx], df_kpts['hip-y'][frm_idx]])
        if BO == 'L':
            angles1[0, frm_idx] = tangent_angle(armvec, left_vec)
            angles1[1, frm_idx] = tangent_angle(legvec, right_vec)
        else:
            angles1[0, frm_idx] = tangent_angle(armvec, right_vec)
            angles1[1, frm_idx] = tangent_angle(legvec, left_vec)

        # AG2
        sho_hip_vec = np.array([df_kpts['shoulder-x'][frm_idx], df_kpts['shoulder-y'][frm_idx]])-np.array([df_kpts['hip-x'][frm_idx],   df_kpts['hip-y'][frm_idx]])
        hip_sho_vec = np.array([df_kpts['hip-x'][frm_idx],      df_kpts['hip-y'][frm_idx]])-np.array([df_kpts['shoulder-x'][frm_idx],   df_kpts['shoulder-y'][frm_idx]])
        elb_sho_vec = np.array([df_kpts['elbow-x'][frm_idx],    df_kpts['elbow-y'][frm_idx]])-np.array([df_kpts['shoulder-x'][frm_idx], df_kpts['shoulder-y'][frm_idx]])
        wst_elb_vec = np.array([df_kpts['wrist-x'][frm_idx],    df_kpts['wrist-y'][frm_idx]])-np.array([df_kpts['elbow-x'][frm_idx],    df_kpts['elbow-y'][frm_idx]])
        kne_hip_vec = np.array([df_kpts['knee-x'][frm_idx],     df_kpts['knee-y'][frm_idx]])-np.array([df_kpts['hip-x'][frm_idx],       df_kpts['hip-y'][frm_idx]])
        ank_kne_vec = np.array([df_kpts['ankle-x'][frm_idx],    df_kpts['ankle-y'][frm_idx]])-np.array([df_kpts['knee-x'][frm_idx],     df_kpts['knee-y'][frm_idx]])

        # Clockwise:+, counterclockwise:-
        if BO == 'L':
            # Clockwise
            if np.cross(sho_hip_vec, elb_sho_vec) >= 0:
                angles2[0, frm_idx] = 0.
            else:
                angles2[0, frm_idx] = tangent_angle(elb_sho_vec, sho_hip_vec)
            # Counterclockwise
            if np.cross(hip_sho_vec, kne_hip_vec) <= 0:
                angles2[1, frm_idx] = 0.
            else:         
                angles2[1, frm_idx] = tangent_angle(kne_hip_vec, hip_sho_vec)
        else:
            # Counterclockwise
            if np.cross(sho_hip_vec, elb_sho_vec) <= 0:
                angles2[0, frm_idx] = 0.
            else:
                angles2[0, frm_idx] = tangent_angle(elb_sho_vec, sho_hip_vec)
            # Clockwise
            if np.cross(hip_sho_vec, kne_hip_vec) >= 0:
                angles2[1, frm_idx] = 0.
            else:         
                angles2[1, frm_idx] = tangent_angle(kne_hip_vec, hip_sho_vec)
        angles2[0, frm_idx] += tangent_angle(wst_elb_vec, elb_sho_vec)
        angles2[1, frm_idx] += tangent_angle(ank_kne_vec, kne_hip_vec)

    # AG1
    ag1_u = angles1[0].mean()
    ag1_l = angles1[1].mean()
    ag1_w = np.sum(angles1, axis=0).mean()

    # AG2
    ag2_u = angles2[0].mean()
    ag2_l = angles2[1].mean()
    ag2_w = np.sum(angles2, axis=0).mean()
    
    # Write
    csv_writer.writerow(outputs + [ag1_w, ag1_u, ag1_l, ag2_w, ag2_u, ag2_l])
csv_file.close()