In [1]:
# Install the ultralytics (YOLOv8) and mediapipe libraries:
!pip install ultralytics mediapipe==0.10.9 --q

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m34.5/34.5 MB[0m [31m62.2 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.0/1.0 MB[0m [31m60.5 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m162.1/162.1 kB[0m [31m15.7 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m363.4/363.4 MB[0m [31m3.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.8/13.8 MB[0m [31m118.1 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m24.6/24.6 MB[0m [31m90.8 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m883.7/883.7 kB[0m [31m52.4 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m664.8/664.8 MB[0m [31m1.3 MB/s[0m eta [36m0:00:00[0m
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

In [2]:
import cv2
import mediapipe as mp
import numpy as np
import math
import os
from ultralytics import YOLO

class BowlingPoseDetector:
    def __init__(self, model_path='yolov8x.pt', debug_mode=True):
        """Set up the YOLOv8 object detector and the MediaPipe pose estimator.

        Args:
            model_path: Weights file handed to ``YOLO``. The default,
                ``'yolov8x.pt'``, is the extra-large YOLOv8 checkpoint; pass a
                smaller checkpoint (for example ``'yolov8n.pt'``) to trade
                accuracy for speed.
            debug_mode: When True, the ``detect_*`` methods print their
                confidence scores for every processed frame.
        """
        # YOLOv8 is used to find the person and any sports balls in the frame.
        self.yolo_model = YOLO(model_path)

        # MediaPipe pose estimation, configured for video (tracking enabled).
        self.mp_pose = mp.solutions.pose
        self.pose = self.mp_pose.Pose(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            min_detection_confidence=0.5,
            min_tracking_confidence=0.5
        )
        self.mp_drawing = mp.solutions.drawing_utils

        # Debug mode for detailed per-frame console output.
        self.debug_mode = debug_mode

    def detect_objects(self, frame):
        """Detect persons and sports balls, returning only the most confident person"""
        results = self.yolo_model(frame, verbose=False)
        sports_balls = []
        best_person = None
        max_confidence = 0.3  # Minimum confidence threshold

        for result in results:
            boxes = result.boxes
            if boxes is not None:
                for box in boxes:
                    class_id = int(box.cls[0])
                    x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()
                    confidence = box.conf[0].cpu().numpy()

                    # Only keep the most confident person detection
                    if class_id == 0 and confidence > max_confidence:
                        max_confidence = confidence
                        best_person = {
                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': float(confidence)
                        }

                    # Still detect all sports balls
                    elif class_id == 32 and confidence > 0.1:
                        sports_balls.append({
                            'bbox': [int(x1), int(y1), int(x2), int(y2)],
                            'confidence': float(confidence)
                        })

        # Return as list with 0 or 1 person
        persons = [best_person] if best_person else []
        return persons, sports_balls

    def crop_person_roi(self, frame, bbox):
        """Crop the region of interest around the detected person"""
        x1, y1, x2, y2 = bbox

        # Add padding around the person
        padding = 50
        h, w = frame.shape[:2]

        x1 = max(0, x1 - padding)
        y1 = max(0, y1 - padding)
        x2 = min(w, x2 + padding)
        y2 = min(h, y2 + padding)

        roi = frame[y1:y2, x1:x2]
        return roi, (x1, y1, x2, y2)

    def get_keypoint(self, landmarks, keypoint_id):
        """Get keypoint coordinates with better visibility checking"""
        if landmarks and len(landmarks.landmark) > keypoint_id:
            landmark = landmarks.landmark[keypoint_id]
            # Lowered visibility threshold for better detection
            if landmark.visibility > 0.3:
                return landmark.x, landmark.y, landmark.visibility
        return None, None, 0

    def calculate_distance(self, p1, p2):
        """Calculate distance between two points"""
        if None in [p1[0], p1[1], p2[0], p2[1]]:
            return float('inf')
        return math.sqrt((p1[0] - p2[0]) ** 2 + (p1[1] - p2[1]) ** 2)

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points"""
        if None in [p1[0], p1[1], p2[0], p2[1], p3[0], p3[1]]:
            return 0
        v1 = np.array([p1[0] - p2[0], p1[1] - p2[1]])
        v2 = np.array([p3[0] - p2[0], p3[1] - p2[1]])

        norm1 = np.linalg.norm(v1)
        norm2 = np.linalg.norm(v2)
        if norm1 < 1e-6 or norm2 < 1e-6:
            return 0

        cos_angle = np.dot(v1, v2) / (norm1 * norm2)
        angle = np.arccos(np.clip(cos_angle, -1.0, 1.0))
        return np.degrees(angle)

    def detect_bowling_stance(self, landmarks):
        """Detect bowling stance - person preparing to bowl"""
        # Get key points
        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)
        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)
        left_hip_x, left_hip_y, lh_vis = self.get_keypoint(landmarks, 23)
        right_hip_x, right_hip_y, rh_vis = self.get_keypoint(landmarks, 24)
        left_knee_x, left_knee_y, lk_vis = self.get_keypoint(landmarks, 25)
        right_knee_x, right_knee_y, rk_vis = self.get_keypoint(landmarks, 26)

        confidence_factors = []

        # Check if person is standing upright
        if None not in [left_shoulder_y, left_hip_y, right_shoulder_y, right_hip_y]:
            # Body should be relatively upright
            left_body_angle = abs(left_shoulder_y - left_hip_y)
            right_body_angle = abs(right_shoulder_y - right_hip_y)
            if left_body_angle > 0.1 and right_body_angle > 0.1:
                confidence_factors.append(0.3)

        # Check leg positioning (should be stable, not in motion)
        if None not in [left_knee_y, left_hip_y, right_knee_y, right_hip_y]:
            # Legs should be in standing position
            left_leg_bend = abs(left_knee_y - left_hip_y)
            right_leg_bend = abs(right_knee_y - right_hip_y)
            if left_leg_bend > 0.1 and right_leg_bend > 0.1:
                confidence_factors.append(0.2)

        # Check shoulder alignment (should be fairly level)
        if None not in [left_shoulder_y, right_shoulder_y]:
            shoulder_level = abs(left_shoulder_y - right_shoulder_y)
            if shoulder_level < 0.05:  # Shoulders fairly level
                confidence_factors.append(0.2)

        confidence = sum(confidence_factors)
        detected = confidence > 0.4

        if self.debug_mode:
            print(f"  Bowling stance: confidence={confidence:.2f}, detected={detected}")

        return detected, confidence

    def detect_arm_swing(self, landmarks):
        """Detect bowling arm swing motion"""
        # Get arm keypoints for both arms
        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)
        left_elbow_x, left_elbow_y, le_vis = self.get_keypoint(landmarks, 13)
        left_wrist_x, left_wrist_y, lw_vis = self.get_keypoint(landmarks, 15)

        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)
        right_elbow_x, right_elbow_y, re_vis = self.get_keypoint(landmarks, 14)
        right_wrist_x, right_wrist_y, rw_vis = self.get_keypoint(landmarks, 16)

        confidence_factors = []

        # Check left arm swing
        if None not in [left_shoulder_x, left_shoulder_y, left_elbow_x, left_elbow_y, left_wrist_x, left_wrist_y]:
            # Calculate arm extension
            arm_length = self.calculate_distance((left_shoulder_x, left_shoulder_y), (left_wrist_x, left_wrist_y))
            # Check if arm is extended (bowling motion)
            if arm_length > 0.15:  # Threshold for extended arm
                confidence_factors.append(0.4)

            # Check arm angle (should be relatively straight during swing)
            arm_angle = self.calculate_angle((left_shoulder_x, left_shoulder_y), (left_elbow_x, left_elbow_y), (left_wrist_x, left_wrist_y))
            if arm_angle > 140:  # Relatively straight arm
                confidence_factors.append(0.3)

        # Check right arm swing
        if None not in [right_shoulder_x, right_shoulder_y, right_elbow_x, right_elbow_y, right_wrist_x, right_wrist_y]:
            # Calculate arm extension
            arm_length = self.calculate_distance((right_shoulder_x, right_shoulder_y), (right_wrist_x, right_wrist_y))
            # Check if arm is extended (bowling motion)
            if arm_length > 0.15:  # Threshold for extended arm
                confidence_factors.append(0.4)

            # Check arm angle (should be relatively straight during swing)
            arm_angle = self.calculate_angle((right_shoulder_x, right_shoulder_y), (right_elbow_x, right_elbow_y), (right_wrist_x, right_wrist_y))
            if arm_angle > 140:  # Relatively straight arm
                confidence_factors.append(0.3)

        confidence = min(sum(confidence_factors), 1.0)
        detected = confidence > 0.5

        if self.debug_mode:
            print(f"  Arm swing: confidence={confidence:.2f}, detected={detected}")

        return detected, confidence

    def detect_ball_pickup(self, landmarks):
        """Improved ball pickup detection with better posture analysis"""
        # Get wrist positions
        right_wrist_x, right_wrist_y, rw_vis = self.get_keypoint(landmarks, 16)
        left_wrist_x, left_wrist_y, lw_vis = self.get_keypoint(landmarks, 15)

        # Get hip and knee positions
        left_hip_x, left_hip_y, lh_vis = self.get_keypoint(landmarks, 23)
        right_hip_x, right_hip_y, rh_vis = self.get_keypoint(landmarks, 24)
        left_knee_x, left_knee_y, lk_vis = self.get_keypoint(landmarks, 25)
        right_knee_x, right_knee_y, rk_vis = self.get_keypoint(landmarks, 26)

        # Get shoulder positions
        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)
        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)

        confidence_factors = []

        # 1. Check if either wrist is low (near typical ball pickup position)
        if None not in [right_wrist_y, right_hip_y] and rw_vis > 0.3:
            # Right wrist should be below hip level but not too low (avoid false positives)
            if right_hip_y < right_wrist_y < right_hip_y + 0.2:
                confidence_factors.append(0.3)

        if None not in [left_wrist_y, left_hip_y] and lw_vis > 0.3:
            # Left wrist should be below hip level but not too low
            if left_hip_y < left_wrist_y < left_hip_y + 0.2:
                confidence_factors.append(0.3)

        # 2. Check for bent knees (characteristic of pickup stance)
        if None not in [left_knee_y, left_hip_y] and lk_vis > 0.3:
            knee_hip_distance = abs(left_knee_y - left_hip_y)
            if knee_hip_distance > 0.15:  # Bent knee threshold
                confidence_factors.append(0.2)

        if None not in [right_knee_y, right_hip_y] and rk_vis > 0.3:
            knee_hip_distance = abs(right_knee_y - right_hip_y)
            if knee_hip_distance > 0.15:  # Bent knee threshold
                confidence_factors.append(0.2)

        # 3. Check for forward torso lean
        if None not in [left_shoulder_y, left_hip_y, right_shoulder_y, right_hip_y]:
            avg_shoulder_y = (left_shoulder_y + right_shoulder_y) / 2
            avg_hip_y = (left_hip_y + right_hip_y) / 2
            torso_angle = abs(avg_shoulder_y - avg_hip_y)
            if torso_angle > 0.1:  # Significant forward lean
                confidence_factors.append(0.2)

        # 4. Check hand proximity (gripping the ball)
        if None not in [right_wrist_x, right_wrist_y, left_wrist_x, left_wrist_y]:
            hand_distance = self.calculate_distance(
                (right_wrist_x, right_wrist_y),
                (left_wrist_x, left_wrist_y)
            )
            if hand_distance < 0.15:  # Hands close together
                confidence_factors.append(0.2)

        confidence = min(sum(confidence_factors), 1.0)
        detected = confidence > 0.5  # Lowered threshold for better detection

        if self.debug_mode:
            print(f"  Ball pickup: confidence={confidence:.2f}, detected={detected}")
            if detected:
                print("    Detection factors:")
                if len(confidence_factors) >= 1:
                    print(f"    - Wrist position: {confidence_factors[0]:.2f}")
                if len(confidence_factors) >= 3:
                    print(f"    - Bent knees: {confidence_factors[1] + confidence_factors[2]:.2f}")
                if len(confidence_factors) >= 4:
                    print(f"    - Torso lean: {confidence_factors[3]:.2f}")
                if len(confidence_factors) >= 5:
                    print(f"    - Hand proximity: {confidence_factors[4]:.2f}")

        return detected, confidence

        if self.debug_mode:
            print(f"  Ball pickup: confidence={confidence:.2f}, detected={detected}")

        return detected, confidence

    def detect_throw_initiation(self, landmarks):
        """Detect throw initiation - improved version"""
        # Get arm keypoints
        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)
        right_elbow_x, right_elbow_y, re_vis = self.get_keypoint(landmarks, 14)
        right_wrist_x, right_wrist_y, rw_vis = self.get_keypoint(landmarks, 16)

        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)
        left_elbow_x, left_elbow_y, le_vis = self.get_keypoint(landmarks, 13)
        left_wrist_x, left_wrist_y, lw_vis = self.get_keypoint(landmarks, 15)

        confidence_factors = []

        # Check right arm backward swing
        if None not in [right_shoulder_x, right_wrist_x, right_shoulder_y, right_wrist_y] and rs_vis > 0.3 and rw_vis > 0.3:
            # Wrist should be behind shoulder for backswing
            if right_wrist_x < right_shoulder_x:
                confidence_factors.append(0.4)

            # Check arm extension
            arm_distance = self.calculate_distance((right_shoulder_x, right_shoulder_y), (right_wrist_x, right_wrist_y))
            if arm_distance > 0.15:
                confidence_factors.append(0.3)

        # Check left arm backward swing
        if None not in [left_shoulder_x, left_wrist_x, left_shoulder_y, left_wrist_y] and ls_vis > 0.3 and lw_vis > 0.3:
            # Wrist should be behind shoulder for backswing
            if left_wrist_x < left_shoulder_x:
                confidence_factors.append(0.4)

            # Check arm extension
            arm_distance = self.calculate_distance((left_shoulder_x, left_shoulder_y), (left_wrist_x, left_wrist_y))
            if arm_distance > 0.15:
                confidence_factors.append(0.3)

        confidence = min(sum(confidence_factors), 1.0)
        detected = confidence > 0.5

        if self.debug_mode:
            print(f"  Throw initiation: confidence={confidence:.2f}, detected={detected}")

        return detected, confidence

    def detect_ball_release(self, landmarks):
        """More accurate ball release detection with improved criteria"""
        # Get arm keypoints
        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)
        right_elbow_x, right_elbow_y, re_vis = self.get_keypoint(landmarks, 14)
        right_wrist_x, right_wrist_y, rw_vis = self.get_keypoint(landmarks, 16)

        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)
        left_elbow_x, left_elbow_y, le_vis = self.get_keypoint(landmarks, 13)
        left_wrist_x, left_wrist_y, lw_vis = self.get_keypoint(landmarks, 15)

        # Get hip position for reference
        left_hip_x, left_hip_y, lh_vis = self.get_keypoint(landmarks, 23)
        right_hip_x, right_hip_y, rh_vis = self.get_keypoint(landmarks, 24)

        confidence_factors = []

        # 1. Arm extension check (more strict)
        if None not in [right_shoulder_x, right_wrist_x, right_shoulder_y, right_wrist_y] and rs_vis > 0.5 and rw_vis > 0.5:
            # Calculate arm extension (should be near full extension)
            arm_distance = self.calculate_distance((right_shoulder_x, right_shoulder_y), (right_wrist_x, right_wrist_y))
            if arm_distance > 0.25:  # More strict threshold for release
                confidence_factors.append(0.3)

            # Wrist should be in front of shoulder and below shoulder level
            if right_wrist_x > right_shoulder_x and right_wrist_y > right_shoulder_y:
                confidence_factors.append(0.3)

            # Check arm angle for straight arm (near 180 degrees)
            if None not in [right_elbow_x, right_elbow_y]:
                arm_angle = self.calculate_angle((right_shoulder_x, right_shoulder_y),
                                              (right_elbow_x, right_elbow_y),
                                              (right_wrist_x, right_wrist_y))
                if arm_angle > 160:  # Nearly straight arm
                    confidence_factors.append(0.2)

        # 2. Wrist velocity check (if available)
        if hasattr(self, 'prev_wrist_pos'):
            if None not in [right_wrist_x, right_wrist_y]:
                # Calculate wrist movement from previous frame
                dx = right_wrist_x - self.prev_wrist_pos[0]
                dy = right_wrist_y - self.prev_wrist_pos[1]
                velocity = math.sqrt(dx**2 + dy**2)

                # Release typically has high downward velocity
                if velocity > 0.05 and dy > 0:  # Moving downward
                    confidence_factors.append(0.2)

        # 3. Body position check
        if None not in [right_shoulder_y, right_hip_y]:
            # Should be in forward lunge position
            if right_shoulder_y > right_hip_y + 0.05:  # Forward lean
                confidence_factors.append(0.2)

        # Update previous wrist position for next frame
        if None not in [right_wrist_x, right_wrist_y]:
            self.prev_wrist_pos = (right_wrist_x, right_wrist_y)

        confidence = min(sum(confidence_factors), 1.0)
        detected = confidence > 0.7  # Higher threshold to reduce false positives

        if self.debug_mode:
            print(f"  Ball release: confidence={confidence:.2f}, detected={detected}")
            if detected:
                print("    Detection factors:")
                if len(confidence_factors) > 0:
                    print(f"    - Arm extension: {confidence_factors[0]:.2f}")
                if len(confidence_factors) > 1:
                    print(f"    - Wrist position: {confidence_factors[1]:.2f}")
                if len(confidence_factors) > 2:
                    print(f"    - Arm angle: {confidence_factors[2]:.2f}")
                if len(confidence_factors) > 3 and hasattr(self, 'prev_wrist_pos'):
                    print(f"    - Wrist velocity: {confidence_factors[3]:.2f}")
                if len(confidence_factors) > 4:
                    print(f"    - Body position: {confidence_factors[4]:.2f}")

        return detected, confidence

    def detect_celebration(self, landmarks):
        """Detect celebration poses"""
        # Get key points
        right_wrist_x, right_wrist_y, rw_vis = self.get_keypoint(landmarks, 16)
        left_wrist_x, left_wrist_y, lw_vis = self.get_keypoint(landmarks, 15)
        nose_x, nose_y, n_vis = self.get_keypoint(landmarks, 0)
        right_shoulder_x, right_shoulder_y, rs_vis = self.get_keypoint(landmarks, 12)
        left_shoulder_x, left_shoulder_y, ls_vis = self.get_keypoint(landmarks, 11)

        confidence_factors = []

        # Check if both arms are raised
        if None not in [right_wrist_y, nose_y] and rw_vis > 0.3 and n_vis > 0.3:
            if right_wrist_y < nose_y:  # Right wrist above nose
                confidence_factors.append(0.3)

        if None not in [left_wrist_y, nose_y] and lw_vis > 0.3 and n_vis > 0.3:
            if left_wrist_y < nose_y:  # Left wrist above nose
                confidence_factors.append(0.3)

        # Check if arms are spread wide (celebration pose)
        if None not in [right_wrist_x, left_wrist_x] and rw_vis > 0.3 and lw_vis > 0.3:
            arm_spread = abs(right_wrist_x - left_wrist_x)
            if arm_spread > 0.4:  # Wide arm spread
                confidence_factors.append(0.4)

        confidence = min(sum(confidence_factors), 1.0)
        detected = confidence > 0.6

        if self.debug_mode:
            print(f"  Celebration: confidence={confidence:.2f}, detected={detected}")

        return detected, confidence

    def process_person_pose(self, person_roi):
        """Process pose estimation for a single person ROI"""
        # Ensure ROI is valid
        if person_roi is None or person_roi.size == 0:
            print("⚠️  Invalid person ROI")
            return None, None

        # Resize ROI if it's too small for MediaPipe
        h, w = person_roi.shape[:2]
        if h < 50 or w < 50:
            print(f"⚠️  ROI too small: {w}x{h}, skipping pose estimation")
            return None, None

        # Convert to RGB
        rgb = cv2.cvtColor(person_roi, cv2.COLOR_BGR2RGB)

        # Process with MediaPipe
        results = self.pose.process(rgb)

        detections = {
            'bowling_stance': (False, 0.0),
            'arm_swing': (False, 0.0),
            'ball_pickup': (False, 0.0),
            'throw_initiation': (False, 0.0),
            'ball_release': (False, 0.0),
            'celebration': (False, 0.0)
        }

        if results.pose_landmarks:
            print("✅ Pose landmarks detected!")
            if self.debug_mode:
                print("  Analyzing poses...")

            detections['bowling_stance'] = self.detect_bowling_stance(results.pose_landmarks)
            detections['arm_swing'] = self.detect_arm_swing(results.pose_landmarks)
            detections['ball_pickup'] = self.detect_ball_pickup(results.pose_landmarks)
            detections['throw_initiation'] = self.detect_throw_initiation(results.pose_landmarks)
            detections['ball_release'] = self.detect_ball_release(results.pose_landmarks)
            detections['celebration'] = self.detect_celebration(results.pose_landmarks)

            return detections, results.pose_landmarks
        else:
            print("⚠️  No pose landmarks detected in ROI")

        return detections, None

    def process_frame(self, frame):
        """Main processing function that focuses on one person only"""
        # Step 1: Detect persons and sports balls using YOLOv8
        persons, sports_balls = self.detect_objects(frame)

        # Create annotated frame
        annotated = frame.copy()

        # Results for the detected person
        all_detections = []

        print(f"🔍 Found {len(persons)} person(s) and {len(sports_balls)} sports ball(s) in the frame")

        # Step 1.5: Draw sports balls first
        for i, ball in enumerate(sports_balls):
            bbox = ball['bbox']
            ball_confidence = ball['confidence']

            # Draw sports ball bounding box
            x1, y1, x2, y2 = bbox
            cv2.rectangle(annotated, (x1, y1), (x2, y2), (0, 255, 255), 2)  # Yellow for sports ball
            cv2.putText(annotated, f'Sports Ball {i+1} ({ball_confidence:.2f})',
                       (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)

        if not persons:
            print("⚠️  No persons detected in the frame")
            return [], annotated

        # Step 2: Process only the primary person
        person = persons[0]
        bbox = person['bbox']
        person_confidence = person['confidence']

        print(f"\n👤 Processing Primary Person (confidence: {person_confidence:.2f})")

        # Draw person bounding box
        x1, y1, x2, y2 = bbox
        cv2.rectangle(annotated, (x1, y1), (x2, y2), (255, 0, 0), 2)
        cv2.putText(annotated, f'Primary Person ({person_confidence:.2f})',
                   (x1, y1-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)

        # Crop person ROI
        person_roi, bbox_adjusted = self.crop_person_roi(frame, bbox)

        if person_roi.size == 0:
            print("⚠️  Empty ROI for primary person")
            return all_detections, annotated

        print(f"📐 ROI size: {person_roi.shape}")

        # Step 3: Process pose estimation for this person
        detections, pose_landmarks = self.process_person_pose(person_roi)

        if detections is None:
            return all_detections, annotated

        # Step 4: Draw pose landmarks on the main frame
        if pose_landmarks:
            print("🎯 Drawing pose landmarks for primary person")
            # Adjust landmarks coordinates back to original frame
            adjusted_landmarks = self.adjust_landmarks_to_original(
                pose_landmarks, bbox_adjusted, person_roi.shape
            )

            # Draw pose landmarks
            self.draw_pose_landmarks(annotated, adjusted_landmarks)

        # Add detection results
        all_detections.append({
            'person_id': 1,  # Only one person tracked
            'person_confidence': person_confidence,
            'bbox': bbox,
            'detections': detections
        })

        # Step 5: Add detection text overlay
        self.add_detection_overlay(annotated, all_detections, len(sports_balls))

        return all_detections, annotated

    def adjust_landmarks_to_original(self, landmarks, bbox_adjusted, roi_shape):
        """Adjust pose landmarks coordinates from ROI back to original frame"""
        x1, y1, x2, y2 = bbox_adjusted
        roi_height, roi_width = roi_shape[:2]

        adjusted_landmarks = []
        for landmark in landmarks.landmark:
            # Convert from normalized coordinates to ROI pixel coordinates
            roi_x = landmark.x * roi_width
            roi_y = landmark.y * roi_height

            # Adjust to original frame coordinates
            orig_x = roi_x + x1
            orig_y = roi_y + y1

            adjusted_landmarks.append((orig_x, orig_y, landmark.z))

        return adjusted_landmarks

    def draw_pose_landmarks(self, frame, landmarks):
        """Draw pose landmarks on the frame"""
        if not landmarks:
            return

        # MediaPipe pose connections
        connections = [
            # Face
            (0, 1), (1, 2), (2, 3), (3, 7),
            (0, 4), (4, 5), (5, 6), (6, 8),
            (9, 10),
            # Torso
            (11, 12), (12, 24), (24, 23), (23, 11),
            # Left arm
            (11, 13), (13, 15), (15, 17), (15, 19), (15, 21),
            # Right arm
            (12, 14), (14, 16), (16, 18), (16, 20), (16, 22),
            # Left leg
            (23, 25), (25, 27), (27, 29), (27, 31),
            # Right leg
            (24, 26), (26, 28), (28, 30), (28, 32)
        ]

        # Draw landmarks
        for i, (x, y, z) in enumerate(landmarks):
            cv2.circle(frame, (int(x), int(y)), 3, (0, 255, 0), -1)

        # Draw connections
        for connection in connections:
            start_idx, end_idx = connection
            if start_idx < len(landmarks) and end_idx < len(landmarks):
                start_point = (int(landmarks[start_idx][0]), int(landmarks[start_idx][1]))
                end_point = (int(landmarks[end_idx][0]), int(landmarks[end_idx][1]))
                cv2.line(frame, start_point, end_point, (0, 255, 0), 2)

    def add_detection_overlay(self, frame, all_detections, num_balls=0):
        """Add detection results overlay to the frame"""
        y_offset = 30

        # Add sports ball count at the top
        if num_balls > 0:
            cv2.putText(frame, f"Sports Balls Detected: {num_balls}", (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2)
            y_offset += 30

        for person_data in all_detections:
            person_id = person_data['person_id']
            detections = person_data['detections']

            # Person header
            cv2.putText(frame, f"Primary Person:", (10, y_offset),
                       cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
            y_offset += 25

            # Detection results
            for pose_name, (detected, conf) in detections.items():
                status = f"  {pose_name}: {'DETECTED' if detected else 'Not detected'} ({conf:.2f})"
                color = (0, 255, 0) if detected else (0, 0, 255)
                cv2.putText(frame, status, (10, y_offset),
                           cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
                y_offset += 20

            y_offset += 10  # Extra spacing


def is_image_file(file_path):
    """Return True when ``file_path`` ends with a known image extension.

    The comparison ignores case.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff', '.tif')
    lowered = file_path.lower()
    return any(lowered.endswith(suffix) for suffix in image_suffixes)

def is_video_file(file_path):
    """Return True when ``file_path`` ends with a known video extension.

    The comparison ignores case.
    """
    video_suffixes = ('.mp4', '.avi', '.mov', '.mkv', '.wmv', '.flv', '.webm')
    lowered = file_path.lower()
    return any(lowered.endswith(suffix) for suffix in video_suffixes)

def save_output_image(annotated_image, input_path, output_dir="output"):
    """Write the annotated image into ``output_dir`` as a JPEG.

    The file is named ``<input basename>_bowling_analysis.jpg``.

    Returns:
        The output path on success, or None when ``cv2.imwrite`` reports that
        the image could not be written.
    """
    # exist_ok avoids a failure if the directory appears between check and create.
    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(output_dir, f"{base_name}_bowling_analysis.jpg")

    # imwrite signals failure through its return value instead of raising.
    if not cv2.imwrite(output_path, annotated_image):
        print(f"❌ Error: Could not write image {output_path}")
        return None

    print(f"💾 Output saved to: {output_path}")
    return output_path

def save_output_video(annotated_frames, input_path, output_dir="output", fps=30):
    """Write the annotated frames into ``output_dir`` as an MP4 video.

    The file is named ``<input basename>_bowling_analysis.mp4`` and uses the
    ``mp4v`` codec. Frame size is taken from the first frame.

    Args:
        annotated_frames: Sequence of frames to write.
        input_path: Path of the source video; only its base name is used.
        output_dir: Directory that receives the output file.
        fps: Output frame rate. A missing or non-positive value falls back
            to 30.

    Returns:
        The output path on success, or None when there are no frames or the
        video writer could not be opened.
    """
    # Bail out before touching the filesystem when there is nothing to write.
    if not annotated_frames:
        print("⚠️  No frames to save")
        return None

    os.makedirs(output_dir, exist_ok=True)

    base_name = os.path.splitext(os.path.basename(input_path))[0]
    output_path = os.path.join(output_dir, f"{base_name}_bowling_analysis.mp4")

    # A non-positive frame rate cannot be used for the output; use the default.
    if not fps or fps <= 0:
        fps = 30

    height, width = annotated_frames[0].shape[:2]

    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    try:
        if not out.isOpened():
            print(f"❌ Error: Could not open video writer for {output_path}")
            return None
        for frame in annotated_frames:
            out.write(frame)
    finally:
        # Always release so the file handle is closed even on error.
        out.release()

    print(f"💾 Output video saved to: {output_path}")
    return output_path

def process_image(image_path, detector):
    """Analyse a single image file, print a summary, save and show the result.

    Loads ``image_path`` with OpenCV, runs ``detector.process_frame`` on it,
    prints which poses were detected, saves the annotated image via
    ``save_output_image`` and finally displays it in an OpenCV window until a
    key is pressed. Returns None; if the image cannot be loaded an error is
    printed and nothing else happens.
    """
    print(f"📸 Processing image: {image_path}")

    image = cv2.imread(image_path)
    if image is None:
        print(f"❌ Error: Could not load image {image_path}")
        return

    print(f"📐 Image dimensions: {image.shape}")

    detections, annotated = detector.process_frame(image)

    # Summary
    print(f"\n📊 Summary for {os.path.basename(image_path)}:")
    person_found = 'Yes' if detections else 'No'
    print(f"   Primary person detected: {person_found}")

    if detections:
        hits = []
        for pose_name, (was_detected, _) in detections[0]['detections'].items():
            if was_detected:
                hits.append(pose_name)
        listing = ', '.join(hits) if hits else 'None'
        print(f"   Detected poses: {listing}")

    save_output_image(annotated, image_path)

    # NOTE(review): cv2.imshow needs a GUI backend; it may not work in a
    # headless notebook environment - confirm where this is meant to run.
    cv2.imshow('Bowling Pose Analysis', annotated)
    cv2.waitKey(0)
    cv2.destroyAllWindows()

def process_video(video_path, detector):
    """Run the detector on every frame of a video, print a summary and save the result.

    Args:
        video_path: Path of the video to open with ``cv2.VideoCapture``.
        detector: Object exposing ``process_frame(frame)`` that returns a
            ``(detections, annotated)`` pair. When ``detections`` is non-empty,
            ``detections[0]['detections']`` is read as a mapping of pose name to
            a ``(detected, confidence)`` pair.

    Returns:
        None. Progress, summary and errors are reported via ``print``; the
        annotated frames are handed to ``save_output_video``.
    """
    print(f"🎬 Processing video: {video_path}")

    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"❌ Error: Could not open video {video_path}")
        return

    # NOTE: every annotated frame is kept in memory until the video is written,
    # so memory use grows with clip length and resolution.
    annotated_frames = []
    frame_count = 0
    frames_with_person = 0
    pose_counts = {}

    try:
        fps = cap.get(cv2.CAP_PROP_FPS)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        print(f"📐 Video properties: {width}x{height}, {fps} FPS, {total_frames} frames")

        while True:
            ret, frame = cap.read()
            if not ret:
                break

            frame_count += 1
            print(f"\n🎞️  Processing frame {frame_count}/{total_frames}")

            detections, annotated = detector.process_frame(frame)
            annotated_frames.append(annotated)

            # Only the first detection entry is tallied.
            if detections:
                frames_with_person += 1
                for pose_name, (detected, conf) in detections[0]['detections'].items():
                    if detected:
                        pose_counts[pose_name] = pose_counts.get(pose_name, 0) + 1

            if frame_count % 10 == 0:
                print(f"📊 Progress: {frame_count}/{total_frames} frames processed")
    finally:
        # Release the capture handle even if frame processing raises.
        cap.release()
        cv2.destroyAllWindows()

    # A file can open successfully yet yield no frames; the percentages below
    # would otherwise divide by zero.
    if frame_count == 0:
        print(f"⚠️  No frames could be read from {video_path}")
        return

    print(f"\n📊 Video Summary for {os.path.basename(video_path)}:")
    print(f"   Total frames processed: {frame_count}")
    print(f"   Frames with primary person: {frames_with_person} ({frames_with_person/frame_count*100:.1f}%)")

    if pose_counts:
        print("\n   Pose detections:")
        # pose_counts is only populated on frames with a person, so
        # frames_with_person is at least 1 here.
        for pose, count in pose_counts.items():
            percentage = (count / frames_with_person) * 100
            print(f"       {pose}: {count} frames ({percentage:.1f}% of person frames)")

    save_output_video(annotated_frames, video_path, fps=fps)

def process_webcam(detector):
    """Run the detector on the live feed of camera index 0 until 'q' is pressed.

    Each annotated frame is shown in a window with a frame counter. Pressing
    's' saves the current annotated frame via ``save_output_image``.

    Args:
        detector: Object exposing ``process_frame(frame)`` that returns a
            ``(detections, annotated)`` pair.

    Returns:
        None. Status and errors are reported via ``print``.
    """
    print("📷 Starting webcam processing...")
    print("Press 'q' to quit, 's' to save current frame")

    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("❌ Error: Could not open webcam")
        return

    frame_count = 0

    try:
        while True:
            ret, frame = cap.read()
            if not ret:
                print("❌ Error: Could not read frame from webcam")
                break

            frame_count += 1

            detections, annotated = detector.process_frame(frame)

            # Frame counter in the bottom-left corner of the annotated frame.
            cv2.putText(annotated, f"Frame: {frame_count}", (10, annotated.shape[0] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)

            cv2.imshow('Bowling Pose Analysis - Live', annotated)

            # Mask to the low byte so the key code compares cleanly with ord().
            key = cv2.waitKey(1) & 0xFF
            if key == ord('q'):
                break
            elif key == ord('s'):
                save_output_image(annotated, f"webcam_frame_{frame_count}.jpg")
    finally:
        # Release the camera even on errors; in a long-lived notebook kernel a
        # leaked handle keeps the device locked until the kernel restarts.
        cap.release()
        cv2.destroyAllWindows()

def _process_folder(folder_path, is_match, process_one, detector, kind):
    """Run ``process_one(path, detector)`` on each file in a folder accepted by ``is_match``.

    Args:
        folder_path: Directory to scan (non-recursive).
        is_match: Predicate applied to each entry name.
        process_one: Callable invoked as ``process_one(full_path, detector)``.
        detector: Detector instance forwarded to ``process_one``.
        kind: Word used in the status messages ("image" or "video").
    """
    # isdir (not exists): a plain file here would make os.listdir raise.
    if not os.path.isdir(folder_path):
        print("❌ Error: Invalid folder path")
        return

    # Sort so batch runs are processed in a predictable order.
    matches = sorted(name for name in os.listdir(folder_path) if is_match(name))
    if not matches:
        print(f"❌ No {kind} files found in the folder")
        return

    print(f"📁 Found {len(matches)} {kind} files")
    for name in matches:
        process_one(os.path.join(folder_path, name), detector)


def main():
    """Interactive entry point: build the detector, ask for an input mode and run it.

    Modes (read from stdin): 1 = single image, 2 = single video, 3 = webcam,
    4 = every image in a folder, 5 = every video in a folder. Invalid paths
    or choices are reported via ``print``; nothing is returned.
    """
    print("🎳 Bowling Pose Detector v1.0 (Single Person Mode)")
    print("="*50)

    print("🔧 Initializing detector...")
    detector = BowlingPoseDetector()

    print("\n🎯 Choose input mode:")
    print("1. Process image file")
    print("2. Process video file")
    print("3. Process webcam (live)")
    print("4. Process all images in a folder")
    print("5. Process all videos in a folder")

    choice = input("\nEnter your choice (1-5): ").strip()

    if choice == '1':
        image_path = input("Enter image path: ").strip()
        # isfile (not exists): a directory whose name looks like an image is rejected.
        if os.path.isfile(image_path) and is_image_file(image_path):
            process_image(image_path, detector)
        else:
            print("❌ Error: Invalid image file path")

    elif choice == '2':
        video_path = input("Enter video path: ").strip()
        if os.path.isfile(video_path) and is_video_file(video_path):
            process_video(video_path, detector)
        else:
            print("❌ Error: Invalid video file path")

    elif choice == '3':
        process_webcam(detector)

    elif choice == '4':
        folder_path = input("Enter folder path: ").strip()
        _process_folder(folder_path, is_image_file, process_image, detector, "image")

    elif choice == '5':
        folder_path = input("Enter folder path: ").strip()
        _process_folder(folder_path, is_video_file, process_video, detector, "video")

    else:
        print("❌ Invalid choice")

if __name__ == "__main__":
    # Entry point: run the interactive menu, report failures without a raw
    # crash, and always close any OpenCV windows before exiting.
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C / kernel interrupt is a normal way to stop a long run.
        print("\n⚠️  Process interrupted by user")
    except Exception as exc:
        import traceback

        print(f"❌ Error: {exc}")
        traceback.print_exc()
    finally:
        cv2.destroyAllWindows()
        print("🔚 Program ended")

Creating new Ultralytics Settings v0.0.6 file ✅ 
View Ultralytics Settings with 'yolo settings' or at '/root/.config/Ultralytics/settings.json'
Update Settings with 'yolo settings key=value', i.e. 'yolo settings runs_dir=path/to/dir'. For help see https://docs.ultralytics.com/quickstart/#ultralytics-settings.
🎳 Bowling Pose Detector v1.0 (Single Person Mode)
🔧 Initializing detector...
Downloading https://github.com/ultralytics/assets/releases/download/v8.3.0/yolov8x.pt to 'yolov8x.pt'...


100%|██████████| 131M/131M [00:00<00:00, 232MB/s]


[1;30;43mStreaming output truncated to the last 5000 lines.[0m

🎞️  Processing frame 225/514
🔍 Found 1 person(s) and 0 sports ball(s) in the frame

👤 Processing Primary Person (confidence: 0.89)
📐 ROI size: (867, 427, 3)
✅ Pose landmarks detected!
  Analyzing poses...
  Bowling stance: confidence=0.30, detected=False
  Arm swing: confidence=0.40, detected=False
  Ball pickup: confidence=0.40, detected=False
  Throw initiation: confidence=0.30, detected=False
  Ball release: confidence=0.60, detected=False
  Celebration: confidence=0.00, detected=False
🎯 Drawing pose landmarks for primary person

🎞️  Processing frame 226/514
🔍 Found 1 person(s) and 0 sports ball(s) in the frame

👤 Processing Primary Person (confidence: 0.89)
📐 ROI size: (867, 425, 3)
✅ Pose landmarks detected!
  Analyzing poses...
  Bowling stance: confidence=0.30, detected=False
  Arm swing: confidence=0.40, detected=False
  Ball pickup: confidence=0.40, detected=False
  Throw initiation: confidence=0.30, detected=Fa

In [3]:
# Mount Google Drive at /content/drive (requires the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# Move the annotated video from the local output folder to Google Drive.
import os
import shutil

source_video = '/content/output/output_bowling_analysis.mp4'
drive_video = '/content/drive/MyDrive/bowling/output_bowling_analysis.mp4'

# shutil.move does not create missing parent folders, so make sure the
# destination folder exists before moving.
os.makedirs(os.path.dirname(drive_video), exist_ok=True)
shutil.move(source_video, drive_video)

'/content/drive/MyDrive/bowling/output_bowling_analysis.mp4'