In [19]:
%%script false
# Check if webcam functionality is working
import cv2

cam = cv2.VideoCapture(1)

while True:
    check, frame = cam.read()

    cv2.imshow('video', frame)

    if cv2.waitKey(1) == ord('q'):
        break

cam.release()
cv2.destroyAllWindows()

Couldn't find program: 'false'


In [20]:
from scipy import stats
from matplotlib import pyplot as plt 
import numpy as np
import cv2
import mediapipe as mp
import tensorflow as tf
import torch
import torch.nn as nn
import os
import shutil

In [21]:
# Check if GPU is available
import tensorflow as tf; print(tf.config.list_physical_devices('GPU'))
import torch; print(torch.cuda.is_available())

[PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU')]
True


##### Utils

In [22]:
def load_keypoints(folder_path):
    """
    Load the keypoints from a given file path.
    
    Parameters:
    path (str): Path to the .npy file containing the keypoints.
    
    Returns:
    list of np.ndarray: List of frames where each frame is a numpy array representing keypoints.
    """
    frames_keypoints = []

    files = [f for f in os.listdir(folder_path) if f.endswith('.npy')]
    # print(files)

    # arrange files in ascending order
    files.sort(key=lambda f: int(''.join(filter(str.isdigit, f))))

    # Load the keypoints from the .npy file
    for file in files:
        file_path = os.path.join(folder_path, file)
        frame_keypoints = np.load(file_path)
        frames_keypoints.append(frame_keypoints)

    return frames_keypoints

In [23]:
def save_keypoints(frames_keypoints, folder_path):
    """
    Save the keypoints to a given file path.
    
    Parameters:
    frames_keypoints (list of np.ndarray): List of frames where each frame is a numpy array representing keypoints.
    folder_path (str): Path to the folder where the keypoints will be saved.
    """
    if not os.path.exists(folder_path):
        os.makedirs(folder_path)
    else:
        # if exist, delete folder and recreate the folder
        shutil.rmtree(folder_path, ignore_errors=True)
        os.makedirs(folder_path)

    for i, frame_keypoints in enumerate(frames_keypoints):
        file_path = os.path.join(folder_path, f'{i + 1}.npy')
        np.save(file_path, frame_keypoints)

##### Mediapipie Initialization

In [24]:
mp_holistic = mp.solutions.holistic # Holistic model
mp_drawing = mp.solutions.drawing_utils # Drawing utilities

def mediapipe_detection(image,model):
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)  # Color conversion from BGR to RGB
    image.flags.writeable = False                   # Image is no longer writeable
    results = model.process(image)                  # Make prediction
    image.flags.writeable = True                    # Image is no longer writeable
    image = cv2.cvtColor(image,cv2.COLOR_RGB2BGR)   # Color conversion RGB to BGR
    return image, results

def draw_landmarks(image, results):
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)  # Draw pose connections
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS) # Draw left connections
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)    # Draw right connections

def draw_styled_landmarks(image,results):
    # Draw pose connection
    mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(0,0,255), thickness=5,circle_radius=5),
                              mp_drawing.DrawingSpec(color=(80,110,10), thickness=5,circle_radius=5)
                              )
    # Draw left hand connection
    mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(255, 255, 0), thickness=5,circle_radius=5),
                              mp_drawing.DrawingSpec(color=(255, 255, 0), thickness=5,circle_radius=5)
                              )
    # Draw right hand connection
    mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS,
                              mp_drawing.DrawingSpec(color=(255, 255, 0), thickness=5,circle_radius=5),
                              mp_drawing.DrawingSpec(color=(255, 255, 0), thickness=5,circle_radius=5)
                              )
    
def extract_keypoints(results):
    pose = np.array([[res.x, res.y, res.z, res.visibility] for res in results.pose_landmarks.landmark]).flatten() if results.pose_landmarks else np.zeros(33*4)
    lh = np.array([[res.x, res.y, res.z] for res in results.left_hand_landmarks.landmark]).flatten() if results.left_hand_landmarks else np.zeros(21*3)
    rh = np.array([[res.x, res.y, res.z] for res in results.right_hand_landmarks.landmark]).flatten() if results.right_hand_landmarks else np.zeros(21*3)
    return np.concatenate([pose,lh,rh])

colors = [(245,117,16), (117,245,16), (16,117,245)]
def prob_viz(res, actions, input_frame, colors):
    output_frame = input_frame.copy()
    for num, prob in enumerate(res):
        cv2.rectangle(output_frame, (0,60+num*40), (int(prob*100), 90+num*40), colors[num], -1)
        cv2.putText(output_frame, actions[num], (0, 85+num*40), cv2.FONT_HERSHEY_SIMPLEX, 1, (255,255,255), 2, cv2.LINE_AA)
        
    return output_frame

##### Model Initialization

In [29]:
video_directory = 'TRAIN_5'

sum = 0

gesture_folder = np.array(os.listdir(video_directory))
for gestures in gesture_folder:
    gesture = []

    for fname in os.listdir(os.path.join(video_directory, gestures)):
        path = os.path.join(video_directory, gestures, fname)
        if os.path.isdir(path):
            gesture.append(fname)

gestures = np.array(gesture_folder)
label_map = {label: num for num, label in enumerate(gesture_folder)}

In [30]:
%%script false
# Define your custom LSTM model
class CustomLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(CustomLSTM, self).__init__()
        self.lstm1 = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.lstm2 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.lstm3 = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.fc1 = nn.Linear(hidden_size, 64)
        self.fc2 = nn.Linear(64, 128)
        self.fc3 = nn.Linear(128, 64)
        self.fc4 = nn.Linear(64, 32)
        self.fc5 = nn.Linear(32, 32)
        self.output_layer = nn.Linear(32, num_classes)
        
    def forward(self, x):
        x, _ = self.lstm1(x)
        x, _ = self.lstm2(x)
        x, _ = self.lstm3(x)
        x = torch.relu(self.fc1(x[:, -1, :]))
        x = torch.relu(self.fc2(x))
        x = torch.relu(self.fc3(x))
        x = torch.relu(self.fc4(x))
        x = torch.relu(self.fc5(x))
        x = self.output_layer(x)
        return x
    
# Instantiate the model
input_size = 258
hidden_size = 64
num_classes = len(label_map)
model = CustomLSTM(input_size, hidden_size, num_classes)

# Load the saved model state dictionary
model_filename = 'models/lstm_model_train_5_0.88.pth'
loaded_model_state_dict = torch.load(model_filename)

# Load the state dictionary into the model
model.load_state_dict(loaded_model_state_dict)
model.eval()  # Set the model to evaluation mode

Couldn't find program: 'false'


In [32]:
# %%script false
# load transformer model
# Define Transformer model for classification
class CustomTransformer(nn.Module):
    def __init__(self, input_size, num_classes, d_model=64, nhead=8, num_encoder_layers=3, dim_feedforward=128, dropout=0.1):
        super(CustomTransformer, self).__init__()
        self.input_projection = nn.Linear(input_size, d_model)  # Project input to model dimension
        self.positional_encoding = nn.Parameter(torch.zeros(1, 5000, d_model))  # Positional Encoding
        self.encoder = nn.TransformerEncoder(
            nn.TransformerEncoderLayer(
                d_model=d_model,
                nhead=nhead,
                dim_feedforward=dim_feedforward,
                dropout=dropout,
                batch_first=True,  # Ensure batch is first dim
            ),
            num_layers=num_encoder_layers,
        )
        self.fc = nn.Linear(d_model, num_classes)  # Final classification layer

    def forward(self, x):
        # Project input to d_model dimension
        x = self.input_projection(x)
        
        # Add positional encoding
        seq_len = x.size(1)
        x = x + self.positional_encoding[:, :seq_len, :]
        
        # Pass through Transformer Encoder
        x = self.encoder(x)
        
        # Take the last token's representation for classification
        x = x[:, -1, :]
        x = self.fc(x)
        return x
    
# Model, loss, and optimizer
input_size = 258
num_classes = len(label_map)
model = CustomTransformer(input_size=input_size, num_classes=num_classes)

# Load the saved model state dictionary
model_filename = 'models/transformer_model_train_5_0.93.pth'
loaded_model_state_dict = torch.load(model_filename)

# Load the state dictionary into the model
model.load_state_dict(loaded_model_state_dict)
model.eval()  # Set the model to evaluation mode

  loaded_model_state_dict = torch.load(model_filename)


CustomTransformer(
  (input_projection): Linear(in_features=258, out_features=64, bias=True)
  (encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-2): 3 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=64, out_features=64, bias=True)
        )
        (linear1): Linear(in_features=64, out_features=128, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=128, out_features=64, bias=True)
        (norm1): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((64,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=False)
      )
    )
  )
  (fc): Linear(in_features=64, out_features=107, bias=True)
)

##### Frame Preprocessing

In [33]:
# Anchor Based Normalization
def normalize_keypoints(keypoints_sequence):
    '''
    - pose_keypoints: np.array of shape (33*4,), containing (x, y, z, visibility) for each of the 33 pose keypoints.
    - left_hand_keypoints: Optional, np.array of shape (21*3,), containing (x, y, z) for each of the 21 left hand keypoints.
    - right_hand_keypoints: Optional, np.array of shape (21*3,), containing (x, y, z) for each of the 21 right hand keypoints.
    '''

    # keypoint indices for should and nose
    LEFT_SHOULDER = 11
    RIGHT_SHOULDER = 12
    NOSE = 0

    # Define wrist indices in the hand keypoints
    LEFT_WRIST = 15
    RIGHT_WRIST = 16

    # Results array
    results = []
    
    for keypoints in keypoints_sequence:
        pose_keypoints = keypoints[:33*4]  # Extract pose keypoints
        left_hand_keypoints = keypoints[33*4:33*4+21*3]   # Extract left hand keypoints
        right_hand_keypoints = keypoints[33*4+21*3:]  # Extract right hand keypoints
        
        # Reshape pose into 33x4 array
        pose_keypoints = pose_keypoints.reshape(33, 4)

        # Extract (x, y) coordinates of the left and right shoulders
        left_shoulder = pose_keypoints[LEFT_SHOULDER, :2] # (x, y)
        right_shoulder = pose_keypoints[RIGHT_SHOULDER, :2] # (x, y)

        # Calculate neck as midpoint between left and right shoulders
        neck = (left_shoulder + right_shoulder) / 2.0

        # Extract (x, y) coordinates of the nose
        head = pose_keypoints[NOSE, :2]

        # Calculate the normalization factor (distance between neck and nose)
        norm_factor = np.linalg.norm(head - neck)

        # Avoid division by zero
        if norm_factor == 0:
            #set norm_factor to 1 if division by zero
            norm_factor = 1
            

        # Normalize pose keypoints (x_k, y_k)
        normalized_pose = np.copy(pose_keypoints)
        for k in range(33):
            normalized_pose[k, :2] = (pose_keypoints[k, :2] - neck) / norm_factor

        # reshape into 21x3 array
        left_hand_keypoints = left_hand_keypoints.reshape(21, 3)
        right_hand_keypoints = right_hand_keypoints.reshape(21, 3)

        # Get wrist coordinates
        left_wrist = pose_keypoints[LEFT_WRIST, :2] # (x, y)
        right_wrist = pose_keypoints[RIGHT_WRIST, :2] # (x, y)

        if np.all(left_hand_keypoints == 0.00000000e+00):
            # print(f"{npy_file} hand keypoints are all zeros")
            normalized_left = np.copy(left_hand_keypoints)
        else:
            normalized_left = np.copy(left_hand_keypoints)
            for k in range(21):
            # Align left hand to left wrist
                normalized_left[k, :2] = (left_hand_keypoints[k, :2] - left_hand_keypoints[0, :2]) / norm_factor + (left_wrist - neck) / norm_factor

        if np.all(right_hand_keypoints == 0.00000000e+00):
            # print(f"{npy_file} hand keypoints are all zeros")
            normalized_right = np.copy(right_hand_keypoints)
        else:
            normalized_right = np.copy(right_hand_keypoints)
            for k in range(21):
            # Align right hand to right wrist
                normalized_right[k, :2] = (right_hand_keypoints[k, :2] - right_hand_keypoints[0, :2]) / norm_factor + (right_wrist - neck) / norm_factor

        # Concatenate normalized pose and hand keypoints
        normalized_keypoints = np.concatenate([normalized_pose.flatten(), normalized_left.flatten(), normalized_right.flatten()])
        results.append(normalized_keypoints)

    # Return the normalized keypoints
    return results

##### Webcam

In [34]:
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas

# Visualization function
def plot_sequence(sequence):
    seq_array = np.array(sequence)
    fig, ax = plt.subplots(figsize=(4, 2))
    ax.imshow(seq_array.T, aspect='auto', interpolation='nearest', cmap='viridis')
    ax.set_title("Sequence Heatmap")
    ax.set_xlabel("Frame")
    ax.set_ylabel("Keypoints")
    plt.tight_layout()
    
    canvas = FigureCanvas(fig)
    canvas.draw()
    img = np.frombuffer(canvas.tostring_rgb(), dtype=np.uint8)
    img = img.reshape(canvas.get_width_height()[::-1] + (3,))
    plt.close(fig)
    return img

In [37]:
# Initialize variables
sequence = []
sentence = []
predictions = []
threshold = 0.5
frame_count = 0

cap = cv2.VideoCapture(1)
# Set mediapipe model 
with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
    while cap.isOpened():
        ret, frame = cap.read()

        frame_count += 1

        # Make detections
        image, results = mediapipe_detection(frame, holistic)
        print(results)
        
        # Draw landmarks
        draw_styled_landmarks(image, results)
        
        # 2. Prediction logic
        keypoints = extract_keypoints(results)
        sequence.append(keypoints)
        sequence = sequence[-30:]
        
        if len(sequence) == 30:
            # Normalize the keypoints
            normalized_keypoints = normalize_keypoints(sequence)

            res = model(torch.tensor(np.expand_dims(normalized_keypoints, axis=0),dtype=torch.float32))
            print(gestures[res.argmax(dim=1)])
            predictions.append(res.argmax(dim=1))

            # prediction logic
            if np.unique(predictions[-10:])[0]==res.argmax(dim=1): 
                if res.argmax(dim=1) > threshold: 
                    
                    if len(sentence) > 0: 
                        if gestures[res.argmax(dim=1)] != sentence[-1]:
                            sentence.append(gestures[res.argmax(dim=1)])
                    else:
                        sentence.append(gestures[res.argmax(dim=1)])

            if len(sentence) > 5: 
                sentence = sentence[-5:]

        if frame_count >= 30:
            frame_count = 0

        # Display frame count
        cv2.putText(image, f'Frame: {frame_count}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1, cv2.LINE_AA)
            
        cv2.rectangle(image, (0,0), (640, 40), (245, 117, 16), -1)
        cv2.putText(image, ' '.join(sentence), (1,30), 
                       cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 255, 255), 2, cv2.LINE_AA)

        # Show to screen
        cv2.imshow('OpenCV Feed', image)

        # Break gracefully
        if cv2.waitKey(10) & 0xFF == ord('q'):
            break
    cap.release()
    cv2.destroyAllWindows()

<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.solution_base.SolutionOutputs'>
<class 'mediapipe.python.soluti

In [None]:
# To do, create a reset gesture to clear out sequence