In [1]:
import numpy as np
import librosa
import os
import time
import h5py

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import matplotlib.pyplot as plt
from IPython.display import Audio

In [2]:
# Root folder of the dataset; label files and audio are read from below it
dir_path = 'all_data/AED_data'

# Read the array of event-type names and display it
event_types_file = dir_path + '/event_types.npy'
all_event = np.load(event_types_file)
print(all_event)

['beach' 'bus' 'cafe/restaurant' 'car' 'city_center' 'forest_path'
 'grocery_store' 'home' 'library' 'metro_station' 'office' 'park'
 'residential_area' 'train' 'tram']


In [3]:
# Training annotations: each row pairs an event label with an audio file name
train_labels_file = dir_path + '/train_labels.npy'
train_label = np.load(train_labels_file)
print(train_label[0])  # first (label, audio_file) row

['beach' 'b019_170_180.wav']


In [4]:
# Walk through the dataset directory and collect the names of all .wav files.
wav_files = []
for (dirpath, dirnames, filenames) in os.walk(dir_path):
    for file in filenames:
        # Test the actual extension: a substring test would also accept
        # names such as 'clip.wav.bak'.
        if file.endswith('.wav'):
            wav_files.append(file)

# Total number of .wav files found
num_data = len(wav_files)

# Load validation and test labels from saved numpy files
val_label = np.load(dir_path+'/validation_labels.npy')
test_label = np.load(dir_path+'/test_labels.npy')

# Every clip is resampled to SAMPLE_RATE and forced to exactly CLIP_SAMPLES samples.
SAMPLE_RATE = 16000
CLIP_SECONDS = 10
CLIP_SAMPLES = SAMPLE_RATE * CLIP_SECONDS

# Event name -> class index, following the order of `all_event`.
event_to_index = {str(name): index for index, name in enumerate(all_event)}


def load_split(label_pairs):
    """Load the waveforms and class indices for one data split.

    Args:
        label_pairs: sequence of rows whose first element is the event name
            and whose second element is the audio file name, as stored in
            the ``*_labels.npy`` files.

    Returns:
        ``(audio, target)``: a list of 1-D waveforms, each exactly
        ``CLIP_SAMPLES`` long, and a list of integer class indices.

    Raises:
        ValueError: if an event name is not listed in ``all_event``.
    """
    audio = []
    target = []
    for row in label_pairs:
        event_name, file_name = str(row[0]), str(row[1])
        if event_name not in event_to_index:
            # Fail loudly: an argmax over an all-False comparison would
            # silently label the clip as class 0.
            raise ValueError('Unknown event type {!r} for file {!r}'.format(event_name, file_name))

        y, _ = librosa.load(dir_path+'/audio/'+file_name, sr=SAMPLE_RATE)
        y = y[:CLIP_SAMPLES]
        if len(y) < CLIP_SAMPLES:
            # Zero-pad short clips so every spectrogram has the same number
            # of frames and clips can be stacked into a batch.
            y = np.pad(y, (0, CLIP_SAMPLES - len(y)))

        audio.append(y)
        target.append(np.int64(event_to_index[event_name]))
    return audio, target


# Audio data and target labels for training, validation, and testing
train_audio, train_target = load_split(train_label)
val_audio, val_target = load_split(val_label)
test_audio, test_target = load_split(test_label)


In [5]:
class AlexNet(nn.Module):
    """Small AlexNet-style CNN for single-channel 2-D inputs.

    Five convolution/ReLU stages with three max-pooling steps feed an
    adaptive average pool that fixes the feature map at 3x3, so inputs of
    different spatial sizes end up with the same classifier input width.

    Args:
        num_classes: number of output logits (default 15).
    """

    def __init__(self, num_classes=15):
        super().__init__()

        def conv_relu(in_channels, out_channels, kernel, **conv_kwargs):
            # One convolution followed by an in-place ReLU
            return [
                nn.Conv2d(in_channels, out_channels, kernel_size=kernel, **conv_kwargs),
                nn.ReLU(inplace=True),
            ]

        def pool():
            # 3x3 max pooling with stride 2
            return nn.MaxPool2d(kernel_size=3, stride=2)

        # The layer order (and therefore the state_dict keys) is fixed here.
        self.features = nn.Sequential(
            *conv_relu(1, 16, 11, stride=4, padding=2),  # the input has a single channel
            pool(),
            *conv_relu(16, 32, 5, padding=2),
            pool(),
            *conv_relu(32, 32, 3, padding=1),
            *conv_relu(32, 64, 3, padding=1),
            *conv_relu(64, 64, 3, padding=1),
            pool(),
        )

        # Adaptive mean pooling: any spatial size is reduced to 3x3
        self.avgpool = nn.AdaptiveAvgPool2d((3, 3))

        self.classifier = nn.Sequential(
            nn.Dropout(),
            nn.Linear(64 * 3 * 3, 256),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(256, 256),
            nn.ReLU(inplace=True),
            nn.Linear(256, num_classes),
        )

    def forward(self, x):
        """Map a (batch, 1, H, W) tensor to (batch, num_classes) logits."""
        pooled = self.avgpool(self.features(x))  # (batch, 64, 3, 3)
        flat = pooled.flatten(start_dim=1)  # (batch, 64 * 3 * 3)
        return self.classifier(flat)
    
# Smoke test: push a random batch through a freshly built network
model = AlexNet()
# Dummy input laid out as (batch_size, num_channel, freq_dim, time_step)
sample_input = torch.randn((2, 1, 257, 626))
sample_output = model(sample_input)
print(sample_output.size())  # (batch_size, num_classes)

torch.Size([2, 15])


In [6]:
from torch.utils.data import Dataset, DataLoader

# Number of clips per mini-batch; passed to both DataLoaders defined in this cell.
batch_size = 8

# Dataset that turns raw waveforms into magnitude spectrograms on the fly
class dataset_pipeline(Dataset):
    """Pairs each waveform with its class label.

    Args:
        data: indexable collection of 1-D waveforms.
        label: indexable collection of integer class labels, aligned with ``data``.
    """

    def __init__(self, data, label):
        super().__init__()
        self.data = data
        self.label = label
        self._len = len(self.data)  # number of utterances

    def __len__(self):
        return self._len

    def __getitem__(self, index):
        """Return ``(magnitude_spectrogram, label)`` for one utterance."""
        waveform = self.data[index].astype(np.float32)
        # The STFT is computed per item; only the magnitude is kept
        magnitude = np.abs(librosa.stft(waveform, n_fft=512, hop_length=256))
        target = np.array(self.label[index])
        return torch.from_numpy(magnitude), torch.from_numpy(target).long()

# Wrap the training and validation splits in datasets, then batch them
train_set = dataset_pipeline(train_audio, train_target)
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)  # shuffled

validation_set = dataset_pipeline(val_audio, val_target)
validation_loader = DataLoader(validation_set, batch_size=batch_size, shuffle=False)  # fixed order

# Cross-entropy loss between raw logits and integer class targets
def CE(output, target):
    """Return the mean cross-entropy of ``output`` logits against ``target`` indices."""
    return F.cross_entropy(output, target)

# Training function
def train(model, epoch, log_step=12):
    """Run one training epoch over ``train_loader`` and return the mean loss.

    Uses the module-level ``train_loader``, ``optimizer`` and ``CE``.

    Args:
        model: network to optimise; it is switched to training mode.
        epoch: epoch number, used only in the progress messages.
        log_step: print a progress line every ``log_step`` batches. The
            notebook never defined this name; 12 matches the spacing of the
            recorded output. A falsy value disables the progress lines.

    Returns:
        Mean cross-entropy over the batches of this epoch.

    Raises:
        ValueError: if ``train_loader`` yields no batches.
    """
    start_time = time.time()
    model = model.train()  # Set the model to training mode
    train_loss = 0.
    num_batches = 0

    for batch_idx, (spec, label) in enumerate(train_loader):
        optimizer.zero_grad()
        # Add the channel dimension: (batch, freq, time) -> (batch, 1, freq, time)
        output = model(spec.unsqueeze(1))
        loss = CE(output, label)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        num_batches = batch_idx + 1

        # Print training progress (running mean loss and time per batch)
        if log_step and num_batches % log_step == 0:
            elapsed = time.time() - start_time
            print('| epoch {:3d} | {:5d}/{:5d} batches | ms/batch {:5.2f} | CE {:5.4f} |'.format(
                epoch, num_batches, len(train_loader),
                elapsed * 1000 / num_batches,
                train_loss / num_batches
            ))

    if num_batches == 0:
        # Without this guard the division below would fail with an
        # unrelated error about an undefined loop variable.
        raise ValueError('train_loader produced no batches; cannot compute a training loss.')

    train_loss /= num_batches
    print('-' * 99)
    print('    | end of training epoch {:3d} | time: {:5.2f}s | CE {:5.4f} |'.format(
        epoch, (time.time() - start_time), train_loss))
    return train_loss

# Validation function
def validate(model, epoch):
    """Measure classification accuracy on ``validation_loader``.

    Args:
        model: network to evaluate; it is switched to evaluation mode.
        epoch: epoch number, used only in the summary message.

    Returns:
        Fraction of validation clips whose predicted class matches the label.
    """
    began = time.time()
    model = model.eval()  # evaluation mode
    correct = 0
    total = 0

    # Forward passes only: no gradients are needed here
    with torch.no_grad():
        for spec, label in validation_loader:
            logits = model(spec.unsqueeze(1))
            predicted = logits.max(dim=1)[1].numpy()  # index of the largest logit
            truth = label.numpy()
            correct += np.sum(predicted == truth)
            total += len(truth)

    accuracy = correct / total
    print('    | end of validation epoch {:3d} | time: {:5.2f}s | Accuracy {:5.4f} |'.format(
        epoch, (time.time() - began), accuracy))
    print('-' * 99)
    return accuracy

total_epoch = 100  # number of training epochs
model_save = 'best_AlexNet.pt'  # checkpoint path for the best validation model
optimizer = optim.Adam(model.parameters(), lr=1e-3)

# Per-epoch history. NOTE: despite its name, `validation_loss` stores the
# accuracy returned by validate(), so "best" means the maximum.
training_loss = []
validation_loss = []

# Main training loop
for epoch in range(1, total_epoch + 1):
    epoch_loss = train(model, epoch)
    training_loss.append(epoch_loss)
    epoch_accuracy = validate(model, epoch)
    validation_loss.append(epoch_accuracy)

    is_best_training = epoch_loss == np.min(training_loss)
    is_best_validation = epoch_accuracy == np.max(validation_loss)

    if is_best_training:
        print('      Best training model found.')
    if is_best_validation:
        # Checkpoint the weights whenever validation accuracy ties or beats the best so far
        with open(model_save, 'wb') as f:
            torch.save(model.state_dict(), f)
        print('      Best validation model found and saved.')

    print('-' * 99)

| epoch   1 |    12/   51 batches | ms/batch 76.05 | CE 2.7156 |
| epoch   1 |    24/   51 batches | ms/batch 73.59 | CE 2.7059 |
| epoch   1 |    36/   51 batches | ms/batch 73.77 | CE 2.6873 |
| epoch   1 |    48/   51 batches | ms/batch 72.37 | CE 2.6629 |
---------------------------------------------------------------------------------------------------
    | end of training epoch   1 | time:  3.66s | CE 2.6518 |
    | end of validation epoch   1 | time:  0.11s | Accuracy 0.1333 |
---------------------------------------------------------------------------------------------------
      Best training model found.
      Best validation model found and saved.
---------------------------------------------------------------------------------------------------
| epoch   2 |    12/   51 batches | ms/batch 70.14 | CE 2.5094 |
| epoch   2 |    24/   51 batches | ms/batch 63.65 | CE 2.5193 |
| epoch   2 |    36/   51 batches | ms/batch 64.39 | CE 2.5166 |
| epoch   2 |    48/   51 batches | m

In [7]:
# Load the best model's state dictionary
model.load_state_dict(torch.load('best_AlexNet.pt'))

# Set the model to evaluation mode
model.eval()

# One 0/1 entry per test clip, in the order of `test_audio`
correct = []
total = len(test_audio)

# Inference only: disable autograd so no graph is built per clip
with torch.no_grad():
    for i in range(total):
        # Magnitude STFT of the current clip, same settings as in training
        this_spec = librosa.stft(test_audio[i].astype(np.float32), n_fft=512, hop_length=256)
        spec = torch.from_numpy(np.abs(this_spec))

        # Add batch and channel dimensions: (freq, time) -> (1, 1, freq, time)
        output = model(spec.unsqueeze(0).unsqueeze(1))

        _, output_label = torch.max(output, 1)
        correct.append(int(output_label.item() == int(test_target[i])))

# Calculate overall accuracy
overall_accuracy = np.sum(correct) / total * 100
print('Overall accuracy: {:.2f}%'.format(overall_accuracy))

# Calculate accuracy for each class by grouping clips on their true label.
# This does not rely on the test set holding exactly two clips per class
# stored in `all_event` order.
test_target_array = np.asarray(test_target)
correct_array = np.asarray(correct)
for class_index in np.unique(test_target_array):
    class_mask = test_target_array == class_index
    class_accuracy = correct_array[class_mask].mean() * 100
    print('Accuracy for {:s}: {:.2f}%'.format(all_event[class_index], class_accuracy))

Overall accuracy: 73.33%
Accuracy for beach: 50.00%
Accuracy for bus: 50.00%
Accuracy for cafe/restaurant: 100.00%
Accuracy for car: 100.00%
Accuracy for city_center: 100.00%
Accuracy for forest_path: 50.00%
Accuracy for grocery_store: 100.00%
Accuracy for home: 100.00%
Accuracy for library: 50.00%
Accuracy for metro_station: 100.00%
Accuracy for office: 100.00%
Accuracy for park: 100.00%
Accuracy for residential_area: 0.00%
Accuracy for train: 0.00%
Accuracy for tram: 100.00%
