In [1]:
import cv2
import numpy as np
import os
from matplotlib import pyplot as plt
import time
import mediapipe as mp
import signal
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import to_categorical

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

### Landmark detection for LSTM

In [2]:
def signal_handler(signal, frame):
    # KeyboardInterrupt detected, exiting
    global is_interrupted
    is_interrupted = True


In [3]:
mp_holistic = mp.solutions.holistic 
mp_drawing = mp.solutions.drawing_utils

In [4]:
def mediapipe_detection(image, model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image.flags.writeable = False            
    results = model.process(image)          
    image.flags.writeable = True           
    image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    return image, results

In [5]:
def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACE_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)

In [6]:
def draw_styled_landmarks(image, results):
    # Draw face connections
    mp_drawing.draw_landmarks(image, results.face_landmarks, mp_holistic.FACEMESH_TESSELATION, 
                             mp_drawing.DrawingSpec(color=(80,110,10), thickness=1, circle_radius=1), 
                             mp_drawing.DrawingSpec(color=(80,256,121), thickness=1, circle_radius=1)
                             ) 
    # Draw pose connections
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                             mp_drawing.DrawingSpec(color=(80,22,10), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(80,44,121), thickness=2, circle_radius=2)
                             ) 
    # Draw left hand connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(121,22,76), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(121,44,250), thickness=2, circle_radius=2)
                             ) 
    # Draw right hand connections  
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS, 
                             mp_drawing.DrawingSpec(color=(245,117,66), thickness=2, circle_radius=4), 
                             mp_drawing.DrawingSpec(color=(245,66,230), thickness=2, circle_radius=2)
                             ) 

In [37]:
cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        # print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)
        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

### Extract the Keypoints 

In [7]:
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    face = np.array([[res.x, res.y, res.z] for res in results.face_landmarks.landmark]).flatten() if results.face_landmarks else np.zeros(468*3)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose, face, lh, rh])

In [9]:
result_test = extract_keypoints(results)

In [10]:
468*3+33*4+21*3+21*3

1662

In [11]:
len(result_test)

1662

### Setting up the folders

In [8]:
# Path for exported data, numpy arrays
DATA_PATH = os.path.join('MP_Data') 

# Actions that we try to detect
actions = np.array(['hello', 'thanks', 'iloveyou'])

# Thirty videos worth of data
no_sequences = 30

# Videos are going to be 30 frames in length
sequence_length = 30

In [13]:
for action in actions: 
    for sequence in range(no_sequences):
        try: 
            os.makedirs(os.path.join(DATA_PATH, action, str(sequence)))
        except:
            pass

# Manual Data collection

In [14]:
cap = cv2.VideoCapture(0)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    
    # NEW LOOP
    # Loop through actions
    for action in actions:
        # Loop through sequences aka videos
        for sequence in range(no_sequences):
            # Loop through video length aka sequence length
            for frame_num in range(sequence_length):

                # Read feed
                ret, frame = cap.read()

                # Make detections
                image, results = mediapipe_detection(frame, holistic)
#                 print(results)

                # Draw landmarks
                draw_styled_landmarks(image, results)
                
                # NEW Apply wait logic
                if frame_num == 0: 
                    cv2.putText(image, 'STARTING COLLECTION', (120,200), 
                               cv2.FONT_HERSHEY_SIMPLEX, 1, (0,255, 0), 4, cv2.LINE_AA)
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                    cv2.waitKey(2000)
                else: 
                    cv2.putText(image, 'Collecting frames for {} Video Number {}'.format(action, sequence), (15,12), 
                               cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1, cv2.LINE_AA)
                    # Show to screen
                    cv2.imshow('OpenCV Feed', image)
                
                # NEW Export keypoints
                keypoints = extract_keypoints(results)
                npy_path = os.path.join(DATA_PATH, action, str(sequence), str(frame_num))
                np.save(npy_path, keypoints)

                # Break gracefully
                if cv2.waitKey(10) & 0xFF == ord('q'):
                    break
                    
    cap.release()
    cv2.destroyAllWindows()

# pre proc

In [9]:

actions = np.array(['hello', 'thanks', 'iloveyou'])

# Thirty videos worth of data
no_sequences = 30

# Videos are going to be 30 frames in length
sequence_length = 30
label_map = {label:num for num, label in enumerate(actions)}
label_map

{'hello': 0, 'thanks': 1, 'iloveyou': 2}

In [10]:
sequences, labels = [], []
for action in actions:
    for sequence in range(no_sequences):
        window = []
        for frame_num in range(sequence_length):
            res = np.load(os.path.join(DATA_PATH, action, str(sequence), "{}.npy".format(frame_num)))
            window.append(res)
        sequences.append(window)
        labels.append(label_map[action])

In [11]:
np.array(sequences).shape
X = np.array(sequences)

In [12]:
X.shape

(90, 30, 1662)

In [13]:
y = to_categorical(labels).astype(int)

In [14]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1,random_state=133)

In [15]:
y_test.shape

(9, 3)

### PyTorch

In [16]:
X_train = X_train.astype(np.float32)
y_train = y_train.astype(np.float32)
X_test = X_test.astype(np.float32)
y_test = y_test.astype(np.float32)
training_dataset = torch.utils.data.TensorDataset(torch.from_numpy(X_train), torch.from_numpy(y_train))
testing_dataset  = torch.utils.data.TensorDataset(torch.from_numpy(X_test), torch.from_numpy(y_test))
trainloader = torch.utils.data.DataLoader(training_dataset, shuffle=False, batch_size=100)
testloader = torch.utils.data.DataLoader(testing_dataset, shuffle=False)

In [17]:
X_train.shape

(81, 30, 1662)

In [18]:
input_shape = (30,1662)
class LSTM_Model(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim):
        super(LSTM_Model, self).__init__()
        
        self.lstm1 = nn.LSTM(input_dim, hidden_dim, batch_first=True, 
                             num_layers=1, bidirectional=False)
        self.lstm2 = nn.LSTM(hidden_dim, hidden_dim*2, batch_first=True,
                             num_layers=1, bidirectional=True)
        self.lstm3 = nn.LSTM(hidden_dim*4, hidden_dim, batch_first=True,
                             num_layers=1, bidirectional=False)
        self.fc1 = nn.Linear(hidden_dim, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, output_dim)
        
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1)
        
    def forward(self, x):
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        
        x = self.fc1(x[:, -1, :])
        x = self.relu(x)
        x = self.fc2(x)
        x = self.relu(x)
        x = self.fc3(x)
        x = self.softmax(x)
        
        return x


In [19]:
input_dim = 1662
hidden_dim = 64
output_dim = 3
lr = 0.001
batch_size = 100
num_epochs = 500

In [20]:
model = LSTM_Model(input_dim, hidden_dim, output_dim)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()

In [None]:
for epoch in range(num_epochs) :
    for i, data in enumerate(trainloader) :
        x,y = data
        optimizer.zero_grad()
        outputs = model(x)
        loss = criterion(outputs,y)
        loss.backward()
        optimizer.step()

In [22]:
XtestTorch = torch.from_numpy(X_test)
YtestTorch = torch.from_numpy(y_test)

In [23]:
model.eval()
y_pred = model(XtestTorch)

In [24]:
torch.save(model.state_dict(),"./modelwt1.pth")

# Inference

Directly loading model from model weights

In [25]:
input_dim = 1662
hidden_dim = 64
output_dim = 3
model = LSTM_Model(input_dim,hidden_dim,output_dim) # we do not specify pretrained=True, i.e. do not load default weights
model.load_state_dict(torch.load('modelwt.pth'))

<All keys matched successfully>

## realtime vid

In [26]:
actions = np.array(['hello', 'thanks', 'iloveyou'])

In [27]:
def get_predictions(sequences):
    X_test = np.array(sequences)
    X_test.shape

    X_test = X_test.astype(np.float32)
    XtestTorch = torch.from_numpy(X_test)

    model.eval()
    y_pred = model(XtestTorch)
    y_pred = y_pred.tolist()[0]

    #convert predictions to percentages
    sum = y_pred[0]+y_pred[1]+y_pred[2]
    predictions = []
    predictions.append(round((y_pred[0]/sum)*100,3))
    predictions.append(round((y_pred[1]/sum)*100,3))
    predictions.append(round((y_pred[2]/sum)*100,3))

    return predictions

In [28]:
cap = cv2.VideoCapture(0)
# Set mediapipe model 
sequence =[]
window = []
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():

        # Read feed
        ret, frame = cap.read()

        # Make detections
        image, results = mediapipe_detection(frame, holistic)

        keypoints = extract_keypoints(results)

        if(len(window)<30):
            window.append(keypoints)
        else:
            window.pop(0)
            window.append(keypoints)
        

        sequence = []
        sequence.append(window)
        predictions = get_predictions(sequence)
        
        print(predictions)

        
        draw_styled_landmarks(image, results)
        image = cv2.flip(image,1)
        image = cv2.putText(image, 'Hello: '+str(predictions[0]), (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 
                   1, (255, 0, 0), 2, cv2.LINE_AA)
        image = cv2.putText(image, 'Thanks: '+str(predictions[1]), (50, 100), cv2.FONT_HERSHEY_SIMPLEX, 
                   1, (255, 0, 0), 2, cv2.LINE_AA)
        image = cv2.putText(image, 'I love you: '+str(predictions[2]), (50, 150), cv2.FONT_HERSHEY_SIMPLEX, 
                   1, (255, 0, 0), 2, cv2.LINE_AA)
        
        cv2.imshow('OpenCV Feed', image)
        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

[75.028, 1.455, 23.518]
[98.502, 0.002, 1.496]
[99.836, 0.0, 0.164]
[99.932, 0.0, 0.068]
[99.951, 0.0, 0.049]
[99.957, 0.0, 0.043]
[99.96, 0.0, 0.04]
[99.961, 0.0, 0.039]
[99.962, 0.0, 0.038]
[99.962, 0.0, 0.038]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.963, 0.0, 0.037]
[99.962, 0.0, 0.038]
[99.962, 0.0, 0.038]
[99.961, 0.0, 0.039]
[99.961, 0.0, 0.039]
[99.96, 0.0, 0.04]
[99.96, 0.0, 0.04]
[99.96, 0.0, 0.04]
[99.961, 0.0, 0.039]
[99.961, 0.0, 0.