In [6]:
pip install screeninfo

Collecting screeninfo
  Downloading screeninfo-0.8.1-py3-none-any.whl (12 kB)
Installing collected packages: screeninfo
Successfully installed screeninfo-0.8.1
Note: you may need to restart the kernel to use updated packages.


Final

In [2]:
import cv2
import mediapipe as mp
import numpy as np
import pyttsx3
import threading
import time
from screeninfo import get_monitors 

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Initialize text-to-speech engine
engine = pyttsx3.init()
engine.setProperty('rate', 150)

# Create a queue for TTS messages
last_spoken_time = 0
last_spoken_message = ""
SPEECH_COOLDOWN = 3

def speak_feedback(message):
    global last_spoken_time, last_spoken_message
    current_time = time.time()
    if (current_time - last_spoken_time >= SPEECH_COOLDOWN and 
        message != last_spoken_message):
        def speak():
            engine.say(message)
            engine.runAndWait()
            
        thread = threading.Thread(target=speak, daemon=True)
        thread.start()
        last_spoken_time = current_time
        last_spoken_message = message

def draw_angle_marker(image, point, is_correct, error_counter=0, size=10):
    """Draw a colored dot indicating if angle is within correct range, only red if error threshold exceeded"""
    pixel = tuple(np.multiply(point, [image.shape[1], image.shape[0]]).astype(int))
    # Only show red if error count exceeds threshold, otherwise show green
    color = (0, 255, 0) if (is_correct or error_counter <= error_threshold) else (0, 0, 255)
    cv2.circle(image, pixel, size, color, -1)

def draw_knee_warning(image, knee_point, error_counter=0, size=15):
    """Draw a red circle at knee point only if error threshold exceeded"""
    if error_counter > error_threshold:
        pixel = tuple(np.multiply(knee_point, [image.shape[1], image.shape[0]]).astype(int))
        cv2.circle(image, pixel, size, (0, 0, 255), -1)

def put_text_with_background(image, text, position, font, scale, text_color, thickness, bg_color=(0,0,0)):
    (text_width, text_height), baseline = cv2.getTextSize(text, font, scale, thickness)
    padding = 10
    bg_x1 = position[0] - padding
    bg_y1 = position[1] - text_height - padding
    bg_x2 = position[0] + text_width + padding
    bg_y2 = position[1] + padding
    
    overlay = image.copy()
    cv2.rectangle(overlay, (bg_x1, bg_y1), (bg_x2, bg_y2), bg_color, -1)
    cv2.addWeighted(overlay, 0.5, image, 0.5, 0, image)
    cv2.putText(image, text, position, font, scale, text_color, thickness, cv2.LINE_AA)

# Create error frame counters for each type of error
error_frames_counter = {
    'shoulder': 0,
    'hip': 0,
    'knee': 0,
    'ankle': 0
}

# Reset function for error counters
def reset_error_counters():
    for key in error_frames_counter:
        error_frames_counter[key] = 0

# Modified feedback function that checks error threshold      
def provide_feedback(image, error_type, message, position, counter_key):
    """Provide feedback and mark that feedback was shown"""
    global feedback_shown_in_round
    if error_frames_counter[counter_key] > error_threshold:
        put_text_with_background(image, message, position, cv2.FONT_HERSHEY_SIMPLEX, 
                               1, (255, 255, 255), 2, bg_color=(0, 0, 255))
        speak_feedback(message)
        feedback_shown_in_round = True 


def calculate_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    return round(angle, 2)

def calculate_angle2(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    
    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(a[1] - b[1], a[0] - b[0])
    angle = np.abs(radians * 180.0 / np.pi)
    angle = 360 - angle
    return round(angle, 2)

round_statuses = {}  # To store if each round was correct/incorrect
incorrect_rounds_count = 0
        
error_tracking = {}  # Track errors for each round
error_threshold = 30  # Number of frames with errors before marking round as incorrect (around 30 frames per second)
        
def update_round_status(round_num, has_feedback):
    """Update the status of a completed round based on whether feedback was shown"""
    global round_statuses, incorrect_rounds_count
    
    if has_feedback:
        round_statuses[round_num] = "INCORRECT"
        incorrect_rounds_count += 1
    else:
        round_statuses[round_num] = "CORRECT"

# In the main loop, add a flag to track if feedback was shown in current round
feedback_shown_in_round = False
    
def draw_angle_arc(image, center_point, start_point, angle, radius=50):
    """Draw an arc to visualize an angle"""
    # Convert points to pixel coordinates
    height, width = image.shape[:2]
    center = tuple(np.multiply(center_point, [width, height]).astype(int))
    start = tuple(np.multiply(start_point, [width, height]).astype(int))
    
    # Calculate start angle
    start_angle = np.arctan2(start[1] - center[1], start[0] - center[0])
    
    # Convert angle to radians
    angle_rad = np.deg2rad(angle)
    
    # Calculate end angle
    end_angle = start_angle + angle_rad
    
    # Draw the arc
    cv2.ellipse(image, center, (radius, radius), 
                np.rad2deg(start_angle), 0, np.rad2deg(angle_rad), 
                (255, 255, 0), 2)
    
    # Add angle text
    text_pos = (center[0] - 20, center[1] - radius - 10)
    cv2.putText(image, f"{angle:.1f}", text_pos,
                cv2.FONT_HERSHEY_SIMPLEX, 0.6, (255, 255, 0), 2)

cap = cv2.VideoCapture("./Deadlift_edited2.mp4")
# cap = cv2.VideoCapture(0)


monitor = get_monitors()[0]
cap.set(cv2.CAP_PROP_FRAME_WIDTH,  monitor.width)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, monitor.height)

fps = cap.get(cv2.CAP_PROP_FPS)

if not cap.isOpened():
    print("Error: Could not open video feed.")
    exit()

round_count = 0
previous_stage = None
current_stage = None
in_progress = False

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            print("End of video or error reading frame.")
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        height, width, _ = image.shape
        results = pose.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        try:
            landmarks = results.pose_landmarks.landmark
            
            # Get landmarks
            left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x,
                           landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y]
            left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].x,
                         landmarks[mp_pose.PoseLandmark.LEFT_ELBOW.value].y]
            left_hip = [landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x,
                       landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y]
            left_knee = [landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x,
                        landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y]
            left_ankle = [landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x,
                         landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y]

            right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].x,
                            landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER.value].y]
            right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].x,
                          landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW.value].y]
            right_hip = [landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].x,
                        landmarks[mp_pose.PoseLandmark.RIGHT_HIP.value].y]
            right_knee = [landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].x,
                         landmarks[mp_pose.PoseLandmark.RIGHT_KNEE.value].y]
            right_ankle = [landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].x,
                          landmarks[mp_pose.PoseLandmark.RIGHT_ANKLE.value].y]

            # Calculate centers
            center_shoulder = [(left_shoulder[0] + right_shoulder[0]) / 2, 
                             (left_shoulder[1] + right_shoulder[1]) / 2]
            center_hip = [(left_hip[0] + right_hip[0]) / 2, 
                         (left_hip[1] + right_hip[1]) / 2]
            neck = [(center_shoulder[0] + landmarks[mp_pose.PoseLandmark.NOSE.value].x) / 2,
                   (center_shoulder[1] + landmarks[mp_pose.PoseLandmark.NOSE.value].y) / 2]
            back_point = [(neck[0] + center_hip[0]) / 2, (neck[1] + center_hip[1]) / 2]
            
            # Adjusted ankle points
            moved_right_ankle = [right_ankle[0] - 0.1, right_ankle[1]]
            moved_left_ankle = [left_ankle[0] - 0.1, left_ankle[1]]

            # Calculate angles
            left_shoulder_angle = calculate_angle(left_elbow, left_shoulder, left_hip)
            left_hip_angle = calculate_angle2(left_shoulder, left_hip, left_knee)
            left_knee_angle = calculate_angle(left_hip, left_knee, left_ankle)
            left_trunk_angle = calculate_angle(left_shoulder, left_hip, [left_hip[0], left_hip[1] - 0.2])
            left_ankle_angle = calculate_angle2(left_knee, left_ankle, moved_left_ankle)

            right_shoulder_angle = calculate_angle(right_elbow, right_shoulder, right_hip)
            right_hip_angle = calculate_angle2(right_shoulder, right_hip, right_knee)
            right_knee_angle = calculate_angle(right_hip, right_knee, right_ankle)
            right_trunk_angle = calculate_angle(right_shoulder, right_hip, [right_hip[0], right_hip[1] - 0.2])
            right_ankle_angle = calculate_angle2(right_knee, right_ankle, moved_right_ankle)

            back_angle = calculate_angle(neck, back_point, center_hip)
            
            # Draw angle markers based on conditions
            avg_trunk_angle = (left_trunk_angle + right_trunk_angle) / 2
            
            # Visualize angles with colored dots
            if avg_trunk_angle > 50:  # Set Up stage
                shoulder_correct = (left_shoulder_angle < 60 and right_shoulder_angle < 60)
                hip_correct = (left_hip_angle > 67 and right_hip_angle > 67)
                knee_correct = (left_knee_angle > 125 and right_knee_angle > 125)
                ankle_correct = (left_ankle_angle > 80 and right_ankle_angle > 80)
            
            elif 20 < avg_trunk_angle <= 50:  # Lifting stage
                shoulder_correct = (left_shoulder_angle < 25 and right_shoulder_angle < 25)
                hip_correct = True 
                knee_correct = True 
                ankle_correct = (left_ankle_angle > 80 and right_ankle_angle > 80)
            
            else:  # Lock Out stage
                shoulder_correct = (left_shoulder_angle < 17 and right_shoulder_angle < 17)
                hip_correct = (left_hip_angle < 190 and right_hip_angle < 190)
                knee_correct = True  
                ankle_correct = (left_ankle_angle > 80 and right_ankle_angle > 80)

            # Draw the angle markers
            draw_angle_marker(image, left_shoulder, shoulder_correct, error_frames_counter['shoulder'])
            draw_angle_marker(image, right_shoulder, shoulder_correct, error_frames_counter['shoulder'])
            draw_angle_marker(image, left_hip, hip_correct, error_frames_counter['hip'])
            draw_angle_marker(image, right_hip, hip_correct, error_frames_counter['hip'])
            draw_angle_marker(image, left_knee, knee_correct, error_frames_counter['knee'])
            draw_angle_marker(image, right_knee, knee_correct, error_frames_counter['knee'])
            draw_angle_marker(image, left_ankle, ankle_correct, error_frames_counter['ankle'])
            draw_angle_marker(image, right_ankle, ankle_correct, error_frames_counter['ankle'])

            # Determine stage and provide feedback
            if avg_trunk_angle > 50:  # Set Up stage
                current_stage = "Set Up"
                if not shoulder_correct:
                    error_frames_counter['shoulder'] += 1
                    provide_feedback(image, "shoulder", "Keep bar near legs", (50, 300), 'shoulder')
                else:
                    error_frames_counter['shoulder'] = 0
                    
                if not hip_correct:
                    error_frames_counter['hip'] += 1
                    provide_feedback(image, "hip", "Lift hips higher", (50, 350), 'hip')
                else:
                    error_frames_counter['hip'] = 0
                    
                if not knee_correct:
                    error_frames_counter['knee'] += 1
                    provide_feedback(image, "knee", "Keep knees behind the bar", (50, 400), 'knee')
                else:
                    error_frames_counter['knee'] = 0
                    
                if not ankle_correct:
                    error_frames_counter['ankle'] += 1
                    provide_feedback(image, "ankle", "Pull knees back", (50, 450), 'ankle')
                    draw_knee_warning(image, left_knee, error_frames_counter['ankle'])
                    draw_knee_warning(image, right_knee, error_frames_counter['ankle'])
                else:
                    error_frames_counter['ankle'] = 0

            elif 20 < avg_trunk_angle <= 50:  # Lifting stage
                current_stage = "Lifting"
                if not shoulder_correct:
                    error_frames_counter['shoulder'] += 1
                    provide_feedback(image, "shoulder", "Keep bar near legs", (50, 300), 'shoulder')
                else:
                    error_frames_counter['shoulder'] = 0
                    
                if not ankle_correct:
                    error_frames_counter['ankle'] += 1
                    provide_feedback(image, "ankle", "Pull knees back", (50, 350), 'ankle')
                    draw_knee_warning(image, left_knee, error_frames_counter['ankle'])
                    draw_knee_warning(image, right_knee, error_frames_counter['ankle'])
                else:
                    error_frames_counter['ankle'] = 0

            else:  # Lock Out stage
                current_stage = "Lock Out"
                if not shoulder_correct:
                    error_frames_counter['shoulder'] += 1
                    provide_feedback(image, "shoulder", "Keep bar near legs", (50, 300), 'shoulder')
                else:
                    error_frames_counter['shoulder'] = 0
                    
                if not hip_correct:
                    error_frames_counter['hip'] += 1
                    provide_feedback(image, "hip", "Don't push your hips forward", (50, 350), 'hip')
                else:
                    error_frames_counter['hip'] = 0
                    
                if not ankle_correct:
                    error_frames_counter['ankle'] += 1
                    provide_feedback(image, "ankle", "Pull knees back", (50, 400), 'ankle')
                    draw_knee_warning(image, left_knee, error_frames_counter['ankle'])
                    draw_knee_warning(image, right_knee, error_frames_counter['ankle'])
                else:
                    error_frames_counter['ankle'] = 0
            
            # Reset error counters when stage changes
            if current_stage != previous_stage:
                reset_error_counters() 


            # Draw back alignment visualization
            neck_pixel = tuple(np.multiply(neck, [width, height]).astype(int))
            back_pixel = tuple(np.multiply(back_point, [width, height]).astype(int))
            hip_pixel = tuple(np.multiply(center_hip, [width, height]).astype(int))
            
            cv2.circle(image, tuple(np.multiply(neck, [width, height]).astype(int)), 5, (255, 0, 0), -1)
            cv2.circle(image, tuple(np.multiply(back_point, [width, height]).astype(int)), 5, (255, 0, 0), -1)
            cv2.circle(image, tuple(np.multiply(center_hip, [width, height]).astype(int)), 5, (255, 0, 0), -1)

            cv2.line(image, neck_pixel, back_pixel, (255, 0, 0), 2)
            cv2.line(image, back_pixel, hip_pixel, (255, 0, 0), 2)

            # Track if current round has any errors                
            current_round_has_errors = False
            if not shoulder_correct or not hip_correct or not knee_correct or not ankle_correct:
                current_round_has_errors = True

            # Update round statistics when a round completes            
            if previous_stage == "Set Up" and current_stage != "Set Up":
                in_progress = True
                # Don't reset feedback flag here anymore
            elif in_progress and current_stage == "Set Up":
                round_count += 1
                # Pass whether feedback was shown to determine round status
                update_round_status(round_count, feedback_shown_in_round)
                # Reset for next round
                feedback_shown_in_round = False
                reset_error_counters()
                in_progress = False

            # Initialize counts
            correct_count = sum(1 for status in round_statuses.values() if status == "CORRECT")
            incorrect_count = sum(1 for status in round_statuses.values() if status == "INCORRECT")
            
            # Display round status
            y_offset = 200
            summary_text_correct = f"Correct: {correct_count}"
            summary_text_incorrect = f"Incorrect: {incorrect_count}"
            
            # Only display the status count information
            put_text_with_background(image, summary_text_correct, (50, y_offset), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2,
                                   bg_color=(0, 255, 0))
            y_offset += 50
            put_text_with_background(image, summary_text_incorrect, (50, y_offset), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2,
                                   bg_color=(0, 0, 255))

            # Display stage and round
            put_text_with_background(image, f"Stage: {current_stage}", (50, 100), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, 
                                   bg_color=(0, 0, 0))
            put_text_with_background(image, f"Round: {round_count}", (50, 150), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, 
                                   bg_color=(0, 0, 0))
            put_text_with_background(image, "Always keep back aligned with blue line", (50, 50), 
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, 
                                   bg_color=(255, 0, 0))
            put_text_with_background(image, f"Video FPS: {round(fps)}", (50, height - 50), 
                       cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, 
                       bg_color=(0, 0, 0))
            
            previous_stage = current_stage
            

        except:
            pass
        
        mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
                                mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2), 
                                mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2))               
        
        cv2.imshow('Mediapipe Feed', image)

        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

    cap.release()
    cv2.destroyAllWindows()

End of video or error reading frame.
