<h1 style="text-align:center">Birdsong Classification</h1>

In this notebook I'll train a classifier which can predict the species of a bird from a new audio recording.
To reduce the load and training time, I've limited the data to the species whose code starts with an 'a' or a 'b'.
Eventually this could be extended with the other parts of the dataset as well, but that won't be done in this notebook.
The dataset used for this can be found at: [https://www.kaggle.com/ttahara/birdsong-resampled-train-audio-00](https://www.kaggle.com/ttahara/birdsong-resampled-train-audio-00)

Raw audio files are usually not used directly as input for an ML model. They are first transformed into a representation the model can work with more easily. Here, each audio clip is transformed into a spectrogram, which visualizes the strength of the different frequencies in a sound wave over time.
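
To make the idea of "strength per frequency over time" concrete, here is a minimal sketch in plain Python. It is not how the notebook computes spectrograms (that is done with torchaudio further down); the naive DFT, the toy sample rate and the helper names are all assumptions made for illustration.

```python
import cmath
import math

def tone(freq_hz, n_samples, sample_rate):
    """Generate n_samples of a sine wave at freq_hz."""
    return [math.sin(2 * math.pi * freq_hz * t / sample_rate) for t in range(n_samples)]

def dft_magnitudes(frame):
    """Naive DFT: strength of each frequency bin from 0 Hz up to Nyquist."""
    n = len(frame)
    return [
        abs(sum(frame[t] * cmath.exp(-2j * math.pi * k * t / n) for t in range(n)))
        for k in range(n // 2 + 1)
    ]

def toy_spectrogram(signal, frame_len):
    """Cut the signal into non-overlapping frames and analyse each frame."""
    starts = range(0, len(signal) - frame_len + 1, frame_len)
    return [dft_magnitudes(signal[s:s + frame_len]) for s in starts]

sample_rate = 800  # Hz (toy value, far lower than real audio)
frame_len = 80     # samples per frame, so each frequency bin is 10 Hz wide

# half a second of a 100 Hz tone followed by half a second of a 200 Hz tone
signal = tone(100, 400, sample_rate) + tone(200, 400, sample_rate)

spec = toy_spectrogram(signal, frame_len)  # indexed as spec[frame][frequency_bin]
bin_hz = sample_rate / frame_len

# the strongest frequency in every frame: this is what a spectrogram makes visible
dominant_hz = [max(range(len(col)), key=col.__getitem__) * bin_hz for col in spec]
print(dominant_hz)
```

The first five frames peak at 100 Hz and the last five at 200 Hz, which is exactly the change over time that the raw waveform hides and the spectrogram shows.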

There are multiple libraries to convert audio files to spectrograms, such as scipy, torch, pylab and librosa, to name a few. At my job as a data engineer/analyst I've already built an ML model to identify the music genres of audio tracks. For that we used scipy. To spice things up I've chosen to use PyTorch this time, to experience something new.

The different steps for this notebook are described below:
<ol>
    <li>sound files</li>
    <li>convert into spectrograms</li>
    <li>input into CNN plus Linear Classifier model</li>
    <li>produce predictions and evaluate model</li>
</ol>
<img src="https://i.imgur.com/JZ6JZWg.png" alt="Audio Classification Application" />

For a future experiment the dataset can be cleaned up by removing the background noise from the dataset. If I have time left I might do it in here as well, but as of now it's outside of my scope.

# step 1: metadata
The first step is to read the metadata and load it into a pd.DataFrame.
In our case there already is a metadata file available. If this wasn't the case, we would've had to check for every possible path and get our information from there.

In the metadata file there is a lot of information which is not needed for training the model.
To clean it up we will load the file into a DataFrame and then reduce it so that only 2 columns are left: bird species and relative path to the audio file.

In [1]:
import pandas as pd
from pathlib import Path

In [2]:
download_path = Path.cwd()/'birdsongs'
metadata_file = download_path/'train_mod.csv'
df = pd.read_csv(metadata_file)
df = df[df.ebird_code.astype(str).str.startswith('a') | df.ebird_code.astype(str).str.startswith('b')]
print(f"Mean duration: {df.duration.mean()}")
df.head()

Mean duration: 59.16556145004421


Unnamed: 0,rating,playback_used,ebird_code,channels,date,pitch,duration,filename,speed,species,...,author,primary_label,longitude,length,time,recordist,license,resampled_sampling_rate,resampled_filename,resampled_channels
0,3.5,no,aldfly,1 (mono),2013-05-25,Not specified,25,XC134874.mp3,Not specified,Alder Flycatcher,...,Jonathon Jongsma,Empidonax alnorum_Alder Flycatcher,-92.962,Not specified,8:00,Jonathon Jongsma,Creative Commons Attribution-ShareAlike 3.0,32000,XC134874.wav,1 (mono)
1,4.0,no,aldfly,2 (stereo),2013-05-27,both,36,XC135454.mp3,both,Alder Flycatcher,...,Mike Nelson,Empidonax alnorum_Alder Flycatcher,-82.1106,0-3(s),08:30,Mike Nelson,Creative Commons Attribution-NonCommercial-Sha...,32000,XC135454.wav,1 (mono)
2,4.0,no,aldfly,2 (stereo),2013-05-27,both,39,XC135455.mp3,both,Alder Flycatcher,...,Mike Nelson,Empidonax alnorum_Alder Flycatcher,-82.1106,0-3(s),08:30,Mike Nelson,Creative Commons Attribution-NonCommercial-Sha...,32000,XC135455.wav,1 (mono)
3,3.5,no,aldfly,2 (stereo),2013-05-27,both,33,XC135456.mp3,both,Alder Flycatcher,...,Mike Nelson,Empidonax alnorum_Alder Flycatcher,-82.1106,0-3(s),08:30,Mike Nelson,Creative Commons Attribution-NonCommercial-Sha...,32000,XC135456.wav,1 (mono)
4,4.0,no,aldfly,2 (stereo),2013-05-27,both,36,XC135457.mp3,level,Alder Flycatcher,...,Mike Nelson,Empidonax alnorum_Alder Flycatcher,-82.1106,0-3(s),08:30,Mike Nelson,Creative Commons Attribution-NonCommercial-Sha...,32000,XC135457.wav,1 (mono)


In [3]:
df['relative_path'] = '/' + df['ebird_code'].astype(str) + "/" + df['resampled_filename'].astype(str)

# using f"/{df['ebird_code']}/{df['resampled_filename']}" results in an error :(

df = df[['ebird_code', 'relative_path']]
df = df.reset_index(drop=True) # rows were already filtered to 'a'/'b' species above; renumber them 0..n-1 so they can be looked up by position with .loc[idx]

bird_ids = pd.DataFrame(df.ebird_code.unique()).set_index(0)
df['classID'] = df['ebird_code'].apply(lambda x: bird_ids.index.get_loc(x))

df.head()

Unnamed: 0,ebird_code,relative_path,classID
0,aldfly,/aldfly/XC134874.wav,0
1,aldfly,/aldfly/XC135454.wav,0
2,aldfly,/aldfly/XC135455.wav,0
3,aldfly,/aldfly/XC135456.wav,0
4,aldfly,/aldfly/XC135457.wav,0
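
The `classID` column above gives every species an integer in order of first appearance. The same idea in plain Python, as a small sketch: the list of codes is made up for illustration (only `aldfly` appears in the output above), and `code_to_id` / `id_to_code` are hypothetical names. Keeping the reverse mapping around is handy for turning a predicted class ID back into a species code later.

```python
# illustrative labels, not taken from the real metadata file
codes = ["aldfly", "aldfly", "ameavo", "amebit", "ameavo"]

code_to_id = {}
for code in codes:
    # a code we have not seen yet gets the next free integer
    code_to_id.setdefault(code, len(code_to_id))

# reverse mapping, to decode model predictions back into species codes
id_to_code = {class_id: code for code, class_id in code_to_id.items()}

class_ids = [code_to_id[code] for code in codes]
print(class_ids)
```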


# step 2: convert audio to a spectrogram

For a dataset to be usable, every item must have the same format. For audio, this takes a bit more work. As mentioned at the start, I've done this before using scipy. I'll use that knowledge to recreate the class, but using PyTorch.

From personal experience, the most common differences I encounter are:
<ul>
    <li>mono vs stereo (number of channels)</li>
    <li>sample rates</li>
    <li>audio length</li>
</ul>

From the original metadata file mentioned in the previous step, I noticed that all 3 of these problems also occur in this dataset.

* Both mono and stereo appear in the dataset.
* The sampling rate ranges from 8000Hz to 11025Hz.
* The audio length ranges from 1s to 2283s (38 min).


Here we also create a Mel spectrogram. For me the 'Mel' version is new, but my CTO (who has a PhD in ML with music) told me to look into it, as it might both improve the model and teach me something new. My understanding of 'Mel' is as follows:

A regular spectrogram plots frequency against time. A Mel spectrogram uses the Mel scale instead of the raw frequency in Hz. The Mel scale is a perceptual scale of pitch: equal steps on it sound roughly equally far apart to a human listener, which means low frequencies get more resolution than high ones. The full mathematical reasoning is a bit too complicated for me, but the takeaway is that it usually works better than plain frequency for audio models.
For more information I suggest reading [https://medium.com/analytics-vidhya/understanding-the-mel-spectrogram-fca2afa2ce53](https://medium.com/analytics-vidhya/understanding-the-mel-spectrogram-fca2afa2ce53)
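
As a rough illustration of what the Mel scale does to frequencies, the sketch below uses one common definition of the conversion (often called the HTK formula). Whether this is the exact variant used inside the spectrogram transform depends on the library settings, so treat it as an assumption; the function names are made up for this example.

```python
import math

def hz_to_mel(freq_hz):
    """Convert a frequency in Hz to mels (HTK-style formula)."""
    return 2595.0 * math.log10(1.0 + freq_hz / 700.0)

def mel_to_hz(mel):
    """Inverse of hz_to_mel."""
    return 700.0 * (10.0 ** (mel / 2595.0) - 1.0)

# the same 100 Hz step, once at the low end and once at the high end
low_step = hz_to_mel(200) - hz_to_mel(100)
high_step = hz_to_mel(8100) - hz_to_mel(8000)

print(f"100 -> 200 Hz spans {low_step:.1f} mel")
print(f"8000 -> 8100 Hz spans {high_step:.1f} mel")
```

The low-frequency step covers roughly ten times as many mels as the high-frequency step, so a Mel spectrogram spends more of its bins on the lower frequencies and compresses the higher ones.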

Secondly, we will make a PyTorch Dataset object which uses all these steps to load and modify the audio, so that it can be used for training our model. The duration will be set to 59 seconds, since this is roughly the mean() of the duration in our dataset.

@TODO: I'm currently still exploring these numbers

In [4]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
from torch.utils.data import DataLoader, Dataset, random_split

In [5]:
class AudioUtil():
    """
    This class contains all the methods needed for the pre-processing transform steps
    """
    @staticmethod
    def open(audio_file):
        """
        Load the audio and return the signal
        :param audio_file:
        :type audio_file:
        :return: returns a tuple of the signal and sample rate
        """
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)
    
    @staticmethod
    def rechannel(aud, new_channel):
        """
        Audio can be mono or stereo, this has to be uniform.
        Therefore, we convert any mono (1 audio channel) to stereo (2 audio channels) or vice versa depending on the new_channel input.
        :param aud: tuple of signal and sample rate
        :type aud: tuple
        :param new_channel: mono = 1 or stereo = 2
        :type new_channel: int
        :return: returns a tuple of the signal and sample rate
        """
        sig, sr = aud
        if (sig.shape[0] == new_channel): # if its already the desired channels, do nothing
            return aud
        
        if (new_channel == 1):
            resig = sig[:1, :] # convert stereo to mono by selecting the 1st channel
        else:
            resig = torch.cat([sig, sig]) # convert mono to stereo by duplicating the 1st channel
        
        return ((resig, sr))
    
    @staticmethod
    def resample(aud, newsr):
        """
        Audio can have multiple sample rates, this has to be uniform.
        Therefore, we convert the audio to the newsr from the input using torchaudio.transforms
        :param aud: tuple of signal and sample rate
        :type aud: tuple
        :param newsr: the desired sample rate
        :type newsr: int
        :return: returns a tuple of the signal and sample rate
        """
        sig, sr = aud
        
        if (sr == newsr): # if its already the desired sample rate, do nothing
            return aud
        
        num_channels = sig.shape[0]
        
        resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:]) # resample the 1st channel
        if (num_channels > 1): # if audio is stereo
            retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:]) # resample the 2nd channel
            resig = torch.cat([resig, retwo]) # merge both channels
        
        return ((resig, newsr))
    
    @staticmethod
    def pad_trunc(aud, max_ms):
        """
        Pad (with zeros) or truncate the signal to a fixed length 'max_ms'
        :param aud: tuple of signal and sample rate
        :type aud: tuple
        :param max_ms: max duration in milliseconds
        :type max_ms: int
        :return: returns a tuple of the signal and sample rate
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        max_len = sr * max_ms // 1000 # multiply first, otherwise e.g. 44100 Hz is rounded down to 44 samples per ms
        
        if (sig_len > max_len):
            sig = sig[:,:max_len] # truncate to given length
        
        elif (sig_len < max_len):
            pad_begin_len = random.randint(0, max_len - sig_len) # get the length to be added at the start of the signal
            pad_end_len = max_len - sig_len - pad_begin_len # get the length to be added at the end of the signal
            
            # pad the signal with 0s
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))
            
            sig = torch.cat((pad_begin, sig, pad_end), 1)
        
        return (sig, sr)
    
    @staticmethod
    def time_shift(aud, shift_limit):
        """
        Shifts the signal to the right by a random fraction of its length, at most shift_limit.
        Audio which is "out of bounds" is wrapped around to the start of the signal.
        :param aud: tuple of signal and sample rate
        :type aud: tuple
        :param shift_limit: maximum shift as a fraction of the signal length (e.g. 0.4)
        :type shift_limit: float
        :return: returns a tuple of the signal and sample rate
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return (sig.roll(shift_amt, dims=1), sr) # roll along the time axis only, so samples do not spill from one channel into the other
    
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """
        Convert raw audio signal to a Mel Spectrogram
        :param aud: tuple of signal and sample rate
        :type aud: tuple
        :type n_mels: int
        :type n_fft: int
        :type hop_len: int
        :return: MelSpectrogram with Decibel scale
        """
        sig, sr = aud
        top_db = 80
        
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig) # convert to Mel Spectrogram
        
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec) # convert to Decibels
        return (spec)

    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """
        Randomly masks bands of frequencies and time steps in the spectrogram, comparable to the random skewing used to augment images.
        :param spec: spectrogram to augment
        :type spec: torch.Tensor
        :type max_mask_pct: float
        :type n_freq_masks: int
        :type n_time_masks: int
        :return: the augmented spectrogram
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec
        
        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)
            
        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)
        
        return aug_spec

In [6]:
class SoundDS(Dataset):
    """
    Custom PyTorch Dataset Class. This uses all the pre-processing from the AudioUtil object.
    """
    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
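        self.duration = 59000 # target clip length in ms (roughly the mean duration)
        self.sr = 44100 # target sample rate in Hz
        self.channel = 2 # target number of channels (stereo)
        self.shift_pct = 0.4 # max time shift as a fraction of the clip length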
    
    def __len__(self):
        """
        Native Python method to use the len() function.
        :return: number of items in dataset
        """
        return len(self.df)
    
    def __getitem__(self, idx):
        """
        Native Python method to get an item by index
        :param idx: index
        :type idx: int
        :return: item i from dataset as spectrogram and classID
        """
        audio_file = self.data_path + self.df.loc[idx, 'relative_path']
        class_id = self.df.loc[idx, 'classID']
        
        aud = AudioUtil.open(audio_file) # read audio file
        reaud = AudioUtil.resample(aud, self.sr) # change sample rate
        rechan = AudioUtil.rechannel(reaud, self.channel) # change mono to/from stereo
        
        dur_aud = AudioUtil.pad_trunc(rechan, self.duration) # adjust audio duration
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct) # shift audio on the time
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None) # make spectrogram
        aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2) # mask frequency and time
        
        return aug_sgram, class_id
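
To get a feel for how large each item returned by the Dataset is, the back-of-the-envelope sketch below estimates the spectrogram shape for a 59 s stereo clip at 44100 Hz. It rests on two assumptions that are not stated in the notebook: that the hop length falls back to `n_fft // 2` when `hop_len=None`, and that the spectrogram uses centered framing (one frame per hop, plus one). The exact frame count also depends on how the sample count is rounded during padding, so read the numbers as approximate.

```python
def num_samples(duration_ms, sample_rate):
    """Samples in a clip of the given duration."""
    return sample_rate * duration_ms // 1000

def num_frames(n_samples, hop_length):
    """Assumed framing: centered STFT, one frame per hop plus one."""
    return 1 + n_samples // hop_length

n_fft = 1024
hop_length = n_fft // 2  # assumed default when hop_len=None
n_mels = 64
channels = 2

samples = num_samples(59_000, 44_100)
frames = num_frames(samples, hop_length)
shape = (channels, n_mels, frames)  # (channels, mel bins, time steps)

# float32 values take 4 bytes each
megabytes_per_item = channels * n_mels * frames * 4 / 1e6
megabytes_per_batch = megabytes_per_item * 108  # batch size used in step 3

print(shape, f"{megabytes_per_item:.1f} MB per item, {megabytes_per_batch:.0f} MB per batch")
```

Under these assumptions a single item is a few megabytes and a batch of 108 items a few hundred, which is worth keeping in mind when choosing the duration and batch size.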

# step 3: prepare batches of data

After all the steps needed to create valid input for the model, we can use the custom Dataset to load the features and labels from our Pandas DataFrame from step 1. This is then randomly split into a training set and a test set with an 80:20 split.

In [7]:
myds = SoundDS(df, download_path)

num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, test_ds = random_split(myds, [num_train, num_val])

train_dl = torch.utils.data.DataLoader(train_ds, batch_size=108, shuffle=True) # total of classIDs * 2
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=108, shuffle=True) # total of classIDs * 2
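
The split above is different on every run, which makes results harder to compare between experiments. The sketch below shows the idea of a seeded 80:20 split on plain indices; it is an illustration in standard-library Python with a hypothetical `split_indices` helper, not the code used in this notebook. In PyTorch, `random_split` accepts a `generator` argument that can be seeded for the same purpose.

```python
import random

def split_indices(num_items, train_fraction=0.8, seed=42):
    """Shuffle item indices with a fixed seed and cut them into two parts."""
    indices = list(range(num_items))
    random.Random(seed).shuffle(indices)  # own RNG, so the global random state is untouched
    num_train = round(num_items * train_fraction)
    return indices[:num_train], indices[num_train:]

train_idx, test_idx = split_indices(1000)
print(len(train_idx), len(test_idx))

# the same seed gives the same split again
assert split_indices(1000) == (train_idx, test_idx)
```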

# step 4: create model

The steps below are quite similar to common image classification models. Since the dataset no longer consists of audio signals but of spectrogram images, we can use a CNN (Convolutional Neural Network) to process them. The convolutional layers at the start "break down" the spectrogram into 256 features. Finally, the nn.Linear layer uses these 256 features to produce a score for each of the 54 possible classes (this is basically the Neural Networks class @Teun Salters, but this time we don't have to code it ourselves). The Sequential is basically a queue which runs all the different steps in a row.
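
How the six convolutions shrink the spectrogram can be traced with the usual output-size formula for a convolution, `floor((size + 2 * padding - kernel) / stride) + 1`. The sketch below applies it to the kernel, stride and padding values used in the model; the 64 mel bins come from the notebook, while the 1000 time steps are a made-up number for illustration.

```python
def conv_out(size, kernel, stride, padding):
    """Output length of one convolution along a single axis."""
    return (size + 2 * padding - kernel) // stride + 1

# (kernel, stride, padding) of conv1..conv6 as defined in the model
layers = [(5, 2, 2)] + [(3, 2, 1)] * 5

def trace(size):
    """Sizes along one axis after each of the six convolutions."""
    sizes = [size]
    for kernel, stride, padding in layers:
        sizes.append(conv_out(sizes[-1], kernel, stride, padding))
    return sizes

mel_sizes = trace(64)     # frequency axis: n_mels = 64
time_sizes = trace(1000)  # time axis: hypothetical number of time steps

print(mel_sizes)
print(time_sizes)
```

Every layer roughly halves both axes while the number of channels doubles up to 256. Whatever is left along the time axis is averaged away by the adaptive pooling layer, which is why the linear layer always receives exactly 256 values regardless of the clip length.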

Training passes over the dataset multiple times, and each pass is called an 'epoch'. The CrossEntropyLoss function calculates the loss (predicted output vs expected output), and an optimizer adjusts the parameters to reduce it. The learning rate of the optimizer defines the "step size" it takes. In each epoch the code predicts a class for every batch of "sounds" and then uses the loss to decide how the weights have to be adjusted before the next prediction.
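
To show what the cross-entropy loss measures, here is a small worked sketch in plain Python for a single sample: the loss is the negative log of the softmax probability that the model assigns to the correct class. The function name and the example scores are made up; the notebook itself uses `nn.CrossEntropyLoss`, which averages this value over a batch.

```python
import math

def cross_entropy(logits, target):
    """Loss for one sample: -log(softmax(logits)[target])."""
    m = max(logits)  # subtract the max for numerical stability
    log_sum_exp = m + math.log(sum(math.exp(z - m) for z in logits))
    return log_sum_exp - logits[target]

num_classes = 54

# a model that considers all classes equally likely
untrained = cross_entropy([0.0] * num_classes, target=3)

# a model that is very sure the answer is class 3
confident = [0.0] * num_classes
confident[3] = 10.0
right = cross_entropy(confident, target=3)  # confident and correct
wrong = cross_entropy(confident, target=7)  # confident and incorrect

print(f"untrained: {untrained:.2f}, right: {right:.4f}, wrong: {wrong:.2f}")
```

With 54 classes a model that is only guessing scores about ln(54) ≈ 3.99, so a training loss around that value in the first epoch is expected, and it should drop from there. Being confidently wrong is punished much harder than being unsure.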

In [8]:
import torch.nn.functional as F
from torch.nn import init
import torch.nn as nn

In [9]:
class AudioClassifier (nn.Module):
    def __init__(self):
        super().__init__()
        conv_layers = []
        
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]
        
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]
        
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]
        
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]
        
        self.conv5 = nn.Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu5 = nn.ReLU()
        self.bn5 = nn.BatchNorm2d(128)
        init.kaiming_normal_(self.conv5.weight, a=0.1)
        self.conv5.bias.data.zero_()
        conv_layers += [self.conv5, self.relu5, self.bn5]
        
        self.conv6 = nn.Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu6 = nn.ReLU()
        self.bn6 = nn.BatchNorm2d(256)
        init.kaiming_normal_(self.conv6.weight, a=0.1)
        self.conv6.bias.data.zero_()
        conv_layers += [self.conv6, self.relu6, self.bn6]
        
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=256, out_features=54)
        
        self.conv = nn.Sequential(*conv_layers)
    
    def forward(self, x):
        x = self.conv(x)
        
        x = self.ap(x)
        x = x.view(x.shape[0], -1)
        
        x = self.lin(x)
        
        return x

In [10]:
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # this tries to use the GPU if possible. RIP Linux users with nvidia gpu
myModel = myModel.to(device)
next(myModel.parameters()).device # print if it's on cuda

device(type='cpu')

In [None]:
def training(model, train_dl, num_epochs):
    criterion = nn.CrossEntropyLoss() # loss function, which calculates the distance between current output and expected output
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # find the best parameters with a learning rate (step size) of 0.001
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer,
                                                    max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear') # one-cycle schedule: the learning rate ramps up to max_lr and then anneals linearly
    
    acc_list = [] # to create a fancy plot later on
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0
        
        for i, data in enumerate(train_dl):
            inputs, labels = data[0].to(device), data[1].to(device) # input features and target labels
            
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s # normalize input features
            
            optimizer.zero_grad() # reset the gradients from the previous batch
            
            outputs = model(inputs) # predict output
            loss = criterion(outputs, labels) # calculate loss
            loss.backward() # backpropagate: compute the gradient of the loss for every weight
            optimizer.step() # update the weights
            scheduler.step() # advance the learning rate schedule by 1 step
            
            running_loss += loss.item() # stats for Loss and Accuracy
            
            _, prediction = torch.max(outputs, 1) # get the class with the highest prediction
            
            correct_prediction += (prediction == labels).sum().item() 
            total_prediction+= prediction.shape[0]
        
        # print stats
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction / total_prediction
        acc_list.append(acc)
        print(f'Epoch: {epoch + 1}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')
    
    print('Finished Training')

num_epochs = 50
training(myModel, train_dl, num_epochs)
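
The scheduler used in the training loop changes the learning rate after every batch. The sketch below is a simplified, plain-Python approximation of a one-cycle schedule with linear annealing, to show the shape of the curve. The default values (`pct_start=0.3`, `div_factor=25`, `final_div_factor=1e4`) are, to my understanding, the PyTorch defaults, and the 10 batches per epoch are a made-up number; the exact step boundaries in PyTorch may differ slightly from this sketch.

```python
def one_cycle_lr(step, total_steps, max_lr=0.001, pct_start=0.3,
                 div_factor=25.0, final_div_factor=1e4):
    """Learning rate at a given step of a linear one-cycle schedule (simplified)."""
    initial_lr = max_lr / div_factor
    min_lr = initial_lr / final_div_factor
    warmup_end = pct_start * total_steps - 1

    if step <= warmup_end:
        # phase 1: ramp up linearly from initial_lr to max_lr
        pct = step / warmup_end
        return initial_lr + pct * (max_lr - initial_lr)

    # phase 2: anneal linearly from max_lr down to min_lr
    pct = (step - warmup_end) / (total_steps - 1 - warmup_end)
    return max_lr + pct * (min_lr - max_lr)

total_steps = 50 * 10  # 50 epochs x a hypothetical 10 batches per epoch
schedule = [one_cycle_lr(s, total_steps) for s in range(total_steps)]

peak_step = schedule.index(max(schedule))
print(f"start: {schedule[0]:.2e}, peak: {max(schedule):.2e} at step {peak_step}, end: {schedule[-1]:.2e}")
```

So the learning rate passed to Adam is effectively overridden: training starts well below 0.001, reaches it after roughly 30% of the steps, and then decays towards almost zero.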

# step 5: validating the model

The last step is to use the test data we split off in step 3 to validate the model.

In [None]:
def inference(model, test_dl):
    correct_pred = 0
    total_pred = 0
    
    model.eval() # use the learned BatchNorm statistics instead of per-batch statistics
    with torch.no_grad():
        for data in test_dl:
            inputs, labels = data[0].to(device), data[1].to(device) # input features and target labels
            
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s # normalize input features
            
            outputs = model(inputs) # predict output
            
            _, prediction = torch.max(outputs, 1) # get the class with the highest prediction
            correct_pred += (prediction == labels).sum().item()
            total_pred += prediction.shape[0]
            
    acc = correct_pred / total_pred
    print(f'Accuracy: {acc:.2f}, Total items: {total_pred}')

inference(myModel, test_dl)
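
A single accuracy number can hide species that the model never predicts correctly, especially when some species have far more recordings than others. As a possible follow-up, the sketch below computes the accuracy per class in plain Python. The labels and predictions are made-up numbers, and `per_class_accuracy` is a hypothetical helper that is not part of this notebook.

```python
from collections import Counter

def per_class_accuracy(labels, predictions):
    """Fraction of correctly predicted items for every class that occurs in labels."""
    total = Counter(labels)
    correct = Counter(y for y, p in zip(labels, predictions) if y == p)
    return {cls: correct[cls] / total[cls] for cls in sorted(total)}

# made-up example: class 1 is always mistaken for class 0
labels      = [0, 0, 0, 0, 1, 1, 2, 2, 2, 2]
predictions = [0, 0, 0, 0, 0, 0, 2, 2, 2, 1]

overall = sum(y == p for y, p in zip(labels, predictions)) / len(labels)
by_class = per_class_accuracy(labels, predictions)

print(f"overall: {overall:.2f}")
print(by_class)
```

Here the overall accuracy looks decent, while one class is never recognized at all. Collecting the labels and predictions from every test batch into two lists would be enough to run the same check on the real model.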