In [2]:
pip install mediapipe


Collecting mediapipe
  Downloading mediapipe-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.7 kB)
Collecting protobuf<5,>=4.25.3 (from mediapipe)
  Downloading protobuf-4.25.4-cp37-abi3-manylinux2014_x86_64.whl.metadata (541 bytes)
Collecting sounddevice>=0.4.4 (from mediapipe)
  Downloading sounddevice-0.5.0-py3-none-any.whl.metadata (1.4 kB)
Downloading mediapipe-0.10.15-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (35.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m35.9/35.9 MB[0m [31m20.2 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading protobuf-4.25.4-cp37-abi3-manylinux2014_x86_64.whl (294 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m294.6/294.6 kB[0m [31m11.0 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading sounddevice-0.5.0-py3-none-any.whl (32 kB)
Installing collected packages: protobuf, sounddevice, mediapipe
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.

In [8]:
import os
import cv2
import csv
import mediapipe as mp

# Initialize MediaPipe Pose
mp_pose = mp.solutions.pose
pose = mp_pose.Pose(static_image_mode=False, min_detection_confidence=0.5, min_tracking_confidence=0.5)

# Set the directory containing the mp4 files and output CSV path
video_directory = '/content/drive/MyDrive/pushup dataset/Wrong sequence'
csv_file_path = '/content/pushup_analysis.csv'

# Define important landmarks
IMPORTANT_LMS = [
    "NOSE",
    "LEFT_EYE_INNER",
    "LEFT_EYE",
    "LEFT_EYE_OUTER",
    "RIGHT_EYE_INNER",
    "RIGHT_EYE",
    "RIGHT_EYE_OUTER",
    "LEFT_EAR",
    "RIGHT_EAR",
    "MOUTH_LEFT",
    "MOUTH_RIGHT",
    "LEFT_SHOULDER",
    "RIGHT_SHOULDER",
    "LEFT_ELBOW",
    "RIGHT_ELBOW",
    "LEFT_WRIST",
    "RIGHT_WRIST",
    "LEFT_PINKY",
    "RIGHT_PINKY",
    "LEFT_INDEX",
    "RIGHT_INDEX",
    "LEFT_THUMB",
    "RIGHT_THUMB",
    "LEFT_HIP",
    "RIGHT_HIP",
    "LEFT_KNEE",
    "RIGHT_KNEE",
    "LEFT_ANKLE",
    "RIGHT_ANKLE",
    "LEFT_HEEL",
    "RIGHT_HEEL",
    "LEFT_FOOT_INDEX",
    "RIGHT_FOOT_INDEX"
]


# Generate column headers
HEADERS = ["Filename", "Pushup Count"]
for lm in IMPORTANT_LMS:
    HEADERS += [f"{lm.lower()}_x", f"{lm.lower()}_y", f"{lm.lower()}_z", f"{lm.lower()}_v"]

# Create a new CSV file and write headers
with open(csv_file_path, mode='w', newline='') as csv_file:
    csv_writer = csv.writer(csv_file)
    csv_writer.writerow(HEADERS)

def analyze_pushups(video_path):
    cap = cv2.VideoCapture(video_path)
    pushup_count = 0
    # State to track pushup position
    in_pushup_position = False
    previous_shoulder_height = None

    # Initialize list to store landmark data for the current frame
    landmark_data_list = []

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Convert the frame to RGB
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)

        # Process the frame and detect pose landmarks
        results = pose.process(rgb_frame)

        if results.pose_landmarks:
            # Draw landmarks on the frame
            mp.solutions.drawing_utils.draw_landmarks(frame, results.pose_landmarks, mp_pose.POSE_CONNECTIONS)

            # Extract shoulder landmarks
            left_shoulder = results.pose_landmarks.landmark[mp_pose.PoseLandmark.LEFT_SHOULDER]
            right_shoulder = results.pose_landmarks.landmark[mp_pose.PoseLandmark.RIGHT_SHOULDER]
            shoulder_height = (left_shoulder.y + right_shoulder.y) / 2

            # Simple pushup detection logic
            if previous_shoulder_height is not None:
                if shoulder_height < previous_shoulder_height and not in_pushup_position:
                    pushup_count += 1
                    in_pushup_position = True
                elif shoulder_height > previous_shoulder_height:
                    in_pushup_position = False

            previous_shoulder_height = shoulder_height

            # Collect landmark data for important landmarks
            landmarks = {lm: {'x': 0, 'y': 0, 'z': 0, 'v': 0} for lm in IMPORTANT_LMS}
            for landmark_name, landmark in mp_pose.PoseLandmark.__members__.items():
                if landmark_name in landmarks:
                    landmark_data = results.pose_landmarks.landmark[landmark]
                    landmarks[landmark_name] = {
                        'x': landmark_data.x,
                        'y': landmark_data.y,
                        'z': landmark_data.z,
                        'v': landmark_data.visibility
                    }

            # Append landmark data for the current frame
            landmark_data_list.append(landmarks)

        # Display the frame (optional)
        # cv2.imshow('Pushup Detection', frame)

    cap.release()
    # cv2.destroyAllWindows()

    # Write the collected landmark data to CSV
    with open(csv_file_path, mode='a', newline='') as csv_file:
        csv_writer = csv.writer(csv_file)
        for landmarks in landmark_data_list:
            row = [os.path.basename(video_path), pushup_count]
            for lm in IMPORTANT_LMS:
                data = landmarks[lm]
                row.extend([data['x'], data['y'], data['z'], data['v']])
            csv_writer.writerow(row)

    return pushup_count

# Process each video file in the directory
for filename in os.listdir(video_directory):
    if filename.endswith(".mp4"):
        video_path = os.path.join(video_directory, filename)
        print(f"Processing {video_path}")
        pushup_count = analyze_pushups(video_path)

        # Write the pushup count result to the CSV file
        with open(csv_file_path, mode='a', newline='') as csv_file:
            csv_writer = csv.writer(csv_file)
            csv_writer.writerow([filename, pushup_count] + [''] * (len(HEADERS) - 2))  # Ensure alignment

# Close the Pose solution
pose.close()


Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/Copy of push up 57.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/18.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/13.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/Copy of push up 167.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/6.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/4.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/11.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/14.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/1.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/Copy of push up 153.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/16.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/15.mp4
Processing /content/drive/MyDrive/pushup dataset/Wrong sequence/Copy of push up 107.mp4
Processing /c

In [9]:
import csv
import os

# Paths to the input CSV files
correct_pushup_file_path = '/content/correctpushup.csv'
incorrect_pushup_file_path = '/content/incorrectpushup.csv'

# Path to the output combined CSV file
combined_csv_file_path = '/content/pushup_analysis.csv'

# Step 1: Create the combined CSV file
# Check if the combined CSV file exists. If not, create it.
if not os.path.exists(combined_csv_file_path):
    with open(combined_csv_file_path, mode='w', newline='') as combined_file:
        pass  # Create an empty file

# Step 2: Initialize a list to hold the combined data
combined_data = []

# Step 3: Read the correct pushup CSV file and append data to combined_data
with open(correct_pushup_file_path, mode='r', newline='') as correct_file:
    csv_reader = csv.reader(correct_file)
    header = next(csv_reader)  # Read the header
    combined_data.append(header)  # Add the header to the combined data
    for row in csv_reader:
        combined_data.append(row)

# Step 4: Read the incorrect pushup CSV file and append data to combined_data
with open(incorrect_pushup_file_path, mode='r', newline='') as incorrect_file:
    csv_reader = csv.reader(incorrect_file)
    next(csv_reader)  # Skip the header
    for row in csv_reader:
        combined_data.append(row)

# Step 5: Write the combined data to the combined CSV file
with open(combined_csv_file_path, mode='w', newline='') as combined_file:
    csv_writer = csv.writer(combined_file)
    csv_writer.writerows(combined_data)

print(f"Combined CSV file has been created and saved to {combined_csv_file_path}.")


Combined CSV file has been created and saved to /content/pushup_analysis.csv.


In [10]:
import csv
import random

# Path to the original CSV file
csv_file_path = '/content/pushup_analysis.csv'
train_csv_file_path = '/content/train.csv'  # Output train file
test_csv_file_path = '/content/test.csv'    # Output test file

# Read all rows from the original CSV file
with open(csv_file_path, mode='r', newline='') as infile:
    csv_reader = csv.reader(infile)
    rows = list(csv_reader)

# Shuffle the rows to randomize the split
header = rows[0]  # Extract the header
data_rows = rows[1:]  # Exclude the header for shuffling
random.shuffle(data_rows)

# Calculate the split index
split_index = int(0.8 * len(data_rows))

# Split the data into train and test sets
train_rows = data_rows[:split_index]
test_rows = data_rows[split_index:]

# Write the train data to train.csv
with open(train_csv_file_path, mode='w', newline='') as train_file:
    csv_writer = csv.writer(train_file)
    csv_writer.writerow(header)  # Write the header
    csv_writer.writerows(train_rows)  # Write the train rows

# Write the test data to test.csv
with open(test_csv_file_path, mode='w', newline='') as test_file:
    csv_writer = csv.writer(test_file)
    csv_writer.writerow(header)  # Write the header
    csv_writer.writerows(test_rows)  # Write the test rows

print(f"CSV file has been split into {train_csv_file_path} (train) and {test_csv_file_path} (test).")


CSV file has been split into /content/train.csv (train) and /content/test.csv (test).
