In [1]:
# imports
import pandas as pd
import numpy as np
import cv2
import os
#import matplotlib.pyplot as plt
#%matplotlib inline
#import time
import mediapipe as mp

In [31]:
# SCRIPT 1 - Extracting key points - to test if everything is working

# Short aliases for the MediaPipe solution modules used by the helpers and capture loops below.
mp_holistic = mp.solutions.holistic # Holistic model - provides the Holistic detector and the landmark connection sets
mp_drawing = mp.solutions.drawing_utils # Drawing utilities - used to render landmarks onto frames

# FUNCTIONS:

# To extract keypoint values from frame using mediapipe
def mediapipe_detection(image, model):
    """Run a MediaPipe model on one OpenCV frame.

    Args:
        image: Frame in OpenCV's BGR channel order.
        model: Object exposing ``process(rgb_image)`` (here a Holistic instance).

    Returns:
        A ``(bgr_image, results)`` tuple: a BGR copy of the frame that is safe
        to draw on, and whatever ``model.process`` returned.
    """
    # The model is fed RGB, while OpenCV hands out BGR frames.
    rgb_frame = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

    # Lock the buffer for the duration of inference, then unlock it again.
    rgb_frame.flags.writeable = False
    results = model.process(rgb_frame)
    rgb_frame.flags.writeable = True

    # Convert back to BGR so the frame can be annotated and displayed with OpenCV.
    bgr_frame = cv2.cvtColor(rgb_frame, cv2.COLOR_RGB2BGR)
    return bgr_frame, results

# To draw landmarks and pose connections on the frame using the results extracted
def draw_landmarks(image, results):
    """Overlay the face, pose and hand landmarks from ``results`` onto ``image``.

    Args:
        image: Frame to annotate; passed straight to ``mp_drawing.draw_landmarks``.
        results: Detection results exposing ``face_landmarks``, ``pose_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``.
    """
    # (attribute on `results`, connection set on `mp_holistic`), in drawing order.
    layers = (
        ('face_landmarks', 'FACEMESH_TESSELATION'),
        ('pose_landmarks', 'POSE_CONNECTIONS'),
        ('left_hand_landmarks', 'HAND_CONNECTIONS'),
        ('right_hand_landmarks', 'HAND_CONNECTIONS'),
    )
    for landmark_attr, connections_attr in layers:
        mp_drawing.draw_landmarks(
            image,
            getattr(results, landmark_attr),
            getattr(mp_holistic, connections_attr),
        )

def extract_keypoints(results):
    """Flatten one frame's detection results into a single 1-D feature vector.

    Layout of the returned array (1662 values in total):
        pose        33 landmarks x (x, y, z, visibility) = 132
        face       468 landmarks x (x, y, z)             = 1404
        left hand   21 landmarks x (x, y, z)             = 63
        right hand  21 landmarks x (x, y, z)             = 63

    Any landmark group that is missing from ``results`` is replaced by zeros of
    the matching length, so the vector layout stays the same for such frames.
    """
    def _flatten(group, fields, expected_count):
        # Missing detection -> zero placeholder of the expected size.
        if not group:
            return np.zeros(expected_count * len(fields))
        rows = [[getattr(point, field) for field in fields] for point in group.landmark]
        return np.array(rows).flatten()

    xyz = ('x', 'y', 'z')
    parts = [
        _flatten(results.pose_landmarks, xyz + ('visibility',), 33),
        _flatten(results.face_landmarks, xyz, 468),
        _flatten(results.left_hand_landmarks, xyz, 21),
        _flatten(results.right_hand_landmarks, xyz, 21),
    ]
    return np.concatenate(parts)

# MAIN - Visualizing the Realtime pose estimations using opencv (Press 'q' to close window and break)

cap = cv2.VideoCapture(0) # establish video capture with your webcam

# The try/finally guarantees the camera is released and the preview window is
# closed even if detection or drawing raises part-way through the loop.
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        while cap.isOpened():

            # Read feed; `ret` is False when no frame could be grabbed, in which
            # case `frame` is unusable and the preview has to stop.
            ret, frame = cap.read()
            if not ret:
                break

            # Make detections
            image, results = mediapipe_detection(frame, holistic)

            # Draw landmarks
            draw_landmarks(image, results)

            # Show to screen
            cv2.imshow('OpenCV Feed', image)

            # Break gracefully (Press 'q' to close window and break)
            if cv2.waitKey(10) & 0xFF == ord('q'):
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

In [3]:
# Manual cleanup: release the webcam handle and close any OpenCV windows.
# Presumably kept as a separate cell for when a capture cell is interrupted before its own cleanup runs.
cap.release()
cv2.destroyAllWindows()

In [40]:
# Root folder for the exported keypoint data; it is created here if it does not exist yet.
DATA_PATH = 'Data Collection'
os.makedirs(DATA_PATH, exist_ok=True)

# Actions (sign words) handled in this run. Full vocabulary used by the project:
# ['NoSign', 'hello', 'thanks', 'sorry', 'you', 'work', 'where', 'how']
actions = np.array(['NoSign'])
# Number of videos (sequences) recorded per action
no_sequences = 40
# Number of frames in each video
sequence_length = 25

In [43]:
# Create one folder per (action, sequence) for the recordings added in this session.
DATA_PATH = os.path.join('Data Collection')
# ['NoSign','hello','thanks','sorry','you','work','where','how']

# Sequence (video) indices covered by this session: start_folder .. end_folder - 1.
# To start a new word, set start_folder to 0 and end_folder to 40.
start_folder = 40
end_folder = 120

for action in actions:
    for sequence in range(start_folder, end_folder):
        # exist_ok=True makes re-runs harmless without hiding real failures
        # (permissions, invalid path, ...) the way a bare `except: pass` would.
        os.makedirs(os.path.join(DATA_PATH, action, str(sequence)), exist_ok=True)
print('Folders Created!!!!')

Folders Created!!!!


In [44]:
# Script 3 ---- Collect Training data
#
# For every action, records the sequences start_folder .. end_folder - 1, each
# `sequence_length` frames long, and saves the keypoints of every frame to
# DATA_PATH/<action>/<sequence>/<frame_num>.npy.
# Press 'q' in the preview window to stop the whole collection run.

mp_holistic = mp.solutions.holistic # Holistic model - make our detection
mp_drawing = mp.solutions.drawing_utils # Drawing utilities - make our drawings

# ------------- MAIN - Start Collection Loop -----------

cap = cv2.VideoCapture(0)

# Set when the user presses 'q' or the camera stops delivering frames. A plain
# `break` only leaves the innermost loop, so the flag is checked at every level.
stop_requested = False

# The try/finally guarantees the camera and the window are released on any exit path.
try:
    # Set mediapipe model
    with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
        cv2.waitKey(1000)
        # Loop through actions
        for action in actions:
            # Title card announcing the action that is about to be recorded
            blank_screen = np.zeros((480,640,3), dtype="uint8")
            cv2.putText(blank_screen, 'Action: {}'.format(action), (170,210),
                       cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0,255,0), 2, cv2.LINE_AA)
            cv2.imshow('OpenCV Feed', blank_screen)
            cv2.waitKey(500)

            # Loop through sequences aka videos
            for sequence in range(start_folder, end_folder):
                # Loop through video length aka sequence length
                for frame_num in range(sequence_length):

                    # Read feed; stop if the camera did not deliver a frame
                    ret, frame = cap.read()
                    if not ret:
                        print('Camera frame could not be read - stopping collection.')
                        stop_requested = True
                        break

                    # Make detections
                    image, results = mediapipe_detection(frame, holistic)

                    # Draw landmarks
                    draw_landmarks(image, results)

                    # The first frame of each sequence gets an extra banner and a
                    # short pause so the signer can get ready.
                    is_first_frame = frame_num == 0
                    if is_first_frame:
                        cv2.putText(image, 'STARTING COLLECTION', (120,200),
                                   cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (17,14),
                               cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    if is_first_frame:
                        cv2.waitKey(500)

                    # Export keypoints (np.save appends the .npy extension)
                    keypoints = extract_keypoints(results)
                    npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                    np.save(npy_path, keypoints)

                    # Break gracefully: 'q' aborts the whole run, not just this sequence
                    if cv2.waitKey(1) & 0xFF == ord('q'):
                        stop_requested = True
                        break

                if stop_requested:
                    break
            if stop_requested:
                break
finally:
    cap.release()
    cv2.destroyAllWindows()

if stop_requested:
    # The sequence that was being recorded is incomplete and must be re-recorded
    # before the loading cells (which expect every frame file) can read it.
    print('Data Collection stopped early.')
else:
    print('Data Collection Complete!!!')

Data Collection Complete!!!


In [6]:
# Manual cleanup: release the webcam handle and close any OpenCV windows.
# Presumably kept as a separate cell for when a capture cell is interrupted before its own cleanup runs.
cap.release()
cv2.destroyAllWindows()

# 6. Preprocess Data and Create Labels and Features

In [18]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [19]:
# Map each action name to its integer class index.
label_map = {label:num for num, label in enumerate(actions)}

# DATA_PATH = os.path.join('Data Collection') - folder name for where the data is.

# sequences[i] is a list of `sequence_length` per-frame keypoint arrays;
# labels[i] is the class index of the action that sequence belongs to.
sequences, labels = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    # Only numerically named sub-folders are sequences. Stray entries such as
    # '.DS_Store' or '.ipynb_checkpoints' are skipped instead of crashing the
    # int conversion, and sorting makes the load order deterministic.
    sequence_ids = sorted(
        int(name) for name in os.listdir(action_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(action_dir, name))
    )
    for sequence in sequence_ids:
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [20]:
np.array(sequences).shape # (total sequences across all actions, frames per sequence = sequence_length, 1662 keypoint values per frame)

(120, 25, 1662)

In [45]:
# Reload the dataset for the full vocabulary (all eight actions).
actions = np.array(['NoSign','hello','thanks','sorry','you','work','where','how'])
label_map = {label:num for num, label in enumerate(actions)}
#no_sequences = 40
sequence_length = 25

DATA_PATH = os.path.join('Data Collection')
# sequences2[i] is a list of `sequence_length` per-frame keypoint arrays;
# labels2[i] is the class index of the action that sequence belongs to.
sequences2, labels2 = [], []
for action in actions:
    action_dir = os.path.join(DATA_PATH, action)
    # Only numerically named sub-folders are sequences. Stray entries such as
    # '.DS_Store' or '.ipynb_checkpoints' are skipped instead of crashing the
    # int conversion, and sorting makes the load order deterministic.
    sequence_ids = sorted(
        int(name) for name in os.listdir(action_dir)
        if name.isdecimal() and os.path.isdir(os.path.join(action_dir, name))
    )
    for sequence in sequence_ids:
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(action_dir, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences2.append(window)
        labels2.append(label_map[action])

In [48]:
# Shape check for the full-vocabulary load: (sequences, frames per sequence, values per frame).
np.array(sequences2).shape

(960, 25, 1662)