In [None]:
import numpy as np

import torch
import torch.nn as nn
import torch.optim as optim

from torch.utils.data import Dataset, DataLoader, random_split
from torch.utils.tensorboard import SummaryWriter

In [None]:
# Load raw data from the file
file_path = "MW.txt"  # Replace with actual file path
with open(file_path, "r") as file:
    lines = file.readlines()

# Every data row is tab-separated. The label is read from field index 4 and the
# comma-separated signal samples from field index 6.
codes = []
brainwaves = []

for line in lines[1:]:  # Skip header
    if not line.strip():
        # A blank line (e.g. a trailing newline at the end of the file) holds no
        # record; without this guard it would fail with IndexError below.
        continue

    parts = line.strip().split("\t")
    code = parts[4]
    brainwave_values = [float(value) for value in parts[6].split(",")]

    codes.append(int(code))
    brainwaves.append(brainwave_values)

In [None]:
target_length = max(map(len, brainwaves))  # Longest series sets the common length

# Function to interpolate a time series to the target length
def interpolate_timeseries(series, target_length):
    """
    Linearly resample `series` to `target_length` points.

    Both the original samples and the requested samples are placed on evenly
    spaced grids over [0, 1]; values on the new grid are obtained by linear
    interpolation with np.interp.

    Parameters:
    - series: sequence of numeric samples
    - target_length: number of samples in the result

    Returns a plain Python list of `target_length` floats.
    """
    source_grid = np.linspace(0, 1, num=len(series))
    target_grid = np.linspace(0, 1, num=target_length)
    resampled = np.interp(target_grid, source_grid, series)
    return resampled.tolist()

# Apply interpolation to each series
resampled_brainwaves = [interpolate_timeseries(ts, target_length) for ts in brainwaves]

# Report a summary only: printing the nested list itself would write every
# sample of every series into the cell output.
print(f"Resampled {len(resampled_brainwaves)} series to length {target_length}")

In [None]:
# Materialise the labels and the resampled signals as NumPy arrays
codes = np.array(codes)  # Label codes
brainwave_array = np.array(resampled_brainwaves)  # One resampled series per row

# Persist each array to its own .npy file
for npy_path, saved_array in (("codes.npy", codes), ("brainwaves.npy", brainwave_array)):
    np.save(npy_path, saved_array)

print("Brainwave data and codes saved successfully!")

ValueError: setting an array element with a sequence. The requested array has an inhomogeneous shape after 1 dimensions. The detected shape was (67634,) + inhomogeneous part.

In [8]:
class EEGDataset(Dataset):
    """Map-style PyTorch dataset pairing each EEG signal with its integer label."""

    def __init__(self, brainwave_array, codes):
        """
        Convert the signals and labels to tensors.

        Parameters:
        - brainwave_array: array-like of shape (N, L), one EEG signal per row
        - codes: array-like holding N integer labels, shaped (N,) or (N, 1)

        Raises:
        - ValueError: if the number of labels differs from the number of signals
        """
        self.X = torch.tensor(brainwave_array, dtype=torch.float32)
        # reshape(-1) instead of squeeze(): squeeze() would collapse a
        # single-sample label array to a 0-dim tensor, which cannot be indexed
        # in __getitem__.
        self.y = torch.tensor(codes, dtype=torch.long).reshape(-1)

        if len(self.X) != len(self.y):
            raise ValueError(
                f"Got {len(self.X)} signals but {len(self.y)} labels; "
                "expected exactly one label per signal."
            )

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the (EEG signal, code) pair stored at position `idx`."""
        return self.X[idx], self.y[idx]

# Read the saved arrays back from disk
brainwave_array = np.load("brainwaves.npy")  # One signal per row
# Stored codes are shifted up by one before use as class labels.
# NOTE(review): presumably the smallest raw code is -1, so labels start at 0 — confirm.
codes = 1 + np.load("codes.npy")

# Wrap both arrays in the PyTorch dataset
dataset = EEGDataset(brainwave_array=brainwave_array, codes=codes)

In [9]:
# Split the dataset into training (80%) and validation (20%) sets
train_size = int(0.8 * len(dataset))  # 80% for training
val_size = len(dataset) - train_size  # Remaining 20% for validation

# Seed the split so the same samples land in each subset on every run;
# with an unseeded split, validation metrics are not comparable between runs.
split_generator = torch.Generator().manual_seed(42)
train_dataset, val_dataset = random_split(
    dataset, [train_size, val_size], generator=split_generator
)

# Create DataLoader for both training and validation datasets
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

In [10]:
class EEGClassifier(nn.Module):
    """
    1-D convolutional classifier for fixed-length EEG signals.

    Three Conv1d + ReLU + MaxPool1d(2) stages are followed by two fully
    connected layers. Each pooling stage halves the signal length (rounding
    down), so the flattened feature size depends on the input length.
    """

    def __init__(self, num_classes=11, input_length=1024):
        """
        Parameters:
        - num_classes: number of output logits
        - input_length: number of samples in each input signal; the default of
          1024 reproduces the original 128 * 128 flattened feature size

        Raises:
        - ValueError: if `input_length` is too short to survive the three
          pooling stages (fewer than 8 samples)
        """
        super(EEGClassifier, self).__init__()

        # Three MaxPool1d(2) stages floor-divide the length by 2 each time,
        # which is the same as a single floor division by 8.
        pooled_length = input_length // 8
        if pooled_length < 1:
            raise ValueError(
                f"input_length must be at least 8, got {input_length}"
            )

        self.conv1 = nn.Conv1d(in_channels=1, out_channels=32, kernel_size=3, stride=1, padding=1)  # (1, L) -> (32, L)
        self.pool = nn.MaxPool1d(2)  # Downsample by a factor of 2
        self.conv2 = nn.Conv1d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)  # (32, L/2) -> (64, L/2)
        self.conv3 = nn.Conv1d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)  # (64, L/4) -> (128, L/4)

        self.fc1 = nn.Linear(128 * pooled_length, 512)  # Flattened features (128 channels x pooled length)
        self.fc2 = nn.Linear(512, num_classes)  # Output layer for classification

        self.dropout = nn.Dropout(p=0.5)  # Dropout for regularization

    def forward(self, x):
        """
        Compute class logits.

        Parameters:
        - x: tensor of shape (N, input_length)

        Returns a tensor of shape (N, num_classes).
        """
        x = x.unsqueeze(1)  # Add channel dimension (N, L) -> (N, 1, L)

        x = self.pool(torch.relu(self.conv1(x)))  # Conv1 + ReLU + MaxPool -> (N, 32, L/2)
        x = self.pool(torch.relu(self.conv2(x)))  # Conv2 + ReLU + MaxPool -> (N, 64, L/4)
        x = self.pool(torch.relu(self.conv3(x)))  # Conv3 + ReLU + MaxPool -> (N, 128, L/8)

        x = x.view(x.size(0), -1)  # Flatten (N, 128, L/8) -> (N, 128 * L/8)

        x = self.dropout(torch.relu(self.fc1(x)))  # FC1 + ReLU + Dropout
        x = self.fc2(x)  # Output layer
        return x

In [None]:
# TensorBoard event writer for this run
writer = SummaryWriter('runs/EEG_classification')

# Compute device: GPU when PyTorch can see one, CPU otherwise
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Network with an 11-way output head, placed on the chosen device
model = EEGClassifier(num_classes=11).to(device)

# Cross-entropy objective, optimised with Adam
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

In [12]:
def train_model(model, train_loader, val_loader, epochs=10):
    """
    Train `model` for `epochs` epochs, running a validation pass after each one.

    Besides its arguments, this function reads the module-level `device`,
    `optimizer`, `criterion` and `writer` objects, which must be defined before
    it is called.

    Parameters:
    - model: network to optimise; invoked as model(eeg_batch)
    - train_loader: sized iterable of (eeg_batch, code_batch) training pairs
    - val_loader: sized iterable of (eeg_batch, code_batch) validation pairs
    - epochs: number of passes over `train_loader`

    Returns None. Per-epoch losses and accuracies are printed; training metrics
    are logged to `writer` every 10 steps and validation metrics once per epoch.
    `writer` is closed before returning.
    """
    global_step = 0  # To keep track of steps for TensorBoard logging

    for epoch in range(epochs):
        # Re-enter training mode at the start of every epoch. The validation
        # phase below switches the model to eval mode; without this call every
        # epoch after the first would train with dropout disabled.
        model.train()

        train_loss = 0
        train_correct = 0
        train_total = 0

        # Training phase
        for eeg_batch, code_batch in train_loader:
            eeg_batch, code_batch = eeg_batch.to(device), code_batch.to(device)

            optimizer.zero_grad()
            outputs = model(eeg_batch)  # Forward pass
            loss = criterion(outputs, code_batch)
            loss.backward()  # Backpropagation
            optimizer.step()  # Optimizer step

            _, predicted = torch.max(outputs, 1)
            batch_correct = (predicted == code_batch).sum().item()
            batch_size = code_batch.size(0)

            train_loss += loss.item()
            train_correct += batch_correct
            train_total += batch_size

            # TensorBoard: Log every 10 steps
            if global_step % 10 == 0:
                writer.add_scalar('Training Loss', loss.item(), global_step)
                writer.add_scalar('Training Accuracy', batch_correct / batch_size, global_step)

            global_step += 1

        # Calculate average loss (per batch) and accuracy (per sample) for training set
        avg_train_loss = train_loss / len(train_loader)
        train_accuracy = train_correct / train_total

        # Validation phase
        model.eval()
        val_loss = 0
        val_correct = 0
        val_total = 0

        with torch.no_grad():
            for eeg_batch, code_batch in val_loader:
                eeg_batch, code_batch = eeg_batch.to(device), code_batch.to(device)
                outputs = model(eeg_batch)
                loss = criterion(outputs, code_batch)

                val_loss += loss.item()
                _, predicted = torch.max(outputs, 1)
                val_correct += (predicted == code_batch).sum().item()
                val_total += code_batch.size(0)

        # Calculate average loss (per batch) and accuracy (per sample) for validation set
        avg_val_loss = val_loss / len(val_loader)
        val_accuracy = val_correct / val_total

        # Log to TensorBoard
        writer.add_scalar('Validation Loss', avg_val_loss, epoch)
        writer.add_scalar('Validation Accuracy', val_accuracy, epoch)

        # Print results
        print(f"Epoch [{epoch+1}/{epochs}]")
        print(f"Train Loss: {avg_train_loss:.4f}, Train Accuracy: {train_accuracy:.4f}")
        print(f"Validation Loss: {avg_val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}")

    writer.close()  # Close the TensorBoard writer

In [None]:
train_model(model, train_loader, val_loader, epochs=10)