## Ornithologist - CNN Bird Sounds Classification

### Import required libraries

In [None]:
import os
import shutil
import fnmatch
import opendatasets as od

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

import librosa as li
import soundfile as sf
import scipy.signal as signal

import tensorflow as tf
import tensorflow_io as tfio

### Download dataset
Currently using the [British Birdsong Dataset](https://www.kaggle.com/datasets/rtatman/british-birdsong-dataset?resource=download).

To download the sound files from Kaggle, get the kaggle.json API token from [here](https://www.kaggle.com/settings) and place it in the notebooks folder.

In [None]:
# Local directory that receives the downloaded dataset folder.
path_to_download = "../datasets"

# Kaggle page of the British Birdsong Dataset.
dataset_url = "https://www.kaggle.com/datasets/rtatman/british-birdsong-dataset?resource=download"

# Download the dataset into `path_to_download` through opendatasets.
od.download(dataset_url, data_dir=path_to_download)

In [None]:
# Location of the metadata table inside the downloaded dataset folder.
csv_path = path_to_download + '/british-birdsong-dataset/birdsong_metadata.csv'

# Keep only the recording id and the English common name; the latter is exposed as `label`.
metadata = (
    pd.read_csv(csv_path)[['file_id', 'english_cname']]
    .rename(columns={'english_cname': 'label'})
)

### Audio processing function definitions

- butter_highpass() generates the coefficients for a butterworth high pass filter
- apply_butter_highpass() takes the filter parameters, builds the coefficients and applies the filter to a signal
- remove_silence() takes a signal and removes the sections that fall below a certain threshold
- split_audio() takes a long audio and splits it into segments of equal length, zero-padding the last one

In [None]:
def butter_highpass(cutoff, fs, order=5):
    """Design a digital Butterworth high-pass filter.

    Args:
        cutoff: Cutoff frequency, in the same units as `fs`.
        fs: Sampling rate of the signal that will be filtered.
        order: Filter order handed to `scipy.signal.butter`.

    Returns:
        The `(b, a)` coefficient arrays returned by `scipy.signal.butter`.
    """
    # The cutoff is expressed as a fraction of the Nyquist frequency (fs / 2).
    nyquist = 0.5 * fs
    return signal.butter(order, cutoff / nyquist, btype='high', analog=False)

def apply_butter_highpass(data, cutoff, fs, order=5):
    """High-pass filter `data` with a Butterworth filter.

    Args:
        data: Array of samples to filter.
        cutoff: Cutoff frequency, in the same units as `fs`.
        fs: Sampling rate of `data`.
        order: Order of the Butterworth filter.

    Returns:
        The output of `scipy.signal.filtfilt` for the designed filter and `data`.
    """
    coefficients = butter_highpass(cutoff, fs, order=order)
    # filtfilt takes the numerator and denominator coefficients, then the data.
    return signal.filtfilt(*coefficients, data)

In [None]:
def remove_silence(signal, thresh=20, hop=2048, plot=False):
  """Drop the silent stretches of an audio signal and join what is left.

  Args:
    signal: 1-D array of audio samples.
    thresh: `top_db` threshold forwarded to `librosa.effects.split`.
    hop: Hop length in samples; the analysis frame is four hops long.
    plot: If True, plot the waveform with the interval boundaries in red.

  Returns:
    A 1-D array holding the non-silent intervals concatenated in order. It is
    an empty array with the dtype of the input when no interval is found.
  """
  # Each row of `splits` is a (start, end) pair of sample indices.
  splits = li.effects.split(y=signal, top_db=thresh, frame_length=(hop * 4), hop_length=hop)

  if plot:
    # Largest absolute amplitude, so the markers also cover negative peaks.
    peak = np.max(np.abs(signal))
    plt.subplots(figsize=(12,4))
    plt.plot(signal)
    plt.vlines(splits, ymin=-peak, ymax=peak, color='red')
    plt.show()

  # np.concatenate cannot take an empty list, so handle "everything is silent" first.
  if len(splits) == 0:
    return np.asarray(signal)[:0]

  # Join whole slices at once instead of extending a Python list sample by sample.
  return np.concatenate([signal[start:end] for start, end in splits])

In [None]:
def split_audio(signal, target_length, samplerate=22050, plot=False): # Target length must be in seconds
  """Cut a signal into equal-length segments, zero-padding the last one.

  Args:
    signal: 1-D array of audio samples.
    target_length: Segment duration in seconds (int or float).
    samplerate: Sampling rate of `signal`, in samples per second.
    plot: If True, overlay all segments on a single plot.

  Returns:
    A list of 1-D arrays, each `round(target_length * samplerate)` samples
    long. The list is empty when `signal` is empty.

  Raises:
    ValueError: If a segment would be shorter than one sample.
  """
  # Work in whole samples so that slice bounds and pad widths are always
  # integers, even when target_length or samplerate is a float.
  segment_samples = int(round(target_length * samplerate))
  if segment_samples < 1:
    raise ValueError('target_length * samplerate must be at least one sample')

  # Ceiling division: a trailing partial segment still counts as a segment.
  n_segments = -(-len(signal) // segment_samples)

  audio_segments = []

  for n in range(n_segments):
    s = signal[n * segment_samples : (n + 1) * segment_samples]

    # Only the last segment can come up short; fill it with trailing zeros.
    if len(s) < segment_samples:
      s = np.pad(s, (0, segment_samples - len(s)), 'constant')

    audio_segments.append(s)

    if plot:
      plt.plot(s, alpha=1/n_segments)

  if plot:
    plt.show()

  return audio_segments

### Pre-processing files
In this section, the files listed in the metadata csv are loaded, pre-processed and saved to disk as multiple labeled samples.

In [None]:
# User defined constants
AUDIO_LENGTH = 3 # Seconds
TARGET_SAMPLE_RATE = 22050

sound_files_path = path_to_download + '/british-birdsong-dataset/songs/songs/'

# Start from an empty output folder so samples from a previous run never mix with new ones
save_path = sound_files_path + 'processed_samples/'
if os.path.exists(save_path):
  shutil.rmtree(save_path)
os.mkdir(save_path)

# Number of segments written so far for each label. The output folder starts empty, so this
# counter yields the incremental filenames without re-listing the folder for every segment.
segments_written = {}

for index, file_id, label in metadata.itertuples():
  # Read audio file from disk
  filename = sound_files_path + 'xc' + str(file_id) + '.flac'
  y, sr = li.load(filename, sr=TARGET_SAMPLE_RATE, mono=True)

  # Apply high-pass filter
  y = apply_butter_highpass(data=y, cutoff=800, fs=sr, order=6)

  # Delete silent sections
  y = remove_silence(y, plot=False)

  # Split into segments of desired length
  audio_segments = split_audio(y, target_length=AUDIO_LENGTH, samplerate=sr)

  # Apply data augmentation (optional)

  # Save samples to new folder on disk, with incremental filenames
  for segment in audio_segments:
    segments_written[label] = segments_written.get(label, 0) + 1
    sf.write(f'{save_path}{label}_{segments_written[label]}.wav', segment, sr, format='wav', subtype='PCM_16') # Changed from flac to wav while testing

n_samples = len(os.listdir(save_path))
print(f'Done. Saved {n_samples} audio files of {AUDIO_LENGTH} seconds.')

Create categories dictionary for label encoding.

In [None]:
def encode_labels(labels_list):
    """Map every distinct label to an integer class index.

    Indices follow the sorted order of the labels, so the mapping is the same
    on every run. Iterating a bare `set` of strings gives an order that can
    change from one interpreter session to the next.

    Args:
        labels_list: Iterable of labels (for example a list or a pandas Series).

    Returns:
        A dict `{label: index}` with indices `0 .. n_unique - 1`.
    """
    # key=str keeps the sort well defined if a non-string value slips in.
    ordered_labels = sorted(set(labels_list), key=str)
    return {label: i for i, label in enumerate(ordered_labels)}

In [None]:
# Label -> integer class index lookup, built once from the metadata 'label' column.
LABELS_DICT = encode_labels(metadata['label'])

### Input pipeline

#### Pipeline function definitions

In [None]:
# Extract the (unencoded) label from a tensor holding a sample's file path.
def get_label(path_tensor):
  """Return the part of the file's base name that precedes the first '_'.

  The path is split on `os.path.sep` to get the file name, the file name on
  '.' to drop the extension, and the remainder on '_' to drop the sample
  counter. The result is the label text, not a class index.
  """
  basename = tf.strings.split(path_tensor, os.path.sep)[-1]
  stem = tf.strings.split(basename, '.')[0]
  return tf.strings.split(stem, '_')[0]

In [None]:
# Load one audio sample from a file-path tensor and turn it into model input.
# Returns a tensor containing mel-spectrogram and label.
def process_audio(filename_tensor, plot=False):
  """Decode a WAV file and compute its mel-spectrogram.

  Args:
    filename_tensor: String tensor with the path of a WAV file.
    plot: If True, draw the waveform and the log of the (non-mel) spectrogram.
      Both plots call `.numpy()` on tensors — presumably this only works when
      the function runs eagerly, not inside `Dataset.map`; TODO confirm.

  Returns:
    A `(mel_spectrogram, label)` pair, where `label` is what `get_label`
    extracts from the file path.
  """
  rate = TARGET_SAMPLE_RATE

  # Decode the file; the sample rate reported by the decoder is discarded.
  waveform, _ = tf.audio.decode_wav(tf.io.read_file(filename_tensor))
  # Drop the trailing axis (assumes single-channel audio — TODO confirm).
  waveform = tf.squeeze(waveform, axis=[-1])

  # Short linear fade at both ends of the clip.
  waveform = tfio.audio.fade(waveform, fade_in=200, fade_out=200, mode="linear")

  # Optional wave plot
  if plot:
    plt.figure()
    plt.plot(waveform.numpy())

  # Spectrogram first, then its projection onto 128 mel bands from 0 up to rate / 2.
  linear_spectrogram = tfio.audio.spectrogram(waveform, nfft=512, window=512, stride=256)
  mel_spectrogram = tfio.audio.melscale(linear_spectrogram, rate=rate, mels=128, fmin=0, fmax=int(rate/2))

  # Optional spectrogram plot
  if plot:
    plt.figure()
    plt.imshow(tf.math.log(linear_spectrogram).numpy())

  return mel_spectrogram, get_label(filename_tensor)

### TF Dataset

In [None]:
# Seed for the one-off shuffle that precedes the train/test split.
SHUFFLE_SEED = 42

# List the processed samples in a deterministic order, then shuffle them exactly once.
# With list_files(shuffle=True) the order is redrawn every time the dataset is iterated, so
# the take/skip split below would hand different files to train and test on each epoch.
audio_ds = tf.data.Dataset.list_files(save_path + '*.wav', shuffle=False)
n_samples = len(audio_ds)
audio_ds = audio_ds.shuffle(n_samples, seed=SHUFFLE_SEED, reshuffle_each_iteration=False)

# Train/test splitting
train_split = int(n_samples * 0.8)
train_ds = audio_ds.take(train_split)
test_ds = audio_ds.skip(train_split)

print(f'Train dataset: {len(train_ds)} files.\nTest dataset: {len(test_ds)} files.')

In [None]:
# Lookup table that turns the string label returned by process_audio into its integer class
# index from LABELS_DICT. The model is compiled with a sparse categorical cross-entropy loss,
# which expects integer targets. Labels missing from LABELS_DICT map to -1.
label_table = tf.lookup.StaticHashTable(
    tf.lookup.KeyValueTensorInitializer(
        keys=tf.constant([str(label) for label in LABELS_DICT.keys()], dtype=tf.string),
        values=tf.constant(list(LABELS_DICT.values()), dtype=tf.int64)),
    default_value=-1)

def encode_example(spectrogram, label):
  """Replace the string label of one (spectrogram, label) pair with its class index."""
  return spectrogram, label_table.lookup(label)

# Apply processing function and label encoding to datasets
train_ds = train_ds.map(process_audio).map(encode_example)#.batch(32, drop_remainder=True)
test_ds = test_ds.map(process_audio).map(encode_example)#.batch(32, drop_remainder=True)

### Build CNN model

Defining a simple CNN with two Conv1D + MaxPooling blocks (each followed by dropout), one hidden Dense layer and a softmax output layer.

In [None]:
# One output unit per distinct label in the metadata table.
unique_class_count = len(metadata['label'].unique())

# The network is assembled layer by layer.
model = tf.keras.models.Sequential()

# First convolutional stage; the input shape is declared on the first layer.
model.add(tf.keras.layers.Conv1D(filters=64, kernel_size=3, padding='same',
                                 activation='relu', input_shape=(259, 128)))
model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
model.add(tf.keras.layers.Dropout(0.2))

# Second convolutional stage with twice as many filters.
model.add(tf.keras.layers.Conv1D(filters=128, kernel_size=3, padding='same',
                                 activation='relu'))
model.add(tf.keras.layers.MaxPooling1D(pool_size=2))
model.add(tf.keras.layers.Dropout(0.3))

# Classifier head: flatten, one hidden dense layer, softmax over the classes.
model.add(tf.keras.layers.Flatten())
model.add(tf.keras.layers.Dense(units=512, activation='relu'))
model.add(tf.keras.layers.Dropout(0.5))
model.add(tf.keras.layers.Dense(units=unique_class_count, activation='softmax'))

model.build()
model.summary()

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

Train the model with data from the `tf.data.Dataset` built above.

In [None]:
EPOCHS = 30
BATCH_SIZE = 10

# Keras does not accept `batch_size` when `x` is a tf.data.Dataset, so the batching is done on
# the dataset itself. train_ds is expected to yield single, unbatched examples at this point.
history = model.fit(x=train_ds.batch(BATCH_SIZE), epochs=EPOCHS)

### Data exploration and visualizations

In [None]:
# Plot histogram of original audio file durations
# NOTE(review): `metadata` is built earlier in this notebook with only the 'file_id' and
# 'label' columns, so metadata['duration'] has no column to read unless a 'duration' column
# is added first — confirm where the durations are supposed to come from.
plt.subplots(figsize=(12, 4))
plt.title('Distribution of Audio File Lenght')
plt.xlabel('Time in seconds')
plt.ylabel('Number of files')
plt.xticks(range(0, 300, 10))
plt.hist(metadata['duration'], bins=100)
plt.show()

# Number of rows in the metadata table, i.e. recordings before splitting into segments.
print(f'• Original file count: {len(metadata)}')
# print(f'• Total file count after splitting: {len(audio_data)}')

In [None]:
# Plot total duration for each label
# NOTE(review): this relies on a 'duration' column in `metadata`; the cell that builds
# `metadata` keeps only 'file_id' and 'label', so the column lookup below needs that column
# to be added first — confirm the intended source of the durations.
# Sum the numeric columns per label; the index of the result is the label.
duration_sum = metadata.groupby('label').sum()

fig, ax = plt.subplots(figsize=(24, 8))
ax.set_title('Total raw duration for each label')
ax.bar(x=duration_sum.index, height=duration_sum['duration'])
ax.set_ylabel('Duration (in seconds)')
ax.set_xlabel('Labels')
# Rotate the label names so they do not overlap along the x axis.
plt.xticks(rotation=90)
plt.show()