In [1]:
import cv2
import os
import numpy as np
import mediapipe as mp
from scipy.linalg import orthogonal_procrustes
import matplotlib.pyplot as plt
from collections import defaultdict
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
import warnings


warnings.filterwarnings("ignore", category=UserWarning, message="SymbolDatabase.GetPrototype() is deprecated")


logging.basicConfig(level=logging.INFO)


weight_procrustes = 0.5
weight_cosine = 0.5


threshold = 0.3


def load_and_preprocess_image(image_path):
    try:
        image = cv2.imread(image_path)
        if image is None:
            logging.warning(f"Failed to load image at {image_path}")
            return None
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        return image_rgb
    except Exception as e:
        logging.error(f"Error loading image {image_path}: {e}")
        return None

# Function to extract pose features using MediaPipe BlazePose
def extract_pose_features(image):
    if image is None:
        return None
    try:
        with mp.solutions.pose.Pose(static_image_mode=True, min_detection_confidence=0.5) as pose:
            results = pose.process(image)
            if not results.pose_landmarks:
                return None
            landmarks = results.pose_landmarks.landmark
            keypoints = np.array([[lm.x, lm.y, lm.z] for lm in landmarks])
        return keypoints
    except Exception as e:
        logging.error(f"Error extracting pose features: {e}")
        return None

# Function to compute Procrustes distance between pose feature vectors
def procrustes_analysis(X, Y):
    if X is None or Y is None:
        return float('inf')

    # Center the matrices
    X -= np.mean(X, axis=0)
    Y -= np.mean(Y, axis=0)

    # Normalize the matrices
    X /= np.linalg.norm(X)
    Y /= np.linalg.norm(Y)

    # Compute the orthogonal Procrustes problem
    R, scale = orthogonal_procrustes(X, Y)

    # Apply the transformation
    Z = np.dot(X, R) * scale

    # Compute the Procrustes distance
    d = np.sum((Z - Y) ** 2)

    return d

# Function to compute Cosine Similarity distance between pose feature vectors
def cosine_similarity(X, Y):
    if X is None or Y is None:
        return float('inf')

 
    X_norm = X / np.linalg.norm(X, axis=1, keepdims=True)
    Y_norm = Y / np.linalg.norm(Y, axis=1, keepdims=True)

   
    cosine_sim = np.sum(X_norm * Y_norm, axis=1).mean()

   
    cosine_dist = 1 - cosine_sim

    return cosine_dist

# Function to compute weighted distance for each pair of poses
def compute_weighted_distance(procrustes_distance, cosine_distance):
    weighted_distance = weight_procrustes * procrustes_distance + weight_cosine * cosine_distance
    return weighted_distance

# Function to extract frames from a video and compare with specific correct poses
def extract_frames_and_compare(video_path, output_dir, frame_rate, target_size=(512, 384), num_matches=3):
   
    os.makedirs(output_dir, exist_ok=True)

    
    video_capture = cv2.VideoCapture(video_path)

    
    frame_count = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
    fps = video_capture.get(cv2.CAP_PROP_FPS)

   
    interval = int(fps / frame_rate)

    
    frame_number = 0
    success = True

    
    def process_frame(frame_filename):
        try:
            frame_path = os.path.join(output_dir, frame_filename)
            frame_image = load_and_preprocess_image(frame_path)
            frame_keypoints = extract_pose_features(frame_image)

            frame_results = []
            for pose_name, ref_keypoints in correct_pose_features.items():
                procrustes_dist = procrustes_analysis(frame_keypoints, ref_keypoints)
                cosine_dist = cosine_similarity(frame_keypoints, ref_keypoints)
                weighted_dist = compute_weighted_distance(procrustes_dist, cosine_dist)
                frame_results.append((pose_name, frame_filename, weighted_dist))

            return frame_results
        except Exception as e:
            logging.error(f"Error processing frame {frame_filename}: {e}")
            return []


   
    def compare_frames_with_specific_poses(frame_filename):
        try:
            frame_results = process_frame(frame_filename)
            if frame_results:
                for pose_name, frame_filename, distance in frame_results:
                    if distance <= 0.2:  
                        best_matches[pose_name].append({'frame': frame_filename, 'distance': distance})
        except Exception as e:
            logging.error(f"Error comparing frame {frame_filename}: {e}")


   
    correct_pose_paths = { 
        "zenkutsuDachi_awaseTsuki(leftLeg)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/zenkutsuDachi_awaseTsuki(leftLeg).jpg",
        "shikoDachi_gedanBarai(front)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/shikoDachi_gedanBarai(front).jpg",
        "zenkutsuDachi_empiUke(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/zenkutsuDachi_empiUke(right).jpg",
        "zenkitsuDahi_empiUke(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/zenkitsuDahi_empiUke(left).jpg",
        "zenkutsuDachi_chudanTsuki": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/zenkutsuDachi_chudanTsuki.jpg",
        "shikoDach_gedaiBarai(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/shikoDach_gedaiBarai(left).jpg",
        "shikoDachi_GedanBarai(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/shikoDachi_GedanBarai(right).jpg",
        "zenkutsuDachi_awaseTsuki(rightLeg)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/zenkutsuDachi_awaseTsuki(rightLeg).jpg",
        "sotoUke_maeGeri(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/sotoUke_maeGeri(right).jpg",
        "sotoUke_maeGeri(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/sotoUke_maeGeri(left).jpg",
        "motoDachi_sotoUke(left2)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_sotoUke(left2).jpg",
        "motoDachi_sotoUke(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_sotoUke(right).jpg",
        "motoDachi_sotoUke(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_sotoUke(left).jpg",
        "motoDachi_jodanTsuki(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_jodanTsuki(left).jpg",
        "motoDachi_jodanTsuki(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_jodanTsuki(right).jpg",
        "motoDachi_ageUke(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_ageUke(right).jpg",
        "motoDachi_ageUke(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/motoDachi_ageUke(left).jpg",
        "hachijiDachi_jodanYoko(left)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/hachijiDachi_jodanYoko(left).jpg",
        "hachijiDachi_jidanYoko(right)": "C:/Users/USER/Desktop/Frame extraction/full/Frame_ex_n_comp/CorrectPose/hachijiDachi_jidanYoko(right).jpg"
    }

    
    correct_pose_features = {
        name: extract_pose_features(load_and_preprocess_image(path))
        for name, path in correct_pose_paths.items()
    }

    
    best_matches = defaultdict(list)

    
    frame_filenames = []

    
    while success:
        success, frame = video_capture.read()
        if frame_number % interval == 0 and success:
            
            frame_resized = cv2.resize(frame, target_size)
            
            frame_filename = f"frame_{frame_number}.jpg"
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame_resized)
            
            frame_filenames.append(frame_filename)
        frame_number += 1

    
    video_capture.release()

    # Use ThreadPoolExecutor to process frames in parallel
    with ThreadPoolExecutor() as executor:
        futures = [executor.submit(compare_frames_with_specific_poses, filename) for filename in frame_filenames]
        for future in as_completed(futures):
            future.result() 

   
    for pose_name, matches in best_matches.items():
        matches.sort(key=lambda x: x['distance'])
        best_matches[pose_name] = matches[:num_matches]

  
    for pose_name, matches_info in best_matches.items():
        for match_info in matches_info:
            frame_name = match_info['frame']
            frame_path = os.path.join(output_dir, frame_name)
            print(f"Best match for {pose_name} is frame {frame_name}")

          
            image = cv2.imread(frame_path)
            plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
            plt.title(f"{pose_name} - {frame_name}")
            plt.show()

            if pose_name in correct_pose_paths:
                ref_image_path = correct_pose_paths[pose_name]
                ref_image = cv2.imread(ref_image_path)
                plt.imshow(cv2.cvtColor(ref_image, cv2.COLOR_BGR2RGB))
                plt.title(f"Reference - {pose_name}")
                plt.show()



video_path = r"C:\Users\USER\Desktop\Frame extraction\full\Frame_ex_n_comp\video\vid2.mp4"
output_dir = r"C:\Users\USER\Desktop\Frame extraction\full\Frame_ex_n_comp\extracted_frames_resized"
frame_rate = 10  
extract_frames_and_compare(video_path, output_dir, frame_rate)


