In [None]:
import torch
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.ticker
import torchaudio

from torchaudio import transforms
from torch.utils.data import DataLoader, Dataset, random_split

from sklearn.preprocessing import OrdinalEncoder
import sounddevice as sd


In [None]:
categories = ["chirping_birds", "crackling_fire", "handsaw", "chainsaw", "helicopter"]
audio_path = './ESC-50/audio/'
meta_path = './ESC-50/meta/'


data = pd.read_csv(meta_path + 'esc50.csv')

## Keep only the rows whose category is one of `categories`.
## .copy() makes the result an independent frame, so the column assignment
## below does not write into a slice of the frame returned by read_csv
## (which is what triggers pandas' SettingWithCopyWarning).
data = data[data.category.isin(categories)].copy()

## Re-encode the remaining target ids with an ordinal encoder so they can be
## used directly as class indices.
## np.int64 is spelled out instead of np.long: np.long is not available in
## every NumPy release and, where it exists, its width depends on the platform.
re_encoder = OrdinalEncoder(dtype=np.int64)
re_encoder.fit(data[["target"]])
data[["target"]] = re_encoder.transform(data[["target"]])

data

In [None]:
class AudioUtil:
    """Static helpers for loading, inspecting, slicing and augmenting audio.

    Throughout the class an "audio" value (``aud``) is a ``(sig, sr)`` tuple:
    ``sig`` is a 2-D tensor laid out as ``(num_channels, num_samples)`` and
    ``sr`` is the sample rate in samples per second.
    """

    @staticmethod
    def open(path):
        """Load the file at ``path`` with ``torchaudio.load`` and return its result."""
        return torchaudio.load(path)

    @staticmethod
    def toFile(aud,path):
        """Write the audio ``aud`` to ``path`` with ``torchaudio.save``."""
        sig, sr = aud
        torchaudio.save(path,sig,sr)

    @staticmethod
    def displayTime(aud):
        """Plot the waveform of a single-channel audio against time.

        Raises:
            Exception: if the signal has more than one channel.
        """
        sig, sr = aud
        num_channels,num_samples = sig.shape

        if(num_channels != 1):
            raise Exception("Can't display multi-channel sound.")

        t = np.linspace(0,num_samples/sr,num_samples)
        plt.plot(t,sig.numpy().ravel())

        plt.show()

    @staticmethod
    def playSound(aud):
        """Play the audio through ``sounddevice`` at 10% amplitude, blocking until done."""
        sig, sr = aud
        sd.play(0.1*sig.numpy().ravel(),sr,blocking=True)

    @staticmethod
    def toMelSpec(aud,n_fft = 512, n_mels = 20, truncate_to=0):
        """Convert an audio to a mel spectrogram expressed in decibels.

        Args:
            aud: ``(sig, sr)`` audio tuple.
            n_fft: FFT size; also used as the hop length, so frames do not overlap.
            n_mels: number of mel bands.
            truncate_to: when > 0, only the first ``truncate_to * n_fft`` samples
                are used.

        Returns:
            The output of ``MelSpectrogram`` followed by ``AmplitudeToDB``.
        """
        sig,sr=aud
        # Unpacking doubles as a check that the signal is 2-D.
        num_channels,num_samples = sig.shape
        if(truncate_to>0):
            sig = sig[:,:truncate_to*n_fft]

        melspec = transforms.MelSpectrogram(sample_rate = sr,n_fft=n_fft,hop_length=n_fft,n_mels=n_mels,center=False)(sig)
        melspec = transforms.AmplitudeToDB()(melspec)
        return melspec

    @staticmethod
    def displayMelspec(melspec, aud):
        """Show the first channel of ``melspec`` as an image with a time axis.

        ``aud`` is only used to compute the total duration, from which five
        evenly spaced tick labels are derived.
        """
        sig,sr = aud
        num_channels,num_samples = sig.shape

        tmax = num_samples/sr

        t = np.linspace(0,tmax,5)

        plt.imshow(melspec.numpy()[0], interpolation='nearest', origin="lower",aspect="auto")
        plt.colorbar()
        xmin,xmax = plt.xlim()
        # Map the time stamps onto the image's x range.
        plt.xticks(t*xmax/tmax,["{:10.4f}".format(i) for i in t])
        plt.show()

    @staticmethod
    def rechannel(aud):
        """Down-mix an audio to a single channel.

        A single-channel input is returned as is. Otherwise the channels are
        averaged and the result keeps the ``(1, num_samples)`` layout that the
        other helpers of this class unpack.
        """
        sig, sr = aud

        num_channels, num_samples  = sig.shape

        if(num_channels == 1):
            return aud

        # Channels live on dim 0; keepdim preserves the 2-D (1, num_samples) shape.
        return (sig.mean(dim=0, keepdim=True), sr)

    @staticmethod
    def resample(aud, new_sr):
        """Resample a single-channel audio to ``new_sr``.

        Raises:
            Exception: if the signal has more than one channel.
        """
        sig, sr = aud

        num_channels, num_samples  = sig.shape
        if(num_channels != 1):
            raise Exception("Can't apply resample to multi-channel sound.")

        resig = torchaudio.transforms.Resample(sr,new_sr)(sig)

        return ((resig,new_sr))

    @staticmethod
    def sliceAudio(aud, trunc, index, trunc_type = "ms",bootstrap = False,bootstrapSeed=0):
        """Extract one fixed-length slice from a single-channel audio.

        Args:
            aud: ``(sig, sr)`` audio tuple with exactly one channel.
            trunc: slice length, in milliseconds or samples (see ``trunc_type``).
            index: without bootstrap, the number of the consecutive slice to
                return (slice 0 starts at sample 0). With bootstrap, the
                position of the draw to use in the seeded random sequence.
            trunc_type: ``"ms"`` or ``"samples"``.
            bootstrap: when true the slice starts at a pseudo-random offset
                instead of at ``index * slice_length``.
            bootstrapSeed: seed of the random sequence used in bootstrap mode.

        Returns:
            ``(slice, sr)`` where ``slice`` holds exactly the requested number
            of samples.

        Raises:
            Exception: for multi-channel input, an unknown ``trunc_type``, or
                a slice that does not fit entirely inside the signal.
        """
        sig, sr = aud
        num_channels, num_samples  = sig.shape

        if(num_channels != 1):
            raise Exception("Can't apply slice to multi-channel sound.")

        if trunc_type == "ms":
            samples_per_slice = int(trunc/1000 * sr)
        elif trunc_type == "samples":
            samples_per_slice = trunc
        else:
            raise Exception('''Unknown truncation type, use "ms" or "samples"''')

        if not bootstrap:
            start = index*samples_per_slice
            stop = start + samples_per_slice
            # Check the END of the slice: checking only its start would let a
            # shorter-than-requested final slice through.
            if stop > num_samples:
                raise Exception("Can't extract a slice with index {} from this audio file.".format(index))
            return (sig[:,start:stop],sr)

        if samples_per_slice > num_samples:
            raise Exception("Can't extract slices of such length from this audio file.")

        # randint needs a strictly positive upper bound; clamping to 1 makes a
        # signal that is (almost) exactly one slice long yield offset 0 instead
        # of raising. Larger signals draw exactly the same offsets as before.
        high = max(1, num_samples-samples_per_slice-1)
        start = int(np.random.RandomState(bootstrapSeed).randint(0,high,index+1)[index])
        return ((sig[:,start : start+samples_per_slice],sr))

    @staticmethod
    def augment_timeShift(aud,shift_limit,seed):
        """Circularly shift the signal in time by a seeded pseudo-random amount.

        The shift is ``u * shift_limit * num_samples`` samples (truncated to an
        integer), with ``u`` the first draw from ``RandomState(seed)``.
        """
        sig,sr = aud
        _, sig_len = sig.shape
        # .random() returns a plain float (no 1-element array to convert).
        shift_amt = int(np.random.RandomState(seed).random() * shift_limit * sig_len)
        # Roll along the sample axis only so channels never bleed into each other.
        return (sig.roll(shift_amt, dims=-1), sr)

    @staticmethod
    def augment_spectralMask(spec,max_mask_pct,n_freq_masks=1,n_time_masks=1):
        """
        TODO: Don't use for now, must use a seed to work properly

        Applies ``n_freq_masks`` frequency masks and ``n_time_masks`` time masks
        to ``spec``, each limited to ``max_mask_pct`` of the corresponding axis
        and filled with the mean of ``spec``.
        """
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct * n_mels

        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec,mask_value)

        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec,mask_value)

        return aug_spec

    @staticmethod
    def augment_timeNoise(aud,noise_power,seed=None):
        """Add zero-mean Gaussian noise of variance ``noise_power`` to the signal.

        Args:
            aud: ``(sig, sr)`` audio tuple.
            noise_power: variance of the noise (its standard deviation is the
                square root of this value).
            seed: optional seed for a reproducible noise sequence; when omitted
                NumPy's global random state is used.

        Returns:
            ``(noisy_sig, sr)``; one noise vector of ``num_samples`` values is
            broadcast over the channels.
        """
        sig,sr = aud
        num_channels, num_samples  = sig.shape

        rng = np.random if seed is None else np.random.RandomState(seed)
        noise = rng.normal(0,scale=noise_power**0.5,size=num_samples).astype('f')

        # Convert explicitly so the sum is always a torch tensor on sig's device.
        sig_aug = sig + torch.from_numpy(noise).to(sig.device)

        return (sig_aug,sr)

    @staticmethod
    def augment_timeLowPass(aud,cutoff_freq):
        """Low-pass filter the signal with ``torchaudio``'s biquad filter.

        A cutoff at or above half the sample rate cannot remove anything from a
        sampled signal, and the filter is reported to be highly unstable there,
        so in that case the audio is returned unfiltered.
        """
        sig,sr = aud
        if cutoff_freq >= sr/2:
            return (sig,sr)

        sig_aug = torchaudio.functional.lowpass_biquad(sig,sr,cutoff_freq=cutoff_freq)

        return (sig_aug,sr)


In [None]:
class SoundDS(Dataset):
    """Dataset of (mel spectrogram, class id) pairs built from slices of audio files.

    Every row of ``df`` contributes ``samplesPerFile`` items. Item ``idx`` maps
    to file ``idx // samplesPerFile`` and to slice ``idx % samplesPerFile`` of
    that file.
    """

    def __init__(self,df, data_path, bootstrap, bootstrapSeed = None):
        """
        Args:
            df: frame with a "filename" and a "target" column.
            data_path: prefix prepended to each file name (plain string
                concatenation, so it must end with a path separator).
            bootstrap: forwarded to ``AudioUtil.sliceAudio``; selects seeded
                random slice offsets instead of consecutive slices.
            bootstrapSeed: seed used for slicing and for the time-shift
                augmentation. When omitted, a value in [0, 20) is drawn for
                this instance.
        """
        # Draw the default per instance: a random default written in the
        # signature would be evaluated only once, when the class is defined.
        if bootstrapSeed is None:
            bootstrapSeed = np.random.randint(0,20)

        self.df = df
        self.data_path = str(data_path)
        self.samplesPerFile= 10
        self.windowSize = 512
        self.n_mels = 20
        # Slice length in samples: samplesPerFile spectrogram frames of windowSize samples.
        self.duration = self.windowSize*self.samplesPerFile
        self.sr = 11025
        self.channel = 1
        self.bootstrap = bootstrap
        self.augmentations = ["timeShift", "noise", "lowPass", "mask"]
        self.bootstrapSeed = bootstrapSeed

    def __len__(self):
        """Number of items: ``samplesPerFile`` slices for every row of ``df``."""
        return len(self.df)*self.samplesPerFile


    def __getitem__(self, idx):
        """Return ``(mel spectrogram, class id)`` for item ``idx``."""
        index = idx // self.samplesPerFile
        loaded_file = self.getAudio(idx)

        melSpecGram = AudioUtil.toMelSpec(loaded_file,self.windowSize,self.n_mels,0)

        # The "mask" augmentation is deliberately not applied here:
        # AudioUtil.augment_spectralMask is marked as not ready for use.

        class_id = self.df["target"].iloc[index]

        return melSpecGram,class_id

    def getAudio(self,idx):
        """Load, down-mix, resample, slice and augment the audio behind item ``idx``.

        Returns:
            The ``(sig, sr)`` tuple produced by the ``AudioUtil`` pipeline.
        """
        index = idx // self.samplesPerFile
        subindex = idx % self.samplesPerFile

        audio_file = self.df["filename"].iloc[index]
        loaded_file = AudioUtil.open(self.data_path + audio_file)
        loaded_file = AudioUtil.rechannel(loaded_file)
        # Use the configured rate rather than repeating the literal.
        loaded_file = AudioUtil.resample(loaded_file,self.sr)
        loaded_file = AudioUtil.sliceAudio(loaded_file,self.duration,subindex,"samples",self.bootstrap,self.bootstrapSeed)
        if "timeShift" in self.augmentations:
            loaded_file = AudioUtil.augment_timeShift(loaded_file,1.0,idx*self.bootstrapSeed)
        if "noise" in self.augmentations:
            # A noise power of 0 leaves the signal unchanged.
            loaded_file = AudioUtil.augment_timeNoise(loaded_file,0.0000)
        if "lowPass" in self.augmentations:
            loaded_file = AudioUtil.augment_timeLowPass(loaded_file,5000)

        return loaded_file
        



In [None]:
# Dataset over every selected clip (kept so `ds` stays available to other cells).
ds = SoundDS(data,audio_path,True,4)

# Split 80:20 between training and validation at the *file* level.
# Each file contributes several slices to the dataset, so splitting the slices
# themselves would put slices of one recording on both sides of the split and
# inflate the validation accuracy. Shuffling the rows with a fixed seed keeps
# the split reproducible across kernel restarts.
split_seed = 0
files_shuffled = data.sample(frac=1.0, random_state=split_seed)
num_train_files = round(len(files_shuffled) * 0.8)
train_ds = SoundDS(files_shuffled.iloc[:num_train_files],audio_path,True,4)
val_ds = SoundDS(files_shuffled.iloc[num_train_files:],audio_path,True,4)

# Item counts, expressed in slices
num_items = len(ds)
num_train = len(train_ds)
num_val = len(val_ds)

# Create training and validation data loaders
train_dl = DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = DataLoader(val_ds, batch_size=16, shuffle=False)


In [None]:
import torch.nn.functional as F
from torch.nn import init
import torch.nn as nn

# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioClassifier (nn.Module):
    """Small CNN classifier for single-channel spectrogram images.

    Four stride-2 convolution blocks (Conv2d -> ReLU -> BatchNorm2d) are
    followed by global average pooling and one linear layer that produces
    ``num_classes`` scores per input.
    """

    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self, num_classes=10):
        """
        Args:
            num_classes: size of the output layer. Defaults to 10, the value
                that used to be hard-coded.
        """
        super().__init__()

        # Four convolution blocks; each one halves the spatial resolution.
        self.conv1, self.relu1, self.bn1 = self._conv_block(1, 8, kernel_size=(5, 5), padding=(2, 2))
        self.conv2, self.relu2, self.bn2 = self._conv_block(8, 16, kernel_size=(3, 3), padding=(1, 1))
        self.conv3, self.relu3, self.bn3 = self._conv_block(16, 32, kernel_size=(3, 3), padding=(1, 1))
        self.conv4, self.relu4, self.bn4 = self._conv_block(32, 64, kernel_size=(3, 3), padding=(1, 1))

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=num_classes)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(
            self.conv1, self.relu1, self.bn1,
            self.conv2, self.relu2, self.bn2,
            self.conv3, self.relu3, self.bn3,
            self.conv4, self.relu4, self.bn4,
        )

    @staticmethod
    def _conv_block(in_channels, out_channels, kernel_size, padding):
        """Build one (Conv2d, ReLU, BatchNorm2d) triple with Kaiming-initialised conv weights and zero bias."""
        conv = nn.Conv2d(in_channels, out_channels, kernel_size=kernel_size, stride=(2, 2), padding=padding)
        init.kaiming_normal_(conv.weight, a=0.1)
        conv.bias.data.zero_()
        return conv, nn.ReLU(), nn.BatchNorm2d(out_channels)

    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        """Return the class scores, shaped ``(batch, num_classes)``, for the batch ``x``."""
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

# Select the first CUDA device when one is present, otherwise the CPU,
# and place a freshly built classifier on it.
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
myModel = AudioClassifier().to(device)
# Display the device the parameters ended up on
next(myModel.parameters()).device

In [None]:
# ----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs):
    """Train ``model`` on ``train_dl`` for ``num_epochs`` epochs, printing stats per epoch.

    Uses cross-entropy loss, Adam (lr=0.001) and a linearly annealed OneCycleLR
    schedule that is stepped once per batch. Every batch is standardised with
    its own mean and standard deviation before the forward pass.

    Args:
        model: the module to optimise; batches are moved to the device of its
            parameters.
        train_dl: sized iterable of ``(inputs, labels)`` batches.
        num_epochs: number of passes over ``train_dl``.
    """
    # Follow the model's own device instead of depending on a notebook global.
    device = next(model.parameters()).device

    # Loss Function, Optimizer and Scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')

    # Make sure mode-dependent layers (e.g. BatchNorm) are in training mode,
    # even if the model was previously switched to eval mode.
    model.train()

    # Repeat for each epoch
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_prediction = 0

        # Repeat for each batch in the training set
        for data in train_dl:
            # Get the input features and target labels, and put them on the model's device
            inputs, labels = data[0].to(device), data[1].to(device)

            # Normalize the inputs
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m) / inputs_s

            # Zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Keep stats for Loss and Accuracy
            running_loss += loss.item()

            # Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            # Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]

        # Print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss / num_batches
        acc = correct_prediction/total_prediction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    print('Finished Training')
  
# Ten epochs keeps the demo short; raise this value for a real run.
num_epochs = 10
training(myModel, train_dl, num_epochs=num_epochs)

In [None]:
# ----------------------------
# Inference
# ----------------------------
def inference (model, val_dl):
    """Evaluate ``model`` on ``val_dl`` and print the accuracy and the item count.

    The model is switched to eval mode for the duration of the call, so layers
    such as BatchNorm use their running statistics and do not update them, and
    is put back into its previous mode afterwards. Every batch is standardised
    with its own mean and standard deviation, as in training.

    Args:
        model: the module to evaluate; batches are moved to the device of its
            parameters (CPU when it has none).
        val_dl: iterable of ``(inputs, labels)`` batches.
    """
    correct_prediction = 0
    total_prediction = 0

    # Follow the model's own device instead of depending on a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    was_training = model.training
    model.eval()
    try:
        # Disable gradient updates
        with torch.no_grad():
            for data in val_dl:
                # Get the input features and target labels, and put them on the model's device
                inputs, labels = data[0].to(device), data[1].to(device)

                # Normalize the inputs
                inputs_m, inputs_s = inputs.mean(), inputs.std()
                inputs = (inputs - inputs_m) / inputs_s

                # Get predictions
                outputs = model(inputs)

                # Get the predicted class with the highest score
                _, prediction = torch.max(outputs, 1)
                # Count of predictions that matched the target label
                correct_prediction += (prediction == labels).sum().item()
                total_prediction += prediction.shape[0]
    finally:
        # Restore whichever mode the caller had the model in.
        model.train(was_training)

    # An empty loader reports 0 accuracy instead of dividing by zero.
    acc = correct_prediction/total_prediction if total_prediction else 0.0
    print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

# Evaluate the trained model on the validation loader
inference(model=myModel, val_dl=val_dl)