Marjaryasana Pose Detection System

In [None]:
!pip install mediapipe

Collecting mediapipe
  Downloading mediapipe-0.10.20-cp310-cp310-manylinux_2_28_x86_64.whl.metadata (9.7 kB)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.1-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.20-cp310-cp310-manylinux_2_28_x86_64.whl (35.6 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.6/35.6 MB[0m [31m46.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.1-py3-none-any.whl (32 kB)
Installing collected packages: sounddevice, mediapipe
Successfully installed mediapipe-0.10.20 sounddevice-0.5.1


In [None]:
import mediapipe as mp
import numpy as np
import cv2
from typing import Tuple, Union, List
import logging

class MarjaryasanaDetector:
    def __init__(self):
        # Initialize MediaPipe Pose
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.5
        )
        # Set up logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def calculate_angle(self, point1, point2, point3) -> float:
        """
        Calculate angle between three points.

        Args:
            point1, point2, point3: MediaPipe landmark points

        Returns:
            float: Angle in degrees
        """
        try:
            a = np.array([point1.x, point1.y])
            b = np.array([point2.x, point2.y])
            c = np.array([point3.x, point3.y])

            ba = a - b
            bc = c - b

            cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
            angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))

            return np.degrees(angle)
        except Exception as e:
            self.logger.error(f"Error calculating angle: {str(e)}")
            return 0.0

    def get_landmarks(self, results) -> Union[List, None]:
        """
        Extract relevant landmarks from pose detection results.

        Args:
            results: MediaPipe pose detection results

        Returns:
            dict: Dictionary containing relevant landmark points or None if not found
        """
        if not results.pose_landmarks:
            return None

        try:
            landmarks = results.pose_landmarks.landmark
            return {
                'shoulders': [
                    landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
                ],
                'hips': [
                    landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value]
                ],
                'knees': [
                    landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value],
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value]
                ],
                'elbows': [
                    landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value]
                ],
                'wrists': [
                    landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value],
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value]
                ]
            }
        except Exception as e:
            self.logger.error(f"Error extracting landmarks: {str(e)}")
            return None

    def detect_pose(self, image) -> Tuple[bool, str]:
        """
        Detect if the person is in Marjaryasana pose.

        Args:
            image: Input image in BGR format

        Returns:
            tuple: (is_pose_detected: bool, message: str)
        """
        try:
            # Convert the BGR image to RGB
            image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

            # Process the image and get pose landmarks
            results = self.pose.process(image_rgb)

            # Get landmarks
            landmark_dict = self.get_landmarks(results)
            if landmark_dict is None:
                return False, "No pose detected"

            # Calculate key angles
            spine_angle = self.calculate_angle(
                landmark_dict['shoulders'][0],
                landmark_dict['hips'][0],
                landmark_dict['knees'][0]
            )

            arm_angle = self.calculate_angle(
                landmark_dict['shoulders'][0],
                landmark_dict['elbows'][0],
                landmark_dict['wrists'][0]
            )

            # Define pose criteria
            is_marjaryasana = (
                # Spine should be roughly horizontal (parallel to ground)
                80 <= spine_angle <= 100 and
                # Arms should be straight and perpendicular to ground
                160 <= arm_angle <= 180 and
                # Check if hands and knees are on ground level (similar y-coordinates)
                abs(landmark_dict['wrists'][0].y - landmark_dict['knees'][0].y) < 0.1 and
                # Check if hands are shoulder-width apart
                abs(landmark_dict['wrists'][0].x - landmark_dict['wrists'][1].x) <=
                abs(landmark_dict['shoulders'][0].x - landmark_dict['shoulders'][1].x) * 1.5
            )

            if is_marjaryasana:
                return True, "Marjaryasana detected"
            else:
                return False, "Not in Marjaryasana pose"

        except Exception as e:
            self.logger.error(f"Error in pose detection: {str(e)}")
            return False, f"Error detecting pose: {str(e)}"

    def process_video(self, video_path: str, output_path: str = None) -> None:
        """
        Process video file and detect Marjaryasana pose in each frame.

        Args:
            video_path: Path to input video file
            output_path: Optional path for output video file
        """
        try:
            cap = cv2.VideoCapture(video_path)
            if not cap.isOpened():
                raise ValueError("Could not open video file")

            if output_path:
                frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
                frame_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
                fps = int(cap.get(cv2.CAP_PROP_FPS))

                out = cv2.VideoWriter(output_path,
                                    cv2.VideoWriter_fourcc(*'mp4v'),
                                    fps, (frame_width, frame_height))

            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break

                # Detect pose
                is_pose, message = self.detect_pose(frame)

                # Draw status on frame
                color = (0, 255, 0) if is_pose else (0, 0, 255)
                cv2.putText(frame, message, (50, 50),
                           cv2.FONT_HERSHEY_SIMPLEX, 1, color, 2)

                if output_path:
                    out.write(frame)

                cv2.imshow('Marjaryasana Detection', frame)
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    break

            cap.release()
            if output_path:
                out.release()
            cv2.destroyAllWindows()

        except Exception as e:
            self.logger.error(f"Error processing video: {str(e)}")
            raise

In [None]:
# Initialize the detector
detector = MarjaryasanaDetector()

try:
    # For single image
    image = cv2.imread('pose.jpg')
    if image is None:
        raise ValueError("Could not read image file")

    is_pose, message = detector.detect_pose(image)
    print(message)

    # For video
    # yoga_video.mp4 is used to for imaging purpose you can add your own file
    detector.process_video('yoga_video.mp4', 'output.mp4')

except Exception as e:
    print(f"An error occurred: {str(e)}")

An error occurred: Could not read image file
