In [1]:
import os
import pandas as pd
import torch
import torchaudio
import soundfile as sf
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
from torch.utils.data import DataLoader, Dataset, random_split
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB, FrequencyMasking, TimeMasking
from torch.optim import AdamW
from collections import Counter
from sklearn.utils.class_weight import compute_class_weight

In [2]:
# Dataset class
class AudioDataset(torch.utils.data.Dataset):
    """Audio-classification dataset described by a CSV file.

    The CSV is read with pandas. Column 0 holds each clip's file name
    (joined onto ``audio_folder``) and column 1 holds its label. Label names
    taken from the ``label`` column are mapped to integer ids in sorted order
    and exposed as ``LABEL_MAPPING``.

    Args:
        csv_file: Path of the CSV file listing the clips.
        audio_folder: Directory the file names in the CSV are relative to.
        transforms: Optional callable applied to the (length-normalised)
            waveform before it is returned.
        target_length: Number of samples every waveform is zero-padded or
            cropped to along its last dimension, so that samples can be
            stacked into a batch. Pass ``None`` to return waveforms at their
            native length.
    """

    def __init__(self, csv_file, audio_folder, transforms=None, target_length=16000):
        self.csv_file = csv_file
        self.audio_folder = audio_folder
        self.target_length = target_length
        self.transforms = transforms
        self.data = pd.read_csv(csv_file)
        # Sorted order keeps the label -> id assignment stable between runs.
        self.LABEL_MAPPING = {
            label: idx for idx, label in enumerate(sorted(self.data['label'].unique()))
        }

    def __len__(self):
        """Return the number of rows in the CSV."""
        return len(self.data)

    def _fit_length(self, waveform):
        """Zero-pad or crop ``waveform`` along its last dimension to ``target_length``.

        The tensor is returned unchanged when ``target_length`` is ``None`` or
        the length already matches.
        """
        if self.target_length is None:
            return waveform
        n_samples = waveform.shape[-1]
        if n_samples > self.target_length:
            return waveform[..., :self.target_length]
        if n_samples < self.target_length:
            # Pad on the right only, so the audio stays at the start of the clip.
            return torch.nn.functional.pad(waveform, (0, self.target_length - n_samples))
        return waveform

    def __getitem__(self, idx):
        """Load clip ``idx`` and return ``(waveform, label_id)``."""
        file_path = os.path.join(self.audio_folder, self.data.iloc[idx, 0])
        label = self.LABEL_MAPPING[self.data.iloc[idx, 1]]
        # NOTE(review): the file's sample rate is not checked or resampled;
        # the clips are presumably already at the rate the transforms expect.
        waveform, _sample_rate = torchaudio.load(file_path)

        # Normalise the length first so every sample yields the same shape
        # after the transforms and the default collate function can batch them.
        waveform = self._fit_length(waveform)

        if self.transforms:
            waveform = self.transforms(waveform)

        return waveform, label

    def get_class_weights(self, subset_indices):
        """Compute inverse-frequency class weights for a subset of rows.

        Args:
            subset_indices: Positional row indices (e.g. the training split)
                whose labels are counted.

        Returns:
            A ``float32`` tensor with one weight per class, ordered like
            ``LABEL_MAPPING``. Each weight is
            ``n_samples / (n_classes * class_count)``; a class that does not
            occur in the subset gets the neutral weight ``1.0``.
        """
        labels = self.data.iloc[subset_indices]['label']
        class_counts = Counter(labels)
        total_samples = len(labels)
        num_classes = len(self.LABEL_MAPPING)

        class_weights = {
            label: total_samples / (num_classes * count)
            for label, count in class_counts.items()
        }

        # Iterate the label names in sorted order so position i of the tensor
        # is the weight of class id i.
        return torch.tensor(
            [class_weights.get(label, 1.0) for label in sorted(self.LABEL_MAPPING.keys())],
            dtype=torch.float32,
        )

In [2]:
# Speech Command CNN Architecture
class SpeechCommandCNN(nn.Module):
    """Compact 2-D CNN that turns a spectrogram-like input into class logits.

    The network has two convolutional stages. Each stage is two
    3x3 convolution + batch-norm + ReLU layers followed by 2x2 max pooling;
    the second stage uses twice as many filters as the first. The feature
    maps are then averaged over their full spatial extent and passed through
    a two-layer classifier head with dropout.

    Args:
        n_channels: Number of channels of the input tensor.
        n_classes: Number of output logits.
        n_filters: Filter count of the first stage (the second uses ``2 * n_filters``).
    """

    def __init__(self, n_channels=1, n_classes=10, n_filters=32):
        super().__init__()
        wide = 2 * n_filters
        # Bias is dropped because every convolution is followed by batch norm.
        conv_opts = dict(kernel_size=(3, 3), padding=(1, 1), bias=False)

        # Stage 1
        self.conv1 = nn.Conv2d(n_channels, n_filters, **conv_opts)
        self.bn1 = nn.BatchNorm2d(n_filters)
        self.conv2 = nn.Conv2d(n_filters, n_filters, **conv_opts)
        self.bn2 = nn.BatchNorm2d(n_filters)
        self.pool1 = nn.MaxPool2d((2, 2))

        # Stage 2
        self.conv3 = nn.Conv2d(n_filters, wide, **conv_opts)
        self.bn3 = nn.BatchNorm2d(wide)
        self.conv4 = nn.Conv2d(wide, wide, **conv_opts)
        self.bn4 = nn.BatchNorm2d(wide)
        self.pool2 = nn.MaxPool2d((2, 2))

        # Classifier head
        self.fc1 = nn.Linear(wide, 128)
        self.dropout = nn.Dropout(p=0.3)
        self.fc2 = nn.Linear(128, n_classes)

    def forward(self, x):
        """Return unnormalised class scores (logits) for ``x``."""
        # Stage 1: conv -> bn -> relu, twice, then downsample.
        x = F.relu(self.bn1(self.conv1(x)))
        x = self.pool1(F.relu(self.bn2(self.conv2(x))))

        # Stage 2: same pattern with the wider filters.
        x = F.relu(self.bn3(self.conv3(x)))
        x = self.pool2(F.relu(self.bn4(self.conv4(x))))

        # Global average pooling: one kernel covering the whole feature map,
        # which makes the head independent of the input's height and width.
        height, width = x.shape[-2], x.shape[-1]
        x = F.avg_pool2d(x, (height, width))

        # Collapse everything after the leading dimension into a feature vector.
        x = torch.flatten(x, start_dim=1)

        hidden = self.dropout(F.relu(self.fc1(x)))
        # No activation on the last layer: the loss expects raw logits.
        return self.fc2(hidden)

In [4]:
# Training Function
def train(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network to optimise; switched to training mode.
        dataloader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean of the
        per-batch loss values and ``accuracy`` is a percentage (0-100).
    """
    model.train()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    for batch_inputs, batch_targets in dataloader:
        batch_inputs = batch_inputs.to(device)
        batch_targets = batch_targets.to(device)

        optimizer.zero_grad()
        logits = model(batch_inputs)
        batch_loss = criterion(logits, batch_targets)
        batch_loss.backward()
        optimizer.step()

        # Running totals for the epoch-level statistics.
        loss_sum += batch_loss.item()
        n_seen += batch_targets.size(0)
        hits = logits.max(dim=1).indices == batch_targets
        n_correct += hits.sum().item()

    return loss_sum / len(dataloader), 100 * n_correct / n_seen


# Validation Function
def validate(model, dataloader, criterion, device):
    """Evaluate ``model`` on ``dataloader`` without updating any weights.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataloader: Iterable of ``(inputs, labels)`` batches that supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.

    Returns:
        ``(avg_loss, accuracy)`` where ``avg_loss`` is the mean of the
        per-batch loss values and ``accuracy`` is a percentage (0-100).
    """
    model.eval()
    loss_sum, n_correct, n_seen = 0.0, 0, 0

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for batch_inputs, batch_targets in dataloader:
            batch_inputs = batch_inputs.to(device)
            batch_targets = batch_targets.to(device)

            logits = model(batch_inputs)
            loss_sum += criterion(logits, batch_targets).item()

            n_seen += batch_targets.size(0)
            hits = logits.max(dim=1).indices == batch_targets
            n_correct += hits.sum().item()

    return loss_sum / len(dataloader), 100 * n_correct / n_seen

# Early Stopping
class EarlyStopping:
    """Track validation loss, checkpoint the best model and flag when to stop.

    Call the instance once per epoch with the validation loss and the model.
    A loss counts as an improvement only if it is lower than the best loss
    seen so far by more than ``min_delta``. Each improvement writes the
    model's ``state_dict`` to ``save_path`` and resets the patience counter;
    after ``patience`` consecutive epochs without improvement ``early_stop``
    is set to ``True``.

    Args:
        patience: Number of consecutive non-improving epochs tolerated.
        min_delta: Minimum decrease of the loss that counts as an improvement.
        save_path: File the best weights are saved to.
    """

    def __init__(self, patience=40, min_delta=0.001, save_path="best_model.pth"):
        self.patience = patience
        self.min_delta = min_delta
        self.save_path = save_path
        self.best_loss = float("inf")  # any finite first loss is an improvement
        self.counter = 0
        self.early_stop = False

    def __call__(self, val_loss, model):
        """Record ``val_loss`` for this epoch and update the stopping state."""
        improved = val_loss < self.best_loss - self.min_delta

        if not improved:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
            return

        self.best_loss = val_loss
        self.counter = 0
        # Keep only the weights of the best epoch on disk.
        torch.save(model.state_dict(), self.save_path)
        print(f"New best model saved with loss {val_loss:.4f}")

You can train the model with either the regular Cross-Entropy Loss or the Weighted Cross-Entropy Loss. The weighted variant gave faster convergence and a higher validation accuracy.

In [None]:
#Simple Cross-Entropy Training
if __name__ == "__main__":
    csv_file = "Path_to_your\\labels.csv"
    audio_folder = "Path_to_your\\dataset"

    # Training transforms with augmentation
    train_transforms = torch.nn.Sequential(
        MelSpectrogram(sample_rate=16000, n_mels=64, n_fft=2048, hop_length=400),
        AmplitudeToDB(),
        FrequencyMasking(freq_mask_param=30),
        TimeMasking(time_mask_param=30)
    )

    # Validation transforms without augmentation
    val_transforms = torch.nn.Sequential(
        MelSpectrogram(sample_rate=16000, n_mels=64, n_fft=2048, hop_length=400),
        AmplitudeToDB()
    )

    # Two views of the same CSV: one augmented (training), one clean (validation).
    train_full = AudioDataset(csv_file=csv_file, audio_folder=audio_folder, transforms=train_transforms)
    val_full = AudioDataset(csv_file=csv_file, audio_folder=audio_folder, transforms=val_transforms)

    # Draw the 80/20 split once. The training subset stays on the augmented
    # view; the held-out indices are re-applied to the clean view so that
    # validation runs without masking and never sees a training clip.
    # The seeded generator makes the split reproducible between runs.
    train_size = int(0.8 * len(train_full))
    val_size = len(train_full) - train_size
    train_dataset, held_out = random_split(
        train_full, [train_size, val_size], generator=torch.Generator().manual_seed(42)
    )
    val_dataset = torch.utils.data.Subset(val_full, held_out.indices)

    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    # One output logit per distinct label in the CSV.
    num_classes = len(train_full.LABEL_MAPPING)
    model = SpeechCommandCNN(n_classes=num_classes)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=0.001, weight_decay=1e-3)

    # Multiply the learning rate by 0.8 every 10 epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.8)

    early_stopping = EarlyStopping(patience=30, min_delta=0.001)

    for epoch in range(200):
        # Train and get training loss and accuracy
        train_loss, train_accuracy = train(model, train_dataloader, criterion, optimizer, device)

        # Validate and get validation loss and accuracy
        val_loss, val_accuracy = validate(model, val_dataloader, criterion, device)

        # Print both losses and accuracies
        print(f"Epoch {epoch + 1} - Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}%")
        print(f"Epoch {epoch + 1} - Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}%")

        # Check early stopping (also checkpoints the best weights)
        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered. Stopping training.")
            break
        scheduler.step()


In [None]:
# Weighted Cross-Entropy Training 
if __name__ == "__main__":
    csv_file = "labels.csv"
    audio_folder = "dataset"

    # Training transforms with augmentation
    train_transforms = torch.nn.Sequential(
        MelSpectrogram(sample_rate=16000, n_mels=64, n_fft=2048, hop_length=400),
        AmplitudeToDB(),
        FrequencyMasking(freq_mask_param=30),
        TimeMasking(time_mask_param=30)
    )

    # Validation transforms without augmentation
    val_transforms = torch.nn.Sequential(
        MelSpectrogram(sample_rate=16000, n_mels=64, n_fft=2048, hop_length=400),
        AmplitudeToDB()
    )

    # Two views of the same CSV: one augmented (training), one clean (validation).
    train_full = AudioDataset(csv_file=csv_file, audio_folder=audio_folder, transforms=train_transforms)
    val_full = AudioDataset(csv_file=csv_file, audio_folder=audio_folder, transforms=val_transforms)

    # Split into training (80%) and validation (20%) once. The training subset
    # stays on the augmented view; the held-out indices are re-applied to the
    # clean view, so the two loaders are disjoint and validation is not
    # augmented. The seeded generator makes the split reproducible.
    train_size = int(0.8 * len(train_full))
    val_size = len(train_full) - train_size
    train_dataset, held_out = random_split(
        train_full, [train_size, val_size], generator=torch.Generator().manual_seed(42)
    )
    val_dataset = torch.utils.data.Subset(val_full, held_out.indices)

    # Class weights are computed from the training rows only, i.e. exactly
    # the data the weighted loss is applied to.
    class_weights = train_full.get_class_weights(subset_indices=train_dataset.indices)

    # Create DataLoaders
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    # Initialize the model with one output logit per distinct label in the CSV
    num_classes = len(train_full.LABEL_MAPPING)
    model = SpeechCommandCNN(n_classes=num_classes)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # CrossEntropyLoss with class weights for training
    criterion = nn.CrossEntropyLoss(weight=class_weights.to(device))
    # For validation use the unweighted loss to avoid biasing the reported results
    val_criterion = nn.CrossEntropyLoss()
    optimizer = AdamW(model.parameters(), lr=0.001, weight_decay=1e-2)

    # Multiply the learning rate by 0.8 every 10 epochs.
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.8)

    early_stopping = EarlyStopping(patience=30, min_delta=0.001)

    # Training loop
    for epoch in range(100):
        # Train and get training loss and accuracy
        train_loss, train_accuracy = train(model, train_dataloader, criterion, optimizer, device)

        # Validate and get validation loss and accuracy
        val_loss, val_accuracy = validate(model, val_dataloader, val_criterion, device)

        # Print both losses and accuracies
        print(f"Epoch {epoch + 1} - Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.4f}%")
        print(f"Epoch {epoch + 1} - Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}%")

        # Check early stopping (also checkpoints the best weights)
        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early stopping triggered. Stopping training.")
            break
        scheduler.step()

The cell below converts the trained PyTorch model (`best_model.pth`) into an ONNX model.

In [None]:
# Load the best checkpoint on the CPU and export it to ONNX.
device = torch.device("cpu")

# weights_only=True restricts unpickling to tensors and plain containers,
# which is all a state_dict contains.
state_dict = torch.load("best_model.pth", map_location=device, weights_only=True)

# Size the classifier head from the checkpoint instead of hard-coding it, so
# the export works for whatever number of classes the model was trained on.
n_classes = state_dict["fc2.weight"].shape[0]
best_model = SpeechCommandCNN(n_classes=n_classes)
best_model.load_state_dict(state_dict)
best_model.to(device)
best_model.eval()  # freeze dropout / batch-norm statistics for export

# The example input has to be representative of your input tensor:
# (batch, channels, n_mels, frames). 64 matches n_mels in the transforms;
# 41 frames presumably corresponds to a 16000-sample clip at hop_length=400.
example_inputs = (torch.randn(1, 1, 64, 41).to(device),)

# Export to ONNX
onnx_filename = "model.onnx"
torch.onnx.export(best_model, example_inputs, onnx_filename, dynamo=True)