## USING CONV2D and CONV1D

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix

from bmis_emg_utils import *


import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

In [2]:
subject = 6
no_gesture = 7
fs = 200
notch_freq = 60.0
quality_factor = 30.0
fc = 10.0
fh = 99.0
order = 5
window_time = 200
overlap = 60
no_channel = 8 # use 1 for 1D Conv and 8 for 2D Conv

In [4]:
# Load data

data, label = get_data_subject_specific(subject, no_gesture, fs, notch_freq, quality_factor, fc, fh, order, window_time, overlap, no_channel)

print('The total data shape is {} and label is {}'.format(data.shape, label.shape))

#np.squeeze(data)



X_train, y_train, X_test, y_test = spilt_data(data, label, ratio=0.2)

y_train = np.squeeze(y_train, axis=1)
y_test = np.squeeze(y_test, axis=1)




X_train = np.expand_dims(X_train, axis=3)
X_test = np.expand_dims(X_test, axis=3)

X_train = np.transpose(X_train, (0, 3, 1, 2))
X_test = np.transpose(X_test, (0, 3, 1, 2))


print('Training Set is{} Test Set {}'.format(X_train.shape, X_test.shape))

print('Training Label Set is{} Test Label Set {}'.format(y_train.shape, y_test.shape))

The total data shape is (4148, 8, 40) and label is (4148, 1)
Training Set is(3318, 1, 8, 40) Test Set (830, 1, 8, 40)
Training Label Set is(3318,) Test Label Set (830,)


In [5]:
class CustomDataset(Dataset):
    def __init__(self, data, labels):
        self.data = torch.from_numpy(data).to(torch.float32)
        self.labels = torch.from_numpy(labels).to(torch.int64)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        x = self.data[idx]
        y = self.labels[idx]
        return x, y

In [6]:
train_dataset = CustomDataset(X_train, y_train)
test_dataset = CustomDataset(X_test, y_test)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=1)


test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=1)

In [7]:
for batch, (X, y) in enumerate(train_loader):
    
    print(X.shape, y.shape)

torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch.Size([16])
torch.Size([16, 1, 8, 40]) torch

In [8]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)  # Change the number of input channels to 1
        self.pool = nn.MaxPool2d(2, 2) # 2x2 max pooling
        self.conv2 = nn.Conv2d(32, 32, 3)
        self.fc1 = nn.Linear(1152, 151)
        self.fc3 = nn.Linear(151, 7)

    def forward(self, x):
        x = F.relu(self.conv1(x)) # 2x2 max pooling after first convolutional layer
        x = self.pool(F.relu(self.conv2(x))) # 2x2 max pooling after second convolutional layer

        # Flatten (reshape to 1D vector of size 16*5*5=400 for batch size 32)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc3(x), dim=1)

        return x

model = Net()

learning_rate = 1e-3
batch_size = 16
epochs = 30

# Initialize the loss function
loss_fn = nn.CrossEntropyLoss()
# Initialize the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, eps=1e-07)

In [None]:
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv1d(1, 32, 3)  # Change the number of input channels to 1
        self.pool = nn.MaxPool1d(1, 1) # 2x2 max pooling
        self.conv2 = nn.Conv1d(32, 32, 3)
        self.fc1 = nn.Linear(1152, 151)
        self.fc3 = nn.Linear(151, 7)

    def forward(self, x):
        x = F.relu(self.conv1(x)) # 2x2 max pooling after first convolutional layer
        x = self.pool(F.relu(self.conv2(x))) # 2x2 max pooling after second convolutional layer

        # Flatten (reshape to 1D vector of size 16*5*5=400 for batch size 32)
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = F.softmax(self.fc3(x), dim=1)

        return x

model = Net()

learning_rate = 1e-3
batch_size = 16
epochs = 30

# Initialize the loss function
loss_fn = nn.CrossEntropyLoss()
# Initialize the optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, eps=1e-07)

In [9]:
def train_loop(dataloader, model, loss_fn, optimizer):
    size = len(dataloader.dataset)
    # Set the model to training mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        # Compute prediction and loss

        optimizer.zero_grad()
        
        pred = model(X)
        loss = loss_fn(pred, y)
        accuracy = (pred.argmax(1) == y).type(torch.float).sum().item() / len(y)

        # Backpropagation
        loss.backward()
        optimizer.step()
        

        if batch % 100 == 0:
            loss, current = loss.item(), (batch + 1) * len(X)

            print(f"Training Accuracy: {accuracy*100:>5f}%")
            
            
            print(f"Training loss: {loss:>7f}  [{current:>5d}/{size:>5d}]")
            
    


def test_loop(dataloader, model, loss_fn):
    # Set the model to evaluation mode - important for batch normalization and dropout layers
    # Unnecessary in this situation but added for best practices
    model.eval()
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    test_loss, correct = 0, 0

    # Evaluating th e model with torch.no_grad() ensures that no gradients are computed during test mode
    # also serves to reduce unnecessary gradient computations and memory usage for tensors with requires_grad=True



    all_preds = []
    all_labels = []
    
    with torch.no_grad():
        for X, y in dataloader:
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()

            _, preds = torch.max(pred , 1)
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(y.cpu().numpy())

    test_loss /= num_batches
    correct /= size
    print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    
    return all_preds, all_labels

In [10]:
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train_loop(train_loader, model, loss_fn, optimizer)
    test_loop(test_loader, model, loss_fn)

Epoch 1
-------------------------------
Training Accuracy: 18.750000%
Training loss: 1.946236  [   16/ 3318]
Training Accuracy: 43.750000%
Training loss: 1.738241  [ 1616/ 3318]
Training Accuracy: 62.500000%
Training loss: 1.519277  [ 3216/ 3318]
Test Error: 
 Accuracy: 69.2%, Avg loss: 1.467238 

Epoch 2
-------------------------------
Training Accuracy: 75.000000%
Training loss: 1.404920  [   16/ 3318]
Training Accuracy: 81.250000%
Training loss: 1.347493  [ 1616/ 3318]
Training Accuracy: 93.750000%
Training loss: 1.235177  [ 3216/ 3318]
Test Error: 
 Accuracy: 75.7%, Avg loss: 1.408822 

Epoch 3
-------------------------------
Training Accuracy: 81.250000%
Training loss: 1.328267  [   16/ 3318]
Training Accuracy: 75.000000%
Training loss: 1.442610  [ 1616/ 3318]
Training Accuracy: 68.750000%
Training loss: 1.471652  [ 3216/ 3318]
Test Error: 
 Accuracy: 78.6%, Avg loss: 1.387416 

Epoch 4
-------------------------------
Training Accuracy: 100.000000%
Training loss: 1.208263  [   16/

In [None]:
all_labels, all_preds = test_loop(test_loader, model, loss_fn)
cm = confusion_matrix(all_labels, all_preds)


# Plot confusion matrix
plt.figure(figsize=(10, 7))
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Blues)
plt.title('Confusion Matrix')
plt.colorbar()
tick_marks = np.arange(7)  # Assuming 7 classes
plt.xticks(tick_marks, range(7), rotation=45)
plt.yticks(tick_marks, range(7))

# Loop over data dimensions and create text annotations
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        plt.text(j, i, cm[i, j], ha="center", va="center", color="red")

plt.tight_layout()
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()

In [None]:
train_loader.dataset.labels.dtype
train_loader.dataset.data.dtype

## CROSS SUBJECT EVALUATION

from torch.utils.data import TensorDataset, DataLoader

subject = 2
no_gesture = 7
fs = 200
notch_freq = 60.0
quality_factor = 30.0
fc = 10.0
fh = 99.0
order = 5
window_time = 200
overlap = 60
no_channel = 8


data_2, label_2 = get_data_subject_specific(subject, no_gesture, fs, notch_freq, quality_factor, fc, fh, order, window_time, overlap, no_channel)

X_train_2, y_train_2, X_test_2, y_test_2 = spilt_data(data_2, label_2, ratio=0.2)

y_train_2 = np.squeeze(y_train_2, axis=1)
y_test_2 = np.squeeze(y_test_2, axis=1)




X_train_2 = np.expand_dims(X_train_2, axis=3)
X_test_2 = np.expand_dims(X_test_2, axis=3)

X_train_2 = np.transpose(X_train_2, (0, 3, 1, 2))
X_test_2 = np.transpose(X_test_2, (0, 3, 1, 2))


print('Training Set is{} Test Set {}'.format(X_train.shape, X_test.shape))

# Create a TensorDataset object
train_dataset = TensorDataset(X_train_2, y_train_2)
test_dataset = TensorDataset(X_test_2, y_test_2)

# Create a DataLoader object
dataloader_train = DataLoader(train_dataset, batch_size=16, shuffle=True)
dataloader_test = DataLoader(test_dataset, batch_size=16, shuffle=False)

In [14]:
data = np.arange(100).reshape(2, 50)
data.shape

(2, 50)

In [15]:
window_size = 5
overlap = 2

In [16]:
def sliding_window(data, window_size, overlap):
    # Number of channels is the number of rows in the data
    num_channels = data.shape[0]

    # Number of datapoints is the number of columns in the data
    num_datapoints = data.shape[1]

    # Calculate the step size based on the window size and overlap
    step_size = window_size - overlap

    # Initialize an empty list to store the windows
    windows = []

    # Slide the window across the data
    for i in range(0, num_datapoints - window_size + 1, step_size):
        # Extract the window from the data
        window = data[:, i:i+window_size]

        # Add the window to the list of windows
        windows.append(window)

    return windows

In [18]:
windows = sliding_window(data, window_size, overlap)

windows = np.array(windows)
windows.shape

(16, 2, 5)

In [19]:
pwd

'/home/bmis/Documents/AI-Workspace/sEMGClassification/AdaptiveLearning/code'