# Introduction

This notebook contains the code for the pre-processing steps that were followed for the task of Honk Detection.

# Import Necessary Libraries

In [None]:
import json
import os
from collections import Counter

import h5py
import librosa
import numpy as np
from skimage.transform import resize
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Set Default Parameters

In [None]:
# Mount Google Drive at /content/drive/ so that files stored on the drive can be
# accessed through the file system. The google.colab package is provided by the
# Colab runtime, so this cell is expected to run inside Google Colab.
from google.colab import drive
drive.mount('/content/drive/')

In [None]:
# --- audio / spectrogram parameters ---
# TRACK_DURATION caps how much of each audio file is loaded and how many labels are read per file.
TRACK_DURATION = 2270  # measured in seconds; chosen because smallest track duration is 37:52 minutes which is 2272 seconds
SEGMENT_DURATION = 1  # duration of audio for which one spectrogram image is made; measured in seconds
N_FFT = 256  # window size for the FFT (passed to librosa.stft as n_fft); controls the frequency resolution
# NOTE(review): audio is loaded at its native sample rate (sr=None), so 192 samples only equal 24ms if the files are 8kHz -- confirm
WIN_LENGTH = 192  # number of samples in a 24ms time interval at 8kHz (passed to librosa.stft as win_length)
IMAGE_SHAPE = (224,224)  # (height, width) every spectrogram is resized to
# balance_dataset draws N_RANDOM_SAMPLES / 2 honk samples, N_RANDOM_SAMPLES / 2 non-honk samples and N_RANDOM_SAMPLES Unknown samples
N_RANDOM_SAMPLES = 1360 # number of samples to be selected randomly to balance the dataset

# --- input / output locations ---
# NOTE(review): these paths are relative; with the drive mounted at /content/drive/ they presumably
# rely on the working directory being /content -- confirm before running elsewhere
LABEL_DATA_PATH = 'drive/MyDrive/<path-of-label-data>'  # path of the folder from where the JSON label files will be read
AUDIO_DATA_PATH = 'drive/MyDrive/<path-of-audio-data>'  # path of the folder where the audio files are present
DATASET_FILE_PATH = 'drive/MyDrive/<path-of-dataset>'  # path of the HDF5 dataset file that must be created

# Important Functions Used

The following function takes a spectrogram array and processes it to convert to an 8-bit image.

First, we resize the spectrogram to the required size and apply MinMax scaling to bring the values into the range [0, 255]. The image is then flipped vertically, so that low frequencies appear at the bottom, and inverted, so that darker areas represent regions of higher energy. Finally, the greyscale image is converted to RGB by copying the same pixel intensity into all three channels.

In [None]:
# turn raw spectrogram data into an 8-bit RGB image
def convert_spectrogram_to_image(spectrogram_data, image_shape):
    """Convert a 2-D spectrogram array into an 8-bit RGB image.

    Parameters
    ----------
    spectrogram_data : 2-D array of spectrogram values.
    image_shape : (height, width) of the image to produce.

    Returns
    -------
    np.ndarray of dtype uint8 with shape (height, width, 3); the three
    channels hold identical values.
    """
    # bring the spectrogram to the requested (height, width)
    resized = resize(spectrogram_data, image_shape)

    # stretch the values to the 8-bit range; MinMaxScaler rescales every column on its own,
    # and the cast to uint8 drops the fractional part
    to_8bit = MinMaxScaler(feature_range=(0,255))
    pixels = to_8bit.fit_transform(resized).astype(np.uint8)

    # reverse the rows so that low frequencies end up at the bottom of the image,
    # then invert the intensities to make black ==> more energy
    inverted = 255 - pixels[::-1]

    # replicate the single greyscale channel three times to obtain an RGB image
    return np.asarray(np.stack([inverted, inverted, inverted], axis=-1), dtype=np.uint8)

The next function takes as input the audio signal and the parameters used for the Short-Time Fourier Transform, and returns a list of images, one per segment of the audio. Each image is obtained by converting the spectrogram of the corresponding segment to an image (greyscale values replicated across the three RGB channels).

In [None]:
def convert_audio_to_spectrogram_images(signal, sample_rate, segment_duration=SEGMENT_DURATION, n_fft=N_FFT, win_length=WIN_LENGTH, image_shape=IMAGE_SHAPE):
    """Split an audio signal into fixed-length segments and turn each one into a spectrogram image.

    Parameters
    ----------
    signal : 1-D array holding the audio samples.
    sample_rate : number of samples per second of `signal`.
    segment_duration : length, in seconds, of the audio covered by one image.
    n_fft : FFT window size passed to `librosa.stft`.
    win_length : window length passed to `librosa.stft`.
    image_shape : (height, width) of every produced image.

    Returns
    -------
    list of uint8 arrays of shape (height, width, 3), one per complete segment.
    Trailing audio that does not fill a whole segment is ignored.
    """
    # number of complete segments that fit into the audio;
    # the signal is passed by keyword because recent librosa releases no longer accept it positionally
    track_duration = librosa.get_duration(y=signal, sr=sample_rate)
    images_per_track = int(track_duration / segment_duration)

    # slice bounds must be integers, also when segment_duration is given as a float
    samples_per_segment = int(round(segment_duration * sample_rate))

    images = list()

    # iterate over every segment of the track
    for i in range(images_per_track):
        start = i * samples_per_segment
        end = start + samples_per_segment

        # calculate the spectrogram and convert from amplitude to log-scaled decibel values
        short_time_fourier_transform = librosa.stft(signal[start:end], n_fft=n_fft, win_length=win_length)
        spectrogram = librosa.amplitude_to_db(np.abs(short_time_fourier_transform))

        # convert raw spectrogram data to 8-bit image and collect it
        images.append(convert_spectrogram_to_image(spectrogram, image_shape))

    # return list containing all extracted images
    return images

Perform complete preprocessing pipeline by doing the following steps:
1. Get names of all audio files from the input data path
2. Create .h5 dataset file
3. Read each audio file individually and generate images of spectrograms of each 1 second audio window
4. Try to obtain the existing labels for the audio file; if no label file is found, assign the default label `2` (Unknown) to every sample
5. Append the image and label data to the dataset file


In [None]:
# keep every Known sample and only a random fraction of the Unknown ones
def _undersample_unknown(images, labels, unknown_label=2, keep_fraction=0.25):
    """Return (images, labels) holding all Known rows followed by a random subset of the Unknown rows.

    The Unknown rows are drawn without replacement, so no sample is stored twice,
    and the function also works when there are no Unknown rows at all.
    """
    unknown_indices = np.flatnonzero(labels == unknown_label)
    known_indices = np.flatnonzero(labels != unknown_label)

    # shuffle the Unknown positions and keep the first `keep_fraction` of them
    keep_count = int(keep_fraction * unknown_indices.size)
    kept_unknown = np.random.permutation(unknown_indices)[:keep_count]

    selected = np.concatenate((known_indices, kept_unknown))
    return images[selected], labels[selected]


# grow a resizable HDF5 dataset along its first axis and write the new rows at its end
def _append_rows(dataset, rows):
    """Append `rows` to `dataset`, which must have been created with an unlimited first axis."""
    old_count = dataset.shape[0]
    dataset.resize(old_count + rows.shape[0], axis=0)
    dataset[old_count:] = rows


def preprocess(audio_path, labelled_data_path, output_dataset_path):
    """Build the HDF5 dataset of spectrogram images and labels from a folder of audio files.

    For every file in `audio_path` the first TRACK_DURATION seconds are loaded,
    converted to one spectrogram image per segment and paired with labels. Labels
    are read from `<name>.json` in `labelled_data_path` when that file exists;
    otherwise every sample receives the label 2 (Unknown). Only 25% of the Unknown
    samples of each file are stored. The results are appended to the datasets
    'X' (images) and 'y' (labels) of the file at `output_dataset_path`.

    Parameters
    ----------
    audio_path : folder containing the audio files.
    labelled_data_path : folder containing the JSON label files.
    output_dataset_path : path of the HDF5 file to create or extend.
    """
    # get all audio filenames from audio_path folder
    files = [f for f in os.listdir(audio_path) if os.path.isfile(os.path.join(audio_path, f))]

    # open (or create) the h5 file that stores all the image data and the corresponding labels;
    # the context manager closes the file even if processing one of the audio files fails
    with h5py.File(output_dataset_path, 'a') as dataset_file:

        # iterate among all audio files found
        for audio_file in files:

            # get path of the audio file
            path_to_audio_file = os.path.join(audio_path, audio_file)

            # load segment of file for training data
            audio_signal, sample_rate = librosa.load(path_to_audio_file,
                                                     mono=True,
                                                     sr=None,
                                                     duration=TRACK_DURATION)

            # get the corresponding spectrogram images of the audio file
            spectrogram_images = convert_audio_to_spectrogram_images(audio_signal, sample_rate=sample_rate)

            # check labelled_data_path to see if label information is present for current file or not
            possible_file_name = audio_file.split('.')[0] + '.json'
            possible_file_path = os.path.join(labelled_data_path, possible_file_name)

            # if label information exists, collect label values
            if os.path.isfile(possible_file_path):
                with open(possible_file_path, 'r') as fp:
                    label_data = json.load(fp)
                labels = list(label_data.values())[:TRACK_DURATION]

            # else give default label value '2' to all samples
            else:
                labels = [2] * len(spectrogram_images)

            # images and labels must pair up one-to-one; when their counts differ
            # (e.g. the audio is shorter than the label file) keep only the common prefix
            sample_count = min(len(spectrogram_images), len(labels))
            if len(spectrogram_images) != len(labels):
                print(f'{audio_file}: found {len(spectrogram_images)} images but {len(labels)} labels, using the first {sample_count} of each.')

            # convert lists to numpy arrays for faster processing
            spectrogram_images = np.array(spectrogram_images[:sample_count], dtype=np.uint8)
            labels = np.array(labels[:sample_count], dtype=np.uint8)

            # There are a lot of Unknown samples (~45,000) which would be stored uselessly since they are
            # randomly undersampled during balancing anyway. To reduce the load later, only 25% of the
            # Unknown samples are kept during storage itself.
            spectrogram_images, labels = _undersample_unknown(spectrogram_images, labels)

            # nothing to store for this file (e.g. a very short clip); appending zero rows is skipped
            if labels.shape[0] == 0:
                print(f'{audio_file} produced no samples and was skipped.')
                continue

            # dataset must be created only for the first time, remaining iterations only extend it further by adding more rows
            if 'X' not in dataset_file:
                dataset_file.create_dataset('X', data=spectrogram_images, chunks=spectrogram_images.shape, maxshape=(None, None, None, None), dtype=np.uint8)
                dataset_file.create_dataset('y', data=labels, chunks=(labels.shape[0], ), maxshape=(None, ), dtype=np.uint8)

            else:
                # extend the existing datasets by the rows obtained in the current iteration
                _append_rows(dataset_file['X'], spectrogram_images)
                _append_rows(dataset_file['y'], labels)

            # show completion message
            print(f'{audio_file} has been processed successfully!')

    # dataset file is closed at this point
    print("Completed!!")

# Balancing the Dataset

This section contains the code that is used to balance the classes in the dataset.

We first separate the dataset obtained after all the pre-processing steps into separate arrays depending upon their labels. We then randomly select a fixed number of samples from each class and combine them to create the final dataset. While combining, we add 25% of the known honks to the dataset as UNKNOWN samples to maintain the distribution in the dataset. Finally, we make a train-test split so that the performance of the model can be evaluated on unseen data.

All the obtained arrays are stored to the dataset file, and the operation is completed.

In [None]:
# pick up to `count` entries of `candidates` at random, never picking the same entry twice
def _sample_without_replacement(candidates, count):
    """Return at most `count` randomly chosen elements of the 1-D array `candidates`."""
    return np.random.permutation(candidates)[:count]


# function to randomly select samples from the dataset and to introduce known honks to balance the
# huge class difference present in the dataset
def balance_dataset(dataset_file_path):
    """Replace the raw dataset file by a class-balanced train/test dataset.

    The datasets 'X' and 'y' are read from `dataset_file_path`. Up to
    N_RANDOM_SAMPLES / 2 honk samples (label 1), N_RANDOM_SAMPLES / 2 non-honk
    samples (label 0) and N_RANDOM_SAMPLES Unknown samples (label 2) are drawn at
    random without replacement, so that no sample can appear in both the training
    and the testing split. If a class holds fewer samples than requested, all of
    its samples are used.

    The honk / non-honk samples are split into train and test parts. The Unknown
    samples, together with copies of 25% of the training honks relabelled as
    Unknown, are added to the training part only. The file is then overwritten
    with the datasets 'X_train', 'y_train', 'X_test' and 'y_test'.

    Parameters
    ----------
    dataset_file_path : path of the HDF5 file produced by the pre-processing step.

    Raises
    ------
    ValueError : if the dataset contains no honk or no non-honk samples.
    """
    # read the complete dataset and release the file handle before the file is rewritten
    with h5py.File(dataset_file_path, 'r') as dataset_file:
        X = dataset_file['X'][()]
        y = dataset_file['y'][()]

    # positions of the samples of every class
    indices_no_honk = np.flatnonzero(y == 0)
    indices_honk = np.flatnonzero(y == 1)
    indices_uk = np.flatnonzero(y == 2)

    if indices_no_honk.size == 0 or indices_honk.size == 0:
        raise ValueError("Dataset must contain both honk and non-honk samples to be balanced.")

    # randomly select the samples of every class
    samples_per_known_class = int(0.5 * N_RANDOM_SAMPLES)
    X_no_honk = X[ _sample_without_replacement(indices_no_honk, samples_per_known_class) ]
    X_honk = X[ _sample_without_replacement(indices_honk, samples_per_known_class) ]
    X_uk = X[ _sample_without_replacement(indices_uk, N_RANDOM_SAMPLES) ]

    y_no_honk = np.zeros(X_no_honk.shape[0], dtype=y.dtype) # labels for non-honk samples
    y_honk = np.ones(X_honk.shape[0], dtype=y.dtype)    # labels for honk samples
    y_uk = np.full(X_uk.shape[0], 2, dtype=y.dtype) # labels for unknown samples

    # generate the image dataset of the known classes
    X_final = np.append(X_honk, X_no_honk, axis=0)
    y_final = np.append(y_honk, y_no_honk)

    # make train and test split
    X_train, X_test, y_train, y_test = train_test_split(X_final, y_final, test_size=0.33, random_state=42, stratify=y_final)

    # choose some instances of honks in training dataset to add to Unknown class
    honk_train_indices = np.flatnonzero(y_train == 1)
    chosen_honks = _sample_without_replacement(honk_train_indices, int(0.25 * honk_train_indices.size))
    X_honk_add = X_train[chosen_honks]
    y_honk_add = np.full(X_honk_add.shape[0], 2, dtype=y.dtype)

    # add some known honks to Unknown class
    X_uk = np.append(X_uk, X_honk_add, axis=0)
    y_uk = np.append(y_uk, y_honk_add)

    # generate final training dataset
    X_train = np.append(X_train, X_uk, axis=0)
    y_train = np.append(y_train, y_uk)

    # everything has been computed successfully, so only now overwrite the original file;
    # mode 'w' truncates the existing file
    with h5py.File(dataset_file_path, 'w') as dataset_file:

        # create new datasets containing fresh training data
        dataset_file.create_dataset(name='X_train', data=X_train, dtype=np.uint8)
        dataset_file.create_dataset(name='y_train', data=y_train, dtype=np.uint8)

        # create new datasets containing testing data
        dataset_file.create_dataset(name='X_test', data=X_test, dtype=np.uint8)
        dataset_file.create_dataset(name='y_test', data=y_test, dtype=np.uint8)

    # show confirmation
    print("Dataset was balanced successfully!")

# Executing the Code (main)

In [None]:
# create training dataset
preprocess(AUDIO_DATA_PATH, LABEL_DATA_PATH, DATASET_FILE_PATH)

# balance the dataset by randomly undersampling majority class
# and inserting known instances of minority class in the dataset
balance_dataset(DATASET_FILE_PATH)

# Utility Functions

These are some utility cells that can be used to check whether the pre-processing steps worked as expected.

In the first cell, we read the created dataset and print the distribution of the different classes in it.

In [None]:
# count how often every label occurs in the training and testing labels;
# the labels are read in one go and the file is closed even if a dataset is missing
with h5py.File(DATASET_FILE_PATH, 'r') as file:
    counter1 = Counter(file['y_train'][()].tolist())
    counter2 = Counter(file['y_test'][()].tolist())

print(f'Counter1: {counter1} ||| Counter2: {counter2}')

In the final cell, we read the dataset and print the shapes of all the datasets. The number of samples in X_train should be equal to the number of labels in y_train, and likewise for X_test and y_test.

In [None]:
# print the shapes of all stored datasets and verify that images and labels pair up;
# the context manager closes the file even if a dataset is missing or a check fails
with h5py.File(DATASET_FILE_PATH, 'r') as file:
    X_train = file['X_train']
    y_train = file['y_train']
    X_test = file['X_test']
    y_test = file['y_test']

    print(f'X_train: {X_train.shape}')
    print(f'y_train: {y_train.shape}')
    print(f'X_test: {X_test.shape}')
    print(f'y_test: {y_test.shape}')

    # every image must have exactly one label
    assert X_train.shape[0] == y_train.shape[0], "X_train and y_train have different numbers of samples"
    assert X_test.shape[0] == y_test.shape[0], "X_test and y_test have different numbers of samples"