## Acoustic Aircraft Detection (Binary Classification)

This notebook demonstrates how to train a simple binary classifier for detecting aircraft with the **AeroSonicDB (YPAD-0523)** dataset of low-flying aircraft sounds. We briefly introduce the dataset, then build a pre-processing pipeline to extract spectrograms from the audio, then train a binary CNN classifier and evaluate its performance.

## Import Dependencies

In [1]:
# import required libraries
import os
import math
import random
import pandas as pd
import numpy as np
import IPython.display as ipd
import librosa
import matplotlib.pyplot as plt
from matplotlib.patches import Patch
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, Dense, Flatten, MaxPooling2D, Dropout, BatchNormalization, SpatialDropout2D
from tensorflow.keras import regularizers
from sklearn.metrics import average_precision_score, PrecisionRecallDisplay
from sklearn.utils import class_weight
tf.get_logger().setLevel('ERROR')



## Load dataset and check directory structure

AeroSonicDB (YPAD-0523) contains two distinct audio datasets. 

The first resides in the "audio/audio" directory, and contains 1,895 audio files split between "positive" (aircraft) and "negative" (silence/no aircraft) classes. This dataset is supplemented by the "sample_meta.csv" file, which provides additional information about each of the audio samples. Model training, validation and testing is performed entirely on this dataset (for this particular notebook).

The second dataset can be found in the "env_audio/env_audio" directory and contains six, one-hour long "environmental recordings". These recordings are accompanied by the "environment_class_mappings.csv" file, which maps aircraft audio events to a timestamp in the recording. This dataset of continuous audio with annotations provides an ancilliary method for evaluating model performance on a stream of "real-world" and "real-time" data. 


In [2]:
DATA_DIR = '/kaggle/input/audio-dataset-of-low-flying-aircraft-aerosonicdb'

# take a look at the directory files and structure
print(os.listdir(DATA_DIR))
print(os.listdir(DATA_DIR + '/audio'))
print(os.listdir(DATA_DIR + '/audio/audio'))
print(os.listdir(DATA_DIR + '/env_audio'))
print(os.listdir(DATA_DIR + '/env_audio/env_audio'))

['env_audio', 'audio', 'sample_meta.csv', 'environment_class_mappings.csv', 'environment_mappings_raw.csv']
['audio']
['0', '1']
['env_audio']
['1_AUDIO.wav', '4_AUDIO.wav', '2_AUDIO.wav', '6_AUDIO.wav', '3_AUDIO.wav', '5_AUDIO.wav']


In [None]:
# set a path to the audio/audio directory
AUDIO_DIR = os.path.join(DATA_DIR, 'audio/audio')

# set a path to the env_audio/env_audio directory
ENV_DIR = os.path.join(DATA_DIR, 'env_audio/env_audio')

## Verify the dataset directory and "sample_meta.csv" contain the same number of files

In [None]:
# take a look at the audio directory, 
# how many negative class "0", how many positive "1"?
print(os.listdir(AUDIO_DIR))

for i in ['0', '1']:
    dir_files = len(os.listdir(os.path.join(AUDIO_DIR, i)))
    print(f'Class {i} contains {dir_files} samples')

In [None]:
# load the sample_meta.csv file for a look
df = pd.read_csv(os.path.join(DATA_DIR, 'sample_meta.csv'))
# sanity check on the number of samples in each class
df['class'].value_counts()

In [None]:
# take a look at all of the columns/labels available for each sample
df.columns

## Fetch a random file from each class

In [None]:
## Fetch a random file from each class
random.seed(42)
NEG_FILE = random.sample(os.listdir(os.path.join(AUDIO_DIR, '0')), 1)[0]
POS_FILE = random.sample(os.listdir(os.path.join(AUDIO_DIR, '1')), 1)[0]
print(NEG_FILE)
print(POS_FILE)

## Build helper functions

In [None]:
# define a function to build a filepath from a filename and class combination
def get_audio_path(df, filename):
    # locate the filename and fetch the corresponding class ("fclass" == file class)
    fclass = df.loc[df['filename'] == filename, 'class'].values[0]
    filepath = os.path.join(AUDIO_DIR, str(fclass), filename)
    return filepath, fclass

In [None]:
# check the function above works with our example files
print(get_audio_path(df=df, filename=POS_FILE))
print(get_audio_path(df=df, filename=NEG_FILE))

In [None]:
# function to load a file to play and show it's waveform
def load_show_audio(filename):
    path, fclass = get_audio_path(df=df, filename=filename)
    signal, sr = librosa.load(path)
    print(f'{filename} sample rate: {str(sr)}')
    plt.figure(figsize=(6, 3))
    librosa.display.waveshow(y=signal, sr=sr)
    plt.show()
    return ipd.Audio(path)

In [None]:
# load and play the positive/aircraft example
load_show_audio(filename=POS_FILE)

In [None]:
# load and play the negative/silence example
load_show_audio(filename=NEG_FILE)

## Audio preprocessing and feature extraction

In [None]:
# set some constants for feature extraction, training and inference
SR = 22050 # sample rate of the audio files
DURATION = 5 # length of a segment in seconds
SAMPLES_PER_SEGMENT = SR*DURATION # the number of samples per segment we expect
N_FFT = 2048 # approx frequency resolution of 21.5 Hz
HOP_LENGTH = 1024 
EXP_VECTORS_PER_SEGMENT = math.floor(SAMPLES_PER_SEGMENT/HOP_LENGTH)
N_MELS = 128 # the number of frequency bins for spectrogram
EXP_INPUT_SHAPE = (N_MELS, EXP_VECTORS_PER_SEGMENT) # the expected shape of the spectrogram
print('Expected spectrogram shape:', EXP_INPUT_SHAPE)

In [None]:
# function to load a file and chop it into spectrograms equal to the segment length
def audio_to_spectrogram(filename):
    path, fclass = get_audio_path(df=df, filename=filename)
    signal, sr = librosa.load(path)

    
    if sr != SR:
        raise ValueError('Sample rate mismatch between audio and target')
        
    clip_segments = math.ceil(len(signal) / SAMPLES_PER_SEGMENT)
    
    # empty list to hold the spectrograms for this clip
    specs = []
    
    for segment in range(clip_segments):
        
        start = SAMPLES_PER_SEGMENT * segment
        end = start + SAMPLES_PER_SEGMENT - HOP_LENGTH
        
        spec = librosa.feature.melspectrogram(y=signal[start:end], 
                                              sr=sr, n_fft=N_FFT, 
                                              n_mels=N_MELS, 
                                              hop_length=HOP_LENGTH,
                                              window='hann')
        
        db_spec = librosa.power_to_db(spec, ref=0.0)
        
        if db_spec.shape[1] == EXP_VECTORS_PER_SEGMENT:
            specs.append(db_spec)
        
        # if the clip is shorter than the segment, add zero padding to the right
        elif db_spec.shape[1] < EXP_VECTORS_PER_SEGMENT:
            n_short = EXP_VECTORS_PER_SEGMENT - db_spec.shape[1]
            db_spec = np.pad(db_spec, [(0, 0), (0, n_short)], 'constant')
            specs.append(db_spec)
        
    return (specs, fclass)

In [None]:
# double check the segmentation, spectrogram and padding are working correctly on a single file
specs, fclass = audio_to_spectrogram(POS_FILE)

fig, axes = plt.subplots(1,len(specs), sharey='row', figsize=(11, 3))

count = 0

for spec in specs:
    axes[count] = librosa.display.specshow(spec, ax=axes[count])
    count += 1

plt.show()

In [None]:
# function to apply min-max scaling to squeeze spectrogram values between 0 and 1
def normalise_array(array):
    array = np.asarray(array)
    min_val = array.min()
    max_val = array.max()
    
    norm_array = (array - min_val) / (max_val - min_val)
    
    return norm_array

In [None]:
# wrapper function to take a list of files and extract their features 
# -> array of features (X) and array of corresponding labels (y)
def preprocess(file_list):
    
    data = {'feature': [], 'label': []}
    
    for file in file_list:
        specs, fclass = audio_to_spectrogram(filename=file)
        
        for spec in specs:
            norm_spec = normalise_array(spec)
            data['feature'].append(norm_spec)
            data['label'].append(fclass)
    
    X = np.asarray(data['feature'])
    y = np.asarray(data['label'])
    
    return X, y

## Split the dataset and apply preprocessing

In [None]:
# split dataset into training, validation and testing portions
train = df['filename'].loc[(df['fold'] == '1') | (df['fold'] == '2') | (df['fold'] == '3')| (df['fold'] == '4')].reset_index(drop=True) # takes folds 1, 2, 3 and 4 for training
val = df['filename'].loc[df['fold'] == '5'].reset_index(drop=True) # takes fold 5 for validation
test = df['filename'].loc[(df['fold'] == 'test')].reset_index(drop=True) # held-out test set

print(f'The "TRAIN" set contains {train.shape[0]} samples.')
print(f'The "VALIDATION" set contains {val.shape[0]} samples.')
print(f'The "TEST" set contains {test.shape[0]} samples.')

In [None]:
# preprocess the train set
X_train, y_train = preprocess(train)

# preprocess the validation set
X_val, y_val = preprocess(val)

# preprocess the validation set
X_test, y_test = preprocess(test)

In [None]:
# check the shape of the output equals the expected shape of the spectrogram
X_train[0].shape == EXP_INPUT_SHAPE

## Build CNN arcitecture and train model

In [None]:
# set a random seed for reproducability
tf.keras.utils.set_random_seed(42)


# define the model architecture
model = Sequential()
model.add(Conv2D(16, (3, 3), activation='relu', input_shape=(128, 107,1)))
model.add(MaxPooling2D((3, 3), strides=(2, 2), padding='same'))
model.add(tf.keras.layers.BatchNormalization())
model.add(SpatialDropout2D(0.5))

model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D((3, 3), strides=(2, 2), padding='same'))
model.add(SpatialDropout2D(0.5))

model.add(Conv2D(64, (3,3), activation='relu'))
model.add(MaxPooling2D((3, 3), strides=(2, 2), padding='same'))
model.add(SpatialDropout2D(0.5))

model.add(Flatten())
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.7))
model.add(Dense(1, activation='sigmoid'))

# compile model
model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=0.0001), loss='BinaryCrossentropy', metrics=[tf.keras.metrics.AUC(curve='PR', name='PR-AUC')])
#model.summary()

In [None]:
BATCH_SIZE = 32
hist = model.fit(x=X_train, 
                 y=y_train, 
                 epochs=100, 
                 validation_data=(X_val, y_val), 
                 class_weight={0: 3, 1:1},
                 verbose=1,
                 batch_size=BATCH_SIZE)

In [None]:
plt.title('Loss')
plt.plot(hist.history['loss'], 'r')
plt.plot(hist.history['val_loss'], 'b')
plt.show()

In [None]:
plt.title('PR-AUC')
plt.plot(hist.history['PR-AUC'], 'r')
plt.plot(hist.history['val_PR-AUC'], 'b')
plt.show()

## Evaluate model performance against the held-out "test" set

In [None]:
y_prob = model.predict(X_test, batch_size=BATCH_SIZE)
ap_score = average_precision_score(y_true=y_test, y_score=y_prob)
print('Average precision on TEST set:', ap_score)

PrecisionRecallDisplay.from_predictions(y_test, y_prob)

## Save the model

In [None]:
model_path = os.path.join('/kaggle/working/', 'cnn', 'model')

if not os.path.exists(model_path):
    os.makedirs(model_path)

model.save(model_path)

## Evaluate model performance on continuous "Environmental" audio

In [None]:
# define a function to build a filepath from an environment hour number
def get_env_path(env_n):
    filepath = os.path.join(ENV_DIR, f'{env_n}_AUDIO.wav')
    return filepath

In [None]:
# function to retreive the class mappings for a particular env hour
def get_env_mappings(env_n):
    index_n = str(env_n - 1)
    df = pd.read_csv(os.path.join(DATA_DIR, 'environment_class_mappings.csv'))
    
    df = df[index_n]
    
    return df

In [None]:
def preprocess_env(env_n: None | int = None):
    # if env_n == None, it is assumed all environmental audio clips will be processed together
    if env_n == None:
        bottom = 1
        top = 7
    else:
        bottom = env_n
        top = env_n + 1
        
    data = {'feature': [], 'label': []}
    
    for env in range(bottom, top):
        path = get_env_path(env_n=env)
        mappings = get_env_mappings(env_n=env)
        
        signal, sr = librosa.load(path)
        
        if sr != SR:
            raise ValueError('Sample rate mismatch between audio and target')
            
        clip_segments = math.ceil(len(signal) / SAMPLES_PER_SEGMENT)
    
        specs = []
        
        for segment in range(clip_segments):
            
            start = SAMPLES_PER_SEGMENT * segment
            end = start + SAMPLES_PER_SEGMENT - HOP_LENGTH
            
            fclass = mappings.iloc[segment]
            
            if env_n == None:
                
            
                if fclass == 'ignore':

                    continue
                    
            if fclass == 'ignore':
                fclass = 0
            
            spec = librosa.feature.melspectrogram(y=signal[start:end], 
                                              sr=sr, n_fft=N_FFT, 
                                              n_mels=N_MELS, 
                                              hop_length=HOP_LENGTH,
                                              window='hann')
            
            db_spec = librosa.power_to_db(spec, ref=0.0)
            
            db_spec = normalise_array(db_spec)
            
            if db_spec.shape[1] == EXP_VECTORS_PER_SEGMENT:
                
                data['feature'].append(db_spec)
                data['label'].append(fclass)
            
            elif db_spec.shape[1] < EXP_VECTORS_PER_SEGMENT:
                n_short = EXP_VECTORS_PER_SEGMENT - db_spec.shape[1]
                db_spec = np.pad(db_spec, [(0, 0), (0, n_short)], 'constant')
                data['feature'].append(db_spec)
                data['label'].append(int(fclass))
                
                
            
    # check the arrays are of the same length
    if len(data['feature']) == len(data['label']):
        return data
    
    else:
        print('array mismatch - check output')
        return data

In [None]:
# preprocess the environmental audio set
env_set = preprocess_env()
X_env = np.asarray(env_set['feature'])
y_env = np.asarray(env_set['label']).astype(int)

In [None]:
# evaluate model performance on the env set
model.evaluate(x=X_env, y=y_env, batch_size=BATCH_SIZE)

In [None]:
y_prob = model.predict(X_env, batch_size=BATCH_SIZE)
ap_score = average_precision_score(y_true=y_env, y_score=y_prob)
print(ap_score)

PrecisionRecallDisplay.from_predictions(y_env, y_prob)

## Plot environmental predictions against the "ground truth"

In [None]:
# function to fetch the indices to ignore (periods which transition between positive and negative classes)
def fetch_ignore_indices(env_n):
    
    df = pd.read_csv(os.path.join(DATA_DIR, 'environment_class_mappings.csv'))
    
    arr = df[str(env_n-1)]
    
    ignore_list = []
    for i in arr.index:
        if arr.iloc[i] == 'ignore':
            ignore_list.append(i)
        else:
            pass

    lst = []
    fst = []
    for i in range(0, len(ignore_list)):

        if i == 0:
            first = ignore_list[i]
            fst.append(first)

        elif i == len(ignore_list)-1:
            last = ignore_list[-1]
            lst.append(last)

        else:
            if (ignore_list[i]-1 == ignore_list[i-1]) & (ignore_list[i]+1 == ignore_list[i+1]):
                pass

            elif (ignore_list[i]-1 == ignore_list[i-1]) & (ignore_list[i]+1 != ignore_list[i+1]):
                last = ignore_list[i]
                lst.append(last)

            elif (ignore_list[i]-1 != ignore_list[i-1]) & (ignore_list[i]+1 == ignore_list[i+1]):
                first = ignore_list[i]
                fst.append(first)
    
    
    if len(fst) == len(lst):
        
        tupes = []

        for i in range(0, len(fst)):
            tupes.append((fst[i], lst[i]+1))
    
    else:
        print('array length mismatch')
    
    return tupes

In [None]:
# function to predict across an environmental hour, then plot the predictions against the ground truth
def plot_env_predictions(model_name: str, env_n: int):
    
    title = f'ENV_{str(env_n)}_{model_name}'
    
    model_path = f'/kaggle/working/{model_name}/model'
    
    env_set = preprocess_env(env_n = env_n)
    
    X = np.asarray(env_set['feature'])
    y = np.asarray(env_set['label']).astype(int)
    
    model = tf.keras.models.load_model(model_path)
    
    y_prob = model.predict(X, batch_size=32, verbose=0)
    
    df = pd.read_csv('/kaggle/input/audio-dataset-of-low-flying-aircraft-aerosonicdb/environment_mappings_raw.csv')
    idx_n = str(int(env_n)-1)
    
    raw = df[idx_n]
    
    dfn = pd.read_csv(os.path.join(DATA_DIR, 'environment_class_mappings.csv'))
    ig = dfn[idx_n]
    ig = ig.replace('1', '0')
    ig = ig.replace('ignore', '1')
    
    tot_frames = len(y_prob)
    
    start = 0  
    end = start + tot_frames
    
    fig = plt.figure(figsize=(12, 4))
    ax = plt.subplot(111)
    
    # fetch the transition areas
    ig_idx = fetch_ignore_indices(env_n=env_n)
    # mark the transition areas
    for tupe in ig_idx:
        if (tupe[0] >= start) & (tupe[1] <= end):
            ax.axvspan(tupe[0], tupe[1], facecolor='gold', alpha=0.25)
    
    # set tick markers and labels
    x_ticks = np.arange(start, (end+1), step=60)
    x_labels = np.arange(start//12, ((end//12)+1), step=5)
    plt.xticks(x_ticks, x_labels)
        
    gt = ax.plot(range(len(raw[start:end])), raw[start:end], linewidth=1.5, linestyle='dotted', color='seagreen', label='Ground truth')
    thresh = ax.axhline(y=0.5, xmin=-0.05, xmax=1, color='r', linewidth=0.7, alpha=0.5, label='50% threshold')
    pred = ax.plot(range(len(y[start:end])), y_prob[start:end], color='k', linewidth=.25, alpha=0.9)
    ax.fill_between(x=range(len(y[start:end])), y1=y_prob[start:end].flatten(), alpha=0.1, color='k')
    plt.ylim(-0.01,1.1)
    plt.xlabel('Time (minutes)')
    plt.ylabel('Probability')
    patch = Patch(facecolor='gold', alpha=0.25, label='Onset/Outset (ignore)')
    patch2 = Patch(facecolor='k', alpha=0.1, edgecolor=(0.0, 0.0, 0.0, 0.9), label='Model prediction')
    plt.title(f'Model: {model_name}, Env: {env_n}')

    # Shrink current axis's height by 10% on the bottom
    box = ax.get_position()
    ax.set_position([box.x0, box.y0 + box.height * 0.25,
                     box.width, box.height * 0.75])

    # Put a legend below current axis
    ax.legend(handles=[gt[0], thresh, patch2, patch], loc='upper center', bbox_to_anchor=(0.5, -0.25),
              fancybox=True, shadow=False, ncol=4, prop={'size': 8})

    plt.show()

In [None]:
plot_env_predictions('cnn', env_n=1)

In [None]:
plot_env_predictions('cnn', env_n=2)

In [None]:
plot_env_predictions('cnn', env_n=3)

In [None]:
plot_env_predictions('cnn', env_n=4)

In [None]:
plot_env_predictions('cnn', env_n=5)

In [None]:
plot_env_predictions('cnn', env_n=6)