In [3]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

In [4]:
# Short aliases for the two MediaPipe modules used by the detection and drawing code in this notebook.
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [5]:
def mediapipe_detection(image, model):
    """Run `model` on a BGR frame and return the frame together with the model output.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing a `process(rgb_image)` method (the callers in this
            notebook pass a `mp_holistic.Holistic` instance).

    Returns:
        A `(image, results)` tuple: the frame converted back to BGR, and whatever
        `model.process` returned.
    """
    # The model is fed RGB, so convert before inference.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Mark the buffer read-only for the duration of the prediction
    # (presumably so MediaPipe can avoid copying it), then unlock it again.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Hand back a BGR frame so it can be drawn on / shown with OpenCV.
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

In [6]:
def draw_landmarks(image, results):
    """Draw pose and both hand skeletons on `image` with MediaPipe's default styling.

    Args:
        image: Frame to draw on; it is modified in place by `mp_drawing.draw_landmarks`.
        results: Holistic output providing `pose_landmarks`, `left_hand_landmarks`
            and `right_hand_landmarks`.
    """
    # Face drawing is disabled; the original call is kept for reference:
    # mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACE_CONNECTIONS)
    layers = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS),        # pose
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS),   # left hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS),  # right hand
    )
    for landmark_list, connections in layers:
        mp_drawing.draw_landmarks(image, landmark_list, connections)

In [7]:
def draw_styled_landmarks(image, results):
    """Draw pose and hand landmarks on `image`, each body part with its own colour pair.

    Args:
        image: Frame to draw on; it is modified in place by `mp_drawing.draw_landmarks`.
        results: Holistic output providing `pose_landmarks`, `left_hand_landmarks`
            and `right_hand_landmarks`.
    """
    # Face drawing is disabled; the original call is kept for reference:
    # mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACE_CONNECTIONS,
    #                          mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
    #                          mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
    #                          )

    # One row per body part:
    # (landmark list, connection set, colour of the first spec, colour of the second spec)
    layers = (
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS, (80,22,10), (80,44,121)),
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, (121,22,76), (121,44,250)),
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, (245,117,66), (245,66,230)),
    )
    for landmark_list, connections, first_colour, second_colour in layers:
        # Thickness/radius are the same for every part; only the colours differ.
        mp_drawing.draw_landmarks(
            image, landmark_list, connections,
            mp_drawing.DrawingSpec(color=first_colour, thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=second_colour, thickness=2, circle_radius=2),
        )

In [8]:
def extract_keypoints(results):
    """Flatten pose and hand landmarks into a single 1-D feature vector.

    Layout of the returned array: pose values (x, y, z, visibility per landmark),
    then left-hand values (x, y, z per landmark), then right-hand values.
    Any part whose landmarks attribute is falsy is replaced by zeros
    (33*4 for pose, 21*3 for each hand). Face landmarks are not included.

    Args:
        results: Object with `pose_landmarks`, `left_hand_landmarks` and
            `right_hand_landmarks`; each is either falsy or has a `.landmark` iterable.

    Returns:
        1-D numpy array: the concatenation `[pose, left hand, right hand]`.
    """
    if results.pose_landmarks:
        pose = np.array(
            [[lm.x, lm.y, lm.z, lm.visibility] for lm in results.pose_landmarks.landmark]
        ).flatten()
    else:
        pose = np.zeros(33*4)

    # face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)

    if results.left_hand_landmarks:
        lh = np.array(
            [[lm.x, lm.y, lm.z] for lm in results.left_hand_landmarks.landmark]
        ).flatten()
    else:
        lh = np.zeros(21*3)

    if results.right_hand_landmarks:
        rh = np.array(
            [[lm.x, lm.y, lm.z] for lm in results.right_hand_landmarks.landmark]
        ).flatten()
    else:
        rh = np.zeros(21*3)

    return np.concatenate([pose, lh, rh])

In [14]:
import os

folder_path = r"D:\MP_DataRecorded"

# Keep only the entries of folder_path that are directories (plain files are dropped)
folders = list(
    filter(
        lambda entry: os.path.isdir(os.path.join(folder_path, entry)),
        os.listdir(folder_path),
    )
)

print(folders)  # Prints the list of sub-folder names


['a', 'afternoon', 'again', 'bye', 'd', 'deaf', 'dont_understand', 'e', 'evening', 'fast', 'fine', 'good', 'h', 'hearing', 'hello', 'how', 'how_much', 'job', 'later', 'maybe', 'morning', 'my_name_is', 'n', 'name', 'nice_to_meet', 'night', 'no', 'now', 'no_sign', 'p', 'please', 'see you', 'sign_language', 'slow', 'sorry', 'take_care', 'thank_you', 'today', 'tomorrow', 'understand', 'wait', 'welcome', 'what', 'when', 'where', 'who', 'why', 'yes', 'yesterday', 'you']


In [None]:
# Path for exported data, numpy arrays
DATA_PATH = r"D:\MP_DataRecorded"

# Actions that we try to detect
actions = np.array(['afternoon', 'again', 'bye', 'deaf', 'dont_understand', 'evening', 'fast', 'fine', 'good', 'hello', 'how', 'how_much', 'job', 'later', 'maybe', 'morning', 'name', 'nice_to_meet', 'night', 'no', 'no_sign', 'please', 'see you', 'sign_language', 'slow', 'sorry', 'take_care', 'thank_you', 'today', 'tomorrow', 'understand', 'wait', 'welcome', 'what', 'when', 'where', 'who', 'why', 'which', 'yes', 'yesterday', 'you'])

# 160 videos worth of data
no_sequences = 160 #change depending on how many videos you have in each folder

# Videos are going to be 60 frames in length
sequence_length = 60 #change to 30 or 40 for faster detection

# Index of the first sequence folder written by the collection cell.
# The collection loop iterates range(start_folder, start_folder+no_sequences), so this
# name must be defined or that cell fails with NameError on a fresh kernel.
# 0 matches the folder-creation and loading cells, which both use range(no_sequences).
# Raise it (e.g. to 40) only to append new recordings after existing ones.
start_folder = 0

In [13]:
# Show the shape of the label array (one entry per action name)
actions.shape

(1,)

In [None]:
# Create one folder per (action, sequence index): DATA_PATH/<action>/<sequence>
for action in actions:
    for sequence in range(no_sequences):
        # exist_ok=True makes re-running this cell harmless while still surfacing
        # real failures (bad path, permissions) instead of silently swallowing them.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [None]:
# ------- code to test mediapipe -------------
# cap = cv2.VideoCapture(1)
# # Set mediapipe model 
# with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
#     while cap.isOpened():

#         # Read feed
#         ret, frame = cap.read()

#         # rotate into portrait (90° clockwise)
#         frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)

#         # Make detections
#         image, results = mediapipe_detection(frame, holistic)
#         print(results)
        
#         # Draw landmarks
#         draw_styled_landmarks(image, results)

#         # Show to screen
#         cv2.imshow('OpenCV Feed', image)

#         # Break gracefully
#         if cv2.waitKey(10) & 0xFF == ord('q'):
#             break
#     cap.release()
#     cv2.destroyAllWindows()

In [16]:
# Record keypoint sequences from the webcam.
# For every action and every sequence index, sequence_length frames are captured,
# run through MediaPipe Holistic, and the extracted keypoints are saved as
# DATA_PATH/<action>/<sequence>/<frame_num>.npy.
# Keyboard controls while recording: R restarts the current sequence, P pauses,
# C resumes from pause, Q stops the whole collection.
cap = cv2.VideoCapture(1)
stop_requested = False  # Set when the user presses Q or the camera stops delivering frames

try:
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:

        # Loop through actions
        for action in actions:
            # Loop through sequences aka videos
            for sequence in range(start_folder, start_folder+no_sequences):
                # Make sure the target folder exists even when start_folder points past
                # the folders created up front; np.save does not create directories.
                sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
                os.makedirs(sequence_dir, exist_ok=True)

                frame_num = 0   # Index of the next frame to record in this sequence
                paused = False  # Flag to track pause state

                # while (not for) so that a restart can rewind frame_num to 0
                while frame_num < sequence_length:
                    # Handle paused state
                    if paused:
                        # Show the last processed frame with a pause message on top
                        pause_frame = image.copy()
                        cv2.putText(pause_frame, 'PAUSED', (220, 200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 4, cv2.LINE_AA)
                        cv2.putText(pause_frame, 'Press C to continue', (180, 230),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.75, (0, 0, 255), 2, cv2.LINE_AA)
                        cv2.imshow('OpenCV Feed', pause_frame)

                        # Check for key presses while paused
                        key = cv2.waitKey(10) & 0xFF
                        if key == ord('c'):  # Continue collection
                            paused = False
                        elif key == ord('q'):  # Quit
                            stop_requested = True
                            break
                        continue  # Skip the rest of the loop while paused

                    # Read feed
                    ret, frame = cap.read()
                    if not ret:
                        # No frame available (camera missing/disconnected): stop cleanly
                        # instead of crashing on a None frame further down.
                        print('Camera returned no frame; stopping collection.')
                        stop_requested = True
                        break

                    # rotate into portrait (90° clockwise)
                    frame = cv2.rotate(frame, cv2.ROTATE_90_CLOCKWISE)
                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_styled_landmarks(image, results)

                    # Apply wait logic
                    if frame_num == 0:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)
                        # 5 s pause so the signer can get into position before recording
                        cv2.waitKey(5000)
                    else:
                        cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                        # Add frame counter display
                        cv2.putText(image, 'Frame: {}/{}'.format(frame_num, sequence_length-1), (15,30),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)

                        # Add instructions for controls
                        cv2.putText(image, 'R: Restart | P: Pause | C: Continue | Q: Quit', (15,50),
                                   cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1, cv2.LINE_AA)

                        # Show to screen
                        cv2.imshow('OpenCV Feed', image)

                    # Export keypoints (np.save appends the .npy extension)
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(sequence_dir, str(frame_num))
                    np.save(npy_path, keypoints)

                    # Handle key presses
                    key = cv2.waitKey(10) & 0xFF
                    if key == ord('q'):  # Quit if 'q' is pressed
                        stop_requested = True
                        break
                    elif key == ord('r'):  # Restart current sequence if 'r' is pressed
                        frame_num = 0  # Reset frame counter; earlier frames get overwritten
                        restart_frame = image.copy()
                        cv2.putText(restart_frame, 'RESTARTING SEQUENCE', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 4, cv2.LINE_AA)
                        cv2.imshow('OpenCV Feed', restart_frame)
                        cv2.waitKey(1000)  # Brief pause to show restart message
                        continue  # Skip the frame_num increment
                    elif key == ord('p'):  # Pause collection if 'p' is pressed
                        paused = True
                        continue  # Skip frame increment; this frame index is re-recorded after resume

                    frame_num += 1  # Increment frame counter

                # Propagate a stop request out of the sequence loop
                if stop_requested:
                    break

            # Propagate a stop request out of the action loop
            if stop_requested:
                break
finally:
    # Always free the camera and close the preview window, even if an error occurred
    cap.release()
    cv2.destroyAllWindows()

In [11]:
# Release the camera handle and close any OpenCV windows.
# NOTE(review): presumably kept as a manual cleanup cell for when the collection
# cell is interrupted before it reaches its own cleanup — confirm it is still needed.
cap.release()
cv2.destroyAllWindows()

In [18]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

# Map each action name to its position in `actions` (used as the integer class label)
label_map = dict(zip(actions, range(len(actions))))

In [19]:
# Display the action-name -> integer-label mapping
label_map

{'afternoon': 0,
 'again': 1,
 'bye': 2,
 'deaf': 3,
 'dont_understand': 4,
 'evening': 5,
 'fast': 6,
 'fine': 7,
 'good': 8,
 'hello': 9,
 'how': 10,
 'how_much': 11,
 'job': 12,
 'later': 13,
 'maybe': 14,
 'morning': 15,
 'name': 16,
 'nice_to_meet': 17,
 'night': 18,
 'no': 19,
 'no_sign': 20,
 'please': 21,
 'see you': 22,
 'sign_language': 23,
 'slow': 24,
 'sorry': 25,
 'take_care': 26,
 'thank_you': 27,
 'today': 28,
 'tomorrow': 29,
 'understand': 30,
 'wait': 31,
 'welcome': 32,
 'what': 33,
 'when': 34,
 'where': 35,
 'who': 36,
 'why': 37,
 'which': 38,
 'yes': 39,
 'yesterday': 40,
 'you': 41}

In [22]:
# Load every recorded sequence from disk.
# sequences: one entry per (action, sequence index), each a list of sequence_length keypoint arrays
# labels:    the integer label of the action for the matching entry in `sequences`
sequences, labels = [], []
for action in actions:
    print(action)  # progress indicator: one line per action
    action_dir = os.path.join(DATA_PATH, action)
    for sequence in range(no_sequences):
        sequence_dir = os.path.join(action_dir, str(sequence))
        # Frames are stored as <frame_num>.npy inside the sequence folder
        window = [
            np.load(os.path.join(sequence_dir, "{}.npy".format(frame_num)))
            for frame_num in range(sequence_length)
        ]
        sequences.append(window)
        labels.append(label_map[action])

afternoon
again
bye
deaf
dont_understand
evening
fast
fine
good
hello
how
how_much
job
later
maybe
morning
name
nice_to_meet
night
no
no_sign
please
see you
sign_language
slow
sorry
take_care
thank_you
today
tomorrow
understand
wait
welcome
what
when
where
who
why
which
yes
yesterday
you


In [23]:
# Stack the nested list of per-frame keypoint arrays into a single numpy array
X = np.array(sequences)

In [24]:
# Sanity check of the array shape; the recorded output reads (6720, 60, 258)
X.shape

(6720, 60, 258)

In [25]:
# Convert the integer class labels to a binary class matrix (Keras to_categorical), stored as ints
y = to_categorical(labels).astype(int)

In [26]:
# Write the feature array and the label matrix to the current working directory,
# presumably so later steps can reload them without re-reading the per-frame files.
np.save('X.npy', X)
np.save('y.npy', y)