In [None]:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
from torch.optim import Optimizer

def add_noise(features, noise_level=0.01):
    """Return a copy of ``features`` perturbed by scaled Gaussian noise.

    A standard-normal tensor matching ``features`` (``torch.randn_like``) is
    multiplied by ``noise_level`` and added element-wise. The input tensor is
    not modified.
    """
    gaussian = torch.randn_like(features)
    jitter = noise_level * gaussian
    return features + jitter

def calculate_multilabel_accuracy(outputs, labels, threshold=0.5):
    """Element-wise accuracy of thresholded sigmoid predictions.

    ``outputs`` are treated as logits: they are passed through a sigmoid and
    every entry strictly above ``threshold`` counts as a positive prediction.
    The result is the fraction of entries whose prediction equals the
    corresponding entry of ``labels``, returned as a Python float.
    """
    probabilities = outputs.sigmoid()
    hard_predictions = probabilities.gt(threshold).float()
    matches = hard_predictions.eq(labels)
    return matches.float().mean().item()

# Train model (w/ grad)
def train(model: nn.Module, train_dataloader: DataLoader, test_dataloader: DataLoader, optimizer: Optimizer, loss_fn, batch_size, num_epochs=20, print_every=10, patience=5):
    """Train ``model`` with early stopping on held-out accuracy.

    Each epoch makes one pass over ``train_dataloader`` (Gaussian noise is
    added to the features and gradients are clipped to a max norm of 1.0),
    then scores the model on ``test_dataloader`` via ``evaluate``. Before
    returning, the weights from the epoch with the highest held-out accuracy
    are loaded back into ``model``.

    Args:
        model: Network to optimise; it is modified in place.
        train_dataloader: Yields ``(features, labels)`` training batches.
        test_dataloader: Yields ``(features, labels)`` batches used for
            validation and early stopping.
        optimizer: Optimizer already bound to ``model``'s parameters.
        loss_fn: Called as ``loss_fn(outputs, labels.float())``; must return a
            scalar tensor.
        batch_size: Unused; kept so existing callers keep working.
        num_epochs: Maximum number of epochs to run.
        print_every: Print metrics every this many epochs; values ``<= 0``
            disable printing.
        patience: Stop after this many consecutive epochs without a new best
            held-out accuracy.

    Returns:
        ``(train_losses, test_losses, train_accuracies, test_accuracies)``,
        each a list with one entry per completed epoch.

    Raises:
        ValueError: If ``train_dataloader`` yields no batches.
    """
    import copy  # local import: used only to snapshot the best weights

    train_losses = []
    test_losses = []

    train_accuracies = []
    test_accuracies = []

    best_val_accuracy = 0
    best_model_state = None
    patience_counter = 0

    for ep in range(num_epochs):
        model.train()
        running_loss = 0.0
        running_acc = 0.0
        num_batches = 0

        for features, labels in train_dataloader:
            features = add_noise(features, noise_level=0.01)
            optimizer.zero_grad()
            outputs = model(features)
            loss = loss_fn(outputs, labels.float())
            running_loss += loss.item()

            # Calculate accuracy for multilabel
            running_acc += calculate_multilabel_accuracy(outputs, labels)
            num_batches += 1

            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
            optimizer.step()

        if num_batches == 0:
            # e.g. drop_last=True with fewer samples than one batch
            raise ValueError("train_dataloader yielded no batches; nothing to train on")

        train_loss = running_loss / num_batches
        train_acc = running_acc / num_batches
        train_losses.append(train_loss)
        train_accuracies.append(train_acc)

        val_loss, val_accuracy = evaluate(model, test_dataloader, loss_fn)
        test_losses.append(val_loss)
        test_accuracies.append(val_accuracy)

        if val_accuracy > best_val_accuracy:
            best_val_accuracy = val_accuracy
            # state_dict() returns references to the live tensors, which later
            # optimizer steps update in place. Deep-copy so the snapshot really
            # holds this epoch's weights.
            best_model_state = copy.deepcopy(model.state_dict())
            patience_counter = 0
        else:
            patience_counter += 1
            if patience_counter >= patience:
                print(f"Early stopping at epoch {ep}")
                break

        if print_every > 0 and ep % print_every == 0:
            print(f"Epoch: {ep} | Train Loss: {train_loss:.4f} | Train Accuracy: {train_acc:.4f} | "
                  f"Test Loss: {val_loss:.4f} | Test Accuracy: {val_accuracy:.4f}")

    # Restore the best-scoring weights (no-op if accuracy never exceeded 0)
    if best_model_state is not None:
        model.load_state_dict(best_model_state)

    return train_losses, test_losses, train_accuracies, test_accuracies


# Evaluate model (no grad)
def evaluate(model: nn.Module, dataloader: DataLoader, loss_fn):
    """Compute the average loss and multilabel accuracy of ``model`` on a dataset.

    The model is switched to eval mode and run under ``torch.no_grad()``.
    Per-batch metrics are weighted by batch size, so a shorter final batch
    (``drop_last=False``) does not skew the averages.

    Args:
        model: Network to score; left in eval mode on return.
        dataloader: Yields ``(features, labels)`` batches.
        loss_fn: Called as ``loss_fn(outputs, labels.float())``. The weighting
            assumes it returns the per-batch mean (the PyTorch default
            ``reduction='mean'``) - confirm if a custom loss is used.

    Returns:
        ``(avg_loss, avg_accuracy)`` as Python floats.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    total_loss = 0.0
    total_accuracy = 0.0
    num_samples = 0

    model.eval()

    with torch.no_grad():
        for features, labels in dataloader:
            outputs = model(features)
            loss = loss_fn(outputs, labels.float())
            batch_len = labels.size(0)

            # Weight each batch mean by its sample count
            total_loss += loss.item() * batch_len

            # Calculate accuracy for multilabel
            total_accuracy += calculate_multilabel_accuracy(outputs, labels) * batch_len
            num_samples += batch_len

    if num_samples == 0:
        raise ValueError("evaluate() received a dataloader with no samples")

    avg_loss = total_loss / num_samples
    avg_accuracy = total_accuracy / num_samples

    return avg_loss, avg_accuracy

In [None]:
# VGGish multilabel classification model - input a VGGish 10x128 feature -> output multiple binary predictions
# Each output represents presence/absence of a particular audio class

class VGGishNet(nn.Module):
    """Small MLP head producing one logit per class from time-averaged features.

    The input is averaged over its second dimension and passed through two
    hidden blocks (Linear -> BatchNorm1d -> LeakyReLU -> Dropout) followed by a
    final Linear layer. Outputs are raw logits; apply a sigmoid for per-class
    probabilities.

    Args:
        hidden_dim: Width of the first hidden layer; the second uses
            ``hidden_dim // 2``.
        dropout: Dropout probability after each hidden activation.
        num_classes: Number of output logits.
        input_dim: Size of the last (feature) dimension of the input. Defaults
            to 128, the value that was previously hard-coded.
    """

    def __init__(self, hidden_dim=64, dropout=0.2, num_classes=8, input_dim=128):
        super().__init__()
        # Layer order is kept fixed so state_dict keys (net.0, net.1, ...) stay
        # compatible with previously saved weights.
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.BatchNorm1d(hidden_dim),
            nn.LeakyReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, hidden_dim // 2),
            nn.BatchNorm1d(hidden_dim // 2),
            nn.LeakyReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim // 2, num_classes)
        )

    def forward(self, x):
        """Map ``(batch_size, time, features)`` input to ``(batch_size, num_classes)`` logits."""
        x = x.mean(dim=1) # Average across time dimension (batch_size, time, features) -> (batch_size, features)
        return self.net(x)
    
# Read the saved feature / label tensors for each split
train_features, train_labels = (
    torch.load('../data/train_features.pt'),
    torch.load('../data/train_labels.pt'),
)
test_features, test_labels = (
    torch.load('../data/test_features.pt'),
    torch.load('../data/test_labels.pt'),
)
custom_features, custom_labels = (
    torch.load('../data/custom_test_features.pt'),
    torch.load('../data/custom_test_labels.pt'),
)

# Statistics come from the training split only (reduced over dim 0)
train_mean = train_features.mean(dim=0, keepdim=True)
train_std = train_features.std(dim=0, keepdim=True)
feature_scale = train_std + 1e-7  # small epsilon keeps the division finite

# Standardise every split with the training statistics
train_features, test_features, custom_features = (
    (split - train_mean) / feature_scale
    for split in (train_features, test_features, custom_features)
)

# Wrap each split as a dataset. The custom split stays separate from the test
# split; an earlier variant merged the two into a single test dataset.
train_dataset, test_dataset, custom_dataset = (
    TensorDataset(train_features, train_labels),
    TensorDataset(test_features, test_labels),
    TensorDataset(custom_features, custom_labels),
)

In [None]:
import numpy as np

# Random hyper-parameter search: for every batch size, sample a fixed number of
# (hidden size, dropout, weight decay, learning rate) combinations, train a
# fresh model for each, and keep the one with the highest accuracy on
# test_dataloader.
# NOTE(review): model selection here uses the same test split that is reported
# later, so the reported test accuracy is optimistic - consider a separate
# validation split.
hidden_sizes = [16, 32, 64, 128]
dropouts = np.linspace(0, 0.5, 6)
weight_decays = np.logspace(-5, 0, 6)
learning_rates = np.logspace(-5, -1, 5)
batch_sizes = [8, 16, 32, 64, 128]

# Fixed seeds so Restart & Run All reproduces the same search. The NumPy
# generator drives hyper-parameter sampling; the torch seed covers weight
# init, dropout, noise augmentation and DataLoader shuffling.
SEARCH_SEED = 42
TRIALS_PER_BATCH_SIZE = 10
search_rng = np.random.default_rng(SEARCH_SEED)
torch.manual_seed(SEARCH_SEED)

best_accuracy = 0.0
best_model = None
best_params = None
best_train_losses = []
best_test_losses = []
best_train_accuracies = []
best_test_accuracies = []
acc_params = []

loss_fn = nn.BCEWithLogitsLoss()

for bs in batch_sizes:
    # drop_last=True presumably avoids a size-1 final batch, which BatchNorm1d
    # cannot handle in training mode - confirm.
    train_dataloader = DataLoader(train_dataset, batch_size=int(bs), shuffle=True, drop_last=True)
    test_dataloader = DataLoader(test_dataset, batch_size=int(bs), shuffle=False, drop_last=False)

    for i in range(TRIALS_PER_BATCH_SIZE):
        hs = int(search_rng.choice(hidden_sizes))
        do = float(search_rng.choice(dropouts))
        wd = float(search_rng.choice(weight_decays))
        lr = float(search_rng.choice(learning_rates))

        model = VGGishNet(hidden_dim=hs, dropout=do)
        optimizer = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd)
        train_losses, test_losses, train_accuracies, test_accuracies = train(model, train_dataloader, test_dataloader, optimizer, loss_fn, bs, num_epochs=50, print_every=0, patience=20)

        # Score the weights that train() left in the model
        final_accuracy = evaluate(model, test_dataloader, loss_fn)[1]

        if best_model is None or final_accuracy > best_accuracy:
            # Copy into a fresh module so later trials cannot touch the winner
            best_model = VGGishNet(hidden_dim=hs, dropout=do)
            best_model.load_state_dict(model.state_dict())
            best_accuracy = final_accuracy
            best_params = (hs, do, wd, lr, bs)
            best_train_losses = train_losses
            best_test_losses = test_losses
            best_train_accuracies = train_accuracies
            best_test_accuracies = test_accuracies

        acc_params.append((final_accuracy, hs, do, wd, lr, bs))

        print(f"hs: {hs}, do: {do}, wd: {wd}, lr: {lr}, bs: {bs}, train acc: {np.max(train_accuracies)}, test acc: {final_accuracy}")

# Show the five best trials, highest accuracy first
acc_params.sort(key=lambda x: x[0], reverse=True)
print(acc_params[:5])

print("Best params (hidden size, dropout, weight decay, learning rate, batch size):", best_params, best_accuracy)

In [None]:
import matplotlib.pyplot as plt

# Learning curves of the best trial: loss on top, accuracy below
curve_panels = (
    (1, 'Loss', best_train_losses, best_test_losses),
    (2, 'Accuracy', best_train_accuracies, best_test_accuracies),
)

for panel_index, metric_name, train_curve, test_curve in curve_panels:
    plt.subplot(2, 1, panel_index)
    plt.plot(train_curve, label=f'Train {metric_name}')
    plt.plot(test_curve, label=f'Test {metric_name}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_name)
    plt.legend()
    plt.title(f'Epochs vs. {metric_name}')

plt.tight_layout()
plt.show()

In [None]:
# Final report for the best model.
# - Use BCE-with-logits, the loss the model was trained with, so the reported
#   losses are comparable to the training curves (CrossEntropyLoss is a
#   different objective for multilabel targets).
# - Build fresh loaders instead of reusing the shuffled, drop_last loader left
#   over from the search loop, so every training sample is scored.
# - The "combined" row really concatenates the test and custom splits.
final_loss_fn = nn.BCEWithLogitsLoss()

final_eval_sets = {
    'train (loss, accuracy)': train_dataset,
    'test:': test_dataset,
    'custom:': custom_dataset,
    'combined (test+custom):': TensorDataset(
        torch.cat([test_features, custom_features]),
        torch.cat([test_labels, custom_labels]),
    ),
}

for report_label, report_dataset in final_eval_sets.items():
    # One full-size batch gives exact dataset-level averages
    report_loader = DataLoader(report_dataset, batch_size=len(report_dataset), shuffle=False)
    print(report_label, evaluate(best_model, report_loader, final_loss_fn))

In [287]:
# Rebuild the default architecture and restore weights saved on disk (mapped to CPU)
model = VGGishNet()
saved_weights = torch.load('audio-classifier-model-weights.pth', map_location='cpu')
model.load_state_dict(saved_weights)

# Count of trainable parameters (displayed as the cell output)
sum(param.numel() for param in model.parameters() if param.requires_grad)

10792