# Data Collection

20 words (commonly used in daily conversation): <br>
"Family", "Friends", "Work", "School", "Home", "Car", "Happy", "Sad", "Play", "Help", "Eat", "Drink", "Sleep", "Sorry", "Computer", "Money", "Phone", "Cloth", "Me", "Stop"

## Import Libraries

In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp

## Keypoints using MediaPipe Holistic

In [2]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

In [3]:
# Run a MediaPipe model on a BGR frame (converts to RGB for inference, then back to BGR)
def mediapipe_detection(image, model):
    """Run ``model`` on a BGR frame and return the frame with the results.

    The frame is converted from BGR to RGB before being handed to
    ``model.process``, flagged read-only while the model runs, and converted
    back to BGR afterwards.

    Args:
        image: Frame in BGR channel order (as read from ``cv2.VideoCapture``).
        model: Object exposing ``process(rgb_image)``; the cells in this
            notebook pass a ``mp_holistic.Holistic`` instance.

    Returns:
        A ``(bgr_image, results)`` tuple: the frame after the RGB -> BGR
        round trip and whatever ``model.process`` returned.
    """
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Keep the buffer read-only only for the duration of the prediction.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    return cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR), results

In [4]:
def draw_styled_landmarks(image, results):
    """Draw the face, pose and hand landmarks in ``results`` onto ``image``.

    Each landmark group is drawn with its own pair of drawing specs: the
    first styles the landmark points, the second styles the connections
    between them. Nothing is returned; callers use ``image`` afterwards.

    Args:
        image: Frame to draw on.
        results: Detection results exposing ``face_landmarks``,
            ``pose_landmarks``, ``left_hand_landmarks`` and
            ``right_hand_landmarks``.
    """
    # One row per landmark group:
    # (landmarks, connection set, landmark style, connection style).
    # Colour channels must stay within 0-255; the face connection colour
    # previously used 256, which is outside the 8-bit range.
    layers = (
        # Face
        (results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
         dict(color=(80, 110, 10), thickness=1, circle_radius=1),
         dict(color=(80, 255, 121), thickness=1, circle_radius=1)),
        # Pose
        (results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
         dict(color=(80, 22, 10), thickness=2, circle_radius=4),
         dict(color=(80, 44, 121), thickness=2, circle_radius=2)),
        # Left hand
        (results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         dict(color=(121, 22, 76), thickness=2, circle_radius=2),
         dict(color=(121, 44, 250), thickness=2, circle_radius=1)),
        # Right hand
        (results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
         dict(color=(245, 117, 66), thickness=2, circle_radius=2),
         dict(color=(245, 66, 230), thickness=2, circle_radius=1)),
    )

    for landmarks, connections, landmark_style, connection_style in layers:
        mp_drawing.draw_landmarks(
            image,
            landmarks,
            connections,
            mp_drawing.DrawingSpec(**landmark_style),
            mp_drawing.DrawingSpec(**connection_style),
        )

## Extract Keypoint Values

In [5]:
def extract_keypoints(results):
    """Flatten all landmark groups in ``results`` into one 1-D array.

    The output is the concatenation, in this order, of:

    * pose:       ``x, y, z, visibility`` per landmark (zeros of length 33*4 if absent)
    * face:       ``x, y, z`` per landmark (zeros of length 468*3 if absent)
    * left hand:  ``x, y, z`` per landmark (zeros of length 21*3 if absent)
    * right hand: ``x, y, z`` per landmark (zeros of length 21*3 if absent)

    Args:
        results: Object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is
            either falsy or has a ``landmark`` iterable.

    Returns:
        1-D ``numpy.ndarray`` with the four groups back to back.
    """
    def _flatten(group, fields, fallback_size):
        # A missing group is replaced by a zero vector of the expected size
        # so the overall layout does not shift.
        if not group:
            return np.zeros(fallback_size)
        rows = [[getattr(point, name) for name in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ("x", "y", "z")
    parts = [
        _flatten(results.pose_landmarks, xyz + ("visibility",), 33 * 4),
        _flatten(results.face_landmarks, xyz, 468 * 3),
        _flatten(results.left_hand_landmarks, xyz, 21 * 3),
        _flatten(results.right_hand_landmarks, xyz, 21 * 3),
    ]
    return np.concatenate(parts)

## Setup Folders for Data Collection

In [6]:
# Project root on the recording machine
desired_path = "C:\\Users\\erwin\\Desktop\\ASL_Translation_FYP"

# Dataset folder inside the project root where the keypoint files are stored
# (an earlier dataset used the folder name 'ASL_Dataset')
DATA_PATH = os.path.join(desired_path, 'ASL_Dataset2')

# Sign labels to detect; each label gets its own subfolder under DATA_PATH
actions = np.array([
    "Family", "Friends", "Work", "School", "Home",
    "Car", "Happy", "Sad", "Play", "Help",
    "Eat", "Drink", "Sleep", "Sorry", "Computer",
    "Money", "Phone", "Cloth", "Me", "Stop",
])

# Forty videos (sequences) worth of data per action
no_sequences = 40

# Each video is 30 frames long
sequence_length = 30

# Starting folder number (not referenced by the cells shown in this notebook)
start_folder = 1

In [8]:
# Make sure every action folder exists and holds `no_sequences` numbered
# sequence folders, topping up after the highest existing number.
for action in actions:
    action_path = os.path.join(DATA_PATH, action)

    # Creates the action folder on a first run; does nothing if it exists.
    os.makedirs(action_path, exist_ok=True)

    # Numeric entries already present for this action. This is recomputed on
    # every iteration so a freshly created (empty) folder yields an empty
    # list instead of leaving `dir_list` undefined or carrying over the
    # previous action's entries.
    dir_list = [int(dir_name) for dir_name in os.listdir(action_path) if dir_name.isdigit()]
    dirmax = max(dir_list, default=0)

    # Calculate how many new directories are needed to reach `no_sequences`
    existing_dirs_count = len(dir_list)
    new_dirs_to_create = no_sequences - existing_dirs_count

    if new_dirs_to_create > 0:
        # New folders continue the numbering after the current maximum.
        for sequence in range(1, new_dirs_to_create + 1):
            new_dir_path = os.path.join(action_path, str(dirmax + sequence))
            os.makedirs(new_dir_path, exist_ok=True)
    else:
        print(f"No new directories needed for '{action}'. Already has {existing_dirs_count} directories.")

## Data Collection

In [9]:
# Ask which sign to record; only labels listed in `actions` are accepted.
action = input(f"Choose an action to record {actions}: ")

if action not in actions:
    print("Invalid action selected.")
else:
    cap = cv2.VideoCapture(0)

    # try/finally so the camera and the preview window are always released,
    # even if detection or saving raises part-way through a session.
    try:
        # Set mediapipe model
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            stop = False            # set when the user presses 'q'
            capture_failed = False  # set when the camera stops delivering frames

            # Record `no_sequences` videos for the chosen action
            for sequence in range(1, no_sequences + 1):
                # Grab one frame to display the "get ready" banner
                ret, frame = cap.read()
                if not ret:
                    print("Failed to grab frame.")
                    break

                image, results = mediapipe_detection(frame, holistic)
                draw_styled_landmarks(image, results)

                initial_text = f'Start Collection for {action} - Video {sequence}'
                cv2.putText(image, initial_text, (15, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0,0,255), 2, cv2.LINE_AA)
                cv2.imshow('OpenCV Feed', image)
                cv2.waitKey(2000)  # Hold the banner for 2 s before recording starts

                # The frames below are written to <DATA_PATH>/<action>/<sequence>/.
                # Create that folder here so saving does not depend on the
                # folder-setup cell having produced exactly this number.
                sequence_dir = os.path.join(DATA_PATH, action, str(sequence))
                os.makedirs(sequence_dir, exist_ok=True)

                for frame_num in range(sequence_length):
                    ret, frame = cap.read()
                    if not ret:
                        print("Failed to grab frame.")
                        # End the whole session, not just this video;
                        # otherwise the outer loop would keep going after
                        # leaving this sequence incomplete.
                        capture_failed = True
                        break

                    image, results = mediapipe_detection(frame, holistic)
                    draw_styled_landmarks(image, results)

                    # Update the display text for frame collection
                    display_text = f'Collecting frames for {action} - Video {sequence}'
                    cv2.putText(image, display_text, (15,20), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255), 1, cv2.LINE_AA)
                    cv2.imshow('OpenCV Feed', image)

                    # Export keypoints, one file per frame (np.save appends '.npy')
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(sequence_dir, str(frame_num))
                    np.save(npy_path, keypoints)

                    # Check for 'q' to quit
                    if cv2.waitKey(10) & 0xFF == ord('q'):
                        stop = True
                        break

                if stop:
                    print("Recording stopped by user.")
                    break
                if capture_failed:
                    break
    finally:
        cap.release()
        cv2.destroyAllWindows()

Recording stopped by user.
