In [13]:
import numpy as np
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, random_split
from scipy import signal
import wfdb

# Check if CUDA is available and set the device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Load the Dataset
def load_signal(file_path, sampto=1000):
    """Read the leading samples of a single WFDB record.

    Args:
        file_path: Path handed straight to ``wfdb.rdrecord``.
            NOTE(review): wfdb presumably expects the record name without a
            file extension -- confirm against the wfdb documentation.
        sampto: Forwarded to ``wfdb.rdrecord`` as its ``sampto`` argument to
            limit how much of the record is read.  Defaults to 1000, the
            value this script previously hard-coded.

    Returns:
        The ``p_signal`` attribute of the loaded record.
        NOTE(review): presumably a (samples, channels) float array -- confirm.
    """
    record = wfdb.rdrecord(file_path, sampto=sampto)
    return record.p_signal

def load_dataset(base_path, num_sessions=3, num_subjects=43,
                 num_gestures=17, num_trials=7):
    """Load every (session, subject, gesture, trial) recording under *base_path*.

    Records are looked up at
    ``<base_path>/Session{s}/session{s}_participant{p}/
    session{s}_participant{p}_gesture{g}_trial{t}`` with all indices starting
    at 1, and each one is read through ``load_signal``.

    Args:
        base_path: Root directory of the dataset.
        num_sessions: Number of sessions to iterate over (default 3).
        num_subjects: Number of participants per session (default 43).
        num_gestures: Number of gestures per participant (default 17).
        num_trials: Number of trials per gesture (default 7).

    Returns:
        A ``(signals, labels)`` tuple of NumPy arrays.  ``labels`` holds the
        1-based gesture index of the recording at the same position in
        ``signals``.
    """
    signals = []
    labels = []
    for session in range(1, num_sessions + 1):
        for subject in range(1, num_subjects + 1):
            session_path = os.path.join(
                base_path,
                f'Session{session}/session{session}_participant{subject}',
            )
            for gesture in range(1, num_gestures + 1):
                for trial in range(1, num_trials + 1):
                    record_name = f'session{session}_participant{subject}_gesture{gesture}_trial{trial}'
                    # Deliberately not called `signal`: that name is the
                    # scipy.signal module imported at the top of the file.
                    recording = load_signal(os.path.join(session_path, record_name))
                    signals.append(recording)
                    labels.append(gesture)  # The gesture index is the class label
    return np.array(signals), np.array(labels)

# 2. Generate Spectrograms
def generate_spectrogram(np_signal, fs=2048):
    """Return the spectrogram of *np_signal* computed by ``scipy.signal.spectrogram``.

    Args:
        np_signal: Array of samples passed unchanged to SciPy.
        fs: Sampling frequency in Hz.  Defaults to 2048, the value this
            script previously hard-coded; set it to match the recording's
            header (.hea) file.

    Returns:
        The spectrogram array ``Sxx``.  The frequency and time axes that
        SciPy also returns are discarded.
    """
    _freqs, _times, Sxx = signal.spectrogram(np_signal, fs=fs)
    return Sxx

def create_spectrograms(signals):
    """Build one spectrogram per recording and stack them into a single array.

    Every recording is collapsed to 1-D with ``flatten()`` before it is
    handed to ``generate_spectrogram``.

    NOTE(review): ``flatten()`` walks the array in row-major order, so if a
    recording is laid out as (samples, channels) the channels end up
    interleaved sample by sample in the flattened vector -- confirm that
    this is intended.

    Args:
        signals: Iterable of NumPy arrays, one per recording.

    Returns:
        ``np.array`` built from the per-recording spectrograms.
    """
    return np.array(
        [generate_spectrogram(recording.flatten()) for recording in signals]
    )

# 3. Define the Dataset Class
class EMGDataset(Dataset):
    """Dataset that serves (spectrogram, label) pairs as tensors.

    Each 2-D spectrogram is converted to ``float32`` and copied along a new
    leading axis ``num_channels`` times, so every channel of a returned
    sample holds the same data.  Labels are shifted from 1-based to 0-based.

    Args:
        signals: Indexable collection of 2-D spectrograms.
        labels: Indexable collection of labels aligned with ``signals``;
            assumed to be 1-based.
        num_channels: How many times each spectrogram is repeated along the
            channel axis.  Defaults to 32, the value previously hard-coded.
    """

    def __init__(self, signals, labels, num_channels=32):
        self.signals = signals
        self.labels = labels
        self.num_channels = num_channels

    def __len__(self):
        """Return the number of samples."""
        return len(self.signals)

    def __getitem__(self, idx):
        """Return ``(x, y)``: a (num_channels, H, W) float tensor and a long label."""
        x = torch.tensor(self.signals[idx], dtype=torch.float32)
        # (H, W) -> (1, H, W) -> (num_channels, H, W); all channels are copies.
        x = x.unsqueeze(0).repeat(self.num_channels, 1, 1)
        y = self.labels[idx] - 1  # Assuming labels are 1-based; convert to 0-based
        return x, torch.tensor(y, dtype=torch.long)

# 4. Define the TCN Model
class TCNModel(nn.Module):
    """Small convolutional classifier for spectrogram inputs.

    Despite the name, the network is built from two ``Conv2d`` + ``ReLU`` +
    ``MaxPool2d`` stages followed by ``Flatten`` and a single ``Linear``
    layer that produces one score per class.

    Args:
        input_channels: Number of channels in each input sample.
        num_classes: Number of output scores.
        input_height: Height of each input sample.  Needed to size the final
            linear layer (each of the two 2x2 pools halves it).
        input_width: Width of each input sample, used the same way.

    Raises:
        ValueError: If a spatial size is neither passed in nor available as
            a module-level variable of the same name.
    """

    def __init__(self, input_channels, num_classes, input_height=None, input_width=None):
        super().__init__()
        # Earlier versions read these sizes from module-level variables that
        # had to be assigned before construction.  That lookup is kept as a
        # fallback so existing call sites keep working.
        if input_height is None:
            input_height = globals().get('input_height')
        if input_width is None:
            input_width = globals().get('input_width')
        if input_height is None or input_width is None:
            raise ValueError(
                'TCNModel needs input_height and input_width, either as '
                'constructor arguments or as module-level variables.'
            )

        # Two 2x2 max-pools shrink each spatial dimension by a factor of 4.
        flat_features = 128 * (input_height // 4) * (input_width // 4)
        self.tcn = nn.Sequential(
            nn.Conv2d(input_channels, 64, kernel_size=(3, 3), padding=(1, 1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1)),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),
            nn.Flatten(),
            nn.Linear(flat_features, num_classes),
        )

    def forward(self, x):
        """Run *x* through the network and return the per-class scores."""
        return self.tcn(x)

# 5. Set hyperparameters and load data
# Script configuration.  BASE_PATH is a placeholder and must point at the
# dataset root that load_dataset() walks.
BASE_PATH = '_your_path'  # Update with your dataset path
BATCH_SIZE = 4  # Samples per mini-batch for both loaders
NUM_EPOCHS = 25  # Full passes over the training split

# Load the dataset: raw recordings plus their gesture labels, then one
# spectrogram per recording.
signals, labels = load_dataset(BASE_PATH)
spectrograms = create_spectrograms(signals)

# Create the dataset and dataloaders (80% train / remainder test).
# NOTE(review): random_split is called without an explicit generator, so the
# split presumably differs between runs unless the global seed is fixed.
emg_dataset = EMGDataset(spectrograms, labels)
train_size = int(0.8 * len(emg_dataset))
test_size = len(emg_dataset) - train_size
train_dataset, test_dataset = random_split(emg_dataset, [train_size, test_size])

# Only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Initialize the model
input_channels = 32  # Must match the channel count of the samples EMGDataset yields
num_classes = 17  # Adjust based on your specific task

# TCNModel.__init__ reads input_height / input_width as module-level names,
# so they have to be assigned before the model is constructed below.
input_height, input_width = spectrograms[0].shape  # Get size of the first spectrogram

model = TCNModel(input_channels=input_channels, num_classes=num_classes).to(device)

# 6. Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# 7. Training Loop
# Train for NUM_EPOCHS passes over train_loader and print the mean batch loss
# after every epoch.
for epoch in range(NUM_EPOCHS):
    model.train()  # Set model to training mode
    total_loss = 0.0

    # The batch labels are bound to `targets` rather than `labels` so that the
    # module-level `labels` array returned by load_dataset() is not overwritten.
    for inputs, targets in train_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        optimizer.zero_grad()  # Clear gradients left over from the previous step
        outputs = model(inputs)  # Forward pass
        loss = criterion(outputs, targets)  # Calculate loss
        loss.backward()  # Backward pass
        optimizer.step()  # Update weights

        total_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{NUM_EPOCHS}], Loss: {total_loss / len(train_loader):.4f}')

# 8. Testing Loop
# Evaluate on test_loader: count how often the highest-scoring class matches
# the target, then print the overall accuracy.
model.eval()  # Set model to evaluation mode
correct = 0
total = 0
with torch.no_grad():  # No gradients are needed for evaluation
    # The batch labels are bound to `targets` rather than `labels` so that the
    # module-level `labels` array returned by load_dataset() is not overwritten.
    for inputs, targets in test_loader:
        inputs = inputs.to(device)
        targets = targets.to(device)

        outputs = model(inputs)
        # Index of the largest score along the class dimension.
        predicted = outputs.argmax(dim=1)
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

accuracy = correct / total
print(f'Accuracy: {accuracy * 100:.2f}%')


Epoch [1/25], Loss: 2.8399
Epoch [2/25], Loss: 2.7952
Epoch [3/25], Loss: 2.6864
Epoch [4/25], Loss: 2.5822
Epoch [5/25], Loss: 2.4563
Epoch [6/25], Loss: 2.3884
Epoch [7/25], Loss: 2.3340
Epoch [8/25], Loss: 2.2828
Epoch [9/25], Loss: 2.2261
Epoch [10/25], Loss: 2.1850
Epoch [11/25], Loss: 2.1259
Epoch [12/25], Loss: 2.0822
Epoch [13/25], Loss: 2.0374
Epoch [14/25], Loss: 2.0111
Epoch [15/25], Loss: 1.9725
Epoch [16/25], Loss: 1.9388
Epoch [17/25], Loss: 1.8876
Epoch [18/25], Loss: 1.8849
Epoch [19/25], Loss: 1.8761
Epoch [20/25], Loss: 1.8053
Epoch [21/25], Loss: 1.7705
Epoch [22/25], Loss: 1.7487
Epoch [23/25], Loss: 1.7227
Epoch [24/25], Loss: 1.6862
Epoch [25/25], Loss: 1.6391
Accuracy: 34.58%
