In [6]:
#######################################################################################################################################################################
# Angle Calculation using Mediapipe Pose Visualization #
#######################################################################################################################################################################
import cv2
import mediapipe as mp
import numpy as np

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Function to calculate the angle between three points
def calculate_angle(a, b, c):
    """Calculate angle between three 2D points using arctangent function."""
    ba = np.array(a) - np.array(b)
    bc = np.array(c) - np.array(b)

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))

    return np.degrees(angle)

# Open webcam
cap = cv2.VideoCapture("asitwas.webm")

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break

    # Convert frame to RGB
    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)

    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark

        # Define key joints (Using Mediapipe landmark indices)
        joint_ids = {
            "nose": 0,
            "left_shoulder": 11, "right_shoulder": 12,
            "left_elbow": 13, "right_elbow": 14,
            "left_wrist": 15, "right_wrist": 16,
            "left_waist": 23, "right_waist": 24  # Waist is hip landmarks
        }

        # Extract 2D coordinates
        joints = {}
        for name, idx in joint_ids.items():
            if landmarks[idx].visibility > 0.5:  # Ensure the joint is visible
                joints[name] = (int(landmarks[idx].x * frame.shape[1]), 
                                int(landmarks[idx].y * frame.shape[0]))

        # Compute mid_shoulder (midpoint of left and right shoulder)
        if all(k in joints for k in ["left_shoulder", "right_shoulder"]):
            joints["mid_shoulder"] = ((joints["left_shoulder"][0] + joints["right_shoulder"][0]) // 2,
                                      (joints["left_shoulder"][1] + joints["right_shoulder"][1]) // 2)

        # Compute angles only if required points exist
        angles = {}
        if all(k in joints for k in ["left_shoulder", "left_elbow", "left_wrist"]):
            angles["Left Elbow"] = calculate_angle(joints["left_shoulder"], joints["left_elbow"], joints["left_wrist"])
        if all(k in joints for k in ["right_shoulder", "right_elbow", "right_wrist"]):
            angles["Right Elbow"] = calculate_angle(joints["right_shoulder"], joints["right_elbow"], joints["right_wrist"])
        if all(k in joints for k in ["left_elbow", "left_shoulder", "left_waist"]):
            angles["Left Arm Lift"] = calculate_angle(joints["left_elbow"], joints["left_shoulder"], joints["left_waist"])
        if all(k in joints for k in ["right_elbow", "right_shoulder", "right_waist"]):
            angles["Right Arm Lift"] = calculate_angle(joints["right_elbow"], joints["right_shoulder"], joints["right_waist"])
        if all(k in joints for k in ["nose", "mid_shoulder", "left_shoulder"]):
            angles["Neck Angle"] = calculate_angle(joints["nose"], joints["mid_shoulder"], joints["left_shoulder"])

        # Draw key points and lines
        for (p1, p2, p3, label, text_pos) in [
            ("left_shoulder", "left_elbow", "left_wrist", "Left Elbow", "left_elbow"),
            ("right_shoulder", "right_elbow", "right_wrist", "Right Elbow", "right_elbow"),
            ("left_elbow", "left_shoulder", "left_waist", "Left Arm Lift", "left_shoulder"),
            ("right_elbow", "right_shoulder", "right_waist", "Right Arm Lift", "right_shoulder")
        ]:
            if all(k in joints for k in [p1, p2, p3]):
                # Draw lines
                cv2.line(frame, joints[p1], joints[p2], (0, 255, 0), 3)
                cv2.line(frame, joints[p2], joints[p3], (0, 255, 0), 3)

                # Draw key points
                cv2.circle(frame, joints[p1], 6, (0, 0, 255), -1)
                cv2.circle(frame, joints[p2], 6, (255, 0, 0), -1)
                cv2.circle(frame, joints[p3], 6, (0, 255, 255), -1)

                # Display angle
                if label in angles:
                    angle_text = f"{label}: {int(angles[label])}"
                    cv2.putText(frame, angle_text, (joints[text_pos][0] + 20, joints[text_pos][1]), 
                                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

        # Draw neck angle separately
        if all(k in joints for k in ["nose", "mid_shoulder", "left_shoulder"]) and "Neck Angle" in angles:
            cv2.line(frame, joints["nose"], joints["mid_shoulder"], (0, 255, 0), 3)
            cv2.line(frame, joints["mid_shoulder"], joints["left_shoulder"], (0, 255, 0), 3)

            cv2.circle(frame, joints["nose"], 6, (0, 0, 255), -1)
            cv2.circle(frame, joints["mid_shoulder"], 6, (255, 0, 0), -1)
            cv2.circle(frame, joints["left_shoulder"], 6, (0, 255, 255), -1)

            # Display neck angle
            angle_text = f"Neck Angle: {int(angles['Neck Angle'])}"
            cv2.putText(frame, angle_text, (joints["nose"][0] + 20, joints["nose"][1]), 
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2)

    # Show the frame
    cv2.imshow("Pose Angle Visualization", frame)

    # Exit if 'q' is pressed
    if cv2.waitKey(10) & 0xFF == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()


In [None]:
#######################################################################################################################################################################
# Move Detection, Recording and Preprocessing #
#######################################################################################################################################################################

import cv2
import mediapipe as mp
import json
import numpy as np
import time
import os

# Load timeline.json
with open("timeline.json", "r") as f:
    timeline_data = json.load(f)

# Extract moves
moves = timeline_data["moves"]

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Directory to save processed reference data
output_dir = "reference_moves"
os.makedirs(output_dir, exist_ok=True)

def calculate_angle(a, b, c):
    """Calculate angle between three 3D points using the arctangent function."""
    ba = np.array(a) - np.array(b)
    bc = np.array(c) - np.array(b)

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    
    return np.degrees(angle)

def process_frame(image):
    """Extract pose landmarks from a frame and compute angles."""
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)
    
    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark
        
        # Extract key joint coordinates (normalized values)
        joint_ids = {
            "nose": 0,
            "left_shoulder": 11, "right_shoulder": 12,
            "left_elbow": 13, "right_elbow": 14,
            "left_wrist": 15, "right_wrist": 16,
            "left_hip": 23, "right_hip": 24
        }
        
        joints = {name: (landmarks[idx].x, landmarks[idx].y, landmarks[idx].z) for name, idx in joint_ids.items()}

        # Compute mid-shoulder point
        if "left_shoulder" in joints and "right_shoulder" in joints:
            mid_shoulder = (
                (joints["left_shoulder"][0] + joints["right_shoulder"][0]) / 2,
                (joints["left_shoulder"][1] + joints["right_shoulder"][1]) / 2,
                (joints["left_shoulder"][2] + joints["right_shoulder"][2]) / 2
            )
        else:
            mid_shoulder = None

        # Compute angles
        angles = {}

        if all(k in joints for k in ["left_shoulder", "left_elbow", "left_wrist"]):
            angles["left_elbow"] = calculate_angle(joints["left_shoulder"], joints["left_elbow"], joints["left_wrist"])
        if all(k in joints for k in ["right_shoulder", "right_elbow", "right_wrist"]):
            angles["right_elbow"] = calculate_angle(joints["right_shoulder"], joints["right_elbow"], joints["right_wrist"])
        if all(k in joints for k in ["left_elbow", "left_shoulder", "left_hip"]):
            angles["left_arm_lift"] = calculate_angle(joints["left_elbow"], joints["left_shoulder"], joints["left_hip"])
        if all(k in joints for k in ["right_elbow", "right_shoulder", "right_hip"]):
            angles["right_arm_lift"] = calculate_angle(joints["right_elbow"], joints["right_shoulder"], joints["right_hip"])
        if mid_shoulder and "nose" in joints:
            angles["neck_angle"] = calculate_angle(joints["nose"], mid_shoulder, joints["left_shoulder"])

        return angles
    
    return None

# Open webcam
cap = cv2.VideoCapture(0)

for move in moves:
    move_name = move["name"]
    duration = move["duration"]
    
    print(f"Get ready for move: {move_name}")
    time.sleep(2)  # Give user time to prepare
    
    print(f"Recording move: {move_name} (duration: {duration}s)...")
    
    start_time = time.time()
    capture_time = start_time + (duration / 2)  # Capture a frame at the middle of the move
    
    captured_frame = None

    while time.time() - start_time < duration:
        ret, frame = cap.read()
        if not ret:
            print("Failed to capture frame!")
            continue
        
        if time.time() >= capture_time and captured_frame is None:
            captured_frame = frame.copy()
    
    if captured_frame is not None:
        angles = process_frame(captured_frame)
        if angles:
            # Save angles for this move
            with open(os.path.join(output_dir, f"{move_name}.json"), "w") as f:
                json.dump(angles, f, indent=4)
            print(f"Saved reference angles for {move_name}")
        else:
            print(f"Failed to detect pose for {move_name}")

cap.release()
cv2.destroyAllWindows()

In [5]:
import cv2
import mediapipe as mp
import json
import numpy as np
import time
import os
import joblib

# Load timeline.json
with open("timeline.json", "r") as f:
    timeline_data = json.load(f)

# Extract moves
moves = timeline_data["moves"]

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose()

# Directory to save processed reference data
output_dir = "reference_moves"
os.makedirs(output_dir, exist_ok=True)

# Load the dance video
video_path = "asitwas.webm"  # Change this to your video file
cap = cv2.VideoCapture(video_path)

# Check if video loaded successfully
if not cap.isOpened():
    print("Error: Could not open video file.")
    exit()

fps = cap.get(cv2.CAP_PROP_FPS)  # Frames per second
print(f"Video FPS: {fps}")

def calculate_angle(a, b, c):
    """Calculate angle between three 3D points using the arctangent function."""
    ba = np.array(a) - np.array(b)
    bc = np.array(c) - np.array(b)

    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))

    return np.degrees(angle)

def process_frame(image):
    """Extract pose landmarks from a frame and compute angles."""
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    results = pose.process(image_rgb)

    if results.pose_landmarks:
        landmarks = results.pose_landmarks.landmark

        # Extract key joint coordinates (normalized values)
        joint_ids = {
            "nose": 0,
            "left_shoulder": 11, "right_shoulder": 12,
            "left_elbow": 13, "right_elbow": 14,
            "left_wrist": 15, "right_wrist": 16,
            "left_hip": 23, "right_hip": 24
        }

        joints = {name: (landmarks[idx].x, landmarks[idx].y, landmarks[idx].z) for name, idx in joint_ids.items()}

        # Compute mid-shoulder point
        if "left_shoulder" in joints and "right_shoulder" in joints:
            mid_shoulder = (
                (joints["left_shoulder"][0] + joints["right_shoulder"][0]) / 2,
                (joints["left_shoulder"][1] + joints["right_shoulder"][1]) / 2,
                (joints["left_shoulder"][2] + joints["right_shoulder"][2]) / 2
            )
        else:
            mid_shoulder = None

        # Compute angles
        angles = {}

        if all(k in joints for k in ["left_shoulder", "left_elbow", "left_wrist"]):
            angles["left_elbow"] = calculate_angle(joints["left_shoulder"], joints["left_elbow"], joints["left_wrist"])
        if all(k in joints for k in ["right_shoulder", "right_elbow", "right_wrist"]):
            angles["right_elbow"] = calculate_angle(joints["right_shoulder"], joints["right_elbow"], joints["right_wrist"])
        if all(k in joints for k in ["left_elbow", "left_shoulder", "left_hip"]):
            angles["left_arm_lift"] = calculate_angle(joints["left_elbow"], joints["left_shoulder"], joints["left_hip"])
        if all(k in joints for k in ["right_elbow", "right_shoulder", "right_hip"]):
            angles["right_arm_lift"] = calculate_angle(joints["right_elbow"], joints["right_shoulder"], joints["right_hip"])
        if mid_shoulder and "nose" in joints:
            angles["neck_angle"] = calculate_angle(joints["nose"], mid_shoulder, joints["left_shoulder"])

        return {"joints": joints, "angles": angles}

    return None

# Process each move in the timeline
reference_data = {}

for move in moves:
    move_name = move["name"]
    start_time = move["time"]
    duration = move["duration"]

    mid_time = start_time + (duration / 2)  # Middle of the move

    frame_idx = int(mid_time * fps)  # Convert to frame index
    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)  # Seek to the frame

    ret, frame = cap.read()
    if not ret:
        print(f"Failed to capture frame for {move_name} at time {mid_time}s (frame {frame_idx})!")
        continue

    print(f"Processing {move_name} at frame {frame_idx}...")

    pose_data = process_frame(frame)
    if pose_data:
        reference_data[move_name] = pose_data
        print(f"✔ Saved pose data for {move_name}")
    else:
        print(f"❌ Failed to detect pose for {move_name}")

# Save the reference data using joblib
joblib.dump(reference_data, os.path.join(output_dir, "reference_data.pkl"))
print(f"✅ Reference data saved to {output_dir}/reference_data.pkl")

cap.release()
cv2.destroyAllWindows()


Video FPS: 25.0
Processing asitwas_jacket at frame 125...
✔ Saved pose data for asitwas_jacket
Processing asitwas_snap at frame 142...
✔ Saved pose data for asitwas_snap
Processing asitwas_arm_up at frame 161...
✔ Saved pose data for asitwas_arm_up
Processing asitwas_snap_down_start at frame 219...




✔ Saved pose data for asitwas_snap_down_start
Processing asitwas_snap_down at frame 237...
✔ Saved pose data for asitwas_snap_down
Processing asitwas_snap_down at frame 254...
✔ Saved pose data for asitwas_snap_down
Processing asitwas_snap_down at frame 271...
✔ Saved pose data for asitwas_snap_down
Processing asitwas_snap_down_start_2 at frame 288...
✔ Saved pose data for asitwas_snap_down_start_2
Processing asitwas_snap_down at frame 306...
✔ Saved pose data for asitwas_snap_down
Processing asitwas_snap_down at frame 323...
✔ Saved pose data for asitwas_snap_down
Processing asitwas_snap_down_end at frame 340...
✔ Saved pose data for asitwas_snap_down_end
Processing asitwas_arm_down at frame 357...
✔ Saved pose data for asitwas_arm_down
Processing asitwas_sun at frame 375...
✔ Saved pose data for asitwas_sun
Processing asitwas_fists at frame 392...
✔ Saved pose data for asitwas_fists
Processing asitwas_arm_down at frame 426...
✔ Saved pose data for asitwas_arm_down
Processing asitwas_

In [None]:
import pandas as pd
pd.Series(list(reference_data.keys())).value_counts().max()

1

In [15]:
import cv2
import json
import os
import joblib
import numpy as np

# Load timeline.json
with open("timeline.json", "r") as f:
    timeline_data = json.load(f)

# Extract moves
moves = timeline_data["moves"]

# Load reference data
reference_data = joblib.load("reference_moves/reference_data.pkl")

# Load the dance video
video_path = "asitwas.webm"  # Change this to your video file
cap = cv2.VideoCapture(video_path)

# Check if video loaded successfully
if not cap.isOpened():
    print("Error: Could not open video file.")
    exit()

fps = cap.get(cv2.CAP_PROP_FPS)  # Frames per second
print(f"Video FPS: {fps}")

# Define colors
WHITE = (255, 255, 255)
RED = (0, 0, 255)
GREEN = (0, 255, 0)
BLUE = (255, 0, 0)

# Joint connections based on the MediaPipe pose model
connections = [
    ("left_shoulder", "right_shoulder"),
    ("left_shoulder", "left_elbow"), ("left_elbow", "left_wrist"),
    ("right_shoulder", "right_elbow"), ("right_elbow", "right_wrist"),
    ("left_shoulder", "left_hip"), ("right_shoulder", "right_hip"),
    ("left_hip", "right_hip")
]

def draw_landmarks(frame, joints):
    """Draws landmarks and connections on the frame."""
    height, width, _ = frame.shape

    # Convert normalized joint coordinates to pixel values
    joint_positions = {key: (int(x * width), int(y * height)) for key, (x, y, z) in joints.items()}

    # Draw connections
    for joint1, joint2 in connections:
        if joint1 in joint_positions and joint2 in joint_positions:
            cv2.line(frame, joint_positions[joint1], joint_positions[joint2], BLUE, 2)

    # Draw joints
    for joint, (x, y) in joint_positions.items():
        cv2.circle(frame, (x, y), 5, RED, -1)

    return frame

# Process each move
for move in moves:
    move_name = move["name"]
    start_time = move["time"]
    duration = move["duration"]

    # Get the corresponding reference data
    if move_name not in reference_data:
        print(f"❌ No reference data found for {move_name}, skipping...")
        continue

    mid_time = start_time + (duration / 2)  # Middle of the move
    frame_idx = int(mid_time * fps)  # Convert to frame index

    cap.set(cv2.CAP_PROP_POS_FRAMES, frame_idx)  # Seek to the frame
    ret, frame = cap.read()
    
    if not ret:
        print(f"❌ Failed to capture frame for {move_name} at frame {frame_idx}!")
        continue

    print(f"🎨 Drawing landmarks for {move_name} at frame {frame_idx}...")

    joints = reference_data[move_name]["joints"]

    # Draw landmarks
    frame = draw_landmarks(frame, joints)

    # Display the frame
    cv2.imshow("Pose Landmarks", frame)
    key = cv2.waitKey(500)  # Display for 500ms

    # Press 'q' to exit early
    if key == ord('q'):
        break

cap.release()
cv2.destroyAllWindows()

Video FPS: 25.0
🎨 Drawing landmarks for asitwas_jacket at frame 125...
🎨 Drawing landmarks for asitwas_snap at frame 142...
🎨 Drawing landmarks for asitwas_arm_up at frame 161...
🎨 Drawing landmarks for asitwas_snap_down_start at frame 219...
🎨 Drawing landmarks for asitwas_snap_down at frame 237...
🎨 Drawing landmarks for asitwas_snap_down at frame 254...
🎨 Drawing landmarks for asitwas_snap_down at frame 271...
🎨 Drawing landmarks for asitwas_snap_down_start_2 at frame 288...
🎨 Drawing landmarks for asitwas_snap_down at frame 306...
🎨 Drawing landmarks for asitwas_snap_down at frame 323...
🎨 Drawing landmarks for asitwas_snap_down_end at frame 340...
🎨 Drawing landmarks for asitwas_arm_down at frame 357...
🎨 Drawing landmarks for asitwas_sun at frame 375...
🎨 Drawing landmarks for asitwas_fists at frame 392...
🎨 Drawing landmarks for asitwas_arm_down at frame 426...
🎨 Drawing landmarks for asitwas_sun at frame 444...
🎨 Drawing landmarks for asitwas_fists at frame 461...
🎨 Drawing lan