In [1]:
import cv2
import mediapipe as mp
import numpy as np
import glob
from scipy.spatial import ConvexHull
import math
import csv

import moviepy.editor as m_editor

import pandas as pd
import os 
# init mediapipe parameters 

# Function to extract subclip from original video based on start_time and end_time in seconds
def extract_subclip(input, output, start_time, end_time):
  original_video = m_editor.VideoFileClip(input)
  cut_video = original_video.subclip(start_time, end_time)
  cut_video.write_videofile(output, codec="libx264")

# Function to calculate the Euclidean distance between two points
def euclidean_distance(p1, p2):
    return np.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2)

# Function to calculate the openness of a pose
def pose_openness(holistic_landmarks, image, mp_holistic):
    image_h, image_w, _ = image.shape
    main_keypoints = [
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_SHOULDER],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_SHOULDER],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_HIP],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_HIP],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_WRIST],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_WRIST],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_ELBOW],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_ELBOW],
    ]
    core_keypoints = [
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_SHOULDER],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_SHOULDER],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_HIP],
        holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_HIP],
    ]
    
    # coords = np.array([(kp.x, kp.y) for kp in keypoints])
    coords = np.array([(int(kp.x * image_w), int(kp.y * image_h)) for kp in main_keypoints])
    hull = ConvexHull(coords)

    core_coords = np.array([(int(kp.x * image_w), int(kp.y * image_h)) for kp in core_keypoints])
    core_hull = ConvexHull(core_coords)
    
    return hull.volume / core_hull.volume

# Function to calculate leaning direction
def leaning_direction(holistic_landmarks, mp_holistic):
    nose = holistic_landmarks.landmark[mp_holistic.PoseLandmark.NOSE]
    left_hip = holistic_landmarks.landmark[mp_holistic.PoseLandmark.LEFT_HIP]
    right_hip = holistic_landmarks.landmark[mp_holistic.PoseLandmark.RIGHT_HIP]
    
    avg_hip_z = (left_hip.z + right_hip.z) / 2

    if nose.z < avg_hip_z:
        return "Backward"
    else:
        return "Forward"

# Function to calculate head direction
def head_direction(prev, curr, image, mp_holistic):
    image_h, image_w, _ = image.shape
    curr_nose = curr.landmark[mp_holistic.PoseLandmark.NOSE] 
    prev_nose = prev.landmark[mp_holistic.PoseLandmark.NOSE]

    curr_nose_cood = np.array([int(curr_nose.x * image_w), int(curr_nose.y * image_h)])
    prev_nose_cood = np.array([int(prev_nose.x * image_w), int(prev_nose.y * image_h)])
    nose_diff = curr_nose_cood - prev_nose_cood
    horizontal = 'STILL'
    vertical = 'STILL'
    if nose_diff[0] > 0:
        horizontal = "RIGHT"
    elif nose_diff[0] < 0:
        horizontal = 'LEFT'
    
    if nose_diff[1] > 0:
        vertical = 'UP'
    elif nose_diff[1] < 0:
        vertical = 'DOWN'
    return horizontal, vertical

# Function to calculate angle between three landmarks
def calculate_angle(l1, l2, l3):

    # Calculate the angle between the three points
    angle = math.degrees(math.atan2(l3.y - l2.y, l3.x - l2.x) - math.atan2(l1.y - l2.y, l1.x - l2.x))
    
    # Check if the angle is less than zero.
    if angle < 0:
        # Add 360 to the found angle.
        angle += 360
  
    return angle

# Function to calculate angle between three landmarks using numpy module
def numpy_angle(l1,l2,l3, width, height):
    a = np.array([l1.x * width, l1.y * height])
    b = np.array([l2.x * width, l2.y * height])
    c = np.array([l3.x * width, l3.y * height])

    ba = a - b
    bc = c - b

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(cosine_angle)

    return np.degrees(angle)

# Function to calculate hand orientation
def orientation(l0, l9): 
    x0 = l0.x
    y0 = l0.y
    
    x9 = l9.x
    y9 = l9.y
    
    if abs(x9 - x0) < 0.05:      # since tan(0) --> ∞
        m = 1000000000
    else:
        m = abs((y9 - y0)/(x9 - x0))       
        
    if m>=0 and m<=1:
        if x9 > x0:
            return "Right"
        else:
            return "Left"
    if m>1:
        if y9 < y0:       # since, y decreases upwards
            return "Up"
        else:
            return "Down"

def process_video(video_cut, video_out, output_csv, mp_drawing, mp_holistic, mp_face_mesh, holistic, face_mesh):
    cap = cv2.VideoCapture(video_cut)

    # Get the video dimensions and FPS(input)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))

    # Initialize the VideoWriter(output)
    fourcc = cv2.VideoWriter_fourcc(*"mp4v")  # You can also use "XVID" or "MJPG" for AVI files
    out = cv2.VideoWriter(video_out, fourcc, fps, (width, height))

    PL = mp_holistic.PoseLandmark
    HL = mp_holistic.HandLandmark

    ## Holistic
    prev_landmarks = None
    total_movement = 0
    frame_movement = 0
    openness_value = 0
    head_horizontal = ""
    head_vertical = ""
    holistic_threshold = 0.001  # Adjust the threshold to fine-tune movement detection sensitivity
    holistic_keypoints = [
        mp_holistic.PoseLandmark.LEFT_WRIST,
        mp_holistic.PoseLandmark.RIGHT_WRIST,
        mp_holistic.PoseLandmark.LEFT_ANKLE,
        mp_holistic.PoseLandmark.RIGHT_ANKLE,
    ]

    ## Arms
    left_arm_movement = 0
    right_arm_movement = 0

    la_counter = 0 
    la_vert = None
    la_leaning = None

    ra_counter = 0 
    ra_vert = None
    ra_leaning = None

    ## Hand
    lh_stage = None
    lh_tip_distance = 0
    lh_orientation = None

    rh_stage = None
    rh_tip_distance = 0
    rh_orientation = None

    # open output CSV file and add header row
    with open(output_csv, mode='w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerow(['time_in_seconds', 
                        'frame', 
                        'total_movement_per_second', 
                        'pose_openness', 
                        'leaning', 
                        'head_horizontal',
                        'head_vertical',
                        'left_arm_angle', 
                        'left_arm_v_movement', 
                        'left_arm_h_movement',
                        'right_arm_angle', 
                        'right_arm_v_movement', 
                        'right_arm_h_movement', 
                        'left_hand_orientation', 
                        'left_hand_state', 
                        'right_hand_orientation', 
                        'right_hand_state'])

        # Process the video frames
        frame_number = 0

        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
        
            # Convert the frame to RGB
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

            # Process the frame with MediaPipe's Holistic module
            results = holistic.process(frame_rgb)

            # Draw holistic landmarks on the frame
            if results.pose_landmarks:
                current = results.pose_landmarks
                current_lh = results.left_hand_landmarks
                current_rh = results.right_hand_landmarks

                mp_drawing.draw_landmarks(frame, current, mp_holistic.POSE_CONNECTIONS)
                mp_drawing.draw_landmarks(frame, current_lh, mp_holistic.HAND_CONNECTIONS)
                mp_drawing.draw_landmarks(frame, current_rh, mp_holistic.HAND_CONNECTIONS)

                ## Holistic movements
                # Calculate the total movement
                if prev_landmarks:
                    frame_movement = 0
                    for kp in holistic_keypoints:
                        distance = euclidean_distance(results.pose_landmarks.landmark[kp], prev_landmarks.landmark[kp])
                        frame_movement += distance
                    if frame_movement > holistic_threshold:
                        total_movement += frame_movement
                    
                    head_horizontal, head_vertical = head_direction(prev_landmarks, current, frame_rgb, mp_holistic)
            
                # Calculate and display the total movement and pose openness on the frame
                openness_value = pose_openness(results.pose_landmarks, frame_rgb, mp_holistic)
                # Calculate and display the leaning direction
                leaning_dir = leaning_direction(results.pose_landmarks, mp_holistic)

                ## Hand movements
                # distance b/w INDEX_FINGER_TIP and THUMB_TIP
                if current_lh:
                    lh_tip_distance = euclidean_distance(current_lh.landmark[HL.INDEX_FINGER_TIP],current_lh.landmark[HL.THUMB_TIP])
                    if current_lh.landmark[HL.MIDDLE_FINGER_TIP].y < current_lh.landmark[HL.MIDDLE_FINGER_MCP].y and current_lh.landmark[HL.RING_FINGER_TIP].y < current_lh.landmark[HL.RING_FINGER_MCP].y \
                        and current_lh.landmark[HL.PINKY_TIP].y < current_lh.landmark[HL.PINKY_TIP].y and lh_tip_distance < 0.015:
                        lh_stage = 'CLOSED'
                    else:
                        lh_stage = 'OPEN'
                        lh_orientation = orientation(current_lh.landmark[HL.WRIST], current_lh.landmark[HL.MIDDLE_FINGER_MCP])

                if current_rh:
                    rh_tip_distance = euclidean_distance(current_rh.landmark[HL.INDEX_FINGER_TIP],current_rh.landmark[HL.THUMB_TIP])
                    if current_rh.landmark[HL.MIDDLE_FINGER_TIP].y < current_rh.landmark[HL.MIDDLE_FINGER_MCP].y and current_rh.landmark[HL.RING_FINGER_TIP].y < current_rh.landmark[HL.RING_FINGER_MCP].y \
                        and current_rh.landmark[HL.PINKY_TIP].y < current_rh.landmark[HL.PINKY_TIP].y and rh_tip_distance < 0.015:
                        rh_stage = 'CLOSED'
                    else:
                        rh_stage = 'OPEN'
                        rh_orientation = orientation(current_rh.landmark[HL.WRIST], current_rh.landmark[HL.MIDDLE_FINGER_MCP])

                ## Arm movements
                # Calculate weather arm is up or down
                la_angle = numpy_angle(current.landmark[PL.LEFT_WRIST], current.landmark[PL.LEFT_ELBOW], current.landmark[PL.LEFT_SHOULDER], width, height)
                ra_angle = numpy_angle(current.landmark[PL.RIGHT_WRIST], current.landmark[PL.RIGHT_ELBOW], current.landmark[PL.RIGHT_SHOULDER], width, height)

                if la_angle > 160:
                    la_vert = "DOWN"
                if la_angle < 30 and la_vert =='DOWN':
                    la_vert="UP"
                    la_counter +=1

                if ra_angle > 160:
                    ra_vert = "DOWN"
                if ra_angle < 30 and ra_vert =='DOWN':
                    ra_vert="UP"
                    ra_counter +=1

                # Calculate wheather arm is leaning forward
                if abs(current.landmark[PL.RIGHT_WRIST].z) > abs(current.landmark[PL.RIGHT_ELBOW].z):
                    ra_leaning = 'FORWARD'
                else:
                    ra_leaning = 'NOT FORWARD'

                if abs(current.landmark[PL.LEFT_WRIST].z) > abs(current.landmark[PL.LEFT_ELBOW].z) :
                    la_leaning = 'FORWARD'
                else:
                    la_leaning = 'NOT FORWARD'

                # write to output video
                l1 = current.landmark[PL.LEFT_SHOULDER]
                l2 = current.landmark[PL.LEFT_ELBOW]
                l3 = current.landmark[PL.LEFT_WRIST]
                r1 = current.landmark[PL.RIGHT_SHOULDER]
                r2 = current.landmark[PL.RIGHT_ELBOW]
                r3 = current.landmark[PL.RIGHT_WRIST]
                
                cv2.putText(frame, f"Left shoulder:({round(l1.x,2)}, {round(l1.y,2)}, {round(l1.z,2)}) elbow:({round(l2.x,2)}, {round(l2.y,2)}, {round(l2.z,2)}) wrist: ({round(l3.x,2)}, {round(l3.y,2)}, {round(l3.z,2)})", (10,30), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                cv2.putText(frame, f"Left arm: {la_vert}  {la_leaning}", (10,60), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
                cv2.putText(frame, f"Left arm angle: {la_angle:.2f}", (10, 90), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)

                cv2.putText(frame, f"Right shoulder:({round(r1.x,2)}, {round(r1.y,2)}, {round(r1.z,2)}) elbow:({round(r2.x,2)}, {round(r2.y,2)}, {round(r2.z,2)}) wrist: ({round(r3.x,2)}, {round(r3.y,2)}, {round(r3.z,2)})", (10,120), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
                cv2.putText(frame, f"Right arm: {ra_vert}  {ra_leaning}", (10,150), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
                cv2.putText(frame, f"Right arm angle: {ra_angle:.4f}", (10, 180), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)

                cv2.putText(frame, f"Left Hand: {lh_tip_distance} {lh_stage} {lh_orientation}", (10, 210), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)
                cv2.putText(frame, f"Right Hand: {rh_tip_distance} {rh_stage} {rh_orientation}", (10, 240), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 0), 2)

                cv2.putText(frame, f"Total Movement Per Second: {total_movement:.2f}", (10, 270), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(frame, f"Openness: {openness_value:.2f}", (10, 300), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(frame, f"Leaning: {leaning_dir}", (10, 330), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)

                cv2.putText(frame, f"Head Horizontal: {head_horizontal}", (10, 360), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
                cv2.putText(frame, f"Head Vertical: {head_vertical}", (10, 390), cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 0, 0), 2)
                
                # update variables
                prev_landmarks = results.pose_landmarks
                frame_number += 1

                time_in_seconds = frame_number / fps  
                # Write to CSV file
                if frame_number % fps == 0:
                    writer.writerow([time_in_seconds, 
                                    frame_number, 
                                    total_movement, 
                                    openness_value, 
                                    leaning_dir, 
                                    head_horizontal,
                                    head_vertical,
                                    la_angle, 
                                    la_vert, 
                                    la_leaning, 
                                    ra_angle, 
                                    ra_vert, 
                                    ra_leaning, 
                                    lh_orientation, 
                                    lh_stage, 
                                    rh_orientation, 
                                    rh_stage])
                    total_movement = 0 
                    frame_movement = 0

            # Save the frame
            out.write(frame)

        out.release()

In [3]:
chr_video_path = '../data/chr'
hc_video_path = '../data/hc'
video_out_path = '../output/annotated_video'
csv_out_path = '../output/gesture'
supported_extensions = ['.mp4']

# Initialize MediaPipe's Holistic module
mp_drawing = mp.solutions.drawing_utils
mp_holistic = mp.solutions.holistic
mp_face_mesh = mp.solutions.face_mesh
holistic = mp_holistic.Holistic(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5)

excluded_files = [
    '11002post-cut.mp4',
    '11007pt1-cut-5min.mp4',
    '11007pt2-cut-5min.mp4',
    '11007pt3-cut-5min.mp4',
    '11008.1-cut-5min.mp4',
    '11008.2-cut-5min.mp4',
    '3005 12m clin-cut-5min.mp4',
    '3005 12m clin2-cut-5min.mp4',
    '3005 12m clin3-cut-5min.mp4',
    '3013 12m pt2-cut-5min.mp4',
    '3019_12m-cut-5min.mp4',
    '3022post-cut.mp4',
    '3023 12m pt1-cut-5min.mp4',
    '3023 12m pt3-cut-5min.mp4',
    '3024 p2post-cut.mp4',
    '3032 12m pt2-cut-5min.mp4',
    '3034redo1post-cut.mp4',
    '3035post-cut.mp4',
    '7107_1post-cut.mp4'
]

for filename in os.listdir(chr_video_path):
    if any(filename.endswith(ext) for ext in supported_extensions):
        print(filename)
        if filename in excluded_files:
            continue
        video_cut = os.path.join(chr_video_path, filename)
        video_out = os.path.join(video_out_path, f'{filename[:-4]}-output.mp4')
        output_csv = os.path.join(csv_out_path, f'{filename[:-4]}-gesture.csv')
        
        process_video(video_cut, video_out, output_csv, mp_drawing, mp_holistic, mp_face_mesh, holistic, face_mesh)


for filename in os.listdir(hc_video_path):
    if any(filename.endswith(ext) for ext in supported_extensions):
        print(filename)
        video_cut = os.path.join(hc_video_path, filename)
        video_out = os.path.join(video_out_path, f'{filename[:-4]}-output.mp4')
        output_csv = os.path.join(csv_out_path, f'{filename[:-4]}-gesture.csv')
        process_video(video_cut, video_out, output_csv, mp_drawing, mp_holistic, mp_face_mesh, holistic, face_mesh)


11002post-cut.mp4
11007pt1-cut-5min.mp4
11007pt2-cut-5min.mp4
11007pt3-cut-5min.mp4
11008.1-cut-5min.mp4
11008.2-cut-5min.mp4
3005 12m clin-cut-5min.mp4
3005 12m clin2-cut-5min.mp4
3005 12m clin3-cut-5min.mp4
3013 12m pt1-cut-5min.mp4
3013 12m pt2-cut-5min.mp4
3019_12m-cut-5min.mp4
3022post-cut.mp4
3023 12m pt1-cut-5min.mp4
3023 12m pt3-cut-5min.mp4
3024 p2post-cut.mp4
3032 12m pt2-cut-5min.mp4
3034redo1post-cut.mp4
3035post-cut.mp4
7107_1post-cut.mp4
4003 12 mo p 2-cut-5min.mp4
4003 12 mo p1-cut-5min.mp4
4003 12 mo p3-cut-5min.mp4
4003 12 mo p4-cut-5min.mp4
