# Important Libraries

In [7]:
import cv2
import numpy as np
import mediapipe as mp
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical
import os

# Setting Up Actions

In [8]:
# Collection settings shared by the cells below
sequence_length = 30  # Number of frames recorded in one sequence
required_sequences = 15  # Number of sequences wanted for each action
actions = ['Fighting', 'Yoga', 'Walking']  # Action classes to record
data_path = "action_data"  # Root data directory

# MediaPipe Setup

In [None]:
# MediaPipe setup: module handles used by the data-collection cell.
# mp_holistic provides the Holistic model and the landmark connection sets;
# mp_drawing provides draw_landmarks for the on-screen overlay.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Extracting Keypoints

In [10]:
def extract_keypoints(results):
    """Flatten the landmarks of one Holistic result into a single 1-D vector.

    The returned array is the concatenation, in this order, of:
      * pose       - every pose landmark as (x, y, z); zeros(33 * 3) if absent
      * left hand  - every left-hand landmark;         zeros(21 * 3) if absent
      * right hand - every right-hand landmark;        zeros(21 * 3) if absent
      * left leg   - pose landmarks 23, 25, 27, 29, 31; zeros(5 * 3) if absent
      * right leg  - pose landmarks 24, 26, 28, 30, 32; zeros(5 * 3) if absent

    The two leg groups repeat values that are already present in the pose
    group. With the zero-fill sizes above the vector has 255 entries.

    Args:
        results: object exposing ``pose_landmarks``, ``left_hand_landmarks``
            and ``right_hand_landmarks``; each is either falsy or has a
            ``landmark`` sequence whose items carry ``x``, ``y`` and ``z``.

    Returns:
        np.ndarray: 1-D array of landmark coordinates.
    """
    left_leg_ids = [23, 25, 27, 29, 31]
    right_leg_ids = [24, 26, 28, 30, 32]

    def _coords(landmark_list, fallback_count, indices=None):
        # Missing detection -> zero block of the expected size
        if not landmark_list:
            return np.zeros(fallback_count * 3)
        points = landmark_list.landmark
        if indices is not None:
            points = [points[i] for i in indices]
        return np.array([[p.x, p.y, p.z] for p in points]).flatten()

    pose_source = results.pose_landmarks
    parts = [
        _coords(pose_source, 33),
        _coords(results.left_hand_landmarks, 21),
        _coords(results.right_hand_landmarks, 21),
        _coords(pose_source, 5, left_leg_ids),
        _coords(pose_source, 5, right_leg_ids),
    ]
    return np.concatenate(parts)

# Data Collection

In [14]:
# Collect the missing keypoint sequences for every action from the webcam.
#
# For each action the cell counts the .npy files already in its folder and
# records new sequences until `required_sequences` files exist. Each sequence
# is `sequence_length` frames of extract_keypoints() output. Pressing 'q' in
# the preview window stops the collection.
#
# Both resources start as None so the `finally` clause can tell which of them
# were actually created and need releasing.
cap = None
holistic = None

try:
    # Open the default webcam (device index 0)
    cap = cv2.VideoCapture(0)

    # Request a 1920x1080 capture
    cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1920)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 1080)

    # Initialize Mediapipe Holistic model
    holistic = mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5)

    print("Starting data verification and collection...")

    for action in actions:
        folder_path = os.path.join(data_path, action)
        os.makedirs(folder_path, exist_ok=True)

        # Count existing sequences
        existing_files = [file for file in os.listdir(folder_path) if file.endswith('.npy')]
        num_files = len(existing_files)
        print(f"Action '{action}': {num_files}/{required_sequences} sequences available.")

        # Index used to build the next output file name. It is advanced past
        # names that already exist, so a folder with gaps in its numbering
        # (e.g. 0.npy and 2.npy) never has an existing recording overwritten.
        # For a contiguous folder this yields the same names as counting.
        file_index = 0

        # Collect missing sequences (empty range when nothing is missing)
        for sequence_num in range(num_files, required_sequences):
            while os.path.exists(os.path.join(folder_path, f"{file_index}.npy")):
                file_index += 1

            print(f"Collecting sequence {sequence_num + 1} for '{action}'...")
            sequence = []

            while len(sequence) < sequence_length:
                ret, frame = cap.read()
                if not ret:
                    print("Camera error. Exiting...")
                    raise RuntimeError("Failed to read from camera.")

                # Mirror the image, then convert BGR -> RGB for Mediapipe
                frame = cv2.flip(frame, 1)
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                results = holistic.process(frame_rgb)

                # Draw landmarks on the (BGR) preview frame
                mp_drawing.draw_landmarks(frame, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
                mp_drawing.draw_landmarks(frame, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                mp_drawing.draw_landmarks(frame, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

                # Display progress information on the frame
                cv2.putText(frame, f"Action: {action}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)
                cv2.putText(frame, f"Sequence: {sequence_num + 1}/{required_sequences}", (10, 70), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 0), 2)
                cv2.putText(frame, f"Frames: {len(sequence)}/{sequence_length}", (10, 110), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)

                # Show the frame
                cv2.imshow("Data Collection", frame)

                # Append keypoints
                keypoints = extract_keypoints(results)
                sequence.append(keypoints)

                # Exit on 'q' key
                if cv2.waitKey(10) & 0xFF == ord("q"):
                    print("Data collection interrupted by user.")
                    raise KeyboardInterrupt  # Trigger safe exit

            # Save the sequence under the first unused file name
            np.save(os.path.join(folder_path, f"{file_index}.npy"), sequence)
            print(f"Sequence {sequence_num + 1} saved.")

except KeyboardInterrupt:
    print("\nOperation stopped by user.")
except Exception as e:
    # Best effort: report the problem and fall through to the cleanup below
    print(f"An error occurred: {e}")
finally:
    # Ensure resources are released
    if cap is not None:
        cap.release()
    cv2.destroyAllWindows()
    if holistic is not None:
        holistic.close()
    print("All resources released. Exiting...")


Starting data verification and collection...
Action 'Fighting': 15/15 sequences available.
Action 'Yoga': 0/15 sequences available.
Collecting sequence 1 for 'Yoga'...
Sequence 1 saved.
Collecting sequence 2 for 'Yoga'...
Sequence 2 saved.
Collecting sequence 3 for 'Yoga'...
Sequence 3 saved.
Collecting sequence 4 for 'Yoga'...
Sequence 4 saved.
Collecting sequence 5 for 'Yoga'...
Sequence 5 saved.
Collecting sequence 6 for 'Yoga'...
Sequence 6 saved.
Collecting sequence 7 for 'Yoga'...
Sequence 7 saved.
Collecting sequence 8 for 'Yoga'...
Sequence 8 saved.
Collecting sequence 9 for 'Yoga'...
Sequence 9 saved.
Collecting sequence 10 for 'Yoga'...
Sequence 10 saved.
Collecting sequence 11 for 'Yoga'...
Sequence 11 saved.
Collecting sequence 12 for 'Yoga'...
Sequence 12 saved.
Collecting sequence 13 for 'Yoga'...
Sequence 13 saved.
Collecting sequence 14 for 'Yoga'...
Sequence 14 saved.
Collecting sequence 15 for 'Yoga'...
Sequence 15 saved.
Action 'Walking': 0/15 sequences available.
Co

# Shape Verification

In [None]:
import os
import numpy as np

# Report every saved sequence whose array shape differs from the expected
# (frames, features) shape.
# 255 = 33*3 (pose) + 2 * 21*3 (hands) + 2 * 5*3 (legs), i.e. the length of the
# vector built by extract_keypoints; update it if the feature layout changes.
expected_shape = (sequence_length, 255)

for action in actions:
    folder_path = os.path.join(data_path, action)
    # Only .npy files are sequences; np.load would fail on stray entries such
    # as .DS_Store or .ipynb_checkpoints. Sorting gives a stable report order.
    for file in sorted(name for name in os.listdir(folder_path) if name.endswith('.npy')):
        file_path = os.path.join(folder_path, file)
        data = np.load(file_path)
        if data.shape != expected_shape:
            print(f"Inconsistent shape in {file_path}: {data.shape}")
print("Shape verification complete.")

Shape verification complete.


# Fixing Length

In [None]:
def fix_sequence_length(data, target_length=30):
    """Return `data` with exactly `target_length` entries along its first axis.

    Longer input is truncated to its first `target_length` entries. Shorter
    input is extended with zero entries appended at the end; the zero block
    takes the trailing shape of `data`, so 1-D, 2-D and higher-dimensional
    arrays are all supported. Input that already has the right length is
    returned unchanged (the same object).

    Args:
        data (np.ndarray): array whose first axis is the sequence axis.
        target_length (int): required length of the first axis.

    Returns:
        np.ndarray: array with `target_length` entries along axis 0.
    """
    current_length = len(data)
    if current_length > target_length:
        return data[:target_length]  # Truncate
    if current_length < target_length:
        # Zero block with the same per-entry shape as the data
        padding = np.zeros((target_length - current_length,) + data.shape[1:])
        return np.concatenate((data, padding), axis=0)  # Append padding
    return data

# Bring every saved sequence to `sequence_length` frames, in place on disk.
for action in actions:
    folder_path = os.path.join(data_path, action)
    # Only .npy files are sequences; np.load would fail on stray entries such
    # as .DS_Store or .ipynb_checkpoints.
    for file in sorted(name for name in os.listdir(folder_path) if name.endswith('.npy')):
        file_path = os.path.join(folder_path, file)
        data = np.load(file_path)
        fixed_data = fix_sequence_length(data, target_length=sequence_length)
        # Rewrite the file only when the length actually changed, so re-running
        # the cell leaves already-correct recordings untouched.
        if fixed_data.shape != data.shape:
            np.save(file_path, fixed_data)  # Overwrite with fixed sequence
print("Fixed sequence lengths.")

Fixed sequence lengths.


# Label Map

In [17]:
# Map each action name to its position in `actions` (used as the class index)
label_map = dict(zip(actions, range(len(actions))))

# Dataset Splitting

In [None]:
# Load every saved sequence together with the numeric label of its action.
# Files are filtered to .npy (np.load would fail on stray entries such as
# .DS_Store) and sorted, because os.listdir returns names in arbitrary order
# and the seeded splits below are only reproducible for a fixed sample order.
X, y = [], []
for action in actions:
    action_dir = os.path.join(data_path, action)
    for file in sorted(name for name in os.listdir(action_dir) if name.endswith('.npy')):
        data = np.load(os.path.join(action_dir, file))  # Load the .npy file
        X.append(data)
        y.append(label_map[action])  # Map action label to numeric value

X = np.array(X)
# One-hot encode with an explicit class count so the label width always equals
# len(actions), even if the last action contributed no samples.
y = to_categorical(y, num_classes=len(actions)).astype(int)

# First split: Train (70%) and Temp (30% for validation + test)
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)

# Second split: Validation (15%) and Test (15%)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Check the sizes of the splits
print(f"Training set: {X_train.shape}, Validation set: {X_val.shape}, Test set: {X_test.shape}")

Training set: (31, 30, 255), Validation set: (7, 30, 255), Test set: (7, 30, 255)


# Saving the Dataset Splits

In [None]:
import numpy as np
import os

# Destination folder for the split arrays (created on demand)
save_folder = "data_splits"
os.makedirs(save_folder, exist_ok=True)

# Each split is written to <save_folder>/<name>.npy, in this order
splits = {
    "X_train": X_train,
    "X_val": X_val,
    "X_test": X_test,
    "y_train": y_train,
    "y_val": y_val,
    "y_test": y_test,
}
for split_name, split_array in splits.items():
    np.save(os.path.join(save_folder, f"{split_name}.npy"), split_array)

print(f"Datasets saved successfully in the folder: {save_folder}")

Datasets saved successfully in the folder: data_splits
