# Task 1 - Data exploration

In [11]:
# Cell 1: Imports
import os
from glob import glob
import torchaudio
import torch
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence


In [12]:
# Cell 2: Dataset + helpers

class AccentAudioDataset(Dataset):
    """
    Lazily loads .wav files and extracts accent & gender from filename.
    Filenames like '3f_utterance123.wav' where:
      - first char = accent [1-5]
      - second char = gender 'm' or 'f'
    """
    def __init__(self, data_dir, transform=None):
        self.filepaths = sorted(glob(os.path.join(data_dir, '*.wav')))
        if not self.filepaths:
            raise RuntimeError(f"[AccentAudioDataset] No .wav files in {data_dir!r}")
        self.transform = transform

    def __len__(self):
        return len(self.filepaths)

    def __getitem__(self, idx):
        path = self.filepaths[idx]
        waveform, sr = torchaudio.load(path)          # [1, L]
        if sr != 16000:
            waveform = torchaudio.functional.resample(waveform, sr, 16000)
        # online standardization
        waveform = (waveform - waveform.mean()) / (waveform.std() + 1e-9)
        fname = os.path.basename(path)
        accent = int(fname[0]) - 1                   # 0–4
        gender = 0 if fname[1] == 'm' else 1
        return waveform.squeeze(0), accent, gender

def collate_fn(batch):
    """ Pad to batch-max length, return lengths + labels """
    waves, accents, genders = zip(*batch)
    lengths = torch.tensor([w.size(0) for w in waves], dtype=torch.long)
    padded  = pad_sequence(waves, batch_first=True)
    return padded, lengths, torch.tensor(accents), torch.tensor(genders)

def compute_dataset_stats(data_dir, sample_rate=16000):
    """ Scan WAVs for duration stats; errors if folder’s empty """
    paths = glob(os.path.join(data_dir, '*.wav'))
    if not paths:
        raise RuntimeError(f"[compute_dataset_stats] No .wav files in {data_dir!r}")
    durations = torch.tensor([torchaudio.info(p).num_frames / sample_rate for p in paths])
    return {
        'count': len(durations),
        'min_s':  float(durations.min()),
        'max_s':  float(durations.max()),
        'mean_s': float(durations.mean()),
        'p90_s':  float(durations.kthvalue(int(0.9*len(durations))).values)
    }


In [14]:
# Cell 3: Point to your folders & verify
# ↳ Copy paths exactly, no trailing spaces ↓
TRAIN_DIR = "/Users/bramdewaal/Desktop/Uni/VSC/Deep Learning/Assignment/Train"
TEST_DIR  = "/Users/bramdewaal/Desktop/Uni/VSC/Deep Learning/Assignment/Test"

# Strip accidental whitespace just in case
TRAIN_DIR = TRAIN_DIR.strip()
TEST_DIR  = TEST_DIR.strip()

print("TRAIN_DIR:", TRAIN_DIR, "→ exists?", os.path.isdir(TRAIN_DIR))
print("  sample files:", os.listdir(TRAIN_DIR)[:5])
print(" TEST_DIR:", TEST_DIR, "→ exists?", os.path.isdir(TEST_DIR))
print("  sample files:", os.listdir(TEST_DIR)[:5])


TRAIN_DIR: /Users/bramdewaal/Desktop/Uni/VSC/Deep Learning/Assignment/Train → exists? True
  sample files: ['2m_9039.wav', '4f_1887.wav', '4f_9571.wav', '1m_3736.wav', '1m_3078.wav']
 TEST_DIR: /Users/bramdewaal/Desktop/Uni/VSC/Deep Learning/Assignment/Test → exists? True
  sample files: ['9430.wav', '4458.wav', '1534.wav', '8510.wav', '7192.wav']


In [16]:
# Cell 4 (fast sanity check): instantiate datasets, build loaders & grab one batch

# 1) Instantiate datasets
train_ds = AccentAudioDataset(TRAIN_DIR)
test_ds  = AccentAudioDataset(TEST_DIR)

# 2) Create DataLoaders with num_workers=0 to avoid spawn overhead in Jupyter
train_loader = DataLoader(
    train_ds,
    batch_size=32,
    shuffle=True,
    num_workers=0,       # no worker processes
    pin_memory=False,    # lower overhead in notebook
    collate_fn=collate_fn
)
test_loader = DataLoader(
    test_ds,
    batch_size=32,
    shuffle=False,
    num_workers=0,
    pin_memory=False,
    collate_fn=collate_fn
)

# 3) Sanity-check one batch from each loader
for name, loader in [("Train", train_loader), ("Test", test_loader)]:
    waves, lengths, accents, genders = next(iter(loader))
    print(f"{name} batch → waves: {waves.shape}, lengths: {lengths.shape}")


Train batch → waves: torch.Size([32, 151552]), lengths: torch.Size([32])
Test batch → waves: torch.Size([32, 150186]), lengths: torch.Size([32])


# Task 2 - Raw signal 1d cnn

In [18]:
# Cell 5: Define Raw-Signal 1D CNN
import torch.nn as nn
import torch.nn.functional as F

class RawCNN(nn.Module):
    """
    1D CNN for raw waveform classification (5 accents).
    """
    def __init__(self, num_classes=5, dropout=0.2):
        super().__init__()
        self.net = nn.Sequential(
            # Conv stage 1: 1→16 channels, downsample by 4
            nn.Conv1d(1, 16, kernel_size=7, stride=2, padding=3),
            nn.BatchNorm1d(16),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),  # seq_len / 4 total

            # Conv stage 2: 16→32, downsample by 2
            nn.Conv1d(16, 32, kernel_size=5, stride=2, padding=2),
            nn.BatchNorm1d(32),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),  # seq_len / 8 total

            # Conv stage 3: 32→64, downsample by 2
            nn.Conv1d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm1d(64),
            nn.ReLU(inplace=True),
            nn.MaxPool1d(2),  # seq_len / 16 total

            nn.AdaptiveAvgPool1d(1),  # collapse time dim
            nn.Flatten(),             # → [B, 64]
            nn.Dropout(dropout),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        """
        x: FloatTensor [B, T] raw wave
        """
        # add channel dim
        x = x.unsqueeze(1)  # [B,1,T]
        return self.net(x)


In [19]:
# Cell 6: Device selection & forward pass test (using MPS if available)

import torch

# 1) Select device: MPS (Apple GPU) if available, else CPU
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
print("Using device:", device)

# 2) Move model to device
model = RawCNN(num_classes=5, dropout=0.2).to(device)
model.eval()

# 3) Grab one batch, move to device, and test forward pass
with torch.no_grad():
    waves, lengths, accents, genders = next(iter(train_loader))
    waves = waves.to(device)
    logits = model(waves)
    assert logits.shape == (waves.size(0), 5), f"Expected [B,5], got {logits.shape}"
    print(f"✅ RawCNN forward pass on {device} OK: {logits.shape}")


Using device: mps
✅ RawCNN forward pass on mps OK: torch.Size([32, 5])


In [20]:
# Cell 7: Training & Evaluation Functions with MPS Support

import time
import torch
import torch.nn.functional as F

def train_one_epoch(model, loader, optimizer, device):
    model.train()
    total_loss = 0.0
    correct = 0
    total = 0
    start = time.time()
    for waves, lengths, accents, _ in loader:
        waves, accents = waves.to(device), accents.to(device)
        optimizer.zero_grad()
        logits = model(waves)
        loss = F.cross_entropy(logits, accents)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        optimizer.step()

        total_loss += loss.item() * waves.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == accents).sum().item()
        total += waves.size(0)
    elapsed = time.time() - start
    return total_loss / total, correct / total, elapsed

@torch.no_grad()
def evaluate(model, loader, device):
    model.eval()
    total_loss = 0.0
    correct = 0
    total = 0
    for waves, lengths, accents, _ in loader:
        waves, accents = waves.to(device), accents.to(device)
        logits = model(waves)
        loss = F.cross_entropy(logits, accents)

        total_loss += loss.item() * waves.size(0)
        preds = logits.argmax(dim=1)
        correct += (preds == accents).sum().item()
        total += waves.size(0)
    return total_loss / total, correct / total

# Quick function test (one batch)
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
model = RawCNN(num_classes=5, dropout=0.2).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

train_loss, train_acc, train_time = train_one_epoch(model, train_loader, optimizer, device)
val_loss,   val_acc   = evaluate(model, test_loader, device)
print(f"Train   → loss: {train_loss:.4f}, acc: {train_acc:.3f}, time: {train_time:.1f}s")
print(f"Validate→ loss: {val_loss:.4f}, acc: {val_acc:.3f}")


Train   → loss: 1.5560, acc: 0.319, time: 53.7s
Validate→ loss: 0.8815, acc: 0.107


In [21]:
# Cell 8: Full Training Loop with Scheduler & Early Stopping

num_epochs = 5
patience = 2
best_val_loss = float('inf')
epochs_no_improve = 0

# Scheduler: halve LR when val loss plateaus
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer,
                                                       mode='min',
                                                       factor=0.5,
                                                       patience=1)

for epoch in range(1, num_epochs + 1):
    epoch_start = time.time()
    train_loss, train_acc, train_time = train_one_epoch(model, train_loader, optimizer, device)
    val_loss, val_acc = evaluate(model, test_loader, device)
    scheduler.step(val_loss)
    epoch_time = time.time() - epoch_start

    print(f"Epoch {epoch:02d} | "
          f"Train Loss: {train_loss:.4f}, Acc: {train_acc:.3f} | "
          f"Val   Loss: {val_loss:.4f}, Acc: {val_acc:.3f} | "
          f"Time: {epoch_time:.1f}s")

    # Early stopping check
    if val_loss < best_val_loss - 1e-4:
        best_val_loss = val_loss
        epochs_no_improve = 0
    else:
        epochs_no_improve += 1
        if epochs_no_improve > patience:
            print(f"⏹ Early stopping at epoch {epoch}")
            break


Epoch 01 | Train Loss: 1.4941, Acc: 0.384 | Val   Loss: 0.8813, Acc: 0.109 | Time: 21.1s
Epoch 02 | Train Loss: 1.4381, Acc: 0.418 | Val   Loss: 0.8987, Acc: 0.107 | Time: 15.2s
Epoch 03 | Train Loss: 1.3776, Acc: 0.439 | Val   Loss: 0.9240, Acc: 0.096 | Time: 15.5s
Epoch 04 | Train Loss: 1.3358, Acc: 0.459 | Val   Loss: 0.9336, Acc: 0.105 | Time: 12.9s
⏹ Early stopping at epoch 4


In [22]:
# Cell 9: Spectrogram Dataset & DataLoader
import torch
import torch.nn.functional as F
import torchaudio.transforms as T
from torch.utils.data import DataLoader

# Mel-spectrogram transform
mel_transform = T.MelSpectrogram(
    sample_rate=16000, n_fft=512, hop_length=256, n_mels=64
)

class SpectrogramDataset(AccentAudioDataset):
    def __init__(self, data_dir, mel_transform):
        super().__init__(data_dir)
        self.mel_transform = mel_transform

    def __getitem__(self, idx):
        waveform, accent, gender = super().__getitem__(idx)
        spec = self.mel_transform(waveform.unsqueeze(0))  # [1, n_mels, frames]
        spec = torch.log(spec + 1e-9)
        return spec, accent, gender

def spectrogram_collate_fn(batch):
    specs, accents, genders = zip(*batch)
    lengths = torch.tensor([s.size(2) for s in specs], dtype=torch.long)
    T_max = lengths.max().item()
    padded = [F.pad(s, (0, T_max - s.size(2))) for s in specs]
    padded = torch.stack(padded)  # [B,1,n_mels,T_max]
    return padded, lengths, torch.tensor(accents), torch.tensor(genders)

# Instantiate loaders
spec_train_ds = SpectrogramDataset(TRAIN_DIR, mel_transform)
spec_test_ds  = SpectrogramDataset(TEST_DIR,  mel_transform)

spec_train_loader = DataLoader(
    spec_train_ds, batch_size=32, shuffle=True,
    num_workers=0, pin_memory=False,
    collate_fn=spectrogram_collate_fn
)
spec_test_loader = DataLoader(
    spec_test_ds, batch_size=32, shuffle=False,
    num_workers=0, pin_memory=False,
    collate_fn=spectrogram_collate_fn
)

# Sanity check
specs, lengths, accents, genders = next(iter(spec_train_loader))
print("Spectrogram batch →", specs.shape, lengths.shape)


Spectrogram batch → torch.Size([32, 1, 64, 608]) torch.Size([32])


In [23]:
# Cell 10: Spectrogram CNN Definition & Forward Test
import torch.nn as nn

class SpectrogramCNN(nn.Module):
    def __init__(self, num_classes=5, dropout=0.3):
        super().__init__()
        self.net = nn.Sequential(
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(True),
            nn.MaxPool2d(2),

            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(True),
            nn.MaxPool2d(2),

            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(True),
            nn.MaxPool2d(2),

            nn.AdaptiveAvgPool2d((1,1)),
            nn.Flatten(),
            nn.Dropout(dropout),
            nn.Linear(64, num_classes)
        )

    def forward(self, x):
        return self.net(x)

# Forward‐pass test on MPS/CPU
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
model = SpectrogramCNN().to(device)
with torch.no_grad():
    specs = specs.to(device)
    logits = model(specs)
    print("✅ SpectrogramCNN forward:", logits.shape)


✅ SpectrogramCNN forward: torch.Size([32, 5])


In [24]:
# Cell 11: Train & Eval SpectrogramCNN on MPS/CPU (one‐batch smoke test)

import torch.optim as optim

# 1) Device
device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")
print("Using device:", device)

# 2) Instantiate model, optimizer
model_spec = SpectrogramCNN(num_classes=5, dropout=0.3).to(device)
optimizer_spec = optim.Adam(model_spec.parameters(), lr=1e-3)

# 3) Quick one‐batch train & eval
train_loss, train_acc, _ = train_one_epoch(model_spec, spec_train_loader, optimizer_spec, device)
val_loss,   val_acc       = evaluate   (model_spec, spec_test_loader,        device)

print(f"SpectrogramCNN → Train loss: {train_loss:.4f}, acc: {train_acc:.3f}")
print(f"                 Val   loss: {val_loss:.4f}, acc: {val_acc:.3f}")


Using device: mps
SpectrogramCNN → Train loss: 1.5379, acc: 0.316
                 Val   loss: 0.9028, acc: 0.109


In [25]:
# Cell 12: Full Training Loop for SpectrogramCNN with Scheduler & Early Stopping

num_epochs = 5
patience   = 2
best_val   = float('inf')
no_improve = 0

# Scheduler to reduce LR on plateau
scheduler_spec = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer_spec, mode='min', factor=0.5, patience=1
)

for epoch in range(1, num_epochs + 1):
    t0 = time.time()
    tr_loss, tr_acc, tr_time = train_one_epoch(
        model_spec, spec_train_loader, optimizer_spec, device
    )
    vl_loss, vl_acc = evaluate(model_spec, spec_test_loader, device)
    scheduler_spec.step(vl_loss)
    dt = time.time() - t0

    print(f"Epoch {epoch:02d} | "
          f"Train Loss: {tr_loss:.4f}, Acc: {tr_acc:.3f} | "
          f"Val   Loss: {vl_loss:.4f}, Acc: {vl_acc:.3f} | "
          f"Time: {dt:.1f}s")

    # early stopping
    if vl_loss < best_val - 1e-4:
        best_val   = vl_loss
        no_improve = 0
    else:
        no_improve += 1
        if no_improve > patience:
            print(f"⏹ Early stopping at epoch {epoch}")
            break


Epoch 01 | Train Loss: 1.4207, Acc: 0.424 | Val   Loss: 0.9557, Acc: 0.123 | Time: 16.5s
Epoch 02 | Train Loss: 1.3080, Acc: 0.485 | Val   Loss: 1.2023, Acc: 0.102 | Time: 15.4s
Epoch 03 | Train Loss: 1.2394, Acc: 0.516 | Val   Loss: 0.9736, Acc: 0.132 | Time: 13.1s
Epoch 04 | Train Loss: 1.1719, Acc: 0.549 | Val   Loss: 1.0695, Acc: 0.114 | Time: 14.1s
⏹ Early stopping at epoch 4


# Task 3 regularization

In [26]:
# Cell 13: Regularization Experiments on SpectrogramCNN

import itertools
import torch.optim as optim

# Hyperparameter grid
dropouts     = [0.1, 0.3, 0.5]
weight_decays = [0.0, 1e-4, 1e-3]

results = []

device = torch.device("mps") if torch.backends.mps.is_available() else torch.device("cpu")

for do, wd in itertools.product(dropouts, weight_decays):
    # 1) Instantiate model & optimizer with wd
    model = SpectrogramCNN(num_classes=5, dropout=do).to(device)
    optim_spec = optim.Adam(model.parameters(), lr=1e-3, weight_decay=wd)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optim_spec, mode='min', factor=0.5, patience=1
    )

    # 2) Train with early stopping for up to 5 epochs
    best_val = float('inf')
    no_improve = 0
    for epoch in range(1, 6):
        tr_loss, tr_acc, _ = train_one_epoch(model, spec_train_loader, optim_spec, device)
        vl_loss, vl_acc    = evaluate(model, spec_test_loader,       device)
        scheduler.step(vl_loss)
        if vl_loss < best_val - 1e-4:
            best_val, no_improve = vl_loss, 0
        else:
            no_improve += 1
        if no_improve > 2:
            break

    # 3) Record final val accuracy & time/epoch ~ from spec loop timings
    results.append({
        'dropout': do,
        'weight_decay': wd,
        'val_acc': vl_acc,
        'train_acc': tr_acc
    })
    print(f"do={do}, wd={wd} → val_acc={vl_acc:.3f}")

# 4) Tabulate results
import pandas as pd
df = pd.DataFrame(results)
print(df)


do=0.1, wd=0.0 → val_acc=0.098
do=0.1, wd=0.0001 → val_acc=0.127
do=0.1, wd=0.001 → val_acc=0.118
do=0.3, wd=0.0 → val_acc=0.123
do=0.3, wd=0.0001 → val_acc=0.123
do=0.3, wd=0.001 → val_acc=0.131
do=0.5, wd=0.0 → val_acc=0.127
do=0.5, wd=0.0001 → val_acc=0.114
do=0.5, wd=0.001 → val_acc=0.103
   dropout  weight_decay   val_acc  train_acc
0      0.1        0.0000  0.098004   0.589387
1      0.1        0.0001  0.127042   0.598863
2      0.1        0.0010  0.117967   0.567593
3      0.3        0.0000  0.123412   0.512634
4      0.3        0.0001  0.123412   0.550221
5      0.3        0.0010  0.130672   0.532217
6      0.5        0.0000  0.127042   0.456412
7      0.5        0.0001  0.114338   0.456728
8      0.5        0.0010  0.103448   0.475995
