In [1]:
!pip install mediapipe opencv-python filterpy scipy matplotlib




[notice] A new release of pip is available: 23.0.1 -> 25.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
import cv2

def preprocess_video(video_path, target_fps=30, target_size=(1280, 720), background_subtraction=False):
    
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f"Error opening video: {video_path}")
        return []

    orig_fps = cap.get(cv2.CAP_PROP_FPS)
    frame_interval = int(orig_fps / target_fps) if orig_fps > target_fps else 1

    if background_subtraction:
        back_sub = cv2.createBackgroundSubtractorMOG2(history=500, varThreshold=16, detectShadows=True)
        # or
        # back_sub = cv2.createBackgroundSubtractorKNN()

    frames = []
    frame_idx = 0
    while True:
        ret, frame = cap.read()
        if not ret:
            break

        if frame_idx % frame_interval == 0:
            # Resize frame
            frame_resized = cv2.resize(frame, target_size)

            # Convert to HSV for color normalization if needed or keep in BGR
            # Histogram equalization per channel
            frame_yuv = cv2.cvtColor(frame_resized, cv2.COLOR_BGR2YUV)
            frame_yuv[:, :, 0] = cv2.equalizeHist(frame_yuv[:, :, 0])  
            frame_eq = cv2.cvtColor(frame_yuv, cv2.COLOR_YUV2BGR)

            # Background subtraction
            if background_subtraction:
                fg_mask = back_sub.apply(frame_eq)
                frame_eq = cv2.bitwise_and(frame_eq, frame_eq, mask=fg_mask)

            frames.append(frame_eq)

        frame_idx += 1

    cap.release()
    return frames

# Example 
# video_path = r"D:\fitz\fitzpatrick17k-main\pose\UCF50\Drumming\v_Drumming_g01_c01.avi"
video_path = r"D:\fitz\fitzpatrick17k-main\pose\UCF50\PullUps\v_Pullup_g01_c01.avi"

frames = preprocess_video(video_path, background_subtraction=True)
print(f"Total frames extracted: {len(frames)}")

# Optionally display first frame for verification
if frames:
    cv2.imshow("Preprocessed Frame", frames[0])
    cv2.waitKey(0)
    cv2.destroyAllWindows()


Total frames extracted: 167


In [6]:

import cv2
import mediapipe as mp
import numpy as np
from filterpy.kalman import KalmanFilter
from scipy.signal import savgol_filter
import matplotlib.pyplot as plt


In [7]:

def create_kalman_filter():
    kf = KalmanFilter(dim_x=4, dim_z=2)
    kf.F = np.array([[1, 0, 1, 0],
                     [0, 1, 0, 1],
                     [0, 0, 1, 0],
                     [0, 0, 0, 1]])
    kf.H = np.array([[1, 0, 0, 0],
                     [0, 1, 0, 0]])
    kf.P *= 1000
    kf.R = np.eye(2) * 5
    kf.Q = np.eye(4)
    return kf


In [8]:

def calculate_angle(a, b, c):
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    ba = a - b
    bc = c - b
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc) + 1e-6)
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)


In [None]:

# # def classify_joint_movement(delta_y, angle_velocity, epsilon_y=5, epsilon_theta=2):
# #     if delta_y < -epsilon_y and angle_velocity > epsilon_theta:
# #         return 'UP'
# #     elif delta_y > epsilon_y and angle_velocity < -epsilon_theta:
# #         return 'DOWN'
#     # elif abs(delta_y) < epsilon_y and abs(angle_velocity) < epsilon_theta:
#     #     return 'STATIC'
# #     else:
# #         return 'TRANSITION'
# def classify_direction(delta_y, angle_velocity, epsilon_y=5, epsilon_theta=2):
#     """
#     Classify movement direction based on displacement and angle velocity.
#     Returns: 'UP', 'DOWN', or 'NONE'
#     """
#     if delta_y < -epsilon_y and angle_velocity > epsilon_theta:
#         return 'UP'
#     elif delta_y > epsilon_y and angle_velocity < -epsilon_theta:
#         return 'DOWN'
#     else:
#         return 'NONE'


# def classify_motion_state(delta_y, angle_velocity, epsilon_y=5, epsilon_theta=2):
#     """
#     Classify motion state as STATIC or TRANSITION.
#     """
#     if abs(delta_y) < epsilon_y and abs(angle_velocity) < epsilon_theta:
#         return 'STATIC'
#     else:
#         return 'TRANSITION'

# Global variable to remember last state
last_state = None

def classify_joint_movement(delta_y, angle_velocity, epsilon_y=5, epsilon_theta=2):
    global last_state
    
    if delta_y < -epsilon_y and angle_velocity > epsilon_theta:
        last_state = 'UP'
    elif delta_y > epsilon_y and angle_velocity < -epsilon_theta:
        last_state = 'DOWN'
    elif abs(delta_y) < epsilon_y and abs(angle_velocity) < epsilon_theta:
        return last_state
    return "TRANSITION"




In [None]:
# Mapping for different body parts: (name, joint triplet IDs)
JOINT_TRIPLETS = {
    "right_elbow": (12, 14, 16),  # Right shoulder, right elbow, right wrist
    "left_elbow": (11, 13, 15),   # Left shoulder, left elbow, left wrist
    "right_knee": (24, 26, 28),   # Right hip, right knee, right ankle
    "left_knee": (23, 25, 27)     # Left hip, left knee, left ankle
}

def process_joint_movements(image, joint_history, angle_history):
    y_offset = 50  # Starting position for text
    for name, (A_id, B_id, C_id) in JOINT_TRIPLETS.items():
        # Get latest coordinates
        if len(joint_history.get(f'joint_{B_id}', [])) < 2:
            continue 

        A = joint_history[f'joint_{A_id}'][-1]
        B = joint_history[f'joint_{B_id}'][-1]
        C = joint_history[f'joint_{C_id}'][-1]

        # Calculate angle
        angle = calculate_angle(A, B, C)
        angle_history.setdefault(name, []).append(angle)

        # Calculate movement parameters
        delta_y = B[1] - joint_history[f'joint_{B_id}'][-2][1]
        angle_velocity = 0
        if len(angle_history[name]) > 1:
            angle_velocity = angle - angle_history[name][-2]

        # Classify movement
        label = classify_joint_movement(delta_y, angle_velocity)

        # Display on image
        cv2.putText(
            image,
            f"{name.replace('_', ' ').title()}: {label} Angle: {angle:.2f}",
            (30, y_offset),
            cv2.FONT_HERSHEY_SIMPLEX,
            0.9,
            (255, 0, 0),
            2
        )
        y_offset += 40  # Move down for next label


In [None]:

# Define joint index mappings (Mediapipe Pose IDs)
ANGLE_JOINTS = {
    "right_elbow": (12, 14, 16),  # Shoulder, Elbow, Wrist
    "left_elbow": (11, 13, 15),
    "right_knee": (24, 26, 28),   # Hip, Knee, Ankle
    "left_knee": (23, 25, 27),
    # "spine": (11, 23, 24)         # Left Shoulder, Left Hip, Right Hip
    "left_hip_knee": (11, 23, 25),     # Left Shoulder, Left Hip, Left Knee
    "right_hip_knee": (12, 24, 26) 
}

# Colors for each joint label
JOINT_COLORS = {
    "right_elbow": (255, 0, 0),
    "left_elbow": (0, 255, 0),
    "right_knee": (0, 0, 255),
    "left_knee": (255, 255, 0),
    "left_hip_knee": (255, 0, 255),
    "right_hip_knee": (0, 255, 255)
}




In [None]:
import csv

def draw_trajectories(image, joint_history, max_len=20):
    """Draw trailing lines showing movement history for each joint."""
    for key, points in joint_history.items():
        if len(points) < 2:
            continue
        color = (0, 255, 255)  # Example trail color: cyan
        for i in range(max(0, len(points)-max_len), len(points)-1):
            cv2.line(image, points[i], points[i+1], color, 2)


def detect_posture(angle_history):
    """
    Will be replaced by classification model
    """
    left_knee_angles = angle_history.get('left_knee', [])
    right_knee_angles = angle_history.get('right_knee', [])

    if not left_knee_angles or not right_knee_angles:
        return "Unknown"

    left_angle = left_knee_angles[-1]
    right_angle = right_knee_angles[-1]

    if left_angle > 160 and right_angle > 160:
        return "Standing"
    elif left_angle < 100 and right_angle < 100:
        return "Sitting"
    else:
        return "Transitioning"


def save_csv_log(filename, angle_history, joint_history, status_history):
    """Save angle, joint positions, and status to CSV file."""
    with open(filename, mode='w', newline='') as csv_file:
        fieldnames = ['frame', 'joint', 'angle', 'x', 'y', 'status']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writeheader()
        frames = len(next(iter(angle_history.values()), []))
        for frame_idx in range(frames):
            for joint_name in ANGLE_JOINTS.keys():
                angle = angle_history.get(joint_name, [None]*frames)[frame_idx]
                # Use last known joint b (middle joint) position as example
                b_idx = ANGLE_JOINTS[joint_name][1]
                pos = joint_history.get(f'joint_{b_idx}', [(None, None)]*frames)[frame_idx]
                status = status_history.get(joint_name, [""]*frames)[frame_idx]
                writer.writerow({
                    'frame': frame_idx,
                    'joint': joint_name,
                    'angle': angle,
                    'x': pos[0],
                    'y': pos[1],
                    'status': status
                })


def write_video(output_path, frame_width, frame_height, fps=30):
    """Return a VideoWriter object for saving output."""
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    return cv2.VideoWriter(output_path, fourcc, fps, (frame_width, frame_height))


In [None]:
pose_kalman_filters = {}
joint_history = {}
angle_history = {}
frame_height, frame_width = frames[0].shape[:2]
output_video_path = "output_annotated.avi"
video_writer = write_video(output_video_path, frame_width, frame_height, fps=30)
status_history = {joint: [] for joint in ANGLE_JOINTS.keys()}

In [None]:
import cv2
import mediapipe as mp
import numpy as np

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

video_path = r"D:\fitz\fitzpatrick17k-main\pose\UCF50\GolfSwing\v_GolfSwing_g01_c01.avi"

# video_path = r"D:\fitz\fitzpatrick17k-main\pose\UCF50\PullUps\v_Pullup_g01_c01.avi"
frames = preprocess_video(video_path, background_subtraction=False)

with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    for frame in frames:
        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        mp_drawing.draw_landmarks(
            image,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
            mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
        )

        if results.pose_landmarks:
            for idx, lm in enumerate(results.pose_landmarks.landmark):
                key = f'joint_{idx}'
                x, y = int(lm.x * image.shape[1]), int(lm.y * image.shape[0])

                if key not in pose_kalman_filters:
                    pose_kalman_filters[key] = create_kalman_filter()
                    # pose_kalman_filters[key].statePre = np.array([[x], [y], [0], [0]], np.float32)
                    pose_kalman_filters[key].x[:2] = np.array([[x], [y]])

                kf = pose_kalman_filters[key]
                kf.predict()
                # kf.correct(np.array([[np.float32(x)], [np.float32(y)]]))
                kf.update(np.array([x, y]))
                pred_x, pred_y = int(kf.x[0]), int(kf.x[1])

                if key not in joint_history:
                    joint_history[key] = []
                joint_history[key].append((pred_x, pred_y))

            # Process angles & movements (as you have it)
            for name, (a_idx, b_idx, c_idx) in ANGLE_JOINTS.items():
                if all(len(joint_history.get(f'joint_{i}', [])) >= 2 for i in (a_idx, b_idx, c_idx)):
                    a = joint_history[f'joint_{a_idx}'][-1]
                    b = joint_history[f'joint_{b_idx}'][-1]
                    c = joint_history[f'joint_{c_idx}'][-1]

                    angle = calculate_angle(a, b, c)
                    angle_history.setdefault(name, []).append(angle)

                    delta_y = b[1] - joint_history[f'joint_{b_idx}'][-2][1]
                    angle_velocity = angle - angle_history[name][-2] if len(angle_history[name]) > 1 else 0

                    label = classify_joint_movement(delta_y, angle_velocity)

                    cv2.putText(
                        image,
                        f"{name.replace('_', ' ').title()}: {label} {int(angle)}°",
                        (b[0] + 10, b[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                        JOINT_COLORS[name], 2
                    )
                    status_history[name].append(label)
        posture = detect_posture(angle_history)
        cv2.putText(
            image,
            f"Posture: {posture}",
            (30, 30),  # top-left corner
            cv2.FONT_HERSHEY_SIMPLEX, 1.0,
            (0, 255, 255), 2
        )
        # draw_trajectories(image, joint_history)


        cv2.imshow('Pose Tracking', image)
        video_writer.write(image)

        if cv2.waitKey(30) & 0xFF == ord('q'):
            break

cv2.destroyAllWindows()


  pred_x, pred_y = int(kf.x[0]), int(kf.x[1])


In [21]:

mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

cap = cv2.VideoCapture(0)
with mp_pose.Pose(min_detection_confidence=0.5, min_tracking_confidence=0.5) as pose:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        image = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)
        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        mp_drawing.draw_landmarks(
            image,
            results.pose_landmarks,
            mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(245, 117, 66), thickness=2, circle_radius=2),
            mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
        )

        if results.pose_landmarks:
            for idx, lm in enumerate(results.pose_landmarks.landmark):
                key = f'joint_{idx}'
                x, y = int(lm.x * frame.shape[1]), int(lm.y * frame.shape[0])

                if key not in pose_kalman_filters:
                    pose_kalman_filters[key] = create_kalman_filter()
                    pose_kalman_filters[key].x[:2] = np.array([[x], [y]])

                kf = pose_kalman_filters[key]
                kf.predict()
                kf.update(np.array([x, y]))
                pred_x, pred_y = int(kf.x[0]), int(kf.x[1])

                if key not in joint_history:
                    joint_history[key] = []
                joint_history[key].append((pred_x, pred_y))
                # cv2.circle(image, (pred_x, pred_y), 4, (0, 255, 0), -1)

            # Process angles & movements
            for name, (a_idx, b_idx, c_idx) in ANGLE_JOINTS.items():
                if all(len(joint_history.get(f'joint_{i}', [])) >= 2 for i in (a_idx, b_idx, c_idx)):
                    a = joint_history[f'joint_{a_idx}'][-1]
                    b = joint_history[f'joint_{b_idx}'][-1]
                    c = joint_history[f'joint_{c_idx}'][-1]

                    angle = calculate_angle(a, b, c)
                    angle_history.setdefault(name, []).append(angle)

                    delta_y = b[1] - joint_history[f'joint_{b_idx}'][-2][1]
                    angle_velocity = angle - angle_history[name][-2] if len(angle_history[name]) > 1 else 0

                    label = classify_joint_movement(delta_y, angle_velocity)

                    cv2.putText(
                        image,
                        f"{name.replace('_', ' ').title()}: {label} {int(angle)}°",
                        (b[0] + 10, b[1] - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.6,
                        JOINT_COLORS[name], 2
                    )

        cv2.imshow('Pose Tracking', image)
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break

cap.release()
cv2.destroyAllWindows()



  pred_x, pred_y = int(kf.x[0]), int(kf.x[1])
