In [1]:
import mediapipe as mp
import cv2
import numpy as np
import pandas as pd
import os, csv
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

# Drawing helpers
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Define Important Landmarks for Bench Press
IMPORTANT_LMS = [
    "LEFT_SHOULDER", "RIGHT_SHOULDER",
    "LEFT_ELBOW", "RIGHT_ELBOW",
    "LEFT_WRIST", "RIGHT_WRIST"
]

# Generate all columns of the data frame
HEADERS = ["label"]
for lm in IMPORTANT_LMS:
    HEADERS += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

def rescale_frame(frame, percent=60):
    """Resize frame for faster processing"""
    width = int(frame.shape[1] * percent / 100)
    height = int(frame.shape[0] * percent / 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation=cv2.INTER_AREA)

def init_csv(dataset_path: str):
    """Create CSV file with headers if not exists"""
    if not os.path.exists(dataset_path):  # Ensure CSV is created
        print(f"Creating {dataset_path}...")
        with open(dataset_path, mode="w", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(HEADERS)
    else:
        print(f"{dataset_path} already exists.")

def calculate_angle(a, b, c):
    """Calculate angle between three points"""
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    
    ba = a - b
    bc = c - b
    
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)

def classify_bench_press(left_elbow_angle, right_elbow_angle, threshold=90):
    """Classify movement phase based on elbow angles"""
    if left_elbow_angle > threshold and right_elbow_angle > threshold:
        return "UP"
    else:
        return "DOWN"

def export_landmark_to_csv(dataset_path: str, results, label: str):
    """Export detected pose landmarks to CSV"""
    landmarks = results.pose_landmarks.landmark
    keypoints = []

    try:
        for lm in IMPORTANT_LMS:
            keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])

        keypoints = list(np.array(keypoints).flatten())
        keypoints.insert(0, label)

        with open(dataset_path, mode="a", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
        print(f"✔ Saved data to {dataset_path}")

    except Exception as e:
        print(f" Error saving landmarks: {e}")

### 2. Process Training Data
DATASET_PATH = "train.csv"
init_csv(DATASET_PATH)

cap = cv2.VideoCapture("vid/test/Test.mp4")
save_counts = 0

with mp_pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7) as pose:
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            print("End of Video or Error Opening File!")
            break

        image = rescale_frame(image, 60)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image.flags.writeable = False
        results = pose.process(image)

        if not results.pose_landmarks:
            print(" No human detected in frame.")
            continue

        image.flags.writeable = True
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

        # Extract required landmarks for angle calculation
        landmarks = results.pose_landmarks.landmark
        left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].x, landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].y]
        left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].x, landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].y]
        left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST].x, landmarks[mp_pose.PoseLandmark.LEFT_WRIST].y]
        
        right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].x, landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].y]
        right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW].x, landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW].y]
        right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST].x, landmarks[mp_pose.PoseLandmark.RIGHT_WRIST].y]
        
        # Compute angles
        left_elbow_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
        right_elbow_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
        
        # Determine phase
        label = classify_bench_press(left_elbow_angle, right_elbow_angle)
        export_landmark_to_csv(DATASET_PATH, results, label)
        save_counts += 1

        # Draw landmarks
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(244, 117, 66), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(245, 66, 230), thickness=2, circle_radius=2)
        )

        # Display Saved Count
        cv2.putText(image, f"Saved: {save_counts}", (50, 50), cv2.FONT_HERSHEY_COMPLEX, 2, (0, 0, 0), 2, cv2.LINE_AA)
        cv2.putText(image, f"Label: {label}", (50, 100), cv2.FONT_HERSHEY_COMPLEX, 2, (0, 255, 0), 2, cv2.LINE_AA)
        
        cv2.imshow("Bench Press Data Collection", image)

        if cv2.waitKey(1) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()


Creating train.csv...
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
 No human detected in frame.
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ Saved data to train.csv
✔ S

In [2]:
import mediapipe as mp
import cv2
import numpy as np
import pandas as pd
import os, csv
import seaborn as sns

import warnings
warnings.filterwarnings('ignore')

# Initialize Mediapipe Pose
mp_drawing = mp.solutions.drawing_utils
mp_pose = mp.solutions.pose

# Define Important Landmarks for Bench Press
IMPORTANT_LMS = [
    "LEFT_SHOULDER", "RIGHT_SHOULDER",
    "LEFT_ELBOW", "RIGHT_ELBOW",
    "LEFT_WRIST", "RIGHT_WRIST"
]

# Generate all columns of the data frame
HEADERS = ["label"]
for lm in IMPORTANT_LMS:
    HEADERS += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

def rescale_frame(frame, percent=60):
    """Resize frame for faster processing"""
    width = int(frame.shape[1] * percent / 100)
    height = int(frame.shape[0] * percent / 100)
    dim = (width, height)
    return cv2.resize(frame, dim, interpolation=cv2.INTER_AREA)

def init_csv(dataset_path: str):
    """Create CSV file with headers if not exists"""
    if not os.path.exists(dataset_path):
        print(f"Creating {dataset_path}...")
        with open(dataset_path, mode="w", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(HEADERS)
    else:
        print(f"{dataset_path} already exists.")

def remove_duplicate_rows(dataset_path: str):
    """Remove duplicate rows in dataset"""
    if not os.path.exists(dataset_path):
        print(f"{dataset_path} not found!")
        return
    
    df = pd.read_csv(dataset_path)
    df.drop_duplicates(keep="first", inplace=True)
    df.to_csv(dataset_path, sep=',', encoding='utf-8', index=False)
    print(f"Cleaned and saved {dataset_path}")

def describe_dataset(dataset_path: str):
    """Describe dataset statistics"""
    if not os.path.exists(dataset_path):
        print(f"{dataset_path} not found!")
        return None

    data = pd.read_csv(dataset_path)
    print(f"Headers: {list(data.columns.values)}")
    print(f'Rows: {data.shape[0]} | Columns: {data.shape[1]}\n')
    print(f"Labels: \n{data['label'].value_counts()}\n")
    print(f"Missing values: {data.isnull().values.any()}\n")
    return data

def calculate_angle(a, b, c):
    """Calculate angle between three points"""
    a = np.array(a)
    b = np.array(b)
    c = np.array(c)
    
    ba = a - b
    bc = c - b
    
    cosine_angle = np.dot(ba, bc) / (np.linalg.norm(ba) * np.linalg.norm(bc))
    angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
    return np.degrees(angle)

def classify_bench_press(left_elbow_angle, right_elbow_angle, threshold=90):
    """Classify movement phase based on elbow angles"""
    if left_elbow_angle > threshold and right_elbow_angle > threshold:
        return "UP"
    else:
        return "DOWN"

def export_landmark_to_csv(dataset_path: str, results, label: str):
    """Export detected pose landmarks to CSV"""
    landmarks = results.pose_landmarks.landmark
    keypoints = []

    try:
        for lm in IMPORTANT_LMS:
            keypoint = landmarks[mp_pose.PoseLandmark[lm].value]
            keypoints.append([keypoint.x, keypoint.y, keypoint.z, keypoint.visibility])

        keypoints = list(np.array(keypoints).flatten())
        keypoints.insert(0, label)

        with open(dataset_path, mode="a", newline="") as f:
            csv_writer = csv.writer(f, delimiter=",", quotechar='"', quoting=csv.QUOTE_MINIMAL)
            csv_writer.writerow(keypoints)
        print(f"✔ Saved data to {dataset_path}")

    except Exception as e:
        print(f" Error saving landmarks: {e}")

### 3. Clean and Analyze Data
TEST_DATASET_PATH = "test.csv"
remove_duplicate_rows(TEST_DATASET_PATH)
df = describe_dataset(TEST_DATASET_PATH)
if df is not None:
    sns.countplot(x='label', data=df, palette="Set1")

### 4. Process Test Data (Higher Confidence & Stickman Fix)
init_csv(TEST_DATASET_PATH)

cap = cv2.VideoCapture("vid/train/Train.mp4")
save_counts = 0

with mp_pose.Pose(min_detection_confidence=0.7, min_tracking_confidence=0.7) as pose:  
    while cap.isOpened():
        ret, image = cap.read()
        if not ret:
            print("End of Video or Error Opening File!")
            break

        image = rescale_frame(image, 60)
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image_rgb.flags.writeable = False
        results = pose.process(image_rgb)

        if not results.pose_landmarks:
            print("No human detected in frame.")
            continue

        image.flags.writeable = True
        image = cv2.cvtColor(image_rgb, cv2.COLOR_RGB2BGR)

        # Extract required landmarks for angle calculation
        landmarks = results.pose_landmarks.landmark
        left_shoulder = [landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].x, landmarks[mp_pose.PoseLandmark.LEFT_SHOULDER].y]
        left_elbow = [landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].x, landmarks[mp_pose.PoseLandmark.LEFT_ELBOW].y]
        left_wrist = [landmarks[mp_pose.PoseLandmark.LEFT_WRIST].x, landmarks[mp_pose.PoseLandmark.LEFT_WRIST].y]
        
        right_shoulder = [landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].x, landmarks[mp_pose.PoseLandmark.RIGHT_SHOULDER].y]
        right_elbow = [landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW].x, landmarks[mp_pose.PoseLandmark.RIGHT_ELBOW].y]
        right_wrist = [landmarks[mp_pose.PoseLandmark.RIGHT_WRIST].x, landmarks[mp_pose.PoseLandmark.RIGHT_WRIST].y]
        
        # Compute angles
        left_elbow_angle = calculate_angle(left_shoulder, left_elbow, left_wrist)
        right_elbow_angle = calculate_angle(right_shoulder, right_elbow, right_wrist)
        
        # Determine phase
        label = classify_bench_press(left_elbow_angle, right_elbow_angle)
        export_landmark_to_csv(TEST_DATASET_PATH, results, label)
        save_counts += 1

        # Draw Pose Landmarks
        mp_drawing.draw_landmarks(
            image, results.pose_landmarks, mp_pose.POSE_CONNECTIONS,
            mp_drawing.DrawingSpec(color=(0, 255, 0), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(255, 0, 0), thickness=2, circle_radius=2)
        )

        # Display Instructions and Label
        cv2.putText(image, "Press Q to Quit", (10, 30), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (255, 255, 255), 2, cv2.LINE_AA)
        cv2.putText(image, f"Saved: {save_counts}", (50, 70), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2, cv2.LINE_AA)
        cv2.putText(image, f"Label: {label}", (50, 110), 
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 0, 255), 2, cv2.LINE_AA)
        
        cv2.imshow("Bench Press Test Data Collection", image)

        if cv2.waitKey(10) & 0xFF == ord("q"):
            break

    cap.release()
    cv2.destroyAllWindows()


test.csv not found!
test.csv not found!
Creating test.csv...
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data to test.csv
✔ Saved data t