In [54]:
import torch
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import Dataset

from torch.utils.data import DataLoader

from sklearn.model_selection import train_test_split

## Building the 1D-CNN with residual connections.

In [55]:
class ResidualBlock1D(nn.Module):
    """Residual block made of two kernel-3 1D convolutions plus a skip path.

    Main path:  Conv1d(k=3, stride) -> BatchNorm -> ReLU -> Conv1d(k=3) -> BatchNorm.
    Skip path:  the input itself, or a 1x1 Conv1d + BatchNorm projection when the
    main path changes the channel count or the temporal stride.
    The two paths are summed and passed through a final ReLU.

    Args:
        in_channels: Number of channels of the incoming tensor.
        out_channels: Number of channels produced by the block.
        stride: Stride of the first convolution (and of the projection, if any).
    """

    def __init__(self, in_channels, out_channels, stride=1):
        super().__init__()

        # Main path; only the first convolution may change stride / channels.
        self.conv1 = nn.Conv1d(
            in_channels, out_channels, kernel_size=3, stride=stride, padding=1
        )
        self.bn1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(
            out_channels, out_channels, kernel_size=3, stride=1, padding=1
        )
        self.bn2 = nn.BatchNorm1d(out_channels)

        # The skip path needs a projection only when the main path alters the
        # tensor shape; otherwise the input can be added back unchanged.
        shape_is_preserved = stride == 1 and in_channels == out_channels
        if shape_is_preserved:
            self.downsample = None
        else:
            self.downsample = nn.Sequential(
                nn.Conv1d(in_channels, out_channels, kernel_size=1, stride=stride),
                nn.BatchNorm1d(out_channels),
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + skip_path(x))``."""
        main = F.relu(self.bn1(self.conv1(x)))
        main = self.bn2(self.conv2(main))

        # Project the input only when a projection was built in __init__.
        shortcut = x if self.downsample is None else self.downsample(x)

        return F.relu(main + shortcut)


In [56]:
class ResNet1D(nn.Module):
    """1D residual network: conv stem, four residual stages, pooled linear head.

    The stem is Conv1d(k=7, stride=2) -> BatchNorm -> ReLU -> MaxPool(k=3, stride=2).
    It is followed by four stages of two ``ResidualBlock1D`` each, with widths
    64, 128, 256 and 512; stages 2-4 use stride 2 in their first block. The
    result is averaged over the last dimension and fed to a linear classifier.

    Args:
        n_channels: Number of input channels (size of dim 1 of the input).
        n_classes: Number of output logits.
    """

    def __init__(self, n_channels, n_classes):
        super().__init__()

        # Stem
        self.conv1 = nn.Conv1d(n_channels, 64, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        # Residual stages (attribute names are kept so saved state_dicts still load)
        self.layer1 = self._make_layer(64, 64, num_blocks=2, stride=1)
        self.layer2 = self._make_layer(64, 128, num_blocks=2, stride=2)
        self.layer3 = self._make_layer(128, 256, num_blocks=2, stride=2)
        self.layer4 = self._make_layer(256, 512, num_blocks=2, stride=2)

        # Head: collapse the last dimension to length 1, then classify
        self.global_pool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512, n_classes)

    def _make_layer(self, in_channels, out_channels, num_blocks, stride):
        """Build one stage as an ``nn.Sequential`` of residual blocks.

        The first block carries the channel / stride change; the remaining
        ``num_blocks - 1`` blocks keep ``out_channels`` and use stride 1.
        """
        blocks = [ResidualBlock1D(in_channels, out_channels, stride=stride)]
        blocks.extend(
            ResidualBlock1D(out_channels, out_channels, stride=1)
            for _ in range(num_blocks - 1)
        )
        return nn.Sequential(*blocks)

    def forward(self, x):
        """Map an input of shape (batch, n_channels, n_times) to (batch, n_classes) logits."""
        x = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        # Channel width grows 64 -> 128 -> 256 -> 512 across the four stages.
        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            x = stage(x)

        # (batch, 512, T) -> (batch, 512, 1) -> (batch, 512)
        pooled = self.global_pool(x).squeeze(-1)

        return self.fc(pooled)


In [57]:
# Initialize the CNN

n_channels = 32      # number of input channels passed to the first conv layer (EEG channels)
n_classes = 2        # ADHD vs Control

# NOTE(review): `model` is created again in a later cell using X_train.shape[1]
# as the channel count, which replaces this instance.
model = ResNet1D(n_channels=n_channels, n_classes=n_classes)

## Building the training loop

In [58]:
#Dataset class

class EEGDataset(Dataset):
    """Map-style dataset pairing feature arrays with integer class labels.

    Args:
        X: Features, indexed by sample along the first dimension. May be a
            ``torch.Tensor`` or anything ``torch.as_tensor`` accepts (e.g. the
            NumPy arrays returned by ``np.load``). Stored as float32.
        y: Labels, one per sample. Same accepted types as ``X``. Stored as
            int64, the dtype ``nn.CrossEntropyLoss`` expects for class indices.

    Raises:
        ValueError: If ``X`` and ``y`` do not contain the same number of samples.
    """

    def __init__(self, X, y):
        # torch.as_tensor passes tensors through untouched and converts NumPy
        # arrays, so calling .float() / .long() no longer fails on ndarray input.
        features = torch.as_tensor(X).float()
        labels = torch.as_tensor(y).long()

        if len(features) != len(labels):
            raise ValueError(
                f"X and y must have the same number of samples, "
                f"got {len(features)} and {len(labels)}"
            )

        self.X = features
        self.y = labels

    def __len__(self):
        """Return the number of samples."""
        return len(self.X)

    def __getitem__(self, idx):
        """Return the ``(features, label)`` pair at position ``idx``."""
        return self.X[idx], self.y[idx]

In [59]:
# Load the raw features, labels and per-sample subject IDs; the split into
# train/validation is meant to be done by subject in the following cells.
X_raw = np.load("../data/X_tqwt_wpd.npy")
y_raw = np.load("../data/y_labels.npy")
# NOTE(review): allow_pickle=True lets np.load unpickle arbitrary Python objects;
# only use it on files from a trusted source.
subject_ids = np.load("../data/subject_ids.npy", allow_pickle=True)

# Distinct subject identifiers (sorted, duplicates removed)
unique_subs = np.unique(subject_ids)
# Optional sanity checks, not required:
#print("Number of subjects:", len(unique_subs))
#print("Subject IDs:", unique_subs)

In [None]:
# Split at the subject level so that no subject contributes samples to both the
# training and the validation set (this avoids leakage between the two sets).
# 80/20 split of the subjects; the fixed random_state makes it reproducible.
train_subs, val_subs = train_test_split(unique_subs, test_size=0.2, random_state=42)

# Boolean masks with one entry per element of subject_ids.
# NOTE(review): assumes subject_ids is aligned row-for-row with X_raw / y_raw — confirm.
train_mask = np.isin(subject_ids, train_subs)
val_mask = np.isin(subject_ids, val_subs)

# Every sample must land in exactly one of the two sets.
assert not np.any(train_mask & val_mask), "a sample is in both train and val"
assert np.all(train_mask | val_mask), "a sample is in neither train nor val"

In [None]:
# Apply the subject masks and convert each split to torch tensors.
# Both masks index the untouched raw arrays: filtering one shared array in place
# and then applying val_mask to the already-filtered result would select the
# wrong rows, and would make the cell unsafe to re-run.
# The *_np names are kept from the original cell even though they hold tensors.
X_train_np = torch.tensor(X_raw[train_mask], dtype=torch.float32)
y_train_np = torch.tensor(y_raw[train_mask], dtype=torch.long)

X_val_np = torch.tensor(X_raw[val_mask], dtype=torch.float32)
y_val_np = torch.tensor(y_raw[val_mask], dtype=torch.long)

In [None]:
# Save the splits once so later sessions can reload them instead of re-splitting.
# The split cell names its outputs X_train_np / y_train_np / X_val_np / y_val_np;
# np.asarray converts them to NumPy arrays (a no-op if they already are) for np.save.
np.save("../data/X_train.npy", np.asarray(X_train_np))
np.save("../data/y_train.npy", np.asarray(y_train_np))
np.save("../data/X_val.npy", np.asarray(X_val_np))
np.save("../data/y_val.npy", np.asarray(y_val_np))

In [None]:
# Reload the saved splits; each variable reads its own file
# (the file names match the ones written by the save cell).
X_train = np.load("../data/X_train.npy")
y_train = np.load("../data/y_train.npy")
X_val = np.load("../data/X_val.npy")
y_val = np.load("../data/y_val.npy")

In [23]:
# Create the datasets and DataLoaders

# Wrap each split in the EEGDataset defined earlier in this notebook
train_dataset = EEGDataset(X_train, y_train)
val_dataset = EEGDataset(X_val, y_val)


# Only the training loader shuffles; validation keeps a fixed order.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=32, shuffle=False)

NameError: name 'X_val' is not defined

In [None]:

# NOTE(review): this repeats the DataLoader construction from the previous cell
# with identical arguments; one of the two cells can be deleted.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader   = DataLoader(val_dataset,   batch_size=32, shuffle=False)

In [None]:
# Select the hardware to train on: the CUDA GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
# Initialize the model, loss and optimizer

# The channel count is read from dim 1 of X_train; the model is moved to `device`.
model = ResNet1D(n_channels=X_train.shape[1], n_classes=2).to(device)

# Cross-entropy over the 2 output logits; Adam with learning rate 1e-3.
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

In [None]:
#Training loop

def train_one_epoch(model, train_loader, criterion, optimizer, device):
    """Run one optimisation pass over ``train_loader`` and report its metrics.

    Args:
        model: Network to train; it is switched to training mode.
        train_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable taking ``(logits, targets)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, accuracy)``. The loss is weighted by batch size,
        so it is the per-sample mean over the whole epoch.

    Raises:
        ZeroDivisionError: If ``train_loader`` yields no samples.
    """
    model.train()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    for batch_x, batch_y in train_loader:
        batch_x = batch_x.to(device)
        batch_y = batch_y.to(device)

        # Clear gradients left over from the previous step, then do the usual
        # forward -> loss -> backward -> update sequence.
        optimizer.zero_grad()
        outputs = model(batch_x)
        batch_loss = criterion(outputs, batch_y)
        batch_loss.backward()
        optimizer.step()

        # Weight by batch size so a smaller final batch does not skew the mean.
        loss_sum += batch_loss.item() * batch_x.size(0)

        # Predicted class = index of the largest logit along dim 1.
        n_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
        n_seen += batch_y.size(0)

    return loss_sum / n_seen, n_correct / n_seen


In [None]:
# Validation loop (no gradient updates)
# Very similar to the training loop, except that the model is put in eval mode and gradient tracking is disabled.

def validate(model, val_loader, criterion, device):
    """Evaluate ``model`` on ``val_loader`` without updating any weights.

    Args:
        model: Network to evaluate; it is switched to evaluation mode.
        val_loader: Iterable yielding ``(inputs, targets)`` batches.
        criterion: Loss callable taking ``(logits, targets)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Tuple ``(mean_loss, accuracy)``, with the loss weighted by batch size.

    Raises:
        ZeroDivisionError: If ``val_loader`` yields no samples.
    """
    model.eval()

    loss_sum = 0.0
    n_correct = 0
    n_seen = 0

    # Gradient tracking is switched off for the whole evaluation pass.
    with torch.no_grad():
        for batch_x, batch_y in val_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            outputs = model(batch_x)
            batch_loss = criterion(outputs, batch_y)

            # Weight by batch size so the result is a per-sample mean.
            loss_sum += batch_loss.item() * batch_x.size(0)
            n_correct += (outputs.argmax(dim=1) == batch_y).sum().item()
            n_seen += batch_y.size(0)

    return loss_sum / n_seen, n_correct / n_seen


In [None]:
# An epoch is one complete pass through the training dataset.

epochs = 20

for epoch in range(epochs):
    # One optimisation pass over the training data (weights are updated here)
    train_loss, train_acc = train_one_epoch(
        model, train_loader, criterion, optimizer, device
    )

    # Evaluate on the validation data after each epoch (no weight updates)
    val_loss, val_acc = validate(
        model, val_loader, criterion, device
    )

    # epoch is 0-based, so print epoch+1 for a 1-based progress counter
    print(f"Epoch {epoch+1}/{epochs}")
    print(f"  Train Loss: {train_loss:.4f} | Train Acc: {train_acc:.4f}")
    print(f"  Val   Loss: {val_loss:.4f} | Val   Acc: {val_acc:.4f}")


In [7]:
# Save the trained weights (state_dict only) for the saliency analysis later.
# NOTE(review): the recorded output of this cell is a NameError for `model`;
# it must run in the same kernel session as, and after, the cell that creates `model`.
torch.save(model.state_dict(), "resnet_eeg.pth")
# To restore, build a ResNet1D with the same constructor arguments, then run:
# model.load_state_dict(torch.load("resnet_eeg.pth"))

NameError: name 'model' is not defined