In [1]:
import cv2
import numpy as np
import mediapipe as mp
import logging
from math import sqrt
import time  # Import time module to manage timing

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose


In [2]:
from datetime import datetime


def calculate_angle(a, b, c):
    """
    b is the midpoint of a and c (e.g. left hip, left elbow and left shoulder)
    our case will be left-hip, left-knee and left-ankle
    """

    a = np.array(a)  # First
    b = np.array(b)  # Mid
    c = np.array(c)  # End

    radians = np.arctan2(c[1] - b[1], c[0] - b[0]) - np.arctan2(
        a[1] - b[1], a[0] - b[0]
    )
    angle = np.abs(radians * 180.0 / np.pi)
    
    if angle > 180.0:
        angle = 360 - angle

    return angle


import datetime


def to_timestamp(time_float):
    return datetime.datetime.fromtimestamp(time_float).strftime("%Y-%m-%d %H:%M:%S,%f")


def calculate_distance(p1, p2):
    return sqrt((p2[0] - p1[0]) ** 2 + (p2[1] - p1[1]) ** 2)


def should_start_timer(
    keypoints,
    hip_displacement,
    knee_displacement,
    ankle_displacement,
    threshold_angle=80.0,
    hip_thresh=3.0,
    knee_thresh=3.0,
    ankle_thresh=1.5,
):
    """
    Determine if the timer should start based on displacement conditions.
    """
    # calculate angle between hip and shoulder and knee
    knee, hip, shoulder = keypoints
    angle = calculate_angle(knee, hip, shoulder)
    print(f"knee hip shoulder angle: {angle:.2f} degrees")
    return angle < threshold_angle and (
        hip_displacement > hip_thresh
        or knee_displacement > knee_thresh
        or ankle_displacement > ankle_thresh
    )


def get_landmark_coordinates(landmarks, frame_width, frame_height):
    hip = [
        landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].x * frame_width,
        landmarks[mp_pose.PoseLandmark.LEFT_HIP.value].y * frame_height,
    ]
    knee = [
        landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].x * frame_width,
        landmarks[mp_pose.PoseLandmark.LEFT_KNEE.value].y * frame_height,
    ]
    ankle = [
        landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].x * frame_width,
        landmarks[mp_pose.PoseLandmark.LEFT_ANKLE.value].y * frame_height,
    ]
    shoulder = [
        landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].x * frame_width,
        landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER.value].y * frame_height,
    ]
    return hip, knee, ankle, shoulder


def determine_failure(elapsed_time, threshold_time=12):
    """
    Determine if the user has failed based on the elapsed time.
    """
    # convert elapsed time to seconds
    logging.info(f"Elapsed time: {elapsed_time}")
    return "FAILED" if elapsed_time > threshold_time else "PASSED"


def summarise_results(
    counter,
    elapsed_time,
    rep_durations,
    joint_displacement_map=None,
    joint_velocity_map=None,
):

    logging.info(
        "========================= Video processing summary ========================="
    )
    logging.info(f"Pass status: {determine_failure(elapsed_time)}")
    logging.info(
        f"Total repetitions: {counter} completed in {elapsed_time:.2f} seconds"
    )
    if rep_durations:
        for reps in rep_durations:
            logging.info(f"Duration: {reps:.2f} seconds")
        logging.info(
            f"Maximum duration per repetition: {max(rep_durations):.2f} seconds"
        )
        logging.info(
            f"Average duration per repetition: {sum(rep_durations) / len(rep_durations):.2f} seconds"
        )


def display_knee_and_hip_angle(image, knee_angle, knee, hip_angle, hip, x_displacement=60):
    """
    Display the knee angle on the screen. 60px to the right of the knee
    """
    cv2.putText(
        image,
        f"{knee_angle} deg",
        (int(knee[0] + x_displacement), int(knee[1])),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )
    cv2.putText(
        image,
        f"{hip_angle} deg",
        (int(hip[0] + x_displacement), int(hip[1])),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )


def draw_joint_displacement(prev_point, curr_point, image):
    prev_x, prev_y = map(int, prev_point)
    curr_x, curr_y = map(int, curr_point)
    cv2.circle(image, (curr_x, curr_y), 10, (0, 0, 255), -1)
    cv2.line(image, (prev_x, prev_y), (curr_x, curr_y), (0, 255, 0), 2)


def display_x_and_y_from_point(point):
    return f"[x: {point[0]}, y: {point[1]}]"


def calculate_and_draw_joint_displacement(
    prev_frame, prev_points, curr_points, image, joint_displacement_history, real_time
):
    logging.info(
        "========================= Optical flow results ========================="
    )
    joint_names = ("HIP", "KNEE", "ANKLE")  # Names of the joints
    prev_hip, prev_knee, prev_ankle = prev_points
    hip, knee, ankle = curr_points
    if prev_frame is not None:
        for prev_point, curr_point, _, joint_name in zip(
            [prev_hip, prev_knee, prev_ankle],
            [hip, knee, ankle],
            [(0, 0, 255)] * 3,
            joint_names,
        ):
            draw_joint_displacement((prev_point), curr_point, image)
            displacement = calculate_distance(prev_point, curr_point)
            logging.info(
                f"{joint_name} Displacement: {displacement:.2f} px (from {display_x_and_y_from_point(prev_point)} to {display_x_and_y_from_point(curr_point)})"
            )
            joint_displacement_history[joint_name].append((real_time, displacement))
    return joint_displacement_history


def calculate_and_store_velocities(
    hip_displacement,
    knee_displacement,
    ankle_displacement,
    frame_time,
    joint_displacement,
    joint_velocity_history,
):
    hip_velocity = hip_displacement / frame_time
    knee_velocity = knee_displacement / frame_time
    ankle_velocity = ankle_displacement / frame_time
    logging.info(
        f"Hip Velocity: {hip_velocity:.2f} px/s, Knee Velocity: {knee_velocity:.2f} px/s, Ankle Velocity: {ankle_velocity:.2f} px/s"
    )
    joint_velocity_history["HIP"].append((to_timestamp(time.time()), hip_velocity))
    joint_velocity_history["KNEE"].append((to_timestamp(time.time()), knee_velocity))
    joint_velocity_history["ANKLE"].append((to_timestamp(time.time()), ankle_velocity))

    return joint_velocity_history


def display_information(image, counter, stage, max_angle):
    """
    Display the number of repetitions on the screen.
    """
    # Setup status box
    cv2.rectangle(image, (0, 0), (550, 100), (245, 117, 16), -1)

    # Rep data
    cv2.putText(
        image,
        "REPS",
        (15, 12),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (0, 0, 0),
        1,
        cv2.LINE_AA,
    )
    cv2.putText(
        image,
        str(counter),
        (10, 60),
        cv2.FONT_HERSHEY_SIMPLEX,
        2,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )

    # Stage data
    cv2.putText(
        image,
        "STAGE",
        (65, 12),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (0, 0, 0),
        1,
        cv2.LINE_AA,
    )
    cv2.putText(
        image,
        stage,
        (60, 60),
        cv2.FONT_HERSHEY_SIMPLEX,
        2,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )
    # display max_angle per rep
    # display_max_angle(image, max_angle)

    cv2.putText(
        image,
        "MAX ANGLE",
        (300, 12),
        cv2.FONT_HERSHEY_SIMPLEX,
        0.5,
        (0, 0, 0),
        1,
        cv2.LINE_AA,
    )
    cv2.putText(
        image,
        f"{max_angle:.2f}",
        (250, 60),
        cv2.FONT_HERSHEY_SIMPLEX,
        2,
        (255, 255, 255),
        2,
    )


def display_timer(image, elapsed_time):
    """
    Display the elapsed time on the screen.
    """
    cv2.putText(
        image,
        f"Time: {elapsed_time:.2f} s",
        (800, 60),
        cv2.FONT_HERSHEY_SIMPLEX,
        1,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )


def draw_landmarks_and_connections(image, results):
    """
    Draw the landmarks and connections on the image.
    """
    mp_drawing.draw_landmarks(
        image,
        results.pose_landmarks,
        mp_pose.POSE_CONNECTIONS,
        mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
        mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2),
    )


def get_real_time_from_frames(frame_counter, fps):
    return frame_counter / fps

In [6]:
import numpy as np
import cv2
import logging
import time
# Initialize variables for counter logic
counter = 0
stage = None
confirm_frames = 5
stage_counter = 0
max_angle_per_rep = 0
last_angle = 0
up_stage_threshold_angle = 135
down_stage_threshold_angle = 105

# Initialize variables for optical flow
prev_frame = None
prev_hip = None
prev_knee = None
prev_ankle = None

# for optical flow postprocessing
joint_history = {
    "HIP": [],
    "KNEE": [],
    "ANKLE": [],
}

joint_displacement_history = {
    "HIP": [],
    "KNEE": [],
    "ANKLE": [],
}
joint_velocity_history = {
    "HIP": [],
    "KNEE": [],
    "ANKLE": [],
}

max_angles = []
frames_after_start = 0
frame_rate = None
frame_time = None
timer_started = False
start_time = None
elapsed_time = 0
finished = False
rep_durations = []  # List to store the duration of each repetition
def get_real_time_from_frames(frame_count, fps):
    return frame_count / fps


def draw_keypoints_and_vectors(image, prev_points, current_points):
    for (x_new, y_new), (x_old, y_old) in zip(current_points, prev_points):
        # Draw current point
        cv2.circle(image, (int(x_new), int(y_new)), 5, (0, 255, 0), -1)

        # Draw line from previous point to current point
        cv2.line(image, (int(x_old), int(y_old)), (int(x_new), int(y_new)), (0, 255, 255), 2)


# Lucas-Kanade optical flow parameters
lk_params = dict(winSize=(21, 21), maxLevel=2,
                 criteria=(cv2.TERM_CRITERIA_EPS | cv2.TERM_CRITERIA_COUNT, 10, 0.03))
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    cap = cv2.VideoCapture("../test/CST_MF1.mp4")
    ret, prev_frame = cap.read()
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    results = pose.process(cv2.cvtColor(prev_frame, cv2.COLOR_BGR2RGB))
    prev_hip, prev_knee, prev_ankle, prev_shoulder = get_landmark_coordinates(results.pose_landmarks.landmark, prev_frame.shape[1], prev_frame.shape[0])
    prev_points = np.array([prev_hip, prev_knee, prev_ankle, prev_shoulder], dtype=np.float32).reshape(-1, 2)

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        next_points, st, err = cv2.calcOpticalFlowPyrLK(prev_gray, gray, prev_points, None)

        if next_points is not None and st.all() == 1:
            current_points = next_points.reshape(-1, 2)
        else:
            results = pose.process(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))
            current_points = np.array(get_landmark_coordinates(results.pose_landmarks.landmark, frame.shape[1], frame.shape[0]), dtype=np.float32).reshape(-1, 2)

        # Draw the keypoints and their motion vectors
        draw_keypoints_and_vectors(frame, prev_points, current_points)

        # Update for next iteration
        prev_gray = gray.copy()
        prev_points = current_points.copy()

        # Display the processed frame
        cv2.imshow("Sit Stand Test", frame)
        if cv2.waitKey(10) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()

I0000 00:00:1718040775.756586 12553107 gl_context.cc:357] GL version: 2.1 (2.1 ATI-5.2.4), renderer: AMD Radeon Pro 5500M OpenGL Engine


: 