In [125]:
import numpy as np
import matplotlib.pyplot as plt
import sys
import os
import copy
import pandas as pd
import time
from sklearn.preprocessing import StandardScaler

# Loading in data

In [126]:
print("Loading")
# Default location of the pickled dataset on the original author's machine.
# Set the EMG_IMU_DATA_PATH environment variable to load the file from elsewhere
# without editing the notebook.
path_Ben_linux_1 = os.environ.get(
    "EMG_IMU_DATA_PATH",
    "/home/rubin/Research/data/metadata_IMU_EMG_allgestures_allusers(1).pkl",
)

# NOTE(security): unpickling can execute arbitrary code stored in the file, so only
# load pickles that come from a trusted source.
# perf_counter() is monotonic, so the elapsed time cannot be skewed by clock changes.
start_time = time.perf_counter()
data_df = pd.read_pickle(path_Ben_linux_1)
end_time = time.perf_counter()
print(f"Completed in {end_time - start_time}s")

Loading
Completed in 0.04612112045288086s


# Split the dataset into train and test sets (for now, using only the unimpaired participants)

In [127]:
# Participant IDs grouped by cohort; the lists are named for the impaired and unimpaired groups.
pIDs_impaired = ['P102','P103','P104','P105','P106','P107','P108','P109','P110','P111',
       'P112','P114','P115','P116','P118','P119','P121','P122','P123','P124','P125',
       'P126','P127','P128', 'P131', 'P132']
# NOTE(review): P001 and P003 are presumably left out of the list below because they
# don't have duplicate or open gestures (the original note was ambiguous) - TODO confirm.
pIDs_unimpaired = ['P004','P005','P006','P008','P010','P011']

def split_and_preprocess_by_user(data_df, training_users, test_users, num_imu_cols=72):
    """Split ``data_df`` by participant, standardize the features, and separate IMU from EMG.

    The scaler is fitted on the training rows only and then applied to both
    splits, so no statistics from the test participants leak into training.

    Args:
        data_df: DataFrame holding the metadata columns ``Participant``,
            ``Gesture_ID`` and ``Gesture_Num`` plus the feature columns.
        training_users: Participant IDs whose rows form the training split.
        test_users: Participant IDs whose rows form the test split.
        num_imu_cols: Number of leading feature columns (after the metadata
            columns are removed) that are treated as IMU channels; every
            remaining feature column is treated as EMG. Defaults to 72.

    Returns:
        Tuple ``(train_imu, train_emg, test_imu, test_emg)`` of DataFrames.
        Each starts with the three metadata columns followed by the scaled
        feature columns, which are labelled by integer position (the scaler
        returns a plain array, so the original feature names are not kept).

    Raises:
        ValueError: If no row of ``data_df`` belongs to ``training_users`` or
            to ``test_users``.
    """
    metadata_cols = ['Participant', 'Gesture_ID', 'Gesture_Num']

    # Select rows by participant and fail early, with a readable message, when a
    # split would be empty (the scaler would otherwise reject the empty array).
    is_train = data_df['Participant'].isin(training_users)
    is_test = data_df['Participant'].isin(test_users)
    if not is_train.any():
        raise ValueError(f"No rows found for training users: {list(training_users)!r}")
    if not is_test.any():
        raise ValueError(f"No rows found for test users: {list(test_users)!r}")

    train_df = data_df[is_train]
    test_df = data_df[is_test]

    # Keep the metadata aside; indices are reset so they line up with the scaled frames.
    train_metadata_df = train_df[metadata_cols].reset_index(drop=True)
    test_metadata_df = test_df[metadata_cols].reset_index(drop=True)

    train_features_df = train_df.drop(metadata_cols, axis=1).reset_index(drop=True)
    test_features_df = test_df.drop(metadata_cols, axis=1).reset_index(drop=True)

    # Fit on the training data only, then transform both splits with the same scaler.
    train_scaler = StandardScaler()
    ppd_train_df = pd.DataFrame(train_scaler.fit_transform(train_features_df))
    ppd_test_df = pd.DataFrame(train_scaler.transform(test_features_df))

    def _with_metadata(metadata_df, features_df):
        # Put the metadata columns back in front of a block of scaled features.
        return pd.concat([metadata_df, features_df], axis=1)

    # The first ``num_imu_cols`` feature columns are IMU, the rest are EMG.
    ppd_train_imu_df = _with_metadata(train_metadata_df, ppd_train_df.iloc[:, :num_imu_cols])
    ppd_train_emg_df = _with_metadata(train_metadata_df, ppd_train_df.iloc[:, num_imu_cols:])
    ppd_test_imu_df = _with_metadata(test_metadata_df, ppd_test_df.iloc[:, :num_imu_cols])
    ppd_test_emg_df = _with_metadata(test_metadata_df, ppd_test_df.iloc[:, num_imu_cols:])

    return ppd_train_imu_df, ppd_train_emg_df, ppd_test_imu_df, ppd_test_emg_df
    
# Train on four of the unimpaired participants and hold out the other two for testing.
train_pIDs = ['P006', 'P008', 'P010', 'P011']
test_pIDs = ['P004', 'P005']
ppd_train_imu_df, ppd_train_emg_df, ppd_test_imu_df, ppd_test_emg_df = split_and_preprocess_by_user(
    data_df, train_pIDs, test_pIDs
)


In [128]:
# Sanity check: print the (rows, columns) shape of the training EMG frame, then
# display its first rows (the bare last expression is what the cell renders).
print(ppd_train_emg_df.shape)
ppd_train_emg_df.head()

(25600, 19)


Unnamed: 0,Participant,Gesture_ID,Gesture_Num,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87
0,P006,pan,6,-0.743813,-0.545412,-0.716175,-1.265814,-0.471762,-0.765925,-0.394592,-0.814324,-0.666034,-0.420698,-0.45558,-0.523804,-0.401575,-0.589676,-0.013454,-0.459606
1,P006,pan,6,-0.739128,-0.566745,-0.712895,-1.261461,-0.495683,-0.638692,-0.286238,-0.812573,-0.666577,-0.409303,-0.452457,-0.523252,-0.485012,-0.601681,-0.052729,-0.466065
2,P006,pan,6,-0.720411,-0.593631,-0.716057,-1.271174,-0.457003,-0.693394,-0.096152,-0.82789,-0.665456,-0.442158,-0.450521,-0.520637,-0.437734,-0.597538,-0.005312,-0.384134
3,P006,pan,6,-0.699357,-0.535351,-0.720962,-1.269762,-0.514277,-0.593569,-0.228329,-0.829022,-0.668456,-0.448856,-0.448315,-0.525809,-0.438102,-0.600117,-0.122651,-0.359784
4,P006,pan,6,-0.700718,-0.510521,-0.719368,-1.256697,-0.573755,-0.743579,-0.303105,-0.826969,-0.669708,-0.450974,-0.446557,-0.53187,-0.434066,-0.604493,-0.286525,-0.374547


In [129]:
import numpy as np
import torch
from torch.utils.data import Dataset

class EMG_IMU_Dataset(Dataset):
    """Windowed EMG + IMU gesture dataset.

    Both input frames hold one row per time step. Every ``time_units``
    consecutive rows form one sample (window); rows left over after the last
    complete window are dropped. Each item is a tuple ``(emg, imu, label)``
    where ``emg`` has shape ``(num_channels_emg, time_units)``, ``imu`` has
    shape ``(num_channels_imu, time_units)`` and ``label`` is the integer
    class of the window's first row.

    Args:
        emg_df: DataFrame with the metadata columns ``Participant``,
            ``Gesture_ID``, ``Gesture_Num`` followed by the EMG channels.
        imu_df: DataFrame with the same metadata columns followed by the IMU
            channels, row-aligned with ``emg_df``.
        num_channels_emg: Number of EMG channel columns.
        num_channels_imu: Number of IMU channel columns.
        time_units: Number of rows per window.
        label_map: Optional ``{gesture_id: int}`` mapping. Pass the training
            set's ``label_map`` when building a test set so both use the same
            integer for each gesture even if their gesture sets differ. When
            omitted, the mapping is built from the sorted unique gestures in
            ``emg_df``.

    Raises:
        ValueError: If the EMG and IMU frames yield a different number of
            windows, or if the channel counts do not match the data.
        KeyError: If ``label_map`` is given and lacks a gesture found in
            ``emg_df``.
    """

    def __init__(self, emg_df, imu_df, num_channels_emg=16, num_channels_imu=72, time_units=64, label_map=None):
        metadata_cols = ['Participant', 'Gesture_ID', 'Gesture_Num']

        # Per-row gesture names; only the first row of each window is used as its label.
        row_labels = emg_df['Gesture_ID'].values

        # Exclude metadata columns to obtain the raw (rows, channels) arrays
        emg_data = emg_df.drop(metadata_cols, axis=1).values
        imu_data = imu_df.drop(metadata_cols, axis=1).values

        num_samples_emg = len(emg_data) // time_units
        num_samples_imu = len(imu_data) // time_units
        if num_samples_emg != num_samples_imu:
            raise ValueError(
                f"EMG and IMU data yield different numbers of windows: "
                f"{num_samples_emg} vs {num_samples_imu}"
            )
        used_rows = num_samples_emg * time_units

        # Drop any trailing partial window, then reshape
        # (rows, channels) -> (windows, time, channels) -> (windows, channels, time).
        self.emg_data = (
            emg_data[:used_rows]
            .reshape(num_samples_emg, time_units, num_channels_emg)
            .transpose((0, 2, 1))
        )
        self.imu_data = (
            imu_data[:used_rows]
            .reshape(num_samples_imu, time_units, num_channels_imu)
            .transpose((0, 2, 1))
        )

        # Map each unique Gesture_ID to an integer label (or reuse a shared mapping)
        if label_map is None:
            unique_labels = np.unique(row_labels)
            self.label_map = {label: i for i, label in enumerate(unique_labels)}
        else:
            self.label_map = dict(label_map)

        # One integer label per window, taken from the window's first row
        self.labels = np.array(
            [self.label_map[label] for label in row_labels[:used_rows:time_units]],
            dtype=np.int64,
        )

        # Map (Participant, Gesture_ID, Gesture_Num) to the positional window index.
        # Built from the first row of each window instead of iterating over every
        # row; if a key occurs in several windows the last one wins.
        window_heads = emg_df.iloc[:used_rows:time_units]
        window_keys = zip(
            window_heads['Participant'],
            window_heads['Gesture_ID'],
            window_heads['Gesture_Num'],
        )
        self.index_map = {key: sample_idx for sample_idx, key in enumerate(window_keys)}

        # Sanity check
        print(f"EMG Data shape: {self.emg_data.shape}")
        print(f"IMU Data shape: {self.imu_data.shape}")
        print(f"Labels shape: {self.labels.shape}")
        print(f"Label mapping: {self.label_map}")

    def __len__(self):
        """Return the number of windows in the dataset."""
        return len(self.emg_data)

    def __getitem__(self, idx):
        """Return ``(emg, imu, label)`` tensors for one window.

        ``idx`` is either an integer position or a
        ``(Participant, Gesture_ID, Gesture_Num)`` tuple looked up in
        ``index_map`` (raises ``KeyError`` for an unknown tuple).
        """
        if isinstance(idx, tuple):
            # Get item by (Participant, Gesture_ID, Gesture_Num)
            idx = self.index_map[idx]

        emg_data = torch.tensor(self.emg_data[idx], dtype=torch.float32)
        imu_data = torch.tensor(self.imu_data[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)

        return emg_data, imu_data, label


In [136]:
from torch.utils.data import DataLoader

# Mini-batch size shared by both loaders
batch_size = 64

# Wrap the preprocessed EMG / IMU frames as windowed datasets (train first, then test)
train_dataset = EMG_IMU_Dataset(emg_df=ppd_train_emg_df, imu_df=ppd_train_imu_df)
test_dataset = EMG_IMU_Dataset(emg_df=ppd_test_emg_df, imu_df=ppd_test_imu_df)

# Training batches are reshuffled every epoch; test batches keep their stored order
train_loader = DataLoader(dataset=train_dataset, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(dataset=test_dataset, shuffle=False, batch_size=batch_size)


EMG Data shape: (400, 16, 64)
IMU Data shape: (400, 72, 64)
Labels shape: (400,)
Label mapping: {'close': 0, 'delete': 1, 'duplicate': 2, 'move': 3, 'open': 4, 'pan': 5, 'rotate': 6, 'select-single': 7, 'zoom-in': 8, 'zoom-out': 9}
EMG Data shape: (200, 16, 64)
IMU Data shape: (200, 72, 64)
Labels shape: (200,)
Label mapping: {'close': 0, 'delete': 1, 'duplicate': 2, 'move': 3, 'open': 4, 'pan': 5, 'rotate': 6, 'select-single': 7, 'zoom-in': 8, 'zoom-out': 9}


In [131]:
import torch.nn as nn

class EMG_IMU_CNN(nn.Module):
    """Two-branch 1-D CNN that fuses EMG and IMU windows for gesture classification.

    Each modality goes through three ``Conv1d -> ReLU -> MaxPool1d(2)`` stages
    (32, 64, then 128 output channels). The flattened branch outputs are
    concatenated and passed through three fully connected layers that produce
    ``num_classes`` raw logits.

    Args:
        num_classes: Number of output classes.
        num_channels_emg: Input channels of the EMG branch (default 16).
        num_channels_imu: Input channels of the IMU branch (default 72).
        time_units: Length of the time axis of each input window (default 64).
            Three poolings of size 2 reduce it to ``time_units // 8``, which
            determines the input size of the first fully connected layer.

    Raises:
        ValueError: If ``time_units`` is below 8, so that nothing would be left
            after the three pooling stages.
    """

    def __init__(self, num_classes=10, num_channels_emg=16, num_channels_imu=72, time_units=64):
        super(EMG_IMU_CNN, self).__init__()

        # Each of the three MaxPool1d(2) stages floors the length in half.
        pooled_len = time_units // 8
        if pooled_len < 1:
            raise ValueError(f"time_units must be at least 8, got {time_units!r}")

        # Convolutional Block for EMG Data (the pooling layer is shared by both branches)
        self.emg_conv1 = nn.Conv1d(in_channels=num_channels_emg, out_channels=32, kernel_size=3, padding=1)
        self.pool = nn.MaxPool1d(kernel_size=2)
        self.emg_conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.emg_conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        # Convolutional Block for IMU Data
        self.imu_conv1 = nn.Conv1d(in_channels=num_channels_imu, out_channels=32, kernel_size=3, padding=1)
        self.imu_conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.imu_conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, padding=1)

        # Fully connected layers after concatenation: each branch flattens to
        # 128 * pooled_len features, and the two branches are concatenated.
        self.fc1 = nn.Linear(128 * pooled_len * 2, 256)
        self.fc2 = nn.Linear(256, 128)
        self.fc3 = nn.Linear(128, num_classes)

    def _conv_branch(self, x, conv1, conv2, conv3):
        # Apply conv -> ReLU -> pool three times, then flatten all but the batch axis.
        for conv in (conv1, conv2, conv3):
            x = self.pool(torch.relu(conv(x)))
        return torch.flatten(x, start_dim=1)

    def forward(self, emg_input, imu_input):
        """Return class logits of shape ``(batch, num_classes)``.

        Args:
            emg_input: Tensor of shape ``(batch, num_channels_emg, time_units)``.
            imu_input: Tensor of shape ``(batch, num_channels_imu, time_units)``.
        """
        x_emg = self._conv_branch(emg_input, self.emg_conv1, self.emg_conv2, self.emg_conv3)
        x_imu = self._conv_branch(imu_input, self.imu_conv1, self.imu_conv2, self.imu_conv3)

        # Concatenate the EMG and IMU outputs
        x = torch.cat((x_emg, x_imu), dim=1)

        # Pass through the fully connected layers (no activation on the logits)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)

        return x


In [142]:
def train(model, optimizer, dataloader, criterion, device):
    """Run one training epoch and report its mean batch loss and accuracy.

    Args:
        model: Network called as ``model(emg, imu)``.
        optimizer: Optimizer that updates the model's parameters.
        dataloader: Sized iterable yielding ``(emg, imu, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy)``: the per-batch loss averaged over the number of
        batches, and the percentage of correctly predicted samples.
    """
    model.train()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch in dataloader:
        # Put all three tensors of the batch on the target device
        emg_batch, imu_batch, targets = (tensor.to(device) for tensor in batch)

        # Standard step: clear old gradients, forward, backward, update
        optimizer.zero_grad()
        logits = model(emg_batch, imu_batch)
        batch_loss = criterion(logits, targets)
        batch_loss.backward()
        optimizer.step()

        loss_sum += batch_loss.item()

        # The predicted class is the index of the largest logit
        predicted = logits.max(1)[1]
        n_seen += targets.size(0)
        n_correct += predicted.eq(targets).sum().item()

    # Accuracy over every sample seen this epoch; loss averaged per batch
    accuracy = 100. * n_correct / n_seen
    mean_loss = loss_sum / len(dataloader)
    return mean_loss, accuracy


In [143]:
def evaluate(model, dataloader, criterion, device):
    """Score the model on a dataloader without updating its weights.

    Args:
        model: Network called as ``model(emg, imu)``.
        dataloader: Sized iterable yielding ``(emg, imu, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to.

    Returns:
        ``(loss, accuracy)``: the per-batch loss averaged over the number of
        batches, and the percentage of correctly predicted samples.
    """
    model.eval()  # switch to evaluation mode
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    # No gradients are needed for scoring
    with torch.no_grad():
        for batch in dataloader:
            # Put all three tensors of the batch on the target device
            emg_batch, imu_batch, targets = (tensor.to(device) for tensor in batch)

            logits = model(emg_batch, imu_batch)
            loss_sum += criterion(logits, targets).item()

            # The predicted class is the index of the largest logit
            predicted = logits.max(1)[1]
            n_seen += targets.size(0)
            n_correct += predicted.eq(targets).sum().item()

    # Accuracy over the whole dataset; loss averaged per batch
    accuracy = 100. * n_correct / n_seen
    mean_loss = loss_sum / len(dataloader)
    return mean_loss, accuracy


In [147]:
def train_model(model, train_loader, test_loader, criterion, optimizer, num_epochs, device):
    """Train for ``num_epochs`` epochs, printing train and test metrics after each one.

    The model is moved to ``device`` once up front. Every epoch runs one pass
    of ``train`` (which is the only part that is timed) followed by one pass of
    ``evaluate`` on the test loader. Nothing is returned.
    """
    model.to(device)

    for epoch in range(num_epochs):
        # Time only the optimisation pass
        tick = time.time()
        train_loss, train_acc = train(model, optimizer, train_loader, criterion, device)
        epoch_time = time.time() - tick

        print(
            f"Epoch {epoch+1}/{num_epochs}",
            f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%",
            f"Epoch Time: {epoch_time:.2f} seconds",
            sep="\n",
        )

        # Score on the held-out loader after every epoch
        test_loss, test_acc = evaluate(model, test_loader, criterion, device)
        print(f"Test Loss: {test_loss:.4f}, Test Acc: {test_acc:.2f}%")


In [149]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Seed PyTorch's random number generators before the model is created so that
# the weight initialisation is repeatable from run to run. The shuffling of the
# training DataLoader should follow too, since no explicit generator is passed
# to it (some GPU kernels may still be nondeterministic).
torch.manual_seed(0)

# Initialize EMG_IMU_CNN model and move it to the device
emgimu_model = EMG_IMU_CNN().to(device)

# Number of epochs
num_epochs = 100

# Loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(emgimu_model.parameters(), lr=0.0001)

# Train the EMG-IMU model
train_model(emgimu_model, train_loader, test_loader, criterion, optimizer, num_epochs, device)



Epoch 1/100
Train Loss: 2.3015, Train Acc: 12.00%
Epoch Time: 0.11 seconds
Test Loss: 2.3204, Test Acc: 7.00%
Epoch 2/100
Train Loss: 2.2937, Train Acc: 14.75%
Epoch Time: 0.09 seconds
Test Loss: 2.3244, Test Acc: 10.00%
Epoch 3/100
Train Loss: 2.2810, Train Acc: 13.50%
Epoch Time: 0.09 seconds
Test Loss: 2.3294, Test Acc: 10.00%
Epoch 4/100
Train Loss: 2.2664, Train Acc: 19.25%
Epoch Time: 0.09 seconds
Test Loss: 2.3393, Test Acc: 10.50%
Epoch 5/100
Train Loss: 2.2490, Train Acc: 22.00%
Epoch Time: 0.08 seconds
Test Loss: 2.3673, Test Acc: 9.50%
Epoch 6/100
Train Loss: 2.2309, Train Acc: 29.00%
Epoch Time: 0.08 seconds
Test Loss: 2.4092, Test Acc: 9.50%
Epoch 7/100
Train Loss: 2.2014, Train Acc: 30.75%
Epoch Time: 0.07 seconds
Test Loss: 2.5085, Test Acc: 11.50%
Epoch 8/100
Train Loss: 2.1597, Train Acc: 35.50%
Epoch Time: 0.07 seconds
Test Loss: 2.6194, Test Acc: 11.50%
Epoch 9/100
Train Loss: 2.1037, Train Acc: 47.50%
Epoch Time: 0.07 seconds
Test Loss: 2.7881, Test Acc: 14.50%
Epoc