In [90]:
import pandas as pd
from pathlib import Path

In [91]:
download_path = Path.cwd()/'UrbanSound8K'

In [92]:
metadata_file = download_path/'metadata'/'UrbanSound8K.csv'
df = pd.read_csv(metadata_file)
df.head()

Unnamed: 0,slice_file_name,fsID,start,end,salience,fold,classID,class
0,100032-3-0-0.wav,100032,0.0,0.317551,1,5,3,dog_bark
1,100263-2-0-117.wav,100263,58.5,62.5,1,5,2,children_playing
2,100263-2-0-121.wav,100263,60.5,64.5,1,5,2,children_playing
3,100263-2-0-126.wav,100263,63.0,67.0,1,5,2,children_playing
4,100263-2-0-137.wav,100263,68.5,72.5,1,5,2,children_playing


In [93]:
df['relative_path'] = '/fold' + df['fold'].astype(str) + '/' + df['slice_file_name'].astype(str)

In [94]:
df = df[['relative_path', 'classID']]
df.head()

Unnamed: 0,relative_path,classID
0,/fold5/100032-3-0-0.wav,3
1,/fold5/100263-2-0-117.wav,2
2,/fold5/100263-2-0-121.wav,2
3,/fold5/100263-2-0-126.wav,2
4,/fold5/100263-2-0-137.wav,2


In [95]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio

In [96]:
class AudioUtil():
    #Load an audio file, Return the signal as a tensor and the sample rate
    @staticmethod
    def open(audio_file):
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)
    
    #Convert the given audio to the desired number of channels
    @staticmethod
    def rechannel(aud, new_channel):
        sig, sr = aud

        if(sig.shape[0] == new_channel):
            return aud
        if (new_channel == 1):
            #Stereo to mono by only selecting first channel
            resig = sig[:1, :]
        else:
            #mono to stereo by dupe first channel
            resig = torch.cat([sig,sig])
        
        return ((resig, sr))
    
    #Since Resample applies to a single channel, we resample one channel at a time
    @staticmethod
    def resample(aud, newsr):
        sig,sr = aud
        if(sr==newsr):
            return aud
        num_channels = sig.shape[0]
        #resample first channel
        resig = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
        if(num_channels>1):
            #Resample the second channel and merge both
            retwo = torchaudio.transforms.Resample(sr,newsr)(sig[1:,:])
            resig = torch.cat([resig, retwo])
        return ((resig, newsr))

    #Pad (or turncate) the signal to a fixed length 'max_ms' in millisec
    @staticmethod
    def pad_trunc(aud,max_ms):
        sig, sr = aud
        num_rows, sig_len = sig.shape
        max_len = sr//1000 * max_ms

        if(sig_len > max_len):
            #Truncate the signal to the given length
            sig = sig[:,:max_len]
        elif(sig_len < max_len):
            #Length of padding to add at the beginning and the end of the signal
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len = max_len - sig_len -pad_begin_len

            #Pad with 0s
            pad_begin = torch.zeros((num_rows,pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))

            sig = torch.cat((pad_begin, sig, pad_end), 1)
        
        return (sig,sr)

    #Shifts the signal to the left or right by some percent. Values at the end
    #are 'warpped around' to the start of the transformed signal.
    @staticmethod
    def time_shift(aud, shift_limit):
        sig, sr = aud
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return(sig.roll(shift_amt), sr)

    #Generate a Spectogram
    @staticmethod
    def specto_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
        sig,sr = aud
        top_db = 80

        #spec has shape [channel, n_mels, time], where channel is mono, stereo etc
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

        #Convert to decibels
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return (spec)

    #Augment the Spectorgram by masking out some sections of it, in both the frequency
    #dimension (ie. horizontal bars) and the time dimension (verticel bars) to prevent
    #overfitting and to help the model generalise better. The masked sections are
    #replaced with the mean value.
    @staticmethod
    def spectrogram_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
        _, n_mels, n_steps = spec.shape
        mask_value = spec.mean()
        aug_spec = spec

        freq_mask_param = max_mask_pct*n_mels
        for _ in range(n_freq_masks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)
        
        time_mask_param = max_mask_pct * n_mels
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec,mask_value)
        
        return aug_spec

In [97]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

In [98]:
#Sound Dataset
class SoundDS(Dataset):
    def __init__(self,df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000
        self.sr = 44100
        self.channel = 2
        self.shift_pct = 0.4
    
    #Number of items in dataset
    def __len__(self):
        return len(self.df)
    
    #Get iæth item in dataset
    def __getitem__(self, idx):
        #Absolute file path of the audio file - concatenate the audio directory with
        #the relative path
        audio_file = self.data_path + self.df.loc[idx, 'relative_path']
        #Get the Class ID
        class_id = self.df.loc[idx, 'classID']

        aud = AudioUtil.open(audio_file)
        #Some sounds have a higher sample rate, or fewer channels compared to the
        #majority. So make all sound have the same number of channels and same
        #sample rate. Unless the sample rate is the same, the pad_trunc will still
        #result in arrays of different lengths, even though the sound duration is
        #the same.
        reaud = AudioUtil.resample(aud, self.sr)
        rechan = AudioUtil.rechannel(reaud, self.channel)

        dur_aud = AudioUtil.pad_trunc(rechan,self.duration)
        shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtil.specto_gram(shift_aud, n_mels=64,n_fft=1024,hop_len=None)
        aug_sgram = AudioUtil.spectrogram_augment(sgram, max_mask_pct=0.1, n_freq_masks=2,n_time_masks=2)

        return aug_sgram, class_id

In [106]:
from torch.utils.data import random_split

myds = SoundDS(df, df[['relative_path']])

#Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

#Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)


NameError: name 'data_path' is not defined

In [100]:
import torch.nn.functional as F
import torch.nn as nn
from torch.nn import init

In [101]:
#Audio Classifier Model
class AudioClassifier (nn.Module):
    #Build the model architecture
    def __init__(self):
        super().__init__()
        conv_layers = []

        #First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1 = nn.Conv2d(2,8, kernel_size = (5,5), stride=(2,2),padding=(2,2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight,a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        #Second Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv2 = nn.Conv2d(8,16, kernel_size=(3,3), stride=(2,2),padding=(1,1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight,a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        #Third Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv3 = nn.Conv2d(16,32, kernel_size = (3,3), stride=(2,2),padding=(1,1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight,a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        #Final Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv4 = nn.Conv2d(32,64, kernel_size=(3,3), stride=(2,2),padding=(1,1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight,a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        #Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        #Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)

    #Forward pass computations
    def forward(self, x):
        #Run the convolutional blocks
        x = self.conv(x)

        #Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        #Linear layer
        x = self.lin(x)

        #Final output
        return x


In [102]:
#Create the model and put it on the GPU if avaliable
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
#Check that it is on cuda
next(myModel.parameters()).device

device(type='cpu')

In [103]:
#Training loop
def training(model, train_dl, num_epochs):
    #Loss, optim, scheduler
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                    steps_per_epoch=int(len(train_dl)),
                                                    epochs=num_epochs,
                                                    anneal_strategy='linear')
    
    #repeat
    for epoch in range(num_epochs):
        running_loss = 0.0
        correct_prediction = 0
        total_preditiction = 0

        #repeat
        for i,data in enumerate(train_dl):
            #Get the input features and target labels, and put them on the GPU
            inputs, labels = data[0].to(device), data[1].to(device)

            #Normalize
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m)/inputs_s

            #Zero grad
            optimizer.zero_grad()

            #forward + backward + optimize
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

            #Keep stats for Loss and Accuracy
            running_loss += loss.item()

            #Get the predicted class with highest score
            _, prediction = torch.max(outputs, 1)
            #Count of predictions that matched the label
            correct_prediction += (prediction == labels).sum().item()
            total_preditiction += prediction.shape[0]

        #print stats at the end of the epoch
        num_batches = len(train_dl)
        avg_loss = running_loss/num_batches
        acc = correct_prediction/total_preditiction
        print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')
    
    print('Finished Training')

In [104]:
num_epochs = 2
training(myModel, train_dl, num_epochs)

RuntimeError: Error opening 'c:\\Users\\Morten\\Desktop\\School\\Semester5\\IDATT2502_AI\\_Eksamen\\UrbanSound8K/fold1/159738-8-0-13.wav': System error.

In [None]:
#inference
def inference(model, val_dl):
    correct_prediction = 0
    total_prediction = 0

    #Disable gradient updates
    with torch.no_grad():
        for data in val_dl:
            #Get the input features and target labels, put them on the GPU
            inputs, labels = data[0].to(device), data[1].to(device)

            #Nomralize the input
            inputs_m, inputs_s = inputs.mean(), inputs.std()
            inputs = (inputs - inputs_m)/inputs_s

            #Get predictions
            outputs = model(input)

            #Get the predicted class with the highest score
            _, prediction = torch.max(outputs, 1)
            #Count of predictions that matched the target label
            correct_prediction += (prediction == labels).sum().item()
            total_prediction += prediction.shape[0]
        
    acc = correct_prediction/total_prediction
    print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

#Run inference on trained model with the validation set
inference(myModel, val_dl)
    