# Config

In [3]:
import os

In [4]:
##### file system
INPUT_DIR = "/kaggle/input/audio-mnist/data"
WORKING_DIR = "/kaggle/working"

##### dataset
SAMPLE_RATE = 48_000
MAX_INPUT_SIZE = 65_536

##### training
TRAIN_PERCENTAGE = 0.8
BATCH_SIZE = 64
DATA_PERCENTAGE = 0.5

In [5]:
if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)

# Explore Data

In [6]:
from tqdm import tqdm
from scipy.io import wavfile
import shutil
import matplotlib.pyplot as plt
import pandas as pd

## Prepare Data for PyTorch

I will be using PyTorch, to load a custom dataset it needs some fixing.

- I will create a csv file with information regarding all files

In [7]:
def create_csv_info():
    # initialize the dataset
    dataset = pd.DataFrame(columns=["file", "label", "speaker"])

    for speaker in tqdm(os.listdir(INPUT_DIR)):
        # check if "speaker" is a folder
        # "speaker" should be a folder with all the recordings from the given
        #   speaker, the name of the speaker is the name of the folder
        if not os.path.isdir(os.path.join(INPUT_DIR, speaker)):
            continue

        for file in os.listdir(os.path.join(INPUT_DIR, speaker)):
            label = file.split("_")[0]
            file_path = os.path.join(speaker, file)

            dataset = dataset.append({
                "file": file_path,
                "label": label,
                "speaker": speaker
            }, ignore_index=True)

    # write dataset as csv
    dataset.to_csv(os.path.join(WORKING_DIR, "dataset.csv"), index=False)

In [8]:
create_csv_info()

100%|██████████| 61/61 [01:17<00:00,  1.27s/it]


# Load Dataset

In [9]:
from torch.utils.data import Dataset, DataLoader
import torch
import numpy as np


In [10]:
class AudioSample():
    # this defines one audio sample
    def __init__(self, sample_rate, samples):
        self.sample_rate = sample_rate
        self.samples = samples

    def display(self):
        plt.plot(self.samples)
        plt.show()


class AudioDescription():
    # apart from the audio also has the label and the speaker
    def __init__(self, audio_sample, label, speaker):
        self.audio_sample = audio_sample
        self.label = label
        self.speaker = speaker

    def display(self):
        self.audio_sample.display()
        print("Label: ", self.label)
        print("Speaker: ", self.speaker)


class AudioMNIST(Dataset):

    def __init__(self, csv_file, root_dir, transform=None):
        csv_file_full_path = os.path.join(root_dir, csv_file)

        self.dataset = pd.read_csv(csv_file_full_path)
        self.root_dir = root_dir
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):

        if torch.is_tensor(idx):
            idx = idx.tolist()

        file_path = os.path.join(self.root_dir, self.dataset.iloc[idx, 0])

        sample_rate, samples = wavfile.read(file_path)

        audio_description = AudioDescription(
            AudioSample(sample_rate, samples),
            self.dataset.iloc[idx, 1],
            self.dataset.iloc[idx, 2]
        )

        if self.transform:
            audio_description = self.transform(audio_description)

        return audio_description


## Transforms

The data in dataset does not have all the same size. So transformations are
required. Two transformations are possible. Padding and Cropping.

In [11]:
class Padding():
    def __init__(self, output_size=MAX_INPUT_SIZE, mode="edge"):
        # output size is an integer
        # for mode check np.pad https://numpy.org/doc/stable/reference/generated/numpy.pad.html
        self.output_size = output_size
        self.mode = mode

    def __call__(self, audio_description):
        # sample is an AudioDescription
        if len(audio_description.audio_sample.samples) > self.output_size:
            # throw error; cannot pad sample to a smaller size
            raise ValueError("Cannot pad sample to a smaller size")

        new_samples = np.pad(audio_description.audio_sample.samples, ((self.output_size - len(audio_description.audio_sample.samples) + 1) //
                                                                      2, (self.output_size - len(audio_description.audio_sample.samples)) // 2), mode=self.mode)

        return AudioDescription(
            AudioSample(audio_description.audio_sample.sample_rate, new_samples),
            audio_description.label,
            audio_description.speaker
        )


class RandomCrop():
    def __init__(self, output_size=int(MAX_INPUT_SIZE/2)):
        # output size is an integer
        self.output_size = output_size

    def __call__(self, audio_description):
        # audio_sample is an AudioDescription
        if len(audio_description.audio_sample.samples) <= self.output_size:
            # pad the sample
            audio_description = Padding(self.output_size)(audio_description)
        else:
            # get random start index
            start_index = np.random.randint(
                0, len(audio_description.audio_sample.samples) - self.output_size)

            audio_description = AudioDescription(AudioSample(
                audio_description.audio_sample.sample_rate, audio_description.audio_sample.samples[start_index:start_index + self.output_size]),
                audio_description.label,
                audio_description.speaker
            )

        return audio_description


## Putting it all together

In [12]:
dataset = AudioMNIST(WORKING_DIR + "/dataset.csv",
                     INPUT_DIR, transform=RandomCrop())

# split dataset into train and test
train_size = int(TRAIN_PERCENTAGE * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(
    dataset, [train_size, test_size])


def my_collate(batch):
    # batch is a list of AudioDescription
    samples = []
    labels = []
    speakers = []

    for audio_description in batch:
        samples.append(audio_description.audio_sample.samples)
        labels.append(audio_description.label)
        speakers.append(audio_description.speaker)

    # convert the list of audio samples to a tensor with shape [N, W]
    X = torch.tensor(samples)

    # add a new dimension to the tensor to get the desired shape [N, 1, W]
    X = X.unsqueeze(1)

    # convert the lists of labels and speakers to tensors
    y = torch.tensor(labels)
    z = torch.tensor(speakers)

    # return the modified tensors
    return X, y, z


dataloader_train = DataLoader(
    train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, collate_fn=my_collate)
dataloader_test = DataLoader(
    test_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=4, collate_fn=my_collate)


  cpuset_checked))


In [13]:
# show one sample with the loader
for sample, label, speaker in dataloader_train:
    print(f"Shape of X [N, C, W]: {sample.shape}")
    print(f"Shape of y: {label.shape} {label.dtype}")
    break
    



Shape of X [N, C, W]: torch.Size([64, 1, 32768])
Shape of y: torch.Size([64]) torch.int64


# Model

In [14]:
from torch import nn

In [15]:
# get CPU or GPU device for training
device = "cuda" if torch.cuda.is_available() else "mps" if torch.backends.mps.is_available() else "cpu"
print(f"Using {device} device")

Using cuda device


In [None]:
class AutoEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.encoder = self._build_encoder()
        self.decoder = self._build_decoder()

    def forward(self, x):
        x = x.float()
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return decoded

    def _build_encoder(self):
        # encoder with
        return nn.Sequential(
            # 32768 * 1
            nn.Conv1d(1, 32, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2, stride=2),

            nn.Conv1d(32, 64, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2, stride=2),

            nn.Conv1d(64, 128, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2, stride=2),

            nn.Conv1d(128, 256, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2, stride=2),

            nn.Conv1d(256, 512, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
            nn.MaxPool1d(kernel_size=2, stride=2),
        )

    def _build_decoder(self):
        return nn.Sequential(
            nn.Upsample(scale_factor=2),
            nn.ConvTranspose1d(512, 256, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),

            nn.Upsample(scale_factor=2),
            nn.ConvTranspose1d(256, 128, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),

            nn.Upsample(scale_factor=2),
            nn.ConvTranspose1d(128, 64, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),

            nn.Upsample(scale_factor=2),
            nn.ConvTranspose1d(64, 32, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),

            nn.Upsample(scale_factor=2),
            nn.ConvTranspose1d(32, 1, kernel_size=9, stride=1, padding=4),
            nn.Tanh(),
        )


In [42]:
model = ConvClassifier().to(device)
print(model)

ConvClassifier(
  (conv1): Conv1d(1, 32, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool1): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 64, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool2): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(64, 128, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool3): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv1d(128, 256, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool4): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (conv5): Conv1d(256, 512, kernel_size=(9,), stride=(1,), padding=(1,))
  (pool5): MaxPool1d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  (gap): AdaptiveAvgPool1d(output_size=1)
  (linear1): Linear(in_features=512, out_features=256, bias=True)
  (linear2): Linear(in_features=256, out_features=128, bias=True)
  (linear3): Linear(i

# Training

In [43]:
loss_fn = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)


In [44]:
def train(data_loader, model, loss_fn, optimizer):
    model.train()

    for batch, (sample, label, _) in enumerate(data_loader):
        # normalize the audio wav sample
        sample = sample / 32768

        sample, label = sample.to(device), label.to(device)

        sample = sample.float()

        # compute prediction error
        pred = model(sample)
        loss = loss_fn(pred, sample)

        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if batch % 100 == 0:
            loss, current = loss.item(), batch * len(sample)
            print(
                f"loss: {loss:>7f}  [{current:>5d}/{len(data_loader.dataset):>5d}]")

            # show an example wave
            sample = sample.cpu().detach().numpy() * 32768
            plt.plot(sample[0][0])
            pred = pred.cpu().detach().numpy() * 32768
            plt.plot(pred[0][0])
            plt.show()

        del sample, label


def test(data_loader, model, loss_fn):
    model.eval()

    test_loss, average_mse = 0, 0
    showed = False

    with torch.no_grad():
        for sample, label, _ in data_loader:
            # normalize the audio wav sample
            sample = sample / 32768

            sample, label = sample.to(device), label.to(device)

            sample.float()

            pred = model(sample)
            test_loss += loss_fn(pred, sample).item()

            average_mse += torch.mean((pred - sample) ** 2)

            # show a random wave
            if not showed:
                showed = True
                random_index = random.randint(0, len(sample) - 1)
                sample = sample.cpu().detach().numpy() * 32768
                plt.plot(sample[random_index][0])

                pred = pred.cpu().detach().numpy() * 32768
                plt.plot(pred[random_index][0])
                plt.show()

    test_loss /= len(data_loader)
    average_mse /= len(data_loader)
    
    print(f"Test Error: \n Avg loss: {test_loss:>8f} \n Avg MSE: {average_mse:>8f} \n")


In [46]:
epochs = 20

for epoch in range(epochs):
    print(f"Epoch {epoch + 1}\n-------------------------------")
    
    train(dataloader_train, model, loss_fn, optimizer)
    test(dataloader_test, model, loss_fn)
    print()

print("Done!")

Epoch 1
-------------------------------




loss: 0.863717  [    0/24000]
loss: 1.023399  [ 6400/24000]
loss: 0.852181  [12800/24000]
loss: 0.833847  [19200/24000]




Test Error: 
 Accuracy: 68.3%, Avg loss: 0.920539 


Epoch 2
-------------------------------




loss: 0.869520  [    0/24000]
loss: 0.779230  [ 6400/24000]
loss: 0.755922  [12800/24000]
loss: 0.737351  [19200/24000]




Test Error: 
 Accuracy: 73.9%, Avg loss: 0.795193 


Epoch 3
-------------------------------




loss: 0.784265  [    0/24000]
loss: 0.877691  [ 6400/24000]
loss: 0.766195  [12800/24000]
loss: 1.108378  [19200/24000]




Test Error: 
 Accuracy: 73.8%, Avg loss: 0.746747 


Epoch 4
-------------------------------




loss: 0.681839  [    0/24000]
loss: 0.791667  [ 6400/24000]
loss: 0.673064  [12800/24000]
loss: 0.738979  [19200/24000]




Test Error: 
 Accuracy: 72.8%, Avg loss: 0.778386 


Epoch 5
-------------------------------




loss: 0.965189  [    0/24000]
loss: 0.760685  [ 6400/24000]
loss: 0.792296  [12800/24000]
loss: 0.556915  [19200/24000]




Test Error: 
 Accuracy: 69.0%, Avg loss: 0.835600 


Epoch 6
-------------------------------




loss: 0.852231  [    0/24000]
loss: 0.720118  [ 6400/24000]
loss: 1.009113  [12800/24000]
loss: 0.482032  [19200/24000]




Test Error: 
 Accuracy: 80.2%, Avg loss: 0.609328 


Epoch 7
-------------------------------




loss: 0.594993  [    0/24000]
loss: 0.603509  [ 6400/24000]
loss: 0.591290  [12800/24000]
loss: 1.093860  [19200/24000]




Test Error: 
 Accuracy: 79.4%, Avg loss: 0.615969 


Epoch 8
-------------------------------




loss: 0.553347  [    0/24000]
loss: 0.529821  [ 6400/24000]
loss: 0.561202  [12800/24000]
loss: 0.684717  [19200/24000]




Test Error: 
 Accuracy: 58.9%, Avg loss: 1.213683 


Epoch 9
-------------------------------




loss: 1.038671  [    0/24000]
loss: 0.534981  [ 6400/24000]
loss: 0.517799  [12800/24000]
loss: 0.548923  [19200/24000]




Test Error: 
 Accuracy: 83.1%, Avg loss: 0.527176 


Epoch 10
-------------------------------




loss: 0.659183  [    0/24000]
loss: 0.418399  [ 6400/24000]
loss: 0.446363  [12800/24000]
loss: 0.642023  [19200/24000]




Test Error: 
 Accuracy: 78.9%, Avg loss: 0.594058 


Epoch 11
-------------------------------




loss: 0.477265  [    0/24000]
loss: 0.435348  [ 6400/24000]
loss: 0.706144  [12800/24000]
loss: 0.476204  [19200/24000]




Test Error: 
 Accuracy: 84.4%, Avg loss: 0.487203 


Epoch 12
-------------------------------




loss: 0.640156  [    0/24000]
loss: 0.428016  [ 6400/24000]
loss: 0.516451  [12800/24000]
loss: 0.466518  [19200/24000]




Test Error: 
 Accuracy: 84.9%, Avg loss: 0.459604 


Epoch 13
-------------------------------




loss: 0.394085  [    0/24000]
loss: 0.377913  [ 6400/24000]
loss: 0.421350  [12800/24000]
loss: 0.651295  [19200/24000]




Test Error: 
 Accuracy: 83.8%, Avg loss: 0.499929 


Epoch 14
-------------------------------




loss: 0.474717  [    0/24000]
loss: 0.461639  [ 6400/24000]
loss: 0.338898  [12800/24000]
loss: 0.285339  [19200/24000]




Test Error: 
 Accuracy: 87.2%, Avg loss: 0.410141 


Epoch 15
-------------------------------




loss: 0.403765  [    0/24000]
loss: 0.345150  [ 6400/24000]
loss: 0.547921  [12800/24000]
loss: 0.649073  [19200/24000]




Test Error: 
 Accuracy: 86.4%, Avg loss: 0.436782 


Epoch 16
-------------------------------




loss: 0.295358  [    0/24000]
loss: 0.507749  [ 6400/24000]
loss: 0.310837  [12800/24000]
loss: 0.354660  [19200/24000]




Test Error: 
 Accuracy: 88.8%, Avg loss: 0.361507 


Epoch 17
-------------------------------




loss: 0.312427  [    0/24000]
loss: 0.419327  [ 6400/24000]
loss: 0.283072  [12800/24000]
loss: 0.403224  [19200/24000]




Test Error: 
 Accuracy: 89.8%, Avg loss: 0.346117 


Epoch 18
-------------------------------




loss: 0.327489  [    0/24000]
loss: 0.235393  [ 6400/24000]
loss: 0.252992  [12800/24000]
loss: 0.713882  [19200/24000]




Test Error: 
 Accuracy: 81.8%, Avg loss: 0.504370 


Epoch 19
-------------------------------




loss: 0.410751  [    0/24000]
loss: 0.379303  [ 6400/24000]
loss: 0.273859  [12800/24000]
loss: 0.230706  [19200/24000]




Test Error: 
 Accuracy: 86.8%, Avg loss: 0.383889 


Epoch 20
-------------------------------




loss: 0.521337  [    0/24000]
loss: 0.292680  [ 6400/24000]
loss: 0.350408  [12800/24000]
loss: 0.655213  [19200/24000]




Test Error: 
 Accuracy: 90.3%, Avg loss: 0.322910 


Done!
