Here I still use the EfficientNet (ImageNet-21k) pretrained model as the feature extractor.

But I will modify some of the code:

1. Try to move some data processing steps into the dataloader instead of doing them inside the LightningModule

2. Use some other layers to replace the attention layer used before

For specific experimental steps, please refer to 14-re-organize-dataprocess.ipynb

In [1]:
import pandas as pd
import numpy as np
import glob
import os
import matplotlib.pyplot as plt
import random
from typing import List
from torchaudio.transforms import MelSpectrogram, AmplitudeToDB

import torchaudio

import torch

from torch.utils.data import DataLoader,TensorDataset

import lightning as L

import datasets

from torch.utils.data import Dataset, DataLoader,WeightedRandomSampler

from pathlib import Path
import multiprocessing
import colorednoise as cn
import torch.nn as nn
import librosa
from torch.distributions import Beta
from torch_audiomentations import Compose, PitchShift, Shift, OneOf, AddColoredNoise

import timm
from torchinfo import summary

import torch.nn.functional as F

from torch.optim.lr_scheduler import (
    CosineAnnealingLR,
    CosineAnnealingWarmRestarts,
    ReduceLROnPlateau,
    OneCycleLR,
)
from lightning.pytorch.callbacks  import ModelCheckpoint, EarlyStopping

from lightning.pytorch.loggers import MLFlowLogger

from sklearn.metrics import roc_auc_score

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
import gc

In [3]:
# Pick the compute device: Apple MPS first, then CUDA, otherwise CPU.
# `is_built()` only says this PyTorch binary was compiled with MPS support;
# `is_available()` also checks that the current machine can actually use it,
# so "mps" is no longer selected on hosts where it would fail at runtime.
has_mps = torch.backends.mps.is_available()
device = "mps" if has_mps else "cuda" if torch.cuda.is_available() else "cpu"
print(device)

mps


In [4]:
# Location of the metadata CSV read by the cells below (relative path).
metadata_path='../../data/train_metadata_new_add_rating.csv'

In [5]:
# Split the metadata into train / validation before anything else.
# The dataset is unbalanced, so exactly one randomly chosen row of every
# category is held out for validation; all other rows of that category are
# used for training.

raw_df = pd.read_csv(metadata_path, header=0)

# One list of row indices per category
class_indices = raw_df.groupby('primary_label').apply(lambda grp: grp.index.tolist())

# Index accumulators for the two partitions
train_indices, val_indices = [], []

for label_rows in class_indices:
    # The fixed seed keeps the held-out row reproducible across runs
    held_out = pd.Series(label_rows).sample(n=1, random_state=42).tolist()
    val_indices += held_out
    train_indices += set(label_rows) - set(held_out)

# Materialise both partitions from the collected indices
train_df, val_df = raw_df.loc[train_indices], raw_df.loc[val_indices]

In [6]:
# Draw 20,000 additional rows from the training set (fixed seed) ...
additional_val_samples = train_df.sample(n=20000, random_state=42)

# ... append them to the validation set, then remove the same rows from training.
val_df = pd.concat([val_df, additional_val_samples])
train_df = train_df.drop(index=additional_val_samples.index)

In [7]:
# Need to fill in missing values for ratings in the metadata csv file

def rating_value_interplote(df:pd.DataFrame):
    '''
    Fill missing / undefined values of the ``rating`` column in the metadata frame.

    NaN ratings are first set to 0, then every rating equal to 0 is replaced by
    a value drawn uniformly at random from 0.5, 1.0, ..., 5.0.

    parameters:
        df: the df of the metadata csv file; it is modified in place

    returns:
        the same ``df`` object, with ``rating`` stored as float and no NaN / 0 left

    rating col means the quality of the corresponding audio file
        5 is high quality
        1 is low quality
        0 is without defined quality level
    '''
    # Assign the filled column back instead of calling fillna(inplace=True) on
    # the column selection: that chained in-place form is deprecated and does
    # nothing once pandas Copy-on-Write is active. The float cast lets an
    # integer rating column accept the half-step values written below.
    df['rating'] = df['rating'].fillna(0).astype(float)

    # Boolean mask of the rows that still have no defined quality level
    mask = df['rating'] == 0

    choices = np.arange(0.5, 5.1, 0.5).tolist()  # [0.5, 1.0, ..., 4.5, 5.0]
    # One random draw per undefined row
    random_values = np.random.choice(choices, size=mask.sum())
    df.loc[mask, 'rating'] = random_values

    return df

In [8]:
# Calculate the weight of each audio file through rating, which is helpful for model training
def audio_weight(df):
    '''
    Add an ``audio_weight`` column derived from the ``rating`` column.

    Each rating is divided by the largest rating in the frame and the result is
    clipped to [0.1, 1.0], so lower-quality audio gets a smaller weight but
    never less than 0.1.

    The frame is modified in place and also returned.
    '''
    top_rating = df["rating"].max()
    relative_quality = df["rating"].div(top_rating)
    df["audio_weight"] = np.clip(relative_quality, 0.1, 1.0)
    return df

In [9]:
# Because this is an unbalanced dataset, the amount of data in each category is very different
# So I will calculate the weight of each category here
# **(-0.5) The purpose is to reduce the relative influence of high-frequency categories and increase the influence of low-frequency categories, 
# so as to help the model better learn those uncommon categories
# The purpose of calculating this is to build a WeightedRandomSampler, 
# so that each time a batch is extracted using dataloader, it is more friendly to data of different categories.

def sampling_weight(df)->torch.Tensor:
    '''
    Return one sampling weight per row of ``df`` as a float32 tensor.

    A row's weight is (relative frequency of its ``primary_label``) ** -0.5,
    so rows of rare categories get larger weights than rows of common ones.
    The weights follow the row order of ``df``.
    '''
    label_counts = df['primary_label'].value_counts()
    per_class = (label_counts / label_counts.sum()) ** (-0.5)

    # Look up the class weight for every row, then hand the buffer to torch
    per_row = df['primary_label'].map(per_class).to_numpy(dtype=np.float32)
    return torch.from_numpy(per_row)

In [10]:
def dataloader_sampler_generate(df):
    '''
    Build a WeightedRandomSampler for a dataloader over ``df``.

    The weights come from ``sampling_weight(df)`` and are in the row order of
    ``df``; the dataset given to the dataloader must use the same order,
    otherwise weights and samples will not match. The sampler draws
    ``len(df)`` indices with replacement.
    '''
    row_weights = sampling_weight(df=df).double()
    return WeightedRandomSampler(
        weights=row_weights,
        num_samples=len(row_weights),
        replacement=True,
    )


In [11]:
# One weighted sampler per split, built in the order train, validation.
train_sampler, val_sampler = (
    dataloader_sampler_generate(df=split_df) for split_df in (train_df, val_df)
)

In [12]:
def read_audio(path: str):
    """
    Load an audio file with ``torchaudio.load``.

    Parameters:
        path: Path to the audio file (the dataset passes .ogg files)

    Returns:
        A ``(waveform, sample_rate)`` pair exactly as returned by torchaudio.
    """
    waveform, sr = torchaudio.load(path)
    return waveform, sr

In [13]:
class AudioTransform:
    """Base class for waveform augmentations applied with a given probability.

    Subclasses implement ``apply``. Calling the instance runs ``apply`` always
    when ``always_apply`` is set, otherwise with probability ``p``; when the
    transform is not applied the input is returned unchanged.
    """

    def __init__(self, always_apply=False, p=0.5):
        self.always_apply = always_apply
        self.p = p

    def __call__(self, y: np.ndarray):
        # The random draw only happens when always_apply is off
        fire = self.always_apply or np.random.rand() < self.p
        return self.apply(y) if fire else y

    def apply(self, y: np.ndarray):
        raise NotImplementedError


class CustomCompose:
    """Apply a list of transforms one after another, each on the previous output."""

    def __init__(self, transforms: list):
        self.transforms = transforms

    def __call__(self, y: np.ndarray):
        result = y
        for step in self.transforms:
            result = step(result)
        return result


class CustomOneOf:
    """With probability ``p``, apply exactly one transform picked uniformly at random.

    When the probability check fails the input is returned unchanged.
    """

    def __init__(self, transforms: list, p=1.0):
        self.transforms = transforms
        self.p = p

    def __call__(self, y: np.ndarray):
        if not (np.random.rand() < self.p):
            return y
        picked = self.transforms[np.random.choice(len(self.transforms))]
        return picked(y)


class GaussianNoiseSNR(AudioTransform):
    """Add white Gaussian noise at a random peak-amplitude ratio (in dB).

    The ratio is drawn uniformly from [min_snr, max_snr]. The noise is rescaled
    so that its peak equals the signal peak divided by 10 ** (snr / 20).
    Extra keyword arguments are accepted and ignored.
    """

    def __init__(self, always_apply=False, p=0.5, min_snr=5.0, max_snr=40.0, **kwargs):
        super().__init__(always_apply, p)
        self.min_snr, self.max_snr = min_snr, max_snr

    def apply(self, y: np.ndarray, **params):
        snr_db = np.random.uniform(self.min_snr, self.max_snr)
        signal_peak = np.sqrt(y**2).max()
        target_noise_peak = signal_peak / (10 ** (snr_db / 20))

        noise = np.random.randn(len(y))
        noise_peak = np.sqrt(noise**2).max()
        # Bring the noise peak to the target level, keep the input dtype
        return (y + noise * 1 / noise_peak * target_noise_peak).astype(y.dtype)


class PinkNoiseSNR(AudioTransform):
    """Add power-law (exponent 1) noise at a random peak-amplitude ratio (in dB).

    The ratio is drawn uniformly from [min_snr, max_snr]. The noise comes from
    ``colorednoise.powerlaw_psd_gaussian`` and is rescaled so that its peak
    equals the signal peak divided by 10 ** (snr / 20).
    Extra keyword arguments are accepted and ignored.
    """

    def __init__(self, always_apply=False, p=0.5, min_snr=5.0, max_snr=20.0, **kwargs):
        super().__init__(always_apply, p)
        self.min_snr, self.max_snr = min_snr, max_snr

    def apply(self, y: np.ndarray, **params):
        snr_db = np.random.uniform(self.min_snr, self.max_snr)
        signal_peak = np.sqrt(y**2).max()
        target_noise_peak = signal_peak / (10 ** (snr_db / 20))

        noise = cn.powerlaw_psd_gaussian(1, len(y))
        noise_peak = np.sqrt(noise**2).max()
        # Bring the noise peak to the target level, keep the input dtype
        return (y + noise * 1 / noise_peak * target_noise_peak).astype(y.dtype)


class VolumeControl(AudioTransform):
    """Scale the waveform by a random gain, constant or varying over time.

    A gain in dB is drawn uniformly from [-db_limit, db_limit] and applied
    according to ``mode``:
        "uniform": the same gain for every sample
        "fade":    the dB value is multiplied by a linear ramp going from 1 at
                   the first sample to 0 at the last one
        "cosine":  the dB value is multiplied by one full cosine period
        "sine":    the dB value is multiplied by one full sine period

    Raises:
        AssertionError: if ``mode`` is not one of the four names above.
    """

    def __init__(self, always_apply=False, p=0.5, db_limit=10, mode="uniform"):
        super().__init__(always_apply, p)

        assert mode in [
            "uniform",
            "fade",
            "cosine",
            "sine",
        ], "`mode` must be one of 'uniform', 'fade', 'cosine', 'sine'"

        self.db_limit = db_limit
        self.mode = mode

    def apply(self, y: np.ndarray, **params):
        db = np.random.uniform(-self.db_limit, self.db_limit)
        if self.mode == "uniform":
            db_translated = 10 ** (db / 20)
        elif self.mode == "fade":
            # max(..., 1) avoids 0 / 0 (NaN output) for a one-sample input;
            # every longer input gets exactly the same ramp as before.
            lin = np.arange(len(y))[::-1] / max(len(y) - 1, 1)
            db_translated = 10 ** (db * lin / 20)
        elif self.mode == "cosine":
            cosine = np.cos(np.arange(len(y)) / len(y) * np.pi * 2)
            db_translated = 10 ** (db * cosine / 20)
        else:
            sine = np.sin(np.arange(len(y)) / len(y) * np.pi * 2)
            db_translated = 10 ** (db * sine / 20)
        augmented = y * db_translated
        return augmented


class NoiseInjection(AudioTransform):
    """Add standard-normal noise scaled by a random level in [0, max_noise_level).

    ``sr`` is stored but not used by ``apply``.
    """

    def __init__(self, always_apply=False, p=0.5, max_noise_level=0.5, sr=32000):
        super().__init__(always_apply, p)
        self.noise_level = (0.0, max_noise_level)
        self.sr = sr

    def apply(self, y: np.ndarray, **params):
        low, high = self.noise_level
        level = np.random.uniform(low, high)
        noise = np.random.randn(len(y))
        # Keep the input dtype after adding the noise
        return (y + noise * level).astype(y.dtype)


class GaussianNoise(AudioTransform):
    """Add white Gaussian noise at a random peak-amplitude ratio (in dB).

    The ratio is drawn uniformly from [min_snr, max_snr]. The noise is rescaled
    so that its peak equals the signal peak divided by 10 ** (snr / 20).
    ``sr`` is stored but not used by ``apply``.
    """

    def __init__(self, always_apply=False, p=0.5, min_snr=5, max_snr=20, sr=32000):
        super().__init__(always_apply, p)
        self.min_snr, self.max_snr = min_snr, max_snr
        self.sr = sr

    def apply(self, y: np.ndarray, **params):
        snr_db = np.random.uniform(self.min_snr, self.max_snr)
        signal_peak = np.sqrt(y**2).max()
        target_noise_peak = signal_peak / (10 ** (snr_db / 20))

        noise = np.random.randn(len(y))
        noise_peak = np.sqrt(noise**2).max()
        # Bring the noise peak to the target level, keep the input dtype
        return (y + noise * 1 / noise_peak * target_noise_peak).astype(y.dtype)


class PinkNoise(AudioTransform):
    """Add power-law (exponent 1) noise at a random peak-amplitude ratio (in dB).

    The ratio is drawn uniformly from [min_snr, max_snr]. The noise comes from
    ``colorednoise.powerlaw_psd_gaussian`` and is rescaled so that its peak
    equals the signal peak divided by 10 ** (snr / 20).
    ``sr`` is stored but not used by ``apply``.
    """

    def __init__(self, always_apply=False, p=0.5, min_snr=5, max_snr=20, sr=32000):
        super().__init__(always_apply, p)
        self.min_snr, self.max_snr = min_snr, max_snr
        self.sr = sr

    def apply(self, y: np.ndarray, **params):
        snr_db = np.random.uniform(self.min_snr, self.max_snr)
        signal_peak = np.sqrt(y**2).max()
        target_noise_peak = signal_peak / (10 ** (snr_db / 20))

        noise = cn.powerlaw_psd_gaussian(1, len(y))
        noise_peak = np.sqrt(noise**2).max()
        # Bring the noise peak to the target level, keep the input dtype
        return (y + noise * 1 / noise_peak * target_noise_peak).astype(y.dtype)


class TimeStretch(AudioTransform):
    """Time-stretch the waveform by a random rate drawn from [0, max_rate).

    ``sr`` is stored but not used by ``apply``.
    """

    def __init__(self, always_apply=False, p=0.5, max_rate=1, sr=32000):
        super().__init__(always_apply, p)
        self.max_rate = max_rate
        self.sr = sr

    def apply(self, y: np.ndarray, **params):
        # NOTE(review): the lower bound of 0 allows rates arbitrarily close to
        # zero, which librosa turns into extremely long outputs (and it rejects
        # rate <= 0) - confirm the intended range.
        rate = np.random.uniform(0, self.max_rate)
        # `rate` must be passed by keyword: it is keyword-only in current
        # librosa releases, and the keyword form also works on older ones.
        augmented = librosa.effects.time_stretch(y, rate=rate)
        return augmented


def _db2float(db: float, amplitude=True):
    """Convert decibels to a linear factor.

    Uses 10 ** (db / 20) when ``amplitude`` is true and 10 ** (db / 10)
    otherwise.
    """
    divisor = 20 if amplitude else 10
    return 10 ** (db / divisor)


def volume_down(y: np.ndarray, db: float):
    """
    Low level API for decreasing the volume.

    The input is multiplied by the amplitude factor for ``-db`` decibels.

    Parameters
    ----------
    y: numpy.ndarray
        stereo / monaural input audio
    db: float
        how many decibels to decrease by
    Returns
    -------
    applied: numpy.ndarray
        audio with decreased volume
    """
    attenuation = _db2float(-db)
    return y * attenuation


def volume_up(y: np.ndarray, db: float):
    """
    Low level API for increasing the volume.

    The input is multiplied by the amplitude factor for ``db`` decibels.

    Parameters
    ----------
    y: numpy.ndarray
        stereo / monaural input audio
    db: float
        how many decibels to increase by
    Returns
    -------
    applied: numpy.ndarray
        audio with increased volume
    """
    gain = _db2float(db)
    return y * gain


class RandomVolume(AudioTransform):
    """Change the volume by a random amount of decibels in [-limit, limit).

    A non-negative draw raises the volume via ``volume_up``; a negative draw
    lowers it via ``volume_down``.
    """

    def __init__(self, always_apply=False, p=0.5, limit=10):
        super().__init__(always_apply, p)
        self.limit = limit

    def apply(self, y: np.ndarray, **params):
        db = np.random.uniform(-self.limit, self.limit)
        if db >= 0:
            return volume_up(y, db)
        # volume_down takes the (positive) number of decibels to remove.
        # Passing the negative draw unchanged made it negate the value a second
        # time, so this branch used to raise the volume instead of lowering it.
        return volume_down(y, -db)


class CosineVolume(AudioTransform):
    """Modulate the volume along one full cosine period over the waveform.

    A dB value is drawn uniformly from [-limit, limit); each sample's gain is
    the amplitude factor of ``cos(phase) * db`` at that sample.
    """

    def __init__(self, always_apply=False, p=0.5, limit=10):
        super().__init__(always_apply, p)
        self.limit = limit

    def apply(self, y: np.ndarray, **params):
        db = np.random.uniform(-self.limit, self.limit)
        n_samples = len(y)
        envelope = np.cos(np.arange(n_samples) / n_samples * np.pi * 2)
        return y * _db2float(envelope * db)


class AddGaussianNoise(AudioTransform):
    """Add standard-normal noise scaled by a random amplitude factor."""

    supports_multichannel = True

    def __init__(
        self, always_apply=False, min_amplitude=0.001, max_amplitude=0.015, p=0.5
    ):
        """
        :param always_apply: apply on every call, ignoring ``p``
        :param min_amplitude: Minimum noise amplification factor (must be > 0)
        :param max_amplitude: Maximum noise amplification factor (must be >= min_amplitude)
        :param p: The probability of applying this transform
        """
        super().__init__(always_apply, p)
        assert min_amplitude > 0.0
        assert max_amplitude > 0.0
        assert max_amplitude >= min_amplitude
        self.min_amplitude, self.max_amplitude = min_amplitude, max_amplitude

    def apply(self, samples: np.ndarray, sample_rate=32000):
        scale = np.random.uniform(self.min_amplitude, self.max_amplitude)
        # Noise has the same shape as the input, so any channel layout works
        gaussian = np.random.randn(*samples.shape).astype(np.float32)
        return samples + scale * gaussian


class AddGaussianSNR(AudioTransform):
    """
    Add gaussian noise to the input at a random signal-to-noise ratio.

    The SNR is picked uniformly on the decibel scale, and the noise standard
    deviation is the RMS of the input divided by 10 ** (snr / 20).
    """

    supports_multichannel = True

    def __init__(
        self,
        always_apply=False,
        min_snr_in_db: float = 5.0,
        max_snr_in_db: float = 40.0,
        p: float = 0.5,
    ):
        """
        :param always_apply: apply on every call, ignoring ``p``
        :param min_snr_in_db: Minimum signal-to-noise ratio in dB. A lower number means more noise.
        :param max_snr_in_db: Maximum signal-to-noise ratio in dB. A greater number means less noise.
        :param p: The probability of applying this transform
        """
        super().__init__(always_apply, p)
        self.min_snr_in_db, self.max_snr_in_db = min_snr_in_db, max_snr_in_db

    def apply(self, samples: np.ndarray, sample_rate=32000):
        snr_db = np.random.uniform(self.min_snr_in_db, self.max_snr_in_db)

        signal_rms = np.sqrt(np.mean(np.square(samples)))
        exponent = float(snr_db) / 20
        noise_std = signal_rms / (10**exponent)

        # Zero-mean noise with the computed standard deviation, same shape as the input
        noise = np.random.normal(0.0, noise_std, size=samples.shape).astype(np.float32)
        return samples + noise


class Normalize(AudioTransform):
    """
    Peak-normalize the input: divide by the largest absolute sample so the
    loudest sample ends up at magnitude 1.

    With ``apply_to="only_too_loud_sounds"`` inputs whose peak is below 1.0 are
    returned unchanged. An all-zero input is always returned unchanged.
    """

    supports_multichannel = True

    def __init__(self, always_apply=False, apply_to: str = "all", p: float = 0.5):
        super().__init__(always_apply, p)
        assert apply_to in ("all", "only_too_loud_sounds")
        self.apply_to = apply_to

    def apply(self, samples: np.ndarray, sample_rate=32000):
        peak = np.amax(np.abs(samples))

        # Quiet enough already and only loud inputs were requested
        if self.apply_to == "only_too_loud_sounds" and peak < 1.0:
            return samples
        # Nothing to scale by
        if not peak > 0:
            return samples
        return samples / peak

class NormalizeMelSpec(nn.Module):
    """Standardize each sample over dims (1, 2), then min-max scale it to [0, 1].

    Samples whose standardized value range is not larger than ``eps`` (for
    example constant inputs) are returned as all zeros.
    """

    def __init__(self, eps=1e-6):
        super().__init__()
        self.eps = eps

    def forward(self, X):
        # Per-sample standardization
        mu = X.mean((1, 2), keepdim=True)
        sigma = X.std((1, 2), keepdim=True)
        standardized = (X - mu) / (sigma + self.eps)

        # Per-sample extrema over the last two dims
        lo = standardized.min(-1)[0].min(-1)[0]
        hi = standardized.max(-1)[0].max(-1)[0]
        spread = hi - lo
        usable = spread > self.eps * torch.ones_like(spread)

        scaled = torch.zeros_like(standardized)
        if not usable.sum():
            return scaled

        lo_sel = lo[usable, None, None]
        hi_sel = hi[usable, None, None]
        picked = torch.max(torch.min(standardized[usable], hi_sel), lo_sel)
        # Min-max scale only the samples with a usable value range
        scaled[usable] = (picked - lo_sel) / (hi_sel - lo_sel)
        return scaled

In [14]:
# First we need to get all the types
meta_df=pd.read_csv(metadata_path,header=0)
bird_cates=meta_df.primary_label.unique()

#Because the order is very important and needs to be matched one by one in the subsequent training, I will save these types here
# Save as .npy file
np.save("./external_files/13-2-bird-cates.npy", bird_cates)

In [15]:
# load .npy file
# Reload the saved category array; class_weight_generate below reads this
# module-level name to fix the class order.
# allow_pickle=True is presumably needed because the label array is stored
# with object dtype - TODO confirm.
loaded_array = np.load("./external_files/13-2-bird-cates.npy",allow_pickle=True)

In [16]:
def class_weight_generate(df:pd.DataFrame, categories=None)->torch.Tensor:
    '''
    Compute one loss weight per category, used to handle the unbalanced data
    set (e.g. with focal loss).

    A category's weight is (its relative frequency in ``df``) ** -0.5.

    parameters:
        df: metadata frame with a ``primary_label`` column
        categories: sequence of category names that fixes the order (and the
            length) of the result; defaults to the module-level ``loaded_array``

    returns:
        float16 tensor with exactly one weight per entry of ``categories``,
        in that order
    '''
    if categories is None:
        categories = loaded_array

    label_counts = df['primary_label'].value_counts()
    observed = (label_counts / label_counts.sum()) ** (-0.5)

    # Align on the full category list. Sorting only the observed labels (the
    # previous approach) returned a shorter vector whenever a category had no
    # row in ``df``, which shifted every later weight onto the wrong class.
    aligned = observed.reindex(list(categories))
    if aligned.isna().any():
        # A category without any row has no frequency to invert; give it the
        # largest observed weight, i.e. treat it like the rarest seen class.
        fallback = observed.max() if len(observed) else 1.0
        aligned = aligned.fillna(fallback)

    class_weight = torch.tensor(aligned.to_numpy(dtype=np.float64), dtype=torch.float16)

    return class_weight

In [17]:
# Per-category loss weights for each split, computed in the order train, validation.
loss_train_class_weights, loss_val_class_weights = (
    class_weight_generate(df=split_df) for split_df in (train_df, val_df)
)

In [18]:
class BirdclefDataset(Dataset):
    """Dataset returning (one-hot label, 5-second clip, audio weight) per metadata row.

    Each row of the metadata frame names an audio file, a clip start time and a
    primary label. The file is read on access, optionally augmented (training
    only), cut to the 5-second clip starting at ``clip_start_time`` and
    zero-padded when the audio ends early.
    """

    def __init__(
        self,
        df: pd.DataFrame,
        bird_category_dir: str,
        audio_dir: str = "../../data/train_audio",
        train: bool = True,
    ):
        """
        parameters:
            df: the dataframe of metadata (train/val); NOTE it is modified in
                place (ratings are filled in, an ``audio_weight`` column is added)
            bird_category_dir: path of the bird category array file (npy)
            audio_dir: the parent path where all audio files are stored
            train: whether the Dataset is for the train set (augmentation on)
                or the val set (augmentation off)
        """
        super().__init__()
        # if the Dataset for training or validation
        self.train = train
        self.raw_df = df

        # fill nan or 0 values of the rating col
        self.raw_df = rating_value_interplote(df=self.raw_df)
        # Calculate the weight of each audio file by rating
        self.raw_df = audio_weight(self.raw_df)

        self.audio_dir = audio_dir

        # Category order defines the position of the 1 in the one-hot label
        self.bird_cate_array = np.load(bird_category_dir, allow_pickle=True)

        # initialize data augmentation func
        self.np_audio_transforms = self.setup_transforms()

    def setup_transforms(self):
        """Build the numpy-level augmentation: with probability 0.3, one noise transform."""
        return CustomCompose(
            [
                CustomOneOf(
                    [
                        NoiseInjection(p=1, max_noise_level=0.04),
                        GaussianNoise(p=1, min_snr=5, max_snr=20),
                        PinkNoise(p=1, min_snr=5, max_snr=20),
                        AddGaussianNoise(
                            min_amplitude=0.0001, max_amplitude=0.03, p=0.5
                        ),
                        AddGaussianSNR(min_snr_in_db=5, max_snr_in_db=15, p=0.5),
                    ],
                    p=0.3,
                ),
            ]
        )

    def get_audio_path(self, file_name: str) -> str:
        """
        Build the full path of one audio file.

        Parameters:
            file_name: in format category_type/XC-ID.ogg (asbfly/XC134896.ogg)

        Return:
            ``audio_dir`` joined with ``file_name``
        """
        return os.path.join(self.audio_dir, file_name)

    def target_clip(
        self, index: int, audio: torch.Tensor, sample_rate: int
    ) -> torch.Tensor:
        """
        Cut the 5-second clip that belongs to metadata row ``index``.

        The clip start time and the audio duration are read from the metadata.

        Parameters:
            index: positional row index into the metadata frame
            audio: the raw audio in tensor [num_channels,length]
            sample_rate: audio sampling rate

        Returns:
            tensor [num_channels, 5 * sample_rate]; zero-padded at the end when
            the audio stops before the clip does

        Raises:
            ValueError: if the clip start time is larger than the duration
                given in the metadata
        """
        clip_start_time = self.raw_df["clip_start_time"].iloc[index]
        duration_seconds = self.raw_df["duration"].iloc[index]

        if clip_start_time > duration_seconds:
            raise ValueError("The clip start time is out of raw audio length")

        # Cast to int: the metadata columns may hold floats, and a float
        # cannot be used as a slice bound.
        segment_duration = int(5 * sample_rate)
        clip_start_point = int(round(clip_start_time * sample_rate))

        # Slicing past the end just returns fewer samples
        clip = audio[:, clip_start_point : clip_start_point + segment_duration]

        # Pad by what is actually missing. Deriving the padding from the start
        # position produced clips longer than 5 seconds whenever the start lay
        # beyond the decoded audio length.
        missing = segment_duration - clip.shape[1]
        if missing > 0:
            silence = torch.zeros(
                audio.shape[0], missing, dtype=audio.dtype, device=audio.device
            )
            clip = torch.cat((clip, silence), dim=1)

        return clip

    def random_audio_augmentation(self, audio: torch.Tensor):
        """
        Apply the numpy augmentation pipeline to the first channel.

        audio (torch.Tensor): A 2D tensor of audio samples with shape (1, N), where N is the number of samples.

        Returns a float16 tensor of shape (1, N).
        """
        audio_aug = self.np_audio_transforms(audio[0].numpy())

        # back to a 2D tensor with a single channel, matching the input layout
        audio_aug_tensor = torch.from_numpy(audio_aug)
        return audio_aug_tensor.unsqueeze(0).to(dtype=torch.float16)

    def audio_label_tensor_generator(self, true_label: str) -> torch.Tensor:
        """
        Generate a one-hot tensor over all categories for the given label.

        Parameters:
            true_label: a label string

        Return:
            With 10 classes and the label at position 1, the result is
            tensor([0,1,0,0,0,0,0,0,0,0]) (dtype float16)

        Raises:
            IndexError: if the label is not part of the category array
        """
        # Position of the label in the category array
        idx = np.where(self.bird_cate_array == true_label)[0][0]

        audio_label_tensor = torch.zeros(len(self.bird_cate_array), dtype=torch.float16)
        audio_label_tensor[idx] = 1

        return audio_label_tensor

    def __len__(self):
        return self.raw_df.shape[0]

    def __getitem__(self, index):
        row = self.raw_df.iloc[index]

        # Read the audio file named by this row
        audio, sr = read_audio(self.get_audio_path(row["filename"]))

        # Augmentation is applied to the whole file, and only for training
        if self.train:
            audio = self.random_audio_augmentation(audio=audio)

        # Get the audio clip corresponding to index
        clip = self.target_clip(index, audio=audio, sample_rate=sr)

        # one-hot label, already float16
        audio_label_tensor = self.audio_label_tensor_generator(
            true_label=row["primary_label"]
        )

        # copy=True returns a clip that owns its memory instead of a view into
        # the full-length audio; this replaces torch.tensor(<tensor>), which
        # warns about re-wrapping an existing tensor.
        clip = clip.detach().to(dtype=torch.float16, copy=True)
        audio_weight = torch.tensor(row["audio_weight"], dtype=torch.float16)

        return audio_label_tensor, clip, audio_weight

In [19]:
class Mixup(nn.Module):
    """Additive mixup of a batch with one or two random permutations of itself.

    With probability ``mixup_prob`` the batch is mixed. When it is mixed, a
    second draw decides between adding one permuted copy (draw below
    ``mixup_double``) and adding two permuted copies (otherwise). Inputs are
    summed, labels are summed and clamped to [0, 1], and weights (if given)
    are averaged over the mixed samples.

    Returns ``(X, Y)``, or ``(X, Y, weight)`` when a weight is passed.
    """

    def __init__(self, mix_beta, mixup_prob, mixup_double):
        super().__init__()
        self.beta_distribution = Beta(mix_beta, mix_beta)
        self.mixup_prob = mixup_prob
        self.mixup_double = mixup_double

    def forward(self, X, Y, weight=None):
        def pack(x, y, w):
            # Keep the arity of the result tied to whether a weight was given
            return (x, y) if w is None else (x, y, w)

        # First draw: mix this batch at all?
        if not (torch.rand((1,))[0] < self.mixup_prob):
            return pack(X, Y, weight)

        batch_size = X.shape[0]
        first = torch.randperm(batch_size)  # partner sample for every position

        # Second draw: one partner (below mixup_double) or two partners
        if torch.rand((1,))[0] < self.mixup_double:
            mixed_x = X + X[first]
            mixed_y = torch.clamp(Y + Y[first], 0, 1)
            if weight is not None:
                weight = 0.5 * weight + 0.5 * weight[first]
            return pack(mixed_x, mixed_y, weight)

        second = torch.randperm(batch_size)
        mixed_x = X + X[first] + X[second]
        mixed_y = torch.clamp(Y + Y[first] + Y[second], 0, 1)
        if weight is not None:
            weight = (
                1 / 3 * weight + 1 / 3 * weight[first] + 1 / 3 * weight[second]
            )
        return pack(mixed_x, mixed_y, weight)

In [20]:
def mel_transform(sample_rate:float,audio:torch.Tensor,window_size: float=0.04,hop_size:float=0.01,n_mels:int=40)->torch.Tensor:
    """
    Transform audio data into a mel spectrogram.

    ``window_size`` and ``hop_size`` are given in seconds and converted to
    sample counts with ``sample_rate``. The mel filter range is fixed to
    0 - 16000 Hz.
    """
    spectrogram_kwargs = dict(
        sample_rate=sample_rate,
        n_fft=int(window_size * sample_rate),
        hop_length=int(hop_size * sample_rate),
        n_mels=n_mels,
        f_min=0,
        f_max=16000,
    )
    return MelSpectrogram(**spectrogram_kwargs)(audio)

In [21]:
def compute_deltas(specgram: torch.Tensor, win_length: int = 5, mode: str = "replicate") -> torch.Tensor:
    """Compute delta coefficients of a tensor, usually a spectrogram.

    The deltas are computed along the last dimension with a grouped 1-D
    convolution whose kernel is created on the device of the input.

    Args:
        specgram (Tensor): Tensor of audio of dimension (..., freq, time)
        win_length (int, optional): The window length used for computing delta (Default: 5)
        mode (str, optional): Mode parameter passed to padding (Default: "replicate")

    Returns:
        Tensor: Tensor of deltas of dimension (..., freq, time)
    """
    original_shape = specgram.size()

    # Fold every leading dimension into the channel axis of one batch item
    flat = specgram.reshape(1, -1, original_shape[-1])

    assert win_length >= 3
    half = (win_length - 1) // 2
    denom = half * (half + 1) * (2 * half + 1) / 3

    padded = torch.nn.functional.pad(flat, (half, half), mode=mode)
    n_rows = padded.shape[1]

    # Same ramp kernel [-half, ..., half] for every row, matching the input dtype/device
    taps = torch.arange(-half, half + 1, 1, dtype=specgram.dtype, device=specgram.device)
    kernel = taps.repeat(n_rows, 1, 1)

    deltas = torch.nn.functional.conv1d(padded, kernel, groups=n_rows) / denom

    # Restore the original layout
    return deltas.reshape(original_shape)



def make_delta(input_tensor: torch.Tensor):
    """Compute deltas along dimension 2 of a 4-D tensor.

    Dimensions 2 and 3 are swapped so that ``compute_deltas`` (which works on
    the last dimension) runs along dimension 2, then swapped back.
    """
    swapped = input_tensor.transpose(3, 2)
    return compute_deltas(swapped).transpose(3, 2)


def image_delta(x):
    """Stack the input with its first- and second-order deltas along dim 1."""
    first_order = make_delta(x)
    second_order = make_delta(first_order)
    return torch.cat([x, first_order, second_order], dim=1)

In [22]:
class Mixup2(nn.Module):
    """Convex mixup with per-sample coefficients drawn from Beta(mix_beta, mix_beta).

    With probability ``mixup2_prob`` every sample is blended with a randomly
    chosen partner from the same batch: ``c * sample + (1 - c) * partner``.
    Labels and weights (if given) are blended with the same coefficients.

    Returns ``(X, Y)``, or ``(X, Y, weight)`` when a weight is passed.
    """

    def __init__(self, mix_beta, mixup2_prob):
        super().__init__()
        self.beta_distribution = Beta(mix_beta, mix_beta)
        self.mixup2_prob = mixup2_prob

    def forward(self, X, Y, weight=None):
        if torch.rand((1,))[0] < self.mixup2_prob:
            batch_size = X.shape[0]
            rank = len(X.shape)
            partner = torch.randperm(batch_size)
            coeffs = self.beta_distribution.rsample(torch.Size((batch_size,)))

            # Shape the coefficients so they broadcast over the non-batch dims
            if rank == 2:
                lead = coeffs.view(-1, 1)
            elif rank == 3:
                lead = coeffs.view(-1, 1, 1)
            else:
                lead = coeffs.view(-1, 1, 1, 1)
            X = lead * X + (1 - lead) * X[partner]

            column = coeffs.view(-1, 1)
            Y = column * Y + (1 - column) * Y[partner]

            if weight is not None:
                flat = coeffs.view(-1)
                weight = flat * weight + (1 - flat) * weight[partner]

        if weight is None:
            return X, Y
        return X, Y, weight

In [23]:
# Module-level augmentation objects; trainloader_collate below uses them as globals.
# Additive mixup on the stacked clips (first augmentation step in the collate function).
mixup_layer = Mixup(mix_beta=5, mixup_prob=0.7, mixup_double=0.5)
# Beta-weighted mixup, applied later in the collate function on the spectrogram features.
mixup2_layer = Mixup2(mix_beta=2, mixup2_prob=0.15)

# Waveform augmentations from torch_audiomentations, combined with Compose.
# NOTE(review): `p` is presumably the per-transform probability of being
# applied - confirm against the torch_audiomentations documentation.
audio_transforms = Compose(
    [
        # AddColoredNoise(p=0.5),
        PitchShift(
            min_transpose_semitones=-4,
            max_transpose_semitones=4,
            sample_rate=32000,
            p=0.4,
        ),
        Shift(min_shift=-0.5, max_shift=0.5, p=0.4),
    ]
)

In [24]:
# load pretrained model
# Create the timm model with pretrained weights. in_chans=3 matches the three
# channels (spectrogram, delta, delta-delta) stacked by image_delta.
model = timm.create_model('tf_efficientnetv2_s_in21k', pretrained=True,in_chans=3) # in_chans sets the number of input channels the pretrained model accepts

In [25]:
# Assume model is the loaded complete EfficientNet model
# Use the output of the first set of InvertedResidual
# Keep everything except the last three child modules of the model.
# NOTE(review): which layers these are depends on the timm model structure -
# adjust the slice if the structure differs.
feature_extractor = torch.nn.Sequential(
    *list(model.children())[:-3]
)
# The extractor is only used for inference (under torch.no_grad() in the
# collate function). Switch it to eval mode so BatchNorm layers use their
# pretrained running statistics instead of per-batch statistics (and do not
# update them), and so Dropout layers are disabled.
feature_extractor.eval()

In [26]:
# I want to separate feature extractor from lightningmodule and add it to dataloader as part of data processing

def _unpack_mixup(mixed):
    """Normalise a mixup layer's return value to a (X, Y, weight) triple.

    The mixup layers return ``(X, Y)`` when they were called with
    ``weight=None`` and ``(X, Y, weight)`` otherwise.
    """
    if len(mixed) == 3:
        return mixed
    mixed_x, mixed_y = mixed
    return mixed_x, mixed_y, None


def trainloader_collate(batch):
    """
    Collate function for the training DataLoader.

    Stacks the individual samples into a batch, applies the training-time
    augmentations, converts the audio into normalised mel-spectrogram images
    and runs them through the module-level pretrained ``feature_extractor``.

    parameters:
        batch: is a list of tuples with (labels, clip, weights); ``weights``
            may be None for every sample.

    returns:
        (clips, labels, weights) where ``clips`` holds the extracted features
        and ``weights`` is None when the samples carried no weights.
    """
    # Unpack each individual sample in the batch
    labels, clips, weights = zip(*batch)

    # Stack the data into new batches
    labels = torch.stack(labels).float()
    clips = torch.stack(clips).float()

    weights = torch.stack(weights) if weights[0] is not None else None

    # Waveform-level mixup. The layer returns only two values when no weights
    # are supplied, so the result is normalised before unpacking.
    clips, labels, weights = _unpack_mixup(
        mixup_layer(X=clips, Y=labels, weight=weights)
    )

    # Use Compose to combine multiple audio transformation operations. These operations are applied to the input audio data to enhance the generalization and robustness of the model.
    clips = audio_transforms(clips, sample_rate=32000)

    # convert audio to mel spectrogram
    clips = mel_transform(sample_rate=32000, audio=clips)

    # Convert the power spectrogram to decibels, clamped to an 80 dB range
    clips = torchaudio.transforms.AmplitudeToDB(stype="power", top_db=80)(clips)

    # normalization: shift and scale the dB values by the 80 dB range
    clips = (clips + 80) / 80

    # Random masking part of the spectrogram helps the model learn to be robust to missing information in certain time periods.
    clips = torchaudio.transforms.TimeMasking(
        time_mask_param=20, iid_masks=True, p=0.3
    )(clips)

    # Calculate the first and second order differences of audio or other time series data, usually called delta and delta-delta (also called acceleration) features.
    clips = image_delta(clips)

    # Second mixup stage, applied to the spectrogram-derived images
    clips, labels, weights = _unpack_mixup(
        mixup2_layer(X=clips, Y=labels, weight=weights)
    )

    # feature extractor
    # Use torch.no_grad() to ensure feature extraction does not preserve gradients
    with torch.no_grad():
        clips = feature_extractor(clips)

    return clips, labels, weights

In [27]:
# I want to separate feature extractor from lightningmodule and add it to dataloader as part of data processing.


def valloader_collate(batch):
    """
    Collate function for the validation DataLoader.

    Stacks the samples into a batch and applies the deterministic part of the
    preprocessing (no augmentation): mel spectrogram, dB conversion, scaling,
    delta features, and finally the module-level ``feature_extractor``.

    parameters:
        batch: is a list of tuples with (labels, clip, weights)

    returns:
        (features, labels, weights); ``weights`` is None when the first
        sample carries no weight.
    """
    # Split the list of samples into per-field tuples
    label_items, clip_items, weight_items = zip(*batch)

    # Stack each field into a single batch tensor
    targets = torch.stack(label_items).float()
    waveforms = torch.stack(clip_items).float()

    if weight_items[0] is None:
        sample_weights = None
    else:
        sample_weights = torch.stack(weight_items)

    # Mel spectrogram, then amplitude -> decibel conversion
    to_db = torchaudio.transforms.AmplitudeToDB(stype="power", top_db=80)
    spectrograms = to_db(mel_transform(sample_rate=32000, audio=waveforms))

    # Shift and scale the dB values by the 80 dB range
    spectrograms = (spectrograms + 80) / 80

    # Add the first and second order differences (delta / delta-delta)
    images = image_delta(spectrograms)

    # Feature extraction does not need gradients
    with torch.no_grad():
        features = feature_extractor(images)

    return features, targets, sample_weights

In [28]:
# define DatasetModule


class BirdclefDatasetModule(L.LightningDataModule):
    """
    LightningDataModule that builds the training and validation DataLoaders.

    Each loader wraps a ``BirdclefDataset`` created from the matching
    dataframe, draws indices from the sampler supplied by the caller and
    batches samples with the stage-specific collate function.
    """

    def __init__(
        self,
        train_sampler,
        val_sampler,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        bird_category_dir: str,
        audio_dir: str = "data/audio",
        batch_size: int = 128,
        workers=4,
    ):
        super().__init__()
        # Data sources
        self.train_df = train_df
        self.val_df = val_df
        self.bird_category_dir = bird_category_dir
        self.audio_dir = audio_dir
        # Loader configuration
        self.batch_size = batch_size
        self.train_sampler = train_sampler
        self.val_sampler = val_sampler
        self.workers = workers

    def _build_loader(self, df, sampler, collate_fn, train):
        """Create the dataset for `df` and wrap it in a DataLoader."""
        dataset = BirdclefDataset(
            df=df,
            bird_category_dir=self.bird_category_dir,
            audio_dir=self.audio_dir,
            train=train,
        )
        return DataLoader(
            dataset=dataset,
            batch_size=self.batch_size,
            sampler=sampler,
            pin_memory=True,
            num_workers=self.workers,
            collate_fn=collate_fn,
        )

    def train_dataloader(self):
        """Return the DataLoader over the training dataframe."""
        return self._build_loader(
            df=self.train_df,
            sampler=self.train_sampler,
            collate_fn=trainloader_collate,
            train=True,
        )

    def val_dataloader(self):
        """Return the DataLoader over the validation dataframe."""
        return self._build_loader(
            df=self.val_df,
            sampler=self.val_sampler,
            collate_fn=valloader_collate,
            train=False,
        )

In [29]:
class ChronoNet(nn.Module):
    """
    Stack of four GRUs with dense (concatenating) skip connections, followed
    by dropout and a linear classification head.

    Parameters:
        class_nums: number of output classes (size of the final linear layer).
        in_features: feature size of each time step of the input.
        hidden_size: hidden size of every GRU layer.
        seq_len: number of time steps of the input. The BatchNorm1d layers
            receive tensors shaped (batch, seq_len, hidden_size), so they
            treat the time-step axis as the channel axis and the input
            sequence length must equal this value.

    The defaults reproduce the original fixed architecture (1280 / 128 / 32).
    """

    def __init__(
        self,
        class_nums: int = 182,
        in_features: int = 1280,
        hidden_size: int = 128,
        seq_len: int = 32,
    ):
        super().__init__()
        self.gru1 = nn.GRU(
            input_size=in_features,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.bn1 = nn.BatchNorm1d(num_features=seq_len)
        self.gru2 = nn.GRU(
            input_size=hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.bn2 = nn.BatchNorm1d(num_features=seq_len)
        # gru3 consumes the concatenation of the first two normalised outputs
        self.gru3 = nn.GRU(
            input_size=2 * hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        self.bn3 = nn.BatchNorm1d(num_features=seq_len)
        # gru4 consumes the concatenation of all three normalised outputs
        self.gru4 = nn.GRU(
            input_size=3 * hidden_size,
            hidden_size=hidden_size,
            num_layers=1,
            batch_first=True,
        )
        # NOTE: bn4 is not used in forward(); it is kept so that the
        # state_dict layout stays compatible with existing checkpoints.
        self.bn4 = nn.BatchNorm1d(num_features=seq_len)
        self.dropout1 = nn.Dropout(0.3)
        self.fc1 = nn.Linear(in_features=hidden_size, out_features=class_nums)

    def forward(self, x):
        """
        Parameters:
            x: tensor shaped (batch_size, feature_size, sequence_length).

        Returns:
            Tensor shaped (batch_size, class_nums) with unnormalised scores.
        """
        # Because the input shape required by gru is (batch_size, sequence length, feature_size)
        # But the result of the previous conversion calculation is (batchsize, feature_size, sequence length)
        # I need to change the shape
        x = x.permute(0, 2, 1)
        gru_out1, _ = self.gru1(x)
        x1 = self.bn1(gru_out1)
        gru_out2, _ = self.gru2(x1)
        x2 = self.bn2(gru_out2)
        # According to the chrononet architecture, we need to connect the calculations of the two layers of GRU according to the feature-size dimension
        x3 = torch.cat((x1, x2), dim=2)
        gru_out3, _ = self.gru3(x3)
        x4 = self.bn3(gru_out3)
        x5 = torch.cat((x1, x2, x4), dim=2)
        gru_out4, _ = self.gru4(x5)
        # Only the output of the last time step feeds the classifier
        x6 = self.dropout1(gru_out4[:, -1, :])
        out = self.fc1(x6)

        return out

In [30]:
class FocalLoss(nn.Module):
    """
    Focal loss built on top of cross entropy.

    Parameters:
        gamma: focusing exponent of the modulating factor ``(1 - p_t) ** gamma``.
        weight: optional per-class weights forwarded to ``F.cross_entropy``.
        sample_weight: optional per-sample weights multiplied into the
            per-sample loss before reduction.
        reduction: 'mean', 'sum', or anything else to return the unreduced
            per-sample loss.
    """

    def __init__(self, gamma=2.0, weight=None, sample_weight=None, reduction='mean'):
        super().__init__()
        self.gamma = gamma
        self.weight = weight
        self.sample_weight = sample_weight
        self.reduction = reduction

    def forward(self, inputs, targets):
        """
        Parameters:
            inputs: unnormalised class scores (logits).
            targets: targets in any form accepted by ``F.cross_entropy``.

        Returns:
            The reduced loss, or the per-sample loss when ``reduction`` is
            neither 'mean' nor 'sum'.
        """
        # The modulating factor must be derived from the *unweighted* cross
        # entropy: exp(-CE) is the probability assigned to the target only
        # when no class weight has been multiplied into CE.
        plain_ce = F.cross_entropy(inputs, targets, reduction='none')
        p_t = torch.exp(-plain_ce)  # Modulating Factor

        if self.weight is None:
            ce_loss = plain_ce
        else:
            # Class weights scale the loss term only, not p_t
            ce_loss = F.cross_entropy(
                inputs, targets, reduction='none', weight=self.weight
            )

        loss = (1 - p_t) ** self.gamma * ce_loss

        if self.sample_weight is not None:
            loss *= self.sample_weight

        if self.reduction == 'mean':
            return loss.mean()
        elif self.reduction == 'sum':
            return loss.sum()
        else:
            return loss

In [31]:
def compute_roc_auc(preds, targets, threshold=0.5):
    """
    Macro-averaged per-class ROC-AUC.

    Parameters:
        preds: logits shaped (batch, classes); a sigmoid is applied first.
        targets: labels shaped (batch, classes). Values may be soft (e.g.
            after mixup); they are binarised with ``targets > threshold``.
        threshold: cut-off used to binarise ``targets``. With hard 0/1 labels
            the default leaves them unchanged.

    Returns:
        The mean AUC over all classes that have both positive and negative
        samples in the batch, or 0.0 when no class qualifies.
    """
    probs = torch.sigmoid(preds)  # Assuming binary or multi-label classification
    probs = probs.detach().float().cpu().numpy()  # Detach and convert to numpy

    # roc_auc_score only accepts binary ground truth, so soft labels are
    # binarised before scoring.
    truth = targets.detach().float().cpu().numpy() > threshold

    # Compute ROC-AUC on a per-class basis and average
    auc_scores = []
    for i in range(truth.shape[1]):  # Loop through classes
        column = truth[:, i]
        positives = int(column.sum())
        # AUC is undefined unless both classes are present in the column
        if 0 < positives < column.shape[0]:
            auc_scores.append(roc_auc_score(column, probs[:, i]))

    if auc_scores:
        return sum(auc_scores) / len(auc_scores)  # Return macro average
    return 0.0  # Handle cases where no class can be scored

In [32]:
class BirdModelModule(L.LightningModule):
    """
    LightningModule that trains a classifier on pre-extracted features.

    Each batch is a (clips, labels, weights) triple as produced by the
    collate functions; ``weights`` may be None.
    """

    def __init__(
        self,
        model,
        train_class_weight: torch.Tensor,
        val_class_weight: torch.Tensor,
        sample_rate: int = 32000,
        class_num: int = 182,
        lr: float = 0.001
    ):
        """
        Parameters:
            model: the defined model module
            train_class_weight: the argument is used for Focal Loss Function, focal loss needs a sequence of class weights to calculate the loss
            val_class_weight: the argument is also used for Focal loss function, for validation step
            sample_rate: stored on the module; not used by the methods below
            class_num: stored on the module; not used by the methods below
            lr: initial learning rate of the optimizer
        """
        super().__init__()
        self.model = model.to(device)
        self.train_class_weight = train_class_weight.to(device)
        self.val_class_weight = val_class_weight.to(device)
        self.sample_rate = sample_rate
        self.class_num = class_num
        self.lr = lr

    def forward(self, clips):
        """Run the wrapped model on `clips`."""
        return self.model(clips)

    def _shared_step(self, batch, class_weight, stage, detach_preds=False):
        """
        Common body of the training and validation steps.

        Parameters:
            batch: (clips, labels, weights); ``weights`` may be None.
            class_weight: per-class weights handed to the focal loss.
            stage: prefix of the logged metric names ("train" or "val").
            detach_preds: detach the predictions before computing the loss.

        Returns:
            The focal loss of the batch.
        """
        clips = batch[0]
        labels = batch[1]
        weights = batch[2]

        labels = labels.to(device)
        clips = clips.to(device)
        # The collate functions return None when samples carry no weights
        if weights is not None:
            weights = weights.to(device)

        # Use flatten to combine the last two dimensions
        clips = torch.flatten(clips, start_dim=2)

        # predictions
        target_pred = self(clips)
        if detach_preds:
            target_pred = target_pred.detach()

        # initialize loss fn
        loss_fn = FocalLoss(weight=class_weight, sample_weight=weights)

        loss = loss_fn(inputs=target_pred, targets=labels)

        # Compute ROC-AUC and log it
        roc_auc = compute_roc_auc(preds=target_pred, targets=labels)

        self.log(
            f"{stage}_loss",
            loss,
            on_step=True,
            on_epoch=True,
            prog_bar=True,
            logger=True,
        )
        self.log(
            f"{stage}_roc_auc",
            roc_auc,
            on_step=True,
            on_epoch=True,
            prog_bar=True,
            logger=True,
        )

        # clean memory
        if torch.cuda.is_available():
            torch.cuda.empty_cache()

        return loss

    def training_step(self, batch, batch_idx):
        """Compute and log the training loss and ROC-AUC for one batch."""
        return self._shared_step(
            batch, class_weight=self.train_class_weight, stage="train"
        )

    def validation_step(self, batch, batch_idx):
        """Compute and log the validation loss and ROC-AUC for one batch."""
        return self._shared_step(
            batch,
            class_weight=self.val_class_weight,
            stage="val",
            detach_preds=True,
        )

    def configure_optimizers(self):
        """Adam over the trainable parameters with cosine warm restarts."""
        model_optimizer = torch.optim.Adam(
            filter(lambda p: p.requires_grad, self.parameters()),
            lr=self.lr,
            weight_decay=0.001,
        )
        interval = "epoch"

        lr_scheduler = CosineAnnealingWarmRestarts(
            model_optimizer, T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1
        )

        return {
            "optimizer": model_optimizer,
            "lr_scheduler": {
                "scheduler": lr_scheduler,
                "interval": interval,
                "monitor": "val_loss",
                "frequency": 1,
            },
        }

    def on_train_epoch_end(self):
        pass

    def on_validation_epoch_end(self):
        pass

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        pass

In [33]:
if __name__ == "__main__":

    # Number of CPU cores; currently informational only, the data module
    # below is created with workers=0.
    num_workers = multiprocessing.cpu_count()

    # logger = WandbLogger(project='BirdClef-2024', name='sef_s21_v1')

    # Keep only the checkpoint with the lowest validation loss
    checkpoint_callback = ModelCheckpoint(
        monitor="val_loss",
        dirpath="models/checkpoints",
        filename="sed_s21k_v1-{epoch:02d}-{val_loss:.2f}",
        save_top_k=1,
        mode="min",
        auto_insert_metric_name=False,
    )

    # Stop when the validation loss has not improved for 3 checks
    early_stop_callback = EarlyStopping(
        monitor="val_loss",
        min_delta=0.00,
        patience=3,
        verbose=True,
        mode="min",
    )

    # Previously we used a separate dataloader to feed the model
    # Here we encapsulate the dataloader and use this class to read data for training

    # NOTE: workers stays at 0 here. With worker processes the spawned
    # children fail to unpickle classes defined in the notebook
    # ("Can't get attribute 'BirdclefDataset' on <module '__main__'>").
    bdm = BirdclefDatasetModule(
        train_sampler=train_sampler,
        val_sampler=val_sampler,
        train_df=train_df,
        val_df=val_df,
        bird_category_dir="./external_files/13-2-bird-cates.npy",
        audio_dir="../../data/train_audio",
        batch_size=64,
        workers=0,
    )

    class_num = len(np.load("external_files/13-2-bird-cates.npy", allow_pickle=True))
    # initilize model
    chrononet = ChronoNet(class_nums=class_num)

    # Use a distinct name for the instance: rebinding `BirdModelModule` would
    # shadow the class and make this cell fail when it is run a second time.
    bird_model_module = BirdModelModule(
        model=chrononet,
        train_class_weight=loss_train_class_weights,
        val_class_weight=loss_val_class_weights,
        class_num=class_num,
    )

    trainer = L.Trainer(
        # enable mixed precision
        precision=16,
        # Set up Trainer, use gradient accumulation, and update parameters after accumulating gradients every 512 batches
        accumulate_grad_batches=512,
        max_epochs=45,
        # accelerator="auto", # set to 'auto' or 'gpu' to use gpu if possible
        # devices='auto', # use all gpus if applicable like value=1 or "auto"
        default_root_dir="models/model_training",
        # logger=CSVLogger(save_dir='/Users/yiding/personal_projects/ML/github_repo/birdcief/code/model-training/log/',name='chrononet')
        # logger=logger,  # use MLflow logger
        callbacks=[checkpoint_callback, early_stop_callback],
    )

    # train the model
    trainer.fit(
        model=bird_model_module,
        datamodule=bdm,
    )

Using 16bit Automatic Mixed Precision (AMP)
GPU available: True (mps), used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name  | Type      | Params
------------------------------------
0 | model | ChronoNet | 1.0 M 
------------------------------------
1.0 M     Trainable params
0         Non-trainable params
1.0 M     Total params
4.039     Total estimated model params size (MB)


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Caskroom/miniforge/base/envs/birdclef/lib/python3.10/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/opt/homebrew/Caskroom/miniforge/base/envs/birdclef/lib/python3.10/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
AttributeError: Can't get attribute 'BirdclefDataset' on <module '__main__' (built-in)>


In [None]:
gc.collect()

0