The following is a workbook of my capstone project at Digital Futures. The aim was to create an algorithm that will be able to identify birds based on their chirping. A dataset from xeno-canto.org via kaggle was used. For reasons of simplicity and versatility, the algorithm only uses audio data as input when making predictions. Further improvement is desired. You can read about these and a more detailed walk-through of creating the algorithm at https://aarroonn.medium.com/.

In [None]:
# LIBRARIES:

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import scipy
from scipy.signal import butter, lfilter
from itertools import compress

import librosa

from sklearn.naive_bayes import GaussianNB
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.ensemble import VotingClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn import metrics
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.decomposition import PCA


%matplotlib inline

In [None]:
# FUNCTIONS:

# BANDPASS FILTER FUNCTION:

def bandpass_00(audio, sample_rate, lowcut = 800, highcut = 7000, order = 5):
    
    '''Creates a bandpass filter for audio signals.
    
    parameters:
    - audio: np.array of signal values
    - lowcut: numeric value of lowcut frequency
    - highcut: numeric value of highcut frequency
    - sample_rate: integer sample rate of audio
    - order: numeric order of the filter
    
    output:
    np.array of filtered signal values'''
    
    nyq = sample_rate * 0.5
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(N = order, Wn = [low, high], btype = 'bandpass')
    output = lfilter(b, a, audio)
    
    return output


# NOISE FILTER FUNCTION:

def denoiser_00(audio, sample_rate, aggregate = np.median, metric = 'cosine', window = 1, margin_bg = 10,
                margin_front = 10, power = 2):
    
    '''Function to separate front elements and background noise using spectrogram transformation, filtering
    nearest-neighbours and soft-mask operation.
    The function is based on one of the 'Advanced examples' from the librosa package documentation:
    https://librosa.org/doc/latest/auto_examples/plot_vocal_separation.html#sphx-glr-auto-examples-plot-vocal-separation-py
    
    parameters:
    - audio: np.array of signal values
    - sample_rate: integer sample rate of audio
    - aggregate: numpy aggregate (per-frequency) function treating the nearest neighbours of each spectrogram input column
    - metric: str distance metric used for nearest-neighbour calculation. see sklearn.neighbors.DistanceMetric for a list of options
    - window: int>=1 defining in seconds which points to consider neighbour
    - margin_bg: numeric>=0 determines the signal strength of the front element in the soft-mask operaion
    - margin_front: numeric>=0 determines the signal strength of the background element in the soft-mask operaion
    - power: numeric>0 or np.inf the power element of the soft-mask computation
    
    output:
    np.array-s of the filtered front audio and the background audio'''
    
    # getting spectrogram component and phase of Fourier-transforms of the original audio
    ft_mag, ft_phase = librosa.magphase(librosa.stft(audio))
    
    # returns a combined array of element-wise minimums of the original audio and our neighbours cosine simiarity
    # filtered array, so that out filter values will never exceed the original ones
    ft_filter = np.minimum(ft_mag,
                           librosa.decompose.nn_filter(ft_mag,
                                                       aggregate = aggregate,
                                                       metric = metric,
                                                       width = int(librosa.time_to_frames(window, sr = sample_rate))))
    
    # getting mask operations for background and front elements. the results will be our local 'magnitudes' to
    # separate out elements already present in the original audio. 'ft_filter' is our background elements. we retain
    # the front audio by subtracting it from the original audio. note the symmetry between the two masks. that is
    # because the two audios serve as 'reference' to each other's masks
    
    # to save processing time, i comment out the background elements here
    #mask_bg = librosa.util.softmask(ft_filter,
    #                                margin_bg * (ft_mag - ft_filter),
    #                                power = power)
    mask_front = librosa.util.softmask(ft_mag - ft_filter,
                                       margin_front * ft_filter,
                                       power = power)
    
    # retaining the front and background audio signals
    
    # to save processing time, i comment out the background elements here
    front = mask_front * ft_mag
    #bg = mask_bg * ft_mag
    audio_front = librosa.istft(front * ft_phase)
    #audio_bg = librosa.istft(bg * ft_phase)
    
    return audio_front#, audio_bg


# TIME SAMPLING FUNCTION:

def look_pass(list01):
    
    '''Custom function taking in a boolean list. It was used to find points where our signal just passed a
    threshold value.'''
    
    list00 = []
    
    for i in range(len(list01) - 1):
        if (list01[i], list01[i + 1]) == (False, True):
            list00.append(i + 1)
            
    return list00


def tsampler_01(audio, sample_rate, sample_length = 0.5, look_ahead = 0.05, threshold = -15.):
    
    '''Function to create a sample of given length, from around the first relevant signal strength of audio.
    
    parameters:
    - audio: np.array of signal values
    - sample_rate: int sample rate of audio
    - sample_length: numeric>0 length of desired sample output(s) in seconds
    - look_ahead: numeric>0 length of how many seconds before reaching the threshold should be included in the final sample
    - threshold: numeric value of threshold point in dB. only signals exceeding will be sampled
    
    output:
    - a list of np.arrays of audio signals'''
    
    # for a more stable evaluation of local amplitude of the signal its converted to rms
    rms = librosa.feature.rms(y = audio)[0]
    
    # calculating time arrays for the original and rms signals
    t01 = librosa.frames_to_time(np.arange(len(audio)), hop_length = 1, sr = sample_rate)
    t02 = librosa.frames_to_time(np.arange(len(rms)), sr = sample_rate)
    
    # creating a boolean mask, to let through values above our threshold using our function from before
    rms_bool = np.where(librosa.amplitude_to_db(rms, ref = np.max) >= threshold, True, False)
    t02 = [t02[x] for x in look_pass(rms_bool)]
    
    # using our sample length, we cut the original audio to sample size. we also return the cutoff points (t_val)
    t_bool = [np.where((t01 > x - look_ahead), True, False) for x in t02]
    t_val = [round(x - look_ahead, 2) for x in t02]
    samples = [np.array(list(compress(audio, x)))[: int(sample_rate * sample_length) + 1] for x in t_bool]
    
    return samples, t_val


# ALL TOGETHER:

def audio_treatment(audio, sample_rate = 16000, track = False, track_v = 0, gen = None):
    
    '''Function to combine the audio manipulating sub-functions.
    
    parameters:
    - audio: np.array of signal values
    - sample_rate: int sample rate of audio
    - track: boolean value of whether the progress should be tracked
    - track_v: int value of the last element to be processed
    - gen: a generator object providing value steps
    
    output:
    np.array of processed audio signal'''
    
    a_t = bandpass_00(audio = audio, sample_rate = sample_rate)
    a_t = denoiser_00(audio = a_t, sample_rate = sample_rate)
    a_t, t_v = tsampler_01(audio = a_t, sample_rate = sample_rate)
    f_t = [np.abs(librosa.stft(x)) for x in a_t]
    if track == True:
        print(f'{next(gen)} / {track_v} finished')
        
    return f_t, t_v


# PREDICTION FUNCTION:

def chirping01(audio):
    
    '''Fucntion identifying birds based on an audio record of their chirping.
    
    parameters:
    - audio: np.array of signal values
    
    output:
    - a print of specific times and associated bird types'''
    
    aud, time = audio_treatment(audio)
    # if the last samples' shape is not suitable for the model, it is discarded
    
    while aud[-1].shape != aud[0].shape:
        aud = aud[: -1]
        time = time[: -1]
        
    # we scale up the strength of the audio signal and reshape it for our model
    aud_db = np.array([librosa.amplitude_to_db(x, ref = np.max) for x in aud])
    aud_db = np.array([x.reshape(x.shape[0] * x.shape[1], ) for x in aud_db])
    
    # we perform a PCA on the signal and perform the prediction
    aud_pca = pca01.transform(aud_db)
    pred = svc02.predict_proba(aud_pca)
    
    # we create. asorted list of tuples (bird name, probability)
    list_prob = []
     
    for i in pred:
        list_p = []
        for j in range(len(i)):
            list_p.append((dict_ebird[dict_bird[j]], round(i[j] * 100, 2)))
        list_prob.append(list_p)

    list_prob.sort(key = lambda x: x[1], reverse = True)
    
    # if probabilities exceed 40%, we only print those birds. otherwise, we print all the associated types and probabilities
    for k in range(len(time)):
        print(f'At {time[k]}sec, we think it\'s:')
        if any(x[1] >= 40 for x in list_prob[k]):
            for m in list_prob[k]:
                if m[1] >= 40:
                    print(f'{m[0]} with {m[1]}% chance')
        else:
            print('Unfortunately, we cannot really say for sure but these are our guesses:')
            for n in list_prob[k]:
                print(f'{n[0]} with {n[1]}% chance')
                


## Checking the data:

In [None]:
# importing dataset:

df_00 = pd.read_csv('train.csv')
df_00.head()

In [None]:
df_00.columns

In [None]:
df_00.dtypes

In [None]:
# creating a dictionary of ebird codes to species names, so later we can match these back:

dict_ebird = {}
for i in df_00.ebird_code.unique():
    dict_ebird[i] = df_00[df_00.ebird_code == i].species.unique()[0]

## Cleaning and formatting:

In [None]:
# since the aim is to work only with audio data, only the necessary columns are kept:

df_01 = df_00[['rating', 'ebird_code', 'duration', 'filename', 'sampling_rate', 'type']].copy()
df_01.head()

In [None]:
# we haven't null values

df_01.isnull().sum()

In [None]:
df_01.shape

In [None]:
df_01.dtypes

In [None]:
# the type of record have more than 1,200 unique values. we want to categorise these into much fewer groups:

df_01.type.unique()

In [None]:
# getting rid of capitalisation:

df_01.type = [x.lower() for x in df_01.type]

In [None]:
# sorting out the many different inputs of chirp types into 3 major groups – song, call and miscellaneous:

df_01.type = np.where(['song' in x for x in df_01.type], 'song', df_01.type)
df_01.type = np.where(['call' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['duet' in x for x in df_01.type], 'song', df_01.type)
df_01.type = np.where(['alarm' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['zweeoo' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['chip' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['coo' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['whistle' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where(['wisthle' in x for x in df_01.type], 'call', df_01.type)
df_01.type = np.where((df_01.type != 'song') & (df_01.type != 'call'), 'misc', df_01.type)

In [None]:
df_01.type.unique()

In [None]:
# setting sample rates to integer format:

df_01.sampling_rate = df_01.sampling_rate.str[0: 5]
df_01.sampling_rate = df_01.sampling_rate.astype('int64')

In [None]:
df_01.dtypes

## Feature engineering:

### Setting up a managable subset

Reading audio in and processing it in python is a lengthy process. To save time, I only used a smaller set of samples to train the model and create the algorithm.

In [None]:
# limiting our data to good quality recordings, under a minute with at least 16kHz sampling rate. this is because we
# want to be able to detect 8kHz frequencies, so by the Nyquist sampling theorem, we need double the sample rate:

df_02 = df_01[(df_01.rating >= 3) & (df_01.duration < 61) & (df_01.sampling_rate >= 16000) &
              (df_01.type == 'song')].copy()

In [None]:
# list the most populated datasets, summing around 300 samples:

df_02.groupby('ebird_code').ebird_code.count().sort_values(ascending = False)[: 5].sum()

In [None]:
# creating a list of bird types for our restricted sample:

list_ebird = df_02.groupby('ebird_code').ebird_code.count().sort_values(ascending = False)[: 5].index

In [None]:
# retaining the desired birds:

df_02 = df_02[([x in list_ebird for x in df_02.ebird_code])].copy()
df_02.reset_index(drop = True, inplace = True)

In [None]:
# thinking ahead, we create two dictionaries that can be used to provide numeric values for our model and translate it
# back:

bird_dict = {}
for i in range(len(df_02.ebird_code.unique())):
    bird_dict[df_02.ebird_code.unique()[i]] = i
    
dict_bird = {}
for i in range(len(df_02.ebird_code.unique())):
    dict_bird[i] = df_02.ebird_code.unique()[i]

### Reading in audio data:

In [None]:
# creating a file path from where we can read the audio in:

df_02['path'] = ['train_audio/' + df_02.ebird_code[x] + '/' + df_02.filename[x] for x in range(len(df_02))]

In [None]:
# reading in audio files. librosa creates an array of amplituda values. this takes some time:

df_02['samp'] = df_02.path.apply(lambda x: librosa.load(x, mono = True, sr = 16000, res_type = 'fft')[0])

In [None]:
# to track progress, an infinite number generator object was set up:

def inf_seq():
    num = 1
    while True:
        yield num
        num += 1
        
gen = inf_seq()

df_02['audio_tr'] = df_02.samp.apply(lambda x: audio_treatment(x, track = True, track_v = df_02.shape[0],
                                                               gen = gen)[0])

### Retaining and formatting meain features:

In [None]:
# creating a multi-dimensional list of our features to use for the model:


ft_list = []
for i in range(df_02.shape[0]):
    ft_list += ([[df_02.ebird_code[i], x] for x in df_02.audio_tr[i]])

In [None]:
# we create a dataframe of the features for further manipulation:

df_03 = pd.DataFrame(np.array(ft_list), columns = ['ebird_code', 'res_ft'])
df_03

In [None]:
# where the length of our res_ft does not match the maximum, it is because the threshold was reached at the end of a
# recording, so the sample is too short. we get rid of these for equal feature lengths

df_03['len_tr'] = [len(x.ravel()) for x in df_03.res_ft]

df_03 = df_03[df_03.len_tr == df_03.len_tr.max()].copy()
df_03.shape

### Creating X and y values:

In [None]:
# using our dictionary from earlier, we create y-values for our model:

df_03['value'] = df_03.ebird_code.map(bird_dict)
y_01 = df_03.value

y_01.shape

In [None]:
# finally, we scale all features up to the same dB level and create a 2-D array for our X-features:

res_db = np.array([librosa.amplitude_to_db(x, ref = np.max) for x in df_03.res_ft])
X_01 = np.array([x.reshape(x.shape[0] * x.shape[1], ) for x in res_db])

X_01.shape

In [None]:
# creating test and training sets:

X_trn01, X_tst01, y_trn01, y_tst01 = train_test_split(X_01, y_01, test_size = 0.2, random_state = 42)

## Modeling:

### Naive Bayesian model:

In [None]:
# to create a baseline, a simple and quick Bayesian model is trained on the data:

gnb01 = GaussianNB()
gnb01.fit(X_trn01, y_trn01)

In [None]:
y_gnb01 = gnb01.predict(X_tst01)
accuracy_score(y_tst01, y_gnb01)
# baseline it is

### PCA:

In [None]:
# as the plan is to use a support-vector machine and not spend a lifetime gridsearching, a PCA is carried out:

pca = PCA().fit(X_01)

In [None]:
plt.figure(figsize = (8, 5))

plt.plot(np.cumsum(pca.explained_variance_ratio_))
plt.xlabel('number of components')
plt.ylabel('variance')

plt.show()

In [None]:
# looking at the graph, 500 components seems like a suitable choice:

pca01 = PCA(500)
X_pca01 = pca01.fit_transform(X_01)

In [None]:
X_trn02, X_tst02, y_trn02, y_tst02 = train_test_split(X_pca01, y_01, test_size = 0.2, random_state = 42)

### SVM:

In [None]:
# we set up a support-vector machine and a grid for gridsearch:

svc01 = SVC(kernel = 'rbf', class_weight = 'balanced', probability = True)
grid01 = {'C' : [1, 5, 10, 25],
          'gamma' : [1e-07, 5e-07, 1e-06, 5e-06]}
svc_grid01 = GridSearchCV(svc01, grid01)

In [None]:
svc_grid01.fit(X_trn02, y_trn02)

In [None]:
print(svc_grid01.best_params_)
#'C': 10, 'gamma': 5e-07

In [None]:
svc02 = svc_grid01.best_estimator_
y_svc02 = svc02.predict(X_tst02)

In [None]:
accuracy_score(y_tst02, y_svc02)

In [None]:
# creating a confusion matrix, to see our results:

mat = confusion_matrix(y_tst02, y_svc02)

plt.figure(figsize = (7, 7))

sns.heatmap(mat, square = True, annot = True,
            xticklabels = pd.Series(y_tst02.unique()).map(dict_bird).map(dict_ebird),
            yticklabels = pd.Series(y_tst02.unique()).map(dict_bird).map(dict_ebird))
plt.xlabel('predicted')
plt.ylabel('actual')

plt.show()

### Random forest ensemble:

In [None]:
rfc01 = RandomForestClassifier(n_estimators = 100, random_state = 42)
rfc01.fit(X_trn01, y_trn01)

In [None]:
y_rfc01 = rfc01.predict(X_tst01)
accuracy_score(y_tst01, y_rfc01)

In [None]:
# testing against the PCA data too:

rfc02 = RandomForestClassifier(n_estimators = 100, random_state = 42)
rfc02.fit(X_trn02, y_trn02)

In [None]:
y_rfc02 = rfc02.predict(X_tst02)
accuracy_score(y_tst02, y_rfc02)

### K-nearest neighbours:

In [None]:
# based on a few trials, 2 neighbours seemed to be the best choice:

knn01 = KNeighborsClassifier(n_neighbors = 2)
knn01.fit(X_trn02, y_trn02)

In [None]:
y_knn01 = knn01.predict(X_tst02)
accuracy_score(y_tst02, y_knn01)

### Voting ensemble:

In [None]:
# to see if results can be improved, a soft-voting ensemble of the last three models was tried:

vcf01 = VotingClassifier(estimators = [('svc', svc02), ('rf', rfc02), ('knn', knn01)],
                         voting = 'soft', weights = [2, 1, 2])
vcf01.fit(X_trn02, y_trn02)

In [None]:
y_vcf01 = vcf01.predict(X_tst02)
accuracy_score(y_tst02, y_vcf01)

### Model evaluations:

In [None]:
# Bayesian:

print(metrics.classification_report(y_gnb01, y_tst01))

In [None]:
# SVM:

print(metrics.classification_report(y_svc02, y_tst02))

In [None]:
# Random Forest:

print(metrics.classification_report(y_rfc02, y_tst02))

In [None]:
# KNN:

print(metrics.classification_report(y_knn01, y_tst02))

In [None]:
# Voting ensemble:

print(metrics.classification_report(y_vcf01, y_tst02))

## Using the function on audio files:

Since the support-vector machine was the best candidate, it was used for making the predictions.

In [None]:
chirping01(list(df_02.samp[0]))

In [None]:
# checking if our model predicted well:

dict_ebird[df_02.ebird_code[0]]

In [None]:
# checking the function against a different input:

chirping01(df_02.samp[304])

In [None]:
dict_ebird[df_02.ebird_code[304]]