In [7]:
# test realtime pose CLASSIFICATION (and hence landmark detection...)
# on our curated dataset of pose videos

import sys
sys.path.insert(0, '/Users/alejandraduran/Documents/Pton_courses/COS429/COS429_final_project/training_pipeline')
import os
import cv2
import mediapipe as mp
import pickle
from extract_features import FeaturesMP
import numpy as np
import time
from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
from collections import Counter

# Root of the project checkout. The default is the location used when this
# notebook was written; set the COS429_PROJECT_ROOT environment variable to run
# the notebook from another machine (e.g. a cluster) without editing every path.
PROJECT_ROOT = os.environ.get(
    "COS429_PROJECT_ROOT",
    "/Users/alejandraduran/Documents/Pton_courses/COS429/COS429_final_project",
)


def _load_pickle(path):
    """Return the object stored in the pickle file at ``path``.

    NOTE: ``pickle.load`` can execute arbitrary code, so only point this at
    files produced by this project's own training pipeline.
    """
    with open(path, 'rb') as f:
        return pickle.load(f)


# Retrieve pre-trained model
mp_model_path = os.path.join(PROJECT_ROOT, "pretrained_models", "pose_landmarker_full.task")
# Initialize FeaturesMP object
features_mp = FeaturesMP(mp_model_path, image_size=(1080, 1920))

# load the label encoder
label_encoder = _load_pickle(os.path.join(PROJECT_ROOT, "training_pipeline", "label_encoder.pkl"))

# load the trained classifier
# TODO: confirm which checkpoint is the intended one - the original note says
# it is either padded_nn_7 or padded_nn_8.
classifier = _load_pickle(os.path.join(PROJECT_ROOT, "trained_classifiers", "padded_nn_7.pkl"))

# load the sanskrit to english dictionary
sanskrit_english_dict = _load_pickle(os.path.join(PROJECT_ROOT, "sanskrit_english_dict.pkl"))


# function to write demonstration videos - the rest of the testing function lives in test_video.py

def test_video(video_path, features_mp, label_encoder, classifier, sanskrit_english_dict):

    cap = cv2.VideoCapture(video_path)

    frame_rate = cap.get(cv2.CAP_PROP_FPS)
    n = -1

    # introduce delay in position predictions
    buffer = [-1,-1,-1,-1]

    label_across_frames = []
    label_with_delays = []

    # Initialize detector
    features_mp.init_detector(video=True, min_pose_detection_confidence=0.7)

    # Create a loop to read the latest frame from the camera
    while cap.isOpened():
        ret, frame = cap.read()
        
        if not ret:
            print("Error: Unable to fetch the frame or finished the video.")
            break
        
        # new frame
        n+=1
        # get timestamp from frame rate
        timestamp = int(n * 1000 / frame_rate)

        # Run inference on the image 
        landmarks = features_mp.detect(frame, video=True, frame_timestamp_ms=timestamp)
        
        # Draw landmarks if detected
        if landmarks is not None:
            if len(landmarks.pose_landmarks) != 0:
                
                pose_landmarks_list = landmarks.pose_landmarks 

                # get only normalized coordinates - improves latency
                pose_landmarks = pose_landmarks_list[0]

                to_classify = np.zeros((features_mp.n_landmarks, 4))
                to_extend = []
                
                # store normalized landmarks to appends and classify
                for k, landmark in enumerate(pose_landmarks):
                    to_classify[k] = [landmark.x, landmark.y, landmark.z, landmark.visibility]
                    
                # normalize and rotate to_classify
                to_classify = features_mp.make_rot_invariant_partial(to_classify, init_norm=True)
                to_classify = to_classify.reshape(1, features_mp.n_landmarks * 4)
                    
                # Run inference
                predicted_class = classifier.predict(to_classify)
                # Get the string label
                predicted_name = label_encoder.inverse_transform([int(predicted_class-1)])
                english = sanskrit_english_dict[predicted_name[0]]
                label_across_frames.append(english)
                # Append to buffer
                buffer.pop(0)
                buffer.append(predicted_name[0])
                # # if all elements now in the buffer are the same, then we can display the pose
                if buffer[0] == buffer[1] == buffer[2] == buffer[3]:
                    text = english
                    label_with_delays.append(english)

    # Release resources
    cap.release()
    # out.release()
    cv2.destroyAllWindows()
    
    return label_across_frames, label_with_delays


In [9]:

# MISSING ARDHA MATS, ardha pincha, ashta chandra

testing_now = ['Adho Mukha Svanasana', 'Adho Mukha Vrksasana', 'Anjaneyasana', 'Ardha Navasana','Baddha Konasana']

# Input videos live in one sub-directory per pose; results are written as one
# pickle per pose and metric.
video_root = "/Users/alejandraduran/Documents/Pton_courses/COS429/video_data"
metrics_dir = "/Users/alejandraduran/Documents/Pton_courses/COS429/COS429_final_project/testing/metrics"
# Create the output directory up front so a long run cannot fail at save time.
os.makedirs(metrics_dir, exist_ok=True)

for pose in testing_now:
    # One entry per processed video; reset for every pose, so after the loop
    # these names hold the results of the LAST pose only.
    label_accross_frames_list = []
    label_with_delays_list = []
    directory_path = os.path.join(video_root, pose)
    if not os.path.isdir(directory_path):
        # os.walk silently yields nothing for a missing directory
        print(f"Warning: no video directory found for {pose!r}: {directory_path}")

    # iterate over the videos of the directory
    for dirname, subdirs, filenames in os.walk(directory_path):
        # Sort in place so the traversal (and hence the order of the saved
        # per-video lists) does not depend on filesystem ordering.
        subdirs.sort()
        for filename in sorted(filenames):
            # Only process video files; compare case-insensitively so files
            # such as "clip.MP4" are not skipped.
            if not filename.lower().endswith(('.mp4', '.avi')):
                continue  # Skip non-video files
            video_path = os.path.join(dirname, filename)
            label_across_frames, label_with_delays = test_video(video_path, features_mp, label_encoder, classifier, sanskrit_english_dict)
            label_accross_frames_list.append(label_across_frames)
            label_with_delays_list.append(label_with_delays)

    # save the multi-video results for each pose
    with open(os.path.join(metrics_dir, f'{pose}_label_across_frames.pkl'), 'wb') as f:
        pickle.dump(label_accross_frames_list, f)
    with open(os.path.join(metrics_dir, f'{pose}_label_with_delays.pkl'), 'wb') as f:
        pickle.dump(label_with_delays_list, f)
        
    # run in cluster
            

I0000 00:00:1733956777.162145 94905900 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 86), renderer: Apple M1


Error: Unable to fetch the frame or finished the video.


I0000 00:00:1733956793.779411 94905900 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 86), renderer: Apple M1


Error: Unable to fetch the frame or finished the video.


I0000 00:00:1733956801.200516 94905900 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 86), renderer: Apple M1


KeyboardInterrupt: 

In [3]:
# Merge the per-video label lists into one flat list per metric.
# NOTE: both input lists are produced by the per-pose evaluation loop, which
# resets them for every pose - so these counts describe the last pose processed.
flattened_list_accross_frames = [
    label
    for video_labels in label_accross_frames_list
    for label in video_labels
]
flattened_list_with_delays = [
    label
    for video_labels in label_with_delays_list
    for label in video_labels
]

# Tally how often each predicted pose name occurs
counter_accross_frames = Counter(flattened_list_accross_frames)
counter_with_delays = Counter(flattened_list_with_delays)

# Report each tally under its heading (heading and counter on separate lines)
print("Counts for label_accross_frames_list:", counter_accross_frames, sep="\n")

print("Counts for label_with_delays_list:", counter_with_delays, sep="\n")

Counts for label_accross_frames_list:
Counter({'Downward-facing dog': 4234, 'Crow': 2156, 'Extended Side Angle': 297, 'Pyramid': 177, 'Corpse': 119, 'Pigeon': 52, 'Triangle': 46, 'Warrior III': 40, 'Half-Moon': 20, 'Wild Thing': 18, 'Plank': 13, 'High Lunge': 13, 'Cat': 12, 'Squat': 12, 'Reverse Warrior': 9, 'Sphinx': 6, 'Tree': 5, 'Butterfly': 3, 'Chair': 2})
Counts for label_with_delays_list:
Counter({'Downward-facing dog': 4163, 'Crow': 2066, 'Extended Side Angle': 185, 'Pyramid': 168, 'Corpse': 85, 'Pigeon': 35, 'Triangle': 20, 'Warrior III': 10, 'Cat': 9, 'Squat': 9, 'Plank': 4, 'Tree': 2, 'Wild Thing': 2, 'Half-Moon': 1})
