In [2]:
#1. Import the libraries
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix, accuracy_score
from sklearn.preprocessing import MinMaxScaler

import tensorflow as tf
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import LSTM, Dense, Input, Dropout, Conv1D, MaxPooling1D, GRU, TimeDistributed, Flatten ,Conv2D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
from tensorflow.keras.regularizers import l2
from tensorflow.keras.optimizers import Adam

import mediapipe as mp
import seaborn as sns
import matplotlib.pyplot as plt
from imblearn.over_sampling import SMOTE

In [3]:
#2. Declare and initiliaze paths and constants
# DATA_PATH = 'DATA'  # Path to save videos and keypoints
DATA_PATH = 'NEW_DATA'  # Path to save new videos and keypoints for adding new data

# Accumulated actions
# ACTIONS = np.array(['Mana'])

# New actions to be added to the dataset
ACTIONS = np.array(['Hi'])

NO_SEQUENCES = 90  # Number of videos per action
SEQUENCE_LENGTH = 30  # Frames per video

In [4]:
#3. Declare the functions for collecting data
# Mediapipe setup
mp_holistic = mp.solutions.holistic  # Holistic model
mp_drawing = mp.solutions.drawing_utils  # Drawing utilities

def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False
    results = model.process(image)
    image.flags.writeable = True
    return image, results

def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33 * 4)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21 * 3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21 * 3)
    return np.concatenate([pose, lh, rh])

def extract_keypoints_from_video(video_path):
    cap = cv2.VideoCapture(video_path)
    keypoints = []
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():
            ret, frame = cap.read()
            if not ret:
                break

            _, results = mediapipe_detection(frame, holistic)
            keypoint_frame = extract_keypoints(results)
            keypoints.append(keypoint_frame)

    cap.release()
    return np.array(keypoints)

def process_videos_to_keypoints(start_sequence=0, end_sequence=None):
    """
    Process videos to extract keypoints and save as .npy files,
    keeping .npy indices aligned with video indices.
    """
    for action in ACTIONS:
        action_path = os.path.join(DATA_PATH, action)
        
        # Get all video files in the action directory
        video_files = [f for f in os.listdir(action_path) if f.endswith(".mp4")]
        
        # Sort files numerically based on the numeric part of their names
        video_files.sort(key=lambda x: int(''.join(filter(str.isdigit, x)) or 0))
        
        # If end_index is None, process all files from start_index onwards
        if end_sequence is None:
            end_sequence = len(video_files)
        
        for video_file in video_files[start_sequence:end_sequence]:
            # Extract the numeric index from the video filename
            video_index = int(''.join(filter(str.isdigit, os.path.splitext(video_file)[0])) or 0)
            
            video_path = os.path.join(action_path, video_file)
            keypoints = extract_keypoints_from_video(video_path)
            
            # Save keypoints as .npy, ensuring consistent zero-based naming
            npy_path = os.path.join(action_path, f"{video_index}.npy")
            np.save(npy_path, keypoints)
            print(f"Processed and saved keypoints for Action({action}) {video_file} as {video_index}.npy")

def record_videos():
    # default will record 30 video
    # start_sequence = 0
    # last_sequence = 30
    start_sequence = 0
    last_sequence = 30
    os.makedirs(DATA_PATH, exist_ok=True)
    for action in ACTIONS:
        os.makedirs(os.path.join(DATA_PATH, action), exist_ok=True)

    cap = cv2.VideoCapture(0)
    frame_rate = 30  # Frames per second
    video_duration = 3  # Duration in seconds
    total_frames = frame_rate * video_duration
    
    for action in ACTIONS:
        print(f"Recording for action: {action}")
        for sequence in range(start_sequence, last_sequence):
            video_path = os.path.join(DATA_PATH, action, f"{sequence}.mp4")
            out = cv2.VideoWriter(video_path, cv2.VideoWriter_fourcc(*'mp4v'), frame_rate, (640, 480))

            for frame_num in range(total_frames):
                ret, frame = cap.read()
                if not ret:
                    print("Failed to capture frame.")
                    break

                # Display recording information
                cv2.rectangle(frame, (0,0), (640, 60), (255, 255, 255), -1)
                cv2.putText(frame, f"Recording {action}: Video {sequence+1}/{last_sequence}", 
                            (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (14, 14, 14), 2)
                cv2.imshow("Recording", frame)

                out.write(frame)

                if cv2.waitKey(10) & 0xFF == ord('q'):
                    print("Recording interrupted.")
                    break

            out.release()

    cap.release()
    cv2.destroyAllWindows()

In [10]:
#4. Run the function to start recording the data into mp4
record_videos()

Recording for action: Hi
Recording interrupted.
Recording interrupted.


KeyboardInterrupt: 

In [5]:
#5. Convert mp4 to npy(numpy array)

# Process sequences from index (start_sequence) to (end_sequence-not inclusive) for all actions
process_videos_to_keypoints(start_sequence=0, end_sequence=90)



Processed and saved keypoints for Action(Hi) 0.mp4 as 0.npy
Processed and saved keypoints for Action(Hi) 1.mp4 as 1.npy
Processed and saved keypoints for Action(Hi) 2.mp4 as 2.npy
Processed and saved keypoints for Action(Hi) 3.mp4 as 3.npy
Processed and saved keypoints for Action(Hi) 4.mp4 as 4.npy
Processed and saved keypoints for Action(Hi) 5.mp4 as 5.npy
Processed and saved keypoints for Action(Hi) 6.mp4 as 6.npy
Processed and saved keypoints for Action(Hi) 7.mp4 as 7.npy
Processed and saved keypoints for Action(Hi) 8.mp4 as 8.npy
Processed and saved keypoints for Action(Hi) 9.mp4 as 9.npy
Processed and saved keypoints for Action(Hi) 10.mp4 as 10.npy
Processed and saved keypoints for Action(Hi) 11.mp4 as 11.npy
Processed and saved keypoints for Action(Hi) 12.mp4 as 12.npy
Processed and saved keypoints for Action(Hi) 13.mp4 as 13.npy
Processed and saved keypoints for Action(Hi) 14.mp4 as 14.npy
Processed and saved keypoints for Action(Hi) 15.mp4 as 15.npy


KeyboardInterrupt: 

In [7]:
#6. Visualize the keypoints for a specific data recording

# Define Mediapipe connections (assumes 33 keypoints)
# Updated connections to include only pose, left hand, and right hand
POSE_CONNECTIONS = [
    (0, 1), (1, 2), (2, 3), (3, 7),  # Upper body
    (0, 4), (4, 5), (5, 6), (6, 8),  # Upper body
    (9, 10),  # Neck
    (11, 12), (11, 13), (13, 15), (15, 17),  # Left side
    (12, 14), (14, 16), (16, 18)  # Right side
]

HAND_CONNECTIONS = [
    # Connections for fingers (same for both hands)
    (0, 1), (1, 2), (2, 3), (3, 4),  # Thumb
    (0, 5), (5, 6), (6, 7), (7, 8),  # Index finger
    (5, 9), (9, 10), (10, 11), (11, 12),  # Middle finger
    (9, 13), (13, 14), (14, 15), (15, 16),  # Ring finger
    (13, 17), (17, 18), (18, 19), (19, 20)  # Pinky
]


def visualize_extracted_keypoints(keypoints, action_name, frame_idx):
    """
    Visualize extracted keypoints (pose, left hand, right hand).

    :param keypoints: Numpy array of shape (num_keypoints,)
                      Includes pose (33*4), left hand (21*3), right hand (21*3).
    :param action_name: String, action name
    :param frame_idx: Integer, current frame index for labeling
    """
    canvas = np.ones((480, 640, 3), dtype=np.uint8) * 255  # Blank white canvas

    # Extract pose, left hand, and right hand keypoints
    pose = keypoints[:33 * 4].reshape((33, 4))[:, :2] * [640, 480]  # Normalize to image size
    lh = keypoints[33 * 4:33 * 4 + 21 * 3].reshape((21, 3))[:, :2] * [640, 480]
    rh = keypoints[33 * 4 + 21 * 3:].reshape((21, 3))[:, :2] * [640, 480]

    # Draw pose connections
    for start, end in POSE_CONNECTIONS:
        x1, y1 = pose[start]
        x2, y2 = pose[end]
        cv2.line(canvas, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)
    for x, y in pose:
        cv2.circle(canvas, (int(x), int(y)), 5, (0, 0, 255), -1)

    # Draw left hand connections
    for start, end in HAND_CONNECTIONS:
        if start < len(lh) and end < len(lh):
            x1, y1 = lh[start]
            x2, y2 = lh[end]
            cv2.line(canvas, (int(x1), int(y1)), (int(x2), int(y2)), (255, 0, 0), 2)
    for x, y in lh:
        cv2.circle(canvas, (int(x), int(y)), 5, (0, 0, 255), -1)

    # Draw right hand connections
    for start, end in HAND_CONNECTIONS:
        if start < len(rh) and end < len(rh):
            x1, y1 = rh[start]
            x2, y2 = rh[end]
            cv2.line(canvas, (int(x1), int(y1)), (int(x2), int(y2)), (0, 0, 255), 2)
    for x, y in rh:
        cv2.circle(canvas, (int(x), int(y)), 5, (0, 255, 0), -1)

    # Add action label and frame info
    cv2.putText(canvas, f"Frame: {frame_idx}", (10, 30),
                cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 0), 2)

    # Display the canvas
    cv2.imshow("Extracted Keypoint Visualization", canvas)
    cv2.waitKey(50)  # Adjust delay between frames

# Load extracted keypoints
keypoints_data = np.load(f"{DATA_PATH}/Hi/10.npy")  # Shape (NUM_SEQUENCES, SEQUENCE_LENGTH, num_keypoints)
SEQUENCE_LENGTH = 30  # Adjust based on your actual data
NUM_KEYPOINTS = 258
reshaped_data = keypoints_data.reshape(-1, SEQUENCE_LENGTH, NUM_KEYPOINTS)

for sequence_idx, sequence in enumerate(reshaped_data):
    for frame_idx, keypoints in enumerate(sequence):
        # Reshape and normalize keypoints for visualization
        pose = keypoints[:33 * 4].reshape((33, 4))[:, :2] * [640, 480]  # Pose keypoints
        lh = keypoints[33 * 4:33 * 4 + 21 * 3].reshape((21, 3))[:, :2] * [640, 480]  # Left hand
        rh = keypoints[33 * 4 + 21 * 3:].reshape((21, 3))[:, :2] * [640, 480]  # Right hand

        visualize_extracted_keypoints(keypoints, f"Action_{sequence_idx}", frame_idx)

cv2.destroyAllWindows()