In [13]:
# ----------------------------
# Prepare training data from Metadata file
# ----------------------------
import pandas as pd
from pathlib import Path

download_path = Path.cwd()/'data'/'UrbanSound8K'
data_path = download_path
# Read metadata file
metadata_file = download_path/'metadata'/'UrbanSound8K.csv'
df = pd.read_csv(metadata_file)
df.head()

# Construct file path by concatenating fold and file name
df['relative_path'] = '/fold' + df['fold'].astype(str) + '/' + df['slice_file_name'].astype(str)

# Take relevant columns
df = df[['relative_path', 'classID']]
df.head()

Unnamed: 0,relative_path,classID
0,/fold5/100032-3-0-0.wav,3
1,/fold5/100263-2-0-117.wav,2
2,/fold5/100263-2-0-121.wav,2
3,/fold5/100263-2-0-126.wav,2
4,/fold5/100263-2-0-137.wav,2


In [21]:
import math, random
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio

class AudioUtil():
  # ----------------------------
  # Load an audio file. Return the signal as a tensor and the sample rate
  # ----------------------------
  @staticmethod
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)
  # ----------------------------
  # Convert the given audio to the desired number of channels
  # ----------------------------
  @staticmethod
  def rechannel(aud, new_channel):
    sig, sr = aud

    if (sig.shape[0] == new_channel):
      # Nothing to do
      return aud

    if (new_channel == 1):
      # Convert from stereo to mono by selecting only the first channel
      resig = sig[:1, :]
    else:
      # Convert from mono to stereo by duplicating the first channel
      resig = torch.cat([sig, sig])

    return ((resig, sr))
  # ----------------------------
  # Since Resample applies to a single channel, we resample one channel at a time
  # ----------------------------
  @staticmethod
  def resample(aud, newsr):
    sig, sr = aud

    if (sr == newsr):
      # Nothing to do
      return aud

    num_channels = sig.shape[0]
    # Resample first channel
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:])
    if (num_channels > 1):
      # Resample the second channel and merge both channels
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))
  # ----------------------------
  # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
  # ----------------------------
  @staticmethod
  def pad_trunc(aud, max_ms):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms

    if (sig_len > max_len):
      # Truncate the signal to the given length
      sig = sig[:,:max_len]

    elif (sig_len < max_len):
      # Length of padding to add at the beginning and end of the signal
      pad_begin_len = random.randint(0, max_len - sig_len)
      pad_end_len = max_len - sig_len - pad_begin_len

      # Pad with 0s
      pad_begin = torch.zeros((num_rows, pad_begin_len))
      pad_end = torch.zeros((num_rows, pad_end_len))

      sig = torch.cat((pad_begin, sig, pad_end), 1)
      
    return (sig, sr)
  # ----------------------------
  # Shifts the signal to the left or right by some percent. Values at the end
  # are 'wrapped around' to the start of the transformed signal.
  # ----------------------------
  @staticmethod
  def time_shift(aud, shift_limit):
    sig,sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    return (sig.roll(shift_amt), sr)
  # ----------------------------
  # Generate a Spectrogram
  # ----------------------------
  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig,sr = aud
    top_db = 80

    # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

    # Convert to decibels
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)
  # ----------------------------
  # Augment the Spectrogram by masking out some sections of it in both the frequency
  # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
  # overfitting and to help the model generalise better. The masked sections are
  # replaced with the mean value.
  # ----------------------------
  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

In [39]:
from torch.utils.data import DataLoader, Dataset, random_split
import torchaudio

# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
  def __init__(self, df, data_path):
    self.df = df
    self.data_path = str(data_path)
    self.duration = 4000
    self.sr = 44100
    self.channel = 2
    self.shift_pct = 0.4
            
  # ----------------------------
  # Number of items in dataset
  # ----------------------------
  def __len__(self):
    return len(self.df)    
    
  # ----------------------------
  # Get i'th item in dataset
  # ----------------------------
  def __getitem__(self, idx):
    # Absolute file path of the audio file - concatenate the audio directory with
    # the relative path
    audio_file = self.data_path + self.df.loc[idx, 'relative_path']
    # Get the Class ID
    class_id = self.df.loc[idx, 'classID']

    aud = AudioUtil.open(audio_file)
    # Some sounds have a higher sample rate, or fewer channels compared to the
    # majority. So make all sounds have the same number of channels and same 
    # sample rate. Unless the sample rate is the same, the pad_trunc will still
    # result in arrays of different lengths, even though the sound duration is
    # the same.
    reaud = AudioUtil.resample(aud, self.sr)
    rechan = AudioUtil.rechannel(reaud, self.channel)

    dur_aud = AudioUtil.pad_trunc(rechan, self.duration)
    shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
    sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
    aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)

    return aug_sgram, class_id, audio_file

In [40]:
from torch.utils.data import random_split



myds = SoundDS(df, data_path)

# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 0.8)
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

# Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)
print(val_ds[100])

(tensor([[[ 20.6065,  26.1066,  22.9612,  ...,  26.3701,  24.8197,  26.4258],
         [ 29.3451,  30.5413,  28.1066,  ...,  30.3388,  29.0485,  29.7255],
         [ 27.6176,  28.3429,  23.2725,  ...,  26.8402,  24.0312,  25.9529],
         ...,
         [-21.5477, -27.7489, -31.5458,  ..., -32.9533, -31.6734, -32.4550],
         [-22.7927, -32.1873, -32.6538,  ..., -32.4252, -33.6491, -33.4696],
         [-22.5035, -32.5212, -34.6304,  ..., -34.4008, -34.5448, -35.9068]],

        [[ 20.4203,  25.9071,  22.7603,  ...,  26.5693,  25.0198,  26.6253],
         [ 29.1478,  30.3413,  27.9066,  ...,  30.5385,  29.2484,  29.9255],
         [ 27.4164,  28.1426,  23.0733,  ...,  27.0405,  24.2310,  26.1529],
         ...,
         [-21.8158, -28.0847, -31.8552,  ..., -32.8424, -31.5462, -32.1693],
         [-22.9226, -32.6720, -32.8876,  ..., -32.0565, -33.3486, -33.3040],
         [-22.5544, -32.6005, -34.7764,  ..., -34.1636, -34.5668, -35.7054]]]), 7, '/l/audio_ML_demo/data/UrbanSound8K/fol

In [41]:
import torch.nn.functional as F
import torch.nn as nn
from torch.nn import init

# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioClassifier (nn.Module):
    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self):
        super().__init__()
        conv_layers = []

        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1 = nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = nn.ReLU()
        self.bn1 = nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        # Second Convolution Block
        self.conv2 = nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = nn.ReLU()
        self.bn2 = nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        # Second Convolution Block
        self.conv3 = nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = nn.ReLU()
        self.bn3 = nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        # Second Convolution Block
        self.conv4 = nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = nn.ReLU()
        self.bn4 = nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]

        # Linear Classifier
        self.ap = nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = nn.Linear(in_features=64, out_features=10)

        # Wrap the Convolutional Blocks
        self.conv = nn.Sequential(*conv_layers)
 
    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

# Create the model and put it on the GPU if available
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
# Check that it is on Cuda
next(myModel.parameters()).device

device(type='cpu')

In [134]:
# ----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs):
  # Loss Function, Optimizer and Scheduler
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  # Repeat for each epoch
  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    # Repeat for each batch in the training set
    for i, data in enumerate(train_dl):
        # Get the input features and target labels, and put them on the GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # Normalize the inputs
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        # Zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Keep stats for Loss and Accuracy
        running_loss += loss.item()

        # Get the predicted class with the highest score
        _, prediction = torch.max(outputs,1)
        # Count of predictions that matched the target label
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]

        #if i % 10 == 0:    # print every 10 mini-batches
        #    print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))
    
    # Print stats at the end of the epoch
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

  print('Finished Training')
  
num_epochs=2   # Just for demo, adjust this higher.
training(myModel, train_dl, num_epochs)

Epoch: 0, Loss: 1.13, Accuracy: 0.62
Epoch: 1, Loss: 1.03, Accuracy: 0.65
Finished Training


In [138]:
# ----------------------------
# Inference
# ----------------------------
def inference (model, val_dl):
  correct_prediction = 0
  total_prediction = 0

  # Disable gradient updates
  with torch.no_grad():
    for data in val_dl:
      # Get the input features and target labels, and put them on the GPU
      inputs, labels = data[0].to(device), data[1].to(device)

      # Normalize the inputs
      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s

      # Get predictions
      outputs = model(inputs)

      # Get the predicted class with the highest score
      _, prediction = torch.max(outputs,1)
      print(prediction)
      # Count of predictions that matched the target label
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]
    
  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

# Run inference on trained model with the validation set
inference(myModel, val_dl)

tensor([0, 5, 8, 1, 4, 9, 3, 1, 0, 6, 5, 4, 7, 9, 5, 2])
tensor([3, 4, 4, 5, 8, 8, 4, 2, 0, 5, 2, 7, 8, 0, 3, 0])
tensor([4, 4, 8, 4, 1, 0, 7, 0, 6, 7, 5, 5, 8, 3, 2, 0])
tensor([0, 4, 0, 5, 2, 5, 4, 2, 3, 9, 8, 7, 5, 9, 1, 9])
tensor([9, 7, 3, 9, 2, 0, 5, 2, 8, 0, 2, 5, 5, 1, 7, 7])
tensor([9, 4, 0, 5, 5, 0, 8, 7, 0, 2, 3, 3, 5, 8, 3, 7])
tensor([5, 2, 9, 8, 7, 9, 5, 7, 5, 2, 0, 8, 7, 3, 9, 4])
tensor([8, 3, 9, 4, 5, 3, 4, 0, 7, 3, 0, 1, 8, 7, 0, 8])
tensor([1, 6, 7, 3, 4, 5, 5, 9, 5, 3, 5, 4, 4, 2, 9, 6])
tensor([3, 9, 9, 4, 3, 5, 7, 5, 1, 3, 7, 2, 2, 9, 9, 4])
tensor([7, 6, 8, 0, 9, 4, 2, 0, 8, 1, 0, 7, 9, 4, 0, 8])
tensor([2, 2, 7, 7, 4, 7, 9, 5, 0, 2, 3, 8, 3, 8, 4, 9])
tensor([8, 0, 4, 5, 2, 9, 3, 2, 4, 5, 6, 0, 7, 6, 6, 9])
tensor([3, 4, 5, 4, 7, 2, 9, 5, 6, 9, 2, 6, 5, 1, 4, 4])
tensor([8, 5, 9, 1, 3, 7, 0, 4, 0, 9, 3, 7, 6, 5, 4, 3])
tensor([8, 9, 4, 3, 9, 5, 9, 4, 3, 5, 8, 7, 8, 0, 9, 2])
tensor([9, 5, 2, 7, 8, 2, 0, 5, 8, 4, 0, 3, 6, 5, 2, 8])
tensor([9, 8, 7, 9, 4, 9, 5, 3,

In [139]:
from random import randint
from IPython.display import Audio
# This and the next cell: self made interactive part, pick a random audio file from the validation set 
# and let the user listen to them as well, see what the model predicts.
classes = ["Air conditioner",
           "Car horn",
           "Children playing",
           "Dog bark",
           "Drilling",
           "Engine idling",
           "Gun shot",
           "Jackhammer",
           "Siren",
           "Street music"]
    

In [144]:
# maybe use next(iter(data_loader)) to get one batch and test with that? If shuffle=True for daltaloader then could just use that as the solution as well
# Output is always the same here for some reason, investigate
with torch.no_grad():
    data_point = val_ds[randint(0, len(val_ds)-1)]
    # Get the input features and target labels
    input_data = data_point[0].unsqueeze(0)
    input_, label = input_data.to(device), torch.tensor([data_point[1]]).to(device)

    # Normalize the inputs
    input_m, input_s = input_.mean(), input_.std()
    input_ = (input_ - input_m) / input_s
    
    print(input_)
    # Get predictions
    output = myModel(input_)
    print(output)

    # Get the predicted class with the highest score
    _, prediction = torch.max(outputs,1)
    print("Model predicted: {}".format(classes[prediction[0]]))
    print("Correct answer was: {}".format(classes[label]))

Audio(data_point[2])

tensor([[[[ 4.6343,  4.6446,  4.6441,  ...,  4.6406,  4.6352,  4.6298],
          [ 4.0434,  4.0537,  4.0532,  ...,  4.1467,  4.0445,  4.0391],
          [ 1.4760,  1.9957,  1.6554,  ...,  3.6507,  2.2369,  2.2693],
          ...,
          [-0.4146,  1.0379,  1.2336,  ...,  2.3663,  1.1632,  1.5050],
          [-0.4684,  0.8253,  0.8182,  ...,  2.3896,  1.0317,  1.0668],
          [-0.4566,  0.8346,  0.9857,  ...,  2.3286,  0.8815,  1.0179]],

         [[ 4.5963,  4.5958,  4.5072,  ...,  4.6603,  4.6391,  4.6420],
          [ 4.0073,  4.0617,  3.9307,  ...,  4.0728,  4.0485,  4.0513],
          [ 2.7808,  3.3718,  3.3280,  ...,  2.4919,  2.4181,  1.6939],
          ...,
          [ 2.7367,  2.2650,  1.4735,  ...,  2.1376,  0.1395,  0.8978],
          [ 2.4752,  2.2105,  1.5657,  ...,  2.1952,  0.1454,  0.7848],
          [ 2.1067,  1.9682,  1.2641,  ...,  2.0617,  0.0584,  0.5415]]]])
tensor([[ 0.2934, -1.0371,  0.5700,  0.1201,  0.1943,  0.1961, -1.2032, -0.0850,
          0.1104,  0