# soundNet: Classifying Vehicle Classes from Audio Files

Import Libraries/Modules

In [1]:
import os
import pickle
import random

import numpy as np
import pandas as pd
import torch
import torchaudio
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import AmplitudeToDB, MelSpectrogram, Resample

Config

In [2]:
# Configuration cell
# ------------------
# Every tunable value used by the notebook lives in this cell.

# Root directory of the dataset. It can be overridden without editing the
# notebook by setting the SOUNDNET_DATA_DIR environment variable; the default
# is the original location. os.path.join(..., '') guarantees a trailing path
# separator, which the rest of the notebook relies on when it builds paths
# with `data_dir + '...'`.
data_dir = os.path.join(
    os.environ.get(
        'SOUNDNET_DATA_DIR',
        '/media/huimin/PortableSSD/KTH/data/SED-single_vehicle-30_sec/',
    ),
    '',
)
# Tab-separated annotation file, stored next to the audio data.
annotations_path = data_dir + 'weak_labels.csv'

# Information on the dataset
# Expected values of the 'vehicle_class' column.
classes = [1, 2, 5, 12, 15]
# Column names of the annotation file; header[0] is the audio file path and
# header[1] is the label used for classification.
header = [
    'filepath_wav_30',
    'vehicle_class',
    'vehicle_class_combo',
    'vehicle_class_base',
    'speed',
]
# Seed passed to the train/validation/test split.
_SEED = 42

# Parameters for the spectrogram computation.
feats = {
    'n_mels': 128,         # number of mel bands
    'n_filters': 2048,
    'hop_length': 256,     # hop between successive STFT frames, in samples
    'n_window': 2048,      # used both as FFT size and as window length
    'sample_rate': 16000,  # audio is resampled to this rate (Hz)
    'f_min': 0,            # lowest mel filter frequency (Hz)
    'f_max': 8000,         # highest mel filter frequency (Hz) = sample_rate / 2
    'duration': 30         # clip duration; presumably seconds (dataset name says "30_sec") - confirm
}

# Training and model parameters
model = {
    'batch_size' : 12,
    'lr' : 0.001,
    'epochs' : 100,
    'num_workers' : 4,
    'n_frames' : 1000      # default `n_frames` of VehicleDataset
}


## Data Manipulation

### Loading audio files

In [3]:
# Read the tab-separated annotation file into a DataFrame and preview it.
# pd.read_table uses '\t' as its default separator.
labels = pd.read_table(annotations_path)
labels.head()

Unnamed: 0,filepath_wav_30,vehicle_class,vehicle_class_combo,vehicle_class_base,speed
0,wav/20230109/rs_an0005_dt_20230109_tm_015511_t...,2,16,M,33
1,wav/20230109/rs_an0005_dt_20230109_tm_022315_t...,2,16,M,77
2,wav/20230109/rs_an0005_dt_20230109_tm_022947_t...,2,16,M,28
3,wav/20230109/rs_an0005_dt_20230109_tm_023948_t...,5,18,M,20
4,wav/20230109/rs_an0005_dt_20230109_tm_024134_t...,5,18,M,38


In [4]:
# Duplicate check: if every row refers to a different audio file, the number
# of distinct file paths equals the number of rows.
length = labels.shape[0]
unique = labels[header[0]].nunique()
if unique == length:
    print(f"There are {length} unique filenames in the dataset.")
else:
    print("There are duplicates in the dataset.")

There are 3316 unique filenames in the dataset.


In [5]:
# Sanity check: the sorted set of labels found in the annotations must match
# the `classes` list from the configuration cell exactly. Abort otherwise.
unique_classes = np.sort(labels[header[1]].unique())
classes_match = np.array_equal(classes, unique_classes)
if not classes_match:
    print('The classes are not the same as the ones in the annotations file.')
    print('Classes in annotations file: {}'.format(unique_classes))
    print('Classes in classes variable: {}'.format(classes))
    raise ValueError('The classes are not the same as the ones in the annotations file.')

In [6]:
# Create 3 splits: train, validation and test
# -------------------------------------------
# The train split is used to fit the model, the validation split to evaluate
# it during training, and the test split to evaluate it after training.
#
# The split is done in two steps, so the effective proportions are
# 72/8/20 (train/val/test), not 70/10/20: the validation set is 10% of the
# 80% that remains after the test set has been held out.
#
# NOTE(review): the splits are not stratified by class (no `stratify=`
# argument), so class proportions may differ between splits.

# Hold out 20% of all files for testing.
train_val, test = train_test_split(labels, test_size=0.2, random_state=_SEED)
# Take 10% of the remaining files (8% overall) for validation.
train, val = train_test_split(train_val, test_size=0.1, random_state=_SEED)

# Re-number the rows of every split from 0.
train = train.reset_index(drop=True)
val = val.reset_index(drop=True)
test = test.reset_index(drop=True)

In [7]:
# Check that the three splits together account for every annotated row,
# then report the size of each split.
if len(labels) == len(train) + len(val) + len(test):
    print(f"Number of unique audio files: {length} = {len(train)} + {len(val)} + {len(test)}")
else:
    print('The splits are not correct.')
print(f'Number of train files: {len(train)}')
print(f'Number of validation files: {len(val)}')
print(f'Number of test files: {len(test)}')

Number of unique audio files: 3316 = 2386 + 266 + 664
Number of train files: 2386
Number of validation files: 266
Number of test files: 664


In [8]:
# Get the partitions
# ---------------------
# `partitions` maps each split name ('train', 'val', 'test') to a list of
# (filename, class) pairs. It is cached on disk as a pickle next to the data:
# when the cache file exists it is loaded, otherwise it is built from the
# split DataFrames and saved.
#
# NOTE(review): pickle.load executes arbitrary code from the file; only load
# a partitions.pkl you created yourself.
# NOTE(review): a cached file is used as-is; it is not compared against the
# current train/val/test DataFrames beyond the total-size check below.
partitions_path = data_dir + 'partitions.pkl'

if os.path.exists(partitions_path):
    with open(partitions_path, 'rb') as f:
        partitions = pickle.load(f)
else:
    # Generate the partitions
    partitions = {
        split_name: list(zip(split_df[header[0]], split_df[header[1]]))
        for split_name, split_df in (('train', train), ('val', val), ('test', test))
    }
    # Save the partitions
    with open(partitions_path, 'wb') as f:
        pickle.dump(partitions, f)

# Verify the partitions: together they must cover every annotated row.
n_partitioned = len(partitions['train']) + len(partitions['val']) + len(partitions['test'])
if n_partitioned != len(labels):
    print('The partitions are not correct.')

### Pre-processing of the audio files

In [10]:
def pad_audio(audio, target_len, fs):
    """Force a signal to exactly ``target_len`` samples along its last axis.

    Shorter signals are zero-padded at the end; longer signals are cropped to
    a window of ``target_len`` samples starting at a random offset (drawn with
    the ``random`` module); signals of the right length are returned unchanged.

    The length is always measured on the last axis, so both 1-D ``(samples,)``
    and multi-channel ``(channels, samples)`` tensors are handled.

    Args:
        audio: torch tensor whose last axis is the sample axis.
        target_len: desired length in samples (not seconds).
        fs: sample rate in Hz, used only to express onset/offset in seconds.

    Returns:
        Tuple ``(audio, onset_s, offset_s, padded_indx)``:
        audio: tensor with ``target_len`` samples on the last axis.
        onset_s: start of the kept window in seconds (0.0 unless cropped),
            rounded to 3 decimals.
        offset_s: ``onset_s + target_len / fs``, rounded to 3 decimals.
        padded_indx: one-element list with ``target_len`` divided by the
            length of the returned signal, which is always ``[1.0]``.
            NOTE(review): if the fraction of non-padded signal was intended
            instead, this needs changing - confirm with the consumer.
    """
    n_samples = audio.shape[-1]
    onset_s = 0.000

    if n_samples < target_len:
        # Too short: append zeros on the right of the sample axis.
        audio = torch.nn.functional.pad(
            audio, (0, target_len - n_samples), mode="constant"
        )
    elif n_samples > target_len:
        # Too long: keep a random window. The ellipsis slices the sample axis
        # regardless of how many leading (e.g. channel) axes there are.
        rand_onset = random.randint(0, n_samples - target_len)
        audio = audio[..., rand_onset:rand_onset + target_len]
        onset_s = round(rand_onset / fs, 3)

    # The returned signal always has exactly target_len samples.
    padded_indx = [1.0]
    offset_s = round(onset_s + (target_len / fs), 3)
    return audio, onset_s, offset_s, padded_indx

In [11]:
def get_log_melspectrogram(audio, sample_rate, window_size, hop_size, n_mels, f_min, f_max):
    """Return the log-scaled (dB) mel spectrogram of ``audio``.

    A magnitude (``power=1``) mel spectrogram is computed with a non-periodic
    Hamming window, where ``window_size`` serves as both FFT size and window
    length. It is then converted from amplitude to decibels and clamped to the
    range [-50, 80].

    Args:
        audio: waveform tensor to transform.
        sample_rate: sample rate of ``audio`` in Hz.
        window_size: FFT size and window length, in samples.
        hop_size: hop between successive frames, in samples.
        n_mels: number of mel bands.
        f_min: lowest mel filter frequency in Hz.
        f_max: highest mel filter frequency in Hz.

    Returns:
        Tensor holding the clamped log mel spectrogram.
    """
    to_mel = MelSpectrogram(
        sample_rate=sample_rate,
        n_fft=window_size,
        win_length=window_size,
        hop_length=hop_size,
        f_min=f_min,
        f_max=f_max,
        n_mels=n_mels,
        window_fn=torch.hamming_window,
        wkwargs={"periodic": False},
        power=1,
    )
    to_db = AmplitudeToDB(stype="amplitude")
    # Override the floor applied before the log: amin = 1e-5 as in librosa.
    to_db.amin = 1e-5

    mel_magnitude = to_mel(audio)
    return to_db(mel_magnitude).clamp(min=-50, max=80)

In [12]:
def get_log_melspectrogram_set(set, save_path, sample_rate=feats["sample_rate"], window_size=feats["n_window"], hop_size=feats["hop_length"], n_mels=feats["n_mels"], f_min=feats["f_min"], f_max=feats["f_max"], duration=feats["duration"]):
    """Compute and save the log mel spectrogram of every audio file in a split.

    Each distinct file path in column ``header[0]`` of ``set`` is loaded from
    ``data_dir``, resampled to ``sample_rate`` when necessary, padded/cropped
    to ``duration`` seconds, converted to a log mel spectrogram and written to
    ``save_path`` as ``<file name without .wav>.npy``.

    Args:
        set: DataFrame describing one split; must contain column ``header[0]``.
        save_path: output directory (created if missing).
        sample_rate: target sample rate in Hz.
        window_size: FFT size / window length in samples.
        hop_size: hop length in samples.
        n_mels: number of mel bands.
        f_min: lowest mel filter frequency in Hz.
        f_max: highest mel filter frequency in Hz.
        duration: target clip duration in seconds.
    """
    # pad_audio expects a length in samples, not seconds.
    target_len = int(duration * sample_rate)

    # Loop invariants: list of files, their count and the output directory.
    filenames = set[header[0]].unique()
    n_files = len(filenames)
    os.makedirs(save_path, exist_ok=True)

    for i, filename in enumerate(filenames):
        # '\r' + end="" keeps the progress on one line instead of flooding
        # the cell output with one line per file.
        print(f"\rConstructing mel audio {i+1}/{n_files}", end="", flush=True)
        audio, sr = torchaudio.load(os.path.join(data_dir, filename))
        # Resample only when needed; otherwise use the loaded audio as-is
        # (never a stale tensor from a previous iteration).
        if sr != sample_rate:
            audio = Resample(sr, sample_rate)(audio)
        audio, *_ = pad_audio(audio, target_len, sample_rate)
        log_melspectrogram = get_log_melspectrogram(audio, sample_rate, window_size, hop_size, n_mels, f_min, f_max)
        np.save(f"{save_path}/{filename.split('/')[-1].replace('.wav', '')}.npy", log_melspectrogram.numpy())

    if n_files:
        # Terminate the progress line.
        print()

In [13]:
# Compute the log melspectrogram for each set
# -------------------------------------------
# A split is processed only when its output directory does not exist yet, so
# re-running the notebook does not recompute spectrograms already on disk.
for split_name, split_df in (('train', train), ('val', val), ('test', test)):
    split_dir = os.path.join(data_dir + 'npy', split_name)
    if os.path.exists(split_dir):
        continue
    print(f"Constructing mel audio for {split_name} set")
    get_log_melspectrogram_set(split_df, split_dir)

Constructing mel audio for train set
Constructing mel audio 1/2386
Constructing mel audio 2/2386
Constructing mel audio 3/2386
Constructing mel audio 4/2386
Constructing mel audio 5/2386
Constructing mel audio 6/2386
Constructing mel audio 7/2386
Constructing mel audio 8/2386
Constructing mel audio 9/2386
Constructing mel audio 10/2386
Constructing mel audio 11/2386
Constructing mel audio 12/2386
Constructing mel audio 13/2386
Constructing mel audio 14/2386
Constructing mel audio 15/2386
Constructing mel audio 16/2386
Constructing mel audio 17/2386
Constructing mel audio 18/2386
Constructing mel audio 19/2386
Constructing mel audio 20/2386
Constructing mel audio 21/2386
Constructing mel audio 22/2386
Constructing mel audio 23/2386
Constructing mel audio 24/2386
Constructing mel audio 25/2386
Constructing mel audio 26/2386
Constructing mel audio 27/2386
Constructing mel audio 28/2386
Constructing mel audio 29/2386
Constructing mel audio 30/2386
Constructing mel audio 31/2386
Constructin

### Dataloaders

In [14]:
# Create the dataset and the dataloaders
class VehicleDataset(Dataset):
    """Dataset of pre-computed log mel spectrograms and their vehicle class.

    Args:
        data_dir: dataset root; spectrograms are read from
            ``data_dir + 'npy'/<set>/``.
        partition: sequence of ``(filename, class_value)`` pairs.
        set: name of the sub-directory holding this split's ``.npy`` files.
        n_frames: number of leading entries kept along axis 0 of each array.
    """

    def __init__(self, data_dir, partition, set='train', n_frames=model['n_frames']):
        self.data_dir = data_dir
        self.partition = partition
        self.set = set
        self.n_frames = n_frames

    def __len__(self):
        """Number of ``(filename, class_value)`` pairs in the partition."""
        return len(self.partition)

    def __getitem__(self, idx):
        """Load the spectrogram of item ``idx`` and return it with its class."""
        filename, class_value = self.partition[idx]
        # The .npy file is named after the wav file, without directory or '.wav'.
        stem = filename.split('/')[-1].replace('.wav', '')
        npy_path = os.path.join(self.data_dir + 'npy', self.set, stem + '.npy')
        features = np.load(npy_path)
        # NOTE(review): this slice acts on axis 0. If the saved arrays are laid
        # out as (channels, n_mels, frames), it does not limit the number of
        # time frames - confirm the intended axis.
        # NOTE(review): class_value is returned as stored in the partition
        # (raw class id); map it to a contiguous index if the loss needs one.
        return features[:self.n_frames, :], class_value
    
# One DataLoader per split. Only the training loader is shuffled: validation
# and test batches are served in partition order so that evaluation runs are
# repeatable and predictions can be matched back to `partitions[...]`.
train_dataloader = DataLoader(VehicleDataset(data_dir, partitions['train'], set='train'), batch_size=model['batch_size'], shuffle=True)
val_dataloader = DataLoader(VehicleDataset(data_dir, partitions['val'], set='val'), batch_size=model['batch_size'], shuffle=False)
test_dataloader = DataLoader(VehicleDataset(data_dir, partitions['test'], set='test'), batch_size=model['batch_size'], shuffle=False)

## Model