<a href="https://colab.research.google.com/github/Chathurya99/Pose_Fit_FYP/blob/master/Exercise_keypoints.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
import tensorflow_hub as hub
import cv2
import numpy as np
import pandas as pd
from google.colab import files
import os

# --- Model Loading ---
# Both models are fetched from TF Hub once, at cell execution time. A failed
# download is reported and then re-raised so the rest of the cell never runs
# with a missing model.

# Person detector (SSD MobileNet V2 from TF Hub)
detector_hub_url = "https://tfhub.dev/tensorflow/ssd_mobilenet_v2/2"
print(f"Loading person detector model from {detector_hub_url}...")
try:
    detector_model = hub.load(detector_hub_url)
except Exception as load_error:
    print(f"Error loading person detector model: {load_error}")
    raise
else:
    print("Person detector model loaded successfully.")

# MoveNet Thunder (single-pose) model
movenet_hub_url = "https://tfhub.dev/google/movenet/singlepose/thunder/4"
print(f"Loading MoveNet Thunder model from {movenet_hub_url}...")
try:
    movenet_pose_model = hub.load(movenet_hub_url)
except Exception as load_error:
    print(f"Error loading MoveNet model: {load_error}")
    raise
else:
    # Side length (pixels) of the square input passed to MoveNet below.
    movenet_input_size = 256
    print("MoveNet Thunder model loaded successfully.")

# --- Helper Functions ---

def run_detector(detector, image_tensor_rgb):
    """Run the detector on an RGB image tensor and return its outputs as NumPy.

    Args:
        detector: Callable model; invoked with the batched uint8 tensor and
            expected to return a mapping of output name -> tensor.
        image_tensor_rgb: Image tensor. Cast to ``tf.uint8`` when it has any
            other dtype; a rank-3 tensor gets a leading batch axis added.

    Returns:
        dict mapping each detector output name to ``tensor.numpy()``.
    """
    batch = image_tensor_rgb
    if batch.dtype != tf.uint8:
        batch = tf.cast(batch, tf.uint8)
    if len(batch.shape) == 3:
        # Single image: prepend the batch dimension before calling the model.
        batch = tf.expand_dims(batch, axis=0)

    raw_outputs = detector(batch)

    numpy_outputs = {}
    for name, tensor in raw_outputs.items():
        numpy_outputs[name] = tensor.numpy()
    return numpy_outputs

def run_movenet_on_crop(pose_model, cropped_image_rgb_tf, movenet_target_size):
    """Run MoveNet pose estimation on a cropped RGB image tensor.

    The crop is resized with ``tf.image.resize_with_pad`` to a square of side
    ``movenet_target_size`` (aspect ratio kept, remainder padded), cast to
    int32, batched if it is rank 3, and fed to the model's
    ``serving_default`` signature.

    Args:
        pose_model: Loaded model exposing ``signatures['serving_default']``.
        cropped_image_rgb_tf: Image tensor of the cropped region.
        movenet_target_size: Side length of the square model input.

    Returns:
        The ``'output_0'`` tensor of the model output (keypoints with scores).
        Coordinates refer to the padded square input, not to the raw crop.
    """
    side = movenet_target_size
    letterboxed = tf.image.resize_with_pad(cropped_image_rgb_tf, side, side)
    model_input = tf.cast(letterboxed, dtype=tf.int32)
    if len(model_input.shape) == 3:
        # Single image: prepend the batch dimension.
        model_input = tf.expand_dims(model_input, axis=0)

    serving_fn = pose_model.signatures['serving_default']
    return serving_fn(model_input)['output_0']

def get_crop_from_bbox(image_np, bbox_normalized, padding_factor):
    """Cut a padded, square-ish region around a normalized bounding box.

    A square of side ``max(box_w, box_h) * padding_factor`` is centred on the
    box and then clipped to the image, so the returned crop is only square
    when the padded square fits entirely inside the image.

    Args:
        image_np: Source image array of shape (H, W, C).
        bbox_normalized: ``[ymin, xmin, ymax, xmax]`` in normalized coordinates.
        padding_factor: Multiplier applied to the longer box side.

    Returns:
        ``(crop, details)`` where ``crop`` is a slice of ``image_np`` and
        ``details`` holds the crop's top-left corner in the original frame
        (``x1_original_frame``, ``y1_original_frame``) and its size
        (``width_of_crop``, ``height_of_crop``). ``(None, None)`` when the box
        or the clipped crop is empty.
    """
    frame_h, frame_w = image_np.shape[:2]
    top_n, left_n, bottom_n, right_n = bbox_normalized

    # Normalized -> pixel coordinates (truncating).
    left = int(left_n * frame_w)
    right = int(right_n * frame_w)
    top = int(top_n * frame_h)
    bottom = int(bottom_n * frame_h)

    width = right - left
    height = bottom - top
    if width <= 0 or height <= 0:
        return None, None

    centre_x = left + width / 2.0
    centre_y = top + height / 2.0
    half_side = (max(width, height) * padding_factor) / 2.0

    # Intended square, clipped to the image bounds.
    row_start = max(0, int(centre_y - half_side))
    row_stop = min(frame_h, int(centre_y + half_side))
    col_start = max(0, int(centre_x - half_side))
    col_stop = min(frame_w, int(centre_x + half_side))

    if row_stop <= row_start or col_stop <= col_start:
        return None, None

    region = image_np[row_start:row_stop, col_start:col_stop]
    region_h, region_w = region.shape[:2]
    if region_h == 0 or region_w == 0:
        return None, None

    details = {
        "x1_original_frame": col_start,
        "y1_original_frame": row_start,
        "width_of_crop": region_w,
        "height_of_crop": region_h,
    }
    return region, details

def get_bbox_from_keypoints(keypoints_abs_coords, img_width, img_height):
    """Derive a normalized bounding box enclosing a set of keypoints.

    Args:
        keypoints_abs_coords: Sequence of points indexed as ``(x, y)`` in
            absolute pixel coordinates. NaN x values and NaN y values are
            ignored independently of each other.
        img_width: Width used to normalize x coordinates.
        img_height: Height used to normalize y coordinates.

    Returns:
        ``[ymin, xmin, ymax, xmax]`` normalized by the image size, or ``None``
        when no non-NaN x or no non-NaN y value is available.
    """
    def _non_nan(axis):
        # Collect one coordinate axis, dropping NaN entries.
        return [pt[axis] for pt in keypoints_abs_coords if not np.isnan(pt[axis])]

    xs = _non_nan(0)
    ys = _non_nan(1)
    if not (xs and ys):
        return None

    left, right = min(xs), max(xs)
    top, bottom = min(ys), max(ys)
    return [
        top / img_height,
        left / img_width,
        bottom / img_height,
        right / img_width,
    ]


def _select_best_person_bbox(detected_objects, score_thresh):
    """Return the highest-scoring class-1 (person) box at or above the threshold, else None."""
    best_box = None
    best_score = None
    for i in range(int(detected_objects['num_detections'][0])):
        class_id = int(detected_objects['detection_classes'][0][i])
        score = float(detected_objects['detection_scores'][0][i])
        if class_id != 1 or score < score_thresh:
            continue
        # Strict '>' keeps the first detection on ties, like np.argmax would.
        if best_score is None or score > best_score:
            best_box = detected_objects['detection_boxes'][0][i]
            best_score = score
    return best_box


def _keypoints_crop_to_frame(keypoints_norm_crop, crop_info):
    """Map MoveNet (y, x) outputs on the letterboxed crop to pixel (x, y) in the full frame."""
    crop_w = crop_info['width_of_crop']
    crop_h = crop_info['height_of_crop']

    # run_movenet_on_crop letterboxes the crop with tf.image.resize_with_pad:
    # the crop is scaled to fit a square and the shorter axis is padded equally
    # on both sides. MoveNet's normalized coordinates therefore span that
    # square (side == the longer crop edge, in crop pixels), not the raw crop,
    # so the padding offset has to be removed on the shorter axis.
    square_side = max(crop_w, crop_h)
    pad_x = (square_side - crop_w) / 2.0
    pad_y = (square_side - crop_h) / 2.0

    kps = np.asarray(keypoints_norm_crop, dtype=np.float64)
    x_abs = kps[:, 1] * square_side - pad_x + crop_info['x1_original_frame']
    y_abs = kps[:, 0] * square_side - pad_y + crop_info['y1_original_frame']
    return x_abs, y_abs


def extract_keypoints_video_tracked(
    video_path, detector, pose_estimator, movenet_dim,
    target_csv_canvas_width, target_csv_canvas_height,
    initial_person_score_thresh=0.4, # For the general detector
    movenet_confidence_thresh=0.2,   # Avg score of MoveNet keypoints to consider track valid
    tracking_crop_padding_factor=1.7 # Padding for the crop (both initial and tracked)
    ):
    """Extract per-frame MoveNet keypoints from a video using keypoint-guided tracking.

    For each frame a crop is taken around the person: from the previous
    frame's keypoint bounding box when tracking is active, otherwise from the
    best person box returned by the general detector. MoveNet runs on that
    crop, and its keypoints are mapped back to the full frame and rescaled to
    the target CSV canvas. Whenever detection, cropping, or the MoveNet
    confidence check fails, the frame gets a row of NaNs and tracking is
    reset so the detector runs again on the next frame.

    Args:
        video_path: Path of the video file to open with OpenCV.
        detector: Person detector passed to ``run_detector``.
        pose_estimator: MoveNet model passed to ``run_movenet_on_crop``.
        movenet_dim: Side length of the square MoveNet input.
        target_csv_canvas_width: Width of the canvas the x values are scaled to.
        target_csv_canvas_height: Height of the canvas the y values are scaled to.
        initial_person_score_thresh: Minimum detector score for a person box.
        movenet_confidence_thresh: Minimum mean keypoint score for a frame to
            count as successfully tracked.
        tracking_crop_padding_factor: Padding factor forwarded to
            ``get_crop_from_bbox``.

    Returns:
        pandas.DataFrame with one row per frame read: ``frame`` followed by
        ``<keypoint>_x`` / ``<keypoint>_y`` columns for the 17 keypoints.
        An empty DataFrame (no columns) if the video cannot be opened.
    """
    keypoint_names_ordered = [
        "nose", "left_eye", "right_eye", "left_ear", "right_ear",
        "left_shoulder", "right_shoulder", "left_elbow", "right_elbow",
        "left_wrist", "right_wrist", "left_hip", "right_hip",
        "left_knee", "right_knee", "left_ankle", "right_ankle"
    ]
    coords_per_row = 2 * len(keypoint_names_ordered)

    all_frames_data = []
    frame_idx = 0
    cap = cv2.VideoCapture(video_path)

    # try/finally so the capture handle is released even if a model call raises.
    try:
        if not cap.isOpened():
            print(f"Error: Could not open video file {video_path}")
            return pd.DataFrame()

        original_vid_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_vid_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"IMPORTANT: Video processing at: {original_vid_width}x{original_vid_height}")
        print(f"Target CSV canvas: {target_csv_canvas_width}x{target_csv_canvas_height}")
        print(f"Using tracking_crop_padding_factor: {tracking_crop_padding_factor}")
        print(f"Movenet confidence threshold for tracking: {movenet_confidence_thresh}")

        # Normalized [ymin, xmin, ymax, xmax] from the previous successful
        # frame; None means "no track, run the general detector".
        last_tracked_person_bbox_normalized = None

        while cap.isOpened():
            ret, frame_bgr_np = cap.read()
            if not ret:
                break
            frame_idx += 1

            # Failure messages are throttled to the first frame and every 50th.
            log_failure = frame_idx % 50 == 0 or frame_idx == 1
            nan_keypoints_row = [frame_idx] + [np.nan] * coords_per_row

            frame_rgb_np = cv2.cvtColor(frame_bgr_np, cv2.COLOR_BGR2RGB)

            # --- Determine Bounding Box for Cropping ---
            if last_tracked_person_bbox_normalized is not None:
                base_bbox_for_crop = last_tracked_person_bbox_normalized
            else:
                # First frame or after track loss: fall back to the detector.
                # The tensor conversion is only needed on this path.
                detected_objects = run_detector(detector, tf.convert_to_tensor(frame_rgb_np))
                base_bbox_for_crop = _select_best_person_bbox(
                    detected_objects, initial_person_score_thresh
                )
                if base_bbox_for_crop is None:
                    if log_failure:
                        print(f"Frame {frame_idx}: General detector found no person. Outputting NaNs.")
                    all_frames_data.append(nan_keypoints_row)
                    continue

            # --- Create Crop based on base_bbox_for_crop ---
            person_crop_np, crop_info = get_crop_from_bbox(
                frame_rgb_np, base_bbox_for_crop, padding_factor=tracking_crop_padding_factor
            )

            if person_crop_np is None or person_crop_np.size == 0:
                if log_failure:
                    print(f"Frame {frame_idx}: Failed to create valid crop. Outputting NaNs.")
                all_frames_data.append(nan_keypoints_row)
                last_tracked_person_bbox_normalized = None # Trigger re-detection
                continue

            # --- Run MoveNet on the Crop ---
            person_crop_tf = tf.convert_to_tensor(person_crop_np)
            keypoints_movenet_output = run_movenet_on_crop(pose_estimator, person_crop_tf, movenet_dim)
            keypoints_norm_crop = np.squeeze(keypoints_movenet_output.numpy()) # (17,3) y,x,score

            # --- Evaluate MoveNet Output and Update Tracking ---
            avg_movenet_score = np.mean(keypoints_norm_crop[:, 2])

            if avg_movenet_score < movenet_confidence_thresh:
                if log_failure:
                    print(f"Frame {frame_idx}: MoveNet confidence too low ({avg_movenet_score:.2f}). Outputting NaNs.")
                all_frames_data.append(nan_keypoints_row)
                last_tracked_person_bbox_normalized = None # Lost track, trigger re-detection
                continue

            # Track successful: map keypoints back to the full frame, undoing
            # the letterbox padding applied before MoveNet.
            x_abs_orig, y_abs_orig = _keypoints_crop_to_frame(keypoints_norm_crop, crop_info)

            # Scale to target CSV canvas
            csv_x = (x_abs_orig / original_vid_width) * target_csv_canvas_width
            csv_y = (y_abs_orig / original_vid_height) * target_csv_canvas_height

            current_keypoints_for_csv_row = [frame_idx]
            for x_val, y_val in zip(csv_x.tolist(), csv_y.tolist()):
                current_keypoints_for_csv_row.extend((x_val, y_val))

            if frame_idx <= 5: # Print nose (keypoint 0) debug for first 5 frames
                print(f"Frame {frame_idx} Debug: Nose (abs_x, abs_y) on {original_vid_width}x{original_vid_height} frame BEFORE final scaling to canvas: ({x_abs_orig[0]:.2f}, {y_abs_orig[0]:.2f}) -> CSV Nose_X: {current_keypoints_for_csv_row[1]:.2f}")

            all_frames_data.append(current_keypoints_for_csv_row)

            # Update the tracked bbox for the next frame from this frame's keypoints.
            keypoints_abs_for_next_bbox = list(zip(x_abs_orig.tolist(), y_abs_orig.tolist()))
            last_tracked_person_bbox_normalized = get_bbox_from_keypoints(
                keypoints_abs_for_next_bbox, original_vid_width, original_vid_height
            )
            if last_tracked_person_bbox_normalized is None and log_failure:
                # No usable keypoint coordinates; the detector runs on the next frame.
                print(f"Frame {frame_idx}: Could not derive bbox from keypoints. Will re-detect.")

            if frame_idx % 50 == 0:
                print(f"Processed {frame_idx} frames...")
    finally:
        cap.release()

    print(f"Finished processing. Total frames processed: {frame_idx}")

    csv_columns = ["frame"]
    for name in keypoint_names_ordered:
        csv_columns.extend((f"{name}_x", f"{name}_y"))
    return pd.DataFrame(all_frames_data, columns=csv_columns)

# --- Main script execution ---
# Uploads a video through the Colab file picker, runs tracked keypoint
# extraction on it, writes the result to a CSV and triggers a download.
TARGET_CSV_CANVAS_WIDTH = 640
TARGET_CSV_CANVAS_HEIGHT = 480 # Your target canvas for the final CSV

# <<<< --- ADJUST THIS PADDING FACTOR --- >>>>
# This factor controls the "zoom" level of the crop around the person (tracked or initially detected).
# Larger values mean a looser crop (person is smaller in the view fed to MoveNet).
# Smaller values mean a tighter crop.
ADJUSTED_TRACKING_CROP_PADDING_FACTOR = 1.7 # Initial guess, adjust this based on output

# Confidence for initial person detection by the general detector
INITIAL_PERSON_DETECTION_THRESHOLD = 0.3 # Lowered slightly to catch more initial poses

# Average confidence of MoveNet keypoints to consider the track "valid" for the next frame
MOVENET_TRACKING_CONFIDENCE_THRESHOLD = 0.2 # Avg score of all 17 keypoints

print("Please upload your exercise video file:")
uploaded_video_colab = files.upload()

if not uploaded_video_colab:
    print("No video file was uploaded. Exiting.")
else:
    # Only the first uploaded file is processed.
    video_input_filename = list(uploaded_video_colab.keys())[0]
    print(f"\nVideo '{video_input_filename}' uploaded successfully.")
    print("\nStarting keypoint extraction with keypoint-guided tracking...")

    keypoints_result_df = extract_keypoints_video_tracked(
        video_input_filename,
        detector_model,
        movenet_pose_model,
        movenet_input_size,
        TARGET_CSV_CANVAS_WIDTH,
        TARGET_CSV_CANVAS_HEIGHT,
        initial_person_score_thresh=INITIAL_PERSON_DETECTION_THRESHOLD,
        movenet_confidence_thresh=MOVENET_TRACKING_CONFIDENCE_THRESHOLD,
        tracking_crop_padding_factor=ADJUSTED_TRACKING_CROP_PADDING_FACTOR
    )

    if not keypoints_result_df.empty:
        # splitext strips only the final extension, so a name such as
        # "my.video.mp4" keeps its full stem instead of being cut at the first dot.
        video_stem = os.path.splitext(os.path.basename(video_input_filename))[0]
        output_csv_filename = (f"exercise_keypoints_tracked_pad{ADJUSTED_TRACKING_CROP_PADDING_FACTOR}_"
                               f"conf{MOVENET_TRACKING_CONFIDENCE_THRESHOLD}_{video_stem}.csv")
        keypoints_result_df.to_csv(output_csv_filename, index=False)
        print(f"\nKeypoints saved to '{output_csv_filename}'")
        files.download(output_csv_filename)
        print("\nDownload initiated.")
        # Report the real row count rather than a frame count hard-coded for one specific video.
        print(f"IMPORTANT: The output CSV contains {len(keypoints_result_df)} rows (one per frame read from the video).")
        print(f"If values are still off, the main parameter to tune is 'ADJUSTED_TRACKING_CROP_PADDING_FACTOR' (currently {ADJUSTED_TRACKING_CROP_PADDING_FACTOR}).")
    else:
        print("No keypoints were extracted. The DataFrame is empty.")

Loading person detector model from https://tfhub.dev/tensorflow/ssd_mobilenet_v2/2...
Person detector model loaded successfully.
Loading MoveNet Thunder model from https://tfhub.dev/google/movenet/singlepose/thunder/4...
MoveNet Thunder model loaded successfully.
Please upload your exercise video file:


Saving lunges.mp4 to lunges.mp4

Video 'lunges.mp4' uploaded successfully.

Starting keypoint extraction with keypoint-guided tracking...
IMPORTANT: Video processing at: 768x432
Target CSV canvas: 640x480
Using tracking_crop_padding_factor: 1.7
Movenet confidence threshold for tracking: 0.2
Frame 1 Debug: Nose (abs_x, abs_y) on 768x432 frame BEFORE final scaling to canvas: (413.69, 100.52) -> CSV Nose_X: 344.74
Frame 2 Debug: Nose (abs_x, abs_y) on 768x432 frame BEFORE final scaling to canvas: (415.02, 42.02) -> CSV Nose_X: 345.85
Frame 3 Debug: Nose (abs_x, abs_y) on 768x432 frame BEFORE final scaling to canvas: (413.07, 88.19) -> CSV Nose_X: 344.22
Frame 4 Debug: Nose (abs_x, abs_y) on 768x432 frame BEFORE final scaling to canvas: (412.43, 45.51) -> CSV Nose_X: 343.70
Frame 5 Debug: Nose (abs_x, abs_y) on 768x432 frame BEFORE final scaling to canvas: (410.97, 85.64) -> CSV Nose_X: 342.48
Processed 50 frames...
Processed 100 frames...
Processed 150 frames...
Finished processing. Total

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>


Download initiated.
IMPORTANT: Check the number of rows in the output CSV. It should be closer to 2175 now.
If values are still off, the main parameter to tune is 'ADJUSTED_TRACKING_CROP_PADDING_FACTOR' (currently 1.7).
