In [3]:
import os
import numpy as np
import librosa
import torch
import torchaudio
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# --- Audio front end ---
SR = 16000    # target sample rate in Hz (16 kHz)
N_MELS = 128  # Mel filterbank channels per spectrogram frame

# --- SpecAugment ---
TIME_MASK_PARAM = 30  # upper bound on the time-mask width, in Mel frames
FREQ_MASK_PARAM = 8   # upper bound on the frequency-mask width, in Mel bins

# --- Optimisation ---
BATCH_SIZE = 32
NUM_EPOCHS = 30
LEARNING_RATE = 1e-3
WEIGHT_DECAY = 1e-4

# --- Regularisation ---
DROPOUT_PROB_CONV = 0.2  # dropout probability after the conv layers
DROPOUT_PROB_FC = 0.5    # dropout probability before the fully connected layer

# --- Recurrent head ---
LSTM_HIDDEN = 128     # LSTM hidden size
LSTM_LAYERS = 2       # number of stacked LSTM layers
BIDIRECTIONAL = True  # run the LSTM in both time directions

# Dataset definition
class AudioDataset(Dataset):
    """Mel-spectrogram dataset for audio question detection.

    Each item is a globally normalized log-Mel spectrogram of one ``.wav``
    file together with its integer class label (``others`` -> 0,
    ``questions`` -> 1). Training items additionally receive SpecAugment
    time and frequency masking.
    """

    def __init__(self, data_dir, subset="train", mean=None, std=None):
        """
        Index the ``.wav`` files under ``data_dir`` and set up normalization.

        data_dir: path to dataset (contains 'questions' and 'others' subfolders);
            a missing subfolder is skipped silently.
        subset: "train" or "test", used for toggling augmentation.
        mean, std: optional precomputed normalization statistics. When both are
            given they are used as-is. Otherwise the train subset computes them
            from its own files, and any other subset reuses the values a
            previously constructed train dataset stored on the class.

        Raises RuntimeError if a non-train subset has no statistics available,
        or if statistics must be computed from an empty file list.
        """
        self.data_dir = data_dir
        self.subset = subset
        # Map subfolders to labels
        self.label_map = {"others": 0, "questions": 1}
        self.file_list = []  # list of (filepath, label)
        for class_name, label in self.label_map.items():
            class_dir = os.path.join(data_dir, class_name)
            if not os.path.isdir(class_dir):
                continue
            # os.listdir order is platform-dependent; sort so that a given
            # index always refers to the same file.
            for fname in sorted(os.listdir(class_dir)):
                if fname.lower().endswith(".wav"):
                    self.file_list.append((os.path.join(class_dir, fname), label))

        # Build the SpecAugment transforms once instead of on every item.
        # They still draw a fresh random mask on each call.
        if subset == "train":
            self._time_mask = torchaudio.transforms.TimeMasking(
                time_mask_param=TIME_MASK_PARAM
            )
            self._freq_mask = torchaudio.transforms.FrequencyMasking(
                freq_mask_param=FREQ_MASK_PARAM
            )
        else:
            self._time_mask = None
            self._freq_mask = None

        if mean is not None and std is not None:
            self.global_mean, self.global_std = mean, std
            if subset == "train":
                # Publish the stats so later datasets built without explicit
                # values can still pick them up from the class.
                AudioDataset.train_mean = mean
                AudioDataset.train_std = std
        elif subset == "train":
            self.global_mean, self.global_std = self._compute_global_norm_stats()
        elif hasattr(AudioDataset, "train_mean") and hasattr(AudioDataset, "train_std"):
            # Reuse the statistics computed by the train dataset so that every
            # subset is normalized identically.
            self.global_mean = AudioDataset.train_mean
            self.global_std = AudioDataset.train_std
        else:
            raise RuntimeError("Training statistics not found for normalization.")

    @staticmethod
    def _load_mel_db(filepath):
        """Load one audio file at SR and return its Mel spectrogram in dB."""
        y, _ = librosa.load(filepath, sr=SR)
        mel_spec = librosa.feature.melspectrogram(
            y=y, sr=SR, n_mels=N_MELS, power=2.0
        )
        return librosa.power_to_db(mel_spec, top_db=80)

    @staticmethod
    def _finalize_norm_stats(sum_val, sum_sq_val, count):
        """Turn running sums into ``(mean, std)`` as Python floats.

        Raises RuntimeError when ``count`` is zero.
        """
        if count == 0:
            raise RuntimeError("Cannot compute normalization statistics: no audio files found.")
        global_mean = sum_val / count
        # E[x^2] - E[x]^2 can come out marginally negative through rounding;
        # clamp it so the square root never yields NaN.
        global_var = max(sum_sq_val / count - global_mean ** 2, 0.0)
        return float(global_mean), float(np.sqrt(global_var))

    def _compute_global_norm_stats(self):
        """Compute global mean and std over all spectrogram pixels in the training set.

        The result is also stored on the class as ``train_mean`` / ``train_std``
        so that datasets for other subsets can reuse it.
        """
        sum_val = 0.0
        sum_sq_val = 0.0
        count = 0
        for filepath, _ in self.file_list:
            # Accumulate in float64: summing float32 scalars over the whole
            # training set loses precision, especially for the squared term.
            mel_db = self._load_mel_db(filepath).astype(np.float64, copy=False)
            sum_val += float(mel_db.sum())
            sum_sq_val += float(np.square(mel_db).sum())
            count += mel_db.size
        global_mean, global_std = self._finalize_norm_stats(sum_val, sum_sq_val, count)
        # Save for reuse in test dataset
        AudioDataset.train_mean = global_mean
        AudioDataset.train_std = global_std
        return global_mean, global_std

    def __len__(self):
        """Return the number of indexed audio files."""
        return len(self.file_list)

    def __getitem__(self, idx):
        """Return ``(mel_tensor, label)`` for the file at position ``idx``.

        ``mel_tensor`` is a float32 tensor of shape (1, N_MELS, time_frames);
        ``label`` is the integer from ``label_map``.
        """
        filepath, label = self.file_list[idx]
        mel_db = self._load_mel_db(filepath)
        # Normalize with the global statistics; the epsilon guards against a
        # zero standard deviation.
        mel_db_norm = (mel_db - self.global_mean) / (self.global_std + 1e-6)
        # Add a leading channel dimension: (1, N_MELS, time_frames)
        mel_tensor = torch.tensor(mel_db_norm, dtype=torch.float32).unsqueeze(0)
        # Apply SpecAugment (time and frequency masking) only on training data
        if self.subset == "train":
            mel_tensor = self._time_mask(mel_tensor)
            mel_tensor = self._freq_mask(mel_tensor)
        return mel_tensor, label

# Collate function for DataLoader to pad variable-length sequences
def pad_collate(batch, pad_value=0.0):
    """
    Pad spectrograms in the batch to the same time dimension and stack them.

    batch: non-empty list of (spec_tensor, label) pairs. The spectrograms must
        agree on every dimension except the last (time) one.
    pad_value: fill value for the padded frames. The default 0.0 matches the
        training-set mean level once spectrograms are globally normalized
        (it is not digital silence).

    Returns (specs, labels): specs has shape (batch, *spec_shape[:-1], max_frames)
    and labels is a 1-D torch.long tensor.

    Raises ValueError if the batch is empty.
    """
    if not batch:
        raise ValueError("pad_collate received an empty batch")
    # Longest time axis in this batch; every item is right-padded up to it.
    max_frames = max(spec.shape[-1] for spec, _ in batch)
    specs = torch.stack([
        # (0, n) pads only the end of the last dimension; n == 0 is a no-op.
        torch.nn.functional.pad(spec, (0, max_frames - spec.shape[-1]), value=pad_value)
        for spec, _ in batch
    ])
    labels = torch.tensor([label for _, label in batch], dtype=torch.long)
    return specs, labels


class CRNNModel(nn.Module):
    """Convolutional-recurrent classifier over Mel spectrograms.

    Two conv/BatchNorm/ReLU/MaxPool/Dropout stages feed a stacked LSTM whose
    final states go through dropout and a linear layer producing 2 logits.
    Input is (batch, 1, N_MELS, time_frames); output is (batch, 2).
    """

    def __init__(self):
        super().__init__()
        self.conv = nn.Sequential(
            nn.Conv2d(1, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(DROPOUT_PROB_CONV),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2),
            nn.Dropout(DROPOUT_PROB_CONV),
        )
        # Probe the conv stack with a dummy input to find (channels * freq_out).
        # Do it in eval mode: in training mode this forward pass would update
        # the BatchNorm running statistics from an all-zero input and draw
        # dropout masks, before any real data has been seen.
        self.conv.eval()
        with torch.no_grad():
            probe = torch.zeros(1, 1, N_MELS, 100)  # 100 time frames is arbitrary
            c_out, f_out = self.conv(probe).shape[1:3]  # (batch, C, F, T) -> C, F
        self.conv.train()
        feat_dim = c_out * f_out
        self.lstm = nn.LSTM(
            feat_dim,
            LSTM_HIDDEN,
            num_layers=LSTM_LAYERS,
            batch_first=True,
            bidirectional=BIDIRECTIONAL,
            dropout=0.3,
        )
        self.dropout_fc = nn.Dropout(DROPOUT_PROB_FC)
        self.fc = nn.Linear(LSTM_HIDDEN * (2 if BIDIRECTIONAL else 1), 2)

    def forward(self, x):
        """Return class logits of shape (batch, 2) for spectrograms ``x``.

        NOTE(review): no sequence lengths are passed in, so for a padded batch
        the "last" forward state is read at the padded end of each sequence.
        """
        x = self.conv(x)  # (B, C, F, T)
        batch, channels, freq, frames = x.shape
        # Make time the sequence axis and fold channels x frequency into features.
        x = x.permute(0, 3, 1, 2).reshape(batch, frames, channels * freq)
        h, _ = self.lstm(x)
        hidden = self.lstm.hidden_size
        if self.lstm.bidirectional:
            # Forward direction finishes at the last step, backward at the first.
            h_last = torch.cat([h[:, -1, :hidden], h[:, 0, hidden:]], dim=1)
        else:
            h_last = h[:, -1]
        return self.fc(self.dropout_fc(h_last))

# Initialize datasets and data loaders
# The train dataset must be built first: it computes the normalization
# statistics that the test dataset then reuses.
train_dataset = AudioDataset(
    data_dir="cleaned_dataset/train",
    subset="train",
)
test_dataset = AudioDataset(
    data_dir="cleaned_dataset/test",
    subset="test",
)

# Only the training loader shuffles; both pad each batch to a common length.
train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=pad_collate,
)
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    shuffle=False,
    collate_fn=pad_collate,
)

# Model, loss, optimizer and learning-rate schedule
model = CRNNModel().to(device)
# Cross-entropy with label smoothing of 0.1
criterion = nn.CrossEntropyLoss(label_smoothing=0.1)
optimizer = optim.Adam(
    model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
)
# Multiply the learning rate by 0.1 every 10 epochs
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Training loop
for epoch in range(1, NUM_EPOCHS + 1):
    # ---- Training pass ----
    model.train()
    running_loss = 0.0
    correct_train = 0
    total_train = 0
    for specs, labels in train_loader:
        specs = specs.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(specs)                # forward pass
        loss = criterion(outputs, labels)     # compute loss (with label smoothing)
        loss.backward()                       # backpropagate
        optimizer.step()
        # The loss is a batch mean, so weight it by batch size to get a
        # per-sample average over the epoch.
        running_loss += loss.item() * specs.size(0)
        _, predicted = torch.max(outputs, dim=1)
        correct_train += (predicted == labels).sum().item()
        total_train += labels.size(0)
    train_loss = running_loss / total_train
    train_acc = correct_train / total_train

    # ---- Evaluation on test set ----
    model.eval()
    test_loss = 0.0
    correct_test = 0
    total_test = 0
    # Confusion-matrix counts, treating "question" (label 1) as positive
    true_positives = true_negatives = false_positives = false_negatives = 0
    with torch.no_grad():
        for specs, labels in test_loader:
            specs = specs.to(device)
            labels = labels.to(device)
            outputs = model(specs)
            loss = criterion(outputs, labels)
            test_loss += loss.item() * specs.size(0)
            # Predicted class
            _, predicted = torch.max(outputs, dim=1)
            correct_test += (predicted == labels).sum().item()
            total_test += labels.size(0)
            # Count each outcome for the whole batch at once rather than
            # comparing tensor elements one by one in Python.
            pred_pos = predicted == 1
            pred_neg = predicted == 0
            label_pos = labels == 1
            label_neg = labels == 0
            true_positives += (pred_pos & label_pos).sum().item()
            false_positives += (pred_pos & label_neg).sum().item()
            false_negatives += (pred_neg & label_pos).sum().item()
            true_negatives += (pred_neg & label_neg).sum().item()
    test_loss = test_loss / total_test
    test_acc = correct_test / total_test
    # Compute F1 (binary classification: treat "question" as positive class)
    if true_positives == 0:
        # No correct positive prediction: precision, recall and F1 are all zero
        precision = 0.0
        recall = 0.0
        f1 = 0.0
    else:
        precision = true_positives / (true_positives + false_positives + 1e-8)
        recall = true_positives / (true_positives + false_negatives + 1e-8)
        # Both terms are strictly positive here, so the denominator is non-zero.
        f1 = 2 * precision * recall / (precision + recall)
    # Step the learning rate scheduler
    scheduler.step()

    # Print epoch summary
    print(f"Epoch {epoch:02d}: "
          f"Train Loss = {train_loss:.4f}, Train Acc = {train_acc*100:.2f}%  |  "
          f"Test Loss = {test_loss:.4f}, Test Acc = {test_acc*100:.2f}%, Test F1 = {f1:.3f}")



Epoch 01: Train Loss = 0.6757, Train Acc = 57.39%  |  Test Loss = 0.6252, Test Acc = 58.81%, Test F1 = 0.348
Epoch 02: Train Loss = 0.6388, Train Acc = 64.24%  |  Test Loss = 0.5743, Test Acc = 72.95%, Test F1 = 0.459
Epoch 03: Train Loss = 0.5822, Train Acc = 72.44%  |  Test Loss = 0.4560, Test Acc = 85.91%, Test F1 = 0.596
Epoch 04: Train Loss = 0.5418, Train Acc = 75.92%  |  Test Loss = 0.4237, Test Acc = 86.33%, Test F1 = 0.598
Epoch 05: Train Loss = 0.5263, Train Acc = 76.86%  |  Test Loss = 0.4062, Test Acc = 87.39%, Test F1 = 0.597
Epoch 06: Train Loss = 0.5099, Train Acc = 78.31%  |  Test Loss = 0.4272, Test Acc = 86.51%, Test F1 = 0.614
Epoch 07: Train Loss = 0.5058, Train Acc = 78.45%  |  Test Loss = 0.4651, Test Acc = 81.64%, Test F1 = 0.563
Epoch 08: Train Loss = 0.4966, Train Acc = 79.48%  |  Test Loss = 0.3982, Test Acc = 89.00%, Test F1 = 0.656
Epoch 09: Train Loss = 0.4983, Train Acc = 78.93%  |  Test Loss = 0.4252, Test Acc = 88.71%, Test F1 = 0.651
Epoch 10: Train Los