In [None]:
import os
import random

import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import Dataset
import torch.nn.functional as F
import torchaudio.transforms as T
from torch.utils.data import DataLoader, random_split
import torch.optim as optim
from torchvision import transforms

from time import time
from tqdm import tqdm

from src.model.model import SpeakerCountCNN
import src.model.model as m

In [2]:
class SpectrogramDataset(Dataset):
    """Spectrogram tensors stored on disk, paired with integer speaker-count labels.

    The index CSV must contain a ``spectrogram`` column (tensor file name,
    joined onto ``data_dir``) and a ``speaker_count`` column (the label).
    """

    def __init__(self, csv_file, data_dir):
        """Read the index CSV and remember the directory holding the tensors.

        Args:
            csv_file: Path to the CSV that lists tensor files and their labels.
            data_dir: Directory the ``spectrogram`` column entries are joined onto.
        """
        self.data = pd.read_csv(csv_file)
        self.data_dir = data_dir

        # Labels as a plain list of ints, available without loading any tensor
        # (the stratified-split helper reads this attribute).
        label_column = self.data['speaker_count']
        self.labels = label_column.astype(int).tolist()

    def __len__(self):
        """Return the number of rows in the index CSV."""
        return self.data.shape[0]

    def __getitem__(self, idx):
        """Load one sample.

        Args:
            idx: Positional row index into the CSV.

        Returns:
            ``(spectrogram, label)``: the stored tensor with a new leading axis
            and cast to float32, and the row's speaker count as an ``int``.
        """
        record = self.data.iloc[idx]
        file_path = os.path.join(self.data_dir, record['spectrogram'])

        stored = torch.load(file_path)
        # unsqueeze(0) prepends an axis, so a stored (H, W) tensor becomes (1, H, W).
        spectrogram = stored.unsqueeze(0).float()

        return spectrogram, int(record['speaker_count'])

### Initial Model
~82% accuracy on val

In [None]:
# # creates torch dataset using spectrogram files as 'x' and csv of labels as 'y'
# dataset = SpectrogramDataset(csv_file=r"data/spectrogram_labels.csv", data_dir=r"data/spectrograms") # grab dataset and convert to tensor



# # Hyperparams
# conv1_out = 16
# conv2_out = 32
# conv3_out = 64
# dropout_prob = 0.3
# fc_hidden = 128
# num_classes = 4
# # fixed image size
# input_height = 96
# input_width = 64



# # init model
# model = SpeakerCountCNN(
#     input_height = input_height,
#     input_width = input_width,
#     conv1_out = conv1_out,
#     conv2_out = conv2_out,
#     conv3_out = conv3_out,
#     fc_hidden = fc_hidden,
#     dropout_prob = dropout_prob
# )



# # manual train/test/split
# train_size = int(0.8 * len(dataset))
# val_size = len(dataset) - train_size
# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])



# # batch loader
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=32)


# # cpu or gpu; whichever is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)


# # basic loss and optimiser
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=1e-3)



# # Model training
# for epoch in range(10):
#     model.train()
#     total_loss = 0
#     start_time = time()

#     # tqdm shows a progress bar
#     for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}", leave=False):
#         x, y = x.to(device), y.to(device)
#         optimizer.zero_grad()
#         output = model(x)
#         loss = criterion(output, y)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()

#     end_time = time()
#     epoch_duration = end_time - start_time
#     avg_loss = total_loss / len(train_loader)

#     print(f"Epoch {epoch+1} — Loss: {avg_loss:.4f} — Time: {epoch_duration:.2f}s")


# # Validation - this is the important metric
# model.eval()
# correct = 0
# total = 0
# with torch.no_grad():
#     for x, y in val_loader:
#         x, y = x.to(device), y.to(device)
#         output = model(x)
#         preds = output.argmax(dim=1)
#         correct += (preds == y).sum().item()
#         total += y.size(0)

# print(f"Validation Accuracy: {correct / total:.2%}")



# # save
# # torch.save(model.state_dict(), 'SpeakerCountCNN_v0.01.pt')

### Hyperparameter Tuning

We start with a random search to find the general region of good hyperparameters, then run a grid search to refine within that region.

In [4]:
# # Step 1: stratified sampling. We want a subset of the data for quick tuning, but the subset must not introduce class imbalance. The solution is a stratified shuffle split (SSS).
# from torch.utils.data import Subset, DataLoader
# from sklearn.model_selection import train_test_split


# # Get labels
# full_dataset = dataset # pointer fun
# labels = dataset.labels



# # Sample 30% with stratification - a cheeky way to use train/test split
# subset_indices, _ = train_test_split(
#     list(range(len(full_dataset))),
#     train_size=0.3,
#     stratify=labels,
#     random_state=42
# )



# # Create a reduced dataset
# reduced_dataset = Subset(full_dataset, subset_indices)



# # Now split reduced dataset into train/val (e.g. 80/20)
# reduced_labels = [dataset.labels[i] for i in subset_indices]
# train_idx, val_idx = train_test_split(
#     list(range(len(reduced_dataset))),
#     train_size=0.8,
#     stratify=reduced_labels,
#     random_state=42
# )



# train_loader = DataLoader(Subset(reduced_dataset, train_idx), batch_size=32, shuffle=True)
# val_loader = DataLoader(Subset(reduced_dataset, val_idx), batch_size=32)

In [5]:
# # Hyperparam search space for Random Search (RS)
# # RS works by randomly selecting a config, num_trials times. This gives a general idea of a good config without iterating through every possible config
# param_space = {
#     "lr": [1e-4, 1e-3, 1e-2],
#     "dropout": [0.1, 0.3, 0.5],
#     "fc_hidden": [64, 128, 256],
#     "conv1_out": [8, 16, 32],
#     "conv2_out": [16, 32, 64],
#     "conv3_out": [32, 64, 128],
#     "optimizer": ["Adam", "SGD"]
# }

# # RS is very simple to implement, one line of code
# def sample_config():
#     return {k: random.choice(v) for k, v in param_space.items()}

In [6]:
# # 10 configs pulled randomly from param_space, each trained for 3 epochs
# num_trials = 10
# epochs = 3
# results = []

# for trial in range(1, num_trials + 1):
#     config = sample_config()
#     print(f"\n=== Trial {trial}/{num_trials} ===")
#     print(f"Config: {config}")



#     # Setting up the model with the current random config
#     model = SpeakerCountCNN(
#         input_height=input_height,
#         input_width=input_width,
#         conv1_out=config["conv1_out"],
#         conv2_out=config["conv2_out"],
#         conv3_out=config["conv3_out"],
#         fc_hidden=config["fc_hidden"],
#         dropout_prob=config["dropout"]
#     ).to(device)



#     # Two optimisers are tested in the random search
#     if config["optimizer"] == "Adam":
#         optimizer = optim.Adam(model.parameters(), lr=config["lr"])
#     else:
#         optimizer = optim.SGD(model.parameters(), lr=config["lr"])



#     # Cross entropy loss
#     criterion = nn.CrossEntropyLoss()



#     # Training
#     for epoch in range(1, epochs + 1):
#         model.train()
#         total_loss = 0
#         start = time()


#         for xb, yb in tqdm(train_loader, desc=f"Trial {trial} — Epoch {epoch}", leave=False):
#             xb, yb = xb.to(device), yb.to(device)
#             optimizer.zero_grad()
#             out = model(xb)
#             loss = criterion(out, yb)
#             loss.backward()
#             optimizer.step()
#             total_loss += loss.item()


#         # This is just verbose to inspect progress
#         duration = time() - start
#         avg_loss = total_loss / len(train_loader)
#         print(f"Epoch {epoch} — Loss: {avg_loss:.4f} — Time: {duration:.2f}s")



#     # Validation
#     model.eval()
#     correct = 0
#     total = 0

#     with torch.no_grad():
#         for xb, yb in val_loader:
#             xb, yb = xb.to(device), yb.to(device)
#             preds = model(xb).argmax(dim=1)
#             correct += (preds == yb).sum().item()
#             total += yb.size(0)

#     acc = correct / total
#     print(f"Validation Accuracy: {acc:.2%}")
#     results.append((acc, config))



# # Show top results
# results.sort(reverse=True, key=lambda x: x[0])
# print("\n=== Top Results ===")
# for acc, cfg in results[:5]:
#     print(f"Accuracy: {acc:.4f} | Config: {cfg}")

### Random Search complete - now Grid Search

In [7]:
# # Grid search HPs manually interpreted from random search results
# param_grid = {
#     "lr":       [5e-4, 1e-3, 5e-3],
#     "dropout":  [0.25, 0.3, 0.35],
#     "fc_hidden":[64, 128, 256],
#     "conv1_out":[16, 32],
#     "conv2_out":[16, 32, 64],
#     "conv3_out":[64, 128],
#     "optimizer":["Adam"],
# }



# # Utility to expand the grid into a list of configs
# from itertools import product
# def generate_grid_configs(grid):
#     """
#     Yields every combination of hyperparameters in `grid`.
#     """
#     keys = list(grid.keys())
#     for vals in product(*grid.values()):
#         yield dict(zip(keys, vals))



# # Build and inspect
# grid_configs = list(generate_grid_configs(param_grid))
# print(f"Total configurations: {len(grid_configs)}") 



# train_loader, val_loader = get_stratified_loaders(
#     dataset,
#     subset_frac=0.1,   
#     train_frac=0.8,    
#     batch_size=32,
#     random_state=42
# )

In [8]:
# results = []
# epochs = 4


# # For each config
# for idx, config in enumerate(grid_configs, start=1):
#     print(f"\n=== Config {idx}/{len(grid_configs)} ===")
#     print(f"Config: {config}")



#     # Build model
#     model = SpeakerCountCNN(
#         input_height=input_height,
#         input_width=input_width,
#         conv1_out=config["conv1_out"],
#         conv2_out=config["conv2_out"],
#         conv3_out=config["conv3_out"],
#         fc_hidden=config["fc_hidden"],
#         dropout_prob=config["dropout"]
#     ).to(device)



#     # Adam optim
#     optimizer = optim.Adam(model.parameters(), lr=config["lr"])



#     # Cross Entropy loss
#     criterion = nn.CrossEntropyLoss()



#     # Training loop
#     for epoch in range(1, epochs + 1):
#         model.train()
#         total_loss = 0
#         start = time()

#         for xb, yb in tqdm(train_loader, desc=f"Conf {idx} — Ep {epoch}", leave=False):
#             xb, yb = xb.to(device), yb.to(device)
#             optimizer.zero_grad()
#             out = model(xb)
#             loss = criterion(out, yb)
#             loss.backward()
#             optimizer.step()
#             total_loss += loss.item()

#         duration = time() - start
#         avg_loss = total_loss / len(train_loader)
#         print(f"Epoch {epoch} — Loss: {avg_loss:.4f} — Time: {duration:.2f}s")



#     # Validation
#     model.eval()
#     correct, total = 0, 0
#     with torch.no_grad():
#         for xb, yb in val_loader:
#             xb, yb = xb.to(device), yb.to(device)
#             preds = model(xb).argmax(dim=1)
#             correct += (preds == yb).sum().item()
#             total += yb.size(0)

#     acc = correct / total
#     print(f"Validation Accuracy: {acc:.2%}")
#     results.append((acc, config))



# # 4. Summarize top performers
# results.sort(reverse=True, key=lambda x: x[0])
# print("\n=== Top Results ===")
# for acc, cfg in results[:5]:
#     print(f"Accuracy: {acc:.4f} | Config: {cfg}")

In [9]:
# import optuna

# # --- Helper functions ---
# def train_one_epoch(model, loader, optimizer, criterion, device):
#     model.train()
#     total_loss = 0
#     for xb, yb in loader:
#         xb, yb = xb.to(device), yb.to(device)
#         optimizer.zero_grad()
#         out = model(xb)
#         loss = criterion(out, yb)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()
#     return total_loss / len(loader)

# def validate(model, loader, device):
#     model.eval()
#     correct, total = 0, 0
#     with torch.no_grad():
#         for xb, yb in loader:
#             xb, yb = xb.to(device), yb.to(device)
#             preds = model(xb).argmax(dim=1)
#             correct += (preds == yb).sum().item()
#             total += yb.size(0)
#     return correct / total

# # --- Objective for Optuna ---
# def objective(trial):
#     # 1) Sample hyperparameters
#     lr        = trial.suggest_categorical("lr", [5e-4, 1e-3, 5e-3])
#     dropout   = trial.suggest_categorical("dropout", [0.25, 0.3, 0.35])
#     fc_hidden = trial.suggest_categorical("fc_hidden", [64, 128, 256])
#     conv1_out = trial.suggest_categorical("conv1_out", [16, 32])
#     conv2_out = trial.suggest_categorical("conv2_out", [16, 32, 64])
#     conv3_out = trial.suggest_categorical("conv3_out", [64, 128])

#     # 2) Build model
#     model = SpeakerCountCNN(
#         input_height=input_height,
#         input_width=input_width,
#         conv1_out=conv1_out,
#         conv2_out=conv2_out,
#         conv3_out=conv3_out,
#         fc_hidden=fc_hidden,
#         dropout_prob=dropout
#     ).to(device)

#     optimizer = optim.Adam(model.parameters(), lr=lr)
#     criterion = nn.CrossEntropyLoss()

#     # 3) Training + pruning
#     max_epochs = 4
#     for epoch in range(max_epochs):
#         train_loss = train_one_epoch(model, train_loader, optimizer, criterion, device)
#         val_acc    = validate(model, val_loader, device)

#         # report intermediate objective value
#         trial.report(val_acc, epoch)
#         if trial.should_prune():
#             raise optuna.exceptions.TrialPruned()

#     return val_acc  # final validation accuracy

# # --- Run study ---
# if __name__ == "__main__":
#     study = optuna.create_study(
#         direction="maximize",
#         pruner=optuna.pruners.SuccessiveHalvingPruner()
#     )
#     study.optimize(objective, n_trials=100)

#     print("Best trial:")
#     best = study.best_trial
#     print(f"  Value: {best.value:.4f}")
#     print("  Params:")
#     for key, val in best.params.items():
#         print(f"    {key}: {val}")

#     # Optionally, save best hyperparameters to disk
#     import json
#     with open("best_params.json", "w") as f:
#         json.dump(best.params, f, indent=2)


In [10]:
# import math
# import copy

# # Parameters
# max_epochs = 4
# keep_frac  = 0.5  # keep top 50% after each epoch

# # Start with full grid
# current_configs = copy.deepcopy(grid_configs)
# config_states   = {}  # to optionally resume from saved state dicts

# for epoch in range(1, max_epochs + 1):
#     print(f"\n=== Epoch {epoch}/{max_epochs}: {len(current_configs)} configs ===")
#     epoch_results = []

#     for idx, config in enumerate(current_configs, start=1):
#         print(f"  [{idx}/{len(current_configs)}] Config: {config}")

#         # Build model (fresh each epoch or resume)
#         model = SpeakerCountCNN(
#             input_height=input_height,
#             input_width=input_width,
#             conv1_out=config["conv1_out"],
#             conv2_out=config["conv2_out"],
#             conv3_out=config["conv3_out"],
#             fc_hidden=config["fc_hidden"],
#             dropout_prob=config["dropout"]
#         ).to(device)

#         # If you’ve stored a state from the previous epoch, load it
#         key = tuple(sorted(config.items()))
#         if key in config_states:
#             model.load_state_dict(config_states[key])

#         # Optimizer & loss
#         optimizer = torch.optim.Adam(model.parameters(), lr=config["lr"])
#         criterion = nn.CrossEntropyLoss()

#         # Train exactly one epoch
#         train_one_epoch(model, train_loader, optimizer, criterion, device)

#         # Validate
#         acc = validate(model, val_loader, device)
#         print(f"    → Val Acc: {acc:.2%}")

#         # Record and save state for next round
#         epoch_results.append((config, acc, model.state_dict()))

#     # Sort by descending accuracy and keep top fraction
#     epoch_results.sort(key=lambda x: x[1], reverse=True)
#     keep_n = max(1, math.floor(len(epoch_results) * keep_frac))
#     survivors = epoch_results[:keep_n]

#     # Prepare for next epoch
#     current_configs = [cfg for cfg, _, _ in survivors]
#     config_states   = { tuple(sorted(cfg.items())): state
#                         for cfg, _, state in survivors }

# # After all epochs, the top survivor is:
# best_cfg, best_acc, best_state = survivors[0]
# print(f"\n=== Best Config ===\nAccuracy: {best_acc:.4f}\nParams: {best_cfg}")

# # Optionally, save the best model weights:
# best_model = SpeakerCountCNN(
#     input_height=input_height,
#     input_width=input_width,
#     conv1_out=best_cfg["conv1_out"],
#     conv2_out=best_cfg["conv2_out"],
#     conv3_out=best_cfg["conv3_out"],
#     fc_hidden=best_cfg["fc_hidden"],
#     dropout_prob=best_cfg["dropout"]
# ).to(device)
# best_model.load_state_dict(best_state)
# torch.save(best_model.state_dict(), "best_GridModel.pt")

### Final model longer training after grid search

In [11]:
# # creates torch dataset using spectrogram files as 'x' and csv of labels as 'y'
# dataset = SpectrogramDataset(csv_file=r"data/spectrogram_labels.csv", data_dir=r"data/spectrograms") # grab dataset and convert to tensor


# # fixed image size
# input_height = 96
# input_width = 64


# # manual train/test/split
# train_size = int(0.8 * len(dataset))
# val_size = len(dataset) - train_size
# train_dataset, val_dataset = random_split(dataset, [train_size, val_size])



# # batch loader
# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# val_loader = DataLoader(val_dataset, batch_size=32)



# # cpu or gpu; whichever is available
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# model.to(device)



# # basic loss and optimiser
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=1e-3)



# model = best_model



# # Model training
# for epoch in range(10):
#     model.train()
#     total_loss = 0
#     start_time = time()

#     # tqdm shows a progress bar
#     for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}", leave=False):
#         x, y = x.to(device), y.to(device)
#         optimizer.zero_grad()
#         output = model(x)
#         loss = criterion(output, y)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()

#     end_time = time()
#     epoch_duration = end_time - start_time
#     avg_loss = total_loss / len(train_loader)

#     print(f"Epoch {epoch+1} — Loss: {avg_loss:.4f} — Time: {epoch_duration:.2f}s")



# # Validation - this is the important metric
# model.eval()
# correct = 0
# total = 0
# with torch.no_grad():
#     for x, y in val_loader:
#         x, y = x.to(device), y.to(device)
#         output = model(x)
#         preds = output.argmax(dim=1)
#         correct += (preds == y).sum().item()
#         total += y.size(0)

# print(f"Validation Accuracy: {correct / total:.2%}")

## Vanishing gradient

The setup above caused vanishing gradients, so the next step is to try skip connections.

In [12]:
# import torch
# import torch.nn as nn
# import torch.nn.functional as F

# # 1) Residual block with skip connection
# class ResidualBlock(nn.Module):
#     def __init__(self, in_ch, out_ch, stride=1):
#         super().__init__()
#         self.conv_block = nn.Sequential(
#             nn.Conv2d(in_ch, out_ch, kernel_size=3, stride=stride, padding=1, bias=False),
#             nn.BatchNorm2d(out_ch),
#             nn.ReLU(inplace=True),
#             nn.Conv2d(out_ch, out_ch, kernel_size=3, padding=1, bias=False),
#             nn.BatchNorm2d(out_ch)
#         )
#         # projection if we change channels or downsample
#         self.proj = nn.Sequential()
#         if stride != 1 or in_ch != out_ch:
#             self.proj = nn.Sequential(
#                 nn.Conv2d(in_ch, out_ch, kernel_size=1, stride=stride, bias=False),
#                 nn.BatchNorm2d(out_ch)
#             )
#         self.relu = nn.ReLU(inplace=True)

#     def forward(self, x):
#         identity = self.proj(x)
#         out = self.conv_block(x)
#         out += identity
#         return self.relu(out)



# # 2) Updated SpeakerCountCNN using ResidualBlock
# class SpeakerCountCNN_skip(nn.Module):
#     def __init__(
#         self,
#         conv1_out,
#         conv2_out,
#         conv3_out,
#         fc_hidden,
#         dropout_prob,
#         input_height=input_height,
#         input_width=input_width
#     ):
#         super().__init__()
#         # replace conv+bn+relu with residual blocks
#         self.layer1 = ResidualBlock(1, conv1_out, stride=1)
#         self.layer2 = ResidualBlock(conv1_out, conv2_out, stride=1)
#         self.layer3 = ResidualBlock(conv2_out, conv3_out, stride=1)

#         self.pool    = nn.MaxPool2d(2, 2)
#         self.dropout = nn.Dropout(dropout_prob)

#         # compute flattened size dynamically
#         with torch.no_grad():
#             dummy = torch.zeros(1, 1, input_height, input_width)
#             x = self.pool(self.layer1(dummy))
#             x = self.pool(self.layer2(x))
#             x = self.pool(self.layer3(x))
#             flattened_size = x.view(1, -1).shape[1]

#         self.fc1 = nn.Linear(flattened_size, fc_hidden)
#         self.fc2 = nn.Linear(fc_hidden, num_classes)

#     def forward(self, x):
#         x = self.pool(self.layer1(x))
#         x = self.pool(self.layer2(x))
#         x = self.pool(self.layer3(x))

#         x = torch.flatten(x, 1)
#         x = self.dropout(x)
#         x = F.relu(self.fc1(x))
#         x = self.fc2(x)
#         return x

In [13]:
# device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# config = {'lr': 0.0005, 'dropout': 0.35, 'fc_hidden': 64, 'conv1_out': 32, 'conv2_out': 16, 'conv3_out': 128, 'optimizer': 'Adam'}

# model = SpeakerCountCNN(
#     input_height = input_height,
#     input_width  = input_width,
#     conv1_out    = config["conv1_out"],
#     conv2_out    = config["conv2_out"],
#     conv3_out    = config["conv3_out"],
#     fc_hidden    = config["fc_hidden"],
#     dropout_prob = config["dropout"]
# ).to(device)



# # Prepare dataset and simple split
# dataset = SpectrogramDataset(
#     csv_file = r"data/spectrogram_labels.csv",
#     data_dir  = r"data/spectrograms"
# )
# train_size = int(0.8 * len(dataset))
# val_size   = len(dataset) - train_size
# train_ds, val_ds = random_split(dataset, [train_size, val_size])

# train_loader = DataLoader(train_ds, batch_size=32, shuffle=True)
# val_loader   = DataLoader(val_ds,   batch_size=32, shuffle=False)



# # Loss, optimizer, scheduler
# criterion = nn.CrossEntropyLoss()
# optimizer = optim.Adam(model.parameters(), lr=config["lr"])
# scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
#     optimizer, mode="min", factor=0.5, patience=2)



# # Training loop with scheduler.step AFTER avg_loss is computed
# for epoch in range(10):
#     model.train()
#     total_loss = 0
#     start = time()

#     for x, y in tqdm(train_loader, desc=f"Epoch {epoch+1}", leave=False):
#         x, y = x.to(device), y.to(device)
#         optimizer.zero_grad()
#         out  = model(x)
#         loss = criterion(out, y)
#         loss.backward()
#         optimizer.step()
#         total_loss += loss.item()

#     avg_loss = total_loss / len(train_loader)
#     scheduler.step(avg_loss)               # ← here, after avg_loss
#     duration = time() - start

#     print(f"Epoch {epoch+1} — Loss: {avg_loss:.4f} — Time: {duration:.1f}s")



# # Validation
# model.eval()
# correct, total = 0, 0
# with torch.no_grad():
#     for x, y in val_loader:
#         x, y    = x.to(device), y.to(device)
#         preds   = model(x).argmax(dim=1)
#         correct += (preds == y).sum().item()
#         total   += y.size(0)

# print(f"Validation Accuracy: {correct/total:.2%}")

In [14]:
# # Confusion Matrix

# from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# import matplotlib.pyplot as plt


# y_true, y_pred = [], []

# with torch.no_grad():
#     for x, y in val_loader:
#         x = x.to(device)
#         preds = model(x).argmax(dim=1).cpu()
#         y_true.extend(y.numpy())
#         y_pred.extend(preds.numpy())


# cm = confusion_matrix(y_true, y_pred)


# disp = ConfusionMatrixDisplay(confusion_matrix=cm)
# disp.plot(cmap='Blues', values_format='d')
# plt.title("Confusion Matrix — Validation")
# plt.tight_layout()
# plt.show()

### Save when happy

In [15]:
# # save
# torch.save(model.state_dict(), 'SpeakerCountCNN_v0.11.pt')

#### Init & Helpers

In [30]:
# Setup - packages & helpers

import torch, torch.nn as nn, torch.optim as optim
from torch.utils.data import DataLoader, Subset, random_split
from sklearn.model_selection import train_test_split
from time import time
from tqdm import tqdm



# Data loaders for full data set
def get_loaders(dataset, train_frac=0.8, bs=32, seed=42):
    """Randomly split ``dataset`` into train/validation parts and wrap each in a DataLoader.

    Args:
        dataset: Any sized dataset accepted by ``random_split``.
        train_frac: Fraction of samples assigned to the training part
            (the count is truncated to an integer).
        bs: Batch size used by both loaders.
        seed: Seed of the dedicated generator that drives the split, so the
            same seed always yields the same partition.

    Returns:
        ``(train_loader, val_loader)``; only the training loader shuffles.
    """
    total = len(dataset)
    n_train = int(train_frac * total)
    n_val = total - n_train

    # A private generator keeps the split independent of the global torch RNG.
    split_rng = torch.Generator().manual_seed(seed)
    train_part, val_part = random_split(dataset, [n_train, n_val], generator=split_rng)

    train_loader = DataLoader(train_part, batch_size=bs, shuffle=True)
    val_loader = DataLoader(val_part, batch_size=bs, shuffle=False)
    return train_loader, val_loader



# Data loaders for subset, makes hp tuning quicker (stratified sampling prevents class imbalance)
def get_stratified_loaders(
    dataset,
    subset_frac: float = 0.3,
    train_frac: float = 0.8,
    batch_size: int = 32,
    random_state: int = 42,
    shuffle_train: bool = True
):
    """Build train/val loaders over a class-balanced subset of ``dataset``.

    A stratified sample of ``subset_frac`` of the data is drawn first, then
    that sample is split (again stratified) into train and validation parts.

    Args:
        dataset: Dataset exposing a ``labels`` sequence aligned with its indices.
        subset_frac: Fraction of the full dataset to keep. A value of 1.0 or
            more keeps every sample (no sub-sampling step is run).
        train_frac: Fraction of the kept samples used for training.
        batch_size: Batch size of both loaders.
        random_state: Seed passed to both stratified splits.
        shuffle_train: Whether the training loader reshuffles each epoch.

    Returns:
        ``(train_loader, val_loader)``; the validation loader never shuffles.
    """
    labels = dataset.labels
    all_indices = list(range(len(dataset)))

    # 1. Stratified sampling of the full dataset.
    #    train_test_split rejects a float train_size of 1.0, so "use everything"
    #    is handled here instead of being forwarded to it.
    if subset_frac >= 1.0:
        subset_idx = all_indices
    else:
        subset_idx, _ = train_test_split(
            all_indices,
            train_size=subset_frac,
            stratify=labels,
            random_state=random_state
        )

    # 2. Stratified train/val split of the kept indices.
    #    Splitting the dataset indices directly selects the same samples as
    #    splitting positions inside an intermediate Subset, but lets both
    #    loaders index the base dataset without a nested Subset hop.
    subset_labels = [labels[i] for i in subset_idx]
    train_idx, val_idx = train_test_split(
        subset_idx,
        train_size=train_frac,
        stratify=subset_labels,
        random_state=random_state
    )

    # 3. Build DataLoaders
    train_loader = DataLoader(
        Subset(dataset, train_idx),
        batch_size=batch_size,
        shuffle=shuffle_train
    )
    val_loader = DataLoader(
        Subset(dataset, val_idx),
        batch_size=batch_size,
        shuffle=False
    )

    return train_loader, val_loader



# Model builder
def build_model(cfg, input_h, input_w):
    """Instantiate a ``SpeakerCountCNN`` from a hyperparameter dict.

    Args:
        cfg: Mapping with keys ``conv1_out``, ``conv2_out``, ``conv3_out``,
            ``fc_hidden`` and ``dropout``; any other keys are ignored.
        input_h: Passed to the model as ``input_height``.
        input_w: Passed to the model as ``input_width``.

    Returns:
        The newly constructed model. It is not moved to a device here.
    """
    layer_kwargs = {
        'conv1_out': cfg['conv1_out'],
        'conv2_out': cfg['conv2_out'],
        'conv3_out': cfg['conv3_out'],
        'fc_hidden': cfg['fc_hidden'],
        # The config calls it "dropout"; the model's keyword is "dropout_prob".
        'dropout_prob': cfg['dropout'],
    }
    return SpeakerCountCNN(input_height=input_h, input_width=input_w, **layer_kwargs)



# One‐epoch training with tqdm
from tqdm import tqdm
from time import time
def train_one_epoch(model, loader, opt, crit, device):
    """Run one optimisation pass over ``loader`` with a tqdm progress bar.

    Args:
        model: Network to train; switched to training mode first.
        loader: Iterable of ``(inputs, targets)`` batches.
        opt: Optimizer stepping ``model``'s parameters.
        crit: Loss callable taking ``(model_output, targets)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Mean of the per-batch loss values (sum divided by the batch count).
    """
    model.train()
    running_loss = 0
    progress = tqdm(loader, desc="Training", unit="batch")
    for inputs, targets in progress:
        inputs = inputs.to(device)
        targets = targets.to(device)

        opt.zero_grad()
        batch_loss = crit(model(inputs), targets)
        batch_loss.backward()
        opt.step()

        running_loss += batch_loss.item()
    return running_loss / len(loader)



# Evaluate model
def validate(model, loader, device):
    """Compute classification accuracy of ``model`` over ``loader``.

    Args:
        model: Network to evaluate; switched to eval mode (and left in it).
        loader: Iterable of ``(inputs, targets)`` batches.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Fraction of evaluated samples whose arg-max prediction equals the
        target, or ``0.0`` when the loader yields no samples.
    """
    model.eval()
    correct = 0
    seen = 0
    with torch.no_grad():
        for x, y in loader:
            x, y = x.to(device), y.to(device)
            preds = model(x).argmax(1)
            correct += (preds == y).sum().item()
            # Count what was actually evaluated: len(loader.dataset) overstates
            # the denominator when the loader drops the last batch or uses a
            # sampler that visits only part of the dataset.
            seen += y.size(0)
    return correct / seen if seen else 0.0

#### Define the full dataset

In [18]:
# Full labelled dataset; the tuning and final-training loaders are both built from it.
dataset = SpectrogramDataset(
    csv_file=r"data/spectrogram_labels.csv",
    data_dir=r"data/spectrograms",
)

#### Hyperparameter tuning with Successive‐Halving (SH) via Optuna
SH runs all models over one epoch and keeps models that perform above a pruning threshold. This repeats, reducing the model set each epoch, until a winner is clear.

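The Optuna package was chosen because each trial keeps its model in memory and simply continues training from one epoch to the next until it is pruned, so surviving configurations never have to be retrained from scratch, hugely reducing compute. Additionally, it uses continuous, asynchronous pruning rather than fixed-round halving, improving scalability and resource allocation.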

In [20]:
import optuna

# One Optuna trial: train a sampled config for up to 5 epochs, reporting
# validation accuracy after every epoch so the study's pruner can stop it early.
def objective(trial):
    """Optuna objective: sample a config, train it, return validation accuracy.

    Reads the module-level ``device``, ``train_loader`` and ``val_loader``.

    Args:
        trial: The Optuna trial used to sample hyperparameters, report
            intermediate accuracy and query the pruning decision.

    Returns:
        Validation accuracy after the last (fifth) epoch.

    Raises:
        optuna.exceptions.TrialPruned: When ``trial.should_prune()`` is true
            after an epoch's report.
    """
    # Categorical choices per hyperparameter; sampled in this order.
    search_space = {
        'lr': [5e-4, 1e-3, 5e-3],
        'dropout': [0.25, 0.3, 0.35],
        'fc_hidden': [64, 128, 256],
        'conv1_out': [16, 32],
        'conv2_out': [16, 32, 64],
        'conv3_out': [64, 128],
    }
    cfg = {
        name: trial.suggest_categorical(name, choices)
        for name, choices in search_space.items()
    }

    # Fresh model for this trial (see build_model in the setup cell).
    model = build_model(cfg, input_h=96, input_w=64).to(device)
    opt = optim.Adam(model.parameters(), lr=cfg['lr'])
    crit = nn.CrossEntropyLoss()

    # Train epoch by epoch, letting the pruner cut the trial short.
    for epoch in range(5):
        train_one_epoch(model, train_loader, opt, crit, device)
        val_acc = validate(model, val_loader, device)
        trial.report(val_acc, epoch)
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()

    return val_acc


# Seed torch's global RNG so weight initialisation and batch shuffling repeat
# across runs (as far as the backend allows), matching the seeded data splits.
torch.manual_seed(42)

# Stratified 30% subset of the data (80/20 train/val) to keep tuning quick.
train_loader, val_loader = get_stratified_loaders(
    dataset,
    subset_frac=0.3,
    train_frac=0.8,
    batch_size=32,
    random_state=42
)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# TPESampler is Optuna's default sampler; it is created explicitly only so it
# can be seeded, making the sequence of suggested configs reproducible.
study = optuna.create_study(
    study_name='0-2_Speaker_HPTune',
    direction='maximize',
    sampler=optuna.samplers.TPESampler(seed=42),
    pruner=optuna.pruners.SuccessiveHalvingPruner(),
)

study.optimize(objective, n_trials=100)

# Hyperparameters of the best trial; consumed by the final-training cell.
best_cfg = study.best_trial.params
print("Best HPs:", best_cfg)

[I 2025-07-26 15:55:38,101] A new study created in memory with name: 0-2_Speaker_HPTune
[I 2025-07-26 16:01:49,488] Trial 0 finished with value: 0.8347145488029466 and parameters: {'lr': 0.001, 'dropout': 0.25, 'fc_hidden': 128, 'conv1_out': 16, 'conv2_out': 16, 'conv3_out': 64}. Best is trial 0 with value: 0.8347145488029466.
[I 2025-07-26 16:05:11,727] Trial 1 finished with value: 0.8416206261510129 and parameters: {'lr': 0.001, 'dropout': 0.3, 'fc_hidden': 128, 'conv1_out': 32, 'conv2_out': 16, 'conv3_out': 64}. Best is trial 1 with value: 0.8416206261510129.
[I 2025-07-26 16:12:56,032] Trial 2 pruned. 
[I 2025-07-26 16:18:56,443] Trial 3 pruned. 
[I 2025-07-26 16:21:39,525] Trial 4 pruned. 
[I 2025-07-26 16:23:56,875] Trial 5 pruned. 
[I 2025-07-26 16:32:18,383] Trial 6 pruned. 
[I 2025-07-26 16:33:22,392] Trial 7 pruned. 
[I 2025-07-26 16:38:37,751] Trial 8 pruned. 
[I 2025-07-26 16:43:26,702] Trial 9 pruned. 
[I 2025-07-26 16:50:09,848] Trial 10 pruned. 
[I 2025-07-26 16:52:58,99

Best HPs: {'lr': 0.0005, 'dropout': 0.3, 'fc_hidden': 256, 'conv1_out': 16, 'conv2_out': 64, 'conv3_out': 64}


In [33]:
# Objective value of the best trial, i.e. the validation accuracy that
# `objective` returned for it on the tuning subset.
study.best_trial.value

0.8522099447513812

### Final Model training, based on best config found above


In [32]:
# Rebuild the network from the best Optuna config and move it to the device.
final_model = build_model(best_cfg, 96, 64)
final_model = final_model.to(device)

# Cross-entropy loss and Adam, using the learning rate the study selected.
crit = nn.CrossEntropyLoss()
opt = optim.Adam(final_model.parameters(), lr=best_cfg['lr'])

# Loaders over the full dataset (default seeded 80/20 split from get_loaders).
train_loader, val_loader = get_loaders(dataset)

# Train; train_one_epoch draws its own tqdm bar, so only a summary is printed.
num_epochs = 10
for epoch in range(num_epochs):
    t0 = time()
    loss = train_one_epoch(
        final_model,
        train_loader,
        opt,
        crit,
        device,
    )
    dur = time() - t0
    print(f"Epoch {epoch+1} — Loss {loss:.4f} — {dur:.1f}s")

# Validate once after the last epoch, then save the trained weights.
val_acc = validate(final_model, val_loader, device)
print(f"Final val accuracy: {val_acc:.2%}")
torch.save(final_model.state_dict(), r"src/model/best_0-2_3layer.pt")

Training: 100%|██████████| 905/905 [13:47<00:00,  1.09batch/s]


Epoch 1 — Loss 0.5027 — 827.8s


Training: 100%|██████████| 905/905 [16:31<00:00,  1.10s/batch]


Epoch 2 — Loss 0.4432 — 991.5s


Training: 100%|██████████| 905/905 [13:11<00:00,  1.14batch/s]


Epoch 3 — Loss 0.4149 — 791.8s


Training: 100%|██████████| 905/905 [13:15<00:00,  1.14batch/s]


Epoch 4 — Loss 0.3926 — 795.3s


Training: 100%|██████████| 905/905 [11:50<00:00,  1.27batch/s]


Epoch 5 — Loss 0.3790 — 710.6s


Training: 100%|██████████| 905/905 [06:43<00:00,  2.24batch/s]


Epoch 6 — Loss 0.3681 — 403.4s


Training: 100%|██████████| 905/905 [11:45<00:00,  1.28batch/s]


Epoch 7 — Loss 0.3542 — 705.3s


Training: 100%|██████████| 905/905 [09:32<00:00,  1.58batch/s]


Epoch 8 — Loss 0.3434 — 572.3s


Training: 100%|██████████| 905/905 [10:31<00:00,  1.43batch/s]


Epoch 9 — Loss 0.3282 — 631.9s


Training: 100%|██████████| 905/905 [07:19<00:00,  2.06batch/s]


Epoch 10 — Loss 0.3114 — 439.3s
Final val accuracy: 86.93%


### Confusion Matrix (CM)

In [None]:
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import matplotlib.pyplot as plt



# Collect true and predicted labels over the validation loader.
y_true, y_pred = [], []

# Evaluate the network trained in the final-training cell. The name `model`
# is never bound by any live cell in this notebook, so using it here fails
# with NameError on a fresh kernel.
# eval() is set explicitly so this cell does not rely on an earlier cell
# having left the network in inference mode.
final_model.eval()

with torch.no_grad():
    for x, y in val_loader:
        x = x.to(device)
        preds = final_model(x).argmax(dim=1).cpu()
        y_true.extend(y.numpy())
        y_pred.extend(preds.numpy())

# Confusion matrix as computed by sklearn's confusion_matrix(y_true, y_pred).
cm = confusion_matrix(y_true, y_pred)

disp = ConfusionMatrixDisplay(confusion_matrix=cm)
disp.plot(cmap='Blues', values_format='d')
plt.title("Confusion Matrix — Validation")
plt.tight_layout()
plt.show()