In [1]:
import numpy as np
import pandas as pd
import os
from pathlib import Path
import matplotlib.pyplot as plt
import seaborn as sns
from typing import List
from matplotlib import cm
from sklearn.model_selection import train_test_split
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import balanced_accuracy_score

import warnings
warnings.filterwarnings("ignore", category=UserWarning)

# Dataset Function:

### 🎧 AudioFrameDataset Class

This custom PyTorch `Dataset` is designed to load **time-aligned audio features and frame-level multi-label annotations** for training sound event detection models.

#### 🗂 What It Does:
- Loads `.npz` files containing various audio features and concatenates them across time.
- Loads corresponding frame-wise class labels for multiple sound event types.
- Supports **multi-label classification** (more than one event per time step).
- Optionally **binarizes labels** by averaging across multiple annotators.
- Stores the sequence lengths up front to support **bucketing or sorting** based on time length.

#### 🔍 Inputs:
- `feature_dir`: Path to the directory with audio features.
- `label_dir`: Path to the directory with annotation labels.
- `file_list`: List of filenames (without extensions) to be used from the dataset.
- `class_names`: List of all possible class labels.
- `binary_labels`: Whether to convert annotator labels into binary labels (True by default).

#### 📥 Output Per Sample:
- `feature`: Tensor of shape `(time_steps, total_features)` — stacked features across time.
- `multi_label`: Tensor of shape `(time_steps, num_classes)` — frame-wise binary labels for each class.


In [2]:
class AudioFrameDataset(Dataset):
    def __init__(self, feature_dir, label_dir, file_list, class_names, binary_labels=True):
        """
        Args:
            feature_dir (str): Directory containing audio feature .npz files
            label_dir (str): Directory containing label .npz files
            file_list (List[str]): List of base filenames (without extensions)
            class_names (List[str]): List of all possible class names (e.g., ["Alarm", "BirdChirp", "BackgroundNoise"])
            binary_labels (bool): If True, create binary labels for multi-label classification
        """
        self.feature_dir = feature_dir
        self.label_dir = label_dir
        self.file_list = file_list
        self.class_names = class_names  # List of all class names for multi-label
        self.binary_labels = binary_labels
        self.lengths = []
        for filename in self.file_list:
            path = os.path.join(self.feature_dir, f"{filename}.npz")
            features = np.load(path)['embeddings']  # Use any one modality to get time length
            self.lengths.append(features.shape[0])

    
    def __len__(self):
        return len(self.file_list)


    def get_length(self, idx):
        return self.lengths[idx]


    def __getitem__(self, idx):
        filename = self.file_list[idx]

        # Load features
        feat_path = os.path.join(self.feature_dir, f"{filename}.npz")
        features_npz = np.load(feat_path)

        # Concatenate all feature arrays (time-aligned)
        feature = np.concatenate([
            features_npz['embeddings'],
            features_npz['melspectrogram'],
            features_npz['mfcc'],
            features_npz['mfcc_delta'],
            features_npz['mfcc_delta2'],
            features_npz['flatness'],
            features_npz['centroid'],
            features_npz['flux'],
            features_npz['energy'],
            features_npz['power'],
            features_npz['bandwidth'],
            features_npz['contrast'],
            features_npz['zerocrossingrate']
        ], axis=-1)  # shape: (time, total_features)

        # Load labels
        label_path = os.path.join(self.label_dir, f"{filename}_labels.npz")
        label_npz = np.load(label_path)

        # Initialize the multi-label array (binary matrix)
        multi_label = np.zeros((feature.shape[0], len(self.class_names)))  # shape: (time, num_classes)

        # Loop through each class and assign the binary values for each time step
        for i, class_name in enumerate(self.class_names):
            if class_name in label_npz:
                class_labels = label_npz[class_name]  # shape: (time, annotators)
                
                if self.binary_labels:
                    # If binary, average across annotators and threshold
                    multi_label[:, i] = (class_labels.mean(axis=-1) > 0.5).astype(np.float32)
                else:
                    # Keep the full annotator labels (optional)
                    multi_label[:, i] = class_labels.mean(axis=-1).astype(np.float32)

        # Convert to torch tensors
        feature = torch.from_numpy(feature).float()
        multi_label = torch.from_numpy(multi_label).float()  # Shape: (time, num_classes)

        return feature, multi_label
        

In [3]:
filename = os.path.join("labels", "14_labels.npz")
labels = np.load(filename)

class_names = list(labels.keys())

directory_path = "audio_features"

file_list = [os.path.splitext(f)[0] for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f)) and f.endswith('.npz')]

dataset = AudioFrameDataset("audio_features", "labels", file_list, class_names)

loader = DataLoader(dataset, batch_size=1, shuffle=True)


### 🧩 Collate Function for Padding

When working with variable-length sequences, we use a custom **collate function** to pad each batch to the same length. This ensures tensors can be stacked and passed through the model in batches.

#### 🔶 Input (X):
- **Shape:** `(time_steps, features)`
- **Padding:** Extra time steps are added with **all features set to zero**.
- **Interpretation:** In the feature space, these padded regions contain no meaningful data. The model may learn to treat them as *silence* or simply ignore them.

#### 🔷 Labels (Y):
- **Shape:** `(time_steps, num_classes)` — one-hot encoded label vectors
- **Padding:** Extra time steps are added with **all class values set to zero**.
- **Interpretation:** These indicate *unlabeled* time regions and **should be masked** during training to avoid misleading the loss function.


In [4]:
def collate_fn(batch):
    # Sort by the length of the feature (time steps)
    batch.sort(key=lambda x: len(x[0]), reverse=True)  # Sort by the length of the feature tensor
    
    padded_features = []
    padded_labels = []
    
    max_len_feature = max([len(features) for features, _ in batch])  # Maximum sequence length (time steps) for features
    max_len_label = max([len(labels) for _, labels in batch])  # Maximum sequence length (time steps) for labels
    
    for features, labels in batch:
        feature_len = len(features)
        label_len = len(labels)
        
        # Padding features
        if feature_len < max_len_feature:
            pad_size = max_len_feature - feature_len
            features = torch.cat([features, torch.zeros(pad_size, features.size(1))], dim=0)  # Pad features
        
        # Padding labels
        if label_len < max_len_label:
            pad_size = max_len_label - label_len
            labels = torch.cat([labels, torch.zeros(pad_size, labels.size(1))], dim=0)  # Pad labels
        
        padded_features.append(features)
        padded_labels.append(labels)

    # Stack padded features and labels into tensors
    padded_features = torch.stack(padded_features)
    padded_labels = torch.stack(padded_labels)
    
    return padded_features, padded_labels


### 🎯 Notes on Accuracy and Training

- Padding `X` with zeros is **not just "silence"** in the audio sense — it represents **a lack of information** in feature space. The model should learn not to associate this with any real class.

- Padding `Y` with zeros **must be handled with a mask**. This mask is applied during loss and accuracy computation to ensure:
  - Padded labels don't contribute to the loss.
  - The model isn't penalized for predictions on time steps with no true labels.

✅ This setup allows efficient batching without compromising model accuracy or training signal quality.

---

### 🪣 Bucket Sampler Function:

#### ⚙️ How It Works:
- Sorts or groups samples by their sequence length (e.g., number of time steps).
- Forms batches where the sequence lengths within a batch are as similar as possible.
- Optionally shuffles batches while maintaining internal length consistency.

In [5]:
from torch.utils.data import Sampler
import random

class BucketBatchSampler(Sampler):
    def __init__(self, dataset, batch_size, shuffle=True):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle

        # Sort indices by length
        self.sorted_indices = sorted(range(len(dataset)), key=lambda i: dataset.get_length(i))

        # Divide into buckets
        self.batches = [
            self.sorted_indices[i:i + batch_size]
            for i in range(0, len(self.sorted_indices), batch_size)
        ]

        if self.shuffle:
            random.shuffle(self.batches)

    def __iter__(self):
        for batch in self.batches:
            yield batch

    def __len__(self):
        return len(self.batches)


# Splitting the Data:

#### 🔀 Function: `create_train_val_test_splits`
This function splits a list of audio feature filenames into:
- **Train set**  
- **Validation set**  
- **Test set**

It ensures reproducibility via a fixed `random_state`, and properly adjusts the sizes of the validation and test splits.

#### 🧪 Dataset Initialization
Using the file lists returned by the split function, we create separate instances of `AudioFrameDataset` for:
- `train_dataset`
- `val_dataset`
- `test_dataset`

Each dataset points to its respective `.npz` files for features and labels.

#### 🚚 Efficient Loading with Bucketing
To improve batching efficiency for variable-length sequences:
- We use a custom `BucketBatchSampler` that groups sequences of similar lengths into the same batch (reduces padding).
- Each DataLoader is created using the corresponding bucketed sampler and a `collate_fn` that handles padding.


In [6]:
# Define a function to split your dataset into train, validation, and test sets
def create_train_val_test_splits(file_list, test_size=0.2, val_size=0.1, random_state=42):
    # Split into train + temp (for validation + test)
    train_files, temp_files = train_test_split(file_list, test_size=test_size + val_size, random_state=random_state)
    
    # Split temp into validation and test
    val_size_adjusted = val_size / (test_size + val_size)  # Adjust for the size of the temp set
    val_files, test_files = train_test_split(temp_files, test_size=val_size_adjusted, random_state=random_state)
    
    return train_files, val_files, test_files

# Example usage
directory_path = "audio_features"
file_list = [os.path.splitext(f)[0] for f in os.listdir(directory_path) if os.path.isfile(os.path.join(directory_path, f)) and f.endswith('.npz')]

# Split the data into train, validation, and test sets
train_files, val_files, test_files = create_train_val_test_splits(file_list)

# Now create instances of the AudioFrameDataset for each split
train_dataset = AudioFrameDataset(
    feature_dir="audio_features",
    label_dir="labels",
    file_list=train_files,
    class_names=class_names,
    binary_labels=True
)

val_dataset = AudioFrameDataset(
    feature_dir="audio_features",
    label_dir="labels",
    file_list=val_files,
    class_names=class_names,
    binary_labels=True
)

test_dataset = AudioFrameDataset(
    feature_dir="audio_features",
    label_dir="labels",
    file_list=test_files,
    class_names=class_names,
    binary_labels=True
)

# Create DataLoaders for each dataset
batch_size = 32
train_sampler = BucketBatchSampler(train_dataset, batch_size)
val_sampler = BucketBatchSampler(val_dataset, batch_size, shuffle=False)
test_sampler = BucketBatchSampler(test_dataset, batch_size, shuffle=False)

train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, collate_fn=collate_fn)
val_loader = DataLoader(val_dataset, batch_sampler=val_sampler, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_sampler=test_sampler, collate_fn=collate_fn)


In [7]:
# Function to print a few batches from a DataLoader
def print_loader_info(loader, loader_name):
    print(f"\n{loader_name} Loader Info:")
    for i, (features, labels) in enumerate(loader):
        print(f"Batch {i+1}:")
        print(f"  Features shape: {features.shape}")  # Should be (batch_size, time_steps, num_features)
        print(f"  Labels shape: {labels.shape}")      # Should be (batch_size, time_steps)
        
        if i == 2:  # Show only the first 3 batches to avoid too much output
            break

# Print info for each loader
print_loader_info(train_loader, "Training")
print_loader_info(val_loader, "Validation")
print_loader_info(test_loader, "Test")



Training Loader Info:
Batch 1:
  Features shape: torch.Size([32, 249, 942])
  Labels shape: torch.Size([32, 249, 58])
Batch 2:
  Features shape: torch.Size([32, 210, 942])
  Labels shape: torch.Size([32, 210, 58])
Batch 3:
  Features shape: torch.Size([32, 179, 942])
  Labels shape: torch.Size([32, 179, 58])

Validation Loader Info:
Batch 1:
  Features shape: torch.Size([32, 127, 942])
  Labels shape: torch.Size([32, 127, 58])
Batch 2:
  Features shape: torch.Size([32, 130, 942])
  Labels shape: torch.Size([32, 130, 58])
Batch 3:
  Features shape: torch.Size([32, 132, 942])
  Labels shape: torch.Size([32, 132, 58])

Test Loader Info:
Batch 1:
  Features shape: torch.Size([32, 129, 942])
  Labels shape: torch.Size([32, 129, 58])
Batch 2:
  Features shape: torch.Size([32, 133, 942])
  Labels shape: torch.Size([32, 133, 58])
Batch 3:
  Features shape: torch.Size([32, 137, 942])
  Labels shape: torch.Size([32, 137, 58])


# Evaluation Metric :

### 📏 Balanced Accuracy (with Masking)

When dealing with padded sequences (e.g., audio or time-series data of variable lengths), it's important to avoid letting the model learn from or be evaluated on artificial padding.

#### 🛑 Why Masking Is Necessary:
- **Padded values are not real data** — they're just placeholders to align sequence lengths within a batch.
- **Including padded time steps in loss or accuracy** calculations would distort training and mislead evaluation.
- **Masking ensures** the model focuses only on valid, meaningful data points.

🧠 We use a **mask** to tell the model:  
> “Ignore these positions — they don't carry useful information.”


In [8]:
def get_masked_loss_and_accuracy(logits, labels, padding_mask, loss_fn):
    # logits: (batch, time, num_classes)
    # labels: (batch, time, num_classes) — one-hot
    # padding_mask: (batch, time) — 1 if valid, 0 if padded

    # Convert one-hot to class indices
    target = labels.argmax(dim=-1)  # (batch, time)
    pred = logits.argmax(dim=-1)    # (batch, time)

    # Flatten everything
    flat_logits = logits.view(-1, logits.shape[-1])         # (batch*time, num_classes)
    flat_target = target.view(-1)                           # (batch*time,)
    flat_mask = padding_mask.view(-1).bool()                # (batch*time,)

    # Compute loss only on valid (unpadded) positions
    loss_per_timestep = loss_fn(flat_logits, flat_target)   # (batch*time,)
    masked_loss = loss_per_timestep[flat_mask].mean()

    # Compute balanced accuracy
    true = flat_target[flat_mask].cpu().numpy()
    pred = pred.view(-1)[flat_mask].cpu().numpy()
    acc = balanced_accuracy_score(true, pred)

    return masked_loss, acc


# Baseline Model:

### 🧠 SimpleTimeStepClassifier

This is a basic fully-connected (feedforward) neural network for classifying each time step in a sequence (e.g., audio frames or time-series data).

#### 🔧 Model Architecture:
- **Input shape:** `(batch_size, time_steps, input_dim)`
- **Layers:**
  - Linear layer mapping `input_dim → hidden_dim`
  - ReLU activation
  - Linear layer mapping `hidden_dim → num_classes`
- The model processes each time step independently (no temporal modeling like RNNs or Transformers).

#### 📤 Output:
- Returns logits of shape `(batch_size, time_steps, num_classes)`, where each time step is classified independently.

This type of model is suitable for tasks like **frame-wise classification** in audio or video, where each time step (or frame) is labeled separately.


In [9]:
class SimpleTimeStepClassifier(nn.Module):
    def __init__(self, input_dim=942, hidden_dim=256, num_classes=58):
        super(SimpleTimeStepClassifier, self).__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, num_classes)
        )
    
    def forward(self, x):
        # x: (batch_size, time_steps, input_dim)
        batch_size, time_steps, _ = x.shape
        x = x.view(-1, x.shape[-1])  # Flatten to (batch_size * time_steps, input_dim)
        out = self.net(x)
        return out.view(batch_size, time_steps, -1)  # Reshape back to (batch, time_steps, num_classes)


In [10]:
def train(model, dataloader, optimizer, loss_fn, device):
    model.train()
    total_loss, total_acc = 0, 0

    for features, labels in dataloader:
        features, labels = features.to(device), labels.to(device)
        padding_mask = (labels.sum(dim=-1) != 0).float()  # (batch, time)

        logits = model(features)

        loss, acc = get_masked_loss_and_accuracy(logits, labels, padding_mask, loss_fn)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        total_acc += acc

    return total_loss / len(dataloader), total_acc / len(dataloader)


def evaluate(model, dataloader, loss_fn, device):
    model.eval()
    total_loss, total_acc = 0, 0

    with torch.no_grad():
        for features, labels in dataloader:
            features, labels = features.to(device), labels.to(device)
            padding_mask = (labels.sum(dim=-1) != 0).float()

            logits = model(features)
            loss, acc = get_masked_loss_and_accuracy(logits, labels, padding_mask, loss_fn)

            total_loss += loss.item()
            total_acc += acc

    return total_loss / len(dataloader), total_acc / len(dataloader)


In [11]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = SimpleTimeStepClassifier().to(device)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss(reduction='none')

for epoch in range(10):
    train_loss, train_acc = train(model, train_loader, optimizer, loss_fn, device)
    val_loss, val_acc = evaluate(model, val_loader, loss_fn, device)

    print(f"Epoch {epoch+1}:")
    print(f"  Train Loss: {train_loss:.4f}, Balanced Accuracy: {train_acc:.4f}")
    print(f"  Val   Loss: {val_loss:.4f}, Balanced Accuracy: {val_acc:.4f}")

test_loss, test_acc = evaluate(model, test_loader, loss_fn, device)
print(f"\n  Test  Loss: {test_loss:.4f}, Balanced Accuracy: {test_acc:.4f}")


Epoch 1:
  Train Loss: 8.0869, Balanced Accuracy: 0.1788
  Val   Loss: 3.7401, Balanced Accuracy: 0.2245
Epoch 2:
  Train Loss: 3.1176, Balanced Accuracy: 0.3531
  Val   Loss: 2.7914, Balanced Accuracy: 0.3742
Epoch 3:
  Train Loss: 2.4368, Balanced Accuracy: 0.4119
  Val   Loss: 2.1601, Balanced Accuracy: 0.4333
Epoch 4:
  Train Loss: 2.0169, Balanced Accuracy: 0.4577
  Val   Loss: 1.7911, Balanced Accuracy: 0.4758
Epoch 5:
  Train Loss: 1.8703, Balanced Accuracy: 0.4785
  Val   Loss: 1.7801, Balanced Accuracy: 0.4756
Epoch 6:
  Train Loss: 1.6639, Balanced Accuracy: 0.5029
  Val   Loss: 1.6402, Balanced Accuracy: 0.5036
Epoch 7:
  Train Loss: 1.5915, Balanced Accuracy: 0.5161
  Val   Loss: 1.5460, Balanced Accuracy: 0.5159
Epoch 8:
  Train Loss: 1.4999, Balanced Accuracy: 0.5354
  Val   Loss: 1.4857, Balanced Accuracy: 0.5294
Epoch 9:
  Train Loss: 1.4870, Balanced Accuracy: 0.5361
  Val   Loss: 1.5400, Balanced Accuracy: 0.5267
Epoch 10:
  Train Loss: 1.3843, Balanced Accuracy: 0.55

 #### BACC = 57.88% pretty good? Random guessing would yield 1.7%