## Import Dependencies

In [None]:
import torch
import torchaudio
import torchvision
import torchvision.transforms as transforms
import torch.nn as nn

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import sklearn.metrics

import os
from pathlib import Path

## Configuration 

In [None]:
### DO NOT CHANGE UNLESS RE-GENERATING DATASET IMAGES ###
class Config_Mel():
    """Frozen settings for the mel-spectrogram image dataset.

    The values are plain constants; per the banner comments around this
    class they should only be edited when the dataset images are
    re-generated.
    """

    def __init__(self) -> None:

        # Device used for spectrogram generation
        self.device = 'cpu'

        # Dataset root and the folder the spectrogram images are written to
        self.birdclef2023 = 'birdclef-2023'
        self.outpath_images = self.birdclef2023 + '_MelSpectrograms'

        # Amplitude -> decibel conversion applied to the spectrograms
        self.melSpecTransform = torchaudio.transforms.AmplitudeToDB()

        # Audio features
        self.sample_rate = 32000
        self.n_fft = 2048
        self.f_min = 40
        self.f_max = 15000
        self.hop_length = 512
        self.n_mels = 128

        # Same values bundled as keyword arguments for a mel transform
        self.mel_args = dict(
            sample_rate=self.sample_rate,
            n_fft=self.n_fft,
            f_min=self.f_min,
            f_max=self.f_max,
            hop_length=self.hop_length,
            n_mels=self.n_mels,
        )
### DO NOT CHANGE UNLESS RE-GENERATING DATASET IMAGES ###

class Config():
    """Runtime configuration for the soundscape inference notebook.

    Collects every environment-dependent path (soundscapes, metadata,
    output folder, model weights) together with the dataloader, model,
    normalisation and audio/mel-spectrogram settings.

    Raises:
        NotImplementedError: if ``run_environment`` is ``'colab'`` (no paths
            are defined for it).
        ValueError: if ``run_environment`` is not one of the known values.
    """

    def __init__(self) -> None:

        self.run_environment = 'local' # 'kaggle' 'local' 'colab'

        # Fail immediately on an environment without paths instead of leaving
        # attributes undefined and crashing later with an AttributeError.
        if self.run_environment == 'colab':
            raise NotImplementedError(
                "No dataset paths are configured for the 'colab' environment")
        if self.run_environment not in ('kaggle', 'local'):
            raise ValueError(
                f"Unknown run_environment {self.run_environment!r}; "
                "expected 'kaggle', 'local' or 'colab'")

        # Device (every environment currently runs on the CPU)
        self.device = 'cpu'

        # Environment-specific paths: soundscapes, metadata, output, weights
        if self.run_environment == 'kaggle':
            self.soundscape_paths = "/kaggle/input/birdclef-2023/test_soundscapes"
            self.metadata_path = '/kaggle/input/birdclef-2023/train_metadata.csv'
            self.outpath = '/kaggle/working/'
            self.pretrained_weights = '/kaggle/input/weigths/model_weights.pth'
        else:  # 'local'
            self.soundscape_paths = '/home/colin/elec5305/ele5305_research_project/birdclef-2023/test_soundscapes'
            self.metadata_path = '/home/colin/elec5305/ele5305_research_project/birdclef-2023/train_metadata.csv'
            self.outpath = '/home/colin/elec5305/ele5305_research_project/src_mel/results'
            self.pretrained_weights = '/home/colin/elec5305/ele5305_research_project/src_audio/weights/model_weights.pth'

        # Dataloader options
        self.num_workers = 2
        self.test_batch_size = 64

        # Model name
        self.model_name = 'tf_efficientnet_b0_ns'

        # Image Transforms: per-channel normalisation only (no ToTensor step)
        self.test_transforms = torchvision.transforms.Compose([
            torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

        # Audio Features
        self.sample_rate = 32000
        self.period = 5  # length of one prediction window, in seconds

        # Mel Spectrogram Parameters
        self.n_fft = 2048
        self.f_min = 40
        self.f_max = 15000
        self.hop_length = 512
        self.n_mels = 128
        self.mel_args = {'n_fft': self.n_fft,
                         'f_min': self.f_min,
                         'f_max': self.f_max,
                         'hop_length': self.hop_length,
                         'n_mels': self.n_mels}
        
# Module-level configuration singletons (inference settings first, then the
# frozen spectrogram-dataset settings).
CONFIG, CONFIG_MEL = Config(), Config_Mel()

## Test Set Data Class

In [None]:
# Index every .ogg soundscape in the test folder.
# Files are sorted so the row order is reproducible (glob order is arbitrary).
soundscape_files = sorted(Path(CONFIG.soundscape_paths).glob("*.ogg"))

# The stem is split at its first underscore into (name, id). partition()
# always yields exactly two fields, so stems with zero or several underscores
# no longer break the 4-column layout.
df_test = pd.DataFrame(
    [(path.stem, *path.stem.partition("_")[::2], path) for path in soundscape_files],
    columns=["filename", "name", "id", "path"],
)
print(df_test.shape)
df_test.head()

In [None]:
class BirdCLEF2023_SoundScapes(torch.utils.data.Dataset):
    """Dataset yielding one soundscape recording per item, cut into periods.

    Args:
        test_df: DataFrame with one row per recording; must provide a
            ``path`` column that ``torchaudio.load`` can open.
        sample_rate: target sample rate (Hz) of the returned waveforms.
        period: length of one segment in seconds.
        transform: stored on the instance; not applied by this class.
    """

    def __init__(self, test_df, sample_rate, period, transform):
        self.test_df = test_df
        self.sample_rate = sample_rate
        self.period = period

        self.transform = transform

    def __len__(self):
        """Return the number of recordings (rows of ``test_df``)."""
        return len(self.test_df)

    def _split_into_periods(self, waveform):
        """Cut a 1-D waveform into consecutive, non-overlapping periods.

        Trailing samples that do not fill a whole period are dropped.

        Returns:
            Tensor of shape ``(n_periods, sample_rate * period)``; the first
            dimension is 0 when the waveform is shorter than one period.
        """
        samples_per_period = int(self.sample_rate * self.period)
        n_periods = waveform.shape[0] // samples_per_period
        usable = waveform[: n_periods * samples_per_period]
        return usable.reshape(n_periods, samples_per_period)

    def __getitem__(self, idx):
        """Load recording ``idx`` and return its metadata plus segments.

        Returns:
            dict holding the row's columns and a ``'waveforms'`` tensor of
            shape ``(n_periods, sample_rate * period)``.
        """
        dict_idx = dict(self.test_df.iloc[idx])

        waveform, file_sample_rate = torchaudio.load(dict_idx['path'])

        # Down-mix to mono. For single-channel files this is a no-op, whereas
        # flattening would glue the channels of a stereo file end to end.
        waveform = waveform.mean(dim=0)

        # Resample from the file's own rate; using the target rate for both
        # ends would never change anything.
        if file_sample_rate != self.sample_rate:
            resampler = torchaudio.transforms.Resample(
                orig_freq=file_sample_rate, new_freq=self.sample_rate)
            waveform = resampler(waveform)

        dict_idx['waveforms'] = self._split_into_periods(waveform)

        return dict_idx

In [None]:
# Build the soundscape dataset from the indexed test files
test_dataset = BirdCLEF2023_SoundScapes(
    test_df=df_test,
    sample_rate=CONFIG.sample_rate,
    period=CONFIG.period,
    transform=CONFIG.test_transforms,
)

In [None]:
# Test Cell
# Sanity check: load the first recording and print the shape of its stacked
# waveform segments, then display how many recordings the dataset holds.
print(test_dataset[0]['waveforms'].shape)
len(test_dataset)

## Load model

In [None]:
import timm

In [None]:
class MelSpectrogramLayer(nn.Module):
    """Turn raw waveforms into 3-channel, normalised mel spectrograms.

    Args:
        sample_rate, n_fft, hop_length, n_mels: forwarded to
            ``torchaudio.transforms.MelSpectrogram``.
        transform: callable applied to the 3-channel spectrogram.

    NOTE(review): ``f_min`` / ``f_max`` are not forwarded to the mel
    transform and no dB conversion is applied; left unchanged because the
    pretrained weights presumably expect exactly this front end — confirm.
    """

    def __init__(self, sample_rate, n_fft, hop_length, n_mels, transform):
        super().__init__()
        self.mel_transform = torchaudio.transforms.MelSpectrogram(
            sample_rate=sample_rate,
            n_fft=n_fft,
            hop_length=hop_length,
            n_mels=n_mels
        )
        self.transform = transform

    def _masking_active(self):
        """Decide whether frequency/time masking is applied on this call.

        Masking only happens in training mode and only when the module-level
        ``CONFIG`` object enables it through a truthy ``masking`` attribute.
        A missing ``CONFIG`` or missing ``masking`` attribute means
        "disabled" instead of raising. When enabled, ``CONFIG.masking_prob``
        is required and masking fires when a uniform sample is >= that value.
        """
        if not self.training:
            return False
        config = globals().get("CONFIG")
        if not getattr(config, "masking", False):
            return False
        return bool(torch.rand(1) >= config.masking_prob)

    def forward(self, waveform):
        """Compute the spectrogram for a single or batched waveform.

        A 2-D spectrogram ``(n_mels, time)`` is treated as unbatched and
        becomes ``(3, n_mels, time)``; anything else is treated as batched
        ``(batch, n_mels, time)`` and becomes ``(batch, 3, n_mels, time)``.
        NaNs in the result are replaced by ``torch.nan_to_num``.
        """
        mel_spectrogram = self.mel_transform(waveform)

        if self._masking_active():
            # Mask widths are a fifth of the frequency / time extent, which
            # are the last two dimensions in both layouts.
            mel_spectrogram = torchaudio.transforms.FrequencyMasking(
                freq_mask_param=mel_spectrogram.shape[-2] // 5
            )(mel_spectrogram)
            mel_spectrogram = torchaudio.transforms.TimeMasking(
                time_mask_param=mel_spectrogram.shape[-1] // 5
            )(mel_spectrogram)

        # Replicate the single spectrogram across three channels.
        if mel_spectrogram.dim() == 2:
            mel_spectrogram = mel_spectrogram.unsqueeze(0).expand(3, -1, -1)
        else:
            mel_spectrogram = mel_spectrogram.unsqueeze(1).expand(-1, 3, -1, -1)

        mel_spectrogram = self.transform(mel_spectrogram)

        mel_spectrogram = torch.nan_to_num(mel_spectrogram)

        return mel_spectrogram
    

# https://www.kaggle.com/code/leonshangguan/faster-eb0-sed-model-inference

def init_layer(layer):
    """Xavier-uniform initialise ``layer.weight`` and zero its bias, if any."""
    nn.init.xavier_uniform_(layer.weight)

    bias = getattr(layer, "bias", None)
    if bias is not None:
        bias.data.fill_(0.)


def init_bn(bn):
    """Reset a batch-norm layer to identity: weight (scale) 1, bias 0."""
    bn.weight.data.fill_(1.0)
    bn.bias.data.fill_(0.)


def init_weights(model):
    """Initialise one module in place according to its class name.

    * ``Conv2d``    – Xavier-uniform weight with gain sqrt(2), zero bias
    * ``BatchNorm`` – weight ~ N(1, 0.02), zero bias
    * ``GRU``       – orthogonal init for every parameter with > 1 dimension
    * ``Linear``    – weight ~ N(0, 0.01), zero bias

    Modules of any other class are left untouched. Missing (``None``)
    weights or biases are skipped.
    """
    classname = model.__class__.__name__
    weight = getattr(model, "weight", None)
    bias = getattr(model, "bias", None)

    if "Conv2d" in classname:
        nn.init.xavier_uniform_(weight, gain=np.sqrt(2))
        if bias is not None:
            bias.data.fill_(0)
    elif "BatchNorm" in classname:
        # Non-affine batch norm has neither weight nor bias.
        if weight is not None:
            weight.data.normal_(1.0, 0.02)
        if bias is not None:
            bias.data.fill_(0)
    elif "GRU" in classname:
        for param in model.parameters():
            if param.dim() > 1:
                nn.init.orthogonal_(param.data)
    elif "Linear" in classname:
        weight.data.normal_(0, 0.01)
        if bias is not None:
            bias.data.zero_()


def interpolate(x: torch.Tensor, ratio: int):
    """Upsample along the time axis by repeating every time step.

    Used to compensate for the time-resolution reduction caused by the
    downsampling of a CNN.

    Args:
      x: (batch_size, time_steps, classes_num)
      ratio: int, number of times each time step is repeated
    Returns:
      upsampled: (batch_size, time_steps * ratio, classes_num)
    """
    # Unpacking enforces a 3-D input, exactly like the reshape-based variant.
    _batch_size, _time_steps, _classes_num = x.shape
    return torch.repeat_interleave(x, ratio, dim=1)


def pad_framewise_output(framewise_output: torch.Tensor, frames_num: int):
    """Resize a frame-wise output to ``frames_num`` frames.

    The tensor is treated as a one-channel image and resized with bilinear
    interpolation (``align_corners=True``); the class dimension keeps its
    size.

    Args:
      framewise_output: (batch_size, frames, classes_num)
      frames_num: int, number of frames in the result
    Outputs:
      output: (batch_size, frames_num, classes_num)
    """
    target_size = (frames_num, framewise_output.size(2))
    as_image = framewise_output.unsqueeze(1)  # add a channel dimension
    resized = torch.nn.functional.interpolate(
        as_image,
        size=target_size,
        mode="bilinear",
        align_corners=True,
    )
    return resized.squeeze(1)

class AttBlockV2(nn.Module):
    """Attention pooling head over the time axis.

    Two kernel-size-1 ``Conv1d`` layers map ``in_features`` channels to
    ``out_features`` channels: ``att`` produces attention logits and ``cla``
    produces per-time-step class scores.

    Args:
        in_features: number of input channels.
        out_features: number of output channels (classes).
        activation: ``"linear"`` (scores used as-is) or ``"sigmoid"``.

    Raises:
        ValueError: if ``activation`` is not one of the supported names.
    """

    _ACTIVATIONS = ("linear", "sigmoid")

    def __init__(self,
                 in_features: int,
                 out_features: int,
                 activation="linear"):
        super().__init__()

        # Reject unknown names here; otherwise forward() would multiply by
        # None and fail with an unrelated TypeError.
        if activation not in self._ACTIVATIONS:
            raise ValueError(
                f"activation must be one of {self._ACTIVATIONS}, got {activation!r}")

        self.activation = activation
        self.att = nn.Conv1d(
            in_channels=in_features,
            out_channels=out_features,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=True)
        self.cla = nn.Conv1d(
            in_channels=in_features,
            out_channels=out_features,
            kernel_size=1,
            stride=1,
            padding=0,
            bias=True)

        self.init_weights()

    def init_weights(self):
        """Initialise both convolutions with ``init_layer``."""
        init_layer(self.att)
        init_layer(self.cla)

    def forward(self, x):
        """Pool ``x`` of shape (n_samples, n_in, n_time) over time.

        Returns:
            Tuple ``(pooled, norm_att, cla)`` where ``norm_att`` is the
            softmax over the last dimension of ``tanh(att(x))``, ``cla`` is
            the activated class score per time step and ``pooled`` is
            ``sum(norm_att * cla)`` over dimension 2.
        """
        norm_att = torch.softmax(torch.tanh(self.att(x)), dim=-1)
        cla = self.nonlinear_transform(self.cla(x))
        x = torch.sum(norm_att * cla, dim=2)
        return x, norm_att, cla

    def nonlinear_transform(self, x):
        """Apply the configured activation to ``x``.

        Raises:
            ValueError: if ``self.activation`` is not supported.
        """
        if self.activation == 'linear':
            return x
        if self.activation == 'sigmoid':
            return torch.sigmoid(x)
        raise ValueError(
            f"activation must be one of {self._ACTIVATIONS}, got {self.activation!r}")

class Mel_Classifier(torch.nn.Module):
    """Waveform classifier: mel front end -> timm backbone -> attention head.

    Args:
        model_name: timm architecture name for the backbone.
        mel_generator: module converting waveforms into the image-like input
            expected by the backbone.
        pretrained: whether timm should load its pretrained weights.
        num_classes: number of output classes of the attention head.
        in_channels: number of input channels of the backbone.
    """

    def __init__(
        self,
        model_name: str,
        mel_generator: MelSpectrogramLayer,
        pretrained=True,
        num_classes=264,
        in_channels=3
    ):
        super().__init__()

        self.mel_generator = mel_generator

        # Headless timm model (no pooling, no classifier).
        timm_model = timm.create_model(
            model_name,
            pretrained=pretrained,
            num_classes=0,
            global_pool="",
            in_chans=in_channels,
        )

        # Keep everything except the last two child modules as the backbone.
        self.backbone = nn.Sequential(*list(timm_model.children())[:-2])

        feature_dim = timm_model.num_features

        self.fc1 = nn.Linear(feature_dim, feature_dim, bias=True)
        self.att_block = AttBlockV2(
            feature_dim, num_classes, activation="linear")

        self.init_weight()

    def init_weight(self):
        """Initialise the dense layer that precedes the attention head."""
        init_layer(self.fc1)

    def forward(self, input_data):
        """Return the clip-wise output of the attention head for the input."""
        features = self.backbone(self.mel_generator(input_data))

        # Average over dimension 2 (the frequency axis of the feature map).
        features = features.mean(dim=2)

        # Smooth along the remaining axis with max + average pooling.
        pooled_max = torch.nn.functional.max_pool1d(
            features, kernel_size=3, stride=1, padding=1)
        pooled_avg = torch.nn.functional.avg_pool1d(
            features, kernel_size=3, stride=1, padding=1)
        features = pooled_max + pooled_avg

        # The dense layer acts on the channel dimension, so move it last
        # for the Linear call and back afterwards.
        features = features.transpose(1, 2)
        features = torch.nn.functional.relu_(self.fc1(features))
        features = features.transpose(1, 2)

        clipwise_output, _norm_att, _segmentwise_output = self.att_block(features)

        return clipwise_output

In [None]:
# Build the spectrogram front end and the classifier around it
melspec_layer = MelSpectrogramLayer(
    sample_rate=CONFIG.sample_rate,
    n_fft=CONFIG.n_fft,
    hop_length=CONFIG.hop_length,
    n_mels=CONFIG.n_mels,
    transform=CONFIG.test_transforms,
)
network = Mel_Classifier(
    model_name=CONFIG.model_name,
    mel_generator=melspec_layer,
    pretrained=False,
)

# Restore the trained weights onto the configured device
# NOTE(review): torch.load unpickles the file; only load trusted checkpoints.
network.load_state_dict(
    torch.load(CONFIG.pretrained_weights, map_location=CONFIG.device)
)

# Switch to evaluation mode
network.eval()

## Define Evaluation Function

In [None]:

def padded_cmap(solution, submission, padding_factor=5):
    """Macro average precision computed on padded inputs.

    ``padding_factor`` rows consisting entirely of ones are appended to both
    frames before scoring, so every column has at least one positive.

    Args:
        solution: DataFrame of ground-truth labels, one column per class.
        submission: DataFrame of predicted scores with the same columns.
        padding_factor: number of all-ones rows to append; values <= 0
            disable padding.

    Returns:
        The macro-averaged ``sklearn.metrics.average_precision_score``.
    """
    if padding_factor > 0:
        padding = pd.DataFrame(
            1, index=range(padding_factor), columns=solution.columns)
        solution = pd.concat([solution, padding], ignore_index=True)
        submission = pd.concat([submission, padding], ignore_index=True)

    return sklearn.metrics.average_precision_score(
        solution.values,
        submission.values,
        average='macro',
    )

In [None]:
# Get species list
# Sorted so the class order is the same in every run: the iteration order of
# a set of strings changes between Python processes, which would attach the
# model's output columns to the wrong species.
# NOTE(review): this order must match the class order used during training —
# confirm against the training notebook.
df_all = pd.read_csv(CONFIG.metadata_path)
species_list = sorted(df_all['primary_label'].unique())

## Make Predictions

In [None]:
# Converts the network's per-class outputs into probabilities along dim 1
toProbs = torch.nn.Softmax(dim=1)

# One array of per-segment probabilities for each recording. Gradients are
# not needed for inference, so autograd is switched off to save memory.
with torch.no_grad():
    outputs = [
        toProbs(network(test_dataset[idx]['waveforms'])).cpu().numpy()
        for idx in range(len(test_dataset))
    ]

## Make Submission

In [None]:
filenames = df_test.filename.values.tolist()
submission_columns = ['row_id'] + species_list

# Build one frame per recording and concatenate once at the end; growing the
# result with pd.concat inside the loop re-copies all earlier rows each time.
frames = []
for filename, pred in zip(filenames, outputs):
    df = pd.DataFrame(pred, columns=species_list)

    # Row k covers the k-th segment; its id is the segment's end time in seconds.
    row_ids = [f'{filename}_{(k + 1) * CONFIG.period}' for k in range(pred.shape[0])]
    df.insert(0, 'row_id', row_ids)

    frames.append(df)

# With no recordings, fall back to an empty frame that still has the columns.
if frames:
    sub_df = pd.concat(frames, ignore_index=True)
else:
    sub_df = pd.DataFrame(columns=submission_columns)


In [None]:
# Make sure the output folder exists (an empty outpath means the current
# working directory, which needs no creation).
if CONFIG.outpath:
    os.makedirs(CONFIG.outpath, exist_ok=True)

csv_path = os.path.join(CONFIG.outpath, 'submission.csv')
sub_df.to_csv(csv_path, index=False)