In [78]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from datasets import load_dataset
import torchaudio.transforms as T
import numpy as np
import pandas as pd
from torch.utils.data import TensorDataset, DataLoader, random_split
import os

# Loading Data

In [3]:
# Load the UrbanSound8K dataset
dataset = load_dataset("danavery/urbansound8K")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [4]:
dataset

DatasetDict({
    train: Dataset({
        features: ['audio', 'slice_file_name', 'fsID', 'start', 'end', 'salience', 'fold', 'classID', 'class'],
        num_rows: 8732
    })
})

In [5]:
first_item = dataset['train'][0]
print(first_item['audio'])


{'path': '100032-3-0-0.wav', 'array': array([-0.00454712, -0.00483704, -0.00460815, ..., -0.00065613,
       -0.00048828,  0.        ]), 'sampling_rate': 44100}


# Preprocessing

In [11]:
def preprocess_audio(batch):
    """Convert a batch of raw audio examples into padded, normalised mel spectrograms.

    Args:
        batch: Mapping with an ``'audio'`` list (each item a dict holding a NumPy
            ``'array'`` waveform and its ``'sampling_rate'``) and a parallel
            ``'classID'`` list of integer labels.

    Returns:
        dict with
          * ``'spectrogram'``: float tensor stacked over the batch; every item has a
            leading channel axis and is zero-padded on the right of its last (time)
            axis up to the longest item of this batch.
          * ``'label'``: ``torch.long`` tensor holding the class ids.
    """
    # Parameters for spectrogram
    n_mels = 64
    n_fft = 1024
    win_length = None
    hop_length = 512
    sample_rate = 44100
    # Lower bound for the per-clip standard deviation: a silent / constant clip has
    # std == 0 and would otherwise turn the whole spectrogram into NaN (0 / 0).
    eps = 1e-8

    # Transformation pipeline for spectrogram
    transformation = T.MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=n_fft,
        win_length=win_length,
        hop_length=hop_length,
        n_mels=n_mels
    )

    spectrograms = []
    labels = []

    for audio_data, label in zip(batch['audio'], batch['classID']):
        # Extract waveform from 'audio' dictionary
        waveform = torch.from_numpy(audio_data['array']).float()

        # Resampling (if needed)
        if audio_data['sampling_rate'] != sample_rate:
            resampler = T.Resample(orig_freq=audio_data['sampling_rate'], new_freq=sample_rate)
            waveform = resampler(waveform)

        # Ensure waveform is 1D (averages over the first axis, assumed to be channels)
        if waveform.ndim > 1:
            waveform = waveform.mean(dim=0)

        # Apply transformation
        spectrogram = transformation(waveform).squeeze(0)

        # Per-clip standardisation; clamp_min leaves the result untouched whenever
        # std > eps and only kicks in for degenerate clips.
        spectrogram = (spectrogram - spectrogram.mean()) / spectrogram.std().clamp_min(eps)

        spectrograms.append(spectrogram.unsqueeze(0))  # Add channel dimension
        labels.append(label)

    # Pad each spectrogram on the right of the time axis to the longest one in this batch
    max_length = max(spec.shape[-1] for spec in spectrograms)
    padded_spectrograms = [
        torch.nn.functional.pad(spec, (0, max_length - spec.shape[-1]))
        for spec in spectrograms
    ]

    # Now we can stack the padded spectrograms
    spectrograms_tensor = torch.stack(padded_spectrograms)
    labels_tensor = torch.tensor(labels, dtype=torch.long)

    return {'spectrogram': spectrograms_tensor, 'label': labels_tensor}


In [12]:
# Yield each preprocessed batch together with its per-sample time lengths; the caller tracks the global max length across batches
def data_generator(dataset, batch_size=32):
    """Lazily preprocess ``dataset`` in consecutive slices of ``batch_size`` rows.

    Yields:
        (preprocessed_batch, lengths) where ``preprocessed_batch`` is whatever
        ``preprocess_audio`` returns for the slice and ``lengths`` lists the size of
        the last axis of every spectrogram in that batch as plain ints.
    """
    for start in range(0, len(dataset), batch_size):
        stop = start + batch_size
        chunk = preprocess_audio(dataset[start:stop])
        frame_counts = [int(item.shape[-1]) for item in chunk['spectrogram']]
        yield chunk, frame_counts


In [13]:
dataset

DatasetDict({
    train: Dataset({
        features: ['audio', 'slice_file_name', 'fsID', 'start', 'end', 'salience', 'fold', 'classID', 'class'],
        num_rows: 8732
    })
})

In [14]:
# Run the whole training split through the generator once, keeping every
# preprocessed batch, every per-sample length, and the longest time axis seen.
accumulated_data, lengths = [], []
max_length = 0

for preprocessed_chunk, chunk_lengths in data_generator(dataset['train']):
    accumulated_data.append(preprocessed_chunk)
    lengths += chunk_lengths
    max_length = max(max_length, max(chunk_lengths))

In [15]:
max_length

348

In [16]:
len(accumulated_data)

273

In [17]:
32*273

8736

In [18]:
# Function to pad spectrograms
def pad_spectrogram(spec, max_len):
    """Right-pad the last axis of ``spec`` with zeros so that it has ``max_len`` entries."""
    missing = max_len - spec.shape[-1]
    # (left, right) padding amounts for the last dimension only
    return torch.nn.functional.pad(spec, (0, missing))


Turning the 273 preprocessed batches into a single `Dataset`

In [19]:
# Bring every batch up to the global time length, then merge all batches along the sample axis
all_spectrograms = torch.cat(
    [pad_spectrogram(chunk['spectrogram'], max_length) for chunk in accumulated_data],
    dim=0,
)
all_labels = torch.cat([chunk['label'] for chunk in accumulated_data], dim=0)

In [20]:
final_dataset = {'spectrogram': all_spectrograms, 'label': all_labels}

# Splitting and DataLoaders

In [21]:
# Wrap the stacked tensors so samples can be indexed as (spectrogram, label) pairs
dataset = TensorDataset(final_dataset['spectrogram'], final_dataset['label'])

# 80/20 train/test split. A fixed generator seed keeps the split (and therefore the
# reported accuracies) reproducible across kernel restarts.
# NOTE(review): the source data exposes 'fold' and 'fsID' columns; a purely random
# split may put slices of the same recording in both sets -- consider splitting by fold.
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(42),
)

In [22]:
# Batch both splits; only the training batches are reshuffled on every pass
train_loader = DataLoader(dataset=train_dataset, batch_size=32, shuffle=True)
test_loader = DataLoader(dataset=test_dataset, batch_size=32, shuffle=False)

In [23]:
# Sanity check: pull a single batch from the training loader and inspect its shape
dataiter = iter(train_loader)
spectrograms, labels = next(dataiter)
print("Shape of spectrograms:", spectrograms.shape)

Shape of spectrograms: torch.Size([32, 1, 64, 348])


# Model

In [54]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class AudioClassifier(nn.Module):
    """CNN followed by two bidirectional LSTMs and a small dense classification head.

    The network takes single-channel 2-D inputs (the first convolution has one input
    channel) and returns unnormalised class scores of shape ``[batch, num_classes]``.
    Each of the six convolution stages halves both spatial axes through 2x2 max
    pooling, so both axes of the input must be at least 64 long.

    Args:
        num_classes: Number of output logits.
        temporal_lstm: When ``False`` (default, original behaviour) the feature map is
            reduced to one vector per sample by global max pooling, so the LSTMs only
            ever see a sequence of length 1. When ``True`` only the second-to-last
            axis (assumed to be frequency) is pooled away and the remaining last axis
            (assumed to be time) is fed to the LSTMs as a real sequence; the final
            forward and backward hidden states are then used for classification.
            Parameter shapes are identical in both modes, so state dicts are
            interchangeable.
    """

    def __init__(self, num_classes, temporal_lstm=False):
        super().__init__()

        # configurations
        conv_filters = [32, 64, 128, 256, 512, 512]
        kernel_size = 3
        pool_size = 2
        dropout_rates = [0.25, 0.3, 0.4, 0.5, 0.5, 0.5]
        dense_units = 128
        lstm_units = 128

        # Plain attribute (not a parameter/buffer), so it never appears in state_dict
        self.temporal_lstm = temporal_lstm

        # Convolutional layers
        self.conv_layers = nn.ModuleList()
        self.batch_norms = nn.ModuleList()
        self.dropouts = nn.ModuleList()

        in_channels = 1
        for i, out_channels in enumerate(conv_filters):
            self.conv_layers.append(nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, padding='same'))
            self.batch_norms.append(nn.BatchNorm2d(out_channels))
            self.dropouts.append(nn.Dropout(dropout_rates[i]))
            in_channels = out_channels

        self.pool = nn.MaxPool2d(kernel_size=pool_size)
        self.gmp = nn.AdaptiveMaxPool2d(1)

        # LSTM layers
        self.lstm1 = nn.LSTM(input_size=conv_filters[-1], hidden_size=lstm_units, batch_first=True, bidirectional=True)
        self.lstm2 = nn.LSTM(input_size=lstm_units * 2, hidden_size=lstm_units, batch_first=True, bidirectional=True)

        # Fully connected layers
        self.dense1 = nn.Linear(lstm_units * 2, dense_units)
        self.dense2 = nn.Linear(dense_units, num_classes)

    def forward(self, x):
        """Return class logits of shape ``[batch, num_classes]`` for input ``x``."""
        # Conv -> BatchNorm -> ReLU -> 2x2 max pool -> dropout, six times
        for conv, bn, dropout in zip(self.conv_layers, self.batch_norms, self.dropouts):
            x = self.pool(F.relu(bn(conv(x))))
            x = dropout(x)

        # getattr keeps whole-model pickles created before this flag existed usable
        if getattr(self, 'temporal_lstm', False):
            # Pool away only the second-to-last axis and keep the last one as the
            # sequence dimension: [batch, channels, steps] -> [batch, steps, channels]
            x = x.amax(dim=2).transpose(1, 2)
            x, _ = self.lstm1(x)
            _, (h_n, _) = self.lstm2(x)
            # h_n[0] / h_n[1] are the final forward / backward hidden states; unlike
            # the output at the last step, both have seen the entire sequence.
            x = torch.cat((h_n[0], h_n[1]), dim=1)
        else:
            # Global max pooling: one feature vector per sample
            x = self.gmp(x).squeeze(-1).squeeze(-1)  # Shape: [batch_size, channels]
            x = x.unsqueeze(1)  # Shape: [batch_size, 1, channels] for LSTM
            x, _ = self.lstm1(x)
            x, _ = self.lstm2(x)

            # Select the last (here: only) time step's output for classification
            x = x[:, -1, :]

        # Fully connected layers
        x = F.relu(self.dense1(x))
        x = self.dense2(x)

        return x


# Training arguments

In [25]:
# Size the output layer from the largest label id (+1) rather than the number of
# distinct labels, so every label stays a valid logit index even if some class ids
# never occur in the data.
num_classes = int(final_dataset['label'].max()) + 1

In [26]:
num_classes

10

In [55]:
model = AudioClassifier(num_classes)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)


In [38]:
def _percent(correct, total):
    """Return ``correct / total`` as a percentage, or NaN when nothing was counted."""
    return 100 * correct / total if total else float('nan')


def train_and_save_checkpoint(model, criterion, optimizer, train_loader, test_loader, epochs=10, checkpoint_dir='checkpoints'):
    """Train ``model``, report train/test accuracy every epoch and checkpoint every 5th epoch.

    Args:
        model: ``nn.Module`` to train; it is moved to CUDA when available, else CPU.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer already bound to ``model``'s parameters.
        train_loader: Iterable of batches whose first two items are (inputs, labels).
        test_loader: Iterable with the same batch layout, used for per-epoch evaluation.
        epochs: Number of passes over ``train_loader``.
        checkpoint_dir: Directory (created if missing) that receives
            ``checkpoint_epoch_<n>.pth`` files.

    Returns:
        None. Progress is printed; checkpoints are written to disk.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Flatten the weights of every recurrent sub-module instead of assuming the model
    # exposes attributes named `lstm1` / `lstm2`.
    for module in model.modules():
        if isinstance(module, nn.RNNBase):
            module.flatten_parameters()

    os.makedirs(checkpoint_dir, exist_ok=True)

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        num_batches = 0
        correct = 0
        total = 0

        for data in train_loader:
            # Move data to the same device as the model
            inputs, labels = data[0].to(device), data[1].to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            num_batches += 1
            # detach() replaces the legacy `.data` access; only the indices are needed
            _, predicted = torch.max(outputs.detach(), 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        # Mean of the per-batch losses; NaN (not a crash) if the loader was empty
        train_loss = running_loss / num_batches if num_batches else float('nan')
        train_accuracy = _percent(correct, total)
        print(f'Epoch {epoch+1}, Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.2f}%')

        # Evaluate on test
        model.eval()
        with torch.no_grad():
            correct = 0
            total = 0
            for data in test_loader:
                inputs, labels = data[0].to(device), data[1].to(device)
                outputs = model(inputs)
                _, predicted = torch.max(outputs, 1)
                total += labels.size(0)
                correct += (predicted == labels).sum().item()

            test_accuracy = _percent(correct, total)
            print(f'Test Accuracy: {test_accuracy:.2f}%')

        #  checkpoint every 5th epoch
        if (epoch + 1) % 5 == 0:
            checkpoint_path = os.path.join(checkpoint_dir, f'checkpoint_epoch_{epoch+1}.pth')
            torch.save({
                'epoch': epoch + 1,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': train_loss,
            }, checkpoint_path)
            print(f'Checkpoint saved: {checkpoint_path}')


In [30]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
train_and_save_checkpoint(model, criterion, optimizer, train_loader, test_loader, epochs=65)

Epoch 1, Loss: 0.6239, Accuracy: 79.89%
Test Accuracy: 81.91%
Epoch 2, Loss: 0.6088, Accuracy: 79.93%
Test Accuracy: 77.73%
Epoch 3, Loss: 0.5499, Accuracy: 82.16%
Test Accuracy: 81.97%
Epoch 4, Loss: 0.5499, Accuracy: 82.39%
Test Accuracy: 80.42%
Epoch 5, Loss: 0.5204, Accuracy: 83.32%
Test Accuracy: 80.37%
Checkpoint saved: checkpoints/checkpoint_epoch_5.pth
Epoch 6, Loss: 0.5007, Accuracy: 83.87%
Test Accuracy: 83.74%
Epoch 7, Loss: 0.4851, Accuracy: 84.12%
Test Accuracy: 84.66%
Epoch 8, Loss: 0.4538, Accuracy: 85.08%
Test Accuracy: 84.89%
Epoch 9, Loss: 0.4435, Accuracy: 86.18%
Test Accuracy: 84.37%
Epoch 10, Loss: 0.4134, Accuracy: 86.74%
Test Accuracy: 83.97%
Checkpoint saved: checkpoints/checkpoint_epoch_10.pth
Epoch 11, Loss: 0.4162, Accuracy: 86.44%
Test Accuracy: 85.86%
Epoch 12, Loss: 0.4007, Accuracy: 86.70%
Test Accuracy: 85.46%
Epoch 13, Loss: 0.3878, Accuracy: 87.32%
Test Accuracy: 87.41%
Epoch 14, Loss: 0.3805, Accuracy: 87.69%
Test Accuracy: 82.20%
Epoch 15, Loss: 0.35

In [58]:
model

AudioClassifier(
  (conv_layers): ModuleList(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (2): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (3): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (4): Conv2d(256, 512, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (5): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=same)
  )
  (batch_norms): ModuleList(
    (0): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (4-5): 2 x BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  )
  (dropouts): Modul

In [71]:
def evaluate(model, data_loader, device):
    """Return the top-1 accuracy of ``model`` over ``data_loader`` as a percentage.

    The model is switched to eval mode and gradients are disabled; each batch's first
    two items (inputs, labels) are moved to ``device`` before the forward pass.
    """
    model.eval()
    hits, seen = 0, 0
    with torch.no_grad():
        for batch in data_loader:
            features = batch[0].to(device)
            targets = batch[1].to(device)
            # index [1] of torch.max(..., dim) holds the arg-max positions
            guesses = torch.max(model(features), 1)[1]
            seen += targets.size(0)
            hits += (guesses == targets).sum().item()
    return 100 * hits / seen

In [72]:
train_accuracy = evaluate(model, train_loader, device)
print(f'Train Accuracy: {train_accuracy:.2f}%')

Train Accuracy: 98.88%


In [70]:
# Shape of one test batch, read straight from the loader so this cell also works on a
# fresh kernel (`input_tensors` is only created further down in the notebook)
next(iter(test_loader))[0].shape

torch.Size([32, 1, 64, 348])

# Mapping back

In [75]:
initial_dataset =  load_dataset("danavery/urbansound8K")

In [74]:
from collections import defaultdict

def extract_classes(initial_dataset):
  """Build a ``{classID: class name}`` lookup from the ``'train'`` split.

  Args:
    initial_dataset: Mapping (e.g. a ``DatasetDict``) whose ``'train'`` entry supports
      column access through ``['classID']`` and ``['class']``.

  Returns:
    dict mapping every class id to the first class name seen for it, in order of
    first appearance.
  """
  data = initial_dataset['train']

  # Read only the two label columns. Iterating row by row would materialise every
  # full example (including its 'audio' entry) just to read two scalar fields.
  class_dict = {}
  for class_id, class_name in zip(data['classID'], data['class']):
    # setdefault keeps the first name seen, which makes the result deterministic even
    # if an id were ever paired with more than one name
    class_dict.setdefault(class_id, class_name)
  return class_dict

In [76]:
class_dict = extract_classes(initial_dataset)

In [77]:
class_dict

{3: 'dog_bark',
 2: 'children_playing',
 1: 'car_horn',
 0: 'air_conditioner',
 9: 'street_music',
 6: 'gun_shot',
 8: 'siren',
 5: 'engine_idling',
 7: 'jackhammer',
 4: 'drilling'}

# Collecting the inputs, predicted labels and class names for a single test batch (index 10, i.e. the 11th batch)

In [98]:
def map_predictions_to_class(model, test_loader, device, class_name_mapping, batch_index=10):
    """Run the model on one batch of ``test_loader`` and tabulate its predictions.

    Args:
        model: Module called as ``model(inputs)``; put into eval mode here. It is
            expected to already live on ``device``.
        test_loader: Iterable of batches whose first two items are (inputs, labels).
        device: Device the batch tensors are moved to before the forward pass.
        class_name_mapping: Mapping from integer label to class name.
        batch_index: Zero-based position of the batch to inspect (default 10, as before).

    Returns:
        (input_tensors, df): the selected batch's inputs on the CPU, and a DataFrame
        with one row per sample and the columns 'Input Tensors', 'Actual Label',
        'Predicted Label' and 'Class Name'.

    Raises:
        IndexError: if ``test_loader`` yields no batch at ``batch_index``.
    """
    model.eval()
    selected = None
    with torch.no_grad():
        for i, data in enumerate(test_loader):
            # Skip the forward pass for every batch that is not going to be reported
            if i != batch_index:
                continue
            inputs, labels = data[0].to(device), data[1].to(device)
            outputs = model(inputs)
            _, predicted = torch.max(outputs, 1)
            selected = (inputs.cpu(), labels.cpu().tolist(), predicted.cpu().tolist())
            break

    if selected is None:
        raise IndexError(f'test_loader has no batch at index {batch_index}')

    input_tensors, actual_labels_list, output_labels = selected
    output_class_names = [class_name_mapping[label] for label in output_labels]

    df = pd.DataFrame({
        'Input Tensors': [tensor.numpy() for tensor in input_tensors],  # Convert tensors to numpy arrays for DataFrame
        'Actual Label': actual_labels_list,
        'Predicted Label': output_labels,
        'Class Name': output_class_names
    })

    return input_tensors, df

In [99]:
input_tensors, predictions_df = map_predictions_to_class(model, test_loader, device, class_dict)

In [100]:
predictions_df.head()

Unnamed: 0,Input Tensors,Actual Label,Predicted Label,Class Name
0,"[[[2.2761073, -0.24937595, 0.17752314, 4.68741...",9,9,street_music
1,"[[[0.26266438, -0.18040122, -0.17845507, -0.18...",9,9,street_music
2,"[[[-0.3720862, 1.5921863, 0.7920696, 0.3264608...",6,6,gun_shot
3,"[[[7.088621, 1.2899628, 1.3319199, 6.9923673, ...",0,0,air_conditioner
4,"[[[-0.14231737, 1.7023071, 0.86349, 1.2145898,...",7,7,jackhammer


# Using Pre trained network for this task