In [11]:
!pip install Jupyter-Beeper opencv-python numpy mediapipe



In [12]:
import cv2
import numpy as np
import os
import time
import mediapipe as mp
import shutil
import jupyter_beeper

In [13]:
# Short aliases for the MediaPipe solution modules used in the cells below.
mp_drawing = mp.solutions.drawing_utils  # Landmark drawing helpers
mp_holistic = mp.solutions.holistic      # Holistic model (source of the hand landmarks)

In [14]:
def mediapipe_detection(image, model):
    """Run a MediaPipe model on one BGR frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)`` (here a Holistic instance).

    Returns:
        Tuple ``(image, results)``: the frame converted back to BGR, and
        whatever ``model.process`` returned for it.
    """
    # The model is fed RGB, while OpenCV delivers BGR.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Mark the buffer read-only for the duration of the inference call,
    # then make it writeable again afterwards.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Convert back to BGR so the caller can draw on / display it with OpenCV.
    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [15]:
def draw_styled_landmarks(image, results):
    """Draw the left- and right-hand landmarks of ``results`` onto ``image``.

    The image is modified in place; nothing is returned. Each hand gets its
    own colour scheme (first spec: landmark points, second spec: connections).
    """
    # One row per hand, drawn in this order: left hand first, then right hand.
    hand_styles = (
        (
            results.left_hand_landmarks,
            mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2),
        ),
        (
            results.right_hand_landmarks,
            mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4),
            mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2),
        ),
    )

    for hand_landmarks, point_spec, line_spec in hand_styles:
        mp_drawing.draw_landmarks(image, hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                                  point_spec, line_spec)

In [16]:
def extract_keypoints(results):
    """Flatten both hands' landmarks into a single 1-D feature vector.

    For each hand the (x, y, z) coordinates of every landmark are laid out
    consecutively. A hand that was not detected (falsy landmarks attribute)
    contributes 21*3 zeros instead.

    Returns:
        ``np.ndarray``: left-hand values followed by right-hand values.
    """
    def _hand_vector(hand):
        # Missing hand -> zero placeholder so the vector layout stays fixed.
        if not hand:
            return np.zeros(21*3)
        coords = [[point.x, point.y, point.z] for point in hand.landmark]
        return np.array(coords).flatten()

    left = _hand_vector(results.left_hand_landmarks)
    right = _hand_vector(results.right_hand_landmarks)
    return np.concatenate([left, right])

# Setting Up Folders for Data Collection

In [17]:
# Root directory for the exported keypoint arrays (saved with np.save)
DATA_PATH = 'data'
# Optional, currently disabled: locations for raw / annotated frame dumps
# frame_path = 'D:\\Final_Project\\frames'
# detection_path = 'D:\\Final_Project\\detected_frames'

# Number of sequences ("videos") recorded per action
no_sequences = 30

# Number of frames in each sequence
sequence_length = 30

# Index of the first sequence folder
start_folder = 0

# Labels we try to detect. Full alphabet variant, kept for reference:
# actions = np.array(['A', 'B', 'C','D', 'E', 'F','G', 'H', 'I','J', 'K', 'L','M', 'N', 'O','P', 'Q', 'R','S', 'T', 'U','V', 'W','X', 'Y', 'Z'])
actions = np.array(['NO_SIGN','A', 'B', 'C'])

In [18]:
# Create one folder per (action, sequence) pair: DATA_PATH/<action>/<sequence>/
for action in actions:
    for sequence in range(start_folder, start_folder + no_sequences):
        # exist_ok=True makes re-running this cell harmless when the folders
        # already exist, while real failures (permissions, invalid path, ...)
        # still raise instead of being silently swallowed by a bare except.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)

In [19]:
# Record keypoint sequences for ONE action from the default webcam.
#
# For every sequence, `sequence_length` frames are captured, run through the
# holistic model, and the extracted keypoints are saved as
# DATA_PATH/<action>/<sequence>/<frame_num>.npy. After each sequence a prompt
# is shown for up to 30 seconds:
#   y              -> keep the sequence and move on to the next one
#   q              -> stop collecting (files already written are left as they are)
#   anything else  -> (or no key) discard the sequence and record it again
# Pressing q WHILE frames are being recorded only cuts the current sequence
# short and jumps to the prompt.


def _reset_sequence_dir(path):
    """Delete everything stored for one sequence and recreate its empty folder."""
    shutil.rmtree(path, ignore_errors=True)
    os.makedirs(path, exist_ok=True)


cap = cv2.VideoCapture(0)
try:
    if not cap.isOpened():
        raise RuntimeError('Could not open the default camera (index 0)')

    # Action recorded by this run: actions[2] is 'B' for the array
    # ['NO_SIGN', 'A', 'B', 'C']. Change the index to record another label.
    action = actions[2]
    window_name = 'OpenCV Feed'

    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        sequence = start_folder
        while sequence < start_folder + no_sequences:
            sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
            # Make sure the target folder exists even if the setup cell was skipped.
            os.makedirs(sequence_dir, exist_ok=True)
            camera_failed = False

            # Loop through video length aka sequence length
            for frame_num in range(sequence_length):
                # Read feed; a failed read yields no frame and must not reach the model.
                ret, frame = cap.read()
                if not ret or frame is None:
                    camera_failed = True
                    break

                # Make detections
                image, results = mediapipe_detection(frame, holistic)

                # Draw landmarks
                draw_styled_landmarks(image, results)

                # Banner on the first frame only, status line on every frame.
                if frame_num == 0:
                    cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                # Show to screen
                cv2.imshow(window_name, image)

                # Export keypoints (np.save appends the .npy extension)
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(sequence_dir, str(frame_num))
                np.save(npy_path, keypoints)

                # q cuts the current sequence short and goes to the prompt below.
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    print('quit')
                    cv2.waitKey(2000)
                    break

            if camera_failed:
                # Do not leave a half-written sequence behind.
                print('Camera read failed; stopping collection')
                _reset_sequence_dir(sequence_dir)
                break

            cv2.putText(image, 'PRESS Y IF REQUIRED NOW', (120,200),
                        cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
            cv2.imshow(window_name, image)

            # Read the key exactly ONCE. Calling waitKey again in an elif would
            # throw away the first key press and block for another 30 seconds.
            key = cv2.waitKey(30000) & 0xFF
            if key == ord('y'):
                sequence = sequence + 1
                cv2.waitKey(3000)
                b = jupyter_beeper.Beeper()
                # Default config is frequency=440 Hz, secs=0.7 seconds, and
                # blocking=False (b.beep() will return when the sound begins)
                b.beep(frequency=530, secs=0.2, blocking=True)
                cv2.waitKey(1000)
            elif key == ord('q'):
                # Leave the loop; the finally block releases the camera.
                break
            else:
                # Drawn below the prompt text so both stay legible, and actually
                # shown on screen before the folder is wiped.
                cv2.putText(image, 'CURRENT SEQUENCE DISCARD', (120,250),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                cv2.imshow(window_name, image)
                _reset_sequence_dir(sequence_dir)
                cv2.waitKey(1000)
finally:
    # Always free the camera and close the window, even after an exception
    # or a kernel interrupt.
    cap.release()
    cv2.destroyAllWindows()

I0000 00:00:1738247421.659851    5846 gl_context_egl.cc:85] Successfully initialized EGL. Major : 1 Minor: 5
I0000 00:00:1738247421.663491  124373 gl_context.cc:369] GL version: 3.2 (OpenGL ES 3.2 Mesa 24.0.9-0ubuntu0.3), renderer: AMD Radeon Graphics (radeonsi, renoir, LLVM 17.0.6, DRM 3.57, 6.8.0-51-generic)
W0000 00:00:1738247421.770473  124356 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1738247421.839390  124361 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1738247421.844126  124365 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.
W0000 00:00:1738247421.844500  124364 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. 

In [20]:
# Release the capture device and close all OpenCV windows.
# NOTE(review): presumably kept as a manual safety net for when the collection
# cell stops before reaching its own cleanup — confirm.
cap.release()
cv2.destroyAllWindows()