# Soundscape detection

Author: Edoardo De Matteis

This notebook trains the model for the binary classification task of soundscape detection, i.e. a system that detects whether (bird) sounds are present in an ambient recording.

## Dependencies

In [None]:
!pip install torchaudio
!pip install torchinfo
!pip install pytorch_lightning
!pip install wandb -qqq

In [None]:
import os
import json
from pathlib import Path
from typing import Dict, Optional, Tuple, Union, Callable, List, Any
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

import torch
import torchvision
from torch import nn
from torch.utils.data import Dataset, DataLoader
import torchmetrics
import torchaudio
import torchaudio.transforms as T
import pytorch_lightning as pl
from pytorch_lightning import Callback, seed_everything
from pytorch_lightning.loggers import WandbLogger
from torchinfo import summary

import wandb

In [None]:
# Mount Google Drive at /content/gdrive; the paths used later in this notebook live under it.
from google.colab import drive
drive.mount('/content/gdrive')

In [None]:
%cd /content/gdrive/MyDrive/Colab\ Notebooks/Birdcalls

In [None]:
%load_ext autoreload
%autoreload 2

Let's define some basic functions that will help us:

In [None]:
def cnn_size(
    input: Tuple[int, int],
    kernel: Union[int, Tuple[int, int]],
    padding: Union[int, Tuple[int, int]] = 0,
    stride: Union[int, Tuple[int, int]] = 1,
) -> Tuple[int, int]:
    """
    Compute the spatial size produced by a convolutional layer.

    Any of ``kernel``, ``padding`` and ``stride`` given as a single int is applied
    to both axes.
    :param input: Size of the input image, one entry per spatial axis.
    :param kernel: Kernel size; an int denotes a square kernel.
    :param padding: Padding size.
    :param stride: Stride.
    :return: The output size, each entry truncated to an int.
    """

    def _pair(value):
        # Expand a scalar setting into a per-axis pair; pairs pass through untouched.
        return (value, value) if isinstance(value, int) else value

    k, p, s = _pair(kernel), _pair(padding), _pair(stride)

    # Standard convolution arithmetic: (in - kernel + 2 * padding) / stride + 1.
    sizes = [
        int((input[axis] - k[axis] + 2 * p[axis]) / s[axis] + 1)
        for axis in (0, 1)
    ]
    return sizes[0], sizes[1]

def pool_size(
    input: Union[int, Tuple[int, int]],
    pooling: Union[int, Tuple[int, int]],
) -> Tuple[int, int]:
    """
    Return the size of the output of a pooling layer.
    :param input: Size of the input image; an int denotes a square image.
    :param pooling: Pooling size; an int denotes a square pooling window.
    :return: The output size, each entry truncated to an int.
    """
    # The signature allows an int input, so normalise it like the pooling size.
    if isinstance(input, int):
        input = (input, input)

    if isinstance(pooling, int):
        pooling = (pooling, pooling)

    out_w = input[0] / pooling[0]
    out_h = input[1] / pooling[1]
    return int(out_w), int(out_h)

## Dataset

We define the dataset object. For our purposes we only care about the offline (precomputed) dataset,
so we only define the loading methods.

In [None]:
class SoundscapeDataset(Dataset):
    """
    Dataset of soundscape spectrograms and their targets.

    In this notebook instances are built through `load`, which fills the tensors
    from files; the constructor itself only records the `online` flag.
    """

    def __init__(
        self, csv_path: Optional[str], online: bool, debug: int, load: bool, **kwargs
    ):
        """
        :param csv_path: Path of the training CSV file (not read by this constructor).
        :param online: Selects which set of fields `__getitem__` returns; False means the
        precomputed spectrograms/targets tensors are used.
        :param debug: Size of the reduced dataset to use (not read by this constructor).
        :param load: Whether values come from a file (not read by this constructor).
        :param kwargs: Ignored.
        """
        super().__init__()

        # Declared here without a value; `load` assigns all three.
        self.spectrograms: torch.Tensor
        self.targets: torch.Tensor
        self.len: int

        self.online = online

    @staticmethod
    def load(
        spectrograms_path: Union[str, Path], targets_path: Union[str, Path], **kwargs
    ):
        """
        Build an offline dataset from two files read with `torch.load`.
        :param spectrograms_path: Path of the spectrograms tensor file.
        :param targets_path: Path of the targets tensor file.
        :param kwargs: Ignored.
        :return: A SoundscapeDataset whose tensors and length are populated.
        """
        dataset = SoundscapeDataset(csv_path=None, online=False, debug=-1, load=True)

        dataset.spectrograms = torch.load(spectrograms_path)
        dataset.targets = torch.load(targets_path)
        # The number of targets defines the dataset length.
        dataset.len = len(dataset.targets)

        return dataset

    def __len__(self):
        """
        :return: Number of entries in the dataset.
        """
        return self.len

    def __getitem__(self, item):
        """
        :param item: Index of the entry to retrieve.
        :return: A dict holding the item-th entry.
        """
        if not self.online:
            return {
                "spectrograms": self.spectrograms[item],
                "targets": self.targets[item],
            }

        # Online mode reads per-row attributes that nothing in this class assigns.
        fields = ("row_id", "site", "audio_id", "seconds", "birds")
        return {name: getattr(self, name)[item] for name in fields}

In [None]:
class SoundscapesDataModule(pl.LightningDataModule):
    """
    Data module serving the precomputed soundscape tensors for the train, validation
    and test splits.

    It derives from `pl.LightningDataModule`, the type the trainer documents for its
    `datamodule` argument. All per-split settings are dicts keyed by "train", "val"
    and "test".
    """

    def __init__(
        self,
        num_workers: Dict,
        batch_size: Dict,
        shuffle: Dict,
        **kwargs,
    ):
        """
        :param num_workers: Number of DataLoader worker processes per split.
        :param batch_size: Batch size per split.
        :param shuffle: Whether to shuffle, per split.
        :param kwargs: Ignored.
        """
        super().__init__()
        self.num_workers = num_workers
        self.batch_size = batch_size
        self.shuffle = shuffle

        # These attributes will be populated after self.setup() call.
        self.train_ds: Optional[Dataset] = None
        self.val_ds: Optional[Dataset] = None
        self.test_ds: Optional[Dataset] = None

    def setup(self, stage: Optional[str] = None) -> None:
        """
        Load the datasets needed by the requested stage.
        :param stage: "fit" loads train and validation data, "test" loads test data,
        None loads everything.
        """
        if stage is None or stage == "fit":
            # Train
            self.train_ds = SoundscapeDataset.load(
                spectrograms_path=TRAIN_SPECTROGRAMS,
                targets_path=TRAIN_TARGETS
            )

            # Val
            self.val_ds = SoundscapeDataset.load(
                spectrograms_path=VAL_SPECTROGRAMS,
                targets_path=VAL_TARGETS
            )

        if stage is None or stage == "test":
            # Test
            self.test_ds = SoundscapeDataset.load(
                spectrograms_path=TEST_SPECTROGRAMS,
                targets_path=TEST_TARGETS
            )

    def _make_dataloader(self, split: str, dataset: Optional[Dataset]) -> DataLoader:
        """Build the DataLoader of one split from the per-split settings."""
        return DataLoader(
            dataset=dataset,
            batch_size=self.batch_size[split],
            shuffle=self.shuffle[split],
            # Previously the configured worker count was stored but never used.
            # A split missing from the dict falls back to the DataLoader default of 0.
            num_workers=self.num_workers.get(split, 0),
        )

    def train_dataloader(
        self,
    ) -> Union[DataLoader, List[DataLoader], Dict[str, DataLoader]]:
        """
        :return: DataLoader over the training dataset.
        """
        return self._make_dataloader("train", self.train_ds)

    def val_dataloader(self) -> Union[DataLoader, List[DataLoader]]:
        """
        :return: DataLoader over the validation dataset.
        """
        return self._make_dataloader("val", self.val_ds)

    def test_dataloader(self) -> Union[DataLoader, List[DataLoader]]:
        """
        :return: DataLoader over the test dataset.
        """
        return self._make_dataloader("test", self.test_ds)

## Model

For the complete model definitions, refer to the project modules `Birdcalls.src.pl_module` and `Birdcalls.our.models`. 

In [None]:
class Detection(nn.Module):
    """
    Binary detection network: a 1x1 convolution lifts the single-channel input to
    three channels, a pretrained ResNet-50 processes it, and a linear layer reduces
    the 1000 ResNet outputs to one logit.
    """

    # Shape of the input image (c,h,w)
    shape = torch.Size((1, 128, 313))

    def __init__(self, **kwargs):
        """
        :param kwargs: Ignored.
        """
        super().__init__()

        # Arguments are (in_channels, out_channels, kernel_size).
        self.cnn = nn.Conv2d(1, 3, 1)
        self.resnet = torchvision.models.resnet50(pretrained=True)
        # Arguments are (in_features, out_features).
        self.fc = nn.Linear(1000, 1)

    def forward(self, xb):
        """
        :param xb: Input batch.
        :return: The logits produced by the final linear layer.
        """
        return self.fc(self.resnet(self.cnn(xb)))

In [None]:
class SoundscapeDetection(pl.LightningModule):
    """
    Lightning module that trains the `Detection` network as a binary call/no-call
    classifier.

    The loss is `nn.BCEWithLogitsLoss`; accuracy, precision and recall are tracked
    with one metric object per stage (train, val, test).
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: Hyperparameters, stored through `save_hyperparameters()`.
        `configure_optimizers` reads the `optim` entry.
        """
        super(SoundscapeDetection, self).__init__()
        self.save_hyperparameters()

        self.model = Detection()

        self.loss = nn.BCEWithLogitsLoss()

        accuracy = torchmetrics.Accuracy()
        self.train_accuracy = accuracy.clone()
        self.val_accuracy = accuracy.clone()
        self.test_accuracy = accuracy.clone()

        precision = torchmetrics.Precision()
        self.train_precision = precision.clone()
        self.val_precision = precision.clone()
        self.test_precision = precision.clone()

        recall = torchmetrics.Recall()
        self.train_recall = recall.clone()
        self.val_recall = recall.clone()
        self.test_recall = recall.clone()

        # Kept so the attribute remains available; the W&B plot below is built from
        # the buffered predictions instead.
        self.conf_mat = torchmetrics.ConfusionMatrix(num_classes=2)

        # Predictions/labels of the whole test run, gathered batch by batch so the
        # confusion matrix can be logged once over all of them.
        self._test_preds: List[torch.Tensor] = []
        self._test_targets: List[torch.Tensor] = []

    def forward(self, xb):
        """
        :param xb: Input batch.
        :return: A pair (logits, preds) where preds are the 0/1 predictions obtained
        by thresholding the sigmoid of the logits at 0.5.
        """
        logits = self.model(xb)
        # The loss function does implement the sigmoid by itself.
        preds = torch.sigmoid(logits).squeeze(1).ge(0.5).to(torch.long)
        return logits, preds

    def step(self, x: torch.Tensor, y: torch.Tensor):
        """
        Run the model and compute the loss.
        :param x: Input batch.
        :param y: Targets, with the same shape as the logits.
        :return: Dict with "logits", "preds" and "loss".
        """
        logits, preds = self(x)
        # BCEWithLogitsLoss needs floating-point targets; this is a no-op when the
        # stored targets already share the logits' dtype.
        loss = self.loss(logits, y.to(dtype=logits.dtype))
        return {"logits": logits, "preds": preds, "loss": loss}

    def _shared_step(self, batch: Any, stage: str) -> Dict[str, torch.Tensor]:
        """
        Common body of the train/val/test steps: forward pass, metric update, logging.
        :param batch: Dict with "spectrograms" and "targets".
        :param stage: One of "train", "val", "test"; selects metrics and log names.
        :return: The output of `step` plus the integer "labels" used for the metrics.
        """
        targets = batch["targets"]
        out_step = self.step(x=batch["spectrograms"], y=targets)

        preds = out_step["preds"]
        labels = targets.squeeze(1).to(torch.long)

        accuracy = getattr(self, f"{stage}_accuracy")
        precision = getattr(self, f"{stage}_precision")
        recall = getattr(self, f"{stage}_recall")

        accuracy(preds, labels)
        precision(preds, labels)
        recall(preds, labels)

        logged: Dict[str, Any] = {}
        if stage != "test":
            # The test stage never logged a loss.
            logged[f"{stage}_loss"] = out_step["loss"]
        # Log the metric objects rather than `.compute()` values: Lightning then
        # computes and resets them itself, so the numbers no longer accumulate
        # across epochs.
        logged[f"{stage}_acc"] = accuracy
        logged[f"{stage}_prec"] = precision
        logged[f"{stage}_rec"] = recall
        self.log_dict(logged)

        out_step["labels"] = labels
        return out_step

    def training_step(self, batch: Any, batch_idx: int) -> torch.Tensor:
        """
        :return: The training loss of the batch.
        """
        return self._shared_step(batch, stage="train")["loss"]

    def validation_step(self, batch: Any, batch_idx: int):
        """
        :return: The validation loss of the batch.
        """
        return self._shared_step(batch, stage="val")["loss"]

    def test_step(self, batch: Any, batch_idx: int):
        """
        Update and log the test metrics and buffer the predictions for the
        confusion matrix logged at the end of the test epoch.
        """
        out_step = self._shared_step(batch, stage="test")

        self._test_preds.append(out_step["preds"].detach().cpu())
        self._test_targets.append(out_step["labels"].detach().cpu())

    def on_test_epoch_end(self) -> None:
        """
        Log one confusion matrix covering every test batch. It used to be logged per
        batch, so each logged plot only described a single batch.
        """
        if not self._test_preds:
            return

        preds = torch.cat(self._test_preds).tolist()
        y_true = torch.cat(self._test_targets).tolist()
        # Empty the buffers so a later test run starts from scratch.
        self._test_preds.clear()
        self._test_targets.clear()

        self.logger.experiment.log(
            {
                "conf_mat": wandb.plot.confusion_matrix(
                    probs=None,
                    preds=preds,
                    y_true=y_true,
                    class_names=["nocall", "call"],
                )
            }
        )

    def configure_optimizers(self):
        """
        Build the optimizer, and the LR scheduler when enabled, from `hparams.optim`.
        :return: Dict with "optimizer" and, if `use_lr_scheduler` is set, "lr_scheduler".
        """
        optim_cfg = self.hparams.optim
        opt_cfg = optim_cfg["optimizer"]

        opt = opt_cfg["fn"](
            params=self.parameters(),
            lr=opt_cfg["lr"],
            betas=opt_cfg["betas"],
            eps=opt_cfg["eps"],
            weight_decay=opt_cfg["weight_decay"],
        )

        if not optim_cfg["use_lr_scheduler"]:
            return {"optimizer": opt}

        sched_cfg = optim_cfg["lr_scheduler"]
        scheduler = sched_cfg["fn"](
            optimizer=opt,
            T_0=sched_cfg["T_0"],
            T_mult=sched_cfg["T_mult"],
            eta_min=sched_cfg["eta_min"],
            last_epoch=sched_cfg["last_epoch"],
            verbose=sched_cfg["verbose"],
        )
        return {"optimizer": opt, "lr_scheduler": scheduler}

## Training

Environment and setup variables.

In [None]:
# Folder on the mounted Drive holding the precomputed tensors, one subfolder per split.
_PRECOMPUTED_ROOT = Path("/content/gdrive/My Drive/Colab Notebooks/Birdcalls/out/precomputed")

TRAIN_SPECTROGRAMS = _PRECOMPUTED_ROOT / "train" / "soundscapes" / "spectrograms.pt"
TRAIN_TARGETS = _PRECOMPUTED_ROOT / "train" / "soundscapes" / "targets.pt"

VAL_SPECTROGRAMS = _PRECOMPUTED_ROOT / "val" / "soundscapes" / "spectrograms.pt"
VAL_TARGETS = _PRECOMPUTED_ROOT / "val" / "soundscapes" / "targets.pt"

TEST_SPECTROGRAMS = _PRECOMPUTED_ROOT / "test" / "soundscapes" / "spectrograms.pt"
TEST_TARGETS = _PRECOMPUTED_ROOT / "test" / "soundscapes" / "targets.pt"

In [None]:
# Per-split DataLoader settings.
num_workers = dict.fromkeys(("train", "val", "test"), 12)
batch_size = dict.fromkeys(("train", "val", "test"), 128)
shuffle = dict(train=True, val=False, test=False)

# Optimizer
optimizer = dict(
    fn=torch.optim.Adam,
    lr=1e-5,
    betas=[0.9, 0.999],
    eps=1e-08,
    weight_decay=0,
)

use_lr_scheduler = False

# Scheduler settings; only consumed when `use_lr_scheduler` is True.
lr_scheduler = dict(
    fn=torch.optim.lr_scheduler.CosineAnnealingWarmRestarts,
    T_0=10,
    T_mult=2,
    eta_min=0,
    last_epoch=-1,
    verbose=True,
)

# Bundle handed to the model as its `optim` hyperparameter.
optim = dict(
    optimizer=optimizer,
    use_lr_scheduler=use_lr_scheduler,
    lr_scheduler=lr_scheduler,
)

# Trainer
train = dict(
    deterministic=False,
    random_seed=42,
    val_check_interval=1.0,
    progress_bar_refresh_rate=20,
    fast_dev_run=False,  # True for debug purposes.
    gpus=-1 if torch.cuda.is_available() else 0,
    precision=32,
    max_steps=100,
    max_epochs=50,
    accumulate_grad_batches=1,
    num_sanity_val_steps=2,
    gradient_clip_val=10.0,
)

In [None]:
# Seeding is tied to the "deterministic" trainer setting: when it is False,
# seed_everything is not called and "random_seed" is not used here.
if train["deterministic"]:
    seed_everything(train["random_seed"])

W&B login.

In [None]:
# Log in to Weights & Biases before the logger is created below.
wandb.login()

Let's set up the trainer so that we can run it.

In [None]:
# Data and model, both built from the settings defined above.
datamodule = SoundscapesDataModule(
    num_workers=num_workers, batch_size=batch_size, shuffle=shuffle
)

model = SoundscapeDetection(optim=optim)

# Run configuration attached to the W&B run.
wandb_config = dict(
    batch_size=batch_size["train"],
    learning_rate=optimizer["lr"],
    optimizer=optimizer["fn"],
    betas=optimizer["betas"],
    eps=optimizer["eps"],
    weight_decay=optimizer["weight_decay"],
    T_0=lr_scheduler["T_0"],
    T_mult=lr_scheduler["T_mult"],
    eta_min=lr_scheduler["eta_min"],
    last_epoch=lr_scheduler["last_epoch"],
    dataset="Bird CLEF 2021",
    summary=summary(model),
)

wandb_logger = WandbLogger(project="Soundscapes detection", config=wandb_config)

# Of the `train` settings, only these three are passed to the Trainer.
trainer = pl.Trainer(
    logger=wandb_logger,
    deterministic=train["deterministic"],
    gpus=train["gpus"],
    max_epochs=train["max_epochs"],
)

In [None]:
# Print the torchinfo summary of the model, then the module's own representation.
print(summary(model))
print(model)

Fit

In [None]:
# Train `model` on the data provided by `datamodule`.
trainer.fit(model=model, datamodule=datamodule)

Validation

In [None]:
# Run a validation pass of the fitted `model` using the same `datamodule`.
trainer.validate(model=model, datamodule=datamodule)

Test

In [None]:
# Run the test pass of `model` using the same `datamodule`.
trainer.test(model=model, datamodule=datamodule)

Quit W&B

In [None]:
# Close the W&B run once fitting, validation and testing are done.
wandb.finish()