In [111]:
import mediapipe as mp
import cv2 as cv
import os
import numpy as np

In [113]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join('data') 

# Actions that we try to detect
# actions = np.array(['hello', 'thanks', 'iloveyou'])
actions = np.array(['yes', 'no'])

# Thirty videos worth of data
no_sequences = 30

# Videos are going to be 30 frames in length
sequence_length = 30

for action in actions:    
    for sequence in range(no_sequences):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

In [115]:
def mediapipe_detection(image, model):
    image = cv.cvtColor(image, cv.COLOR_BGR2RGB) # Color conversion from BGR to RGB
    image.flags.writeable = False                  # Image is no longe writeable
    results = model.process(image)                 # Make prediction
    image.flags.writeable = True                   # Image is now writeable
    image = cv.cvtColor(image, cv.COLOR_RGB2BGR) # Color converson from RGB to BGR
    return image, results

In [117]:
def draw_styled_landmarks(image, results):
    # Draw Face connections - if we want just outlines of the face
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_CONTOURS,
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             )

                            # OR if we want mesh
    # mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION,
    #                          mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1),
    #                          mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
    #                          )
    # # Draw Pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             )
    # Draw Left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(121,44,121), thickness=2, circle_radius=2)
                             )
    # Draw Right hand connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=2),
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             )

In [119]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

In [121]:
mp_holistic = mp.solutions.holistic   # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

cap = cv.VideoCapture(0)

# Set webcam width and height
cap.set(3,1280)

# set mediapipe model
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    #NEW LOOP
    # Loop through action
    for action in actions:
        # Loop through sequences aka videos
        for sequence in range(no_sequences):
            # Loop through video length aka sequences length
            for frame_num in range(sequence_length):
                
                # Read Frame
                ret, frame = cap.read()
                frame = cv.flip(frame, 1)
                
                # Make detection
                image, results = mediapipe_detection(frame, holistic)
                
                # Draw Landmarks
                draw_styled_landmarks(image, results)
                
                # New apply wait logic
                if frame_num == 0:
                    cv.putText(image, "Starting Collection", (120,200),
                               cv.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 4, cv.LINE_AA)
                    cv.putText(image, "Collecting frames for {} video number {}".format(action, sequence), (15,12),
                               cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv.LINE_AA)
                    
                    # Show to Screen
                    cv.imshow("Opencv Feed", image)
                    cv.waitKey(300)
                    
                else:
                    cv.putText(image, "Collecting frames for {} video number {}".format(action, sequence), (15,12),
                               cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv.LINE_AA)
                    cv.imshow("Opencv Feed", image)

                # New Keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)
                
                # Brake gracefully
                if cv.waitKey(10) & 0xFF == ord("q"):
                    break
    cap.release()
    cv.destroyAllWindows()

# Preporcess data and create labels and features

In [60]:
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

In [62]:
label_map = {label:num for num, label in enumerate(actions)}
label_map

{'hello': 0, 'thanks': 1, 'iloveyou': 2}

In [64]:
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequences):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [66]:
X = np.array(sequences)

In [68]:
X.shape

(90, 30, 1662)

In [70]:
y = to_categorical(labels).astype(int)

In [72]:
y

array([[1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [1, 0, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0, 1, 0],
       [0,

In [74]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.05, shuffle=True, random_state=1)

In [76]:
X_train.shape

(85, 30, 1662)

In [78]:
X_test.shape

(5, 30, 1662)

In [7]:
# cap.release()
# cv.destroyAllWindows()

# Build Model

In [None]:
!conda install imgaug

In [92]:
import os
import cv2
import imgaug.augmenters as iaa
import pandas as pd
import numpy as np
import tensorflow as tf
import random
import datetime as dt
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from tensorflow.python.keras.layers import *
from tensorflow.keras.layers import *

ModuleNotFoundError: No module named 'imgaug'

In [82]:
early_stopping = tf.keras.callbacks.EarlyStopping(restore_best_weights=True,patience=10)

In [86]:
with tf.device('/GPU:0'):
    all_data_dir = 'data'
    image_height, image_width = 120, 160
    sequence_length = 8
    X, y = [], []

    image_seq_augmenter = iaa.Sequential([
        iaa.Fliplr(0),
        iaa.Crop(percent=(0, 0.1)),
        iaa.LinearContrast((0.75, 1.5)),
        iaa.GaussianBlur(sigma=(0.0, 1.0)),
        iaa.Multiply((0.8, 1.2), per_channel=0.2)
    ])

    for idx, class_name in enumerate(os.listdir(all_data_dir)):
        for image_seq_name in os.listdir(os.path.join(all_data_dir, class_name)):
            image_seq = []
            for frame_name in os.listdir(os.path.join(all_data_dir, class_name, image_seq_name)):
                frame = cv2.imread(os.path.join(all_data_dir, class_name, image_seq_name, frame_name))
                frame = cv2.resize(frame, (image_height, image_width))
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image_seq.append(frame)
            image_seq_aug = image_seq_augmenter(images=image_seq)
            X.extend([image_seq, image_seq_aug])
            y.extend([idx for i in range(2)])

    X = (np.array(X) / 255.0).astype('float32') # (n_samples, n_frames, height, width, channels)
    y = np.array(y)                             # (n_samples)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=True, random_state=1)

    early_stopping = tf.keras.callbacks.EarlyStopping(restore_best_weights=True,
                                                      patience=10)

    model = tf.keras.Sequential([
        TimeDistributed(Conv2D(16, 3, activation='relu', input_shape=(sequence_length, image_height, image_width, 3),
                   padding='same')),
        TimeDistributed(BatchNormalization()),
        TimeDistributed(MaxPooling2D()),
        TimeDistributed(Dropout(0.3)),

        TimeDistributed(Conv2D(32, 3, activation='relu', padding='same')),
        TimeDistributed(BatchNormalization()),
        TimeDistributed(MaxPooling2D()),
        TimeDistributed(Dropout(0.3)),

        TimeDistributed(Conv2D(64, 3, activation='relu', padding='same')),
        TimeDistributed(BatchNormalization()),
        TimeDistributed(MaxPooling2D()),
        TimeDistributed(Dropout(0.3)),

        TimeDistributed(Conv2D(64, 3, activation='relu', padding='same')),
        TimeDistributed(BatchNormalization()),
        TimeDistributed(MaxPooling2D()),
        TimeDistributed(Dropout(0.3)),

        TimeDistributed(Flatten()),
        LSTM(32),
        Dense(4),
    ])

    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.Adam(),
        metrics=['accuracy'],
    )

    model_train_hist = model.fit(
        X_train, y_train,
        shuffle=True,
        batch_size=4,
        epochs=70,
        validation_split=0.2,
        callbacks=[early_stopping],
    )

    model_eval_loss, model_eval_acc = model.evaluate(X_test, y_test)
    date_time_format = '%Y_%m_%d__%H_%M_%S'
    current_date_time_dt = dt.datetime.now()
    current_date_time_str = dt.datetime.strftime(current_date_time_dt, date_time_format)

    model_name = f'model__date_time_{current_date_time_str}__loss_{model_eval_loss}__acc_{model_eval_acc}__hand.h5'
    model.save(model_name)

    df_train_hist = pd.DataFrame(model_train_hist.history)
    df_train_hist.loc[:, ['loss', 'val_loss']].plot()
    plt.show()

NameError: name 'image_height' is not defined