# Imports

In [5]:
# from inference import inference
import numpy as np
import cv2
import os
import matplotlib.pyplot as plt
import math
from scipy.signal import medfilt
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import os


# Utilities

In [24]:
def load_video(video_path, output_folder, fps=3):
    """loads a video from and splits into different image frames based on frames per second"""
    capture = cv2.VideoCapture(video_path)
    
    #get original video's frame rate
    original_fps = capture.get(cv2.CAP_PROP_FPS)
    
    #calculate the frame interval based on the desired fps
    frame_interval = int(round(original_fps / fps))

    #create output dir if it doesn't exist 
    os.makedirs(output_folder, exist_ok=True)
    
    frame_count = 0
    frame_index = 0
    
    while True:
        success, frame = capture.read()
        
        if not success:
            break
        
        #Process the frame only if it's within the desired frame interval 
        if frame_index % frame_interval == 0:
            output_file = os.path.join(output_folder, f'frame_{frame_count}.jpg')
            cv2.imwrite(output_file, frame)
            frame_count += 1
        frame_index += 1
    capture.release()

In [23]:
class BodyPart: 
    def __init__(self, body_part):
        """
        Constructor for 1 body part consisting of x, y coordinates, confidence, and boolean exists information 
        Parameters: 
        body_part: int 
                   1 x 3 ndarray consisteing of x, y, confidence level
        """
        self.x = body_part[0]
        self.y = body_part[1]
        self.c = body_part[2]
        self.exists = self.c != 0 #check if body part exists or not within one frame
     
    def __truediv__(self, scalar):
        return BodyPart([self.x/scalar, self.y/scalar, self.c])
    
    def __floordiv__(self, scalar):
        __truediv__(self, scalar)
            
    @staticmethod
    def dist(bodypart1, bodypart2):
        "Calculates the Euclidean distance between BodyPart instances"
        return np.sqrt(np.square(bodypart1.x - bodypart2.x) + np.square(bodypart1.y - bodypart2.y))

In [22]:
class Pose:
    #based on COCO dataset
    BODY_PART_NAMES = ['nose', 'left_eye', 'right_eye', 'left_ear', 
                       'right_ear', 'left_shoulder', 'right_shoulder', 'left_elbow', 'right_elbow',
                       'left_wrist', 'right_wrist', 'left_hip', 'right_hip', 'left_knee', 'right_knee',
                       'left_ankle', 'right_ankle'] 
    def __init__(self, body_parts):
        """
        Constructs a pose for one frame, given an array of parts 
        Parameters: 
            body_parts: 17 x 3 ndarray of x, y, confidence values
        """
        for name, vals in zip(self.BODY_PART_NAMES, body_parts):
            setattr(self, name, BodyPart(vals))

    def __iter__(self):
        for attr, value in self.__dict__.items():
            yield attr, value
    
    def __str__(self):
        out = ''
        for name in self.BODY_PART_NAMES:
            _ = "{}: {},{}".format(name, getattr(self, name).x, getattr(self, name).x)
            out = out + _ + '\n'
            return out
    
    def print(self, parts):
        """
        Print x and y coordinates of body parts on the current instance of the class
        Parameters: 
            parts: list 
                   list containing sequence of body parts E.g. ['nose', 'left ear']
        Returns: 
            x and y coordinates of each bodypart as set in construction time
        """
        out = ''
        # out = dict()
        for name in parts:
            if not name in self.BODY_PART_NAMES:
                raise NameError(name)
            _ = "{}: {},{}".format(name, getattr(self, name).x, getattr(self, name).y)
            out = out + _ + ','
            # out[name] = (getattr(self, name).x, getattr(self, name).y)
        return out

In [21]:
class PoseSequence:
    """Contains a sequence of chained Pose Objects + normalization 
    based on average torso length"""
    def __init__(self, sequence):
        """
        Parameters: 
        sequence: list of nx17x3 arrays, where n equal number of frames for a pose 
        """
        self.poses = []
        for parts in sequence:
            self.poses.append(Pose(parts))
        
        # print(f'self.poses after uploading frames: {self.poses}')
        #normalize poses based on the average pixel length
        torso_lengths = np.array([BodyPart.dist(pose.nose, pose.left_hip) for pose in self.poses if pose.nose.exists and pose.left_hip.exists] +
                                 [BodyPart.dist(pose.nose, pose.right_hip) for pose in self.poses if pose.nose.exists and pose.right_hip.exists])     
        mean_torso = np.mean(torso_lengths)   
        
        for pose in self.poses: 
            for attr, part in pose: 
                setattr(pose, attr, part/mean_torso)  
                # print('progression of left and right hips:', {pose.print(['left_hip', 'right_hip'])})

In [None]:
from configs.ViTPose_base_coco_256x192 import model as model_cfg
from configs.ViTPose_base_coco_256x192 import data_cfg
import os
import torch
import re
import glob

def parse_sequence(input_folder, output_folder_keypoints, output_folder_visual):
    """
    Parse a sequence of image frames and saves each corresponding 
    pose from each image frame as a numpy file within the output directory
    
    Parameters: 
        input_folder: str 
                      path to the folder containing spliced image frames for one video
        output_folder: str 
                      path to saved numpy array files of keypoints 
    """
    
    file_names = sorted(os.listdir(input_folder), key=lambda x: int(x.split('_')[1].split('.')[0]))
    print(f"file_names: {file_names}")
    num_frames = len(file_names)
    print(f"num_frames: {num_frames}")
    all_keypoints = np.zeros((num_frames, 17, 3))
    
    #set variables for model inference
    CKPT_PATH = "/home/cgusti/ViTPose_pytorch/scripts/checkpoints/vitpose-b-multi-coco.pth"
    img_size = data_cfg['image_size']
    
    for i, file_name in enumerate(file_names):
        img_path = input_folder + '/' + file_name #e.g data/input_frames/WarriorTwo/WarriorTwo_correct_5/frame_1.jpg
        print(f"image path for inference index {i}: {img_path}")
        #model inference
        keypoints, img_skeleton= inference(img_path=img_path, output_path=output_folder_keypoints, img_size=img_size, 
                              model_cfg=model_cfg, ckpt_path=CKPT_PATH, device=torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu'), 
                              save_result=True) #this function will automatically save result in output path
        print(f'keypoints that comes out of the model from inference:  {keypoints}')
        all_keypoints[i] = keypoints 
        cv2.imwrite(output_folder_visual + f'_frame{i}.jpg', img_skeleton)
        print(f'current all keypoints: {all_keypoints}')
    np.save(output_folder_keypoints + '_sequence.npy', all_keypoints)

ModuleNotFoundError: No module named 'torch'

In [None]:
import argparse
import os.path as osp

import torch
from torch import Tensor

from pathlib import Path
import cv2
import numpy as np
from time import time
from PIL import Image
from torchvision.transforms import transforms

import sys
import os
from models.model import ViTPose
from utils.visualization import draw_points_and_skeleton, joints_dict
from utils.dist_util import get_dist_info, init_dist
from utils.top_down_eval import keypoints_from_heatmaps

__all__ = ['inference']       
            
@torch.no_grad() #disabling gradient calculation (typically used in situations where the model parameters are fixed and there is no need to compute gradients, such as during inference)
def inference(img_path: Path, output_path: Path, img_size: tuple[int, int],
              model_cfg: dict, ckpt_path: Path, device: torch.device, save_result: bool=True) -> np.ndarray:
    '''
    Outputs: 
    (1) points: is a 3 dimensional array with shape (1, num_keypoints, 2), where num_keypoints = # number of keypoints the model is trained to detect
    Each entry represents the (x,y) coordinates of the first detected keypoint (E.g [343.3852])
    (2) prob: is a 1 x num_keypoints matrix where each entry represents the probability of a particular keypoint being present at a given pixel location in the input image 
    '''
    
    # Prepare model
    vit_pose = ViTPose(model_cfg)
    ckpt = torch.load(ckpt_path)
    if 'state_dict' in ckpt:
        vit_pose.load_state_dict(ckpt['state_dict'])
    else:
        vit_pose.load_state_dict(ckpt)
    vit_pose.to(device)
    print(f">>> Model loaded: {ckpt_path}")
    
    # Prepare input data
    img = Image.open(img_path)
    org_w, org_h = img.size
    # print(f">>> Original image size: {org_h} X {org_w} (height X width)")
    # print(f">>> Resized image size: {img_size[1]} X {img_size[0]} (height X width)")
    # print(f">>> Scale change: {org_h/img_size[1]}, {org_w/img_size[0]}")
    img_tensor = transforms.Compose (
        [transforms.Resize((img_size[1], img_size[0])),
         transforms.ToTensor()]
    )(img).unsqueeze(0).to(device)
    
    # Feed to model
    tic = time()
    heatmaps = vit_pose(img_tensor).detach().cpu().numpy() # N, 17, h/4, w/4
    elapsed_time = time()-tic
    # print(f">>> Output size: {heatmaps.shape} ---> {elapsed_time:.4f} sec. elapsed [{elapsed_time**-1: .1f} fps]\n")    
    
    
    # points = heatmap2coords(heatmaps=heatmaps, original_resolution=(org_h, org_w))
    points, prob = keypoints_from_heatmaps(heatmaps=heatmaps, center=np.array([[org_w//2, org_h//2]]), scale=np.array([[org_w, org_h]]),
                                           unbiased=True, use_udp=True)
    points = np.concatenate([points[:, :, ::-1], prob], axis=2) #concatenate points with probabilities (1, num_keypoints, heatmap_size)
    # print(f'printing final points: {points}')
    
    #Visualizaiton
    if save_result:
        for pid, point in enumerate(points):
            img = np.array(img)[:, :, ::-1] # RGB to BGR for cv2 modules
            img = draw_points_and_skeleton(img.copy(), point, joints_dict()['coco']['skeleton'], person_index=pid,
                                           points_color_palette='gist_rainbow', skeleton_color_palette='jet',
                                           points_palette_samples=10, confidence_threshold=0.4)
    return points, img
    

# Warrior II Analysis
1. Hips need to be square (cannot rotate)
2. Torso needs to be facing upright and not bent to the side 
3. Arms need to be at a 180 degree angle 
4. front heel must be lined up with back arch (not sure if we can check for this though)


In [None]:
#Data - Warrior II videos of warrior II pose sequences 
file_list_correct = [file for file in os.listdir('../scripts/data/input_videos/WarriorTwo/correct') if not file.startswith('.')]
print(file_list_correct)
file_list_wrong = [file for file in os.listdir('../scripts/data/input_videos/WarriorTwo/wrong') if not file.startswith('.')]
print(file_list_wrong)

['WarriorTwo_correct_4.mp4', 'WarriorTwo_correct_5.mp4', 'WarriorTwo_correct_7.mp4', 'WarriorTwo_correct_6.mp4', 'WarriorTwo_correct_2.mp4', 'WarriorTwo_correct_3.mp4', 'WarriorTwo_correct_1.mp4']
['WarriorTwo_wrong_4.mov', 'WarriorTwo_wrong_5.mov', 'WarriorTwo_wrong_7.mov', 'WarriorTwo_wrong_6.mov', 'WarriorTwo_wrong_2.mov', 'WarriorTwo_wrong_3.mov', 'WarriorTwo_wrong_1.mov']


#### split each data into subsequent image frames for processing

In [None]:
#process correct video sequences for Warrior II
for filename in file_list_correct: 
    output_dir = '/home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/' + os.path.splitext(filename)[0]
    input_video = '/home/cgusti/ViTPose_pytorch/scripts/data/input_videos/WarriorTwo/correct/' + filename
    load_video(input_video, output_dir)

In [None]:
#TODO: process wrong video sequences for Warrior II 
for filename in file_list_wrong:
    output_dir = '/home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/wrong/' + os.path.splitext(filename)[0]
    input_video = '/home/cgusti/ViTPose_pytorch/scripts/data/input_videos/WarriorTwo/wrong/' + filename
    load_video(input_video, output_dir)

#### List all instances to be converted to Pose Sequence objects

In [None]:
directory = '/home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/correct'

filepath_list = [os.path.join(directory, file_name) for file_name in os.listdir(directory)]
sorted_filepath_list = sorted(filepath_list, key=lambda x: int(x.rsplit('_', 1)[-1]))
print(sorted_filepath_list_correct)

filename_list = os.listdir(directory)
sorted_filename_list = sorted(filename_list, key=lambda x: int(x.split('_')[2]))
print(sorted_filename_list_correct)

# Create pose sequence objects for each collection of image frame 
# (one collection representing one datapoint)
for filename, input_filepath in zip(sorted_filename_list, sorted_filepath_list):
    print(f'filename: {filename}, input_path: {input_filepath}')
    output_dir_keypoints = '/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/numerical/WarriorTwo/correct/' + filename 
    output_dir_visual = '/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/visualization/WarriorTwo/correct/' + filename
    parse_sequence(input_filepath, output_dir_keypoints, output_dir_visual)

In [None]:

directory = '/home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/wrong'
sorted_filepath_list_wrong = sorted([os.path.join(directory, file_name) for file_name in os.listdir(directory)], key=lambda x: int(x.rsplit('_', 1)[-1]))
sorted_filename_list_wrong = sorted(os.listdir(directory), key=lambda x: int(x.split('_')[2]))

for filename, input_filepath in zip(sorted_filename_list_wrong, sorted_filepath_list_wrong):
    print(f'filename: {filename}, input_path: {input_filepath}')
    output_dir_keypoints = '/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/numerical/WarriorTwo/wrong/' + filename 
    output_dir_visual = '/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/visualization/WarriorTwo/wrong/' + filename
    parse_sequence(input_filepath, output_dir_keypoints, output_dir_visual)

filename: WarriorTwo_wrong_1, input_path: /home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/wrong/WarriorTwo_wrong_1
file_names: ['frame_0.jpg', 'frame_1.jpg', 'frame_2.jpg', 'frame_3.jpg', 'frame_4.jpg', 'frame_5.jpg', 'frame_6.jpg', 'frame_7.jpg', 'frame_8.jpg', 'frame_9.jpg', 'frame_10.jpg', 'frame_11.jpg', 'frame_12.jpg', 'frame_13.jpg', 'frame_14.jpg']
num_frames: 15
image path for inference index 0: /home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/wrong/WarriorTwo_wrong_1/frame_0.jpg




>>> Model loaded: /home/cgusti/ViTPose_pytorch/scripts/checkpoints/vitpose-b-multi-coco.pth
keypoints that comes out of the model from inference:  [[[4.5687683e+02 9.7660474e+02 8.8531238e-01]
  [4.4600372e+02 9.9673828e+02 8.9460456e-01]
  [4.4246716e+02 9.5695496e+02 8.9960629e-01]
  [4.4893164e+02 1.0254385e+03 9.0724790e-01]
  [4.4228931e+02 9.2829468e+02 9.0774691e-01]
  [5.1588062e+02 1.0656023e+03 9.6982402e-01]
  [5.1441895e+02 9.0634546e+02 9.8127973e-01]
  [5.7532129e+02 1.1616973e+03 9.1787016e-01]
  [5.5993445e+02 7.8254370e+02 9.2773074e-01]
  [6.2081201e+02 1.2727913e+03 7.8276956e-01]
  [5.9133350e+02 6.5511487e+02 9.0397263e-01]
  [6.7119202e+02 1.0505625e+03 8.6586118e-01]
  [6.7167090e+02 9.3932056e+02 9.9385303e-01]
  [7.9106055e+02 1.0290166e+03 9.5996296e-01]
  [7.8696973e+02 9.3016858e+02 8.9576375e-01]
  [8.9382910e+02 1.0144607e+03 8.8965249e-01]
  [9.0004968e+02 9.3353931e+02 9.2408836e-01]]]
current all keypoints: [[[4.56876831e+02 9.76604736e+02 8.85312378e-0

# Debugging

In [None]:
from configs.ViTPose_base_coco_256x192 import model as model_cfg
from configs.ViTPose_base_coco_256x192 import data_cfg
img_path = '/home/cgusti/ViTPose_pytorch/scripts/data/input_frames/WarriorTwo/WarriorTwo_correct_4/frame_31.jpg'
CKPT_PATH = "/home/cgusti/ViTPose_pytorch/scripts/checkpoints/vitpose-b-multi-coco.pth"
img_size = data_cfg['image_size']
keypoints, img_skeleton = inference(img_path=img_path, output_path='just_test',img_size=img_size, model_cfg=model_cfg, ckpt_path=CKPT_PATH, 
                              device=torch.device("cuda") if torch.cuda.is_available() else torch.device('cpu'),
                              save_result=False)
coordinates = keypoints[:, :, :2]
print(coordinates)

# Extract x and y values (e.g left hip - index)
x = coordinates[:, :, 0]
y = coordinates[:, :, 1]

# labels for each point
keypoints = ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", 
             "right_shoulder", "left_elbow","right_elbow", "left_wrist", "right_wrist","left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"]

#plot 
plt.scatter(*zip(*coordinates[0]))

# Add labels to each point
for coord, label in zip(coordinates[0], keypoints):
    plt.text(coord[0], coord[1], label, ha='center', va='bottom')
    
# Set the plot title and labels
plt.title('Scatter Plot with Point Labels')
plt.xlabel('X')
plt.ylabel('Y')

# Display the plot
plt.show()

# Plot the x and y coordinates
# plt.plot(x, y, 'o')
# plt.xlabel('X')
# plt.ylabel('Y')
# plt.title('Pairs of X and Y Coordinates')
# plt.show()

#It looks to me that the raw coordinates input from the machine are correct

In [None]:
#investing further torso angle to the horizontal (should be close to 90 degrees)
import math
import numpy as np

left_shoulder = coordinates[0, 5]
print(left_shoulder)
left_hip = coordinates[0, 11]
print(left_hip)
torso_vector = left_shoulder - left_hip
x = torso_vector[0]
y = torso_vector[1]
torso_angle = math.degrees(np.arctan2(y, x))
print(torso_angle)


#let's plot the vector in the 2D plane 
fig, ax = plt.subplots()
ax.quiver(0, 0, x, y, angles='xy', scale_units='xy', scale=1, color='r')
# Set the x and y axis limits
ax.set_xlim([-5, 5])
ax.set_ylim([-5, 5])
# Set the title of the plot
ax.set_title('Vector Visualization')
plt.show()

#Since we are pointing to the negative axis, the angle should be close to 180 degrees. The below angle looks correct!

In [None]:
all_keypoints = np.load('/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/numerical/WarriorTwo/correct/WarriorTwo_correct_5_sequence.npy')
for i in range(all_keypoints.shape[0]):
    plot_keypoints(all_keypoints[i])

# Analyzing Warrior II using DTW and KNN

In [12]:
def load_ps(filename):
    """Load a PoseSequence object from a given numpy file. 
    
    Args: 
        filename: file name of the numpy file containing keypoints
        
    Returns: 
        PoseSequence object with normalized joint keypoints
    """
    all_keypoints = np.load(filename)
    return PoseSequence(all_keypoints)

In [13]:
def plot_keypoints(coordinates):
    x = coordinates[:, 0]
    y = coordinates[:, 1]
    
    # labels for each point
    keypoints = ["nose", "left_eye", "right_eye", "left_ear", "right_ear", "left_shoulder", 
             "right_shoulder", "left_elbow","right_elbow", "left_wrist", "right_wrist","left_hip", "right_hip", "left_knee", "right_knee", "left_ankle", "right_ankle"]
    
    # plot 
    plt.scatter(x, y)

    # Add labels to each point
    for coord, label in zip(coordinates, keypoints):
        plt.text(coord[0], coord[1], label, ha='center', va='bottom')
        
    # Set the plot title and labels
    plt.title('Scatter Plot with Point Labels')
    plt.xlabel('X')
    plt.ylabel('Y')

    # Display the plot
    plt.show()

In [14]:
def calculate_angles(vector_series_1, vector_series_2):
    """Calculate the angle using dot product for each pair of rows in two arrays. """
    dot_products = np.sum(vector_series_1 * vector_series_2, axis=1)
    magnitudes_1 = np.linalg.norm(vector_series_1, axis=1)
    magnitudes_2 = np.linalg.norm(vector_series_2, axis=1)    
    #calculate angles in radians
    angles_rad = np.arccos(dot_products/(magnitudes_1 * magnitudes_2))
    #convert angles to degress
    angles_deg = np.degrees(angles_rad)
    return angles_deg

In [15]:
def calculate_angle_with_horizontal(vector_series):
    x_coords = vector_series[:,0]
    y_coords = vector_series[:,1]
    angles = []
    for i in range(len(x_coords)):
        x = x_coords[i]
        y = y_coords[i]
        angle = math.degrees(np.arctan2(y, x))
        angles.append(angle)
    return angles

In [33]:
def load_features_warriorTwo(file_paths, output_dir):
    """
    load necessary features for a Warrior Two yoga pose
    Params: 
        inout file (list): list of input file paths
    Returns: 
        list of numpy array sequences for engineered features 
    """ 
    bent_leg_angles = []
    torso_bend_angles = []
    hip_bend_angles = []
    
    for filepath in file_paths: 
        ps = load_ps(filepath) #loading pose sequence object 
        poses_sequence = ps.poses #load all poses in the sequence 
        
        pose_sequence_body_parts = [(pose.left_shoulder, pose.right_shoulder, pose.left_elbow, pose.right_elbow, pose.left_wrist, pose.right_wrist, 
                   pose.left_hip, pose.right_hip, pose.left_knee, pose.right_knee, pose.left_ankle, pose.right_ankle) for pose in poses_sequence]
        
        #filter out data points/frames where a part does not exist. We only want full, usable frames 
        filtered_sequence = [body_pose for body_pose in pose_sequence_body_parts if all(part.exists for part in body_pose)]
        
        #calculate left bend angle 
        left_upper_leg_vectors = np.array([(body_parts[6].x - body_parts[8].x, body_parts[6].y - body_parts[8].y) for body_parts in filtered_sequence])
        left_lower_leg_vectors = np.array([(body_parts[8].x - body_parts[10].x, body_parts[8].y - body_parts[10].y) for body_parts in filtered_sequence])
        left_leg_bend = calculate_angles(left_upper_leg_vectors, left_lower_leg_vectors)
        left_leg_bend_filtered = medfilt(medfilt(left_leg_bend, 5),5)
        # print(f"left leg bend: {left_leg_bend}")
        
        #calculate right bend angle 
        right_upper_leg_vectors = np.array([(body_parts[7].x - body_parts[9].x, body_parts[7].y - body_parts[9].y) for body_parts in filtered_sequence])
        right_lower_leg_vectors = np.array([(body_parts[9].x - body_parts[11].x, body_parts[9].y - body_parts[11].y) for body_parts in filtered_sequence])
        right_leg_bend = calculate_angles(right_upper_leg_vectors, right_lower_leg_vectors)
        #use a median filter to eliminate outliers 
        right_leg_bend_filtered = medfilt(medfilt(right_leg_bend, 5),5)
        
        #deduce which side person is facing
        side = 'right' if np.mean(right_leg_bend_filtered) > np.mean(left_leg_bend_filtered) else 'left'
        
        if side == 'right': 
            print('right is bent leg')
            bent_leg_angle = right_leg_bend_filtered
            bent_leg_angles.append(bent_leg_angle)
        else: 
            print('left is bent leg')
            bent_leg_angle = left_leg_bend_filtered
            bent_leg_angles.append(bent_leg_angle)
        
        #calculate arc tangent for hips 
        hip_vectors = np.array([(body_parts[6].x - body_parts[7].x, body_parts[6].y - body_parts[7].y) for body_parts in filtered_sequence])
        hip_bend_angle = calculate_angle_with_horizontal(hip_vectors)
        hip_bend_angles.append(hip_bend_angle)
        
        #calculate left/right torso angle with the horizontal 
        torso_vectors = np.array([(body_parts[0].x - body_parts[11].x, body_parts[0].y - body_parts[11].y) for body_parts in filtered_sequence])
        torso_bend_angle = calculate_angle_with_horizontal(torso_vectors)
        torso_bend_angles.append(torso_bend_angle)

        #save recorded angles in numpy filre 
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)

        file_name = os.path.splitext(filepath)[0].split('/')[-1]
        np.savez(os.path.join(output_dir, file_name), bent_leg=bent_leg_angle, torso_bend=torso_bend_angle, hip_bend=hip_bend_angle)
    
    return bent_leg_angles, torso_bend_angles, hip_bend_angles

#Note: decided not to include straight_leg_angles because of inconsistencies

In [31]:
correct_dir = 'data/processed_frames/numerical/WarriorTwo/correct'
wrong_dir = 'data/processed_frames/numerical/WarriorTwo/wrong'
pose_sequences_file_names_correct = [os.path.join(correct_dir, file_name) for file_name in os.listdir(correct_dir)]
pose_sequences_file_names_wrong = [os.path.join(wrong_dir, file_name) for file_name in os.listdir(wrong_dir)]
all_files = pose_sequences_file_names_correct + pose_sequences_file_names_wrong
# Split the files into train and test sets
train_files, test_files = train_test_split(all_files, test_size=0.4, random_state=42, shuffle=True)
# bent_leg_angles, torso_bend_angles, hip_bend_angles = load_features_warriorTwo(pose_sequences_file_names)

In [18]:
def get_labels(file_names):
    labels = [1 if "correct" in i else 0 for i in file_names]
    return labels

In [35]:
#loading features 
X_train_1, X_train_2, X_train_3 = load_features_warriorTwo(train_files, output_dir='data/angle_analysis') #bent_leg_angles, torso_bend_angles, hip_bend_angles
X_test_1, X_test_2, X_test_3 = load_features_warriorTwo(test_files, output_dir='data/angle_analysis')
y_train = get_labels(train_files)
y_test = get_labels(test_files)

right is bent leg
right is bent leg
left is bent leg
right is bent leg
left is bent leg
right is bent leg
right is bent leg
left is bent leg
left is bent leg
right is bent leg
right is bent leg
right is bent leg
left is bent leg
right is bent leg
left is bent leg
right is bent leg
right is bent leg
right is bent leg
right is bent leg




In [29]:
# debugging: 


### Using DTW (Dynamic Time Warping)

Compute Dynamic Time Warp Distance of two sequences
https://towardsdatascience.com/time-series-classification-using-dynamic-time-warping-61dcd9e143f6

In [None]:
#Example
from dtaidistance import dtw
from dtaidistance import dtw_visualisation as dtwvis

# path = dtw.warping_path(bent_leg_angles[0], bent_leg_angles[1])
# dtwvis.plot_warping(bent_leg_angles[0], bent_leg_angles[1], path)
# distance = dtw.distance(bent_leg_angles[0], bent_leg_angles[1])

In [None]:
#function that takes as input the number of neighbors of KNN and the index of the time series
# in the test set, and returns one of the labels: correct, wrong 

In [None]:
predictions = []
for example in range(len(test_files)):
    #store the average distance to good and bad training examples
    f1_correct, f2_correct, f3_correct, f1_wrong, f2_wrong, f3_wrong = [[] for i in range(6)]  
    print(f'\n\n ground truth is : {y_test[example]}')
    #Compare distance of current test example with all training examples 
    for i in range(len(X_train_1)):
        dist1 = dtw.distance(X_train_1[i], X_test_1[example], use_pruning=True)
        dist2 = dtw.distance(X_train_2[i], X_test_2[example], use_pruning=True)
        dist3 = dtw.distance(X_train_3[i], X_test_3[example], use_pruning=True)
        
        if y_train[i]: #if the current train data is a correct pose,
            # print('traning data is correct') 
            # print(f'dist1, dist2, dist2: {dist1, dist2, dist3}')
            f1_correct.append(dist1)
            f2_correct.append(dist2)
            f3_correct.append(dist3)
        else: 
            # print('traning data is wrong') 
            # print(f'dist1, dist2, dist2: {dist1, dist2, dist3}')
            f1_wrong.append(dist1)
            f2_wrong.append(dist2)
            f3_wrong.append(dist3)
    print(f'correctpose f1, f2 and f3: {f1_correct, f2_correct, f3_correct}')
    correct_pose_score = np.mean(f1_correct) + np.mean(f2_correct) + np.mean(f3_correct)
    print(f'wrongpose f1, f2 and f3: {f1_wrong, f2_wrong, f3_wrong}')
    wrong_pose_score = np.mean(f1_wrong) + np.mean(f2_wrong) + np.mean(f3_wrong)
    #Compare distance of current test example with all training examples 
    if correct_pose_score < wrong_pose_score: 
        print('prediction is correct pose')
        predictions.append(1)
    else: 
        print('prediction is prediction is wrong pose')
        predictions.append(0)
    
print(classification_report(y_test, predictions, target_names=['correct', 'incorrect']))
#The reason why this algorithm is flawed is because there is an imbalance of classes 



 ground truth is : 1
correctpose f1, f2 and f3: ([79.00790116264469, 90.34526832072653, 23.48002190798272], [28.4013839163727, 2.3044524984059818, 75.02972361076621], [12.907270926674085, 9.219873057315722, 27.483079499253744])
wrongpose f1, f2 and f3: ([142.7353650211865, 110.92923231270353, 40.57031240872859, 54.61618586271699, 28.646355090714227, 77.55712285024322, 61.74523301215828, 169.36574466108215], [22.799728167897065, 1026.6513130813055, 46.16976548801631, 904.470686530047, inf, 38.118384823904954, 437.78376575250263, 527.5048407719997], [48.42555392698344, 282.6171499069545, 53.92941426318836, 183.21821349192123, 81.96943504773218, 51.79634303381344, 259.08860502707563, 163.830180028733])
prediction is correct pose


 ground truth is : 1
correctpose f1, f2 and f3: ([167.65547719612883, 245.65406591537248, 57.941695969337744], [14.852146814163326, 12.759916466457366, 65.41254421670209], [4.67977187187827, 6.341594323021961, 26.84286237651108])
wrongpose f1, f2 and f3: ([inf

In [None]:
#Let's do some debugging here.. 
ps = load_ps('/home/cgusti/ViTPose_pytorch/scripts/data/processed_frames/numerical/WarriorTwo/correct/WarriorTwo_correct_2_sequence.npy')
poses_sequence = ps.poses #load all poses in the sequence 

right_knee_progression = [(pose.right_knee.x,pose.right_knee.y)  for pose in poses_sequence]
right_ankle_progression = [(pose.right_ankle.x,pose.right_ankle.y)  for pose in poses_sequence]


fig, (ax1, ax2) = plt.subplots(1,2)
ax1.plot(right_knee_progression)
ax1.set_title('right knee progression (x, y)')

ax2.plot(right_ankle_progression)
ax2.set_title('right ankle progression (x, y)')
plt.tight_layout()


In [None]:
#There's something fishy about the angles between the horizontal axis: 


#so let's visualize it here 
vector = [-1.78116055, 1.2703142 ]

# Create a figure and axis
fig, ax = plt.subplots()

# Plot the vector
ax.quiver(0, 0, vector[0], vector[1], angles='xy', scale_units='xy', scale=1, color='r')

# Set the x and y axis limits
ax.set_xlim([-5, 5])
ax.set_ylim([-5, 5])

# Set labels for x and y axis
ax.set_xlabel('X-axis')
ax.set_ylabel('Y-axis')

# Set the title of the plot
ax.set_title('Vector Visualization')

# Display the plot
plt.show()

### Warrior I Analaysis 

#### Some important notes
1. Hips need to be square  
    - Need to identify hips vector 
2. Back foot need to pivot facing forward (same direction the head is facing)
3. Knee needs to be bent and stacked above the ankle 
    - Need to identify front and back foot
    - Need to identify front and back leg vector 