<a href="https://colab.research.google.com/github/dustedduke/CNN-MFCC-sound-detection/blob/master/bosch_master.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# !pip install pytorch-lightning torchaudio optuna torchensemble neptune-client[optuna]
# !git clone https://github.com/google-research/leaf-audio.git && cd leaf-audio && pip install -e .

In [None]:
# Mount Google Drive into the Colab filesystem at /content/gdrive.
from google.colab import drive
drive.mount("/content/gdrive")
# NOTE(review): `!cd` runs in a throwaway subshell, so it does not change the
# notebook's working directory; `%cd` would. Confirm whether that was intended.
!cd /content/gdrive/MyDrive

In [None]:
# run = neptune.init(
#     project="dustedduke/Bosch",
#     api_token="",
# )

# params = {"learning_rate": 0.001, "optimizer": "BCEWithLogitsLoss"}
# run["parameters"] = params

In [None]:
# Root folder of the UrbanSound8K dataset on the mounted Drive.
dataset_folder = "/content/gdrive/MyDrive/Datasets/UrbanSound8K"

In [None]:
import os, math, random, functools
from time import time
from pathlib import Path
from argparse import ArgumentParser

import numpy as np
import pandas as pd

import torch
from torch import nn
from torch.utils.data import DataLoader, Dataset, Subset
from torch.optim import Adam

from pytorch_lightning import Trainer, seed_everything, loggers
from pytorch_lightning.core.lightning import LightningModule
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint

import torchaudio
from torchaudio import transforms

# import openl3
# import soundfile as sf
from leaf_audio import frontend, initializers

import optuna
import neptune.new as neptune
import neptune.new.integrations.optuna as optuna_utils

## Audio transforms

In [10]:
class AudioUtil():
    """Static helpers for loading, reshaping and augmenting audio clips.

    Unless stated otherwise, ``aud`` is a ``(signal, sample_rate)`` tuple as
    returned by :meth:`open`, where ``signal`` is a 2-D tensor laid out as
    ``(channels, samples)``.
    """

    @staticmethod
    def get_available_transforms():
        """Return the sorted names of every public callable on this class.

        ``open`` and this method itself are excluded because they are not
        transforms or augmentations.
        """
        excluded = {'open', 'get_available_transforms'}
        return [
            attribute
            for attribute in dir(AudioUtil)
            if not attribute.startswith('__')
            and attribute not in excluded
            and callable(getattr(AudioUtil, attribute))
        ]

    # ----------------------------
    # Load an audio file. Return the signal as a tensor and the sample rate
    # ----------------------------
    @staticmethod
    def open(audio_file):
        """Load ``audio_file`` with torchaudio and return ``(signal, sample_rate)``."""
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    # ----------------------------
    # Convert the given audio to the desired number of channels
    # ----------------------------
    @staticmethod
    def rechannel(aud, new_channel):
        """Convert ``aud`` to ``new_channel`` channels.

        Returns ``aud`` untouched when it already has that many channels.
        Otherwise ``new_channel == 1`` keeps only the first channel, and any
        other value concatenates the signal with itself along the channel
        axis (i.e. mono becomes stereo).
        """
        sig, sr = aud

        if (sig.shape[0] == new_channel):
            # Nothing to do
            return aud

        if (new_channel == 1):
            # Convert from stereo to mono by selecting only the first channel
            resig = sig[:1, :]
        else:
            # Convert from mono to stereo by duplicating the first channel
            resig = torch.cat([sig, sig])

        return ((resig, sr))

    # ----------------------------
    # Resample every channel of the signal to the new sample rate
    # ----------------------------
    @staticmethod
    def resample(aud, newsr):
        """Resample ``aud`` to ``newsr`` Hz and return ``(signal, newsr)``.

        Returns ``aud`` untouched when it is already at ``newsr``.
        """
        sig, sr = aud

        if (sr == newsr):
            # Nothing to do
            return aud

        # Resample operates along the last (time) axis, so all channels can
        # be converted in a single call instead of one channel at a time.
        resig = torchaudio.transforms.Resample(sr, newsr)(sig)

        return ((resig, newsr))

    # ----------------------------
    # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
    # ----------------------------
    @staticmethod
    def pad_trunc(aud, max_ms):
        """Force the signal to exactly ``max_ms`` milliseconds.

        Longer signals are truncated; shorter ones are zero-padded with a
        random split of the padding between the beginning and the end.

        Returns:
            The bare signal tensor (no sample rate), transposed to
            ``(samples, channels)``.
        """
        sig, sr = aud
        num_rows, sig_len = sig.shape
        # Multiply before dividing: `sr // 1000 * max_ms` would drop the
        # sub-kHz part of the rate (44100 Hz would be treated as 44000 Hz).
        max_len = int(sr * max_ms // 1000)

        if (sig_len > max_len):
            # Truncate the signal to the given length
            sig = sig[:, :max_len]

        elif (sig_len < max_len):
            # Length of padding to add at the beginning and end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len - pad_begin_len

            # Pad with 0s, matching the signal's dtype and device
            pad_begin = sig.new_zeros((num_rows, pad_begin_len))
            pad_end = sig.new_zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)

        sig = torch.t(sig)
        return sig

    # ----------------------------
    # Shifts the signal to the left or right by some percent. Values at the end
    # are 'wrapped around' to the start of the transformed signal.
    # ----------------------------
    @staticmethod
    def time_shift(aud, shift_limit):
        """Circularly shift the signal by a random fraction of its length.

        The shift is ``int(random.random() * shift_limit * num_samples)``
        samples and is applied along the time axis of each channel.
        """
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        # Roll along the time axis only; rolling without `dims` flattens the
        # tensor first and would leak samples from one channel into the next.
        return (sig.roll(shift_amt, dims=1), sr)

    # ----------------------------
    # Generate a Spectrogram
    # ----------------------------
    @staticmethod
    def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        """Return the mel spectrogram of ``aud`` converted to decibels."""
        sig, sr = aud
        top_db = 80

        # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        # Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return (spec)

    # ----------------------------
    # Augment the Spectrogram by masking out some sections of it in both the frequency
    # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
    # overfitting and to help the model generalise better. The masked sections are
    # replaced with the mean value.
    # ----------------------------
    @staticmethod
    def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        """Apply ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks.

        Each mask covers at most ``max_mask_pct`` of the corresponding axis and
        is filled with the mean of the input spectrogram.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

        return aug_spec

    @staticmethod
    def original_augmentation(aud, sr, channel=1, duration=4000, shift_pct=0.4):
        """Run the full pipeline: resample, rechannel, pad, shift, spectrogram, mask.

        Args:
            aud: ``(signal, sample_rate)`` tuple.
            sr: Target sample rate in Hz.
            channel: Target channel count.
            duration: Target clip length in milliseconds.
            shift_pct: Maximum time shift as a fraction of the clip length.

        ``channel``, ``duration`` and ``shift_pct`` replace the ``self.*``
        attributes the previous version read even though this class holds no
        instance state. NOTE(review): the default values are assumptions -
        confirm them against the intended experiment setup.
        """
        reaud = AudioUtil.resample(aud, sr)
        rechan = AudioUtil.rechannel(reaud, channel)
        # pad_trunc returns a bare (samples, channels) tensor, so transpose it
        # back and re-attach the sample rate for the remaining steps.
        padded = AudioUtil.pad_trunc(rechan, duration)
        dur_aud = (torch.t(padded), sr)
        shift_aud = AudioUtil.time_shift(dur_aud, shift_pct)
        sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
        aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

        return aug_sgram

    @staticmethod
    def rolling_spectro_augmentation(aud, sr, step):
        """Stitch spectrograms of random ``step``-sample windows and min-max scale them.

        Args:
            aud: Signal tensor laid out as ``(channels, samples)`` (not a tuple).
            sr: Sample rate of ``aud`` in Hz.
            step: Window length in samples.

        Returns:
            The window spectrograms concatenated along the last axis and
            scaled to ``[0, 1]``, or an empty ``(0, 0)`` tensor when the clip
            is too short to yield any window.

        Raises:
            ValueError: If ``step`` is longer than the signal.
        """
        sig_len = aud.shape[1]
        if step > sig_len:
            raise ValueError(
                "step ({}) is longer than the signal ({} samples)".format(step, sig_len)
            )

        n_samples = 2 * int(sig_len / sr * 0.1)
        if n_samples == 0:
            return torch.empty((0, 0))

        windows = []
        for _ in range(n_samples):
            # randint's upper bound is exclusive; +1 keeps the last valid start
            rand_index = np.random.randint(0, sig_len - step + 1)
            sliced_aud = aud[:, rand_index:rand_index + step]
            windows.append(
                AudioUtil.spectro_gram((sliced_aud, sr), n_mels=64, n_fft=1024, hop_len=None)
            )

        # NOTE(review): windows are joined along the time (last) axis; the
        # previous code used dim 1 but could not run - confirm the intended axis.
        complete_sample = torch.cat(windows, dim=-1)

        # Normalise once over the stitched result; doing it inside the loop
        # re-scaled earlier windows with stale bounds.
        _min, _max = complete_sample.min(), complete_sample.max()
        if _max == _min:
            return torch.zeros_like(complete_sample)
        return (complete_sample - _min) / (_max - _min)

    @staticmethod
    def leaf_embedding(aud, n_filters=64, window_len=32, sample_rate=24000, preemp=True):
        """Embed the first channel of ``aud`` with a LEAF frontend.

        Builds a ``frontend.Leaf`` layer from the given settings, feeds it the
        first channel as a batch of one and returns the first batch item as a
        torch tensor.
        """
        compression_fn = functools.partial(frontend.log_compression, log_offset=1e-5)
        complex_conv_init = initializers.GaborInit(sample_rate=sample_rate, min_freq=60., max_freq=7800.)
        learn_pooling = False
        custom_leaf = frontend.Leaf(learn_pooling=learn_pooling,
                                    n_filters=n_filters,
                                    window_len=window_len,
                                    sample_rate=sample_rate,
                                    preemp=preemp,
                                    compression_fn=compression_fn,
                                    complex_conv_init=complex_conv_init)

        sig, _ = aud
        # First channel only, shaped (1, num_samples): a batch with one clip.
        batch = sig[:1, :].detach().cpu().numpy()
        # NOTE(review): relies on the Keras layer converting NumPy input to a
        # TensorFlow tensor, which avoids the undefined `tf` name used before.
        embedding = custom_leaf(batch)

        return torch.from_numpy(embedding[0].numpy())

    @staticmethod
    def gru_embedding():
        """Placeholder for a GRU-based embedding (not implemented)."""
        # TODO in progress
        ...



## Dataset

In [None]:
def one_hot(idx, num_items):
    """Return a list of ``num_items`` floats: 1.0 at position ``idx``, 0.0 elsewhere.

    An ``idx`` outside ``range(num_items)`` yields a list of all zeros.
    """
    encoding = []
    for position in range(num_items):
        if position != idx:
            encoding.append(0.0)
        else:
            encoding.append(1.0)
    return encoding

class UrbanDataset(Dataset):
    """UrbanSound8K clips indexed by the rows of ``metadata/UrbanSound8K.csv``.

    Args:
        dataset_folder: Folder containing ``metadata/UrbanSound8K.csv`` and
            the ``audio/fold<N>/`` sub-folders.
        fold: Fold number held out for validation.
        transform: Optional callable applied to every loaded waveform.
        augment: Optional callable applied after ``transform`` to clips that
            are not in the validation fold.
    """

    def __init__(self, dataset_folder, fold, transform=None, augment=None):
        super().__init__()
        self.dataset_folder = dataset_folder
        self.path_to_csv = os.path.join(self.dataset_folder, "metadata/UrbanSound8K.csv")
        self.path_to_audio_folder = os.path.join(self.dataset_folder, "audio")
        self.metadata = pd.read_csv(self.path_to_csv)
        self.fold = fold
        self.transform = transform
        self.augment = augment

    def train_validation_split(self):
        """Return ``(train_set, val_set)`` subsets split on the held-out fold.

        No augmentation switch is needed on the validation subset:
        ``__getitem__`` already skips augmentation for clips whose fold equals
        ``self.fold``. (The previous code set a misspelled, unused attribute
        on the subset for this purpose.)
        """
        in_val_fold = self.metadata["fold"] == self.fold
        train_idx = list(self.metadata[~in_val_fold].index)
        val_idx = list(self.metadata[in_val_fold].index)

        return Subset(self, train_idx), Subset(self, val_idx)

    def __len__(self):
        """Return the number of rows in the metadata table."""
        return len(self.metadata)

    def __getitem__(self, index):
        """Load, transform and (for training folds) augment the clip at ``index``.

        Returns:
            A dict with ``file_name``, ``input_vector`` (the processed audio)
            and ``label`` (a one-hot NumPy array of length 10).
        """
        file_name = self.metadata["slice_file_name"].iloc[index]
        clip_fold = self.metadata["fold"].iloc[index]
        file_path = os.path.join(self.path_to_audio_folder, "fold" + str(clip_fold), file_name)

        # The file's sample rate is not used by this method.
        aud, _ = torchaudio.load(file_path)

        # Both training and validation are transformed; `transform` defaults
        # to None, in which case the raw waveform is passed through.
        if self.transform is not None:
            aud = self.transform(aud)

        # Augment if not from validation fold
        if self.augment and clip_fold != self.fold:
            aud = self.augment(aud)

        label = np.array(one_hot(self.metadata["classID"].iloc[index], 10))

        return {
            "file_name": file_name,
            "input_vector": aud,
            "label": label,
        }


## Base Model

In [None]:
class VanillaModel(nn.Module):
    """Embedding -> LSTM -> linear classifier that carries its LSTM state across calls.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Size of each embedding vector (LSTM input size).
        hidden_dim: LSTM hidden size.
        n_layers: Number of stacked LSTM layers.
        output_dim: Number of outputs of the final linear layer.
        drop_prob: Dropout probability between LSTM layers.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, n_layers, output_dim, drop_prob):
        super().__init__()
        self.vocab_size = vocab_size
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.n_layers = n_layers
        self.output_dim = output_dim
        # Kept as an alias: the attribute existed before, but was assigned
        # from an undefined name.
        self.output_size = output_dim
        self.drop_prob = drop_prob

        # (h, c) state of the previous forward pass, detached from its graph.
        self.previous_hidden = None

        self.embedding = nn.Embedding(self.vocab_size, self.embedding_dim)
        self.lstm = nn.LSTM(
            input_size=self.embedding_dim,
            hidden_size=self.hidden_dim,
            num_layers=self.n_layers,
            batch_first=True,
            dropout=self.drop_prob,
            bidirectional=False)
        self.fc = nn.Linear(self.hidden_dim, self.output_dim)

    def forward(self, x, hidden=None):
        """Return the linear layer's output for the last time step of ``x``.

        Args:
            x: Batch-first tensor of indices; it is cast to ``long`` before
                the embedding lookup.
            hidden: Optional ``(h, c)`` LSTM state. When omitted, the state
                stored by the previous call is used.

        Returns:
            Tensor of shape ``(batch, output_dim)``.
        """
        batch_size = x.size(0)
        x = x.long()
        embeds = self.embedding(x)

        if hidden is None:
            hidden = self.previous_hidden
        # A state saved for a different batch size cannot be fed to the LSTM;
        # start from zeros in that case.
        if hidden is not None and hidden[0].size(1) != batch_size:
            hidden = None

        x, hidden = self.lstm(embeds, hidden)
        # Detach so later backward passes do not reach into earlier batches.
        self.previous_hidden = tuple(state.detach() for state in hidden)
        x = x[:, -1]
        x = self.fc(x)

        return x

    def init_hidden(self, batch_size=1):
        """Store and return a Xavier-initialised ``(h, c)`` state.

        Each tensor has shape ``(n_layers, batch_size, hidden_dim)`` and lives
        on the same device as the model's linear layer.
        """
        device = self.fc.weight.device
        hidden = tuple(
            torch.empty(self.n_layers, batch_size, self.hidden_dim, device=device)
            for _ in range(2)
        )
        for state in hidden:
            nn.init.xavier_uniform_(state)

        self.previous_hidden = hidden
        return hidden

## Lightning Wrapper

In [None]:
class VanillaClassifier(LightningModule):
    """Lightning wrapper that trains a ``VanillaModel`` on one UrbanSound8K fold.

    Args:
        hparams: Hyper-parameters, either a dict or an attribute-style object
            (e.g. ``argparse.Namespace``). Keys read here:
            ``path_to_UrbanSound8K``, ``output_dim``, ``vocab_size``,
            ``embedding_dim``, ``hidden_dim``, ``n_layers``, ``drop_prob``,
            ``loss``, ``transform``, ``augment``, ``batch_size``, ``shuffle``,
            ``learning_rate`` and, optionally, ``output_size``.
        fold: Fold number held out for validation.
    """

    _MISSING = object()

    @staticmethod
    def _hparam(hparams, name, default=_MISSING):
        """Read ``name`` from ``hparams`` whether it is a dict or a namespace."""
        if isinstance(hparams, dict):
            if name in hparams:
                return hparams[name]
        elif hasattr(hparams, name):
            return getattr(hparams, name)
        if default is VanillaClassifier._MISSING:
            raise KeyError(name)
        return default

    def __init__(self, hparams, fold):
        super().__init__()

        self.hparams = hparams
        self.fold = fold

        self.dataset_folder = self._hparam(hparams, "path_to_UrbanSound8K")
        self.nb_classes = self._hparam(hparams, "output_dim")
        # Best value seen so far for: accuracy, micro F1, micro AUPRC,
        # macro AUPRC, mean average precision.
        self.best_scores = [0] * 5

        self.vocab_size = self._hparam(hparams, "vocab_size")
        self.output_size = self._hparam(hparams, "output_size", self.nb_classes)
        self.embedding_dim = self._hparam(hparams, "embedding_dim")
        self.hidden_dim = self._hparam(hparams, "hidden_dim")
        self.n_layers = self._hparam(hparams, "n_layers")
        self.drop_prob = self._hparam(hparams, "drop_prob")

        # Keyword names must match VanillaModel.__init__; the previous
        # `classes_num` / `output_size` keys are not parameters of that
        # constructor.
        model_param = {"vocab_size": self.vocab_size,
                       "embedding_dim": self.embedding_dim,
                       "hidden_dim": self.hidden_dim,
                       "n_layers": self.n_layers,
                       "output_dim": self.nb_classes,
                       "drop_prob": self.drop_prob}

        self.model = VanillaModel(**model_param)
        self.loss = self._hparam(hparams, "loss")

    def forward(self, x):
        """Run the wrapped model on ``x``, letting it use its stored LSTM state."""
        # `hidden` is passed explicitly because VanillaModel.forward takes it
        # as a second positional parameter.
        return self.model(x, None)

    def prepare_data(self):
        """Build the dataset and split it into training and validation subsets."""
        data_param = {
            "dataset_folder": self.dataset_folder,
            "fold": self.fold,
            "transform": self._hparam(self.hparams, "transform"),
            "augment": self._hparam(self.hparams, "augment"),
        }
        self.dataset = UrbanDataset(**data_param)
        (self.train_dataset, self.val_dataset) = self.dataset.train_validation_split()

    def train_dataloader(self):
        """Return the training loader; incomplete final batches are dropped."""
        return DataLoader(
            self.train_dataset,
            batch_size=self._hparam(self.hparams, "batch_size"),
            shuffle=self._hparam(self.hparams, "shuffle"),
            num_workers=8,
            drop_last=True,
        )

    def val_dataloader(self):
        """Return the validation loader; incomplete final batches are dropped."""
        return DataLoader(
            self.val_dataset,
            batch_size=self._hparam(self.hparams, "batch_size"),
            num_workers=8,
            drop_last=True,
        )

    def configure_optimizers(self):
        """Return an Adam optimizer over the wrapped model's parameters."""
        return torch.optim.Adam(
            self.model.parameters(), lr=self._hparam(self.hparams, "learning_rate")
        )

    def training_step(self, batch, batch_idx):
        """Compute the training loss for one batch."""
        data = batch["input_vector"].float()
        output = self.forward(data)
        # The loss needs the target in the same dtype as the model output.
        target = batch["label"].type_as(output)
        # Reduce to a scalar: backward() cannot start from a per-class vector.
        loss = self.loss(output, target).mean()

        return {"loss": loss, "log": {"1_loss/train_loss": loss}}

    def validation_step(self, batch, batch_idx):
        """Compute the loss and collect outputs and targets for one batch."""
        data = batch["input_vector"].float()
        output = self.forward(data)
        target = batch["label"].type_as(output)
        # Kept at least 1-D so validation_epoch_end can concatenate it.
        loss = self.loss(output, target).mean(0).reshape(-1)

        return {
            "val_loss": loss,
            "output": output,
            "target": target,
        }

    def validation_epoch_end(self, outputs):
        """Aggregate validation batches, compute metrics and track the best scores."""
        val_loss = torch.cat([o["val_loss"] for o in outputs], 0).mean()
        all_outputs = torch.cat([o["output"] for o in outputs], 0).cpu().numpy()
        all_targets = torch.cat([o["target"] for o in outputs], 0).cpu().numpy()

        # NOTE(review): these metric helpers are not defined or imported in
        # the visible part of the file - confirm where they come from.
        accuracy_score = accuracy(all_targets, all_outputs)
        f1_micro = compute_micro_F1(all_targets, all_outputs)
        auprc_micro = compute_micro_auprc(all_targets, all_outputs)
        _, auprc_macro = compute_macro_auprc(all_targets, all_outputs, True)
        map_score = mean_average_precision(all_targets, all_outputs)

        epoch_scores = [accuracy_score, f1_micro, auprc_micro, auprc_macro, map_score]
        self.best_scores = [
            max(best, current) for best, current in zip(self.best_scores, epoch_scores)
        ]

        log_temp = {
            "2_valid/1_accuracy0.5": accuracy_score,
            "2_valid/1_f1_micro0.5": f1_micro,
            "2_valid/1_auprc_micro": auprc_micro,
            "2_valid/1_auprc_macro": auprc_macro,
            "2_valid/1_map": map_score,
        }

        tqdm_dict = {
            "val_loss": val_loss,
            "acc": accuracy_score,
        }

        log = {
            "step": self.current_epoch,
            "1_loss/val_loss": val_loss,
        }

        log.update(log_temp)

        return {"progress_bar": tqdm_dict, "log": log}

    def test_step(self, batch, batch_idx):
        """Placeholder; testing is not implemented."""
        ...

## Training

In [None]:
def training(hparams, fold):
    """Train a ``VanillaClassifier`` with ``fold`` held out and report its best scores.

    Seeds the run from ``hparams.seed``, fits the model, appends the dataset
    name and best scores to ``logs/report.txt`` under the run directory, and
    returns ``model.best_scores``.
    """
    seed_everything(hparams.seed)
    # NOTE(review): `config` is not defined in the visible part of the file -
    # confirm where `path_to_summaries` is meant to come from.
    MAIN_DIR = os.path.join(config.path_to_summaries, "LSTMUrbanSounds/")

    model = VanillaClassifier(hparams, fold)

    tb_logger = loggers.TensorBoardLogger(os.path.join(MAIN_DIR, "logs"))

    trainer = Trainer.from_argparse_args(
        hparams,
        default_root_dir=MAIN_DIR,
        logger=tb_logger,
        # Early stopping is currently disabled; to enable it pass:
        # callbacks=[EarlyStopping("2_valid/1_accuracy0.5", patience=50, mode="max")],
        gpus=0,
    )

    trainer.fit(model)

    # Write the report before returning; it used to sit after the `return`
    # statement and never ran.
    log_dir = os.path.join(MAIN_DIR, "logs")
    os.makedirs(log_dir, exist_ok=True)
    with open(os.path.join(MAIN_DIR, "logs/report.txt"), "a") as file:
        file.write(hparams.dataset + "\n")
        file.write(str(model.best_scores) + "\n")

    return model.best_scores

## Training with folds

In [None]:
def train_with_folds(hparams, folds):
    """Train once per fold (1..``folds``) and average the best scores.

    Returns:
        A list with the mean over folds of each entry of the per-fold score
        list returned by ``training``, or an empty list when ``folds`` is
        less than 1. The per-fold results were previously discarded.
    """
    fold_scores = [training(hparams, fold) for fold in range(1, folds + 1)]
    if not fold_scores:
        return []
    return np.mean(np.asarray(fold_scores, dtype=float), axis=0).tolist()

# Configure and Start

## 1. Choose list of transforms and augmentations

In [11]:
# List every transform/augmentation name exposed by AudioUtil, one per line.
print('AVAILABLE TRANSFORMS AND AUGMENTATIONS:', *AudioUtil.get_available_transforms(), sep='\n- ')

AVAILABLE TRANSFORMS AND AUGMENTATIONS:
- gru_embedding
- leaf_embedding
- original_augmentation
- pad_trunc
- rechannel
- resample
- rolling_spectro_augmentation
- spectro_augment
- spectro_gram
- time_shift


In [None]:
# Optuna callback that reports trials to the Neptune run.
# NOTE(review): the only visible creation of `run` (the `neptune.init` call)
# is commented out earlier in the file, so this line fails unless `run` is
# defined elsewhere.
neptune_callback = optuna_utils.NeptuneCallback(run)

# Shared configuration; `objective` adds the sampled hyper-parameters to it.
CONFIGURATION = {
    'transforms': ['leaf_embedding']
}

In [None]:
def objective(trial):
    """Optuna objective: sample hyper-parameters, train across folds, return the metrics.

    Every sampled value is stored in the module-level ``CONFIGURATION`` dict,
    which is then handed to ``train_with_folds``; that function's return
    value is passed back to Optuna unchanged.
    """
    folds = trial.suggest_int("folds", 1, 1) #11
    # Categorical choices are sampled by name and resolved on `torch.nn`, so
    # the recorded trial parameter is a plain string rather than a module
    # instance.
    loss_name = trial.suggest_categorical("loss", ["BCEWithLogitsLoss"])
    loss = getattr(nn, loss_name)(reduction="none")
    shuffle = False

    # LSTM parameters
    embedding_dim = trial.suggest_int("embedding_dim", 2, 4)
    hidden_dim = trial.suggest_int("hidden_dim", 4, 6)
    # Each parameter needs its own name: this one and `channels` below were
    # both registered as "hidden_dim".
    n_layers = trial.suggest_int("n_layers", 1, 2)
    drop_prob = trial.suggest_float("drop_prob", 0.2, 0.5)
    batch_size = trial.suggest_int("batch_size", 32, 256, log=True)
    learning_rate = trial.suggest_float("learning_rate", 1e-5, 1e-2, log=True)

    # LEAF parameters
    sample_rate = trial.suggest_categorical("sample_rate", [24000, 44100, 48000])
    preemp = trial.suggest_categorical("preemp", [True, False])

    # Rechannel parameters
    channels = trial.suggest_int("channels", 1, 1)

    # Time Shift parameters
    shift_limit = trial.suggest_int("shift_limit", 10, 50, step=10)

    # Spectrogram parameters
    n_mels = trial.suggest_int("n_mels", 4, 4, log=True)
    n_fft = trial.suggest_int("n_fft", 8, 8, log=True)
    hop_len = trial.suggest_int("hop_len", 256, 512, log=True)

    # SpectroFreqTimeAugment parameters
    # NOTE(review): `max_mask_pct` is sampled as an int and `n_time_masks` as
    # a float, although the names suggest a fraction and a count - the two
    # types look swapped; confirm the intended search space.
    max_mask_pct = trial.suggest_int("max_mask_pct", 1, 2)
    n_freq_masks = trial.suggest_int("n_freq_masks", 1, 2)
    n_time_masks = trial.suggest_float("n_time_masks", 0.1, 1.2)

    CONFIGURATION.update({
        'folds': folds,
        'embedding_dim': embedding_dim,
        'hidden_dim': hidden_dim,
        'output_dim': 10,
        'n_layers': n_layers,
        'drop_prob': drop_prob,
        'batch_size': batch_size,
        'learning_rate': learning_rate,
        'loss': loss,
        'shuffle': shuffle,

        'sample_rate': sample_rate,
        'preemp': preemp,

        # Previously sampled but never stored.
        'channels': channels,

        'shift_limit': shift_limit,

        'n_mels': n_mels,
        'n_fft': n_fft,
        'hop_len': hop_len,

        'max_mask_pct': max_mask_pct,
        'n_freq_masks': n_freq_masks,
        'n_time_masks': n_time_masks,
    })

    # NOTE(review): Optuna expects a float or a sequence of floats here, so
    # `train_with_folds` must return one.
    metrics = train_with_folds(CONFIGURATION, folds)

    return metrics


## 2. Run study

In [None]:
# Create the study and run 100 trials of `objective`, reporting through the
# Neptune callback.
# NOTE(review): `create_study()` is called without `direction`/`directions`,
# while the code below reads `study.best_trials` and two entries of
# `trial.values` - confirm how many objectives `objective` returns and
# whether they should be maximised.
study = optuna.create_study()
study.optimize(objective, n_trials=100, callbacks=[neptune_callback])

print("Number of finished trials: ", len(study.trials))

# Best trials ordered by their objective values.
trials = sorted(study.best_trials, key=lambda t: t.values)

for trial in trials:
    print("  Trial#{}".format(trial.number))
    # NOTE(review): the "FLOPS" label does not match any metric computed in
    # the visible code - confirm what values[0] and values[1] hold.
    print("    Values: FLOPS={}, accuracy={}".format(trial.values[0], trial.values[1]))
    print("    Params: {}".format(trial.params))