### *Live Feed Hard Coded SpliceVid

In [None]:
import cv2
import numpy as np
import mediapipe as mp
import math
import os

# Create RepVids directory in "Deep Dive AI Summer 2025" folder
save_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Deep Dive AI Summer 2025", "RepVids")
os.makedirs(save_dir, exist_ok=True)

# Initialize variables
frame_buffer = []
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
buffer_size = 90  # Number of frames to keep in buffer

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

def calc_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return 360 - angle if angle > 180 else angle

cap = cv2.VideoCapture(0)
counter = 0
stage = None

with mp_pose.Pose(min_detection_confidence=0.5,
                  min_tracking_confidence=0.5) as pose:

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Always add current frame to buffer
        frame_buffer.append(image.copy())
        """
        if len(frame_buffer) > buffer_size:
            frame_buffer.pop(0)  # Keep only last 90 frames
        """

        label = "None"

        try:
            landmarks = results.pose_landmarks.landmark
            h, w, _ = image.shape

            # Get right side landmarks
            shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                        landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            hip = [landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                   landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y]
            knee = [landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                    landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y]
            ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]

            # Visibility check
            visible = all(landmarks[i].visibility > 0.7 for i in [
                mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
                mp_pose.PoseLandmark.RIGHT_ELBOW.value,
                mp_pose.PoseLandmark.RIGHT_WRIST.value,
                mp_pose.PoseLandmark.RIGHT_HIP.value,
                mp_pose.PoseLandmark.RIGHT_KNEE.value,
                mp_pose.PoseLandmark.RIGHT_ANKLE.value
            ])
            
            # Horizontal filter
            locs = np.array([
                shoulder[1] * h, elbow[1] * h, wrist[1] * h,
                hip[1] * h, knee[1] * h, ankle[1] * h
            ])
            data_range = np.max(locs) - np.min(locs)
            if data_range > 0.5 * h:
                visible = False
            downUp = False
            

            if visible:
                elbow_angle = calc_angle(shoulder, elbow, wrist)
                back_angle = calc_angle(shoulder, hip, knee)
                knee_angle = calc_angle(hip, knee, ankle)

                # Pushup logic
                percent = np.interp(elbow_angle, [90, 160], [0, 100])
                
                # Store previous counter to detect changes
                prev_counter = counter

                # Track full cycle: up -> down -> up
                if elbow_angle > 165:
                    if stage == "down":
                        stage = "up"
                        counter += 1
                    else:
                        stage = "up"
                elif elbow_angle < 95 and stage == "up":
                    stage = "down"

                # SAVE VIDEO EVERY TIME COUNTER INCREASES
                if counter > prev_counter:
                    filename = os.path.join(save_dir, f'rep_{counter:03d}.mp4')
                    
                    if len(frame_buffer) > 0:
                        out = cv2.VideoWriter(filename, fourcc, 20.0, (image.shape[1], image.shape[0]))
                        
                        for f in frame_buffer:
                            out.write(f)
                        out.release()
                        
                        print(f"Saved rep {counter} to: {filename} ({len(frame_buffer)} frames)")

                        frame_buffer = []  # Reset buffer for next rep
                    else:
                        print(f"No frames to save for rep {counter}")

                label = "Pushup"

                # Draw angle + percent
                cv2.putText(image, f'Elbow: {int(elbow_angle)}', (10, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(image, f'{int(percent)}%', (10, 140),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            else:
                label = "None"

        except:
            pass

        # Draw buffer status
        cv2.putText(image, f'Buffer: {len(frame_buffer)}/{buffer_size}', (10, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

        # Draw pose
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2))

        # Label
        cv2.putText(image, f'Form: {label}', (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 2)

        # Rep counter
        cv2.rectangle(image, (image.shape[1] - 160, 0), (image.shape[1], 80), (0, 0, 0), -1)
        cv2.putText(image, 'REPS', (image.shape[1] - 150, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        cv2.putText(image, str(counter), (image.shape[1] - 140, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)

        cv2.imshow('Push-Up Detector', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

In [None]:
# Enhanced first rep handling with stage-based frame collection

import cv2
import numpy as np
import mediapipe as mp
import os

# Create RepVids directory in "Deep Dive AI Summer 2025" folder
save_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Deep Dive AI Summer 2025", "RepVids")
os.makedirs(save_dir, exist_ok=True)

# Initialize variables
frame_buffer = []
first_rep_buffer = []  # Special buffer for first rep
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
buffer_size = 90
first_stage_up_detected = False  # Track when stage first becomes "up"
is_first_rep = True  # Track if we're on the first rep

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

def calc_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return 360 - angle if angle > 180 else angle

cap = cv2.VideoCapture(0)
counter = 0
stage = None

with mp_pose.Pose(min_detection_confidence=0.5,
                  min_tracking_confidence=0.5) as pose:

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Always add current frame to main buffer for non-first reps
        if not is_first_rep:
            frame_buffer.append(image.copy())

        label = "None"

        try:
            landmarks = results.pose_landmarks.landmark
            h, w, _ = image.shape

            # Get right side landmarks
            shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                        landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            hip = [landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                   landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y]
            knee = [landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                    landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y]
            ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]

            # Visibility check
            visible = all(landmarks[i].visibility > 0.7 for i in [
                mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
                mp_pose.PoseLandmark.RIGHT_ELBOW.value,
                mp_pose.PoseLandmark.RIGHT_WRIST.value,
                mp_pose.PoseLandmark.RIGHT_HIP.value,
                mp_pose.PoseLandmark.RIGHT_KNEE.value,
                mp_pose.PoseLandmark.RIGHT_ANKLE.value
            ])
            
            # Horizontal filter
            locs = np.array([
                shoulder[1] * h, elbow[1] * h, wrist[1] * h,
                hip[1] * h, knee[1] * h, ankle[1] * h
            ])
            data_range = np.max(locs) - np.min(locs)
            if data_range > 0.5 * h:
                visible = False

            if visible:
                elbow_angle = calc_angle(shoulder, elbow, wrist)
                back_angle = calc_angle(shoulder, hip, knee)
                knee_angle = calc_angle(hip, knee, ankle)

                # Pushup logic
                percent = np.interp(elbow_angle, [90, 160], [0, 100])
                
                # Store previous counter and stage to detect changes
                prev_counter = counter
                prev_stage = stage

                # Track full cycle: up -> down -> up
                if elbow_angle > 165:
                    if stage == "down":
                        stage = "up"
                        counter += 1
                    else:
                        stage = "up"
                elif elbow_angle < 95 and stage == "up":
                    stage = "down"

                # FIRST REP SPECIAL HANDLING
                if is_first_rep:
                    # Start collecting frames when stage first becomes "up"
                    if stage == "up" and not first_stage_up_detected:
                        first_stage_up_detected = True
                        first_rep_buffer = [image.copy()]  # Start with current frame
                        print("🔴 First rep: Started collecting frames (stage = 'up')")
                    
                    # Continue collecting frames if we've started
                    elif first_stage_up_detected:
                        first_rep_buffer.append(image.copy())
                    
                    # Save first rep when counter increases from 0 to 1
                    if counter > prev_counter and counter == 1:
                        filename = os.path.join(save_dir, f'rep_{counter:03d}.mp4')
                        
                        if len(first_rep_buffer) > 0:
                            out = cv2.VideoWriter(filename, fourcc, 20.0, (image.shape[1], image.shape[0]))
                            
                            for f in first_rep_buffer:
                                out.write(f)
                            out.release()
                            
                            print(f"✅ Saved FIRST rep {counter} to: {filename} ({len(first_rep_buffer)} frames)")
                            print(f"   Frames collected from first 'up' stage to rep completion")
                            
                            # Reset first rep variables
                            first_rep_buffer = []
                            is_first_rep = False
                            print("🔄 Switching to normal rep recording mode")
                        else:
                            print(f"❌ No frames to save for first rep {counter}")

                # NORMAL REP HANDLING (for reps 2+)
                elif counter > prev_counter:
                    filename = os.path.join(save_dir, f'rep_{counter:03d}.mp4')
                    
                    if len(frame_buffer) > 0:
                        out = cv2.VideoWriter(filename, fourcc, 20.0, (image.shape[1], image.shape[0]))
                        
                        for f in frame_buffer:
                            out.write(f)
                        out.release()
                        
                        print(f"✅ Saved rep {counter} to: {filename} ({len(frame_buffer)} frames)")

                        frame_buffer = []  # Reset buffer for next rep
                    else:
                        print(f"❌ No frames to save for rep {counter}")

                label = "Pushup"

                # Draw angle + percent
                cv2.putText(image, f'Elbow: {int(elbow_angle)}', (10, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(image, f'{int(percent)}%', (10, 140),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            else:
                label = "None"

        except:
            pass

        # Draw buffer status with first rep indicator
        if is_first_rep:
            if first_stage_up_detected:
                cv2.putText(image, f'First Rep Buffer: {len(first_rep_buffer)}', (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
            else:
                cv2.putText(image, 'Waiting for first "up" stage...', (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
        else:
            cv2.putText(image, f'Buffer: {len(frame_buffer)}/{buffer_size}', (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

        # Draw pose
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2))

        # Label with first rep indicator
        rep_status = " (FIRST REP)" if is_first_rep else ""
        cv2.putText(image, f'Form: {label}{rep_status}', (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 2)

        # Rep counter
        cv2.rectangle(image, (image.shape[1] - 160, 0), (image.shape[1], 80), (0, 0, 0), -1)
        cv2.putText(image, 'REPS', (image.shape[1] - 150, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        cv2.putText(image, str(counter), (image.shape[1] - 140, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)

        cv2.imshow('Push-Up Detector', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

In [1]:
"""
Enhanced Push-up Detection System with MediaPipe

This module provides a comprehensive push-up detection and recording system
using MediaPipe pose estimation and OpenCV for video processing.
"""

import cv2
import numpy as np
import mediapipe as mp
import os
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
from contextlib import contextmanager
import logging
from pathlib import Path


@dataclass
class DetectionConfig:
    """Configuration parameters for push-up detection."""
    
    # MediaPipe confidence thresholds
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    
    # Angle thresholds for push-up detection
    elbow_angle_up_threshold: float = 165.0
    elbow_angle_down_threshold: float = 95.0
    
    # Visibility and filtering thresholds
    min_visibility_threshold: float = 0.7
    horizontal_filter_ratio: float = 0.5
    
    # Video recording parameters
    fps: float = 20.0
    fourcc: str = 'mp4v'
    buffer_size: int = 90
    
    # File paths
    save_directory: str = "RepVids"
    
    # UI parameters
    font_scale: float = 0.7
    font_thickness: int = 2


class LandmarkExtractor:
    """Utility class for extracting and processing pose landmarks."""
    
    @staticmethod
    def extract_landmarks(results, mp_pose) -> Optional[Dict[str, List[float]]]:
        """Extract relevant landmarks from MediaPipe results."""
        if not results.pose_landmarks:
            return None
            
        landmarks = results.pose_landmarks.landmark
        
        return {
            'shoulder': [
                landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y
            ],
            'elbow': [
                landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y
            ],
            'wrist': [
                landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y
            ],
            'hip': [
                landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y
            ],
            'knee': [
                landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y
            ],
            'ankle': [
                landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y
            ]
        }
    
    @staticmethod
    def check_visibility(results, mp_pose, threshold: float = 0.7) -> bool:
        """Check if all required landmarks are visible enough."""
        if not results.pose_landmarks:
            return False
            
        landmarks = results.pose_landmarks.landmark
        required_landmarks = [
            mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            mp_pose.PoseLandmark.RIGHT_WRIST.value,
            mp_pose.PoseLandmark.RIGHT_HIP.value,
            mp_pose.PoseLandmark.RIGHT_KNEE.value,
            mp_pose.PoseLandmark.RIGHT_ANKLE.value
        ]
        
        return all(landmarks[i].visibility > threshold for i in required_landmarks)
    
    @staticmethod
    def apply_horizontal_filter(landmark_dict: Dict[str, List[float]], 
                              image_height: int, 
                              filter_ratio: float = 0.5) -> bool:
        """Apply horizontal filter to check if pose is suitable for detection."""
        y_coords = [
            landmark_dict['shoulder'][1] * image_height,
            landmark_dict['elbow'][1] * image_height,
            landmark_dict['wrist'][1] * image_height,
            landmark_dict['hip'][1] * image_height,
            landmark_dict['knee'][1] * image_height,
            landmark_dict['ankle'][1] * image_height
        ]
        
        data_range = max(y_coords) - min(y_coords)
        return data_range <= filter_ratio * image_height


class AngleCalculator:
    """Utility class for calculating angles between body landmarks."""
    
    @staticmethod
    def calculate_angle(point_a: List[float], 
                       point_b: List[float], 
                       point_c: List[float]) -> float:
        """
        Calculate angle between three points.
        
        Args:
            point_a: First point coordinates [x, y]
            point_b: Vertex point coordinates [x, y]
            point_c: Third point coordinates [x, y]
            
        Returns:
            Angle in degrees
        """
        a = np.array(point_a)
        b = np.array(point_b)
        c = np.array(point_c)
        
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        
        return 360 - angle if angle > 180 else angle


class VideoRecorder:
    """Handles video recording functionality for push-up reps."""
    
    def __init__(self, config: DetectionConfig):
        self.config = config
        self.save_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025" / config.save_directory
        self.save_path.mkdir(parents=True, exist_ok=True)
        
    @contextmanager
    def create_video_writer(self, filename: str, frame_size: Tuple[int, int]):
        """Context manager for video writer."""
        fourcc = cv2.VideoWriter_fourcc(*self.config.fourcc)
        filepath = self.save_path / filename
        
        writer = cv2.VideoWriter(
            str(filepath), 
            fourcc, 
            self.config.fps, 
            frame_size
        )
        
        try:
            yield writer, filepath
        finally:
            writer.release()
    
    def save_rep_video(self, frames: List[np.ndarray], rep_number: int) -> bool:
        """Save a list of frames as a video file."""
        if not frames:
            logging.warning(f"No frames to save for rep {rep_number}")
            return False
            
        filename = f'rep_{rep_number:03d}.mp4'
        frame_size = (frames[0].shape[1], frames[0].shape[0])
        
        try:
            with self.create_video_writer(filename, frame_size) as (writer, filepath):
                for frame in frames:
                    writer.write(frame)
                    
            logging.info(f"Saved rep {rep_number} to: {filepath} ({len(frames)} frames)")
            return True
            
        except Exception as e:
            logging.error(f"Failed to save rep {rep_number}: {e}")
            return False


class PushUpDetector:
    """Main push-up detection and tracking class."""
    
    def __init__(self, config: DetectionConfig = None):
        self.config = config or DetectionConfig()
        self.video_recorder = VideoRecorder(self.config)
        
        # Detection state
        self.counter = 0
        self.stage = None
        
        # Frame buffers
        self.frame_buffer = []
        self.first_rep_buffer = []
        
        # First rep tracking
        self.first_stage_up_detected = False
        self.is_first_rep = True
        
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
        
        # Setup logging
        logging.basicConfig(level=logging.INFO)
        
    def process_frame(self, frame: np.ndarray, pose_results) -> Tuple[np.ndarray, str]:
        """
        Process a single frame for push-up detection.
        
        Args:
            frame: Input frame from camera
            pose_results: MediaPipe pose detection results
            
        Returns:
            Tuple of (processed_frame, detection_label)
        """
        processed_frame = frame.copy()
        label = "None"
        
        # Extract landmarks
        landmark_dict = LandmarkExtractor.extract_landmarks(pose_results, self.mp_pose)
        if not landmark_dict:
            return self._add_ui_elements(processed_frame, label)
            
        # Check visibility and apply filters
        if not self._is_pose_valid(pose_results, landmark_dict, frame.shape[0]):
            return self._add_ui_elements(processed_frame, label)
            
        # Calculate angles
        elbow_angle = AngleCalculator.calculate_angle(
            landmark_dict['shoulder'], 
            landmark_dict['elbow'], 
            landmark_dict['wrist']
        )
        
        # Update detection state
        prev_counter = self.counter
        self._update_detection_state(elbow_angle)
        
        # Handle frame recording
        self._handle_frame_recording(processed_frame, prev_counter)
        
        # Add angle and percentage display
        percent = np.interp(elbow_angle, [90, 160], [0, 100])
        self._add_angle_display(processed_frame, elbow_angle, percent)
        
        label = "Pushup"
        
        # Draw pose landmarks
        self.mp_drawing.draw_landmarks(
            processed_frame, 
            pose_results.pose_landmarks, 
            self.mp_pose.POSE_CONNECTIONS,
            self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
            self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
        )
        
        return self._add_ui_elements(processed_frame, label)
    
    def _is_pose_valid(self, pose_results, landmark_dict: Dict, image_height: int) -> bool:
        """Check if the detected pose is valid for push-up detection."""
        # Check visibility
        if not LandmarkExtractor.check_visibility(
            pose_results, 
            self.mp_pose, 
            self.config.min_visibility_threshold
        ):
            return False
            
        # Apply horizontal filter
        return LandmarkExtractor.apply_horizontal_filter(
            landmark_dict, 
            image_height, 
            self.config.horizontal_filter_ratio
        )
    
    def _update_detection_state(self, elbow_angle: float):
        """Update the push-up detection state based on elbow angle."""
        if elbow_angle > self.config.elbow_angle_up_threshold:
            if self.stage == "down":
                self.stage = "up"
                self.counter += 1
            else:
                self.stage = "up"
        elif elbow_angle < self.config.elbow_angle_down_threshold and self.stage == "up":
            self.stage = "down"
    
    def _handle_frame_recording(self, frame: np.ndarray, prev_counter: int):
        """Handle frame recording for both first rep and subsequent reps."""
        if self.is_first_rep:
            self._handle_first_rep_recording(frame, prev_counter)
        else:
            self._handle_normal_rep_recording(frame, prev_counter)
    
    def _handle_first_rep_recording(self, frame: np.ndarray, prev_counter: int):
        """Handle frame recording for the first repetition."""
        # Start collecting frames when stage first becomes "up"
        if self.stage == "up" and not self.first_stage_up_detected:
            self.first_stage_up_detected = True
            self.first_rep_buffer = [frame.copy()]
            logging.info("First rep: Started collecting frames (stage = 'up')")
        
        # Continue collecting frames if we've started
        elif self.first_stage_up_detected:
            self.first_rep_buffer.append(frame.copy())
        
        # Save first rep when counter increases from 0 to 1
        if self.counter > prev_counter and self.counter == 1:
            if self.video_recorder.save_rep_video(self.first_rep_buffer, self.counter):
                logging.info("Frames collected from first 'up' stage to rep completion")
            
            # Reset first rep variables
            self.first_rep_buffer = []
            self.is_first_rep = False
            logging.info("Switching to normal rep recording mode")
    
    def _handle_normal_rep_recording(self, frame: np.ndarray, prev_counter: int):
        """Handle frame recording for normal repetitions (2+)."""
        # Always add current frame to buffer
        self.frame_buffer.append(frame.copy())
        
        # Keep buffer size manageable
        if len(self.frame_buffer) > self.config.buffer_size:
            self.frame_buffer.pop(0)
        
        # Save rep when counter increases
        if self.counter > prev_counter:
            self.video_recorder.save_rep_video(self.frame_buffer, self.counter)
            self.frame_buffer = []  # Reset buffer for next rep
    
    def _add_angle_display(self, frame: np.ndarray, elbow_angle: float, percent: float):
        """Add angle and percentage display to frame."""
        cv2.putText(
            frame, 
            f'Elbow: {int(elbow_angle)}', 
            (10, 100),
            cv2.FONT_HERSHEY_SIMPLEX, 
            self.config.font_scale, 
            (255, 255, 255), 
            self.config.font_thickness
        )
        cv2.putText(
            frame, 
            f'{int(percent)}%', 
            (10, 140),
            cv2.FONT_HERSHEY_SIMPLEX, 
            1, 
            (0, 255, 0), 
            self.config.font_thickness
        )
    
    def _add_ui_elements(self, frame: np.ndarray, label: str) -> np.ndarray:
        """Add UI elements to the frame."""
        # Buffer status
        if self.is_first_rep:
            if self.first_stage_up_detected:
                cv2.putText(
                    frame, 
                    f'First Rep Buffer: {len(self.first_rep_buffer)}', 
                    (10, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 
                    0.5, 
                    (0, 255, 255), 
                    2
                )
            else:
                cv2.putText(
                    frame, 
                    'Waiting for first "up" stage...', 
                    (10, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 
                    0.5, 
                    (255, 255, 0), 
                    2
                )
        else:
            cv2.putText(
                frame, 
                f'Buffer: {len(self.frame_buffer)}/{self.config.buffer_size}', 
                (10, 70),
                cv2.FONT_HERSHEY_SIMPLEX, 
                0.5, 
                (255, 255, 0), 
                2
            )
        
        # Form label with first rep indicator
        rep_status = " (FIRST REP)" if self.is_first_rep else ""
        cv2.putText(
            frame, 
            f'Form: {label}{rep_status}', 
            (10, 40),
            cv2.FONT_HERSHEY_SIMPLEX, 
            1.2, 
            (0, 255, 255), 
            2
        )
        
        # Rep counter
        cv2.rectangle(frame, (frame.shape[1] - 160, 0), (frame.shape[1], 80), (0, 0, 0), -1)
        cv2.putText(
            frame, 
            'REPS', 
            (frame.shape[1] - 150, 30),
            cv2.FONT_HERSHEY_SIMPLEX, 
            0.8, 
            (255, 255, 255), 
            2
        )
        cv2.putText(
            frame, 
            str(self.counter), 
            (frame.shape[1] - 140, 70),
            cv2.FONT_HERSHEY_SIMPLEX, 
            2, 
            (255, 255, 255), 
            4
        )
        
        return frame


class PushUpApp:
    """Main application class for push-up detection."""
    
    def __init__(self, config: DetectionConfig = None):
        self.config = config or DetectionConfig()
        self.detector = PushUpDetector(self.config)
        
    def run(self, camera_index: int = 0):
        """Run the push-up detection application."""
        cap = cv2.VideoCapture(camera_index)
        
        if not cap.isOpened():
            logging.error(f"Failed to open camera {camera_index}")
            return
            
        try:
            with self.detector.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
                
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        logging.warning("Failed to read frame from camera")
                        break
                    
                    # Process frame with MediaPipe
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    rgb_frame.flags.writeable = False
                    results = pose.process(rgb_frame)
                    
                    # Process with detector
                    rgb_frame.flags.writeable = True
                    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
                    processed_frame, _ = self.detector.process_frame(bgr_frame, results)
                    
                    # Display frame
                    cv2.imshow('Push-Up Detector', processed_frame)
                    
                    # Check for quit
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        break
                        
        except Exception as e:
            logging.error(f"Error during execution: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()


def main():
    """Main entry point."""
    # Create custom configuration if needed
    config = DetectionConfig(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up_threshold=165.0,
        elbow_angle_down_threshold=95.0,
        fps=20.0
    )
    
    # Run the application
    app = PushUpApp(config)
    app.run()


if __name__ == "__main__":
    main()


ERROR:root:Error during execution: too many values to unpack (expected 2)


### *Downsampling to 30

In [None]:
import cv2
import numpy as np
import os
import mediapipe as mp
import pandas as pd
from tqdm import tqdm

def calculate_angle(a, b, c):
    a, b, c = np.array(a), np.array(b), np.array(c)
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return angle if angle <= 180 else 360 - angle

def extract_key_frames_and_downsample_to_30(input_path, output_path):
    """
    Extract frames between key points and downsample to 30 frames with guaranteed minimum elbow angle
    """
    mp_pose = mp.solutions.pose
    pose = mp_pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7)

    cap = cv2.VideoCapture(input_path)
    elbow_angles = []
    frame_count = 0

    print(f"Analyzing frames for key points...")
    
    # STEP 1: First pass - extract frames and calculate angles
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)

        if results.pose_landmarks:
            lm = results.pose_landmarks.landmark
            
            # Try both sides - use whichever is more visible
            try:
                # Try LEFT side first
                left_shoulder = [lm[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x, lm[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
                left_elbow = [lm[mp_pose.PoseLandmark.LEFT_ELBOW.value].x, lm[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
                left_wrist = [lm[mp_pose.PoseLandmark.LEFT_WRIST.value].x, lm[mp_pose.PoseLandmark.LEFT_WRIST.value].y]
                
                left_visibility = (lm[mp_pose.PoseLandmark.LEFT_SHOULDER.value].visibility + 
                                 lm[mp_pose.PoseLandmark.LEFT_ELBOW.value].visibility + 
                                 lm[mp_pose.PoseLandmark.LEFT_WRIST.value].visibility) / 3
                
                # Try RIGHT side
                right_shoulder = [lm[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x, lm[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
                right_elbow = [lm[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x, lm[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
                right_wrist = [lm[mp_pose.PoseLandmark.RIGHT_WRIST.value].x, lm[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
                
                right_visibility = (lm[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].visibility + 
                                  lm[mp_pose.PoseLandmark.RIGHT_ELBOW.value].visibility + 
                                  lm[mp_pose.PoseLandmark.RIGHT_WRIST.value].visibility) / 3
                
                # Use the side with better visibility
                if left_visibility >= right_visibility and left_visibility > 0.5:
                    shoulder, elbow, wrist = left_shoulder, left_elbow, left_wrist
                elif right_visibility > 0.5:
                    shoulder, elbow, wrist = right_shoulder, right_elbow, right_wrist
                else:
                    frame_count += 1
                    continue
                
                angle = calculate_angle(shoulder, elbow, wrist)
                elbow_angles.append((frame_count, angle))
                
            except:
                pass

        frame_count += 1

    cap.release()
    
    if len(elbow_angles) < 3:
        print(f"Not enough frames with pose landmarks ({len(elbow_angles)}). Skipping...")
        return False

    # STEP 2: Identify key frame indices based on elbow angles
    print(f"Found {len(elbow_angles)} frames with pose data")
    
    # Find lowest point (most bent elbow) - MUST be included
    lowest_point = min(elbow_angles, key=lambda x: x[1])
    lowest_frame_index = lowest_point[0]
    
    print(f"Minimum elbow angle: {lowest_point[1]:.1f}° at frame {lowest_frame_index}")

    # Find highest points before and after lowest point
    key_indices = []
    
    if lowest_frame_index > 0:
        # Find frames with pose data before the lowest point
        before_frames = [(idx, angle) for idx, angle in elbow_angles if idx < lowest_frame_index]
        if before_frames:
            highest_before = max(before_frames, key=lambda x: x[1])
            start_frame = highest_before[0]
            print(f"Start frame: {start_frame} (angle: {highest_before[1]:.1f}°)")
        else:
            start_frame = 0
    else:
        start_frame = 0

    if lowest_frame_index < frame_count - 1:
        # Find frames with pose data after the lowest point
        after_frames = [(idx, angle) for idx, angle in elbow_angles if idx > lowest_frame_index]
        if after_frames:
            highest_after = max(after_frames, key=lambda x: x[1])
            end_frame = highest_after[0]
            print(f"End frame: {end_frame} (angle: {highest_after[1]:.1f}°)")
        else:
            end_frame = frame_count - 1
    else:
        end_frame = frame_count - 1

    print(f"Extracting frames from {start_frame} to {end_frame} (total: {end_frame - start_frame + 1} frames)")
    
    # STEP 3: Extract all frames between key points
    cap = cv2.VideoCapture(input_path)
    all_frames_in_range = []
    frame_indices_in_range = []
    
    current_frame = 0
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break
            
        if start_frame <= current_frame <= end_frame:
            all_frames_in_range.append(frame)
            frame_indices_in_range.append(current_frame)
            
        current_frame += 1
        
        # Stop if we've passed the end frame
        if current_frame > end_frame:
            break

    cap.release()
    
    if len(all_frames_in_range) == 0:
        print(f"No frames extracted in range. Skipping...")
        return False
    
    print(f"Extracted {len(all_frames_in_range)} frames between key points")
    
    # STEP 4: Downsample to exactly 30 frames while ensuring minimum angle frame is included
    target_frames = 30
    
    if len(all_frames_in_range) <= target_frames:
        # If we have 30 or fewer frames, use all of them
        final_frames = all_frames_in_range
        final_indices = frame_indices_in_range
        print(f"Using all {len(all_frames_in_range)} frames (≤30)")
    else:
        # Downsample but GUARANTEE the minimum elbow angle frame is included
        
        # Find the position of the minimum elbow angle frame in our extracted range
        min_frame_position_in_range = lowest_frame_index - start_frame
        
        # Create evenly spaced indices
        selected_positions = np.linspace(0, len(all_frames_in_range) - 1, target_frames, dtype=int)
        
        # Ensure the minimum elbow angle frame is included
        if min_frame_position_in_range not in selected_positions:
            # Replace the closest selected position with the minimum angle frame position
            closest_idx = np.argmin(np.abs(selected_positions - min_frame_position_in_range))
            selected_positions[closest_idx] = min_frame_position_in_range
            selected_positions = np.sort(selected_positions)
        
        final_frames = [all_frames_in_range[i] for i in selected_positions]
        final_indices = [frame_indices_in_range[i] for i in selected_positions]
        
        print(f"⬇Downsampled from {len(all_frames_in_range)} to {len(final_frames)} frames")
        print(f"Minimum elbow angle frame {lowest_frame_index} is guaranteed to be included")
    
    # STEP 5: Save output video
    if len(final_frames) > 0:
        height, width, _ = final_frames[0].shape
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')
        out = cv2.VideoWriter(output_path, fourcc, 10.0, (width, height))
        
        for frame in final_frames:
            out.write(frame)
        
        out.release()
        print(f"Saved {len(final_frames)} frames to: {output_path}")
        return True
    else:
        print(f"No frames to save")
        return False

# Process all videos in RepVids directory
def process_repvids_to_repkeys():
    """
    Process all videos in RepVids: Extract frames between key points → Downsample to 30 frames → Save to RepKeys
    """
    # Input and output directories
    input_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Deep Dive AI Summer 2025", "RepVids")
    output_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Deep Dive AI Summer 2025", "RepKeys")
    
    os.makedirs(output_dir, exist_ok=True)
    
    print(f"Processing videos from: {input_dir}")
    print(f"Saving processed videos to: {output_dir}")
    print(f"Target: Extract frames between key points → 30 frames max with guaranteed minimum elbow angle\n")
    
    # Get all video files
    video_files = [f for f in os.listdir(input_dir) if f.lower().endswith(('.mp4', '.avi', '.mov', '.mkv'))]
    
    if not video_files:
        print("No video files found in RepVids directory!")
        return 0, 0
    
    successful = 0
    failed = 0
    
    # Process each video file
    for i, video_file in enumerate(video_files, 1):
        input_path = os.path.join(input_dir, video_file)
        output_filename = f"keyframes_30_{video_file}"
        output_path = os.path.join(output_dir, output_filename)
        
        print(f"[{i}/{len(video_files)}] Processing: {video_file}")
        
        try:
            success = extract_key_frames_and_downsample_to_30(input_path, output_path)
            if success:
                successful += 1
                print(f"Success!\n")
            else:
                failed += 1
                print(f"Failed!\n")
        except Exception as e:
            failed += 1
            print(f"Error: {e}\n")
    
    print("="*60)
    print(f"PROCESSING COMPLETE!")
    print(f"Successful: {successful}")
    print(f"Failed: {failed}")
    print(f"Output directory: {output_dir}")
    print("="*60)

    # Return the counts so they can be accessed outside the function
    return successful, failed

# Run the processing
if __name__ == "__main__":
    successful, failed = process_repvids_to_repkeys()

In [None]:
# Enhanced first rep handling with stage-based frame collection

import cv2
import numpy as np
import mediapipe as mp
import math
import os

# Create RepVids directory in "Deep Dive AI Summer 2025" folder
save_dir = os.path.join(os.path.expanduser("~"), "Desktop", "Deep Dive AI Summer 2025", "RepVids")
os.makedirs(save_dir, exist_ok=True)

# Initialize variables
frame_buffer = []
first_rep_buffer = []  # Special buffer for first rep
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
buffer_size = 90
first_stage_up_detected = False  # Track when stage first becomes "up"
is_first_rep = True  # Track if we're on the first rep

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

def calc_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    radians = np.arctan2(c[1]-b[1], c[0]-b[0]) - np.arctan2(a[1]-b[1], a[0]-b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return 360 - angle if angle > 180 else angle

cap = cv2.VideoCapture(0)
counter = 0
stage = None

with mp_pose.Pose(min_detection_confidence=0.5,
                  min_tracking_confidence=0.5) as pose:

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Always add current frame to main buffer for non-first reps
        if not is_first_rep:
            frame_buffer.append(image.copy())

        label = "None"

        try:
            landmarks = results.pose_landmarks.landmark
            h, w, _ = image.shape

            # Get right side landmarks
            shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                        landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
            hip = [landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                   landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y]
            knee = [landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                    landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y]
            ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                     landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]

            # Visibility check
            visible = all(landmarks[i].visibility > 0.7 for i in [
                mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
                mp_pose.PoseLandmark.RIGHT_ELBOW.value,
                mp_pose.PoseLandmark.RIGHT_WRIST.value,
                mp_pose.PoseLandmark.RIGHT_HIP.value,
                mp_pose.PoseLandmark.RIGHT_KNEE.value,
                mp_pose.PoseLandmark.RIGHT_ANKLE.value
            ])
            
            # Horizontal filter
            locs = np.array([
                shoulder[1] * h, elbow[1] * h, wrist[1] * h,
                hip[1] * h, knee[1] * h, ankle[1] * h
            ])
            data_range = np.max(locs) - np.min(locs)
            if data_range > 0.5 * h:
                visible = False

            if visible:
                elbow_angle = calc_angle(shoulder, elbow, wrist)
                back_angle = calc_angle(shoulder, hip, knee)
                knee_angle = calc_angle(hip, knee, ankle)

                # Pushup logic
                percent = np.interp(elbow_angle, [90, 160], [0, 100])
                
                # Store previous counter and stage to detect changes
                prev_counter = counter
                prev_stage = stage

                # Track full cycle: up -> down -> up
                if elbow_angle > 165:
                    if stage == "down":
                        stage = "up"
                        counter += 1
                    else:
                        stage = "up"
                elif elbow_angle < 95 and stage == "up":
                    stage = "down"

                # FIRST REP SPECIAL HANDLING
                if is_first_rep:
                    # Start collecting frames when stage first becomes "up"
                    if stage == "up" and not first_stage_up_detected:
                        first_stage_up_detected = True
                        first_rep_buffer = [image.copy()]  # Start with current frame
                        print("🔴 First rep: Started collecting frames (stage = 'up')")
                    
                    # Continue collecting frames if we've started
                    elif first_stage_up_detected:
                        first_rep_buffer.append(image.copy())
                    
                    # Save first rep when counter increases from 0 to 1
                    if counter > prev_counter and counter == 1:
                        filename = os.path.join(save_dir, f'rep_{counter:03d}.mp4')
                        
                        if len(first_rep_buffer) > 0:
                            out = cv2.VideoWriter(filename, fourcc, 20.0, (image.shape[1], image.shape[0]))
                            
                            for f in first_rep_buffer:
                                out.write(f)
                            out.release()
                            
                            print(f"✅ Saved FIRST rep {counter} to: {filename} ({len(first_rep_buffer)} frames)")
                            print(f"   Frames collected from first 'up' stage to rep completion")
                            
                            # Reset first rep variables
                            first_rep_buffer = []
                            is_first_rep = False
                            print("🔄 Switching to normal rep recording mode")
                        else:
                            print(f"❌ No frames to save for first rep {counter}")

                # NORMAL REP HANDLING (for reps 2+)
                elif counter > prev_counter:
                    filename = os.path.join(save_dir, f'rep_{counter:03d}.mp4')
                    
                    if len(frame_buffer) > 0:
                        out = cv2.VideoWriter(filename, fourcc, 20.0, (image.shape[1], image.shape[0]))
                        
                        for f in frame_buffer:
                            out.write(f)
                        out.release()
                        
                        print(f"✅ Saved rep {counter} to: {filename} ({len(frame_buffer)} frames)")

                        frame_buffer = []  # Reset buffer for next rep
                    else:
                        print(f"❌ No frames to save for rep {counter}")

                label = "Pushup"

                # Draw angle + percent
                cv2.putText(image, f'Elbow: {int(elbow_angle)}', (10, 100),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                cv2.putText(image, f'{int(percent)}%', (10, 140),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
            else:
                label = "None"

        except:
            pass

        # Draw buffer status with first rep indicator
        if is_first_rep:
            if first_stage_up_detected:
                cv2.putText(image, f'First Rep Buffer: {len(first_rep_buffer)}', (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
            else:
                cv2.putText(image, 'Waiting for first "up" stage...', (10, 70),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
        else:
            cv2.putText(image, f'Buffer: {len(frame_buffer)}/{buffer_size}', (10, 70),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)

        # Draw pose
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2))

        # Label with first rep indicator
        rep_status = " (FIRST REP)" if is_first_rep else ""
        cv2.putText(image, f'Form: {label}{rep_status}', (10, 40),
                    cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 2)

        # Rep counter
        cv2.rectangle(image, (image.shape[1] - 160, 0), (image.shape[1], 80), (0, 0, 0), -1)
        cv2.putText(image, 'REPS', (image.shape[1] - 150, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
        cv2.putText(image, str(counter), (image.shape[1] - 140, 70),
                    cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)

        cv2.imshow('Push-Up Detector', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()

### Live Feed Latest Attempt

In [None]:
# Define output_dir if not already defined
output_dir = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")
os.makedirs(output_dir, exist_ok=True)  # Ensure the directory exists

# Find all model files and get the most recent
model_pattern = os.path.join(output_dir, 'pushup_classification*.h5')
model_files = glob.glob(model_pattern)

if model_files:
    # Sort by modification time, get most recent
    latest_model = max(model_files, key=os.path.getmtime)
    model = load_model(latest_model)
    model_name = os.path.basename(latest_model)
    print(f"Loaded most recent model: {model_name}")
else:
    print("No existing model found. Training the initial model first with the code above.")
    model = None

In [None]:
# Enhanced Version with Sliding Window:

# No prerequisites required

import cv2
import numpy as np
from tensorflow.keras.models import load_model
from collections import deque
import threading
import time
import glob
import datetime

class LivePushupClassifier:
    def __init__(self, model_path, frame_limit=30): # model was trained on 30 frames
        self.model = load_model(model_path)
        self.frame_buffer = deque(maxlen=frame_limit) # sliding window buffer of 30 frames
        self.frame_limit = frame_limit
        self.prediction_result = "Initializing..." # current prediction result
        self.confidence = 0.0
        self.is_predicting = False # thread-safe flag to prevent multiple predictions at once
        
    def preprocess_frame(self, frame):
        # Converts raw camera frame to AI model input format
        processed = cv2.resize(frame, (112, 112))
        processed = processed / 255.0
        return processed.astype('float32')
    
    def predict_async(self): # Asynchronous Prediction
        # Run prediction in separate thread
        if len(self.frame_buffer) == self.frame_limit and not self.is_predicting: # 30 frames taken + Prevent multiple predictions
            self.is_predicting = True
            
            # Prepare 30 frame sequence
            video_sequence = np.array(list(self.frame_buffer), dtype='float32') # preserves data order and ensures correct data type
            video_sequence = np.expand_dims(video_sequence, axis=0)
            # Add batch dimension for model input
            # video_sequence shape: (1, 30, 112, 112, 3) bc TensorFlow/Keras expects 5D input for Conv3D

            # Predict (in background thread)
            prediction = self.model.predict(video_sequence, verbose=0) # Outputs probability array [[0.15, 0.85]] (Incorrect: 15%, Correct: 85%)
            predicted_class = np.argmax(prediction, axis=1)[0]
            self.confidence = np.max(prediction) * 100
            
            # Update result
            labels = ['Incorrect Form', 'Correct Form']
            self.prediction_result = labels[predicted_class]
            
            self.is_predicting = False
    
    def add_frame(self, frame):
        # Add frame to buffer and trigger prediction
        processed_frame = self.preprocess_frame(frame)
        self.frame_buffer.append(processed_frame) # Add processed frame to sliding window buffer
        
        # Start prediction in background thread when buffer is full(30 frames)
        if len(self.frame_buffer) == self.frame_limit:
            threading.Thread(target=self.predict_async, daemon=True).start()
            # creates a new thread for prediction to avoid blocking the main thread
            # target = the function to run in the new thread
            # daemon=True allows the thread to close when the main program closes
    
    def get_result(self):
        # Get current prediction result
        return self.prediction_result, self.confidence

# Usage
def run_live_classifier():
    # Define output_dir if not already defined
    output_dir = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")
    os.makedirs(output_dir, exist_ok=True)  # Ensure the directory exists

    # Find all model files and get the most recent
    model_pattern = os.path.join(output_dir, 'pushup_classification*.h5')
    model_files = glob.glob(model_pattern)

    if model_files:
        # Sort by modification time, get most recent
        latest_model = max(model_files, key=os.path.getmtime)
        model = load_model(latest_model)
        model_name = os.path.basename(latest_model)
        print(f"Loaded most recent model: {model_name}")

        classifier = LivePushupClassifier(latest_model) # Initialize classifier with the loaded model

    else:
        print("No existing model found.")
        model = None
    
    # Initialize webcam
    cap = cv2.VideoCapture(0)
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
    
    print("Live Pushup Form Checker - Press 'q' to quit")
    
    while True:
        ret, frame = cap.read()
        if not ret:
            break
        
        # Add frame to classifier
        classifier.add_frame(frame) # add to sliding window buffer
        
        # Get prediction result
        result, confidence = classifier.get_result() # gets latest prediction result
        
        # Display result
        if "Correct" in result:
            color = (0, 255, 0)  # Green
        elif "Incorrect" in result:
            color = (0, 0, 255)  # Red
        else:
            color = (255, 255, 0)  # Yellow
        
        # displays overlay text on frame
        cv2.putText(frame, f"{result}", (10, 30), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, color, 2)
        cv2.putText(frame, f"Confidence: {confidence:.1f}%", (10, 60), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, color, 2)
        cv2.putText(frame, f"Buffer: {len(classifier.frame_buffer)}/30", (10, 90), 
                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 1)
        
        # Creates Display frame window
        cv2.imshow('Live Pushup Form Checker', frame)
        
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
    
    cap.release()
    cv2.destroyAllWindows()

# Run the live classifier
run_live_classifier()

### all, hopfully

In [2]:
"""
Complete Pushup Detection, Processing, and Classification System
Combines live detection, keyframe extraction, and real-time CNN classification
"""

import cv2
import numpy as np
import mediapipe as mp
import os
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import glob
from tensorflow.keras.models import load_model
from pathlib import Path
import logging


@dataclass
class IntegratedConfig:
    """Configuration for the integrated system."""
    
    # Detection parameters
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    elbow_angle_up_threshold: float = 165.0
    elbow_angle_down_threshold: float = 95.0
    min_visibility_threshold: float = 0.7
    horizontal_filter_ratio: float = 0.5
    
    # Video parameters
    fps: float = 20.0
    fourcc: str = 'mp4v'
    buffer_size: int = 90
    
    # Directories
    repvids_dir: str = "RepVids"
    repkeys_dir: str = "RepKeys"
    models_dir: str = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")
    
    # CNN parameters
    target_frames: int = 30
    frame_size: Tuple[int, int] = (112, 112)


class AngleCalculator:
    """Calculate angles between body landmarks."""
    
    @staticmethod
    def calculate_angle(a: List[float], b: List[float], c: List[float]) -> float:
        """Calculate angle between three points."""
        a, b, c = np.array(a), np.array(b), np.array(c)
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180 else angle


class KeyFrameProcessor:
    """Process recorded pushup videos to extract key frames."""
    
    def __init__(self, config: IntegratedConfig):
        self.config = config
        self.mp_pose = mp.solutions.pose
        
    def extract_key_frames_and_downsample(self, input_path: str, output_path: str) -> bool:
        """Extract frames between key points and downsample to 30 frames."""
        pose = self.mp_pose.Pose(
            min_detection_confidence=0.7, 
            min_tracking_confidence=0.7
        )
        
        cap = cv2.VideoCapture(input_path)
        elbow_angles = []
        frame_count = 0
        
        print(f"Processing {os.path.basename(input_path)}...")
        
        # STEP 1: Analyze all frames for elbow angles
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
                
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(image_rgb)
            
            if results.pose_landmarks:
                try:
                    landmarks = results.pose_landmarks.landmark
                    
                    # Try right side first, fallback to left
                    shoulder = [landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                               landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
                    elbow = [landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                            landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
                    wrist = [landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                            landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
                    
                    # Check visibility
                    visibility = (landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].visibility +
                                 landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].visibility +
                                 landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].visibility) / 3
                    
                    if visibility > 0.5:
                        angle = AngleCalculator.calculate_angle(shoulder, elbow, wrist)
                        elbow_angles.append((frame_count, angle))
                except:
                    pass
                    
            frame_count += 1
            
        cap.release()
        pose.close()
        
        if len(elbow_angles) < 3:
            print(f"Insufficient pose data ({len(elbow_angles)} frames)")
            return False
            
        # STEP 2: Find key points
        min_angle_frame = min(elbow_angles, key=lambda x: x[1])
        min_frame_idx, min_angle = min_angle_frame
        
        # Find start and end frames
        before_frames = [(idx, angle) for idx, angle in elbow_angles if idx < min_frame_idx]
        after_frames = [(idx, angle) for idx, angle in elbow_angles if idx > min_frame_idx]
        
        start_frame = max(before_frames, key=lambda x: x[1])[0] if before_frames else 0
        end_frame = max(after_frames, key=lambda x: x[1])[0] if after_frames else frame_count - 1
        
        # STEP 3: Extract and downsample frames
        cap = cv2.VideoCapture(input_path)
        extracted_frames = []
        current_frame = 0
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
                
            if start_frame <= current_frame <= end_frame:
                extracted_frames.append(frame)
                
            current_frame += 1
            if current_frame > end_frame:
                break
                
        cap.release()
        
        if not extracted_frames:
            return False
            
        # Downsample to target frames (30)
        if len(extracted_frames) <= self.config.target_frames:
            final_frames = extracted_frames
        else:
            # Ensure minimum angle frame is included
            min_frame_pos = min_frame_idx - start_frame
            indices = np.linspace(0, len(extracted_frames) - 1, self.config.target_frames, dtype=int)
            
            if min_frame_pos not in indices:
                closest_idx = np.argmin(np.abs(indices - min_frame_pos))
                indices[closest_idx] = min_frame_pos
                indices = np.sort(indices)
                
            final_frames = [extracted_frames[i] for i in indices]
            
        # STEP 4: Save output
        if final_frames:
            height, width = final_frames[0].shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*self.config.fourcc)
            out = cv2.VideoWriter(output_path, fourcc, 10.0, (width, height))
            
            for frame in final_frames:
                out.write(frame)
            out.release()
            
            print(f"Saved {len(final_frames)} frames to RepKeys")
            return True
            
        return False


class CNNClassifier:
    """Real-time CNN classification with threading."""
    
    def __init__(self, config: IntegratedConfig):
        self.config = config
        self.model = None
        self.frame_buffer = deque(maxlen=config.target_frames)
        self.prediction_result = "Initializing..."
        self.confidence = 0.0
        self.is_predicting = False
        self.load_latest_model()
        
    def load_latest_model(self):
        """Load the most recent CNN model."""
        os.makedirs(self.config.models_dir, exist_ok=True)
        model_pattern = os.path.join(self.config.models_dir, 'pushup_classification*.h5')
        model_files = glob.glob(model_pattern)
        
        if model_files:
            latest_model = max(model_files, key=os.path.getmtime)
            self.model = load_model(latest_model)
            model_name = os.path.basename(latest_model)
            print(f"Loaded CNN model: {model_name}")
        else:
            print("No CNN model found - classification disabled")
            
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for CNN input."""
        processed = cv2.resize(frame, self.config.frame_size)
        processed = processed / 255.0
        return processed.astype('float32')
        
    def predict_async(self):
        """Run prediction in background thread."""
        if (len(self.frame_buffer) == self.config.target_frames and 
            not self.is_predicting and self.model is not None):
            
            self.is_predicting = True
            
            try:
                # Prepare sequence for CNN
                video_sequence = np.array(list(self.frame_buffer), dtype='float32')
                video_sequence = np.expand_dims(video_sequence, axis=0)
                
                # Predict
                prediction = self.model.predict(video_sequence, verbose=0)
                predicted_class = np.argmax(prediction, axis=1)[0]
                self.confidence = np.max(prediction) * 100
                
                # Update result
                labels = ['Incorrect Form', 'Correct Form']
                self.prediction_result = labels[predicted_class]
                
            except Exception as e:
                print(f"Prediction error: {e}")
                self.prediction_result = "Error"
                self.confidence = 0.0
                
            finally:
                self.is_predicting = False
                
    def add_frame(self, frame: np.ndarray):
        """Add frame to buffer and trigger prediction."""
        processed_frame = self.preprocess_frame(frame)
        self.frame_buffer.append(processed_frame)
        
        if len(self.frame_buffer) == self.config.target_frames:
            threading.Thread(target=self.predict_async, daemon=True).start()
            
    def get_result(self) -> Tuple[str, float]:
        """Get current prediction result."""
        return self.prediction_result, self.confidence


class IntegratedPushupDetector:
    """Main integrated pushup detection system."""
    
    def __init__(self, config: IntegratedConfig = None):
        self.config = config or IntegratedConfig()
        
        # Setup directories
        self.repvids_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025" / self.config.repvids_dir
        self.repkeys_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025" / self.config.repkeys_dir
        self.repvids_path.mkdir(parents=True, exist_ok=True)
        self.repkeys_path.mkdir(parents=True, exist_ok=True)
        
        # Initialize components
        self.keyframe_processor = KeyFrameProcessor(self.config)
        self.cnn_classifier = CNNClassifier(self.config)
        
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
        
        # Detection state
        self.counter = 0
        self.stage = None
        self.frame_buffer = []
        self.first_rep_buffer = []
        self.first_stage_up_detected = False
        self.is_first_rep = True
        
        # Video recording
        self.fourcc = cv2.VideoWriter_fourcc(*self.config.fourcc)
        
    def extract_landmarks(self, results) -> Optional[Dict[str, List[float]]]:
        """Extract pose landmarks."""
        if not results.pose_landmarks:
            return None
            
        landmarks = results.pose_landmarks.landmark
        
        return {
            'shoulder': [landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                        landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y],
            'elbow': [landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                     landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].y],
            'wrist': [landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                     landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].y],
            'hip': [landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                   landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value].y],
            'knee': [landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                    landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value].y],
            'ankle': [landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                     landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]
        }
        
    def is_pose_valid(self, results, landmark_dict: Dict, image_height: int) -> bool:
        """Check if pose is valid for detection."""
        if not results.pose_landmarks:
            return False
            
        # Visibility check
        landmarks = results.pose_landmarks.landmark
        required_landmarks = [
            self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            self.mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            self.mp_pose.PoseLandmark.RIGHT_WRIST.value,
            self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            self.mp_pose.PoseLandmark.RIGHT_ANKLE.value
        ]
        
        if not all(landmarks[i].visibility > self.config.min_visibility_threshold 
                  for i in required_landmarks):
            return False
            
        # Horizontal filter
        y_coords = [
            landmark_dict['shoulder'][1] * image_height,
            landmark_dict['elbow'][1] * image_height,
            landmark_dict['wrist'][1] * image_height,
            landmark_dict['hip'][1] * image_height,
            landmark_dict['knee'][1] * image_height,
            landmark_dict['ankle'][1] * image_height
        ]
        
        data_range = max(y_coords) - min(y_coords)
        return data_range <= self.config.horizontal_filter_ratio * image_height
        
    def process_rep_completion(self, frame: np.ndarray, rep_number: int, frames: List[np.ndarray]):
        """Process completed repetition."""
        if not frames:
            print(f"No frames to save for rep {rep_number}")
            return
            
        # Save to RepVids
        repvids_filename = self.repvids_path / f'rep_{rep_number:03d}.mp4'
        height, width = frame.shape[:2]
        out = cv2.VideoWriter(str(repvids_filename), self.fourcc, self.config.fps, (width, height))
        
        for f in frames:
            out.write(f)
        out.release()
        
        print(f"✅ Saved rep {rep_number} to RepVids ({len(frames)} frames)")
        
        # Process to RepKeys in background thread
        def process_to_repkeys():
            repkeys_filename = self.repkeys_path / f'keyframes_30_rep_{rep_number:03d}.mp4'
            success = self.keyframe_processor.extract_key_frames_and_downsample(
                str(repvids_filename), str(repkeys_filename)
            )
            if success:
                print(f"✅ Processed rep {rep_number} to RepKeys")
            else:
                print(f"❌ Failed to process rep {rep_number} to RepKeys")
                
        threading.Thread(target=process_to_repkeys, daemon=True).start()
        
    def run(self, camera_index: int = 0):
        """Run the integrated detection system."""
        cap = cv2.VideoCapture(camera_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        
        if not cap.isOpened():
            print(f"Failed to open camera {camera_index}")
            return
            
        print("🚀 Integrated Pushup Detection System Started")
        print("📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN classification")
        print("Press 'q' to quit")
        
        try:
            with self.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
                
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                        
                    # Process with MediaPipe
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image.flags.writeable = False
                    results = pose.process(image)
                    
                    image.flags.writeable = True
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                    
                    # Add frame to CNN classifier for real-time feedback
                    self.cnn_classifier.add_frame(image)
                    
                    # Handle frame buffering for rep recording
                    if not self.is_first_rep:
                        self.frame_buffer.append(image.copy())
                        
                    # Extract landmarks and process pushup detection
                    landmark_dict = self.extract_landmarks(results)
                    label = "None"
                    
                    if landmark_dict and self.is_pose_valid(results, landmark_dict, image.shape[0]):
                        elbow_angle = AngleCalculator.calculate_angle(
                            landmark_dict['shoulder'], 
                            landmark_dict['elbow'], 
                            landmark_dict['wrist']
                        )
                        
                        # Update detection state
                        prev_counter = self.counter
                        
                        if elbow_angle > self.config.elbow_angle_up_threshold:
                            if self.stage == "down":
                                self.stage = "up"
                                self.counter += 1
                            else:
                                self.stage = "up"
                        elif elbow_angle < self.config.elbow_angle_down_threshold and self.stage == "up":
                            self.stage = "down"
                            
                        # Handle rep recording
                        if self.is_first_rep:
                            # First rep handling
                            if self.stage == "up" and not self.first_stage_up_detected:
                                self.first_stage_up_detected = True
                                self.first_rep_buffer = [image.copy()]
                                print("🔴 First rep: Started collecting frames")
                            elif self.first_stage_up_detected:
                                self.first_rep_buffer.append(image.copy())
                                
                            if self.counter > prev_counter and self.counter == 1:
                                self.process_rep_completion(image, self.counter, self.first_rep_buffer)
                                self.first_rep_buffer = []
                                self.is_first_rep = False
                                print("🔄 Switching to normal rep recording")
                                
                        elif self.counter > prev_counter:
                            # Normal rep handling
                            self.process_rep_completion(image, self.counter, self.frame_buffer)
                            self.frame_buffer = []
                            
                        label = "Pushup"
                        
                        # Display angle info
                        percent = np.interp(elbow_angle, [90, 160], [0, 100])
                        cv2.putText(image, f'Elbow: {int(elbow_angle)}°', (10, 100),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                        cv2.putText(image, f'{int(percent)}%', (10, 140),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                                   
                    # Get CNN classification result
                    cnn_result, cnn_confidence = self.cnn_classifier.get_result()
                    
                    # Display CNN feedback
                    if "Correct" in cnn_result:
                        cnn_color = (0, 255, 0)  # Green
                    elif "Incorrect" in cnn_result:
                        cnn_color = (0, 0, 255)  # Red
                    else:
                        cnn_color = (255, 255, 0)  # Yellow
                        
                    cv2.putText(image, f"Form: {cnn_result}", (10, 170),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, cnn_color, 2)
                    cv2.putText(image, f"Confidence: {cnn_confidence:.1f}%", (10, 200),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, cnn_color, 2)
                               
                    # Buffer status
                    if self.is_first_rep:
                        if self.first_stage_up_detected:
                            cv2.putText(image, f'First Rep Buffer: {len(self.first_rep_buffer)}', 
                                       (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                        else:
                            cv2.putText(image, 'Waiting for first "up" stage...', 
                                       (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                    else:
                        cv2.putText(image, f'Buffer: {len(self.frame_buffer)}/{self.config.buffer_size}', 
                                   (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                                   
                    # Draw pose landmarks
                    self.mp_drawing.draw_landmarks(
                        image, results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS,
                        self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
                        self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
                    )
                    
                    # Status and rep counter
                    rep_status = " (FIRST REP)" if self.is_first_rep else ""
                    cv2.putText(image, f'Detection: {label}{rep_status}', (10, 40),
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
                               
                    # Rep counter
                    cv2.rectangle(image, (image.shape[1] - 160, 0), (image.shape[1], 80), (0, 0, 0), -1)
                    cv2.putText(image, 'REPS', (image.shape[1] - 150, 30),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                    cv2.putText(image, str(self.counter), (image.shape[1] - 140, 70),
                               cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)
                               
                    # CNN buffer status
                    cv2.putText(image, f'CNN Buffer: {len(self.cnn_classifier.frame_buffer)}/30', 
                               (10, 230), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
                               
                    cv2.imshow('Integrated Pushup Detection System', image)
                    
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                        
        except Exception as e:
            print(f"Error during execution: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()


def main():
    """Run the integrated pushup detection system."""
    config = IntegratedConfig(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up_threshold=165.0,
        elbow_angle_down_threshold=95.0,
        target_frames=30
    )
    
    detector = IntegratedPushupDetector(config)
    detector.run()


# Run the integrated system
if __name__ == "__main__":
    main()



Loaded CNN model: pushup_classification_latest_improved.h5
🚀 Integrated Pushup Detection System Started
📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN classification
Press 'q' to quit
🔴 First rep: Started collecting frames
✅ Saved rep 1 to RepVids (150 frames)
🔄 Switching to normal rep recording
Processing rep_001.mp4...
✅ Saved rep 2 to RepVids (34 frames)
Processing rep_002.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 2 to RepKeys
✅ Saved rep 3 to RepVids (43 frames)
Processing rep_003.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 3 to RepKeys
Saved 30 frames to RepKeys
✅ Processed rep 1 to RepKeys
✅ Saved rep 4 to RepVids (45 frames)
Processing rep_004.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 4 to RepKeys
✅ Saved rep 5 to RepVids (46 frames)
Processing rep_005.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 5 to RepKeys
✅ Saved rep 6 to RepVids (97 frames)
Processing rep_006.mp4...
✅ Saved rep 7 to RepVids (68 frames)
Processing rep_007.mp4...
Saved 30 fr

Saved 30 frames to RepKeys
✅ Processed rep 8 to RepKeys


# dfbfd

In [1]:
"""
Complete Pushup Detection, Processing, and Classification System
Combines live detection, keyframe extraction, and real-time CNN classification
"""

import cv2
import numpy as np
import mediapipe as mp
import os
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import glob
from tensorflow.keras.models import load_model
from pathlib import Path
import logging


@dataclass
class IntegratedConfig:
    """Configuration for the integrated system."""
    
    # Detection parameters
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    elbow_angle_up_threshold: float = 165.0
    elbow_angle_down_threshold: float = 95.0
    min_visibility_threshold: float = 0.7
    horizontal_filter_ratio: float = 0.5
    
    # Video parameters
    fps: float = 20.0
    fourcc: str = 'mp4v'
    buffer_size: int = 90
    
    # Directories
    repvids_dir: str = "RepVids"
    repkeys_dir: str = "RepKeys"
    models_dir: str = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")
    
    # CNN parameters
    target_frames: int = 30
    frame_size: Tuple[int, int] = (112, 112)


class AngleCalculator:
    """Calculate angles between body landmarks."""
    
    @staticmethod
    def calculate_angle(a: List[float], b: List[float], c: List[float]) -> float:
        """Calculate angle between three points."""
        a, b, c = np.array(a), np.array(b), np.array(c)
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180 else angle


class KeyFrameProcessor:
    """Process recorded pushup videos to extract key frames."""
    
    def __init__(self, config: IntegratedConfig):
        self.config = config
        self.mp_pose = mp.solutions.pose
        
    def extract_key_frames_and_downsample(self, input_path: str, output_path: str) -> bool:
        """Extract frames between key points and downsample to 30 frames."""
        pose = self.mp_pose.Pose(
            min_detection_confidence=0.7, 
            min_tracking_confidence=0.7
        )
        
        cap = cv2.VideoCapture(input_path)
        elbow_angles = []
        frame_count = 0
        
        print(f"Processing {os.path.basename(input_path)}...")
        
        # STEP 1: Analyze all frames for elbow angles
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
                
            image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            results = pose.process(image_rgb)
            
            if results.pose_landmarks:
                try:
                    landmarks = results.pose_landmarks.landmark
                    
                    # Try both sides - use whichever is more visible
                    # LEFT side
                    left_shoulder = [landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                                   landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
                    left_elbow = [landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                                landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
                    left_wrist = [landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                                landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].y]
                    
                    left_visibility = (landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].visibility +
                                     landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].visibility +
                                     landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].visibility) / 3
                    
                    # RIGHT side
                    right_shoulder = [landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                                    landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
                    right_elbow = [landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                                 landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
                    right_wrist = [landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                                 landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].y]
                    
                    right_visibility = (landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].visibility +
                                      landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].visibility +
                                      landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].visibility) / 3
                    
                    # Use the side with better visibility
                    if left_visibility >= right_visibility and left_visibility > 0.5:
                        shoulder, elbow, wrist = left_shoulder, left_elbow, left_wrist
                    elif right_visibility > 0.5:
                        shoulder, elbow, wrist = right_shoulder, right_elbow, right_wrist
                    else:
                        frame_count += 1
                        continue
                    
                    angle = AngleCalculator.calculate_angle(shoulder, elbow, wrist)
                    elbow_angles.append((frame_count, angle))
                    
                except:
                    pass
                    
            frame_count += 1
            
        cap.release()
        pose.close()
        
        if len(elbow_angles) < 3:
            print(f"Insufficient pose data ({len(elbow_angles)} frames)")
            return False
            
        # STEP 2: Find key points
        min_angle_frame = min(elbow_angles, key=lambda x: x[1])
        min_frame_idx, min_angle = min_angle_frame
        
        # Find start and end frames
        before_frames = [(idx, angle) for idx, angle in elbow_angles if idx < min_frame_idx]
        after_frames = [(idx, angle) for idx, angle in elbow_angles if idx > min_frame_idx]
        
        start_frame = max(before_frames, key=lambda x: x[1])[0] if before_frames else 0
        end_frame = max(after_frames, key=lambda x: x[1])[0] if after_frames else frame_count - 1
        
        # STEP 3: Extract and downsample frames
        cap = cv2.VideoCapture(input_path)
        extracted_frames = []
        current_frame = 0
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
                
            if start_frame <= current_frame <= end_frame:
                extracted_frames.append(frame)
                
            current_frame += 1
            if current_frame > end_frame:
                break
                
        cap.release()
        
        if not extracted_frames:
            return False
            
        # Downsample to target frames (30)
        if len(extracted_frames) <= self.config.target_frames:
            final_frames = extracted_frames
        else:
            # Ensure minimum angle frame is included
            min_frame_pos = min_frame_idx - start_frame
            indices = np.linspace(0, len(extracted_frames) - 1, self.config.target_frames, dtype=int)
            
            if min_frame_pos not in indices:
                closest_idx = np.argmin(np.abs(indices - min_frame_pos))
                indices[closest_idx] = min_frame_pos
                indices = np.sort(indices)
                
            final_frames = [extracted_frames[i] for i in indices]
            
        # STEP 4: Save output
        if final_frames:
            height, width = final_frames[0].shape[:2]
            fourcc = cv2.VideoWriter_fourcc(*self.config.fourcc)
            out = cv2.VideoWriter(output_path, fourcc, 10.0, (width, height))
            
            for frame in final_frames:
                out.write(frame)
            out.release()
            
            print(f"Saved {len(final_frames)} frames to RepKeys")
            return True
            
        return False


class CNNClassifier:
    """Real-time CNN classification with threading."""
    
    def __init__(self, config: IntegratedConfig):
        self.config = config
        self.model = None
        self.frame_buffer = deque(maxlen=config.target_frames)
        self.prediction_result = "Initializing..."
        self.confidence = 0.0
        self.is_predicting = False
        self.load_latest_model()
        
    def load_latest_model(self):
        """Load the most recent CNN model."""
        os.makedirs(self.config.models_dir, exist_ok=True)
        model_pattern = os.path.join(self.config.models_dir, 'pushup_classification*.h5')
        model_files = glob.glob(model_pattern)
        
        if model_files:
            latest_model = max(model_files, key=os.path.getmtime)
            self.model = load_model(latest_model)
            model_name = os.path.basename(latest_model)
            print(f"Loaded CNN model: {model_name}")
        else:
            print("No CNN model found - classification disabled")
            
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for CNN input."""
        processed = cv2.resize(frame, self.config.frame_size)
        processed = processed / 255.0
        return processed.astype('float32')
        
    def predict_async(self):
        """Run prediction in background thread."""
        if (len(self.frame_buffer) == self.config.target_frames and 
            not self.is_predicting and self.model is not None):
            
            self.is_predicting = True
            
            try:
                # Prepare sequence for CNN
                video_sequence = np.array(list(self.frame_buffer), dtype='float32')
                video_sequence = np.expand_dims(video_sequence, axis=0)
                
                # Predict
                prediction = self.model.predict(video_sequence, verbose=0)
                predicted_class = np.argmax(prediction, axis=1)[0]
                self.confidence = np.max(prediction) * 100
                
                # Update result
                labels = ['Incorrect Form', 'Correct Form']
                self.prediction_result = labels[predicted_class]
                
            except Exception as e:
                print(f"Prediction error: {e}")
                self.prediction_result = "Error"
                self.confidence = 0.0
                
            finally:
                self.is_predicting = False
                
    def add_frame(self, frame: np.ndarray):
        """Add frame to buffer and trigger prediction."""
        processed_frame = self.preprocess_frame(frame)
        self.frame_buffer.append(processed_frame)
        
        if len(self.frame_buffer) == self.config.target_frames:
            threading.Thread(target=self.predict_async, daemon=True).start()
            
    def get_result(self) -> Tuple[str, float]:
        """Get current prediction result."""
        return self.prediction_result, self.confidence


class IntegratedPushupDetector:
    """Main integrated pushup detection system."""
    
    def __init__(self, config: IntegratedConfig = None):
        self.config = config or IntegratedConfig()
        
        # Setup directories
        self.repvids_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025" / self.config.repvids_dir
        self.repkeys_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025" / self.config.repkeys_dir
        self.repvids_path.mkdir(parents=True, exist_ok=True)
        self.repkeys_path.mkdir(parents=True, exist_ok=True)
        
        # Initialize components
        self.keyframe_processor = KeyFrameProcessor(self.config)
        self.cnn_classifier = CNNClassifier(self.config)
        
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
        
        # Detection state
        self.counter = 0
        self.stage = None
        self.frame_buffer = []
        self.first_rep_buffer = []
        self.first_stage_up_detected = False
        self.is_first_rep = True
        
        # Video recording
        self.fourcc = cv2.VideoWriter_fourcc(*self.config.fourcc)
        
    def extract_landmarks(self, results) -> Optional[Dict[str, List[float]]]:
        """Extract pose landmarks - uses better visible side."""
        if not results.pose_landmarks:
            return None
            
        landmarks = results.pose_landmarks.landmark
        
        try:
            # Get LEFT side landmarks
            left_landmarks = {
                'shoulder': [landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                           landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].y],
                'elbow': [landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                         landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].y],
                'wrist': [landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].x,
                         landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].y],
                'hip': [landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value].x,
                       landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value].y],
                'knee': [landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value].x,
                        landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value].y],
                'ankle': [landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value].x,
                         landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value].y]
            }
            
            # Calculate LEFT side visibility
            left_visibility = (
                landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value].visibility
            ) / 6
            
            # Get RIGHT side landmarks
            right_landmarks = {
                'shoulder': [landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                           landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y],
                'elbow': [landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                         landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].y],
                'wrist': [landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].x,
                         landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].y],
                'hip': [landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                       landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value].y],
                'knee': [landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                        landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value].y],
                'ankle': [landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                         landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]
            }
            
            # Calculate RIGHT side visibility
            right_visibility = (
                landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value].visibility +
                landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value].visibility
            ) / 6
            
            # Return the side with better visibility
            if left_visibility >= right_visibility:
                return left_landmarks
            else:
                return right_landmarks
                
        except Exception as e:
            print(f"Error extracting landmarks: {e}")
            return None
        
    def is_pose_valid(self, results, landmark_dict: Dict, image_height: int) -> bool:
        """Check if pose is valid for detection - works with both sides."""
        if not results.pose_landmarks or not landmark_dict:
            return False
            
        landmarks = results.pose_landmarks.landmark
        
        # Determine which side we're using based on the landmark_dict
        # Check if we have left or right side landmarks
        shoulder_x = landmark_dict['shoulder'][0]
        
        # Find which side has better visibility by checking both sides
        left_landmarks = [
            self.mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            self.mp_pose.PoseLandmark.LEFT_ELBOW.value,
            self.mp_pose.PoseLandmark.LEFT_WRIST.value,
            self.mp_pose.PoseLandmark.LEFT_HIP.value,
            self.mp_pose.PoseLandmark.LEFT_KNEE.value,
            self.mp_pose.PoseLandmark.LEFT_ANKLE.value
        ]
        
        right_landmarks = [
            self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            self.mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            self.mp_pose.PoseLandmark.RIGHT_WRIST.value,
            self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            self.mp_pose.PoseLandmark.RIGHT_ANKLE.value
        ]
        
        # Calculate visibility for both sides
        left_visibility = sum(landmarks[i].visibility for i in left_landmarks) / len(left_landmarks)
        right_visibility = sum(landmarks[i].visibility for i in right_landmarks) / len(right_landmarks)
        
        # Use the side with better visibility for validation
        if left_visibility >= right_visibility:
            required_landmarks = left_landmarks
            current_visibility = left_visibility
        else:
            required_landmarks = right_landmarks
            current_visibility = right_visibility
        
        # Visibility check
        if current_visibility <= self.config.min_visibility_threshold:
            return False
            
        # Horizontal filter
        y_coords = [
            landmark_dict['shoulder'][1] * image_height,
            landmark_dict['elbow'][1] * image_height,
            landmark_dict['wrist'][1] * image_height,
            landmark_dict['hip'][1] * image_height,
            landmark_dict['knee'][1] * image_height,
            landmark_dict['ankle'][1] * image_height
        ]
        
        data_range = max(y_coords) - min(y_coords)
        return data_range <= self.config.horizontal_filter_ratio * image_height
        
    def process_rep_completion(self, frame: np.ndarray, rep_number: int, frames: List[np.ndarray]):
        """Process completed repetition."""
        if not frames:
            print(f"No frames to save for rep {rep_number}")
            return
            
        # Save to RepVids
        repvids_filename = self.repvids_path / f'rep_{rep_number:03d}.mp4'
        height, width = frame.shape[:2]
        out = cv2.VideoWriter(str(repvids_filename), self.fourcc, self.config.fps, (width, height))
        
        for f in frames:
            out.write(f)
        out.release()
        
        print(f"✅ Saved rep {rep_number} to RepVids ({len(frames)} frames)")
        
        # Process to RepKeys in background thread
        def process_to_repkeys():
            repkeys_filename = self.repkeys_path / f'keyframes_30_rep_{rep_number:03d}.mp4'
            success = self.keyframe_processor.extract_key_frames_and_downsample(
                str(repvids_filename), str(repkeys_filename)
            )
            if success:
                print(f"✅ Processed rep {rep_number} to RepKeys")
            else:
                print(f"❌ Failed to process rep {rep_number} to RepKeys")
                
        threading.Thread(target=process_to_repkeys, daemon=True).start()
        
    def run(self, camera_index: int = 0):
        """Run the integrated detection system."""
        cap = cv2.VideoCapture(camera_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        
        if not cap.isOpened():
            print(f"Failed to open camera {camera_index}")
            return
            
        print("🚀 Integrated Pushup Detection System Started")
        print("📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN classification")
        print("🔍 Automatic bilateral landmark detection (left/right)")
        print("Press 'q' to quit")
        
        try:
            with self.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
                
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                        
                    # Process with MediaPipe
                    image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    image.flags.writeable = False
                    results = pose.process(image)
                    
                    image.flags.writeable = True
                    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                    
                    # Add frame to CNN classifier for real-time feedback
                    self.cnn_classifier.add_frame(image)
                    
                    # Handle frame buffering for rep recording
                    if not self.is_first_rep:
                        self.frame_buffer.append(image.copy())
                        
                    # Extract landmarks and process pushup detection
                    landmark_dict = self.extract_landmarks(results)
                    label = "None"
                    side_used = "None"
                    
                    if landmark_dict and self.is_pose_valid(results, landmark_dict, image.shape[0]):
                        elbow_angle = AngleCalculator.calculate_angle(
                            landmark_dict['shoulder'], 
                            landmark_dict['elbow'], 
                            landmark_dict['wrist']
                        )
                        
                        # Determine which side is being used for display
                        if results.pose_landmarks:
                            landmarks = results.pose_landmarks.landmark
                            left_vis = (landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value].visibility +
                                       landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value].visibility +
                                       landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value].visibility) / 3
                            right_vis = (landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value].visibility +
                                        landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value].visibility +
                                        landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value].visibility) / 3
                            side_used = "Left" if left_vis >= right_vis else "Right"
                        
                        # Update detection state
                        prev_counter = self.counter
                        
                        if elbow_angle > self.config.elbow_angle_up_threshold:
                            if self.stage == "down":
                                self.stage = "up"
                                self.counter += 1
                            else:
                                self.stage = "up"
                        elif elbow_angle < self.config.elbow_angle_down_threshold and self.stage == "up":
                            self.stage = "down"
                            
                        # Handle rep recording
                        if self.is_first_rep:
                            # First rep handling
                            if self.stage == "up" and not self.first_stage_up_detected:
                                self.first_stage_up_detected = True
                                self.first_rep_buffer = [image.copy()]
                                print("🔴 First rep: Started collecting frames")
                            elif self.first_stage_up_detected:
                                self.first_rep_buffer.append(image.copy())
                                
                            if self.counter > prev_counter and self.counter == 1:
                                self.process_rep_completion(image, self.counter, self.first_rep_buffer)
                                self.first_rep_buffer = []
                                self.is_first_rep = False
                                print("🔄 Switching to normal rep recording")
                                
                        elif self.counter > prev_counter:
                            # Normal rep handling
                            self.process_rep_completion(image, self.counter, self.frame_buffer)
                            self.frame_buffer = []
                            
                        label = "Pushup"
                        
                        # Display angle info with side indicator
                        percent = np.interp(elbow_angle, [90, 160], [0, 100])
                        cv2.putText(image, f'{side_used} Elbow: {int(elbow_angle)}°', (10, 100),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2)
                        cv2.putText(image, f'{int(percent)}%', (10, 140),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                                   
                    # Get CNN classification result
                    cnn_result, cnn_confidence = self.cnn_classifier.get_result()
                    
                    # Display CNN feedback
                    if "Correct" in cnn_result:
                        cnn_color = (0, 255, 0)  # Green
                    elif "Incorrect" in cnn_result:
                        cnn_color = (0, 0, 255)  # Red
                    else:
                        cnn_color = (255, 255, 0)  # Yellow
                        
                    cv2.putText(image, f"Form: {cnn_result}", (10, 170),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, cnn_color, 2)
                    cv2.putText(image, f"Confidence: {cnn_confidence:.1f}%", (10, 200),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, cnn_color, 2)
                               
                    # Buffer status
                    if self.is_first_rep:
                        if self.first_stage_up_detected:
                            cv2.putText(image, f'First Rep Buffer: {len(self.first_rep_buffer)}', 
                                       (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                        else:
                            cv2.putText(image, 'Waiting for first "up" stage...', 
                                       (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                    else:
                        cv2.putText(image, f'Buffer: {len(self.frame_buffer)}/{self.config.buffer_size}', 
                                   (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                                   
                    # Draw pose landmarks
                    self.mp_drawing.draw_landmarks(
                        image, results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS,
                        self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=4),
                        self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
                    )
                    
                    # Status and rep counter
                    rep_status = " (FIRST REP)" if self.is_first_rep else ""
                    cv2.putText(image, f'Detection: {label}{rep_status}', (10, 40),
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 255), 2)
                               
                    # Rep counter
                    cv2.rectangle(image, (image.shape[1] - 160, 0), (image.shape[1], 80), (0, 0, 0), -1)
                    cv2.putText(image, 'REPS', (image.shape[1] - 150, 30),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.8, (255, 255, 255), 2)
                    cv2.putText(image, str(self.counter), (image.shape[1] - 140, 70),
                               cv2.FONT_HERSHEY_SIMPLEX, 2, (255, 255, 255), 4)
                               
                    # CNN buffer status
                    cv2.putText(image, f'CNN Buffer: {len(self.cnn_classifier.frame_buffer)}/30', 
                               (10, 230), cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
                               
                    # Side indicator
                    if side_used != "None":
                        cv2.putText(image, f'Using: {side_used} Side', (10, 260),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.4, (128, 255, 128), 1)
                               
                    cv2.imshow('Integrated Pushup Detection System', image)
                    
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                        
        except Exception as e:
            print(f"Error during execution: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()


def main():
    """Run the integrated pushup detection system."""
    config = IntegratedConfig(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up_threshold=165.0,
        elbow_angle_down_threshold=95.0,
        target_frames=30
    )
    
    detector = IntegratedPushupDetector(config)
    detector.run()


# Run the integrated system
if __name__ == "__main__":
    main()



Loaded CNN model: pushup_classification_latest_improved.h5
🚀 Integrated Pushup Detection System Started
📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN classification
🔍 Automatic bilateral landmark detection (left/right)
Press 'q' to quit
🔴 First rep: Started collecting frames
✅ Saved rep 1 to RepVids (129 frames)
🔄 Switching to normal rep recording
Processing rep_001.mp4...
✅ Saved rep 2 to RepVids (35 frames)
Processing rep_002.mp4...
Saved 21 frames to RepKeys
✅ Processed rep 2 to RepKeys
✅ Saved rep 3 to RepVids (39 frames)
Processing rep_003.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 1 to RepKeys
✅ Saved rep 4 to RepVids (37 frames)
Processing rep_004.mp4...
Saved 30 frames to RepKeys
✅ Processed rep 3 to RepKeys
Saved 30 frames to RepKeys
✅ Processed rep 4 to RepKeys
✅ Saved rep 5 to RepVids (42 frames)
Processing rep_005.mp4...
Saved 20 frames to RepKeys
✅ Processed rep 5 to RepKeys


# ihfivbfi

In [2]:
"""
Efficient Pushup Detection and Classification System
Streamlined version with threading, async prediction, and reduced redundancy
"""


import cv2
import numpy as np
import mediapipe as mp
import os
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import glob
from tensorflow.keras.models import load_model
from pathlib import Path
import queue
import concurrent.futures




@dataclass
class Config:
    """Streamlined configuration."""
    # Detection thresholds
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    elbow_angle_up: float = 165.0
    elbow_angle_down: float = 95.0
    min_visibility: float = 0.7
   
    # Video settings
    fps: float = 20.0
    target_frames: int = 30
    frame_size: Tuple[int, int] = (112, 112)
   
    # Directories
    repvids_dir: str = "RepVids"
    repkeys_dir: str = "RepKeys"
    models_dir: str = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")




class PoseAnalyzer:
    """Handles pose detection and angle calculations."""
   
    def __init__(self, config: Config):
        self.config = config
        self.mp_pose = mp.solutions.pose
       
    @staticmethod
    def calculate_angle(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
        """Calculate angle between three points."""
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180 else angle
   
    def extract_best_side_landmarks(self, results) -> Optional[Tuple[Dict, str, float]]:
        """Extract landmarks from the side with better visibility."""
        if not results.pose_landmarks:
            return None
           
        landmarks = results.pose_landmarks.landmark
       
        # Define landmark indices for both sides
        left_indices = {
            'shoulder': self.mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.LEFT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.LEFT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.LEFT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.LEFT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.LEFT_ANKLE.value
        }
       
        right_indices = {
            'shoulder': self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.RIGHT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.RIGHT_ANKLE.value
        }
       
        # Calculate visibility for both sides
        left_vis = np.mean([landmarks[idx].visibility for idx in left_indices.values()])
        right_vis = np.mean([landmarks[idx].visibility for idx in right_indices.values()])
       
        # Choose better side
        if left_vis >= right_vis and left_vis > self.config.min_visibility:
            indices = left_indices
            side = "Left"
            visibility = left_vis
        elif right_vis > self.config.min_visibility:
            indices = right_indices
            side = "Right"
            visibility = right_vis
        else:
            return None
           
        # Extract coordinates
        coords = {}
        for joint, idx in indices.items():
            coords[joint] = np.array([landmarks[idx].x, landmarks[idx].y])
           
        return coords, side, visibility
   
    def get_elbow_angle(self, coords: Dict) -> float:
        """Calculate elbow angle from coordinates."""
        return self.calculate_angle(coords['shoulder'], coords['elbow'], coords['wrist'])




class VideoProcessor:
    """Handles video recording and keyframe extraction."""
   
    def __init__(self, config: Config):
        self.config = config
        self.pose_analyzer = PoseAnalyzer(config)
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
       
        # Setup directories
        base_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025"
        self.repvids_path = base_path / config.repvids_dir
        self.repkeys_path = base_path / config.repkeys_dir
        self.repvids_path.mkdir(parents=True, exist_ok=True)
        self.repkeys_path.mkdir(parents=True, exist_ok=True)
       
    def save_rep_video(self, frames: List[np.ndarray], rep_number: int) -> str:
        """Save repetition video to RepVids."""
        if not frames:
            return None
           
        filename = self.repvids_path / f'rep_{rep_number:03d}.mp4'
        height, width = frames[0].shape[:2]
       
        out = cv2.VideoWriter(str(filename), self.fourcc, self.config.fps, (width, height))
        for frame in frames:
            out.write(frame)
        out.release()
       
        print(f"✅ Saved rep {rep_number} ({len(frames)} frames)")
        return str(filename)
   
    def extract_keyframes(self, video_path: str, rep_number: int) -> bool:
        """Extract and downsample to 30 keyframes based on elbow angles."""
        cap = cv2.VideoCapture(video_path)
       
        with self.pose_analyzer.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7
        ) as pose:
           
            frames = []
            angles = []
           
            # Extract all frames and angles
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                   
                frames.append(frame)
               
                # Get elbow angle
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = pose.process(image_rgb)
               
                pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                if pose_data:
                    coords, _, _ = pose_data
                    angle = self.pose_analyzer.get_elbow_angle(coords)
                    angles.append(angle)
                else:
                    angles.append(180)  # Default angle if no pose detected
                   
        cap.release()
       
        if len(frames) < 3:
            return False
           
        # Find key points
        min_angle_idx = np.argmin(angles)
       
        # Find start and end based on angle peaks
        start_idx = 0
        end_idx = len(frames) - 1
       
        # Look for angle peaks before and after minimum
        for i in range(min_angle_idx - 1, -1, -1):
            if angles[i] > 160:  # High angle indicating "up" position
                start_idx = i
                break
               
        for i in range(min_angle_idx + 1, len(angles)):
            if angles[i] > 160:  # High angle indicating "up" position
                end_idx = i
                break
               
        # Extract relevant frames
        key_frames = frames[start_idx:end_idx + 1]
       
        # Downsample to target frames
        if len(key_frames) > self.config.target_frames:
            indices = np.linspace(0, len(key_frames) - 1, self.config.target_frames, dtype=int)
            # Ensure minimum angle frame is included
            min_frame_relative = min_angle_idx - start_idx
            if min_frame_relative not in indices:
                closest_idx = np.argmin(np.abs(indices - min_frame_relative))
                indices[closest_idx] = min_frame_relative
                indices = np.sort(indices)
            key_frames = [key_frames[i] for i in indices]
           
        # Save keyframes video
        if key_frames:
            output_path = self.repkeys_path / f'keyframes_30_rep_{rep_number:03d}.mp4'
            height, width = key_frames[0].shape[:2]
           
            out = cv2.VideoWriter(str(output_path), self.fourcc, 10.0, (width, height))
            for frame in key_frames:
                out.write(frame)
            out.release()
           
            print(f"✅ Extracted {len(key_frames)} keyframes for rep {rep_number}")
            return True
           
        return False




class CNNClassifier:
    """Efficient CNN classifier with async prediction."""
   
    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.frame_buffer = deque(maxlen=config.target_frames)
        self.prediction_queue = queue.Queue(maxsize=5)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        self.current_prediction = "Initializing..."
        self.current_confidence = 0.0
        self.load_model()
       
    def load_model(self):
        """Load the latest CNN model."""
        os.makedirs(self.config.models_dir, exist_ok=True)
        model_files = glob.glob(os.path.join(self.config.models_dir, 'pushup_classification*.h5'))
       
        if model_files:
            latest_model = max(model_files, key=os.path.getmtime)
            self.model = load_model(latest_model)
            print(f"Loaded CNN model: {os.path.basename(latest_model)}")
        else:
            print("No CNN model found - classification disabled")
           
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for CNN."""
        processed = cv2.resize(frame, self.config.frame_size)
        return (processed / 255.0).astype('float32')
       
    def predict_async(self, frames: List[np.ndarray]):
        """Async prediction function."""
        try:
            if self.model is None:
                return "No Model", 0.0
               
            # Prepare sequence
            sequence = np.array([self.preprocess_frame(f) for f in frames])
            sequence = np.expand_dims(sequence, axis=0)
           
            # Predict
            prediction = self.model.predict(sequence, verbose=0)
            predicted_class = np.argmax(prediction)
            confidence = np.max(prediction) * 100
           
            labels = ['Incorrect Form', 'Correct Form']
            return labels[predicted_class], confidence
           
        except Exception as e:
            print(f"Prediction error: {e}")
            return "Error", 0.0
           
    def add_frame(self, frame: np.ndarray):
        """Add frame and trigger prediction if buffer is full."""
        self.frame_buffer.append(frame.copy())
       
        if len(self.frame_buffer) == self.config.target_frames:
            # Submit async prediction
            future = self.executor.submit(self.predict_async, list(self.frame_buffer))
           
            # Non-blocking result retrieval
            def update_result():
                try:
                    result, confidence = future.result(timeout=0.1)
                    self.current_prediction = result
                    self.current_confidence = confidence
                except concurrent.futures.TimeoutError:
                    pass  # Keep previous result
                except Exception as e:
                    print(f"Prediction update error: {e}")
                   
            threading.Thread(target=update_result, daemon=True).start()
           
    def get_result(self) -> Tuple[str, float]:
        """Get current prediction result."""
        return self.current_prediction, self.current_confidence




class EfficientPushupDetector:
    """Main efficient pushup detection system."""
   
    def __init__(self, config: Config = None):
        self.config = config or Config()
       
        # Initialize components
        self.pose_analyzer = PoseAnalyzer(self.config)
        self.video_processor = VideoProcessor(self.config)
        self.cnn_classifier = CNNClassifier(self.config)
       
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
       
        # Detection state
        self.counter = 0
        self.stage = None
        self.frame_buffer = []
        self.processing_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
       
    def process_completed_rep(self, frames: List[np.ndarray], rep_number: int):
        """Process completed rep in background thread."""
        def process():
            # Save to RepVids
            video_path = self.video_processor.save_rep_video(frames, rep_number)
            if video_path:
                # Extract keyframes to RepKeys
                self.video_processor.extract_keyframes(video_path, rep_number)
               
        self.processing_executor.submit(process)
       
    def run(self, camera_index: int = 0):
        """Run the efficient detection system."""
        cap = cv2.VideoCapture(camera_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
       
        if not cap.isOpened():
            print(f"Failed to open camera {camera_index}")
            return
           
        print("🚀 Efficient Pushup Detection System")
        print("📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN")
        print("Press 'q' to quit")
       
        try:
            with self.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
               
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                       
                    # Process pose
                    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = pose.process(image_rgb)
                   
                    # Add frame to CNN classifier
                    self.cnn_classifier.add_frame(frame)
                   
                    # Extract pose data
                    pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                   
                    if pose_data:
                        coords, side, visibility = pose_data
                        elbow_angle = self.pose_analyzer.get_elbow_angle(coords)
                       
                        # Update detection state
                        prev_counter = self.counter
                       
                        if elbow_angle > self.config.elbow_angle_up:
                            if self.stage == "down":
                                self.stage = "up"
                                self.counter += 1
                            else:
                                self.stage = "up"
                        elif elbow_angle < self.config.elbow_angle_down and self.stage == "up":
                            self.stage = "down"
                           
                        # Handle frame buffering and rep completion
                        self.frame_buffer.append(frame.copy())
                       
                        if self.counter > prev_counter:
                            # Rep completed - process in background
                            self.process_completed_rep(self.frame_buffer.copy(), self.counter)
                            self.frame_buffer = []
                           
                        # Display info
                        percent = np.interp(elbow_angle, [90, 160], [0, 100])
                        cv2.putText(frame, f'{side} Elbow: {int(elbow_angle)}°', (10, 60),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                        cv2.putText(frame, f'{int(percent)}%', (10, 90),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
                                   
                    # Get and display CNN result
                    cnn_result, cnn_confidence = self.cnn_classifier.get_result()
                    cnn_color = (0, 255, 0) if "Correct" in cnn_result else (0, 0, 255)
                   
                    cv2.putText(frame, f"Form: {cnn_result}", (10, 120),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, cnn_color, 2)
                    cv2.putText(frame, f"Confidence: {cnn_confidence:.1f}%", (10, 150),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, cnn_color, 2)
                               
                    # Draw pose landmarks
                    self.mp_drawing.draw_landmarks(
                        frame, results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS,
                        self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=3),
                        self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
                    )
                   
                    # Rep counter
                    cv2.rectangle(frame, (frame.shape[1] - 120, 0), (frame.shape[1], 60), (0, 0, 0), -1)
                    cv2.putText(frame, 'REPS', (frame.shape[1] - 110, 25),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(frame, str(self.counter), (frame.shape[1] - 100, 50),
                               cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 3)
                               
                    # Buffer status
                    cv2.putText(frame, f'Buffer: {len(self.frame_buffer)}', (10, 30),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                    cv2.putText(frame, f'CNN: {len(self.cnn_classifier.frame_buffer)}/30', (10, 180),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
                               
                    cv2.imshow('Efficient Pushup Detection', frame)
                   
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                       
        except Exception as e:
            print(f"Error: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            self.processing_executor.shutdown(wait=False)
            self.cnn_classifier.executor.shutdown(wait=False)




def main():
    """Run the efficient pushup detection system."""
    config = Config(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up=165.0,
        elbow_angle_down=95.0,
        target_frames=30
    )
   
    detector = EfficientPushupDetector(config)
    detector.run()




if __name__ == "__main__":
    main()



Loaded CNN model: pushup_classification_latest_improved.h5
🚀 Efficient Pushup Detection System
📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN
Press 'q' to quit
✅ Saved rep 1 (173 frames)
✅ Saved rep 2 (33 frames)
✅ Extracted 24 keyframes for rep 2
✅ Saved rep 3 (34 frames)
✅ Extracted 26 keyframes for rep 3
✅ Saved rep 4 (38 frames)
✅ Extracted 23 keyframes for rep 4
✅ Extracted 23 keyframes for rep 1
✅ Saved rep 5 (85 frames)
✅ Saved rep 6 (52 frames)
✅ Extracted 30 keyframes for rep 5
✅ Extracted 28 keyframes for rep 6
✅ Saved rep 7 (56 frames)
✅ Saved rep 8 (37 frames)
✅ Extracted 26 keyframes for rep 7
✅ Extracted 3 keyframes for rep 8


In [None]:
# downsizing goes to any number under 30 which we have code to fix

# model doesnt have text to show its running

In [None]:
"""
Efficient Pushup Detection and Classification System
Streamlined version with threading, async prediction, and reduced redundancy
"""


import cv2
import numpy as np
import mediapipe as mp
import os
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import glob
from tensorflow.keras.models import load_model
from pathlib import Path
import queue
import concurrent.futures




@dataclass
class Config:
    """Streamlined configuration."""
    # Detection thresholds
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    elbow_angle_up: float = 165.0
    elbow_angle_down: float = 95.0
    min_visibility: float = 0.7
   
    # Video settings
    fps: float = 20.0
    target_frames: int = 30
    frame_size: Tuple[int, int] = (112, 112)
   
    # Directories
    repvids_dir: str = "RepVids"
    repkeys_dir: str = "RepKeys"
    models_dir: str = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")




class PoseAnalyzer:
    """Handles pose detection and angle calculations."""
   
    def __init__(self, config: Config):
        self.config = config
        self.mp_pose = mp.solutions.pose
       
    @staticmethod
    def calculate_angle(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
        """Calculate angle between three points."""
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180 else angle
   
    def extract_best_side_landmarks(self, results) -> Optional[Tuple[Dict, str, float]]:
        """Extract landmarks from the side with better visibility."""
        if not results.pose_landmarks:
            return None
           
        landmarks = results.pose_landmarks.landmark
       
        # Define landmark indices for both sides
        left_indices = {
            'shoulder': self.mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.LEFT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.LEFT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.LEFT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.LEFT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.LEFT_ANKLE.value
        }
       
        right_indices = {
            'shoulder': self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.RIGHT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.RIGHT_ANKLE.value
        }
       
        # Calculate visibility for both sides
        left_vis = np.mean([landmarks[idx].visibility for idx in left_indices.values()])
        right_vis = np.mean([landmarks[idx].visibility for idx in right_indices.values()])
       
        # Choose better side
        if left_vis >= right_vis and left_vis > self.config.min_visibility:
            indices = left_indices
            side = "Left"
            visibility = left_vis
        elif right_vis > self.config.min_visibility:
            indices = right_indices
            side = "Right"
            visibility = right_vis
        else:
            return None
           
        # Extract coordinates
        coords = {}
        for joint, idx in indices.items():
            coords[joint] = np.array([landmarks[idx].x, landmarks[idx].y])
           
        return coords, side, visibility
   
    def get_elbow_angle(self, coords: Dict) -> float:
        """Calculate elbow angle from coordinates."""
        return self.calculate_angle(coords['shoulder'], coords['elbow'], coords['wrist'])

class VideoProcessor:
    """Handles video recording and keyframe extraction."""
   
    def __init__(self, config: Config):
        self.config = config
        self.pose_analyzer = PoseAnalyzer(config)
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
       
        # Setup directories
        base_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025"
        self.repvids_path = base_path / config.repvids_dir
        self.repkeys_path = base_path / config.repkeys_dir
        self.repvids_path.mkdir(parents=True, exist_ok=True)
        self.repkeys_path.mkdir(parents=True, exist_ok=True)
       
    def save_rep_video(self, frames: List[np.ndarray], rep_number: int) -> str:
        """Save repetition video to RepVids."""
        if not frames:
            return None
           
        filename = self.repvids_path / f'rep_{rep_number:03d}.mp4'
        height, width = frames[0].shape[:2]
       
        out = cv2.VideoWriter(str(filename), self.fourcc, self.config.fps, (width, height))
        for frame in frames:
            out.write(frame)
        out.release()
       
        print(f"✅ Saved rep {rep_number} ({len(frames)} frames)")
        return str(filename)
   
    def extract_keyframes(self, video_path: str, rep_number: int) -> bool:
        """Extract and downsample to exactly 30 keyframes based on elbow angles."""
        cap = cv2.VideoCapture(video_path)
    
        with self.pose_analyzer.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7
        ) as pose:
        
            frames = []
            angles = []
        
            # Extract all frames and angles
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                frames.append(frame)
            
                # Get elbow angle
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = pose.process(image_rgb)
            
                pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                if pose_data:
                    coords, _, _ = pose_data
                    angle = self.pose_analyzer.get_elbow_angle(coords)
                    angles.append(angle)
                else:
                    angles.append(180)  # Default angle if no pose detected
                
        cap.release()
    
        if len(frames) < 3:
            return False
        
        # Find key points
        min_angle_idx = np.argmin(angles)
    
        # Find start and end based on angle peaks
        start_idx = 0
        end_idx = len(frames) - 1
    
        # Look for angle peaks before and after minimum
        for i in range(min_angle_idx - 1, -1, -1):
            if angles[i] > 160:  # High angle indicating "up" position
                start_idx = i
                break
            
        for i in range(min_angle_idx + 1, len(angles)):
            if angles[i] > 160:  # High angle indicating "up" position
                end_idx = i
                break
            
        # Extract relevant frames
        key_frames = frames[start_idx:end_idx + 1]
    
        # ALWAYS ensure exactly 30 frames
        if len(key_frames) == self.config.target_frames:
            # Perfect - use as is
            final_frames = key_frames
        elif len(key_frames) > self.config.target_frames:
            # Downsample to exactly 30
            indices = np.linspace(0, len(key_frames) - 1, self.config.target_frames, dtype=int)
            # Ensure minimum angle frame is included
            min_frame_relative = min_angle_idx - start_idx
            if min_frame_relative not in indices:
                closest_idx = np.argmin(np.abs(indices - min_frame_relative))
                indices[closest_idx] = min_frame_relative
                indices = np.sort(indices)
            final_frames = [key_frames[i] for i in indices]
        else:
            # Upsample to exactly 30 frames by repeating/interpolating
            if len(key_frames) < self.config.target_frames:
                # Create indices that will repeat frames to reach target
                indices = np.linspace(0, len(key_frames) - 1, self.config.target_frames)
                indices = np.round(indices).astype(int)
                
                # Ensure minimum angle frame is still included
                min_frame_relative = min_angle_idx - start_idx
                if min_frame_relative < len(key_frames):
                    # Find where min frame should be in the upsampled sequence
                    target_pos = int((min_frame_relative / (len(key_frames) - 1)) * (self.config.target_frames - 1))
                    indices[target_pos] = min_frame_relative
                
                final_frames = [key_frames[i] for i in indices]
    
        # Save keyframes video - now guaranteed to be exactly 30 frames
        if final_frames:
            output_path = self.repkeys_path / f'keyframes_30_rep_{rep_number:03d}.mp4'
            height, width = final_frames[0].shape[:2]
        
            out = cv2.VideoWriter(str(output_path), self.fourcc, 10.0, (width, height))
            for frame in final_frames:
                out.write(frame)
            out.release()
        
            print(f"✅ Extracted {len(final_frames)} keyframes for rep {rep_number}")
            return True
        
        return False

class CNNClassifier:
    """Efficient CNN classifier with async prediction."""
   
    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.frame_buffer = deque(maxlen=config.target_frames)
        self.prediction_queue = queue.Queue(maxsize=5)
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
        self.current_prediction = "Initializing..."
        self.current_confidence = 0.0
        self.load_model()
       
    def load_model(self):
        """Load the latest CNN model."""
        os.makedirs(self.config.models_dir, exist_ok=True)
        model_files = glob.glob(os.path.join(self.config.models_dir, 'pushup_classification*.h5'))
       
        if model_files:
            latest_model = max(model_files, key=os.path.getmtime)
            self.model = load_model(latest_model)
            print(f"Loaded CNN model: {os.path.basename(latest_model)}")
        else:
            print("No CNN model found - classification disabled")
           
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for CNN."""
        processed = cv2.resize(frame, self.config.frame_size)
        return (processed / 255.0).astype('float32')
       
    def predict_async(self, frames: List[np.ndarray]):
        """Async prediction function."""
        try:
            if self.model is None:
                return "No Model", 0.0
               
            # Prepare sequence
            sequence = np.array([self.preprocess_frame(f) for f in frames])
            sequence = np.expand_dims(sequence, axis=0)
           
            # Predict
            prediction = self.model.predict(sequence, verbose=0)
            predicted_class = np.argmax(prediction)
            confidence = np.max(prediction) * 100
           
            labels = ['Incorrect Form', 'Correct Form']
            return labels[predicted_class], confidence
           
        except Exception as e:
            print(f"Prediction error: {e}")
            return "Error", 0.0
           
    def add_frame(self, frame: np.ndarray):
        """Add frame and trigger prediction if buffer is full."""
        self.frame_buffer.append(frame.copy())
       
        if len(self.frame_buffer) == self.config.target_frames:
            # Submit async prediction
            future = self.executor.submit(self.predict_async, list(self.frame_buffer))
           
            # Non-blocking result retrieval
            def update_result():
                try:
                    result, confidence = future.result(timeout=0.1)
                    self.current_prediction = result
                    self.current_confidence = confidence
                except concurrent.futures.TimeoutError:
                    pass  # Keep previous result
                except Exception as e:
                    print(f"Prediction update error: {e}")
                   
            threading.Thread(target=update_result, daemon=True).start()
           
    def get_result(self) -> Tuple[str, float]:
        """Get current prediction result."""
        return self.current_prediction, self.current_confidence

class EfficientPushupDetector:
    """Main efficient pushup detection system. 
    WITH RUN FUNCTION."""
   
    def __init__(self, config: Config = None):
        self.config = config or Config()
       
        # Initialize components
        self.pose_analyzer = PoseAnalyzer(self.config)
        self.video_processor = VideoProcessor(self.config)
        self.cnn_classifier = CNNClassifier(self.config)
       
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
       
        # Detection state
        self.counter = 0
        self.stage = None
        self.frame_buffer = []
        self.first_rep_buffer = []  # Add this
        self.first_stage_up_detected = False  # Add this
        self.is_first_rep = True  # Add this
        self.processing_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
       
    def process_completed_rep(self, frames: List[np.ndarray], rep_number: int):
        """Process completed rep in background thread."""
        def process():
            # Save to RepVids
            video_path = self.video_processor.save_rep_video(frames, rep_number)
            if video_path:
                # Extract keyframes to RepKeys
                self.video_processor.extract_keyframes(video_path, rep_number)
               
        self.processing_executor.submit(process)
       
    def run(self, camera_index: int = 0):
        """Run the efficient detection system."""
        cap = cv2.VideoCapture(camera_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
       
        if not cap.isOpened():
            print(f"Failed to open camera {camera_index}")
            return
           
        print("🚀 Efficient Pushup Detection System")
        print("📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN")
        print("Press 'q' to quit")
       
        try:
            with self.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
               
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                       
                    # Process pose
                    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = pose.process(image_rgb)
                   
                    # Add frame to CNN classifier
                    self.cnn_classifier.add_frame(frame)
                   
                    # Extract pose data
                    pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                   
                    if pose_data:
                        coords, side, visibility = pose_data
                        elbow_angle = self.pose_analyzer.get_elbow_angle(coords)
                       
                        # Update detection state
                        prev_counter = self.counter
                       
                        if elbow_angle > self.config.elbow_angle_up:
                            if self.stage == "down":
                                self.stage = "up"
                                self.counter += 1
                            else:
                                self.stage = "up"
                        elif elbow_angle < self.config.elbow_angle_down and self.stage == "up":
                            self.stage = "down"
                           
                        # Handle frame buffering and rep completion
                        self.frame_buffer.append(frame.copy())

                        if self.is_first_rep:
                            # Start collecting frames when stage first becomes "up"
                            if self.stage == "up" and not self.first_stage_up_detected:
                                self.first_stage_up_detected = True
                                self.first_rep_buffer = [frame.copy()]  # Start with current frame
                                print("🔴 First rep: Started collecting frames (stage = 'up')")
                            
                            # Continue collecting frames if we've started
                            elif self.first_stage_up_detected:
                                self.first_rep_buffer.append(frame.copy())
                            
                            # Save first rep when counter increases from 0 to 1
                            if self.counter > prev_counter and self.counter == 1:
                                # Rep completed - process in background (same pattern as normal reps)
                                self.process_completed_rep(self.first_rep_buffer.copy(), self.counter)
                                
                                # Reset first rep variables
                                self.first_rep_buffer = []
                                self.is_first_rep = False
                                print("🔄 Switching to normal rep recording mode")

                        elif self.counter > prev_counter:
                            # Rep completed - process in background
                            self.process_completed_rep(self.frame_buffer.copy(), self.counter)
                            self.frame_buffer = []
        
                    # Get and display CNN result
                    cnn_result, cnn_confidence = self.cnn_classifier.get_result()
                    cnn_color = (0, 255, 0) if "Correct" in cnn_result else (0, 0, 255)
                   
                    cv2.putText(frame, f"Form: {cnn_result}", (10, 120),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, cnn_color, 2)
                    cv2.putText(frame, f"Confidence: {cnn_confidence:.1f}%", (10, 150),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, cnn_color, 2)
                               
                    # Draw pose landmarks
                    self.mp_drawing.draw_landmarks(
                        frame, results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS,
                        self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=3),
                        self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
                    )
                   
                    # Rep counter
                    cv2.rectangle(frame, (frame.shape[1] - 120, 0), (frame.shape[1], 60), (0, 0, 0), -1)
                    cv2.putText(frame, 'REPS', (frame.shape[1] - 110, 25),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(frame, str(self.counter), (frame.shape[1] - 100, 50),
                               cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 3)
                               
                    # Buffer status
                    cv2.putText(frame, f'Buffer: {len(self.frame_buffer)}', (10, 30),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                    cv2.putText(frame, f'CNN: {len(self.cnn_classifier.frame_buffer)}/30', (10, 180),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.4, (255, 255, 255), 1)
                               
                    cv2.imshow('Efficient Pushup Detection', frame)
                   
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                       
        except Exception as e:
            print(f"Error: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            self.processing_executor.shutdown(wait=False)
            self.cnn_classifier.executor.shutdown(wait=False)




def main():
    """Run the efficient pushup detection system."""
    config = Config(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up=165.0,
        elbow_angle_down=95.0,
        target_frames=30
    )
   
    detector = EfficientPushupDetector(config)
    detector.run()




if __name__ == "__main__":
    main()

# can run without clearing files



Loaded CNN model: pushup_classification_latest_improved.h5
🚀 Efficient Pushup Detection System
📹 Live detection + 🔄 Auto-processing + 🤖 Real-time CNN
Press 'q' to quit
🔴 First rep: Started collecting frames (stage = 'up')
🔄 Switching to normal rep recording mode
✅ Saved rep 1 (71 frames)


✅ Extracted 30 keyframes for rep 1


# *IMP

In [None]:
"""
Efficient Pushup Detection and Classification System
Streamlined version with threading, async prediction, and reduced redundancy
"""

# CLEAR REPVIDS AND REPKEYS FILES BEFORE RUNNING

import cv2
import numpy as np
import mediapipe as mp
import os
import threading
import time
from collections import deque
from dataclasses import dataclass
from typing import List, Tuple, Optional, Dict
import glob
from tensorflow.keras.models import load_model
from pathlib import Path
import queue
import concurrent.futures



@dataclass
class Config:
    """Streamlined configuration."""
    # Detection thresholds
    min_detection_confidence: float = 0.5
    min_tracking_confidence: float = 0.5
    elbow_angle_up: float = 165.0
    elbow_angle_down: float = 95.0
    min_visibility: float = 0.7
   
    # Video settings
    fps: float = 20.0
    target_frames: int = 30
    frame_size: Tuple[int, int] = (112, 112)
   
    # Directories
    repvids_dir: str = "RepVids"
    repkeys_dir: str = "RepKeys"
    models_dir: str = os.path.join(os.path.expanduser("~"), "Downloads", "Pushup Models")




class PoseAnalyzer:
    """Handles pose detection and angle calculations."""
   
    def __init__(self, config: Config):
        self.config = config
        self.mp_pose = mp.solutions.pose
       
    @staticmethod
    def calculate_angle(a: np.ndarray, b: np.ndarray, c: np.ndarray) -> float:
        """Calculate angle between three points."""
        radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
        angle = np.abs(radians * 180.0 / np.pi)
        return 360 - angle if angle > 180 else angle
   
    def extract_best_side_landmarks(self, results) -> Optional[Tuple[Dict, str, float]]:
        """Extract landmarks from the side with better visibility."""
        if not results.pose_landmarks:
            return None
           
        landmarks = results.pose_landmarks.landmark
       
        # Define landmark indices for both sides
        left_indices = {
            'shoulder': self.mp_pose.PoseLandmark.LEFT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.LEFT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.LEFT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.LEFT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.LEFT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.LEFT_ANKLE.value
        }
       
        right_indices = {
            'shoulder': self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
            'elbow': self.mp_pose.PoseLandmark.RIGHT_ELBOW.value,
            'wrist': self.mp_pose.PoseLandmark.RIGHT_WRIST.value,
            'hip': self.mp_pose.PoseLandmark.RIGHT_HIP.value,
            'knee': self.mp_pose.PoseLandmark.RIGHT_KNEE.value,
            'ankle': self.mp_pose.PoseLandmark.RIGHT_ANKLE.value
        }
       
        # Calculate visibility for both sides
        left_vis = np.mean([landmarks[idx].visibility for idx in left_indices.values()])
        right_vis = np.mean([landmarks[idx].visibility for idx in right_indices.values()])
       
        # Choose better side
        if left_vis >= right_vis and left_vis > self.config.min_visibility:
            indices = left_indices
            side = "Left"
            visibility = left_vis
        elif right_vis > self.config.min_visibility:
            indices = right_indices
            side = "Right"
            visibility = right_vis
        else:
            return None
           
        # Extract coordinates
        coords = {}
        for joint, idx in indices.items():
            coords[joint] = np.array([landmarks[idx].x, landmarks[idx].y])
           
        return coords, side, visibility
   
    def get_elbow_angle(self, coords: Dict) -> float:
        """Calculate elbow angle from coordinates."""
        return self.calculate_angle(coords['shoulder'], coords['elbow'], coords['wrist'])

class VideoProcessor:
    """Handles video recording and keyframe extraction."""
   
    def __init__(self, config: Config):
        self.config = config
        self.pose_analyzer = PoseAnalyzer(config)
        self.fourcc = cv2.VideoWriter_fourcc(*'mp4v')
       
        # Setup directories
        base_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025"
        self.repvids_path = base_path / config.repvids_dir
        self.repkeys_path = base_path / config.repkeys_dir
        self.repvids_path.mkdir(parents=True, exist_ok=True)
        self.repkeys_path.mkdir(parents=True, exist_ok=True)
       
    def save_rep_video(self, frames: List[np.ndarray], rep_number: int) -> str:
        """Save repetition video to RepVids."""
        if not frames:
            return None
           
        filename = self.repvids_path / f'rep_{rep_number:03d}.mp4'
        height, width = frames[0].shape[:2]
       
        out = cv2.VideoWriter(str(filename), self.fourcc, self.config.fps, (width, height))
        for frame in frames:
            out.write(frame)
        out.release()
       
        print(f"✅ Saved rep {rep_number} ({len(frames)} frames)")
        return str(filename)
   
    def extract_keyframes(self, video_path: str, rep_number: int) -> bool:
        """Extract and downsample to exactly 30 keyframes based on elbow angles."""
        cap = cv2.VideoCapture(video_path)
    
        with self.pose_analyzer.mp_pose.Pose(
            min_detection_confidence=0.7,
            min_tracking_confidence=0.7
        ) as pose:
        
            frames = []
            angles = []
        
            # Extract all frames and angles
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    break
                
                frames.append(frame)
            
                # Get elbow angle
                image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = pose.process(image_rgb)
            
                pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                if pose_data:
                    coords, _, _ = pose_data
                    angle = self.pose_analyzer.get_elbow_angle(coords)
                    angles.append(angle)
                else:
                    angles.append(180)  # Default angle if no pose detected
                
        cap.release()
    
        if len(frames) < 3:
            return False
        
        # Find key points
        min_angle_idx = np.argmin(angles)
    
        # Find start and end based on angle peaks
        start_idx = 0
        end_idx = len(frames) - 1
    
        # Look for angle peaks before and after minimum
        for i in range(min_angle_idx - 1, -1, -1):
            if angles[i] > 160:  # High angle indicating "up" position
                start_idx = i
                break
            
        for i in range(min_angle_idx + 1, len(angles)):
            if angles[i] > 160:  # High angle indicating "up" position
                end_idx = i
                break
            
        # Extract relevant frames
        key_frames = frames[start_idx:end_idx + 1]
    
        # ALWAYS ensure exactly 30 frames
        if len(key_frames) == self.config.target_frames:
            # Perfect - use as is
            final_frames = key_frames
        elif len(key_frames) > self.config.target_frames:
            # Downsample to exactly 30
            indices = np.linspace(0, len(key_frames) - 1, self.config.target_frames, dtype=int)
            # Ensure minimum angle frame is included
            min_frame_relative = min_angle_idx - start_idx
            if min_frame_relative not in indices:
                closest_idx = np.argmin(np.abs(indices - min_frame_relative))
                indices[closest_idx] = min_frame_relative
                indices = np.sort(indices)
            final_frames = [key_frames[i] for i in indices]
        else:
            # Upsample to exactly 30 frames by repeating/interpolating
            if len(key_frames) < self.config.target_frames:
                # Create indices that will repeat frames to reach target
                indices = np.linspace(0, len(key_frames) - 1, self.config.target_frames)
                indices = np.round(indices).astype(int)
                
                # Ensure minimum angle frame is still included
                min_frame_relative = min_angle_idx - start_idx
                if min_frame_relative < len(key_frames):
                    # Find where min frame should be in the upsampled sequence
                    target_pos = int((min_frame_relative / (len(key_frames) - 1)) * (self.config.target_frames - 1))
                    indices[target_pos] = min_frame_relative
                
                final_frames = [key_frames[i] for i in indices]
    
        # Save keyframes video - now guaranteed to be exactly 30 frames
        if final_frames:
            output_path = self.repkeys_path / f'keyframes_30_rep_{rep_number:03d}.mp4'
            height, width = final_frames[0].shape[:2]
        
            out = cv2.VideoWriter(str(output_path), self.fourcc, 10.0, (width, height))
            for frame in final_frames:
                out.write(frame)
            out.release()
        
            print(f"✅ Extracted {len(final_frames)} keyframes for rep {rep_number}")
            return True
        
        return False

class CNNClassifier:
    """CNN classifier for processing saved RepKeys videos."""
   
    def __init__(self, config: Config):
        self.config = config
        self.model = None
        self.load_model()
        
        # Setup RepKeys directory path
        base_path = Path.home() / "Desktop" / "Deep Dive AI Summer 2025"
        self.repkeys_path = base_path / config.repkeys_dir
       
    def load_model(self):
        """Load the latest CNN model."""
        os.makedirs(self.config.models_dir, exist_ok=True)
        model_files = glob.glob(os.path.join(self.config.models_dir, 'pushup_classification*.h5'))
       
        if model_files:
            latest_model = max(model_files, key=os.path.getmtime)
            self.model = load_model(latest_model)
            print(f"✅ Loaded CNN model: {os.path.basename(latest_model)}")
        else:
            print("❌ No CNN model found - classification disabled")
            self.model = None
           
    def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
        """Preprocess frame for CNN."""
        processed = cv2.resize(frame, self.config.frame_size)
        return (processed / 255.0).astype('float32')
    
    def load_video_frames(self, video_path: str) -> Optional[List[np.ndarray]]:
        """Load all frames from a video file."""
        cap = cv2.VideoCapture(video_path)
        frames = []
        
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break
            frames.append(frame)
            
        cap.release()
        
        if len(frames) != self.config.target_frames:
            print(f"⚠️  Video {os.path.basename(video_path)} has {len(frames)} frames, expected {self.config.target_frames}")
            
        return frames if frames else None
    
    def predict_rep_classification(self, video_path: str, rep_number: int) -> Tuple[str, float]:
        """Predict classification for a single rep video."""
        if self.model is None:
            return "No Model Available", 0.0
            
        try:
            # Load video frames
            frames = self.load_video_frames(video_path)
            if not frames:
                return "Error: No frames loaded", 0.0
            
            # Preprocess frames
            processed_frames = [self.preprocess_frame(frame) for frame in frames]
            
            # Pad or truncate to target_frames if necessary
            if len(processed_frames) < self.config.target_frames:
                # Repeat last frame to reach target
                while len(processed_frames) < self.config.target_frames:
                    processed_frames.append(processed_frames[-1])
            elif len(processed_frames) > self.config.target_frames:
                # Truncate to target frames
                processed_frames = processed_frames[:self.config.target_frames]
            
            # Prepare sequence for model
            sequence = np.array(processed_frames, dtype='float32')
            sequence = np.expand_dims(sequence, axis=0)  # Add batch dimension
            
            # Make prediction
            prediction = self.model.predict(sequence, verbose=0)
            predicted_class = np.argmax(prediction)
            confidence = np.max(prediction) * 100
            
            labels = ['Incorrect Form', 'Correct Form']
            classification = labels[predicted_class]
            
            # Print result
            print(f"🔍 Rep {rep_number}: {classification} (Confidence: {confidence:.1f}%)")
            
            return classification, confidence
            
        except Exception as e:
            error_msg = f"Error processing rep {rep_number}: {e}"
            print(f"❌ {error_msg}")
            return "Error", 0.0
    
    def process_all_repkeys(self):
        """Process all videos in RepKeys directory and print classifications."""
        if not self.repkeys_path.exists():
            print(f"❌ RepKeys directory not found: {self.repkeys_path}")
            return
            
        # Find all RepKeys videos
        video_files = list(self.repkeys_path.glob("keyframes_30_rep_*.mp4"))
        
        if not video_files:
            print("❌ No RepKeys videos found for classification")
            return
            
        # Sort by rep number
        video_files.sort(key=lambda x: int(x.stem.split('_')[-1]))
        
        print(f"\n🤖 CNN Classification Results:")
        print("=" * 50)
        
        for video_file in video_files:
            # Extract rep number from filename
            try:
                rep_number = int(video_file.stem.split('_')[-1])
                self.predict_rep_classification(str(video_file), rep_number)
            except ValueError:
                print(f"⚠️  Could not extract rep number from {video_file.name}")
                
        print("=" * 50)
    
    def process_single_rep(self, rep_number: int) -> Tuple[str, float]:
        """Process a single rep by number."""
        video_path = self.repkeys_path / f"keyframes_30_rep_{rep_number:03d}.mp4"
        
        if not video_path.exists():
            print(f"❌ Rep {rep_number} video not found: {video_path}")
            return "File Not Found", 0.0
            
        return self.predict_rep_classification(str(video_path), rep_number)

class EfficientPushupDetector:
    """Main efficient pushup detection system."""
   
    def __init__(self, config: Config = None):
        self.config = config or Config()
       
        # Initialize components
        self.pose_analyzer = PoseAnalyzer(self.config)
        self.video_processor = VideoProcessor(self.config)
        self.cnn_classifier = CNNClassifier(self.config)
       
        # MediaPipe setup
        self.mp_drawing = mp.solutions.drawing_utils
        self.mp_pose = mp.solutions.pose
       
        # Detection state
        self.counter = 0
        self.stage = None
        self.frame_buffer = []
        self.first_rep_buffer = []
        self.first_stage_up_detected = False
        self.is_first_rep = True
        self.processing_executor = concurrent.futures.ThreadPoolExecutor(max_workers=3)
       
    def process_completed_rep(self, frames: List[np.ndarray], rep_number: int):
        """Process completed rep in background thread."""
        def process():
            # Save to RepVids
            video_path = self.video_processor.save_rep_video(frames, rep_number)
            if video_path:
                # Extract keyframes to RepKeys
                success = self.video_processor.extract_keyframes(video_path, rep_number)
                
                # If keyframes were successfully extracted, classify the rep
                if success:
                    # Add small delay to ensure file is fully written
                    time.sleep(0.5)
                    self.cnn_classifier.process_single_rep(rep_number)
               
        self.processing_executor.submit(process)
       
    def run(self, camera_index: int = 0):
        """Run the efficient detection system."""
        cap = cv2.VideoCapture(camera_index)
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
       
        if not cap.isOpened():
            print(f"Failed to open camera {camera_index}")
            return
           
        print("🚀 Efficient Pushup Detection System")
        print("📹 Live detection + 🔄 Auto-processing + 🤖 CNN Classification")
        print("Press 'q' to quit")
       
        try:
            with self.mp_pose.Pose(
                min_detection_confidence=self.config.min_detection_confidence,
                min_tracking_confidence=self.config.min_tracking_confidence
            ) as pose:
               
                while cap.isOpened():
                    ret, frame = cap.read()
                    if not ret:
                        break
                       
                    # Process pose
                    image_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    results = pose.process(image_rgb)
                   
                    # Extract pose data
                    pose_data = self.pose_analyzer.extract_best_side_landmarks(results)
                   
                    if pose_data:
                        coords, side, visibility = pose_data
                        elbow_angle = self.pose_analyzer.get_elbow_angle(coords)
                       
                        # Update detection state
                        prev_counter = self.counter
                       
                        if elbow_angle > self.config.elbow_angle_up:
                            if self.stage == "down":
                                self.stage = "up"
                                self.counter += 1
                            else:
                                self.stage = "up"
                        elif elbow_angle < self.config.elbow_angle_down and self.stage == "up":
                            self.stage = "down"
                           
                        # Handle frame buffering - always add to normal buffer now
                        if not self.is_first_rep:
                            self.frame_buffer.append(frame.copy())

                        if self.is_first_rep:
                            # Start collecting frames when stage first becomes "up"
                            if self.stage == "up" and not self.first_stage_up_detected:
                                self.first_stage_up_detected = True
                                self.first_rep_buffer = [frame.copy()]
                                print("🔴 First rep: Started collecting frames (stage = 'up')")
                            
                            # Continue collecting frames if we've started
                            elif self.first_stage_up_detected:
                                self.first_rep_buffer.append(frame.copy())
                            
                            # Save first rep when counter increases from 0 to 1
                            if self.counter > prev_counter and self.counter == 1:
                                # Rep completed - process in background
                                self.process_completed_rep(self.first_rep_buffer.copy(), self.counter)
                                
                                # Reset first rep variables
                                self.first_rep_buffer = []
                                self.is_first_rep = False
                                print("🔄 Switching to normal rep recording mode")

                        elif self.counter > prev_counter:
                            # Rep completed - process in background
                            self.process_completed_rep(self.frame_buffer.copy(), self.counter)
                            self.frame_buffer = []
                            
                        # Display angle info
                        percent = np.interp(elbow_angle, [90, 160], [0, 100])
                        cv2.putText(frame, f'{side} Elbow: {int(elbow_angle)}*', (10, 60),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                        cv2.putText(frame, f'{int(percent)}%', (10, 90),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
                               
                    # Draw pose landmarks
                    self.mp_drawing.draw_landmarks(
                        frame, results.pose_landmarks, self.mp_pose.POSE_CONNECTIONS,
                        self.mp_drawing.DrawingSpec(color=(0, 0, 255), thickness=2, circle_radius=3),
                        self.mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=2)
                    )
                   
                    # Rep counter
                    cv2.rectangle(frame, (frame.shape[1] - 120, 0), (frame.shape[1], 60), (0, 0, 0), -1)
                    cv2.putText(frame, 'REPS', (frame.shape[1] - 110, 25),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 255), 2)
                    cv2.putText(frame, str(self.counter), (frame.shape[1] - 100, 50),
                               cv2.FONT_HERSHEY_SIMPLEX, 1.5, (255, 255, 255), 3)
                               
                    # Buffer status
                    if self.is_first_rep:
                        if self.first_stage_up_detected:
                            cv2.putText(frame, f'First Rep Buffer: {len(self.first_rep_buffer)}', 
                                       (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 255), 2)
                        else:
                            cv2.putText(frame, 'Waiting for first "up" stage...', 
                                       (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                    else:
                        cv2.putText(frame, f'Buffer: {len(self.frame_buffer)}', (10, 30),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 0), 2)
                               
                    cv2.imshow('Efficient Pushup Detection', frame)
                   
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        break
                       
        except Exception as e:
            print(f"Error: {e}")
        finally:
            cap.release()
            cv2.destroyAllWindows()
            self.processing_executor.shutdown(wait=False)
            
        # Process any remaining RepKeys videos when exiting
        print("\n🔍 Final classification of all reps:")
        self.cnn_classifier.process_all_repkeys()


def main():
    """Run the efficient pushup detection system."""
    config = Config(
        min_detection_confidence=0.5,
        min_tracking_confidence=0.5,
        elbow_angle_up=165.0,
        elbow_angle_down=95.0,
        target_frames=30
    )
   
    detector = EfficientPushupDetector(config)
    detector.run()



if __name__ == "__main__":
    main()

# CLEAR REPVIDS AND REPKEYS FILES BEFORE RUNNING



✅ Loaded CNN model: pushup_classification_latest_improved.h5
🚀 Efficient Pushup Detection System
📹 Live detection + 🔄 Auto-processing + 🤖 CNN Classification
Press 'q' to quit
🔴 First rep: Started collecting frames (stage = 'up')
🔄 Switching to normal rep recording mode
✅ Saved rep 1 (56 frames)
✅ Saved rep 2 (48 frames)
✅ Extracted 30 keyframes for rep 1
🔍 Rep 1: Incorrect Form (Confidence: 100.0%)
✅ Extracted 30 keyframes for rep 2
🔍 Rep 2: Incorrect Form (Confidence: 100.0%)
✅ Saved rep 3 (64 frames)
✅ Saved rep 4 (60 frames)
✅ Extracted 30 keyframes for rep 3
🔍 Rep 3: Incorrect Form (Confidence: 100.0%)
✅ Saved rep 5 (46 frames)
✅ Extracted 30 keyframes for rep 4
🔍 Rep 4: Incorrect Form (Confidence: 100.0%)
✅ Extracted 30 keyframes for rep 5
🔍 Rep 5: Incorrect Form (Confidence: 100.0%)

🔍 Final classification of all reps:

🤖 CNN Classification Results:
🔍 Rep 1: Incorrect Form (Confidence: 100.0%)
🔍 Rep 2: Incorrect Form (Confidence: 100.0%)
🔍 Rep 3: Incorrect Form (Confidence: 100.0

# why is it labelling incorrect with 100% confidence:

Looking at your code, there are several potential reasons why your CNN model is predicting everything as "Incorrect" with 100% confidence:

### **Most Likely Issues:**

#### 1. *Model-Data Mismatch*
Your model was likely trained on data with a different preprocessing pipeline or frame ordering than what you're using in the live system.

#### 2. *Frame Preprocessing Differences*


In [None]:
def preprocess_frame(self, frame: np.ndarray) -> np.ndarray:
    """Preprocess frame for CNN."""
    processed = cv2.resize(frame, self.config.frame_size)  # (112, 112)
    return (processed / 255.0).astype('float32')



**Check if your training data was preprocessed differently:**
- Different normalization (e.g., mean subtraction, different scaling)
- Different color space (RGB vs BGR)
- Different frame ordering or timing

#### 3. *Sequence Length Mismatch*


In [None]:
# Pad or truncate to target_frames if necessary
if len(processed_frames) < self.config.target_frames:
    # Repeat last frame to reach target
    while len(processed_frames) < self.config.target_frames:
        processed_frames.append(processed_frames[-1])



**This padding with repeated frames is problematic** - it creates artificial sequences that likely don't match your training data distribution.

#### 4. *Frame Quality/Content Issues*
Your keyframe extraction might be producing sequences that are fundamentally different from your training data in terms of:
- Frame transitions
- Pose angles captured
- Movement patterns

### **Quick Diagnostic Steps:**

#### 1. *Test with Known Good Data*


In [None]:
# Add this method to your CNNClassifier class
def test_with_training_data(self):
    """Test model with a sample from your original training dataset"""
    # Load a video you KNOW should be classified correctly
    # from your original training data and test it
    pass



### 2. *Check Model Labels*


In [None]:
# In your predict_rep_classification method, add debugging:
def predict_rep_classification(self, video_path: str, rep_number: int) -> Tuple[str, float]:
    # ... existing code ...
    
    # Add debugging
    print(f"Prediction raw output: {prediction}")
    print(f"Prediction shape: {prediction.shape}")
    print(f"Predicted class index: {predicted_class}")
    print(f"Class probabilities: {prediction[0]}")
    
    # ... rest of method



### 3. *Verify Frame Sequence*


In [None]:
# Add this debugging in extract_keyframes:
def extract_keyframes(self, video_path: str, rep_number: int) -> bool:
    # ... existing code ...
    
    # Debug the final frames
    print(f"Final frames count: {len(final_frames)}")
    print(f"Frame dimensions: {final_frames[0].shape if final_frames else 'None'}")
    
    # Save a sample frame to visually inspect
    if final_frames:
        cv2.imwrite(f"debug_frame_rep_{rep_number}.jpg", final_frames[len(final_frames)//2])
    
    # ... rest of method



## **Most Likely Fix:**

The issue is probably in the **frame padding/truncation logic**. Instead of repeating the last frame, try:



In [None]:
def predict_rep_classification(self, video_path: str, rep_number: int) -> Tuple[str, float]:
    if self.model is None:
        return "No Model Available", 0.0
        
    try:
        frames = self.load_video_frames(video_path)
        if not frames:
            return "Error: No frames loaded", 0.0
        
        # BETTER HANDLING: Only process if we have reasonable frame count
        if len(frames) < 20:  # Too few frames
            return "Insufficient Frames", 0.0
            
        # If too many frames, use intelligent subsampling instead of truncation
        if len(frames) > self.config.target_frames:
            indices = np.linspace(0, len(frames)-1, self.config.target_frames, dtype=int)
            frames = [frames[i] for i in indices]
        elif len(frames) < self.config.target_frames:
            # Instead of padding, return error or use what we have
            return "Frame Count Mismatch", 0.0
            
        # ... rest of preprocessing



**Check your training data preprocessing and ensure it matches exactly what you're doing in the live system!**

Notes to fix:

make Rep Counter pretty

add rolling window

add output folder for labelled stuff?

train model more?

no degree sign for cv.putText(not even \u00B0) - replaced with *

check accuracy of model
 - WTF WHY IS IT NOT WORKING
 - says everything is 100% incorrect for sm reason