In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from copy import deepcopy
import torch.optim as optim
import h5py
import numpy as np
from torch.utils.data import TensorDataset, DataLoader, random_split


## Train, validation and test set creation

In [1]:
# Loading the spectrograms for input
def load_X():
  """Load every spectrogram from 'X_Slowed_Sped15.mat' into one float32 tensor.

  The 'outputSpectrograms' entry is read as an array of HDF5 object references
  (one per sample); each reference is dereferenced and the results are stacked.

  Returns:
      torch.Tensor (float32): the stacked spectrograms with the three per-sample
      axes reversed (h5py exposes MATLAB arrays with their dimensions in the
      opposite order).
  """
  with h5py.File('X_Slowed_Sped15.mat', 'r') as f:       # Open the .mat datafile
      cell_ds = f['outputSpectrograms']                 # Dataset holding one reference per spectrogram
      # Read the whole reference array and flatten it. This drops the singleton
      # dimension regardless of whether the file stores it as 1 x N or N x 1.
      refs = cell_ds[()].ravel()
      all_specs = np.array([f[r][()] for r in refs])    # Dereference and stack all spectrograms
      all_specs = all_specs.transpose((0, 3, 2, 1))     # Reverse the per-sample axes back to MATLAB's order
      X = torch.tensor(all_specs, dtype=torch.float32)  # Create a tensor
  return X

# Loading the labels
def load_y():
  """Read the activity label of every sample from 'y_Slowed_Sped15.mat'.

  Each row of 'fileLabels' holds a reference to a group; the label is the
  [0, 0] element of that group's 'activity' field.

  Returns:
      torch.Tensor (long): one activity label per row of 'fileLabels'.
  """
  with h5py.File('y_Slowed_Sped15.mat', 'r') as mat_file:
      label_refs = mat_file['fileLabels']
      # Dereference every row and pull out its scalar activity value
      activities = [
          mat_file[label_refs[row, 0]]['activity'][0, 0]
          for row in range(len(label_refs))
      ]
  return torch.tensor(activities, dtype=torch.long)

def get_dataloaders():
  """Load the spectrograms and labels and return them as an `(X, y)` pair.

  Note that, despite the name, this returns plain tensors and builds no
  DataLoader.

  Returns:
      tuple: (X, y) where X is the output of load_X() with a singleton axis
      inserted at position 1 (the channel axis a CNN expects) and y is the
      output of load_y().
  """
  spectrograms = load_X()
  labels = load_y()
  # Insert the channel axis right after the sample axis
  return spectrograms.unsqueeze(1), labels

# Load the tensors and cache them on disk as numpy arrays.
# The file names must match the ones that make_loaders() reads back
# ('X_Slowed_Sped15.npy' / 'y_Slowed_Sped15.npy'); otherwise a fresh
# "Restart & Run All" cannot find its input.
X, y = get_dataloaders()
np.save('X_Slowed_Sped15.npy', X.numpy())
np.save('y_Slowed_Sped15.npy', y.numpy())


Create the train, validation and test sets

In [4]:
def make_loaders(batch_size,
                 train_frac=0.7, val_frac=0.15,
                 num_workers=4, seed=1,
                 device=None, n_classes=6):
    """Build the train/validation/test DataLoaders and the class weights.

    Reads 'X_Slowed_Sped15.npy' and 'y_Slowed_Sped15.npy' from the working
    directory. X is indexed as a 5-D array (sample, channel, H, W, version),
    where the last axis holds the different versions of a sample (normal,
    slowed, sped). The stored labels are shifted down by one so they start at 0.

    The samples are split randomly (reproducibly, via `seed`). Every version of
    a training sample becomes its own training example; the validation and test
    sets only use version 0.

    Args:
        batch_size  : batch size used by all three loaders
        train_frac  : fraction of the samples assigned to the train set
        val_frac    : fraction assigned to the validation set (the rest is test)
        num_workers : number of DataLoader worker processes
        seed        : seed of the generator that shuffles the sample indices
        device      : device the class weights are placed on; defaults to CUDA
                      when available, otherwise CPU
        n_classes   : minimum length of the class-weight vector

    Returns:
        (train_loader, val_loader, test_loader, class_wts) where class_wts[c]
        is 1 / (number of samples with label c), counted over all samples.

    Raises:
        ValueError: if the fractions are negative or sum to more than 1.
    """
    if train_frac < 0 or val_frac < 0 or train_frac + val_frac > 1.0 + 1e-9:
        raise ValueError(
            f"train_frac ({train_frac}) and val_frac ({val_frac}) must be "
            f"non-negative and sum to at most 1"
        )
    if device is None:
        # Do not rely on a notebook-global `device` being defined beforehand
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    X = torch.from_numpy(np.load('X_Slowed_Sped15.npy'))
    y = torch.from_numpy(np.load('y_Slowed_Sped15.npy') - 1).long()   # labels start at 0
    N = X.size(0)

    # Divide samples between the train, test and validation set randomly
    g = torch.Generator().manual_seed(seed)
    perm = torch.randperm(N, generator=g)   # Random permutation of the sample indices
    n_train = int(train_frac * N)
    n_val   = int(val_frac   * N)
    train_idx = perm[:n_train]
    val_idx   = perm[n_train:n_train+n_val]
    test_idx  = perm[n_train+n_val:]

    # Train set: move the version axis next to the sample axis, then merge the
    # two so that every version is a separate example: (n_train*A, C, H, W)
    X_tr = X[train_idx].permute(0, 4, 1, 2, 3)
    n_aug = X_tr.size(1)
    X_tr = X_tr.reshape(-1, *X_tr.shape[2:])
    y_tr = y[train_idx].repeat_interleave(n_aug)   # label order matches the merged examples

    # Class weights: inverse class frequency over all labels. Empty classes are
    # counted as 1 so the weight stays finite instead of becoming inf.
    counts = torch.bincount(y, minlength=n_classes)
    class_wts = (1.0 / counts.clamp(min=1).double()).float().to(device)

    # Pinned host memory only helps (and only works) when CUDA is present
    pin_memory = torch.cuda.is_available()

    def _loader(features, targets, shuffle):
        """Wrap a (features, targets) pair in a DataLoader with the shared settings."""
        return DataLoader(
            TensorDataset(features, targets), batch_size=batch_size, shuffle=shuffle,
            num_workers=num_workers, pin_memory=pin_memory
        )

    train_loader = _loader(X_tr, y_tr, shuffle=True)
    # Validation and test sets only use the first version (index 0)
    val_loader = _loader(X[val_idx, :, :, :, 0], y[val_idx], shuffle=False)
    test_loader = _loader(X[test_idx, :, :, :, 0], y[test_idx], shuffle=False)

    return train_loader, val_loader, test_loader, class_wts

# --- Build the three loaders plus the class weights used by the loss ---
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
train_loader, val_loader, test_loader, class_wts = make_loaders(
    32, train_frac=0.7, val_frac=0.15
)

## Neural Network

In [6]:
class RadarCNN(nn.Module):
    """
    CNN with three conv blocks and three dense layers for Doppler spectrograms
    (default input size 600×693, one channel).

      Block 1-3 : Conv2d(kernel_size) + ReLU + Dropout2d(p_conv) + MaxPool2d(pool_size)
                  (first conv: in_channels → conv_channels, then conv_channels → conv_channels)
      Dense 1-2 : Linear(→ hidden_dim) + ReLU + Dropout(p_dense)
      Output    : Linear(hidden_dim → n_classes), raw logits (no soft-max applied)

    Xavier-uniform initialisation is applied to *all* conv and dense weights;
    biases are set to zero.

    Args:
        conv_channels : number of output channels of every conv layer
        kernel_size   : conv kernel size
        pool_size     : max-pool window and stride
        p_conv        : Dropout2d probability inside the conv blocks
        n_classes     : number of output logits
        img_size      : (H, W) of the input images, used to size the first dense layer
        in_channels   : number of input channels
        hidden_dim    : width of the two hidden dense layers
        p_dense       : Dropout probability after each hidden dense layer
    """

    def __init__(self, conv_channels=50, kernel_size=2, pool_size=3, p_conv=0.0, n_classes: int = 6, img_size=(600, 693),
                 in_channels=1, hidden_dim=256, p_dense=0.5):
        super().__init__()

        # ─── Convolution / Pooling trunk ─────────────────────────────────────────
        # Three identical blocks; only the first one differs in its input channels.
        layers = []
        c_in = in_channels
        for _ in range(3):
            layers += [
                nn.Conv2d(c_in, conv_channels, kernel_size=kernel_size, padding=0),
                nn.ReLU(inplace=True),
                nn.Dropout2d(p=p_conv),
                nn.MaxPool2d(kernel_size=pool_size, stride=pool_size),
            ]
            c_in = conv_channels
        self.features = nn.Sequential(*layers)

        # ─── Work out the flattened size automatically ──────────────────────────
        # One dummy forward pass through the trunk gives the feature-map size.
        with torch.no_grad():
            dummy = torch.zeros(1, in_channels, *img_size)   # BCHW
            n_flat = self.features(dummy).numel()
        self.n_flat = n_flat

        # ─── Three dense layers (the last one produces the logits) ──────────────
        self.classifier = nn.Sequential(
            nn.Linear(n_flat, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(p_dense),                          # Dropout layer to prevent overfitting

            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(p_dense),

            nn.Linear(hidden_dim, n_classes)
        )

        self._init_weights()                              # Xavier-uniform

    # ────────────────────────────────────────────────────────────────────────────
    def forward(self, x):
        """Map a batch of images (B, in_channels, H, W) to logits (B, n_classes)."""
        x = self.features(x)          # (B, conv_channels, H’, W’)
        x = torch.flatten(x, 1)       # -> (B, n_flat)
        x = self.classifier(x)        # -> (B, n_classes)
        return x                      # (logits; use softmax or CE-loss outside)

    # ────────────────────────────────────────────────────────────────────────────
    def _init_weights(self):
        """Xavier-uniform on every conv and linear weight; zero bias."""
        for m in self.modules():
            if isinstance(m, (nn.Conv2d, nn.Linear)):
                nn.init.xavier_uniform_(m.weight)
                if m.bias is not None:
                    nn.init.zeros_(m.bias)

## Model Training

In [None]:
def _evaluate(model, loader, criterion, device):
    """Return (mean loss, accuracy) of `model` over `loader`, without gradient tracking.

    The model is switched to eval mode. Both values are plain Python floats and
    are 0.0 when the loader yields no samples.
    """
    model.eval()
    loss_sum = 0.0
    n_correct = 0
    n_seen = 0
    with torch.no_grad():
        for inputs, labels in loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            _, preds = torch.max(outputs, 1)
            loss_sum += loss.item() * inputs.size(0)   # weight the batch loss by its size
            n_correct += int((preds == labels).sum().item())
            n_seen += labels.size(0)

    n_seen = max(n_seen, 1)                            # guard against an empty loader
    return loss_sum / n_seen, n_correct / n_seen


def train_model(model, train_loader, val_loader, test_loader, criterion, optimizer, device, num_epochs, patience):
    """
    Trains the model with early stopping, restores the weights with the highest
    validation accuracy, reports their test performance and saves the model.

    Args:
        model       : nn.Module, the neural network to train
        train_loader: DataLoader for training data
        val_loader  : DataLoader for validation data
        test_loader : DataLoader for test data (only used after training)
        criterion   : loss function, e.g., nn.CrossEntropyLoss()
        optimizer   : optimizer, e.g., torch.optim.Adam(model.parameters(), lr=1e-3)
        device      : 'cuda' or 'cpu'
        num_epochs  : maximum number of training epochs
        patience    : number of consecutive epochs without a validation-accuracy
                      improvement after which training stops

    Returns:
        model       : the input model, loaded with the weights that reached the
                      highest validation accuracy

    Side effects:
        Prints per-epoch and test metrics and writes the whole model to
        "full_model.pth" in the working directory.
    """
    model.to(device)
    best_model_wts = deepcopy(model.state_dict())
    best_acc = 0.0
    epochs_no_improve = 0

    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_loss = 0.0
        running_corrects = 0
        running_total = 0

        for inputs, labels in train_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            # Take a gradient descent step
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            # Accumulate the values for the epoch's train loss and accuracy
            _, preds = torch.max(outputs, 1)
            running_loss += loss.item() * inputs.size(0)
            running_corrects += int((preds == labels).sum().item())
            running_total += labels.size(0)

        running_total = max(running_total, 1)          # guard against an empty loader
        epoch_loss = running_loss / running_total
        epoch_acc = running_corrects / running_total

        # Validation phase
        val_epoch_loss, val_epoch_acc = _evaluate(model, val_loader, criterion, device)

        print(f'Epoch {epoch+1}/{num_epochs} | '
              f'Train Loss: {epoch_loss:.4f}, Train Acc: {epoch_acc:.4f} | '
              f'Val Loss: {val_epoch_loss:.4f}, Val Acc: {val_epoch_acc:.4f}')

        # deep copy the best model if validation acc has improved
        if val_epoch_acc > best_acc:
            best_acc = val_epoch_acc
            best_model_wts = deepcopy(model.state_dict())
            epochs_no_improve = 0
        # Stop once `patience` consecutive epochs brought no improvement
        else:
            epochs_no_improve += 1
            if epochs_no_improve >= patience:
                print(f'Early stopping at epoch {epoch+1} '
                      f'after {patience} epochs with no validation accuracy improvement.')
                break

    # Restore the best weights BEFORE testing, so the reported test metrics
    # belong to the model that is saved and returned (not to the last epoch).
    model.load_state_dict(best_model_wts)

    test_epoch_loss, test_epoch_acc = _evaluate(model, test_loader, criterion, device)
    print(f'Test Loss: {test_epoch_loss:.4f}, Test Acc: {test_epoch_acc:.4f}')

    # Save the model
    torch.save(model, "full_model.pth")
    return model

# Usage example: size the network from the first training sample, then train it
# with class-weighted cross-entropy and Adam.
im_size = train_loader.dataset[0][0].shape[1:]                             # (H, W) without the channel axis
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")   # Train on GPU when available
model = RadarCNN(img_size=(im_size[0], im_size[1])).to(device)
criterion = nn.CrossEntropyLoss(weight=class_wts)                          # Cross Entropy Loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)                  # Adam Optimizer
best_model = train_model(
    model, train_loader, val_loader, test_loader,
    criterion, optimizer, device,
    num_epochs=32, patience=5,
)

## Import the Model and apply on the test set

In [None]:
# Create needed variables
im_size = train_loader.dataset[0][0][0].shape
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")      # Evaluate on GPU when available
model     = RadarCNN(img_size=(im_size[0], im_size[1])).to(device)
criterion = nn.CrossEntropyLoss(weight=class_wts)                          # Cross Entropy Loss
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)                  # Adam Optimizer

# Replace the freshly built network with the trained one stored in "full_model.pth".
# NOTE(security): the file contains a fully pickled nn.Module, so loading it runs
# pickle code -> only load files you produced yourself.
#   * weights_only=False is required on PyTorch versions whose default only
#     allows plain tensors/state_dicts.
#   * map_location lets a model saved on a GPU be loaded on a CPU-only machine.
model = torch.load("full_model.pth", map_location=device, weights_only=False)
model.to(device)
model.eval()

# Calculate test loss and accuracy
test_loss = 0.0
test_corrects = 0
test_total = 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        _, preds = torch.max(outputs, 1)
        test_loss += loss.item() * inputs.size(0)          # weight the batch loss by its size
        test_corrects += int((preds == labels).sum().item())
        test_total += labels.size(0)

test_epoch_loss = test_loss / max(test_total, 1)           # guard against an empty loader
test_epoch_acc = test_corrects / max(test_total, 1)
print(f'Test Loss: {test_epoch_loss:.4f}, Test Acc: {test_epoch_acc:.4f}')