In [None]:
import numpy as np
import pandas as pd
import librosa
import matplotlib.pyplot as plt
import seaborn as sns
import random

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import KFold, StratifiedKFold
import math
from collections import OrderedDict

from PIL import Image
import albumentations
from pydub import AudioSegment

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

import pretrainedmodels

from joblib import Parallel, delayed

import warnings
warnings.filterwarnings('ignore')

from tqdm.autonotebook import tqdm


In [None]:
# Load the three CSV tables used by the notebook from hard-coded absolute paths.
# NOTE(review): these paths only resolve on the machine they were written for;
# point them at your own data directory when running elsewhere.
train = pd.read_csv("/fred/oz138/test/data/train.csv")
test = pd.read_csv("/fred/oz138/test/data/test.csv")
# Presumably the template for the prediction file (judging by the file name) - confirm.
submission = pd.read_csv("/fred/oz138/test/data/sample_submission.csv")

In [None]:
# arguments
class args:
    """Namespace of notebook-wide settings, read as class attributes (never instantiated here)."""

    # Directory holding the training audio; the example cell reads
    # f"{ROOT_PATH}/aldfly/XC135454.mp3" from it.
    ROOT_PATH = "/fred/oz138/test/data/train_audio"

    # Not referenced by the cells visible here - presumably the number of target labels; confirm.
    num_classes = 264
    # Clip length in seconds: passed to RandomAudio(seconds=...) and used to size
    # the silent fallback returned by load_audio().
    max_duration= 5 # seconds

    # Target sampling rate (Hz) that load_audio() resamples to.
    sample_rate = 32000

    # Not referenced by the cells visible here. 128 matches n_mels below;
    # 313 would be the frame count of a 5 s clip at 32 kHz if librosa's default
    # hop length (512) is in effect - TODO confirm.
    img_height = 128
    img_width = 313

    # Data-loading / training-loop settings; not referenced by the cells visible here.
    batch_size = 16
    num_workers = 4
    epochs = 2

    # Optimizer hyper-parameters; not referenced by the cells visible here.
    lr = 0.0009
    wd = 1e-5
    momentum = 0.9
    eps = 1e-8
    betas = (0.9, 0.999)

    # Keyword arguments forwarded verbatim to librosa.feature.melspectrogram
    # by the MelSpectrogram transform.
    melspectrogram_parameters = {
        "n_mels": 128,
        "fmin": 20,
        "fmax": 16000
    }
    

In [None]:
# load audio files
def load_audio(path):
    """Decode an audio file into a mono float32 sample array at ``args.sample_rate``.

    Args:
        path: Path to an audio file in any format pydub can decode
            (for example ``.wav`` or ``.mp3``).

    Returns:
        tuple: ``(samples, sample_rate)``. ``samples`` is a 1-D ``np.float32``
        array and ``sample_rate`` is always ``args.sample_rate``. When the
        file cannot be decoded, ``samples`` is ``args.max_duration`` seconds
        of zeros so that one bad file does not abort a whole run.
    """
    try:
        # from_file() detects the container itself. from_wav() forces the WAV
        # decoder, so non-WAV inputs such as .mp3 failed and were silently
        # replaced by zeros in the except branch.
        sound = AudioSegment.from_file(path)
        # Downmix before extracting samples: get_array_of_samples() interleaves
        # the channels of multi-channel audio into a single sequence.
        sound = sound.set_channels(1).set_frame_rate(args.sample_rate)
        sound_array = np.array(sound.get_array_of_samples(), dtype=np.float32)
    except Exception as exc:
        # Keep the best-effort fallback, but do not swallow KeyboardInterrupt /
        # SystemExit and do not hide the failure from the user.
        print(f"load_audio: could not decode {path!r} ({exc!r}); returning silence")
        sound_array = np.zeros(args.sample_rate * args.max_duration, dtype=np.float32)

    return sound_array, args.sample_rate

In [None]:
# audio albumentations
from albumentations.core.transforms_interface import DualTransform, BasicTransform

class AudioTransform(BasicTransform):
    """Common base for the audio transforms defined in this notebook."""

    @property
    def targets(self):
        # The pipeline's ``data`` keyword is dispatched to this transform's apply().
        return {"data": self.apply}

    def update_params(self, params, **kwargs):
        """Copy ``interpolation`` / ``fill_value`` onto ``params`` when the instance defines them."""
        for attr_name in ("interpolation", "fill_value"):
            if hasattr(self, attr_name):
                params[attr_name] = getattr(self, attr_name)
        return params

class NoiseInjection(AudioTransform):
    """Add white Gaussian noise, scaled by a randomly drawn level, to the waveform.

    Args:
        noise_levels: ``(low, high)`` bounds of the uniform distribution the
            noise scale is drawn from on every call.
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
    """
    def __init__(self, noise_levels=(0, 0.5), always_apply=False, p=0.5):
        super(NoiseInjection, self).__init__(always_apply, p)

        self.noise_levels = noise_levels

    def apply(self, data, **params):
        """Return ``(noisy_sound, sr)`` for an input ``(sound, sr)`` pair.

        The result keeps the dtype of the input samples.
        """
        sound, sr = data
        sound = np.asarray(sound)

        noise_level = np.random.uniform(*self.noise_levels)
        noise = np.random.randn(len(sound))
        # Cast back via the array's dtype rather than type(sound[0]): indexing
        # element 0 raises IndexError on an empty clip.
        augmented_sound = (sound + noise_level * noise).astype(sound.dtype)

        return augmented_sound, sr

class ShiftingTime(AudioTransform):
    """Shift the waveform along the time axis and silence the vacated region.

    Args:
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
    """
    def __init__(self, always_apply=False, p=0.5):
        super(ShiftingTime, self).__init__(always_apply, p)

    def apply(self, data, **params):
        """Return ``(shifted_sound, sr)`` for an input ``(sound, sr)`` pair.

        The shift is a random number of samples in ``[1, len(sound) - 1]``,
        applied forwards or backwards with equal probability. Samples that
        would wrap around are replaced by zeros. Clips shorter than two
        samples are returned as an unmodified copy.
        """
        sound, sr = data
        n_samples = len(sound)

        if n_samples < 2:
            # Nothing can be shifted; randint(1, n_samples) would also raise here.
            return np.copy(sound), sr

        # Draw the shift directly in samples and keep it inside the clip.
        # The previous code multiplied a random sample count by the sample
        # rate, so the shift almost always exceeded the clip length and the
        # zero-fill below erased the entire signal. A shift of 0 is excluded
        # because ``augmented_sound[0:] = 0`` would also wipe everything.
        shift = np.random.randint(1, n_samples)
        direction = np.random.randint(0, 2)
        if direction == 1:
            shift = -shift

        augmented_sound = np.roll(sound, shift)
        # Set the wrapped-around head / tail to silence.
        if shift > 0:
            augmented_sound[:shift] = 0
        else:
            augmented_sound[shift:] = 0

        return augmented_sound, sr

class PitchShift(AudioTransform):
    """Shift the pitch of the waveform by a random whole number of steps.

    Args:
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
    """

    def __init__(self, always_apply=False, p=0.5):
        super(PitchShift, self).__init__(always_apply, p)

    def apply(self, data, **params):
        """Return ``(pitch_shifted_sound, sr)`` for an input ``(sound, sr)`` pair."""
        sound, sr = data

        # randint's upper bound is exclusive, so n_steps lies in [-10, 9].
        n_steps = np.random.randint(-10, 10)
        # Pass sr / n_steps by keyword: newer librosa releases make them
        # keyword-only, while older releases accept keywords as well.
        augmented_sound = librosa.effects.pitch_shift(sound, sr=sr, n_steps=n_steps)

        return augmented_sound, sr

class TimeStretch(AudioTransform):
    """Speed the waveform up or slow it down by a random factor.

    Args:
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
        rate_limits: ``(low, high)`` bounds of the uniform distribution the
            stretch rate is drawn from. Both bounds must be positive.

    Raises:
        ValueError: If ``rate_limits`` contains a non-positive bound.
    """

    def __init__(self, always_apply=False, p=0.5, rate_limits=(0.5, 2.0)):
        super(TimeStretch, self).__init__(always_apply, p)

        low, high = rate_limits
        if low <= 0 or high <= 0:
            raise ValueError("rate_limits must be strictly positive, got {!r}".format(rate_limits))
        self.rate_limits = (low, high)

    def apply(self, data, **params):
        """Return ``(stretched_sound, sr)`` for an input ``(sound, sr)`` pair.

        The output length differs from the input length (roughly
        ``len(sound) / rate``).
        """
        sound, sr = data

        # The rate used to be drawn from U(0, 2). A rate of 0 is rejected by
        # librosa, and rates close to 0 produce an output many times longer
        # than the input, so the lower bound is now kept away from zero.
        rate = np.random.uniform(*self.rate_limits)
        # rate is keyword-only in newer librosa releases.
        augmented_sound = librosa.effects.time_stretch(sound, rate=rate)

        return augmented_sound, sr

class RandomAudio(AudioTransform):
    """Pick a fixed-length excerpt that starts at a random position of the clip.

    The clip is rotated by a random offset and then either cropped to
    ``seconds`` seconds or, when it is shorter than that, zero-padded on
    both sides to reach that length.
    """

    def __init__(self,  seconds=5, always_apply=False, p=0.5):
        super(RandomAudio, self).__init__(always_apply, p)

        self.seconds = seconds

    def apply(self, data, **params):
        """Return ``(excerpt, sr)`` where ``excerpt`` has ``int(sr * seconds)`` samples."""
        sound, sr = data

        # Rotate so that a random sample becomes the first one.
        rolled = np.roll(sound, np.random.randint(len(sound)))

        target_len = int(sr * self.seconds)
        missing = target_len - len(rolled)

        if missing > 0:
            # Too short: split the padding as evenly as possible, extra sample at the end.
            left = missing // 2
            right = missing - left
            return np.pad(rolled, (left, right), "constant"), sr

        # Long enough: keep the leading window.
        return rolled[:target_len], sr

class MelSpectrogram(AudioTransform):
    """Convert a waveform into a dB-scaled mel spectrogram.

    Args:
        parameters: Keyword arguments forwarded to
            ``librosa.feature.melspectrogram`` (e.g. ``n_mels``, ``fmin``, ``fmax``).
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
    """

    def __init__(self, parameters, always_apply=False, p=0.5):
        super(MelSpectrogram, self).__init__(always_apply, p)

        self.parameters = parameters

    def apply(self, data, **params):
        """Return ``(melspec, sr)`` where ``melspec`` is a float32 array in dB."""
        sound, sr = data

        # Pass the signal as y=...: newer librosa releases accept the audio
        # only by keyword, and older releases accept the keyword form too.
        melspec = librosa.feature.melspectrogram(y=sound, sr=sr, **self.parameters)
        melspec = librosa.power_to_db(melspec)
        melspec = melspec.astype(np.float32)

        return melspec, sr

class SpecAugment(AudioTransform):
    """Mask random frequency bands and time spans of a spectrogram.

    Args:
        num_mask: Upper bound on the number of mask rounds per call.
        freq_masking: Largest fraction of frequency rows one round may mask.
        time_masking: Largest fraction of time frames one round may mask.
        always_apply: Forwarded to the base transform.
        p: Forwarded to the base transform.
    """

    def __init__(self, num_mask=2, freq_masking=0.15, time_masking=0.20, always_apply=False, p=0.5):
        super(SpecAugment, self).__init__(always_apply, p)

        self.num_mask = num_mask
        self.freq_masking = freq_masking
        self.time_masking = time_masking

    def apply(self, data, **params):
        """Return ``(masked_melspec, sr)``; masked cells take the spectrogram's minimum value."""
        melspec, sr = data

        masked = self.spec_augment(
            melspec,
            num_mask=self.num_mask,
            freq_masking=self.freq_masking,
            time_masking=self.time_masking,
            value=melspec.min(),
        )
        return masked, sr

    # Source: https://www.kaggle.com/davids1992/specaugment-quick-implementation
    def spec_augment(self,
                    spec: np.ndarray,
                    num_mask=2,
                    freq_masking=0.15,
                    time_masking=0.20,
                    value=0):
        """Return a copy of ``spec`` with random row bands and column spans set to ``value``.

        Between 1 and ``num_mask`` rounds are run; each round masks one band of
        rows and one span of columns whose sizes are random fractions bounded
        by ``freq_masking`` and ``time_masking`` respectively.
        """
        masked = spec.copy()
        rounds = random.randint(1, num_mask)
        # The shape never changes while masking, so read it once.
        n_freqs, n_frames = masked.shape

        for _ in range(rounds):
            # Frequency band.
            band = int(random.uniform(0.0, freq_masking) * n_freqs)
            band_start = int(np.random.uniform(low=0.0, high=n_freqs - band))
            masked[band_start:band_start + band, :] = value

            # Time span.
            span = int(random.uniform(0.0, time_masking) * n_frames)
            span_start = int(np.random.uniform(low=0.0, high=n_frames - span))
            masked[:, span_start:span_start + span] = value

        return masked

class SpectToImage(AudioTransform):
    """Turn a spectrogram into a 3-channel array: the spectrogram plus its first and second deltas."""

    def __init__(self, always_apply=False, p=0.5):
        super(SpectToImage, self).__init__(always_apply, p)

    def apply(self, data, **params):
        """Return the stacked float32 array only; the sample rate is dropped at this stage."""
        spec, _sr = data

        channels = [
            spec,
            librosa.feature.delta(spec),
            librosa.feature.delta(spec, order=2),
        ]
        # Channels go on a new leading axis, then everything is scaled by 1/100.
        stacked = np.stack(channels, axis=0).astype(np.float32)
        return stacked / 100.0

In [None]:
### Example

# Training pipeline: random excerpt -> optional noise -> mel spectrogram
# -> optional masking -> 3-channel image.
train_audio_augmentation = albumentations.Compose(
    [
        RandomAudio(seconds=args.max_duration, always_apply=True),
        NoiseInjection(p=0.33),
        MelSpectrogram(parameters=args.melspectrogram_parameters, always_apply=True),
        SpecAugment(p=0.33),
        SpectToImage(always_apply=True),
    ]
)

# Validation pipeline: same excerpting and feature extraction, without the
# noise and masking stages.
valid_audio_augmentation = albumentations.Compose(
    [
        RandomAudio(seconds=args.max_duration, always_apply=True),
        MelSpectrogram(parameters=args.melspectrogram_parameters, always_apply=True),
        SpectToImage(always_apply=True),
    ]
)

# Run one recording through the training pipeline and display the result.
path = f"{args.ROOT_PATH}/aldfly/XC135454.mp3"
data = load_audio(path)
image = train_audio_augmentation(data=data)["data"]

# The pipeline output is channel-first; imshow expects the channel axis last.
plt.imshow(np.transpose(image, (1, 2, 0)))
plt.show()
