In [None]:
import cv2
import mediapipe as mp
import numpy as np
import threading


In [None]:
# Short aliases for the MediaPipe solution modules used by this notebook.
# Pose estimation model and its landmark/connection definitions.
mp_pose = mp.solutions.pose
# Helpers for rendering landmarks onto an image.
mp_drawing = mp.solutions.drawing_utils
# Predefined drawing specs (colors/thickness) for those helpers.
mp_drawing_styles = mp.solutions.drawing_styles


In [None]:
# Integer indices of the right-side landmarks used to measure the hip angle;
# they index into the landmark list returned by the pose model.
R_SHOULDER, R_HIP, R_KNEE = (
    mp_pose.PoseLandmark.RIGHT_SHOULDER.value,
    mp_pose.PoseLandmark.RIGHT_HIP.value,
    mp_pose.PoseLandmark.RIGHT_KNEE.value,
)


In [None]:
# Toggle for the debug overlay: when True, the main loop draws the measured
# hip angle and the detected pose landmarks on each displayed frame.
POSE_OVERLAY = True


In [None]:
def get_xy_from_landmark(lndmrk):
    """Extract the 2-D position of a landmark.

    Args:
        lndmrk: Any object exposing ``x`` and ``y`` attributes.

    Returns:
        A new two-element list ``[x, y]`` with the attribute values unchanged.
    """
    return [getattr(lndmrk, axis) for axis in ("x", "y")]


def calculate_angle(left, mid, right):
    """Return the angle at ``mid`` between the rays to ``left`` and ``right``.

    Args:
        left: Coordinates of the first outer point (sequence of numbers).
        mid: Coordinates of the vertex at which the angle is measured.
        right: Coordinates of the second outer point.

    Returns:
        The angle in degrees, in the range [0, 180]. NaN is returned when
        ``left`` or ``right`` coincides with ``mid``, because the angle is
        undefined for a zero-length ray.
    """
    a = np.asarray(left, dtype=float)
    b = np.asarray(mid, dtype=float)
    c = np.asarray(right, dtype=float)

    ba = a - b
    bc = c - b

    norm_product = np.linalg.norm(ba) * np.linalg.norm(bc)
    if norm_product == 0:
        # Degenerate input: avoid the 0/0 division (and its RuntimeWarning)
        # and report the undefined angle explicitly.
        return np.float64(np.nan)

    # Floating-point rounding can push the ratio marginally outside [-1, 1]
    # for (nearly) collinear points, which would make arccos return NaN.
    cosine_angle = np.clip(np.dot(ba, bc) / norm_product, -1.0, 1.0)

    return np.degrees(np.arccos(cosine_angle))


In [None]:
class RepCounter:
    """Count repetitions from a stream of joint angles and run a countdown.

    A repetition is counted each time the angle first rises above
    ``DOWN_ANGLE`` (state ``"down"``) and then falls below ``UP_ANGLE``
    (state ``"up"``). The first counted repetition starts a background
    countdown that lowers ``remaining_time`` once per second until it
    reaches zero or ``stop()`` is called.
    """

    # Angle (degrees) above which the body is considered "down".
    DOWN_ANGLE = 110
    # Angle (degrees) below which a "down" body is considered "up" again.
    UP_ANGLE = 40

    # Last detected position: None until the first "down", then "down"/"up".
    state = None
    # Number of completed repetitions.
    reps = 0
    # True once the first repetition has started the countdown.
    started = False
    # Seconds left on the countdown.
    remaining_time = 60
    # Pending threading.Timer for the next tick, or None when none is scheduled.
    t = None
    # Set by stop() so that an in-flight tick does not reschedule itself.
    _stopped = False

    def decrementSec(self):
        """Consume one second of the countdown and schedule the next tick.

        The first call decrements immediately; subsequent calls are made by
        the timer thread roughly once per second. Ticking ends when
        ``remaining_time`` reaches zero or after ``stop()`` has been called.
        """
        if self._stopped:
            return

        self.remaining_time -= 1
        if self.remaining_time <= 0:
            # Countdown finished: do not keep spawning timers forever.
            self.t = None
            return

        # Keep the Timer object itself (Timer.start() returns None) so it can
        # be cancelled, and make it a daemon so it never blocks interpreter exit.
        timer = threading.Timer(1.0, self.decrementSec)
        timer.daemon = True
        self.t = timer
        timer.start()

    def stop(self):
        """Cancel the countdown; later ticks become no-ops."""
        self._stopped = True
        timer, self.t = self.t, None
        if timer is not None:
            timer.cancel()

    def update(self, angle):
        """Feed one angle measurement (degrees) into the rep state machine.

        Args:
            angle: The current joint angle in degrees.
        """
        if angle > self.DOWN_ANGLE:
            self.state = "down"
        if angle < self.UP_ANGLE and self.state == "down":
            if not self.started:
                # The first completed repetition starts the countdown.
                self.started = True
                self.decrementSec()
            self.reps += 1
            self.state = "up"


In [None]:
rep_counter = RepCounter()

vid = "demo/howcast_pushup.mp4"
# use 0 for webcam input
cap = cv2.VideoCapture(vid)
if not cap.isOpened():
    print(f"Could not open video source: {vid!r}")

try:
    # use default options
    with mp_pose.Pose() as pose:
        # main loop
        while cap.isOpened():
            ret, frame = cap.read()

            if not ret:
                print("Can't receive frame (stream end?). Exiting ...")
                # If loading a video, use 'break' instead of 'continue'.
                break

            height, width = frame.shape[:2]

            # MediaPipe expects RGB input, whereas OpenCV delivers BGR frames.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # To improve performance, mark the image as not writeable so it
            # can be passed by reference.
            rgb_frame.flags.writeable = False
            results = pose.process(rgb_frame)

            # pose_landmarks may be None (e.g. when no pose is detected); in
            # that case skip the measurement but still show the frame and
            # poll the keyboard/countdown below.
            pose_landmarks = results.pose_landmarks
            if pose_landmarks is not None:
                landmarks = pose_landmarks.landmark
                r_shoulder_xy = get_xy_from_landmark(landmarks[R_SHOULDER])
                r_hip_xy = get_xy_from_landmark(landmarks[R_HIP])
                r_knee_xy = get_xy_from_landmark(landmarks[R_KNEE])

                situp_angle = calculate_angle(r_shoulder_xy, r_hip_xy, r_knee_xy)

                # A non-finite angle (degenerate landmarks) carries no
                # information and cannot be converted to int for display.
                if np.isfinite(situp_angle):
                    rep_counter.update(situp_angle)

                    # draw angle at hip
                    if POSE_OVERLAY:
                        # Landmark coordinates are scaled by the frame size
                        # to obtain pixel coordinates.
                        hip_px = (
                            int(r_hip_xy[0] * width),
                            int(r_hip_xy[1] * height),
                        )
                        cv2.putText(
                            frame,
                            str(int(situp_angle)),
                            hip_px,
                            cv2.FONT_HERSHEY_PLAIN,
                            3,
                            (255, 255, 0),
                            3,
                        )

                # draw pose landmarks
                if POSE_OVERLAY:
                    mp_drawing.draw_landmarks(
                        frame,
                        pose_landmarks,
                        mp_pose.POSE_CONNECTIONS,
                        landmark_drawing_spec=mp_drawing_styles.get_default_pose_landmarks_style(),
                    )

            # draw rep count, current state and countdown along the top edge
            hud_labels = (
                (str(rep_counter.reps), (50, 50)),
                # state is None until the first "down"; draw an empty label
                # explicitly rather than passing None to OpenCV.
                (rep_counter.state or "", (100, 50)),
                (str(rep_counter.remaining_time), (250, 50)),
            )
            for text, origin in hud_labels:
                cv2.putText(
                    frame,
                    text,
                    origin,
                    cv2.FONT_HERSHEY_PLAIN,
                    3,
                    (0, 255, 0),
                    3,
                )

            cv2.imshow("webcam", frame)

            # Stop on 'q' or when the countdown has run out.
            if cv2.waitKey(5) & 0xFF == ord("q") or rep_counter.remaining_time <= 0:
                break
finally:
    # Always release the capture and close the window, even after an error.
    cap.release()
    cv2.destroyAllWindows()
    # Pump the GUI event loop a few times so the window actually closes.
    for _ in range(4):
        cv2.waitKey(1)

rep_counter.reps
