# 0 Imports

In [1]:
import math, random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import sklearn
from sklearn.preprocessing import OneHotEncoder, OrdinalEncoder

from vscode_audio import Audio

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset, random_split
from torch.nn import init
import torchaudio
from torchaudio import transforms

plt.rcParams['figure.facecolor'] = 'white'

In [2]:
data_train = pd.read_csv("data/train_post_competition.csv")
data_test = pd.read_csv("data/test_post_competition.csv")

In [3]:
data_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9473 entries, 0 to 9472
Data columns (total 5 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   fname              9473 non-null   object
 1   label              9473 non-null   object
 2   manually_verified  9473 non-null   int64 
 3   freesound_id       9473 non-null   int64 
 4   license            9473 non-null   object
dtypes: int64(2), object(3)
memory usage: 370.2+ KB


In [4]:
data_train.head()

Unnamed: 0,fname,label,manually_verified,freesound_id,license
0,00044347.wav,Hi-hat,0,28739,Attribution
1,001ca53d.wav,Saxophone,1,358827,Attribution
2,002d256b.wav,Trumpet,0,10897,Creative Commons 0
3,0033e230.wav,Glockenspiel,1,325017,Attribution
4,00353774.wav,Cello,1,195688,Attribution


In [5]:
data_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9400 entries, 0 to 9399
Data columns (total 5 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   fname         9400 non-null   object
 1   label         9400 non-null   object
 2   usage         9400 non-null   object
 3   freesound_id  9400 non-null   int64 
 4   license       9400 non-null   object
dtypes: int64(1), object(4)
memory usage: 367.3+ KB


In [6]:
data_test.head()

Unnamed: 0,fname,label,usage,freesound_id,license
0,00063640.wav,,Ignored,88926,Attribution
1,0013a1db.wav,,Ignored,373335,Creative Commons 0
2,002bb878.wav,,Ignored,189611,Attribution
3,002d392d.wav,,Ignored,35939,Attribution
4,00326aa9.wav,Oboe,Private,355125,Attribution


In [7]:
data_train.label.unique().size

41

In [8]:
data_test.label.unique().size

42

In [9]:
data_train.drop(columns=["manually_verified", "freesound_id", "license"], inplace=True)
data_test.drop(columns=["usage", "freesound_id", "license"], inplace=True)

In [10]:
data_train.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9473 entries, 0 to 9472
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   fname   9473 non-null   object
 1   label   9473 non-null   object
dtypes: object(2)
memory usage: 148.1+ KB


In [11]:
data_test.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9400 entries, 0 to 9399
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype 
---  ------  --------------  ----- 
 0   fname   9400 non-null   object
 1   label   9400 non-null   object
dtypes: object(2)
memory usage: 147.0+ KB


In [15]:
# enc = OrdinalEncoder(dtype=np.int64, handle_unknown="use_encoded_value", unknown_value=np.nan)
enc = OrdinalEncoder(dtype=np.int64)
enc.fit(data_test[["label"]])
data_train["label_enc"] = enc.transform(data_train[["label"]])
data_test["label_enc"] = enc.transform(data_test[["label"]])

In [16]:
LABELS_COUNT = data_test["label_enc"].unique().size
LABELS_COUNT

42

In [17]:
class AudioUtils():
    @staticmethod
    def open(audio_file):
        sig, sr = torchaudio.load(audio_file)
        return (sig, sr)

    @staticmethod
    def rechannel(audio, new_channel):
        sig, sr = audio
        if sig.shape[0] == new_channel:
            return audio
        if new_channel == 1:
            resig = sig[:1, :]
        else:
            resig = torch.cat([sig, sig])
        return resig, sr

    @staticmethod
    def resample(audio, newsr):
        sig, sr = audio
        if sr == newsr:
            return audio
        num_channels = sig.shape[0]
        resig = torchaudio.transforms.Resample(sr, newsr)(sig[:1, :])
        if num_channels > 1:
            retwo = torchaudio.transforms.Resample(sr, newsr)(sig[1:, :])
            resig = torch.cat([resig, retwo])
        return resig, newsr

    @staticmethod
    def pad_trunc(audio, max_ms):
        sig, sr = audio
        num_rows, sig_len = sig.shape
        max_len = sr//1000 * max_ms
        if sig_len > max_len:
            sig = sig[:, :max_len]
        elif sig_len < max_len:
            pad_begin_len = random.randint(0, max_len - sig_len)
            pad_end_len  =max_len - sig_len - pad_begin_len
            pad_begin = torch.zeros((num_rows, pad_begin_len))
            pad_end = torch.zeros((num_rows, pad_end_len))
            sig = torch.cat((pad_begin, sig, pad_end), 1)
        return sig, sr

    @staticmethod
    def time_shift(audio, shift_limit):
        sig, sr = audio
        _, sig_len = sig.shape
        shift_amt = int(random.random() * shift_limit * sig_len)
        return (sig.roll(shift_amt), sr)

    @staticmethod
    def spectrogram(audio, n_mels=64, n_fft=1024, hop_len=None):
        sig, sr = audio
        top_db = 80
        spec = transforms.MelSpectrogram(sr, n_fft=n_fft, hop_length=hop_len, n_mels=n_mels)(sig)
        spec = transforms.AmplitudeToDB(top_db=top_db)(spec)
        return spec

    def spectrogram_augment(spectrogram, max_mask_pct=0.1, n_freq_maks=1, n_time_masks=1):
        _, n_mels, n_steps = spectrogram.shape
        mask_value = spectrogram.mean()
        aug_spec = spectrogram
        freq_mask_param = max_mask_pct * n_mels
        for _ in range(n_freq_maks):
            aug_spec = transforms.FrequencyMasking(freq_mask_param)(aug_spec, mask_value)
        time_mask_param = max_mask_pct * n_steps
        for _ in range(n_time_masks):
            aug_spec = transforms.TimeMasking(time_mask_param)(aug_spec, mask_value)
        return aug_spec

In [18]:
class SoundDataset(Dataset):
    def __init__(self, df, data_path):
        self.df = df
        self.data_path = str(data_path)
        self.duration = 4000
        self.sr = 44100
        self.channel = 2
        self.shift_pct = 0.4
        self.ordinal_enc = OrdinalEncoder()
        self.df["label"] = self.ordinal_enc.fit_transform(df[["label"]])
        # self.y = torch.tensor(self.df["label"])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, index):
        audio_file = self.data_path + self.df.loc[index, "fname"]
        label = torch.tensor(self.df.loc[index, "label"], dtype=torch.long)

        audio = AudioUtils.open(audio_file)
        re_aud = AudioUtils.resample(audio, self.sr)
        re_chan = AudioUtils.rechannel(re_aud, self.channel)
        dur_aud = AudioUtils.pad_trunc(re_chan, self.duration)
        shift_aud = AudioUtils.time_shift(dur_aud, self.shift_pct)
        sgram = AudioUtils.spectrogram(shift_aud)
        aug_sgram = AudioUtils.spectrogram_augment(sgram)
        return aug_sgram, label

In [19]:
BATCH_SIZE = 16
TRAIN_PATH = "data/audio_train/"
TEST_PATH = "data/audio_test/"

In [20]:
train_ds = SoundDataset(data_train, TRAIN_PATH)
test_ds = SoundDataset(data_test, TEST_PATH)

train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True)
test_dl = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=True)

In [21]:
train_ds[0]

  label = torch.tensor(self.df.loc[index, "label"], dtype=torch.long)


(tensor([[[-43.1036, -36.8363, -44.8570,  ..., -37.0609, -44.8570, -44.8570],
          [-44.8570, -29.9207, -32.4493,  ..., -36.8845, -40.1252, -35.0314],
          [-31.1404, -33.0701, -32.2485,  ..., -42.7650, -36.8694, -35.6922],
          ...,
          [-33.2584, -44.8570, -44.8570,  ..., -44.8570, -44.8570, -44.8165],
          [-33.4945, -44.5604, -44.8570,  ..., -44.8570, -44.8570, -44.8570],
          [-33.5546, -44.7317, -44.8570,  ..., -44.8570, -44.6854, -44.8570]],
 
         [[-43.1036, -36.8363, -44.8570,  ..., -37.0609, -44.8570, -44.8570],
          [-44.8570, -29.9207, -32.4493,  ..., -36.8845, -40.1252, -35.0314],
          [-31.1404, -33.0701, -32.2485,  ..., -42.7650, -36.8694, -35.6922],
          ...,
          [-33.2584, -44.8570, -44.8570,  ..., -44.8570, -44.8570, -44.8165],
          [-33.4945, -44.5604, -44.8570,  ..., -44.8570, -44.8570, -44.8570],
          [-33.5546, -44.7317, -44.8570,  ..., -44.8570, -44.6854, -44.8570]]]),
 tensor(23))

In [22]:
class AudioClassifier(torch.nn.Module):
    def __init__(self):
        super().__init__()
        conv_layers = []

        # CONV BLOCK 1
        self.conv_1 = torch.nn.Conv2d(2, 8, kernel_size=(5, 5), stride=(2, 2), padding=(2, 2))
        self.relu_1 = torch.nn.ReLU()
        self.batchnorm_1 = torch.nn.BatchNorm2d(8)
        init.kaiming_normal_(self.conv_1.weight, a=0.1)
        self.conv_1.bias.data.zero_()
        conv_layers += [self.conv_1, self.relu_1, self.batchnorm_1]

        # CONV BLOCK 2
        self.conv_2 = torch.nn.Conv2d(8, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu_2 = torch.nn.ReLU()
        self.batchnorm_2 = torch.nn.BatchNorm2d(16)
        init.kaiming_normal_(self.conv_2.weight, a=0.1)
        self.conv_2.bias.data.zero_()
        conv_layers += [self.conv_2, self.relu_2, self.batchnorm_2]

        # CONV BLOCK 3
        self.conv_3 = torch.nn.Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu_3 = torch.nn.ReLU()
        self.batchnorm_3 = torch.nn.BatchNorm2d(32)
        init.kaiming_normal_(self.conv_3.weight, a=0.1)
        self.conv_3.bias.data.zero_()
        conv_layers += [self.conv_3, self.relu_3, self.batchnorm_3]

        # CONV BLOCK 4
        self.conv_4 = torch.nn.Conv2d(32, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
        self.relu_4 = torch.nn.ReLU()
        self.batchnorm_4 = torch.nn.BatchNorm2d(64)
        init.kaiming_normal_(self.conv_4.weight, a=0.1)
        self.conv_4.bias.data.zero_()
        conv_layers += [self.conv_4, self.relu_4, self.batchnorm_4]

        self.ap = torch.nn.AdaptiveAvgPool2d(output_size=1)
        self.lin = torch.nn.Linear(in_features=64, out_features=LABELS_COUNT)

        self.conv = torch.nn.Sequential(*conv_layers)

    def forward(self, x):
        x = self.conv(x)
        x = self.ap(x)
        x = x.view(x.shape[0], -1)
        x = self.lin(x)
        return x

In [23]:
model = AudioClassifier()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
next(model.parameters()).device

device(type='cuda', index=0)

In [24]:
def training(model, train_dl, num_epochs):
  criterion = nn.CrossEntropyLoss()
  optimizer = torch.optim.Adam(model.parameters(),lr=0.001)
  scheduler = torch.optim.lr_scheduler.OneCycleLR(optimizer, max_lr=0.001,
                                                steps_per_epoch=int(len(train_dl)),
                                                epochs=num_epochs,
                                                anneal_strategy='linear')

  for epoch in range(num_epochs):
    running_loss = 0.0
    correct_prediction = 0
    total_prediction = 0

    for i, data in enumerate(train_dl):
        inputs, labels = data[0].to(device), data[1].to(device)

        inputs_m, inputs_s = inputs.mean(), inputs.std()
        inputs = (inputs - inputs_m) / inputs_s

        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        scheduler.step()

        running_loss += loss.item()

        _, prediction = torch.max(outputs,1)
        correct_prediction += (prediction == labels).sum().item()
        total_prediction += prediction.shape[0]

    num_batches = len(train_dl)
    avg_loss = running_loss / num_batches
    acc = correct_prediction/total_prediction
    print(f'Epoch: {epoch}, Loss: {avg_loss:.2f}, Accuracy: {acc:.2f}')

  print('Finished Training')

In [25]:
NUM_EPOCHS = 1

In [26]:
training(model, train_dl, NUM_EPOCHS)

  label = torch.tensor(self.df.loc[index, "label"], dtype=torch.long)


In [None]:
def inference (model, val_dl):
  correct_prediction = 0
  total_prediction = 0

  with torch.no_grad():
    for data in val_dl:
      inputs, labels = data[0].to(device), data[1].to(device)

      inputs_m, inputs_s = inputs.mean(), inputs.std()
      inputs = (inputs - inputs_m) / inputs_s

      outputs = model(inputs)

      _, prediction = torch.max(outputs,1)
      correct_prediction += (prediction == labels).sum().item()
      total_prediction += prediction.shape[0]
    
  acc = correct_prediction/total_prediction
  print(f'Accuracy: {acc:.2f}, Total items: {total_prediction}')

In [None]:
inference(model, test_dl)