In [3]:
# Working torch version. Press Enter to start recording a gesture of arbitrary length and press Enter again to stop;
# repeat for the desired number of gestures. The recorded gestures are then compiled into a PyTorch dataset used to train the model below
# to classify the gestures.

import torch
from torch import nn
import cv2
import time
import mediapipe as mp
import numpy as np
import signal
import sys
import csv

# Initialize MediaPipe Holistic
# Short aliases for the holistic solution (model and connection constants) and the drawing helpers.
mp_holistic = mp.solutions.holistic
mp_drawing = mp.solutions.drawing_utils

# Initialize variables
# Module-level state read and written by capture_poses(); `gestures` is also read by the signal handlers.
capturing = False  # True while a gesture is being recorded
gestures = {}  # gesture name -> list of per-frame pose rows
pose_check = {}  # never written in the visible code (its only use is commented out)
poserow_check = {}  # debug slot: capture_poses() stores the most recent pose row under key 0

# Function to handle interrupt signal
def signal_handler_csv(sig, frame):
    """SIGINT handler: export the recorded gestures to CSV, then shut down.

    Args:
        sig: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, via ``sys.exit(0)``, once the export succeeded.
    """
    print('You pressed Ctrl+C or stopped the script!')
    try:
        export_gestures_to_csv(gestures)
    finally:
        # `cap` only becomes a module global once capture_poses() has opened
        # the camera; an interrupt arriving earlier must not die on NameError.
        camera = globals().get('cap')
        if camera is not None:
            camera.release()
        cv2.destroyAllWindows()
    sys.exit(0)

def signal_handler_torch(sig, frame):
    """SIGINT handler: convert the recorded gestures to tensors, then shut down.

    Args:
        sig: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module (unused).

    Raises:
        SystemExit: Always, via ``sys.exit(0)``, once the conversion succeeded.
    """
    print('You pressed Ctrl+C or stopped the script!')
    try:
        store_as_torch_tensors(gestures)
    finally:
        # `cap` only becomes a module global once capture_poses() has opened
        # the camera; an interrupt arriving earlier must not die on NameError.
        camera = globals().get('cap')
        if camera is not None:
            camera.release()
        cv2.destroyAllWindows()
    sys.exit(0)

def store_as_torch_tensors(gestures):
    """Replace every value of ``gestures`` with a float32 torch tensor, in place.

    Args:
        gestures: Mapping from gesture name to array-like frame data. The
            mapping itself is modified; nothing is returned.
    """
    for key in list(gestures):
        # Force float32 so the resulting tensor has dtype torch.float32.
        frames = np.array(gestures[key], dtype=np.float32)
        gestures[key] = torch.from_numpy(frames)

# Set the signal handler
# Route SIGINT (Ctrl+C) to the handler that converts the gestures to torch tensors before exiting.
signal.signal(signal.SIGINT, signal_handler_torch)

# Function to export gestures to CSV
def export_gestures_to_csv(gestures, path='gestures_data.csv'):
    """Write every recorded pose as one CSV row: gesture name, then its coordinates.

    Args:
        gestures: Mapping from gesture name to an iterable of poses. Each pose
            may be any array-like (for example the per-frame arrays stored by
            ``capture_poses``); it is flattened into a single row.
        path: Destination file; it is overwritten if it already exists.
    """
    with open(path, mode='w', newline='') as f:
        csv_writer = csv.writer(f, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        for gesture_name, gesture_data in gestures.items():
            for pose in gesture_data:
                # `[name] + ndarray` is interpreted by NumPy as element-wise
                # addition and raises, so flatten to plain Python floats first.
                coordinates = np.asarray(pose).ravel().tolist()
                csv_writer.writerow([gesture_name, *coordinates])
    print(f"Gesture data exported to '{path}'.")

# Function to capture poses
def capture_poses():
    """Record pose gestures from the default webcam until 'q' is pressed.

    Enter toggles recording. While recording, each frame in which MediaPipe
    detects a pose contributes one array of ``[x, y, z]`` rows (one row per
    landmark). When recording stops, the collected frames are stored in the
    module-level ``gestures`` dict under ``gesture_<n>``. A recording in which
    no pose was ever detected is discarded rather than stored empty. A
    recording still in progress when the loop ends is not stored.

    Returns:
        dict: The module-level ``gestures`` mapping (name -> list of arrays).
    """
    global capturing, gestures, cap
    # Initialize Video Capture
    cap = cv2.VideoCapture(0)
    frame_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    current_gesture = []

    try:
        # Initiate holistic model
        with mp_holistic.Holistic(min_detection_confidence=0.5, min_tracking_confidence=0.5) as holistic:
            while cap.isOpened():
                ret, frame = cap.read()
                if not ret:
                    print("Failed to grab frame")
                    break

                # Recolor Feed (mirrored so the preview behaves like a mirror)
                image = cv2.cvtColor(cv2.flip(frame, 1), cv2.COLOR_BGR2RGB)
                image.flags.writeable = False

                # Make Detections
                results = holistic.process(image)

                # Recolor image back to BGR for rendering
                image.flags.writeable = True
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)

                # Draw landmarks
                mp_drawing.draw_landmarks(image, results.right_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                mp_drawing.draw_landmarks(image, results.left_hand_landmarks, mp_holistic.HAND_CONNECTIONS)
                mp_drawing.draw_landmarks(image, results.pose_landmarks, mp_holistic.POSE_CONNECTIONS)

                # Capture pose and save data
                if capturing:
                    pose_landmarks = results.pose_landmarks
                    # No pose in this frame -> nothing to record; skip it
                    # explicitly instead of swallowing an AttributeError.
                    if pose_landmarks is not None:
                        pose_row = np.array([[landmark.x, landmark.y, landmark.z]
                                             for landmark in pose_landmarks.landmark])
                        poserow_check[0] = pose_row  # debug: keep the latest row inspectable
                        current_gesture.append(pose_row)

                    cv2.putText(image, "Capturing... ", (frame_width // 2 - 20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                else:
                    cv2.putText(image, "Press 'Enter' to start/stop capturing", (frame_width // 2 - 20, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)

                cv2.imshow('Movement, Music & Machines', image)

                key = cv2.waitKey(1) & 0xFF
                if key == ord('\r'):  # Enter key is pressed
                    if capturing:
                        capturing = False
                        if current_gesture:
                            gesture_name = f"gesture_{len(gestures) + 1}"
                            gestures[gesture_name] = current_gesture
                            print(f"Capture stopped. Gesture {gesture_name} saved.")
                        else:
                            # An empty gesture cannot be turned into a usable
                            # training sample, so do not store it.
                            print("Capture stopped. No pose detected; gesture discarded.")
                        current_gesture = []
                    else:
                        capturing = True
                        print("Capture started.")
                elif key == ord('q'):
                    break
    finally:
        # Always free the camera and close the preview, even on an exception,
        # and leave the recording flag in a clean state for the next run.
        capturing = False
        cap.release()
        cv2.destroyAllWindows()
    return gestures

# Start capturing poses
# Runs the interactive webcam loop; the call returns once that loop ends (for example when 'q' is pressed).
gestures = capture_poses()

INFO: Created TensorFlow Lite XNNPACK delegate for CPU.


: 

In [None]:
from torch.utils.data import Dataset, DataLoader
from torch.optim import Adam
from sklearn.model_selection import train_test_split
import numpy as np
from torch import nn
from spiralnet import instantiate_model as instantiate_spiralnet 
from torch.nn import init

class GestureDataset(Dataset):
    """Dataset of recorded gestures; item ``i`` is ``(gesture_tensor, i)``.

    Gestures are addressed in the insertion order of the ``gestures`` mapping,
    so the class label of a gesture is its position in that mapping.
    """

    def __init__(self, gestures):
        """
        Args:
            gestures: Mapping from gesture name to frame data (a torch tensor
                or anything ``np.asarray`` can turn into a float32 array).
        """
        self.gestures = gestures
        # Snapshot the key order once so indices, names and labels stay aligned
        # and no particular key naming scheme is required.
        self.names = list(gestures.keys())
        self.labels = list(range(len(self.names)))

    def __len__(self):
        return len(self.names)

    def __getitem__(self, idx):
        """Return ``(gesture, label)``; raises IndexError when ``idx`` is out of range."""
        # List indexing raises IndexError, which is what stops plain
        # `for item in dataset` iteration.
        gesture_name = self.names[idx]
        gesture = self.gestures[gesture_name]
        if not torch.is_tensor(gesture):
            # Gestures captured without going through the SIGINT handler are
            # still lists of NumPy arrays; convert them here.
            gesture = torch.from_numpy(np.asarray(gesture, dtype=np.float32))
        label = self.labels[idx]
        return gesture, label
    

class SpiralnetClassifierGRU(nn.Module):
    """Gesture classifier: SpiralNet embedding -> LayerNorm -> GRU -> GELU -> linear head.

    ``forward`` returns raw class scores (logits). The ``softmax`` module is
    constructed but not applied in ``forward``.
    """

    def __init__(self, nr_of_classes, embedding_dim=32, nr_spiralnet_layers=4, nr_rnn_layers=2):
        """
        Args:
            nr_of_classes: Size of the output of the final linear layer.
            embedding_dim: Width used for the SpiralNet output, the layer norm,
                and the GRU input and hidden state.
            nr_spiralnet_layers: Passed to ``instantiate_spiralnet`` as ``nr_layers``.
            nr_rnn_layers: Number of stacked GRU layers.
        """
        super().__init__()
        self.nr_of_gesture_classes = nr_of_classes
        self.embedding_dim = embedding_dim
        self.spiralnet = instantiate_spiralnet(nr_layers=nr_spiralnet_layers, output_dim=embedding_dim)
        self.layer_norm = nn.LayerNorm(embedding_dim)
        self.gru = nn.GRU(
            input_size=embedding_dim,
            hidden_size=embedding_dim,
            num_layers=nr_rnn_layers,
            bidirectional=False,
            batch_first=False,
        )
        self.gelu = nn.GELU()
        self.fc = nn.Linear(embedding_dim, nr_of_classes)
        self.softmax = nn.Softmax(dim=1)

        # Xavier-initialise only the GRU parameters with two or more dimensions;
        # parameters with fewer dimensions keep their default initialisation.
        matrix_params = [p for p in self.gru.parameters() if p.dim() >= 2]
        for matrix in matrix_params:
            init.xavier_uniform_(matrix)

    def forward(self, x):
        """Embed ``x``, run the GRU, and classify from the last entry along the first axis.

        NOTE(review): the expected shape of ``x`` depends on ``spiralnet``,
        which is defined outside this file - confirm against that module.
        """
        embedded = self.layer_norm(self.spiralnet(x))
        sequence_out, _hidden = self.gru(embedded)
        # With batch_first=False the first axis of the GRU output is time.
        last_step = sequence_out[-1]
        return self.fc(self.gelu(last_step))

# One output class per recorded gesture.
nr_of_classes = len(gestures)
model = SpiralnetClassifierGRU(nr_of_classes)

# Cross-entropy on the raw logits returned by the model, optimised with Adam.
criterion = nn.CrossEntropyLoss()
optimizer = Adam(model.parameters(), lr=1e-3)

def train_model(model, dataset, criterion, optimizer, num_epochs=100):
    """Train ``model`` one sample at a time, printing the loss of every step.

    Args:
        model: Module called as ``model(gesture)``. Its output gets a leading
            batch axis via ``unsqueeze(0)`` before the loss is computed, so it
            is expected to return un-batched class scores.
        dataset: Indexable collection supporting ``len()`` whose items are
            ``(gesture, label)`` pairs with an integer ``label``.
        criterion: Loss callable taking ``(scores, target)``.
        optimizer: Optimizer over the parameters of ``model``.
        num_epochs: Number of full passes over ``dataset``.
    """
    model.train()  # make sure train-mode layers behave as such
    for epoch in range(num_epochs):
        for i in range(len(dataset)):
            gesture, label = dataset[i]
            optimizer.zero_grad()
            outputs = model(gesture)
            target = torch.tensor([label], dtype=torch.long)
            loss = criterion(outputs.unsqueeze(0), target)
            loss.backward()
            optimizer.step()

            # Report the actual loss of this step. Dividing by 10 while
            # resetting the accumulator every step understated it tenfold.
            print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, loss.item()))

    print('Finished Training')


# Dataset
# Wrap the recorded gestures so that each item is a (gesture, label) pair.
gesture_dataset = GestureDataset(gestures)

# Train the model
# num_epochs is left at train_model's default of 100.
train_model(model, gesture_dataset, criterion, optimizer)
