In [2]:
import numpy as np
import pandas as pd
from pathlib import Path
import torch
import torchaudio
from torchaudio import transforms
from IPython.display import Audio
import random
from torch.utils.data import DataLoader, Dataset, random_split

In [179]:
class AudioUtil():
  # ----------------------------
  # Load an audio file. Return the signal as a tensor and the sample rate
  # ----------------------------
  
  @staticmethod
  def open(audio_file):
    sig, sr = torchaudio.load(audio_file)
    return (sig, sr)
  
  @staticmethod
  def rechannel(aud, new_channel):
    sig, sr = aud

    if (sig.shape[0] == new_channel):
      # Nothing to do
      return aud

    if (new_channel == 1):
      # Convert from stereo to mono by selecting only the first channel
      resig = sig[:1, :]
    else:
      # Convert from mono to stereo by duplicating the first channel
      resig = torch.cat([sig, sig])

    return ((resig, sr))

  
  # ----------------------------
  # Since Resample applies to a single channel, we resample one channel at a time
  # ----------------------------
  @staticmethod
  def resample(aud, newsr):
    sig, sr = aud

    if (sr == newsr):
      # Nothing to do
      return aud

    num_channels = sig.shape[0]
    # Resample first channel
    resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1,:])
    if (num_channels > 1):
      # Resample the second channel and merge both channels
      retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:,:])
      resig = torch.cat([resig, retwo])

    return ((resig, newsr))
  # ----------------------------
  # Pad (or truncate) the signal to a fixed length 'max_ms' in milliseconds
  # ----------------------------
  @staticmethod
  def pad_trunc(aud, max_ms=5000):
    sig, sr = aud
    num_rows, sig_len = sig.shape
    max_len = sr//1000 * max_ms

    if (sig_len > max_len):
      # Truncate the signal to the given length
      sig = sig[:,:max_len]

    elif (sig_len < max_len):
      # Length of padding to add at the beginning and end of the signal
      pad_begin_len = random.randint(0, max_len - sig_len) 
      pad_end_len = max_len - sig_len - pad_begin_len

      # Pad with 0s
      pad_begin = torch.zeros((num_rows, pad_begin_len))
      pad_end = torch.zeros((num_rows, pad_end_len))

      sig = torch.cat((pad_begin, sig, pad_end), 1)
      
    return (sig, sr)

  
  
  # ----------------------------
  # Shifts the signal to the left or right by some percent. Values at the end
  # are 'wrapped around' to the start of the transformed signal.
  # ----------------------------
  @staticmethod
  def time_shift(aud, shift_limit):
    random.seed(0)
    sig,sr = aud
    _, sig_len = sig.shape
    shift_amt = int(random.random() * shift_limit * sig_len)
    return (sig.roll(shift_amt), sr)

  # ----------------------------
  # Generate a Spectrogram
  # ----------------------------
  @staticmethod
  def spectro_gram(aud, n_mels=64, n_fft=1024, hop_len=None):
    sig,sr = aud
    top_db = 80

    # spec has shape [channel, n_mels, time], where channel is mono, stereo etc
    spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)

    # Convert to decibels
    spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
    return (spec)
  
  # ----------------------------
  # Augment the Spectrogram by masking out some sections of it in both the frequency
  # dimension (ie. horizontal bars) and the time dimension (vertical bars) to prevent
  # overfitting and to help the model generalise better. The masked sections are
  # replaced with the mean value.
  # ----------------------------
  @staticmethod
  def spectro_augment(spec, max_mask_pct=0.1, n_freq_masks=1, n_time_masks=1):
    _, n_mels, n_steps = spec.shape
    mask_value = spec.mean()
    aug_spec = spec

    freq_mask_param = max_mask_pct * n_mels
    for _ in range(n_freq_masks):
      aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)

    time_mask_param = max_mask_pct * n_steps
    for _ in range(n_time_masks):
      aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)

    return aug_spec

In [217]:
# ----------------------------
# Sound Dataset
# ----------------------------
class SoundDS(Dataset):
  def __init__(self, df, data_path, train):
    self.df = df
    self.data_path = str(data_path)
    self.duration = 5000
    self.sr = 16000
    self.channel = 1
    self.shift_pct = 0.4
    self.train = train
            
  # ----------------------------
  # Number of items in dataset
  # ----------------------------
  def __len__(self):
    return len(self.df)    
    
  # ----------------------------
  # Get i'th item in dataset
  # ----------------------------
  def __getitem__(self, idx):
    # Absolute file path of the audio file - concatenate the audio directory with
    # the relative path
    audio_file = self.data_path + self.df.loc[idx, 'track']
    # Get the Class ID
    algorithm = self.df.loc[idx, 'algorithm']

    aud = AudioUtil.open(audio_file)
    # Some sounds have a higher sample rate, or fewer channels compared to the
    # majority. So make all sounds have the same number of channels and same 
    # sample rate. Unless the sample rate is the same, the pad_trunc will still
    # result in arrays of different lengths, even though the sound duration is
    # the same.

    reaud = AudioUtil.resample(aud, self.sr)
    rechan = AudioUtil.rechannel(reaud, self.channel)
    dur_aud = AudioUtil.pad_trunc(rechan, self.duration)

    if self.train:
      shift_aud = AudioUtil.time_shift(dur_aud, self.shift_pct)
      sgram = AudioUtil.spectro_gram(shift_aud, n_mels=64, n_fft=1024, hop_len=None)
      aug_sgram = AudioUtil.spectro_augment(sgram, max_mask_pct=0.1, n_freq_masks=2, n_time_masks=2)
    else:
      sgram = AudioUtil.spectro_gram(dur_aud, n_mels=64, n_fft=1024, hop_len=None)
      aug_sgram = sgram

    return aug_sgram, algorithm, idx

In [193]:
from torch.utils.data import random_split

data_path = "E:/SP_Cup/spcup_2022_training/"
test_path = "E:/SP_Cup/spcup_2022_eval_part1/spcup_2022_eval_part1/"
label_file = pd.read_csv("E:/SP_Cup/spcup_2022_training/labels.csv")
test_file = pd.read_csv("E:/SP_Cup/spcup_2022_eval_part1/spcup_2022_eval_part1/labels_eval_part1.csv")

del test_file["Unnamed: 0"]
test_file["algorithm"] = 0

myds = SoundDS(label_file, data_path, True)
test_ds = SoundDS(test_file, test_path, False)

# Random split of 80:20 between training and validation
num_items = len(myds)
num_train = round(num_items * 1) # 0.8
num_val = num_items - num_train
train_ds, val_ds = random_split(myds, [num_train, num_val])

# Create training and validation data loaders
train_dl = torch.utils.data.DataLoader(train_ds, batch_size=16, shuffle=True)
val_dl = torch.utils.data.DataLoader(val_ds, batch_size=16, shuffle=False)
test_dl = torch.utils.data.DataLoader(test_ds, batch_size=16, shuffle=False)
test_dl_full = torch.utils.data.DataLoader(test_ds, batch_size=test_file["algorithm"].shape[0], shuffle=False)

In [183]:
import torch.nn.functional as F
from torch.nn import init

# ----------------------------
# Audio Classification Model
# ----------------------------
class AudioClassifier (torch.nn.Module):
    # ----------------------------
    # Build the model architecture
    # ----------------------------
    def __init__(self):
        super().__init__()
        conv_layers = []

        # First Convolution Block with Relu and Batch Norm. Use Kaiming Initialization
        self.conv1 = torch.nn.Conv2d(1, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu1 = torch.nn.ReLU()
        self.bn1 = torch.nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv1.weight, a=0.1)
        self.conv1.bias.data.zero_()
        conv_layers += [self.conv1, self.relu1, self.bn1]

        # Second Convolution Block
        self.conv2 = torch.nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu2 = torch.nn.ReLU()
        self.bn2 = torch.nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv2.weight, a=0.1)
        self.conv2.bias.data.zero_()
        conv_layers += [self.conv2, self.relu2, self.bn2]

        # Second Convolution Block
        self.conv3 = torch.nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu3 = torch.nn.ReLU()
        self.bn3 = torch.nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv3.weight, a=0.1)
        self.conv3.bias.data.zero_()
        conv_layers += [self.conv3, self.relu3, self.bn3]

        # Second Convolution Block
        self.conv4 = torch.nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu4 = torch.nn.ReLU()
        self.bn4 = torch.nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv4.weight, a=0.1)
        self.conv4.bias.data.zero_()
        conv_layers += [self.conv4, self.relu4, self.bn4]


        # Linear Classifier
        self.ap = torch.nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = torch.nn.Linear(in_features=64, out_features=6)

        # Wrap the Convolutional Blocks
        self.conv = torch.nn.Sequential(*conv_layers)
 
    # ----------------------------
    # Forward pass computations
    # ----------------------------
    def forward(self, x):
        # Run the convolutional blocks
        x = self.conv(x)

        # Adaptive pool and flatten for input to linear layer
        x = self.ap(x)
        x = x.view(x.shape[0], -1)

        # Linear layer
        x = self.lin(x)

        # Final output
        return x

# Create the model and put it on the GPU if available
myModel = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
myModel = myModel.to(device)
# Check that it is on Cuda
next(myModel.parameters()).device

device(type='cpu')

In [195]:
# ----------------------------
# Training Loop
# ----------------------------
def training(model, train_dl, num_epochs):
  accuracy = dict()
  loss_database = dict()
  
  save_path = "E:\SP_Cup\Audio Classifier\models\model_v3_101"

  # Loss Function, Optimizer and Scheduler
  criterion = torch.nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  # Repeat for each epoch
  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    # Repeat for each batch in the training set
    for i, data in enumerate(train_dl):
        # Get the input features and target labels, and put them on the GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # Normalize the inputs
        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        # Zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        # Keep stats for Loss and Accuracy
        running_loss += loss.item()

        # Get the predicted class with the highest score
        _, prediction = torch.max(outputs,1)
        # Count of predictions that matched the target label
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]

        if i % 10 == 0:    # print every 10 mini-batches
            print('[%d, %5d] loss: %.3f' % (epoch + 1, i + 1, running_loss / 10))
    
    # Print stats at the end of the epoch
    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

    #DATA saving
    accuracy[epoch + 1] = acc
    loss_database[epoch + 1] = avg_loss

    torch.save(model.state_dict(), save_path + "/epoch_" + str(epoch) + ".pth")

  print('Finished Training')
  
num_epochs=101  # Just for demo, adjust this higher.
training(myModel, train_dl, num_epochs)

#torch.save(myModel.state_dict(), 'models_v10/epoch_11.pth')

#dict -> array , matplotlib.pyplot epoch vs parameter 

[1,     1] loss: 0.004
[1,    11] loss: 0.144
[1,    21] loss: 0.280
[1,    31] loss: 0.457
[1,    41] loss: 0.635
[1,    51] loss: 0.744
[1,    61] loss: 0.863
[1,    71] loss: 1.021
[1,    81] loss: 1.116
[1,    91] loss: 1.275
[1,   101] loss: 1.425
[1,   111] loss: 1.611
[1,   121] loss: 1.729
[1,   131] loss: 1.860
[1,   141] loss: 2.018
[1,   151] loss: 2.191
[1,   161] loss: 2.312
[1,   171] loss: 2.473
[1,   181] loss: 2.625
[1,   191] loss: 2.760
[1,   201] loss: 2.940
[1,   211] loss: 3.068
[1,   221] loss: 3.209
[1,   231] loss: 3.393
[1,   241] loss: 3.561
[1,   251] loss: 3.726
[1,   261] loss: 3.885
[1,   271] loss: 3.996
[1,   281] loss: 4.107
[1,   291] loss: 4.237
[1,   301] loss: 4.419
[1,   311] loss: 4.529
[1,   321] loss: 4.643
[1,   331] loss: 4.740
[1,   341] loss: 4.925
[1,   351] loss: 5.154
[1,   361] loss: 5.293
[1,   371] loss: 5.477
Epoch: 0, Loss: 0.15, Accuracy: 0.94
[2,     1] loss: 0.046
[2,    11] loss: 0.197
[2,    21] loss: 0.339
[2,    31] loss: 0.4

In [196]:
torch.save(myModel.state_dict(), "E:\SP_Cup\Audio Classifier\models\model_v3_101\epoch_101.pth")

In [218]:
criterion = torch.nn.CrossEntropyLoss()

model_test = AudioClassifier()
model_test.load_state_dict(torch.load("C:\Users\dumin\OneDrive\Desktop\output models\part_1\model\epoch_101.pth"))

test_predictions = np.zeros((test_file["track"].shape[0], 1)).astype(np.int32)

with torch.no_grad():
    for i, data in enumerate(test_dl):
        # Get the input features and target labels, and put them on the GPU
        inputs, labels = data[0].to(device), data[1].to(device)

        # Normalize the inputs
        inputs_m, inputs_s = inputs.mean(), inputs.std()   

        # print(inputs.shape)
        #print(inputs_m)

        inputs = (inputs - inputs_m) / inputs_s

        # forward + backward + optimize
        outputs = myModel(inputs)

        _, prediction = torch.max(outputs,1)

        #print(prediction.numpy())

        test_file.loc[i*16:(i+1)*16-1, "algorithm"] = np.array(prediction)

        #data.append(prediction.numpy()[0])
        
        #print(i)

In [204]:
test_file.to_csv('../results/answer_v3_100.csv', index=False, header=False)