In [2]:
import cv2
import mediapipe as mp
import numpy as np

In [3]:
def calculate_angle(a, b, c):
    """Return the angle at ``b`` (in degrees, folded into 0-180) formed by a-b-c.

    Args:
        a: First point; only its first two coordinates (x, y) are read.
        b: Middle point, i.e. the vertex of the angle.
        c: End point.

    Returns:
        The absolute angle between the arms b->a and b->c, in degrees.
    """
    first, mid, end = (np.array(pt) for pt in (a, b, c))

    # Heading of each arm of the angle, measured from the middle point.
    heading_end = np.arctan2(end[1] - mid[1], end[0] - mid[0])
    heading_first = np.arctan2(first[1] - mid[1], first[0] - mid[0])

    degrees = np.abs((heading_end - heading_first) * 180.0 / np.pi)

    # Fold reflex angles back so the result never exceeds 180 degrees.
    return 360 - degrees if degrees > 180.0 else degrees

In [4]:
# Short aliases for the MediaPipe solution modules used by the cells below.
mp_drawing = mp.solutions.drawing_utils  # provides draw_landmarks()
mp_drawing_styles = mp.solutions.drawing_styles  # not referenced in the visible cells
mp_pose = mp.solutions.pose  # provides Pose, PoseLandmark and POSE_CONNECTIONS

In [5]:
def rescale_frame(frame, percent=50):
    """Resize ``frame`` to ``percent`` % of its original width and height.

    Args:
        frame: Image whose ``shape`` starts with (height, width).
        percent: Target size as a percentage of the original (default 50).

    Returns:
        The result of ``cv2.resize`` with ``INTER_AREA`` interpolation.
    """
    src_height, src_width = frame.shape[0], frame.shape[1]
    # Target dimensions are truncated to whole pixels.
    target_size = (
        int(src_width * percent / 100),
        int(src_height * percent / 100),
    )
    # cv2.resize expects the target size as (width, height).
    return cv2.resize(frame, target_size, interpolation=cv2.INTER_AREA)

In [8]:
# Open the video source (pass 0 instead of the path to read from a webcam).
cap = cv2.VideoCapture('../videos/IMG_0292.mp4')

# Knee-angle thresholds (degrees) that define the two squat stages.
STANDING_ANGLE = 160  # above this the leg counts as straight ("up")
SQUAT_ANGLE = 100     # below this the person counts as squatting ("down")

# Squat counter variables
counter = 0
stage = None  # "down" or "up"

# Landmarks whose (x, y) positions define the knee angle: hip -> knee -> ankle.
LEG_LANDMARKS = (
    mp_pose.PoseLandmark.LEFT_HIP,
    mp_pose.PoseLandmark.LEFT_KNEE,
    mp_pose.PoseLandmark.LEFT_ANKLE,
)

try:
    # Initialize Pose detection
    with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                # No frame could be read (e.g. end of the video): stop the loop.
                print("Ignoring empty camera frame.")
                break

            # MediaPipe expects RGB; marking the array read-only while it is
            # processed avoids an unnecessary copy.
            image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            image.flags.writeable = False

            # Detect the pose
            results = pose.process(image)

            # Recolor back to BGR so OpenCV can draw on and display the frame.
            image.flags.writeable = True
            image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

            # pose_landmarks is None when no person was detected; skip the
            # counting for that frame instead of swallowing every exception.
            if results.pose_landmarks is not None:
                landmarks = results.pose_landmarks.landmark

                # (x, y) coordinates of the left hip, knee and ankle
                hip, knee, ankle = (
                    [landmarks[lm.value].x, landmarks[lm.value].y]
                    for lm in LEG_LANDMARKS
                )

                # Calculate the knee angle
                angle = calculate_angle(hip, knee, ankle)

                # Squat logic: a repetition is counted on the transition
                # from "down" back to standing.
                if angle > STANDING_ANGLE:
                    if stage == 'down':
                        counter += 1
                        print(counter)  # Print the squat count
                    stage = 'up'
                if angle < SQUAT_ANGLE:
                    stage = 'down'

            # Render detections
            mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            cv2.imshow('Squat Detection', image)

            # Press "q" to stop early.
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    # Always free the capture and close the window, even if a frame failed.
    cap.release()
    cv2.destroyAllWindows()

1
2
3
4
5
Ignoring empty camera frame.


In [1]:
import cv2
import sys
from ultralytics import YOLO
import traceback 

In [2]:
from ultralytics.utils.plotting import Annotator
import numpy as np

class Annotator_altered(Annotator):
    """``Annotator`` subclass with helpers for drawing selected keypoints and a
    count/stage label next to a keypoint."""

    def __init__(self, im0, *args, **kwargs):
        """Forward the image and all remaining arguments to the base ``Annotator``."""
        super().__init__(im0, *args, **kwargs)

    def draw_specific_points(self, keypoints, indices=(2, 5, 7), shape=(640, 640), radius=2):
        """
        Draw specific keypoints for gym steps counting.

        Args:
            keypoints (list): list of keypoints data to be plotted (required)
            indices (sequence): keypoint ids to be plotted
            shape (tuple): imgsz for model inference
            radius (int): Keypoint radius value

        Returns:
            The annotated image (``self.im``).
        """
        for i, k in enumerate(keypoints):
            if i not in indices:
                continue
            x_coord, y_coord = k[0], k[1]
            # Skip points whose x or y is an exact multiple of the image size
            # (including 0) -- presumably undetected keypoints; TODO confirm.
            if x_coord % shape[1] == 0 or y_coord % shape[0] == 0:
                continue
            # When a confidence value is present, skip low-confidence points.
            if len(k) == 3 and k[2] < 0.5:
                continue
            cv2.circle(self.im, (int(x_coord), int(y_coord)), radius, (0, 255, 0), -1, lineType=cv2.LINE_AA)
        return self.im

    def _draw_label(self, text, text_position, box_position, box_size, font_scale, line_thickness):
        """Draw ``text`` in black on a filled white box whose top-left corner is ``box_position``."""
        cv2.rectangle(
            self.im,
            box_position,
            (box_position[0] + box_size[0], box_position[1] + box_size[1]),
            (255, 255, 255),
            -1,
        )
        # Font face 0 is cv2.FONT_HERSHEY_SIMPLEX.
        cv2.putText(self.im, text, text_position, 0, font_scale, (0, 0, 0), line_thickness)

    def plot_angle_and_count_and_stage(
        self, angle_text, count_text, stage_text, center_kpt, line_thickness=2, draw_angle=False
    ):
        """
        Plot the count value and step stage (and optionally the pose angle).

        Args:
            angle_text (float): angle value for workout monitoring (required)
            count_text (int): counts value for workout monitoring (required)
            stage_text (str): stage decision for workout monitoring (required)
            center_kpt (sequence): keypoint the labels are anchored to; x is read
                from index 0 and y from index 1 (required)
            line_thickness (int): thickness for text display
            draw_angle (bool): also draw the angle label; off by default, which
                matches the previous hard-coded behaviour
        """
        angle_text, count_text, stage_text = (f" {angle_text:.0f}", f"Count : {count_text}", f" {stage_text}")
        font_scale = 0.3 + (line_thickness / 20.0)
        anchor_x, anchor_y = int(center_kpt[0]), int(center_kpt[1])

        # Angle label geometry. It is computed even when the angle is not drawn
        # because the count and stage labels are positioned relative to it.
        (angle_text_width, angle_text_height), _ = cv2.getTextSize(angle_text, 0, font_scale, line_thickness)
        angle_text_position = (anchor_x, anchor_y)
        angle_background_position = (anchor_x, anchor_y - angle_text_height - 5)
        angle_background_size = (angle_text_width + 2 * 5, angle_text_height + 2 * 5 + (line_thickness * 2))

        if draw_angle:
            self._draw_label(
                angle_text,
                angle_text_position,
                angle_background_position,
                angle_background_size,
                font_scale,
                line_thickness,
            )

        # Count label, placed below the (possibly invisible) angle label.
        (count_text_width, count_text_height), _ = cv2.getTextSize(count_text, 0, font_scale, line_thickness)
        count_text_position = (anchor_x, anchor_y + angle_text_height + 10)
        count_background_position = (
            angle_background_position[0],
            angle_background_position[1] + angle_background_size[1] + 5,
        )
        count_background_size = (count_text_width + 3, count_text_height + 2 + (line_thickness * 1))
        self._draw_label(
            count_text,
            count_text_position,
            count_background_position,
            count_background_size,
            font_scale,
            line_thickness,
        )

        # Stage label, placed below the count label.
        (stage_text_width, stage_text_height), _ = cv2.getTextSize(stage_text, 0, font_scale, line_thickness)
        stage_text_position = (anchor_x, anchor_y + angle_text_height + count_text_height + 40)
        stage_background_position = (stage_text_position[0], stage_text_position[1] - stage_text_height - 5)
        stage_background_size = (stage_text_width + 3, stage_text_height + 5)
        self._draw_label(
            stage_text,
            stage_text_position,
            stage_background_position,
            stage_background_size,
            font_scale,
            line_thickness,
        )


In [3]:
#https://docs.ultralytics.com/guides/workouts-monitoring/#real-world-applications
#https://github.com/ultralytics/ultralytics/blob/main/ultralytics/solutions/ai_gym.py

from ultralytics.utils.checks import check_imshow
from ultralytics.utils.plotting import Annotator

class ExcerciseCounter:
    """Counts exercise repetitions for every detected person from pose keypoints."""

    def __init__(self):
        """Initializes the class with default vals."""

        # Image and line thickness
        self.im0 = None
        self.tf = None

        # Keypoints and count information
        self.keypoints = None
        self.poseup_angle = None
        self.posedown_angle = None
        self.threshold = 0.001

        # Per-person state; the lists are created on the first call to
        # start_counting() and hold one entry per detected person.
        self.angle_r = None
        self.angle_l = None
        self.count = None
        self.stage = None
        self.pose_type = "pushup"
        self.kpts_to_check_r = None
        self.kpts_to_check_l = None

        # Visual Information
        self.view_img = False
        self.annotator = None

        # Check if environment support imshow
        self.env_check = check_imshow(warn=True)

    def set_args(
        self,
        kpts_to_check_l,
        kpts_to_check_r,
        line_thickness=0.1,
        view_img=False,
        pose_up_angle=145.0,
        pose_down_angle=90.0,
        pose_type="pullup",
    ):
        """
        Configures keypoints, angle thresholds, exercise type and display options.

        Args:
            kpts_to_check_l (list): 3 keypoint ids of one body side used for counting
            kpts_to_check_r (list): 3 keypoint ids of the other body side
            line_thickness (int): thickness used when drawing the text labels
            view_img (bool): display the im0
            pose_up_angle (float): Angle to set pose position up
            pose_down_angle (float): Angle to set pose position down
            pose_type (str): "pushup", "pullup" or "crunch"
        """
        # The pullup logic reads index 0 as the shoulder and index 2 as the
        # wrist, so the ids are expected in shoulder, elbow, wrist order.
        self.kpts_to_check_r = kpts_to_check_r
        self.kpts_to_check_l = kpts_to_check_l
        self.tf = line_thickness
        self.view_img = view_img
        self.poseup_angle = pose_up_angle
        self.posedown_angle = pose_down_angle
        self.pose_type = pose_type

    def _reset_state(self, num_people):
        """Create fresh angle/count/stage lists with one entry per person."""
        self.count = [0] * num_people
        self.angle_r = [0] * num_people
        self.angle_l = [0] * num_people
        self.stage = ["-" for _ in range(num_people)]

    def _joint_angle(self, person_kpts, triplet):
        """Return the pose angle spanned by the three keypoints named in ``triplet``."""
        return self.annotator.estimate_pose_angle(
            person_kpts[int(triplet[0])].cpu(),  # first keypoint
            person_kpts[int(triplet[1])].cpu(),  # middle keypoint (vertex), e.g. elbow
            person_kpts[int(triplet[2])].cpu(),  # third keypoint
        )

    def _plot_status(self, ind, person_kpts):
        """Draw angle/count/stage of person ``ind`` next to the middle left-side keypoint."""
        self.annotator.plot_angle_and_count_and_stage(
            angle_text=self.angle_l[ind],
            count_text=self.count[ind],
            stage_text=self.stage[ind],
            center_kpt=person_kpts[int(self.kpts_to_check_l[1])],
            line_thickness=self.tf,
        )

    def start_counting(self, im0, results, frame_count):
        """
        Update the repetition count of every detected person for one frame.

        Args:
            im0 (ndarray): Current frame from the video stream.
            results (list): Pose estimation data
            frame_count (int): store current frame count

        Returns:
            The annotated frame.
        """
        self.im0 = im0
        self.keypoints = results[0].keypoints.data
        self.annotator = Annotator_altered(im0, line_width=1)  # draws onto im0

        num_people = len(results[0])  # number of detected people

        # (Re)create the per-person lists on the first frame, when they were
        # never created (first call with frame_count != 1), or when the number
        # of detected people changed. Note that this also resets the counts.
        needs_reset = (
            frame_count == 1
            or self.count is None
            or self.stage is None
            or self.angle_r is None
            or self.angle_l is None
            or len(self.angle_r) != num_people
        )
        if needs_reset:
            self._reset_state(num_people)

        # ind = index of the person, k = keypoints of that person
        for ind, k in enumerate(reversed(self.keypoints)):

            ##### pushup or pullup: both use the angles of both body sides
            if self.pose_type in ("pushup", "pullup"):
                self.angle_r[ind] = self._joint_angle(k, self.kpts_to_check_r)
                self.angle_l[ind] = self._joint_angle(k, self.kpts_to_check_l)

                # Mark only the keypoints that take part in the angle.
                self.im0 = self.annotator.draw_specific_points(k, self.kpts_to_check_l, shape=(640*0.5, 640*0.5), radius=1)
                self.im0 = self.annotator.draw_specific_points(k, self.kpts_to_check_r, shape=(640*0.5, 640*0.5), radius=1)

                ### Pushup counter logic
                # Driven by the left-side angle, which is also the one shown
                # in the label. A repetition is counted on "up" -> "down".
                if self.pose_type == "pushup":
                    if self.angle_l[ind] > self.poseup_angle:
                        self.stage[ind] = "up"
                    if self.angle_l[ind] < self.posedown_angle and self.stage[ind] == "up":
                        self.stage[ind] = "down"
                        self.count[ind] += 1

                ### PullUp counter logic
                # keypoint order: shoulder, elbow, wrist
                if self.pose_type == "pullup":
                    shoulder_l_y = k[int(self.kpts_to_check_l[0])][1].cpu()
                    wrist_l_y = k[int(self.kpts_to_check_l[2])][1].cpu()
                    shoulder_r_y = k[int(self.kpts_to_check_r[0])][1].cpu()
                    wrist_r_y = k[int(self.kpts_to_check_r[2])][1].cpu()
                    nose_y = k[0][1].cpu()  # keypoint 0 is treated as the nose

                    # Debug output for the person with index 1 only.
                    if ind==1:
                        print(f"angle_l:{self.angle_l[ind]},wrist_l:{wrist_l_y},should_y:{shoulder_l_y},nose:{nose_y} ")

                    # "down": both angles above the down threshold and both
                    # wrists at a smaller y than their shoulder (i.e. higher in
                    # the image, assuming the usual top-left image origin).
                    arms_extended = (
                        self.angle_l[ind] > self.posedown_angle
                        and self.angle_r[ind] > self.posedown_angle
                    )
                    wrists_above_shoulders = wrist_l_y < shoulder_l_y and wrist_r_y < shoulder_r_y
                    if arms_extended and wrists_above_shoulders:
                        self.stage[ind] = "down"
                        if ind ==1:
                            print("down")

                    # "up": both angles below the up threshold, coming from
                    # "down", with the nose at a smaller y than both wrists.
                    arms_bent = (
                        self.angle_l[ind] < self.poseup_angle
                        and self.angle_r[ind] < self.poseup_angle
                    )
                    nose_above_wrists = nose_y < wrist_r_y and nose_y < wrist_l_y
                    if arms_bent and self.stage[ind] == "down" and nose_above_wrists:
                        self.stage[ind] = "up"
                        self.count[ind] += 1
                        if ind ==1:
                            print("up")

                self._plot_status(ind, k)

            #### crunch: uses the left-side keypoint triplet only
            if self.pose_type == "crunch":
                self.angle_l[ind] = self._joint_angle(k, self.kpts_to_check_l)
                self.im0 = self.annotator.draw_specific_points(k, self.kpts_to_check_l, shape=(640*0.5, 640*0.5), radius=1)

                # A repetition is counted on the transition "down" -> "up".
                if self.angle_l[ind] > self.poseup_angle:
                    self.stage[ind] = "down"
                if self.angle_l[ind] < self.posedown_angle and self.stage[ind] == "down":
                    self.stage[ind] = "up"
                    self.count[ind] += 1
                self._plot_status(ind, k)

            # Draw all keypoints of the person (without connecting lines).
            self.annotator.kpts(k, shape=(640*0.5, 640*0.5), radius=1, kpt_line=False)

        return self.im0


# if __name__ == "__main__":
#     ExcerciseCounter()

In [7]:
## https://www.youtube.com/watch?v=Y28xXQmju64

import cv2
import sys
from ultralytics import YOLO
import traceback 

#### Inputs

model = YOLO("yolov8n-pose.pt")  # path to model file
video_source_path='../videos/pull_up_girl.mp4'  #'Copy of C1_PullUp_short.mp4'

video_source_is_video=True  # False -> read from the webcam (device 0)
excercise_name="pullup"
scale_factor_video=0.5
scale_factor_webcam=1
end_video_loop='q'  # key that stops the processing loop


############################################


frame_count=0
counter=0

## open the input source and pick the matching scaling factor
## (smaller output frames are faster to process)
if video_source_is_video:
    cap = cv2.VideoCapture(video_source_path)
    scale_factor = scale_factor_video
else:
    cap = cv2.VideoCapture(0)
    scale_factor = scale_factor_webcam


## keypoint ids per exercise, one triplet per body side
if excercise_name=="pullup":
    kpts_to_check_values_l=[6, 8, 10] ##shoulder,elbow,wrist
    kpts_to_check_values_r=[5, 7, 9] ##shoulder,elbow,wrist
    print("test")
elif excercise_name in ("pushup", "crunch"):
    # TODO: placeholder ids, the real keypoints are still to be determined.
    # Both side-specific names are defined because set_args() below needs them.
    kpts_to_check_values=[6, 6, 6]
    kpts_to_check_values_l=[6, 6, 6]
    kpts_to_check_values_r=[6, 6, 6]
else:
    raise ValueError(f"Unsupported excercise_name: {excercise_name!r}")


##############
gym_object = ExcerciseCounter()  # init AI GYM module
gym_object.set_args(line_thickness=1,
                    view_img=True,
                    pose_type=excercise_name, # TODO
                    kpts_to_check_l=kpts_to_check_values_l,
                    kpts_to_check_r=kpts_to_check_values_r,
                    pose_up_angle=70,
                    pose_down_angle=140,
                   )

########


# Fail early with a clear message when the source cannot be opened.
if not cap.isOpened():
    raise RuntimeError("Error reading video file")
w, h, fps = (int(cap.get(x)) for x in (cv2.CAP_PROP_FRAME_WIDTH, cv2.CAP_PROP_FRAME_HEIGHT, cv2.CAP_PROP_FPS))

try:
    while cap.isOpened():
        success, frame = cap.read()
        # Check the read result before touching the frame: when reading fails
        # (e.g. at the end of the video) there is no frame to work with.
        if not success:
            print("Video frame is empty or video processing has been successfully completed.")
            break

        frame.flags.writeable = False
        frame = cv2.resize(frame, (0, 0), fx=scale_factor, fy=scale_factor)
        results = model.predict(frame, verbose=False)  # pose estimation for all people in the frame

        # Skip frames without any detected keypoints.
        if results[0].keypoints.data.numel()==0:
            print("No perosn detected")
            continue

        frame.flags.writeable = True
        # Only frames with a detection are counted, so the first processed
        # frame reaches start_counting() with frame_count == 1.
        frame_count+=1
        frame = gym_object.start_counting(frame, results, frame_count)

        cv2.imshow("Excercise counter active", frame)
        if cv2.waitKey(1) & 0xFF == ord(end_video_loop):
            break
except Exception:
    print(f"An error occurred: {traceback.format_exc()}")
finally:
    # Always free the capture and close the window.
    cap.release()
    cv2.destroyAllWindows()

test
angle_l:166.25630200352234,wrist_l:257.1437683105469,should_y:242.4915313720703,nose:206.8870849609375 
angle_l:168.6265552304352,wrist_l:263.4891052246094,should_y:243.43447875976562,nose:206.8480987548828 
angle_l:166.03258598744776,wrist_l:263.2064514160156,should_y:243.58804321289062,nose:206.91580200195312 
angle_l:168.53343243129123,wrist_l:264.06158447265625,should_y:243.62844848632812,nose:206.3764190673828 
angle_l:165.86797842845044,wrist_l:263.52215576171875,should_y:243.4121551513672,nose:206.2606658935547 
angle_l:163.70067744253535,wrist_l:261.6050720214844,should_y:242.97451782226562,nose:206.31687927246094 
angle_l:162.45547931532866,wrist_l:261.03125,should_y:243.0130615234375,nose:206.7503662109375 
angle_l:168.62127868193028,wrist_l:308.77056884765625,should_y:267.4705505371094,nose:250.6532745361328 
angle_l:170.50758568539104,wrist_l:306.4664611816406,should_y:267.310791015625,nose:250.583984375 
angle_l:163.28538828062025,wrist_l:260.7790222167969,should_y:24