In [None]:
import cv2
import numpy as np # data set will be in numpy array format
import os # for file paths
from matplotlib import pyplot as plt
import time # delay between each frame
import mediapipe as mp # holistic 
# Module-level alias for MediaPipe's holistic solution module.
mp_holistic = mp.solutions.holistic 

In [None]:
import os
import numpy as np
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# Expected layout: <folder_path>/<sign name>/<video>/<frame>.npy
folder_path = 'MVP_dataset'
signs = []         # one entry per video: list of per-frame keypoint arrays
sign_names = []    # parallel to `signs`: the sign (class) name of each video
unique_signs = set()


def _frame_sort_key(file_name):
    """Sort frame files numerically when the stem is a number ('10.npy' after '9.npy')."""
    stem = os.path.splitext(file_name)[0]
    if stem.isdigit():
        return (0, int(stem), file_name)
    return (1, 0, file_name)


# Only directories are signs; this also skips stray files such as '.DS_Store'.
sign_dirs = []
if os.path.isdir(folder_path):
    sign_dirs = sorted(
        entry for entry in os.listdir(folder_path)
        if os.path.isdir(os.path.join(folder_path, entry))
    )

if sign_dirs:
    for sign in sign_dirs:
        sign_path = os.path.join(folder_path, sign)
        unique_signs.add(sign)
        for video in sorted(os.listdir(sign_path)):
            video_path = os.path.join(sign_path, video)
            if not os.path.isdir(video_path):
                continue
            # os.listdir returns entries in arbitrary order; frames must be
            # loaded in temporal order for a sequence model.
            frame_files = sorted(
                (f for f in os.listdir(video_path) if f.endswith('.npy')),
                key=_frame_sort_key,
            )
            video_keypoints = [
                np.load(os.path.join(video_path, frame_file))
                for frame_file in frame_files
            ]
            if not video_keypoints:
                # A video folder without frames would make the array ragged.
                continue
            signs.append(video_keypoints)
            sign_names.append(sign)

    # NOTE(review): assumes every video has the same number of frames, so this
    # stacks to (videos, frames, keypoints) -- confirm for the dataset.
    signs = np.array(signs)

    if signs.size:
        # Encode the class NAME of each video (not the keypoint data).
        # label_encoder.classes_ maps an integer class index back to its name.
        label_encoder = LabelEncoder()
        integer_encoded = label_encoder.fit_transform(sign_names)
        # The default encoder returns a sparse matrix in every scikit-learn
        # version; .toarray() avoids the renamed sparse/sparse_output keyword.
        onehot_encoder = OneHotEncoder()
        integer_encoded = integer_encoded.reshape(len(integer_encoded), 1)
        signs_labels = onehot_encoder.fit_transform(integer_encoded).toarray()
    else:
        print("The signs array is empty.")
else:
    print("No data to process.")


In [None]:
import torch
import torch.nn as nn

class LSTMModel(nn.Module):
    """Three stacked batch-first LSTM layers followed by three linear layers.

    The linear layers are applied to the LSTM output at every time step, so
    the result keeps the sequence axis of the input; only the last axis
    changes to ``output_dim``.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()

        # Recurrent stack: input_dim -> hidden_dim -> hidden_dim -> hidden_dim.
        self.lstm1 = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)

        # Linear head: hidden_dim -> 64 -> 32 -> output_dim (no activations).
        self.fc1 = nn.Linear(in_features=hidden_dim, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=32)
        self.fc3 = nn.Linear(in_features=32, out_features=output_dim)

    def forward(self, x):
        features = x
        # Each LSTM also returns its final (h, c) state, which is discarded.
        for recurrent in (self.lstm1, self.lstm2, self.lstm3):
            features, _state = recurrent(features)
        for dense in (self.fc1, self.fc2, self.fc3):
            features = dense(features)
        return features

model = LSTMModel(input_dim=258, hidden_dim=64, output_dim=signs_labels.shape[1])
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters())

# Split the dataset with sci-kit learn before this, categorical split
# as_tensor (unlike torch.tensor) accepts an existing tensor without a
# warning, so re-running this cell is safe.
# NOTE(review): assumes signs is (videos, frames, 258) and signs_labels is a
# one-hot (videos, classes) array -- confirm against the loading cell.
signs = torch.as_tensor(signs, dtype=torch.float32)
signs_labels = torch.as_tensor(signs_labels, dtype=torch.long)

# CrossEntropyLoss expects class indices of shape (N,), not one-hot rows.
class_targets = signs_labels.argmax(dim=1)

model.train()
for epoch in range(2000):
    # Forward pass: the model returns one output per time step.
    y_pred = model(signs)
    # Classify each sequence from its final time step -> (N, classes).
    logits = y_pred[:, -1, :]

    # Compute loss and accuracy
    loss = loss_fn(logits, class_targets)
    prediction = logits.argmax(dim=1)
    accuracy = (prediction == class_targets).float().mean()

    # Print loss and accuracy
    print('Epoch: ', epoch, 'Loss: ', loss.item(), 'Accuracy: ', accuracy.item()*100, '%')

    # Zero gradients, perform a backward pass, and update the weights.
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
# Serialize the whole `model` object (not just its state_dict) to 'mvpX.pth'.
torch.save(model, 'mvpX.pth')

In [None]:
#For loading a model
import torch
import torch.nn as nn
class LSTMModel(nn.Module):
    """Stack of three batch-first LSTMs feeding a three-layer linear head.

    Defined in this cell so the class is available when a saved model is
    loaded. The head runs on every time step, so the output keeps the input's
    sequence axis and ends in ``output_dim`` features.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()

        # Recurrent layers.
        self.lstm1 = nn.LSTM(input_size=input_dim, hidden_size=hidden_dim, batch_first=True)
        self.lstm2 = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)
        self.lstm3 = nn.LSTM(input_size=hidden_dim, hidden_size=hidden_dim, batch_first=True)

        # Fully connected layers: hidden_dim -> 64 -> 32 -> output_dim.
        self.fc1 = nn.Linear(in_features=hidden_dim, out_features=64)
        self.fc2 = nn.Linear(in_features=64, out_features=32)
        self.fc3 = nn.Linear(in_features=32, out_features=output_dim)

    def forward(self, x):
        hidden = x
        # Only the per-step outputs are used; the final LSTM states are dropped.
        for lstm_layer in (self.lstm1, self.lstm2, self.lstm3):
            hidden, _final_state = lstm_layer(hidden)
        for linear_layer in (self.fc1, self.fc2, self.fc3):
            hidden = linear_layer(hidden)
        return hidden

# Class names, indexed by the model's output class index.
signs = np.array(['hello', 'sorry', 'help'])

# 'mvp1.pth' is expected to hold a complete pickled LSTMModel, so no fresh
# model needs to be constructed first; the LSTMModel class defined in this
# cell must be in scope for unpickling.
# NOTE(review): newer PyTorch releases may default torch.load to
# weights_only=True, which rejects full-module pickles; if loading fails, pass
# weights_only=False (only for files you trust) -- confirm for your version.
model = torch.load('mvp1.pth')
model.eval()

loss_fn = nn.CrossEntropyLoss()
# Build the optimizer AFTER loading so it tracks the loaded model's parameters
# rather than those of a discarded, freshly initialised model.
optimizer = torch.optim.Adam(model.parameters())

In [None]:
import optuna

def optimize(trial):
    """Optuna objective: train an LSTMModel and return its final training accuracy.

    Tunes ``hidden_dim`` (16..128) and the Adam learning rate ``lr``
    (1e-5..1e-1, log scale). Reads ``actions``, ``X_train`` and ``y_train``
    from the notebook namespace.

    Returns:
        float: accuracy on ``X_train`` after the last completed epoch.

    Raises:
        optuna.exceptions.TrialPruned: when the study's pruner stops the trial.
    """
    input_dim = 258
    hidden_dim = trial.suggest_int('hidden_dim', 16, 128)
    # NOTE(review): `actions` is not defined in the visible cells -- confirm it
    # is the array of class names (other cells call it `signs`).
    output_dim = actions.shape[0]
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    learning_rate = trial.suggest_float('lr', 1e-5, 1e-1, log=True)

    model = LSTMModel(input_dim=input_dim, hidden_dim=hidden_dim, output_dim=output_dim)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

    # CrossEntropyLoss expects class indices; collapse one-hot rows if needed.
    targets = y_train.argmax(dim=1) if y_train.dim() > 1 else y_train

    for epoch in range(500):
        # Forward pass
        y_pred = model(X_train)
        # The model emits one output per time step; classify from the last one.
        logits = y_pred[:, -1, :] if y_pred.dim() == 3 else y_pred

        # Compute loss and accuracy
        loss = loss_fn(logits, targets)
        prediction = logits.argmax(dim=1)
        accuracy = (prediction == targets).float().mean()

        # Zero gradients, perform a backward pass, and update the weights.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Pruning: report the metric the study maximizes, then ask the pruner.
        # should_prune() takes no arguments; it judges the reported values.
        trial.report(accuracy.item(), epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return accuracy.item()

# Run 100 trials, maximizing the value returned by `optimize`.
study = optuna.create_study(direction='maximize')
study.optimize(optimize, n_trials=100)

# Hyperparameters of the best trial found by the study.
best_params = study.best_params
print('Best params: ', best_params)


In [None]:
# Testing the model 



# NOTE(review): this evaluates on `signs` / `signs_labels`, the same names the
# training cell uses -- confirm a held-out split is what is intended here.
model.eval()
with torch.no_grad():
    # y_pred keeps the raw model output (one row per time step) because later
    # cells read it.
    y_pred = model(signs)
    # Classify each sequence from its final time step -> (N, classes).
    test_logits = y_pred[:, -1, :] if y_pred.dim() == 3 else y_pred
    # CrossEntropyLoss expects class indices; collapse one-hot rows if needed.
    test_targets = signs_labels.argmax(dim=1) if signs_labels.dim() > 1 else signs_labels
    loss = loss_fn(test_logits, test_targets)
    prediction = test_logits.argmax(dim=1)
    accuracy = (prediction == test_targets).float().mean()
    print('Test loss: ', loss.item(), 'Test accuracy: ', accuracy.item()*100, '%')

In [None]:
from sklearn.metrics import confusion_matrix



# The model emits one output per time step; use the final step's logits so
# the argmax below runs over classes, not over time.
final_logits = y_pred[:, -1, :] if y_pred.dim() == 3 else y_pred

# Predicted class index per sample, as a NumPy array
predictions_np = final_logits.argmax(dim=1).detach().cpu().numpy()

# Convert the (one-hot) test set labels to integer class indices
# NOTE(review): assumes y_test is one-hot and row-aligned with the samples
# y_pred was computed on -- confirm against the evaluation cell.
y_test_int = np.asarray(y_test).argmax(axis=1)

# predictions_np already holds class indices; a second argmax would be wrong.
predictions_int = predictions_np

# Calculate the confusion matrix (true labels first, predictions second)
cm = confusion_matrix(y_test_int, predictions_int)

print(cm)

In [None]:
import seaborn as sns

# fmt='d' prints the integer counts as-is; the default annotation format
# switches to scientific notation for larger values.
ax = sns.heatmap(cm, annot=True, fmt='d')
# cm was built as confusion_matrix(true, predicted): rows are true labels,
# columns are predictions.
ax.set(xlabel='Predicted label', ylabel='True label', title='Confusion matrix');

In [None]:
#First part from Nick
def _flatten_landmarks(landmark_list, fields, count):
    """Return `fields` of each landmark as one flat vector; zeros if nothing was detected."""
    width = len(fields)
    flat = np.zeros(count * width)
    if landmark_list:
        for index, landmark in enumerate(landmark_list.landmark):
            start = index * width
            flat[start:start + width] = [getattr(landmark, name) for name in fields]
    return flat


def _hand_in_bounds(hand):
    """True if the hand was detected and all its x/y values lie strictly inside the bounds."""
    if not hand.any():
        # An all-zero vector means the hand was not detected.
        return False
    xs, ys = hand[0::3], hand[1::3]  # layout is x, y, z per landmark
    return bool(
        xs.min() > x_min and xs.max() < x_max
        and ys.min() > y_min and ys.max() < y_max
    )


def preprocess_frame(frame):
    """Extract a 258-value keypoint vector from one BGR frame.

    Relies on names defined elsewhere in the notebook: the MediaPipe model
    `holistic` and the bounds `x_min`, `x_max`, `y_min`, `y_max`.

    Args:
        frame: BGR image as a NumPy array. It is not modified.

    Returns:
        tuple: ``(landmarks, hand_detected)``.
            landmarks: NumPy vector of 33*4 pose values (x, y, z, visibility),
            then 21*3 left-hand and 21*3 right-hand values (x, y, z); a part
            that was not detected is all zeros.
            hand_detected: True if at least one detected hand lies entirely
            inside the x/y bounds.
    """
    # Convert to RGB before running the model. cvtColor returns a new array,
    # so the read-only flag goes on that copy and the caller's frame stays
    # writeable for later drawing.
    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
    rgb_frame.flags.writeable = False
    results = holistic.process(rgb_frame)

    # All 33 pose landmarks are kept (the lower body is NOT dropped).
    pose = _flatten_landmarks(results.pose_landmarks, ('x', 'y', 'z', 'visibility'), 33)
    left_hand = _flatten_landmarks(results.left_hand_landmarks, ('x', 'y', 'z'), 21)
    right_hand = _flatten_landmarks(results.right_hand_landmarks, ('x', 'y', 'z'), 21)

    # Concatenate processed frame: pose first, then left and right hand.
    landmarks = np.concatenate([pose, left_hand, right_hand])

    hand_detected = _hand_in_bounds(left_hand) or _hand_in_bounds(right_hand)

    return landmarks, hand_detected

In [None]:
def get_holistic_model(static_image_mode=True,
                       min_detection_confidence=0.5,
                       min_tracking_confidence=0.5):
    """Create a MediaPipe Holistic model.

    The defaults reproduce the previously hard-coded settings, so existing
    no-argument calls behave as before.

    Args:
        static_image_mode: Forwarded to ``Holistic``.
            NOTE(review): per MediaPipe's documentation, True treats every
            frame as an unrelated image (no tracking between frames) --
            confirm this is intended for the webcam loop.
        min_detection_confidence: Forwarded to ``Holistic``.
        min_tracking_confidence: Forwarded to ``Holistic``.

    Returns:
        The instantiated ``mp.solutions.holistic.Holistic`` object.
    """
    # Get Mediapipe holistic solution
    holistic_solution = mp.solutions.holistic

    return holistic_solution.Holistic(
        static_image_mode=static_image_mode,
        min_detection_confidence=min_detection_confidence,
        min_tracking_confidence=min_tracking_confidence)

In [None]:
import time
from collections import deque
CONFIDENCE_THRESHOLD = 0.4   # minimum softmax probability to accept a prediction
SEQUENCE_LENGTH = 48         # frames per window fed to the model
STABLE_FRAMES = 5            # consecutive repeats needed before updating the label

# Bounds read by preprocess_frame for its "hand is in frame" check.
# NOTE(review): MediaPipe documents landmark x/y as normalized to [0, 1], so
# the previous upper bound of 100 never rejected anything -- confirm 1 is the
# intended limit.
x_min, x_max = 0, 1
y_min, y_max = 0, 1

counter = 0               # consecutive windows agreeing with prev_pred
prev_pred = -1
current_prediction = ""
holistic = get_holistic_model()
queue = deque(maxlen=SEQUENCE_LENGTH)   # sliding window of keypoint vectors
cap = cv2.VideoCapture(0)

model.eval()
try:
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            # No frame was delivered (camera unplugged / stream ended).
            break

        landmarks, hand_detected = preprocess_frame(frame)
        # Make sure the frame is writeable before drawing on it; cv2.putText
        # rejects read-only arrays.
        frame.flags.writeable = True

        if hand_detected:
            queue.append(landmarks)

            # Only run the model once a full window has been collected.
            if len(queue) == SEQUENCE_LENGTH:
                # Shape (1, SEQUENCE_LENGTH, features): a batch of one sequence.
                window = torch.from_numpy(np.stack(list(queue))).float().unsqueeze(0)
                with torch.no_grad():
                    output = model(window)
                # The model emits one output per time step: take the LAST step
                # and turn its logits into probabilities so the threshold is
                # meaningful.
                res = torch.softmax(output[0, -1], dim=-1).numpy()

                if np.max(res) > CONFIDENCE_THRESHOLD:
                    pred = int(np.argmax(res))
                    if prev_pred == pred:
                        counter += 1
                    else:
                        counter = 0
                        prev_pred = pred
                    if counter == STABLE_FRAMES:
                        # `signs` must be the array of class names here.
                        current_prediction = signs[pred]

        cv2.putText(frame, f'Prediction: {current_prediction}', (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
        # Show the frame
        cv2.imshow('frame', frame)

        # Check for user input
        key = cv2.waitKey(1)  # Wait for 1ms for the user to press a key
        if key == ord('q'):  # If the user pressed 'q', break out of the loop
            break
finally:
    # Release the camera and destroy the window, even if the loop raised.
    cap.release()
    cv2.destroyAllWindows()
    holistic.close()