In [None]:
# Run the mp_setup.sh bash script first (I wasn't sure whether we wanted to keep using a notebook).
# It installs mediapipe into the global environment.

In [None]:
#@markdown To better demonstrate the Pose Landmarker API, we have created a set of visualization tools that will be used in this colab. These will draw the landmarks on a detected person, as well as the expected connections between those landmarks.

from mediapipe import solutions
from mediapipe.framework.formats import landmark_pb2
import numpy as np


def draw_landmarks_on_image(rgb_image, detection_result):
  """Return a copy of ``rgb_image`` with every detected pose drawn on it.

  Args:
    rgb_image: Image array to annotate. It is copied, so the caller's array
      is left untouched.
    detection_result: Object exposing ``pose_landmarks``, a sequence holding
      one landmark sequence per detected pose. Each landmark must provide
      ``x``, ``y`` and ``z``.

  Returns:
    The annotated copy of ``rgb_image``.
  """
  canvas = np.copy(rgb_image)

  for single_pose in detection_result.pose_landmarks:
    # The drawing utility works on protobuf landmark lists, so repackage the
    # landmarks of this pose before handing them over.
    converted = [
      landmark_pb2.NormalizedLandmark(x=point.x, y=point.y, z=point.z)
      for point in single_pose
    ]
    pose_proto = landmark_pb2.NormalizedLandmarkList()
    pose_proto.landmark.extend(converted)

    solutions.drawing_utils.draw_landmarks(
      canvas,
      pose_proto,
      solutions.pose.POSE_CONNECTIONS,
      solutions.drawing_styles.get_default_pose_landmarks_style())

  return canvas


In [None]:
import cv2
import matplotlib.pyplot as plt


def cv2_imshow(image):
    """
    Render an OpenCV image inline through matplotlib.

    Args:
        image: Image array in BGR channel order (what OpenCV produces).
    """
    # matplotlib interprets channels as RGB, so reorder them before plotting.
    plt.imshow(cv2.cvtColor(image, cv2.COLOR_BGR2RGB))
    # Axis ticks carry no meaning for a picture, so hide them.
    plt.axis('off')
    plt.show()

# Load and preview the sample frame used throughout the notebook.
sample_image_path = "sample-frames/mp_example.png"
girl_img = cv2.imread(sample_image_path)
# cv2.imread reports a missing or unreadable file by returning None instead of
# raising, so fail here with a clear message rather than inside cvtColor.
if girl_img is None:
    raise FileNotFoundError(f"Could not read image: {sample_image_path}")
cv2_imshow(girl_img)

### **Media Pipeline**

In [None]:
import mediapipe as mp
from mediapipe.tasks import python
from mediapipe.tasks.python import vision

def mp_init_detector(model_path='mp-model/pose_landmarker.task'):
    """Create a MediaPipe PoseLandmarker from a model asset file.

    Args:
        model_path: Path to the pose landmarker ``.task`` model file.

    Returns:
        The PoseLandmarker built with segmentation-mask output enabled.
    """
    landmarker_options = vision.PoseLandmarkerOptions(
        base_options=python.BaseOptions(model_asset_path=model_path),
        output_segmentation_masks=True,
    )
    return vision.PoseLandmarker.create_from_options(landmarker_options)

def mp_process_image(detector, mp_img):
    """Run pose detection and expose the landmarks by body-part name.

    Args:
        detector: Object whose ``detect(mp_img)`` returns a result exposing
            ``pose_landmarks`` (one landmark sequence per detected pose).
        mp_img: Image passed straight through to ``detector.detect``.

    Returns:
        ``(detection_result, body_parts)``. ``body_parts`` maps names such as
        ``'left_ankle'`` to the corresponding landmark, e.g.
        ``body_parts['left_ankle']``. It is empty when no pose was detected.
        When several poses are present, the landmarks of the last one win.
    """
    detection_result = detector.detect(mp_img)

    # String name -> landmark enum, so callers can use readable keys.
    lm = mp.solutions.pose.PoseLandmark
    name_to_landmark = {
        'nose': lm.NOSE,
        'left_eye_inner': lm.LEFT_EYE_INNER,
        'left_eye': lm.LEFT_EYE,
        'left_eye_outer': lm.LEFT_EYE_OUTER,
        'right_eye_inner': lm.RIGHT_EYE_INNER,
        'right_eye': lm.RIGHT_EYE,
        'right_eye_outer': lm.RIGHT_EYE_OUTER,
        'left_ear': lm.LEFT_EAR,
        'right_ear': lm.RIGHT_EAR,
        'mouth_left': lm.MOUTH_LEFT,
        'mouth_right': lm.MOUTH_RIGHT,
        'left_shoulder': lm.LEFT_SHOULDER,
        'right_shoulder': lm.RIGHT_SHOULDER,
        'left_elbow': lm.LEFT_ELBOW,
        'right_elbow': lm.RIGHT_ELBOW,
        'left_wrist': lm.LEFT_WRIST,
        'right_wrist': lm.RIGHT_WRIST,
        'left_pinky': lm.LEFT_PINKY,
        'right_pinky': lm.RIGHT_PINKY,
        'left_index': lm.LEFT_INDEX,
        'right_index': lm.RIGHT_INDEX,
        'left_thumb': lm.LEFT_THUMB,
        'right_thumb': lm.RIGHT_THUMB,
        'left_hip': lm.LEFT_HIP,
        'right_hip': lm.RIGHT_HIP,
        'left_knee': lm.LEFT_KNEE,
        'right_knee': lm.RIGHT_KNEE,
        'left_ankle': lm.LEFT_ANKLE,
        'right_ankle': lm.RIGHT_ANKLE,
        'left_heel': lm.LEFT_HEEL,
        'right_heel': lm.RIGHT_HEEL,
        'left_foot_index': lm.LEFT_FOOT_INDEX,
        'right_foot_index': lm.RIGHT_FOOT_INDEX,
    }

    body_parts = {}
    # A falsy pose list (nothing detected) leaves body_parts empty.
    for single_pose in detection_result.pose_landmarks or []:
        body_parts.update(
            {name: single_pose[index] for name, index in name_to_landmark.items()}
        )

    return detection_result, body_parts

def mp_debug_show_image(detector, img_path):
    """Detect the pose in an image file and display it with landmarks drawn.

    Args:
        detector: Detector forwarded to ``mp_process_image``.
        img_path: Path of the image file to load, analyse and display.

    Raises:
        ValueError: If loading the image yields ``None``.
    """
    mp_input = mp.Image.create_from_file(img_path)
    if mp_input is None:
        raise ValueError(f"Image at path {img_path} could not be loaded.")

    pose_result, _unused_parts = mp_process_image(detector, mp_input)

    # The drawing helper expects RGB pixels, while OpenCV loads files as BGR.
    rgb_pixels = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)
    overlay = draw_landmarks_on_image(rgb_pixels, pose_result)

    # cv2_imshow takes BGR input, so convert back before displaying.
    cv2_imshow(cv2.cvtColor(overlay, cv2.COLOR_RGB2BGR))



In [None]:
# Usage: run the detector on one sample frame and inspect a single landmark.
mp_detector = mp_init_detector()
mp_image = mp.Image.create_from_file("sample-frames/mp_example.png")
detection_results, body_parts = mp_process_image(
    mp_detector,
    mp_image
)

# mp_process_image returns an empty dict when no pose is found, so guard the
# lookup instead of failing with a KeyError.
if body_parts:
    left_foot_index = body_parts['left_foot_index']
    print("Left foot index")
    print(f"position: {left_foot_index.x:.3f}, {left_foot_index.y:.3f}, {left_foot_index.z:.3f}")
    print(f"visibility: {left_foot_index.visibility:.3f}")
else:
    print("No pose detected in sample-frames/mp_example.png")

# Debug: show the detected landmarks on the reference, good and bad frames.
for sample_frame in (
    "sample-frames/mp_example.png",
    "sample-frames/good1.png",
    "sample-frames/good2.png",
    "sample-frames/bad1.png",
    "sample-frames/bad2.png",
):
    mp_debug_show_image(mp_detector, sample_frame)

In [None]:
def calculate_angle(a, b, c, img_rgb=None, color=(255, 0, 0), text_scale=4.0):
    """Measure the angle at vertex ``b`` between the rays towards ``a`` and ``c``.

    Only the x and y components take part in the computation; z is read but
    not used. The result is in degrees, folded into the range [0, 180].
    NOTE(review): the angle is measured directly in the coordinate space of
    the points. If these are normalized image coordinates, a non-square image
    would distort it; confirm this is intended.

    Args:
        a, b, c: Points exposing ``x``, ``y`` and ``z``.
        img_rgb: Optional image array. When given, both segments and the
            integer angle are drawn onto it in place; the point coordinates
            are scaled by the image width and height to get pixel positions.
        color: Colour tuple used for the lines and the text.
        text_scale: Font scale factor, multiplied by ``height / 1000``.

    Returns:
        ``(angle, img_rgb)`` when an image was supplied, else ``(angle, None)``.
    """
    first, vertex, last = (np.array([p.x, p.y, p.z]) for p in (a, b, c))

    heading_to_last = np.arctan2(last[1] - vertex[1], last[0] - vertex[0])
    heading_to_first = np.arctan2(first[1] - vertex[1], first[0] - vertex[0])
    radians = heading_to_last - heading_to_first
    angle = np.abs(radians * 180.0 / np.pi)

    # Report the smaller of the two angles between the rays.
    if angle > 180.0:
        angle = 360 - angle

    if img_rgb is None:
        return angle, None

    height, width, _ = img_rgb.shape

    # Scale the point coordinates by the image size to get pixel positions.
    pixel_a, pixel_b, pixel_c = [
        (int(p[0] * width), int(p[1] * height)) for p in (first, vertex, last)
    ]

    # Keep the label readable across image sizes by scaling with the height.
    font_scale = text_scale * height / 1000

    cv2.line(img_rgb, pixel_a, pixel_b, color, 4)
    cv2.line(img_rgb, pixel_b, pixel_c, color, 4)
    label_origin = (pixel_b[0] + 10, pixel_b[1] - 25)
    cv2.putText(
        img_rgb,
        str(int(angle)),
        label_origin,
        cv2.FONT_HERSHEY_SIMPLEX,
        font_scale,
        color,
        2
    )
    return angle, img_rgb

### **Push Up Detection**

In [None]:
def do_pushup(body_parts, vis_threshold=0.75, img_rgb=None):
    """Classify a push-up pose from one side of the body.

    Args:
        body_parts: Mapping of body-part name to a landmark exposing ``x``,
            ``y``, ``z`` and ``visibility``.
        vis_threshold: Minimum mean visibility a side needs to be usable.
        img_rgb: Optional image forwarded to ``calculate_angle`` for drawing.

    Returns:
        ``(code, image, movement, correctness)`` where ``code`` is
        ``-1`` (a required landmark has visibility 0, or both sides are below
        the threshold), ``2`` (extension), ``1`` (flexion) or ``0`` (other).
        ``image`` is ``img_rgb`` on the ``-1`` paths, otherwise the image
        returned by the last ``calculate_angle`` call.
    """
    needed = (
        'left_shoulder', 'left_elbow', 'left_wrist',
        'right_shoulder', 'right_elbow', 'right_wrist',
        'left_hip', 'right_hip', 'left_ankle', 'right_ankle',
    )

    # Give up as soon as any required landmark reports zero visibility.
    if any(body_parts[name].visibility == 0 for name in needed):
        return -1, img_rgb, None, False

    joints = ('shoulder', 'elbow', 'wrist', 'hip', 'ankle')
    left = {joint: body_parts[f'left_{joint}'] for joint in joints}
    right = {joint: body_parts[f'right_{joint}'] for joint in joints}

    # Mean visibility of the arm and leg landmarks on each side.
    left_score = (
        left['wrist'].visibility + left['elbow'].visibility + left['shoulder'].visibility
        + left['ankle'].visibility + left['hip'].visibility
    ) / 5
    right_score = (
        right['wrist'].visibility + right['elbow'].visibility + right['shoulder'].visibility
        + right['ankle'].visibility + right['hip'].visibility
    ) / 5

    left_too_low = left_score < vis_threshold
    if left_too_low and right_score < vis_threshold:
        return -1, img_rgb, None, False

    # The right side is used only when the left one is below the threshold and
    # the right one is not; in every other case the left side is used.
    side = right if (left_too_low and right_score >= vis_threshold) else left

    shoulder_angle, canvas = calculate_angle(
        side['hip'], side['shoulder'], side['elbow'], img_rgb, (255, 0, 0))
    elbow_angle, canvas = calculate_angle(
        side['shoulder'], side['elbow'], side['wrist'], canvas, (0, 255, 0))
    body_angle, canvas = calculate_angle(
        side['shoulder'], side['hip'], side['ankle'], canvas, (0, 0, 255))

    # Both accepted phases share the shoulder and body-line constraints and
    # differ only in the elbow range.
    if 0 < shoulder_angle < 80 and 145 < body_angle < 180:
        if 120 < elbow_angle < 180:
            return 2, canvas, 'extension', True
        if 50 < elbow_angle < 120:
            return 1, canvas, 'flexion', True

    # Anything else counts as bad posture.
    return 0, canvas, 'other', False

In [None]:
import os
import random

def randomly_choose_images_and_process(folder_path, exercise_function, num_images=5):
    """Run the static detector on a random sample of images from a folder.

    Args:
        folder_path: Directory whose entries are scanned for image files.
        exercise_function: Classifier handed to ``generic_detector_static``.
        num_images: Upper bound on how many images are sampled; fewer are
            used when the folder holds fewer images.
    """
    allowed_suffixes = ('.jpg', '.jpeg', '.png')

    # Keep only entries whose extension (case-insensitive) marks an image.
    candidates = []
    for entry in os.listdir(folder_path):
        suffix = os.path.splitext(entry)[1].lower()
        if suffix in allowed_suffixes:
            candidates.append(entry)

    sample_size = min(num_images, len(candidates))
    for chosen in random.sample(candidates, sample_size):
        chosen_path = os.path.join(folder_path, chosen)
        print(f"Processing image: {chosen_path}")
        generic_detector_static(chosen_path, exercise_function)

# Folder containing the push-up training frames. Built with os.path.join so
# the path is valid on non-Windows systems as well.
image_folder = os.path.join("cleaned_frames", "train", "push up")

# Number of images to process
num_images_to_process = 3

# NOTE: randomly_choose_images_and_process calls generic_detector_static, which
# is defined in a later cell. That cell must be run before this call, otherwise
# a NameError is raised.
randomly_choose_images_and_process(image_folder, do_pushup, num_images_to_process)

### **Deadlift Detection**

In [None]:
def do_deadlift(body_parts, vis_threshold=0.75, img_rgb=None):
    """Classify a deadlift pose from one side of the body.

    Args:
        body_parts: Mapping of body-part name to a landmark exposing ``x``,
            ``y``, ``z`` and ``visibility``.
        vis_threshold: Minimum mean visibility a side needs to be usable.
        img_rgb: Optional image forwarded to ``calculate_angle`` for drawing.

    Returns:
        ``(code, image, movement, correctness)`` where ``code`` is
        ``-1`` (a required landmark has visibility 0, or both sides are below
        the threshold), ``2`` (extension), ``1`` (flexion) or ``0`` (other).
        ``image`` is ``img_rgb`` on the ``-1`` paths, otherwise the image
        returned by the last ``calculate_angle`` call.
    """
    joints = ('shoulder', 'hip', 'knee', 'ankle', 'elbow')
    needed = [f'{which}_{joint}' for joint in joints for which in ('left', 'right')]

    # Give up as soon as any required landmark reports zero visibility.
    if any(body_parts[name].visibility == 0 for name in needed):
        return -1, img_rgb, None, False

    left = {joint: body_parts[f'left_{joint}'] for joint in joints}
    right = {joint: body_parts[f'right_{joint}'] for joint in joints}

    # Mean visibility of the five landmarks on each side.
    left_score = (
        left['shoulder'].visibility + left['hip'].visibility + left['knee'].visibility
        + left['ankle'].visibility + left['elbow'].visibility
    ) / 5
    right_score = (
        right['shoulder'].visibility + right['hip'].visibility + right['knee'].visibility
        + right['ankle'].visibility + right['elbow'].visibility
    ) / 5

    left_too_low = left_score < vis_threshold
    if left_too_low and right_score < vis_threshold:
        return -1, img_rgb, None, False

    # The right side is used only when the left one is below the threshold and
    # the right one is not; when both sides pass, the left side is used.
    side = right if (left_too_low and right_score >= vis_threshold) else left

    hip_angle, canvas = calculate_angle(
        side['shoulder'], side['hip'], side['knee'], img_rgb, (255, 0, 0))
    knee_angle, canvas = calculate_angle(
        side['hip'], side['knee'], side['ankle'], canvas, (0, 255, 0))
    shoulder_angle, canvas = calculate_angle(
        side['hip'], side['shoulder'], side['elbow'], canvas, (0, 0, 255))

    extended = 110 < hip_angle < 180 and 150 < knee_angle < 180 and 0 < shoulder_angle < 30
    if extended:
        return 2, canvas, 'extension', True

    flexed = 25 < hip_angle < 110 and 0 < knee_angle < 150 and 0 < shoulder_angle < 90
    if flexed:
        return 1, canvas, 'flexion', True

    # Anything else counts as bad posture.
    return 0, canvas, 'other', False

### **Squat Detection**

In [None]:
def do_squat(body_parts, vis_threshold=0.75, img_rgb=None, knee_cave_ratio=0.5):
    """Classify a squat pose, including a knee-cave check.

    Args:
        body_parts: Mapping of body-part name to a landmark exposing ``x``,
            ``y``, ``z`` and ``visibility``.
        vis_threshold: Minimum mean visibility a side needs to be usable.
        img_rgb: Optional image forwarded to ``calculate_angle`` for drawing.
        knee_cave_ratio: The pose is rejected when the knee-to-knee distance
            divided by the ankle-to-ankle distance falls below this value.

    Returns:
        ``(code, image, movement, correctness)`` where ``code`` is
        ``-1`` (a required landmark has visibility 0, or both sides are below
        the threshold), ``2`` (extension), ``1`` (flexion) or ``0`` (knees
        caving in, or any other posture). ``image`` is ``img_rgb`` on the
        ``-1`` paths, otherwise the image returned by the last
        ``calculate_angle`` call.
    """
    required_parts = [
        'left_shoulder', 'right_shoulder',
        'left_hip', 'right_hip',
        'left_knee', 'right_knee',
        'left_ankle', 'right_ankle',
    ]

    # Give up as soon as any required landmark reports zero visibility.
    if any(body_parts[part].visibility == 0 for part in required_parts):
        return -1, img_rgb, None, False

    l_sh = body_parts['left_shoulder']
    r_sh = body_parts['right_shoulder']
    l_hip = body_parts['left_hip']
    r_hip = body_parts['right_hip']
    l_knee = body_parts['left_knee']
    r_knee = body_parts['right_knee']
    l_ankle = body_parts['left_ankle']
    r_ankle = body_parts['right_ankle']

    # Mean visibility of the four landmarks on each side.
    l_vis = (l_sh.visibility + l_hip.visibility + l_knee.visibility + l_ankle.visibility) / 4
    r_vis = (r_sh.visibility + r_hip.visibility + r_knee.visibility + r_ankle.visibility) / 4

    # The right side is used only when the left one is below the threshold and
    # the right one is not; when both sides pass, the left side is used.
    if l_vis < vis_threshold and r_vis >= vis_threshold:
        t_sh, t_hip, t_knee, t_ankle = r_sh, r_hip, r_knee, r_ankle
    elif r_vis < vis_threshold and l_vis >= vis_threshold:
        t_sh, t_hip, t_knee, t_ankle = l_sh, l_hip, l_knee, l_ankle
    elif l_vis < vis_threshold and r_vis < vis_threshold:
        return -1, img_rgb, None, False
    else:
        t_sh, t_hip, t_knee, t_ankle = l_sh, l_hip, l_knee, l_ankle

    hip_angle, img1 = calculate_angle(t_sh, t_hip, t_knee, img_rgb, (255, 0, 0))
    knee_angle, img2 = calculate_angle(t_hip, t_knee, t_ankle, img1, (0, 255, 0))

    # Distances between the two knees and between the two ankles.
    knee_distance = np.linalg.norm([l_knee.x - r_knee.x, l_knee.y - r_knee.y, l_knee.z - r_knee.z])
    ankle_distance = np.linalg.norm([l_ankle.x - r_ankle.x, l_ankle.y - r_ankle.y, l_ankle.z - r_ankle.z])

    # Knee-cave check. Coinciding ankles (distance 0) would make the ratio a
    # division by zero, so that case is treated as "no knee cave" explicitly.
    if ankle_distance > 0 and knee_distance / ankle_distance < knee_cave_ratio:
        return 0, img2, 'other', False

    if 150 < hip_angle < 180 and 150 < knee_angle < 180:
        # Extended position
        return 2, img2, 'extension', True
    if 50 < hip_angle < 160 and 40 < knee_angle < 150:
        # Flexed position
        return 1, img2, 'flexion', True

    # Bad posture
    return 0, img2, 'other', False

In [None]:
def generic_detector_static(img_path, exercise_function, show_keypoints=False, show_angles=True, detector=None):
    """Classify the exercise pose in one image file and print the verdict.

    Args:
        img_path: Path of the image file to analyse.
        exercise_function: Classifier called as
            ``exercise_function(body_parts, 0.75, img_rgb)`` and returning
            ``(result, image, movement, correctness)``.
        show_keypoints: When true, also display the full landmark overlay.
        show_angles: When true, load the image so the classifier can draw the
            measured angles on it, then display it.
        detector: Optional pose detector to reuse. When omitted, a detector is
            created for this call and closed again afterwards, so callers that
            process many images can avoid reloading the model every time.
    """
    owns_detector = detector is None
    mp_detector = mp_init_detector() if owns_detector else detector
    try:
        mp_img = mp.Image.create_from_file(img_path)
        _, body_parts = mp_process_image(mp_detector, mp_img)

        # The classifier only draws when it is handed an image.
        img_rgb = None
        if show_angles:
            img_rgb = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)

        if not body_parts:
            print(f"WARNING: Mediapipe could not detect landmarks, skipping {img_path}")
            return

        if show_keypoints:
            mp_debug_show_image(mp_detector, img_path)

        print("Exercise Detector")
        result, img_with_angles, _, _ = exercise_function(body_parts, 0.75, img_rgb)
        if result < 0:
            # might add argument to delete picture?
            print(f"WARNING: Low visibility confidence, skipping {img_path}")
        elif result > 0:
            print("Good technique")
            if result == 2:
                print("Extended position")
            elif result == 1:
                print("Flexed position")
        else:
            print("Bad technique / different exercise")

        if show_angles and img_with_angles is not None:
            cv2_imshow(cv2.cvtColor(img_with_angles, cv2.COLOR_RGB2BGR))
    finally:
        # Only release a detector this call created; a caller-supplied one
        # stays usable.
        if owns_detector:
            mp_detector.close()

In [None]:
def pushup_detector_static(img_path, show_keypoints=False, show_angles=True):
    """Classify the push-up pose in one image file and print the verdict.

    Args:
        img_path: Path of the image file to analyse.
        show_keypoints: When true, also display the full landmark overlay.
        show_angles: When true, load the image so ``do_pushup`` can draw the
            measured angles on it, then display it.
    """
    mp_detector = mp_init_detector()
    try:
        mp_img = mp.Image.create_from_file(img_path)
        _, body_parts = mp_process_image(mp_detector, mp_img)

        # do_pushup only draws when it is handed an image.
        img_rgb = None
        if show_angles:
            img_rgb = cv2.cvtColor(cv2.imread(img_path), cv2.COLOR_BGR2RGB)

        if not body_parts:
            print(f"WARNING: Mediapipe could not detect landmarks, skipping {img_path}")
            return

        if show_keypoints:
            mp_debug_show_image(mp_detector, img_path)

        print("Pushup Detector")
        # do_pushup returns (result, image, movement, correctness); unpacking
        # only two values here raised a ValueError on every call.
        pushup_res, img_with_angles, _, _ = do_pushup(body_parts, 0.75, img_rgb)
        if pushup_res < 0:
            print(f"WARNING: Low visibility confidence, skipping {img_path}")
        elif pushup_res > 0:
            print("Good posture")
            if pushup_res == 2:
                print("Raised pushup")
            elif pushup_res == 1:
                print("Lowered pushup")
        else:
            print("Bad posture")

        if show_angles and img_with_angles is not None:
            cv2_imshow(cv2.cvtColor(img_with_angles, cv2.COLOR_RGB2BGR))
    finally:
        # The detector was created for this call only, so release it here.
        mp_detector.close()


def pushup_detector_dynamic(frame, detector, show_keypoints=False, show_angles=True):
    """Classify the push-up pose in a single video frame.

    Args:
        frame: Image array for the frame. It is wrapped as an SRGB MediaPipe
            image, so it is expected in RGB channel order.
        detector: Pose detector forwarded to ``mp_process_image``.
        show_keypoints: When true, return the frame with the full landmark
            overlay instead of classifying the pose.
        show_angles: When true, the measured angles are drawn onto ``frame``
            in place.

    Returns:
        ``(image, message)``. ``image`` is always an array, never ``None``.
        ``message`` is ``None`` in keypoint mode, otherwise one of
        "Raised pushup", "Lowered pushup", "Bad posture",
        "Low visibility confidence" or "No body parts detected".
    """
    mp_img = mp.Image(image_format=mp.ImageFormat.SRGB, data=frame)
    detection_results, body_parts = mp_process_image(detector, mp_img)

    if not body_parts:
        return frame, "No body parts detected"

    if show_keypoints:
        # Always draw on the frame itself. This used to pass None whenever
        # show_angles was False, which broke the drawing helper.
        return draw_landmarks_on_image(frame, detection_results), None

    img_rgb = frame if show_angles else None
    pushup_res, res_frame, _, _ = do_pushup(body_parts, 0.75, img_rgb)

    # With show_angles disabled do_pushup hands back None; callers still need
    # an image to display, so fall back to the untouched frame.
    if res_frame is None:
        res_frame = frame

    if pushup_res < 0:
        # Landmarks were too unreliable to judge; do not call that bad posture.
        return res_frame, "Low visibility confidence"

    if pushup_res > 0:
        print("Good posture")
        return res_frame, "Raised pushup" if pushup_res == 2 else "Lowered pushup"

    return res_frame, "Bad posture"




In [None]:
import csv
import json

# Exercise labels annotate_images knows how to classify.
_KNOWN_EXERCISES = ("pushup", "squat", "deadlift")

# File extensions treated as images when scanning a source directory.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


def _classify_exercise(exercise, body_parts, vis_threshold=0.75):
    """Run the classifier matching ``exercise``; returns None for unknown labels."""
    if exercise == "pushup":
        return do_pushup(body_parts, vis_threshold)
    if exercise == "squat":
        return do_squat(body_parts, vis_threshold)
    if exercise == "deadlift":
        return do_deadlift(body_parts, vis_threshold)
    return None


def annotate_images(source_dirs, output_csv, detector):
    """Write pose annotations for every image in the given folders to a CSV.

    Each written row holds the image path, the exercise label, the keypoints
    as JSON (``name -> [x, y, z, visibility]``), the detected movement and the
    correctness flag. Images are skipped when no pose is detected or when the
    classifier reports a negative result code.

    Args:
        source_dirs: Mapping of exercise label ("pushup", "squat" or
            "deadlift") to the directory holding its images. Other labels are
            skipped.
        output_csv: Path of the CSV file to create (overwritten if present).
        detector: Pose detector forwarded to ``mp_process_image``.
    """
    with open(output_csv, mode='w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["image_path", "exercise", "keypoints", "movement", "correctness"])

        for exercise, source_dir in source_dirs.items():
            # Decide once per folder, before any image is loaded.
            if exercise not in _KNOWN_EXERCISES:
                continue

            # Sorted so the row order is reproducible between runs.
            for img_name in sorted(os.listdir(source_dir)):
                # Skip sub-folders and stray non-image files instead of letting
                # the image loader abort the run with a half-written CSV.
                if os.path.splitext(img_name)[1].lower() not in _IMAGE_EXTENSIONS:
                    continue

                img_path = os.path.join(source_dir, img_name)
                mp_image = mp.Image.create_from_file(img_path)
                _, body_parts = mp_process_image(detector, mp_image)

                if not body_parts:
                    continue

                result, _, movement, correctness = _classify_exercise(exercise, body_parts, 0.75)
                if result < 0:
                    continue

                keypoints = {k: [v.x, v.y, v.z, v.visibility] for k, v in body_parts.items()}

                writer.writerow([img_path, exercise, json.dumps(keypoints), movement, correctness])

# Source directories per exercise and the output CSV. The paths are built with
# os.path.join so they are valid on non-Windows systems as well.
source_dirs = {
    "pushup": os.path.join("cleaned_frames", "val", "push up"),
    "squat": os.path.join("cleaned_frames", "val", "squat"),
    "deadlift": os.path.join("cleaned_frames", "val", "deadlifting")
}
output_csv = "annotations_val.csv"

# Annotate images
mp_detector = mp_init_detector()
annotate_images(source_dirs, output_csv, mp_detector)

# **Webcam**

In [None]:
def run_webcam():
    """Run the push-up detector on the default webcam until 'q' is pressed.

    Each frame is classified by ``pushup_detector_dynamic`` and shown in a
    window with the exercise name and the detector's message overlaid. The
    camera, the detector and the window are released even when an error
    interrupts the loop.
    """
    detector = mp_init_detector()
    cap = cv2.VideoCapture(0)

    # Tolerate a few dropped frames, but stop instead of spinning forever when
    # the camera keeps failing (the quit key is not polled on failed reads).
    max_failed_reads = 100
    failed_reads = 0

    try:
        while cap.isOpened():
            success, frame = cap.read()
            if not success:
                failed_reads += 1
                if failed_reads >= max_failed_reads:
                    print("WARNING: Webcam stopped delivering frames, closing")
                    break
                continue
            failed_reads = 0

            # OpenCV captures BGR, while the detector wraps its input as an
            # SRGB image, so convert before detection and back for display.
            rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            annotated_rgb, guide = pushup_detector_dynamic(
                rgb_frame, detector, show_keypoints=False, show_angles=True)
            frame = cv2.cvtColor(annotated_rgb, cv2.COLOR_RGB2BGR)

            # Display the classification and guide on the top right of the frame
            text_x = frame.shape[1] - 300  # Adjust x position based on frame width
            classification = "Pushup"
            cv2.putText(frame, classification, (text_x, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA)
            if guide:
                cv2.putText(frame, guide, (text_x, 60), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2, cv2.LINE_AA)

            # Display the frame
            cv2.imshow('Pushup Detector', frame)

            if cv2.waitKey(5) & 0xFF == ord('q'):
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
        detector.close()

# NOTE: Press q to close the webcam (the loop checks for the key once per displayed frame)
run_webcam()
