In [1]:
import mediapipe as mp
import torch
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torch_geometric.data import Data
from scipy.spatial import Delaunay
import os 
import cv2
import pandas as pd
import ast

In [2]:
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

In [3]:
def create_graph(image_path, label):
    image = cv2.imread(image_path)
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)
    if not results.pose_landmarks:
        return None
    landmarks = results.pose_landmarks.landmark
    nodes = []
    for landmark in landmarks:
        nodes.append([landmark.x, landmark.y, landmark.z])
    x = torch.tensor(nodes, dtype=torch.float)
    edges = [
        (mp_pose.PoseLandmark.LEFT_SHOULDER.value, mp_pose.PoseLandmark.LEFT_ELBOW.value),
        (mp_pose.PoseLandmark.LEFT_ELBOW.value, mp_pose.PoseLandmark.LEFT_WRIST.value),
        (mp_pose.PoseLandmark.RIGHT_SHOULDER.value, mp_pose.PoseLandmark.RIGHT_ELBOW.value),
        (mp_pose.PoseLandmark.RIGHT_ELBOW.value, mp_pose.PoseLandmark.RIGHT_WRIST.value),
        (mp_pose.PoseLandmark.LEFT_HIP.value, mp_pose.PoseLandmark.LEFT_KNEE.value),
        (mp_pose.PoseLandmark.LEFT_KNEE.value, mp_pose.PoseLandmark.LEFT_ANKLE.value),
        (mp_pose.PoseLandmark.RIGHT_HIP.value, mp_pose.PoseLandmark.RIGHT_KNEE.value),
        (mp_pose.PoseLandmark.RIGHT_KNEE.value, mp_pose.PoseLandmark.RIGHT_ANKLE.value),
    ]
    edge_index = torch.tensor(edges, dtype=torch.long).t().contiguous()
    data = Data(x=x, edge_index=edge_index, y=torch.tensor([label], dtype=torch.long))
    return data

In [6]:
dataset_path = "yoga-posture-dataset"  
output_csv = "yoga_keypoints2.csv"

In [None]:
def normalize_keypoints(keypoints):
    kps = np.array([(kp.x, kp.y, kp.z, kp.visibility) for kp in keypoints])
    neck = kps[mp_pose.PoseLandmark.NOSE.value][:3]  
    left_hip = kps[mp_pose.PoseLandmark.LEFT_HIP.value][:3]
    right_hip = kps[mp_pose.PoseLandmark.RIGHT_HIP.value][:3]
    mid_hip = (left_hip + right_hip) / 2
    torso_length = np.linalg.norm(neck - mid_hip)
    if torso_length < 1e-6:  
        return None
    normalized_kps = kps.copy()
    normalized_kps[:, :3] = normalized_kps[:, :3] / torso_length
    
    return normalized_kps.flatten().tolist()
columns = ["image_name", "class_label"]
for i in range(33): 
    columns += [f"kp{i}_x", f"kp{i}_y", f"kp{i}_z", f"kp{i}_vis"]  
data = []
class_folders = sorted(os.listdir(dataset_path))
total_classes = len(class_folders)
for class_idx, class_name in enumerate(class_folders):
    class_path = os.path.join(dataset_path, class_name)
    if not os.path.isdir(class_path):
        continue
    print(f"Processing class {class_idx + 1}/{total_classes}: {class_name}...")
    image_files = sorted(os.listdir(class_path))
    total_images = len(image_files)
    for img_idx, img_name in enumerate(image_files):
        img_path = os.path.join(class_path, img_name)
        image = cv2.imread(img_path)
        if image is None:
            print(f"  Skipping unreadable image: {img_name}")
            continue
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        if results.pose_landmarks:
            normalized_kps = normalize_keypoints(results.pose_landmarks.landmark)
            if normalized_kps:
                data.append([img_name, class_name] + normalized_kps)
            else:
                print(f"  Could not normalize keypoints for {img_name}")
        if (img_idx + 1) % 10 == 0:
            print(f"  Processed {img_idx + 1}/{total_images} images in {class_name}")


Processing class 1/48: Adho Mukha Svanasana...
  Processed 10/74 images in Adho Mukha Svanasana
  Processed 20/74 images in Adho Mukha Svanasana
  Processed 30/74 images in Adho Mukha Svanasana
  Processed 40/74 images in Adho Mukha Svanasana
  Processed 50/74 images in Adho Mukha Svanasana
  Processed 60/74 images in Adho Mukha Svanasana
  Processed 70/74 images in Adho Mukha Svanasana
Processing class 2/48: Adho Mukha Vrksasana...
  Processed 10/65 images in Adho Mukha Vrksasana
  Processed 20/65 images in Adho Mukha Vrksasana
  Processed 30/65 images in Adho Mukha Vrksasana
  Processed 40/65 images in Adho Mukha Vrksasana
  Processed 50/65 images in Adho Mukha Vrksasana
  Processed 60/65 images in Adho Mukha Vrksasana
Processing class 3/48: Alanasana...
  Processed 10/18 images in Alanasana
Processing class 4/48: Anjaneyasana...
  Processed 10/71 images in Anjaneyasana
  Processed 20/71 images in Anjaneyasana
  Processed 30/71 images in Anjaneyasana
  Processed 40/71 images in Anjan

In [8]:
df = pd.DataFrame(data, columns=columns)
df.to_csv(output_csv, index=False)
print(f"\nNormalized keypoints saved to {output_csv}")


Normalized keypoints saved to yoga_keypoints2.csv


In [10]:
def calculate_angle(a, b, c):
    a, b, c = np.array(a[:2]), np.array(b[:2]), np.array(c[:2])  # Ignore z and visibility
    ba = a - b
    bc = c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))  # Clip for numerical stability
    return np.degrees(angle)

In [None]:
# Load the normalized keypoints CSV
df = pd.read_csv(output_csv)

# Angle definitions (using MediaPipe landmark indices)
angle_definitions = {
    "left_elbow": (mp_pose.PoseLandmark.LEFT_SHOULDER.value, 
                   mp_pose.PoseLandmark.LEFT_ELBOW.value, 
                   mp_pose.PoseLandmark.LEFT_WRIST.value),
    "right_elbow": (mp_pose.PoseLandmark.RIGHT_SHOULDER.value, 
                    mp_pose.PoseLandmark.RIGHT_ELBOW.value, 
                    mp_pose.PoseLandmark.RIGHT_WRIST.value),
    "left_knee": (mp_pose.PoseLandmark.LEFT_HIP.value, 
                  mp_pose.PoseLandmark.LEFT_KNEE.value, 
                  mp_pose.PoseLandmark.LEFT_ANKLE.value),
    "right_knee": (mp_pose.PoseLandmark.RIGHT_HIP.value, 
                   mp_pose.PoseLandmark.RIGHT_KNEE.value, 
                   mp_pose.PoseLandmark.RIGHT_ANKLE.value),
    "left_hip": (mp_pose.PoseLandmark.LEFT_SHOULDER.value, 
                 mp_pose.PoseLandmark.LEFT_HIP.value, 
                 mp_pose.PoseLandmark.LEFT_KNEE.value),
    "right_hip": (mp_pose.PoseLandmark.RIGHT_SHOULDER.value, 
                  mp_pose.PoseLandmark.RIGHT_HIP.value, 
                  mp_pose.PoseLandmark.RIGHT_KNEE.value),
    "left_shoulder": (mp_pose.PoseLandmark.LEFT_ELBOW.value, 
                      mp_pose.PoseLandmark.LEFT_SHOULDER.value, 
                      mp_pose.PoseLandmark.LEFT_HIP.value),
    "right_shoulder": (mp_pose.PoseLandmark.RIGHT_ELBOW.value, 
                       mp_pose.PoseLandmark.RIGHT_SHOULDER.value, 
                       mp_pose.PoseLandmark.RIGHT_HIP.value)
}

angle_features = []

for _, row in df.iterrows():
    keypoints = []
    for i in range(33):  
        x = row[f'kp{i}_x']
        y = row[f'kp{i}_y']
        z = row[f'kp{i}_z']
        vis = row[f'kp{i}_vis']
        keypoints.append((x, y, z, vis))
    
    angles = {}
    for name, (a_idx, b_idx, c_idx) in angle_definitions.items():
        try:
            a = keypoints[a_idx]
            b = keypoints[b_idx]
            c = keypoints[c_idx]
            
            if a[3] > 0.5 and b[3] > 0.5 and c[3] > 0.5:
                angles[name] = calculate_angle(a, b, c)
            else:
                angles[name] = None  
        except (IndexError, KeyError) as e:
            print(f"Error calculating {name} angle: {e}")
            angles[name] = None
    
    angle_features.append(angles)

angles_df = pd.DataFrame(angle_features)
df = pd.concat([df, angles_df], axis=1)

updated_csv_path = output_csv.replace(".csv", "_with_angles.csv")
df.to_csv(updated_csv_path, index=False)
print(f"Updated CSV with angles saved to: {updated_csv_path}")

Updated CSV with angles saved to: yoga_keypoints2_with_angles.csv
