In [None]:
! pip install -U pip
! pip install -U torch==1.5.0
! pip install -U torchaudio==0.5.0
! pip install -U torchvision==0.6.0
! pip install -U matplotlib==3.2.1
! pip install -U trains>=0.15.0
! pip install -U pandas==1.0.4
! pip install -U numpy==1.18.4
! pip install -U tensorboard==2.2.1

In [None]:
import PIL
import io

import pandas as pd
import numpy as np
from pathlib2 import Path
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset
from torch.utils.tensorboard import SummaryWriter

import torchaudio
from torchvision.transforms import ToTensor

from trains import Task

%matplotlib inline

In [None]:
# Create the trains task for this notebook run.
task = Task.init(project_name='Audio Example', task_name='audio classifier')

# Default hyper-parameters. Connecting them to the task enables configuration
# override by trains; the returned object is what the rest of the notebook reads.
configuration_dict = task.connect({
    'number_of_epochs': 10,
    'batch_size': 4,
    'dropout': 0.25,
    'base_lr': 0.001,
})

# Show the configuration actually in effect (after override in remote mode).
print(configuration_dict)

In [None]:
# Download the UrbanSound8K dataset (https://urbansounddataset.weebly.com/urbansound8k.html)
# and extract it so that this directory contains 'metadata/UrbanSound8K.csv' and the
# 'audio/fold<N>/' folders -- that is the layout the dataset cell below reads from.
path_to_UrbanSound8K = './data/UrbanSound8K'

In [None]:
class UrbanSoundDataset(Dataset):
    """Wrapper for the UrbanSound8K dataset.

    Indexing the dataset yields a tuple
    ``(waveform, sample_rate, mel_spectrogram_db, label)`` where ``waveform`` is
    the mono signal padded / truncated to ``(1, NUM_SAMPLES)``, ``sample_rate``
    is the rate reported by ``torchaudio.load``, ``mel_spectrogram_db`` is the
    dB-scaled mel spectrogram of the waveform and ``label`` is the class id read
    from the metadata CSV.

    Args:
        csv_path: path of the metadata CSV file.
        file_path: path-like object (must support the ``/`` operator) of the
            directory that holds the ``fold<N>`` audio folders.
        folderList: iterable of fold numbers to include in this dataset.
    """

    # Every clip is zero-padded or truncated to exactly this many samples.
    NUM_SAMPLES = 160000

    def __init__(self, csv_path, file_path, folderList):
        self.file_path = file_path

        # Columns are addressed by position: 0 = file name, 5 = fold, 6 = class id
        # (presumably the column order of UrbanSound8K.csv -- verify if the file changes).
        csvData = pd.read_csv(csv_path)

        # Keep only the entries whose fold is in the requested list. A single
        # vectorized filter replaces a per-row .iloc loop over the whole CSV.
        selected = csvData[csvData.iloc[:, 5].isin(list(folderList))]
        self.file_names = selected.iloc[:, 0].tolist()
        self.labels = selected.iloc[:, 6].tolist()
        self.folders = selected.iloc[:, 5].tolist()

    def __getitem__(self, index):
        """Load clip ``index`` and return ``(waveform, sample_rate, mel_db, label)``."""
        # Format the file path and load the file.
        path = self.file_path / ("fold" + str(self.folders[index])) / self.file_names[index]
        sound, sample_rate = torchaudio.load(path, out = None, normalization = True)

        # Average over the channel axis so multi-channel clips become mono: (1, num_samples).
        soundData = torch.mean(sound, dim=0, keepdim=True)

        # Make sure all clips have the same length: zero-pad short ones at the
        # end, truncate long ones.
        num_samples = soundData.numel()
        if num_samples < self.NUM_SAMPLES:
            fixedsize_data = torch.nn.functional.pad(soundData, (0, self.NUM_SAMPLES - num_samples))
        else:
            fixedsize_data = soundData[:, :self.NUM_SAMPLES]

        # NOTE(review): fixedsize_data has shape (1, NUM_SAMPLES), so this stride-5
        # slice acts on the size-1 channel axis and returns the waveform unchanged;
        # no downsampling actually happens. It is deliberately kept as-is: the
        # flattened feature size hard-coded in Net (128*29*197) appears to match the
        # spectrogram of the full-length signal, so slicing the time axis instead
        # would also require changing the model.
        downsample_data = fixedsize_data[::5]

        melspectogram_transform = torchaudio.transforms.MelSpectrogram(sample_rate=sample_rate)
        melspectogram = melspectogram_transform(downsample_data)
        melspectogram_db = torchaudio.transforms.AmplitudeToDB()(melspectogram)

        return fixedsize_data, sample_rate, melspectogram_db, self.labels[index]

    def __len__(self):
        """Number of clips selected from the metadata CSV."""
        return len(self.file_names)


# Locations of the metadata CSV and of the per-fold audio folders.
csv_path = Path(path_to_UrbanSound8K).joinpath('metadata', 'UrbanSound8K.csv')
file_path = Path(path_to_UrbanSound8K).joinpath('audio')

# Folds 1-9 are used for training; fold 10 is held out for testing.
train_set = UrbanSoundDataset(csv_path, file_path, range(1, 10))
test_set = UrbanSoundDataset(csv_path, file_path, [10])
print("Train set size: {}".format(len(train_set)))
print("Test set size: {}".format(len(test_set)))

# The two loaders differ only in their dataset and in whether they shuffle.
loader_kwargs = dict(
    batch_size=configuration_dict.get('batch_size', 4),
    pin_memory=True,
    num_workers=1,
)
train_loader = torch.utils.data.DataLoader(train_set, shuffle=True, **loader_kwargs)
test_loader = torch.utils.data.DataLoader(test_set, shuffle=False, **loader_kwargs)

# Class names, indexed by class id when labelling the debug samples.
classes = (
    'air_conditioner', 'car_horn', 'children_playing', 'dog_bark', 'drilling',
    'engine_idling', 'gun_shot', 'jackhammer', 'siren', 'street_music',
)

In [None]:
class Net(nn.Module):
    """Small convolutional classifier for single-channel 2-D inputs.

    Four 3x3 convolutions (1 -> 16 -> 32 -> 64 -> 128 channels), each followed
    by ReLU, with 2x2 max-pooling after the second and fourth convolution, then
    two fully connected layers with dropout applied before each of them.

    Args:
        num_classes: number of output logits.
        dropout_value: dropout probability used before both linear layers.
        flat_features: number of features per sample after the last pooling
            layer. The default, ``128*29*197`` (128 channels times a 29x197
            feature map), is the value this notebook was written for; pass a
            different value to use the network with another input size.
    """

    def __init__(self, num_classes, dropout_value, flat_features=128*29*197):
        super(Net,self).__init__()
        self.num_classes = num_classes
        self.dropout_value = dropout_value
        self.flat_features = flat_features

        self.C1 = nn.Conv2d(1,16,3)
        self.C2 = nn.Conv2d(16,32,3)
        self.C3 = nn.Conv2d(32,64,3)
        self.C4 = nn.Conv2d(64,128,3)
        self.maxpool1 = nn.MaxPool2d(2,2)
        self.fc1 = nn.Linear(self.flat_features,128)
        self.fc2 = nn.Linear(128,self.num_classes)
        self.dropout = nn.Dropout(self.dropout_value)

    def forward(self,x):
        """Return the class logits, shape ``(batch, num_classes)``, for input ``x``."""
        # Sequence of convolutional and max pooling layers.
        x = F.relu(self.C1(x))
        x = self.maxpool1(F.relu(self.C2(x)))
        x = F.relu(self.C3(x))
        x = self.maxpool1(F.relu(self.C4(x)))
        # Flatten per sample. Keeping the batch dimension fixed (instead of
        # view(-1, N)) means an unexpected input size fails loudly in fc1 rather
        # than silently merging or splitting batch entries.
        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(self.dropout(x)))
        x = self.fc2(self.dropout(x))
        return x
    
    
# Build the classifier: one output per class, dropout taken from the configuration.
model = Net(num_classes=len(classes), dropout_value=configuration_dict.get('dropout', 0.25))

In [None]:
# Classification loss computed on the raw model outputs.
criterion = nn.CrossEntropyLoss()

# SGD with momentum; the base learning rate comes from the configuration.
optimizer = optim.SGD(
    model.parameters(),
    lr=configuration_dict.get('base_lr', 0.001),
    momentum=0.9,
)

# Multiply the learning rate by 0.1 every 3 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

In [None]:
# Use the current CUDA device when one is available, otherwise the CPU.
# A torch.device is built in both branches so that `device` always has the same
# type (torch.cuda.current_device() on its own returns a bare integer index).
if torch.cuda.is_available():
    device = torch.device('cuda', torch.cuda.current_device())
else:
    device = torch.device('cpu')
print('Device to use: {}'.format(device))
model.to(device)

In [None]:
# TensorBoard writer shared by the training and evaluation functions below.
tensorboard_writer = SummaryWriter(log_dir='./tensorboard_logs')

In [None]:
def plot_signal(signal, title, cmap=None):
    """Render ``signal`` with matplotlib and return the picture as a tensor.

    A 1-D signal is drawn as a line plot; any other signal is shown with
    ``imshow`` using ``cmap``. The figure is encoded as JPEG in memory, reopened
    with PIL and converted with torchvision's ``ToTensor``.

    Args:
        signal: array-like exposing ``ndim`` (e.g. a numpy array).
        title: title drawn above the plot.
        cmap: colormap forwarded to ``imshow``; ignored for 1-D signals.

    Returns:
        The result of ``ToTensor()`` applied to the rendered image.
    """
    fig = plt.figure()
    try:
        # Draw on this figure's own axes rather than relying on pyplot's
        # implicit "current figure" state.
        ax = fig.add_subplot(111)
        if signal.ndim == 1:
            ax.plot(signal)
        else:
            ax.imshow(signal, cmap=cmap)
        ax.set_title(title)

        plot_buf = io.BytesIO()
        fig.savefig(plot_buf, format='jpeg')
    finally:
        # Always release the figure, even when plotting or saving fails, so
        # repeated calls cannot accumulate open figures.
        plt.close(fig)

    plot_buf.seek(0)
    return ToTensor()(PIL.Image.open(plot_buf))

In [None]:
def train(model, epoch):
    """Train ``model`` for one pass over ``train_loader``.

    Uses the notebook-level ``optimizer``, ``criterion`` and ``device``. Every
    ``log_interval`` mini-batches the loss and learning rate are printed /
    written to TensorBoard; every ``debug_interval`` mini-batches the mel
    spectrograms of the current batch are logged as images, tagged with their
    true and predicted class names.

    Args:
        model: the network to train.
        epoch: zero-based epoch index, used to compute the global iteration.
    """
    model.train()
    batches_per_epoch = len(train_loader)

    for batch_idx, (_sounds, _sample_rates, inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        # Reset gradients, then forward + backward + optimize.
        optimizer.zero_grad()
        outputs = model(inputs)
        predicted = torch.max(outputs, 1)[1]
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        # Global step counter across epochs.
        iteration = epoch * batches_per_epoch + batch_idx

        # Periodic training statistics.
        if batch_idx % log_interval == 0:
            progress = 100. * batch_idx / batches_per_epoch
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'
                  .format(epoch, batch_idx * len(inputs), len(train_loader.dataset),
                          progress, loss))
            tensorboard_writer.add_scalar('training loss/loss', loss, iteration)
            tensorboard_writer.add_scalar('learning rate/lr', optimizer.param_groups[0]['lr'], iteration)

        # Debug images are reported only every "debug_interval" mini-batches.
        if batch_idx % debug_interval != 0:
            continue
        for n, (inp, pred, label) in enumerate(zip(inputs, predicted, labels)):
            series = 'label_{}_pred_{}'.format(classes[label.cpu()], classes[pred.cpu()])
            image = plot_signal(inp.cpu().numpy().squeeze(), series, 'hot')
            tag = 'Train MelSpectrogram samples/{}_{}_{}'.format(batch_idx, n, series)
            tensorboard_writer.add_image(tag, image, iteration)

In [None]:
def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and log the total accuracy.

    Every ``debug_interval`` mini-batches the audio and mel spectrogram of each
    sample in the batch are written to TensorBoard, tagged with the true and
    predicted class names. The overall accuracy is printed and logged.

    Args:
        model: the network to evaluate.
        epoch: zero-based epoch index; results are logged at the global
            iteration reached at the end of that training epoch.
    """
    model.eval()
    num_classes = len(classes)
    class_correct = [0.] * num_classes
    class_total = [0.] * num_classes

    # Constant for the whole evaluation pass. Computing it before the loop also
    # keeps it defined when test_loader yields no batches.
    iteration = (epoch + 1) * len(train_loader)

    with torch.no_grad():
        for idx, (sounds, sample_rate, inputs, labels) in enumerate(test_loader):
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)

            _, predicted = torch.max(outputs, 1)
            c = (predicted == labels)
            for i in range(len(inputs)):
                label = labels[i].item()
                class_correct[label] += c[i].item()
                class_total[label] += 1

            if idx % debug_interval == 0:    # report debug samples every "debug_interval" mini-batches
                for n, (sound, inp, pred, label) in enumerate(zip(sounds, inputs, predicted, labels)):
                    series = 'label_{}_pred_{}'.format(classes[label.cpu()], classes[pred.cpu()])
                    tensorboard_writer.add_audio('Test audio samples/{}_{}_{}'.format(idx, n, series),
                                                 sound, iteration, int(sample_rate[n]))
                    tensorboard_writer.add_image('Test MelSpectrogram samples/{}_{}_{}'.format(idx, n, series),
                                                 plot_signal(inp.cpu().numpy().squeeze(), series, 'hot'), iteration)

    # Guard against an empty test set instead of dividing by zero.
    total_samples = sum(class_total)
    total_accuracy = 100 * sum(class_correct) / total_samples if total_samples else 0.0
    print('[Iteration {}] Accuracy on the {} test images: {}%\n'.format(epoch, total_samples, total_accuracy))
    tensorboard_writer.add_scalar('accuracy/total', total_accuracy, iteration)

In [None]:
# Reporting cadence, in mini-batches, read by train() and test().
log_interval = 100      # loss / learning-rate logging during training
debug_interval = 200    # debug sample logging

# One training pass and one evaluation pass per epoch, then advance the LR schedule.
num_epochs = configuration_dict.get('number_of_epochs', 10)
for epoch in range(num_epochs):
    train(model, epoch)
    test(model, epoch)
    scheduler.step()