## **Importing requirements**

In [2]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys

import IPython.display as ipd

# Run on a CUDA GPU when one is available; otherwise fall back to the CPU.
# `device` is reused by the model, the resample transform and every batch below.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)


cpu


## **Dataset class**

In [3]:
import os
from torch.utils.data import Dataset

class AudioDataset(Dataset):
    """Audio classification dataset stored as one sub-directory per class.

    Expected layout::

        root_dir/
            <class_a>/<audio files>
            <class_b>/<audio files>

    Hidden entries (names starting with ``.``, such as ``.ipynb_checkpoints``
    or ``.DS_Store``) and anything that is not a directory at the class level
    or not a regular file at the sample level are ignored.

    Attributes:
        root_dir: The directory that was scanned.
        classes: Sorted names of the class sub-directories.
        audio_files: Path of every sample, ordered by class then file name.
        labels: Class name of every sample, aligned with ``audio_files``.
    """

    def __init__(self, root_dir):
        """Scan ``root_dir`` and index every audio file with its class name.

        Args:
            root_dir: Path of the directory holding one folder per class.

        Raises:
            OSError: If ``root_dir`` cannot be listed.
        """
        self.root_dir = root_dir

        # Only visible sub-directories are classes; a stray file next to the
        # class folders would otherwise be listed as a directory and crash.
        self.classes = sorted(
            name
            for name in os.listdir(root_dir)
            if not name.startswith(".")
            and os.path.isdir(os.path.join(root_dir, name))
        )

        self.audio_files = []
        self.labels = []

        for cls in self.classes:
            class_dir = os.path.join(root_dir, cls)

            # os.listdir returns entries in arbitrary order; sorting keeps the
            # sample indices stable from one run (and one machine) to the next.
            for file_name in sorted(os.listdir(class_dir)):
                file_path = os.path.join(class_dir, file_name)
                if file_name.startswith(".") or not os.path.isfile(file_path):
                    continue
                self.audio_files.append(file_path)
                self.labels.append(cls)

    def __len__(self):
        """Return the number of indexed audio files."""
        return len(self.audio_files)

    def __getitem__(self, idx):
        """Load one sample from disk.

        Args:
            idx: Position of the sample in ``audio_files``.

        Returns:
            Tuple ``(waveform, sample_rate, label)`` where ``waveform`` and
            ``sample_rate`` come from ``torchaudio.load`` and ``label`` is the
            class (folder) name as a string.
        """
        file_path = self.audio_files[idx]
        label = self.labels[idx]

        waveform, sample_rate = torchaudio.load(file_path)

        # Per-sample preprocessing of the waveform can be added here if needed.

        return waveform, sample_rate, label


## **Using Google Drive as the dataset location when running in Google Colab**

In [4]:
# Pick the dataset location. The local `12words` folder is the default; when
# the notebook runs in Google Colab, Google Drive is mounted and the copy
# stored there is used instead.
root_drive_path = '12words'
if "google.colab" in sys.modules:
    from google.colab import drive
    drive.mount('/content/drive')
    root_drive_path = '/content/drive/My Drive/12words'



In [5]:
# `root_drive_path` was chosen by the Colab-detection cell above, so Google
# Drive does not need to be mounted again here.

# Root directory of the dataset: one sub-folder per class.
root_dir = root_drive_path

# Index every audio file under the root and show the class names that were found.
dataset = AudioDataset(root_dir)
display(dataset.classes)


['1', '11', '13', '15', '17', '19', '3', '4', '5', '7', '8', '9']

## **Splitting the dataset to create Train_set and Test_set**

In [6]:
from torch.utils.data import Dataset, DataLoader, random_split

# Hold out 5% of the samples as the test set; the rest is used for training.
TEST_FRACTION = 0.05
# Fixed seed so the same samples land in each split on every run; without it
# the reported accuracies are not comparable between runs.
SPLIT_SEED = 42

test_size = int(TEST_FRACTION * len(dataset))
train_size = len(dataset) - test_size

# Randomly partition the dataset into the training and test sets.
train_set, test_set = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

print(f'train set size: {len(train_set)}')
print(f'test set size: {len(test_set)}')


train set size: 456
test set size: 24


## **Checking one sample from train_set**

In [8]:
# Take the first training sample, play it back, then show the raw tensor and
# its shape. `sample_rate` is reused below to configure the resampler.
waveform_first, sample_rate, _ = train_set[0]
display(ipd.Audio(waveform_first.numpy(), rate=sample_rate))
display(waveform_first, waveform_first.shape)

tensor([[0.0000e+00, 0.0000e+00, 0.0000e+00,  ..., 3.0951e-08, 1.0477e-08,
         4.5316e-09]])

torch.Size([1, 63360])

## **Transforms**

In [8]:
# Target sample rate (Hz) that every waveform is resampled to before the model.
new_sample_rate = 8000
# The resampler is configured with the sample rate of the single example
# loaded above.
# NOTE(review): this assumes every file in the dataset shares that rate — confirm.
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
# Resampled copy of the example; its channel count is later used to size the model input.
transformed = transform(waveform_first)

# Listen to the resampled audio to check it is still intelligible.
display(ipd.Audio(transformed.numpy(), rate=new_sample_rate))

## **Label to index and vice versa**

In [9]:
# Build the label vocabulary from the label list the dataset already holds.
# Iterating `train_set` instead would decode every audio file just to read its
# label, and a class whose samples all fell into the test split would be
# missing, making `labels.index(...)` raise ValueError during evaluation.
labels = sorted(set(dataset.labels))
display(labels)

['1', '11', '13', '15', '17', '19', '3', '4', '5', '7', '8', '9']

In [10]:
def label_to_index(word):
    """Encode a class name as a 0-dim tensor holding its position in ``labels``.

    Raises:
        ValueError: If ``word`` is not present in ``labels``.
    """
    position = labels.index(word)
    return torch.tensor(position)


def index_to_label(index):
    """Decode a class index back into its name (inverse of ``label_to_index``)."""
    word = labels[index]
    return word


## **Using Train_set and Test_set to create Train_loader and Test_loader**

In [11]:


def pad_sequence(batch):
    """Zero-pad a list of waveforms to a common length and stack them.

    Args:
        batch: List of 2-D tensors shaped ``[channels, length]``; the lengths
            may differ from item to item.

    Returns:
        Tensor shaped ``[batch_size, channels, max_length]`` in which shorter
        items are padded on the right with zeros.
    """
    # The torch padding helper pads along the first dimension, so move time
    # to the front: [channels, length] -> [length, channels].
    time_first = [waveform.t() for waveform in batch]

    # batch_first=True puts the batch dimension first:
    # result is [batch_size, max_length, channels].
    padded = torch.nn.utils.rnn.pad_sequence(
        time_first, batch_first=True, padding_value=0.
    )

    # Swap the last two dimensions back to the layout Conv1d expects:
    # [batch_size, channels, max_length].
    return padded.permute(0, 2, 1)


# Collate function handed to the train/test DataLoaders.
# input:  list of (waveform, sample_rate, label) tuples drawn from train/test_set
# output: one tuple (batched waveforms, batched label indices), ready to train/test
def collate_fn(batch):
    """Turn a list of dataset samples into one padded batch.

    Args:
        batch: List of ``(waveform, sample_rate, label)`` tuples.

    Returns:
        Tuple ``(tensors, targets)``: the waveforms zero-padded to a common
        length and stacked as ``[batch_size, channels, max_length]``, and the
        labels encoded as indices and stacked into one tensor.
    """
    waveforms = []
    encoded_labels = []

    # The sample rate is not needed here; labels become integer indices.
    for waveform, _, label in batch:
        waveforms.append(waveform)
        encoded_labels.append(label_to_index(label))

    return pad_sequence(waveforms), torch.stack(encoded_labels)


batch_size = 32

# `device` is a torch.device object, so comparing it with the string "cuda" is
# always False; the device kind is exposed through its `.type` attribute.
if device.type == "cuda":
    # A background worker plus pinned host memory speeds up host-to-GPU copies.
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Training batches are reshuffled every epoch.
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
# Test batches keep their order, and the last partial batch is kept so every
# test sample is evaluated.
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

## **The CNN model**

In [12]:
class M5(nn.Module):
    """1-D convolutional classifier that works directly on raw waveforms.

    The network is four ``Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(4)``
    stages, followed by average pooling over all remaining time steps and a
    single linear layer that produces log-probabilities.

    Args:
        n_input: Number of channels of the input waveform.
        n_output: Number of classes.
        stride: Stride of the first convolution.
        n_channel: Channels of the first two stages; the last two stages use
            ``2 * n_channel``.
    """

    def __init__(self, n_input=1, n_output=12, stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # Stage 1: long kernel with a large stride over the raw samples.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # Stage 2: same width, short kernel.
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # Stage 3: doubles the number of channels.
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)

        # Stage 4: keeps the doubled width.
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)

        # Classifier applied to the time-averaged features.
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Classify a batch of waveforms.

        Args:
            x: Tensor shaped ``[batch, n_input, time]``.

        Returns:
            Log-probabilities shaped ``[batch, 1, n_output]``.
        """
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over every remaining time step: [batch, channels, 1].
        x = F.avg_pool1d(x, x.shape[-1])
        # Put the features last for the linear layer: [batch, 1, channels].
        x = x.permute(0, 2, 1)
        return F.log_softmax(self.fc1(x), dim=2)


# Build the network: the input channel count is read from the resampled
# example (1 for this data) and there is one output per entry in `labels`.
model = M5(n_input=transformed.shape[0], n_output=len(labels))
# Move the parameters to the selected device before training.
model.to(device)
print(model)


def count_parameters(model):
    """Return the total number of elements in the model's trainable parameters.

    Parameters with ``requires_grad`` set to False are not counted.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Report how many trainable parameters the model has.
n = count_parameters(model)
print("Number of parameters: %s" % n)

M5(
  (conv1): Conv1d(1, 32, kernel_size=(80,), stride=(16,))
  (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn4): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool4): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=64, out_features=12, bias=True)
)
Numbe

In [13]:
# Adam over all model parameters, with a small weight decay.
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
# Multiply the learning rate by 0.1 after every 20 calls to scheduler.step();
# the training loop calls it once per epoch, so that is every 20 epochs.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)

## **Train function**

In [14]:
def train(model, epoch, log_interval):
    """Train ``model`` for one epoch over ``train_loader``.

    Uses the notebook-level ``train_loader``, ``transform``, ``optimizer`` and
    ``device``, and appends every batch loss to the notebook-level ``losses``
    list.

    Args:
        model: Network to train; its output is shaped ``[batch, 1, n_output]``.
        epoch: Epoch number, used only in the progress message.
        log_interval: Print the training stats every this many batches.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        data = transform(data)
        output = model(data)

        # Negative log-likelihood for a tensor of size (batch x 1 x n_output).
        # Only the singleton middle dimension is removed: a bare squeeze()
        # would also drop the batch dimension when a batch holds one sample,
        # leaving a shape that no longer matches `target`.
        loss = F.nll_loss(output.squeeze(1), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch.batch: {epoch}.{batch_idx} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # record loss
        losses.append(loss.item())

## **Test function**

In [15]:
def number_of_correct(pred, target):
    """Count how many predicted indices equal their target index.

    Singleton dimensions of ``pred`` are squeezed away before the comparison.

    Returns:
        The number of matching positions as a Python number.
    """
    flat_pred = pred.squeeze()
    matches = flat_pred.eq(target)
    return matches.sum().item()


def get_likely_index(tensor):
    """Return the index of the largest score along the last dimension.

    For model output this is the most likely label index of each batch element.
    """
    most_likely = tensor.argmax(dim=-1)
    return most_likely


def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and print the accuracy.

    Uses the notebook-level ``test_loader``, ``transform`` and ``device``.
    The model is switched to evaluation mode and is left in that mode.

    Args:
        model: Network to evaluate.
        epoch: Epoch number, used only in the printed message.
    """
    model.eval()
    correct = 0

    # Evaluation needs no gradients; disabling autograd avoids building a graph
    # for every test batch.
    with torch.no_grad():
        for data, target in test_loader:
            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

## **Training the model and showing the test results**

In [16]:
log_interval = 5  # print the training stats every 5 batches
n_epoch = 100

# Per-batch training losses; train() appends to this list.
losses = []

# The transform needs to live on the same device as the model and the data.
transform = transform.to(device)

# Each epoch: train on the training set, evaluate on the test set, then
# advance the learning-rate schedule by one step.
for epoch in range(1, n_epoch + 1):
    train(model, epoch, log_interval)
    test(model, epoch)
    scheduler.step()



Test Epoch: 1	Accuracy: 0/24 (0%)


Test Epoch: 2	Accuracy: 3/24 (12%)


Test Epoch: 3	Accuracy: 6/24 (25%)


Test Epoch: 4	Accuracy: 8/24 (33%)


Test Epoch: 5	Accuracy: 6/24 (25%)


Test Epoch: 6	Accuracy: 4/24 (17%)


Test Epoch: 7	Accuracy: 9/24 (38%)


Test Epoch: 8	Accuracy: 11/24 (46%)


Test Epoch: 9	Accuracy: 9/24 (38%)


Test Epoch: 10	Accuracy: 7/24 (29%)


Test Epoch: 11	Accuracy: 13/24 (54%)


Test Epoch: 12	Accuracy: 8/24 (33%)


Test Epoch: 13	Accuracy: 11/24 (46%)


Test Epoch: 14	Accuracy: 14/24 (58%)


Test Epoch: 15	Accuracy: 9/24 (38%)


Test Epoch: 16	Accuracy: 14/24 (58%)


Test Epoch: 17	Accuracy: 10/24 (42%)


Test Epoch: 18	Accuracy: 10/24 (42%)


Test Epoch: 19	Accuracy: 16/24 (67%)


Test Epoch: 20	Accuracy: 13/24 (54%)


Test Epoch: 21	Accuracy: 19/24 (79%)


Test Epoch: 22	Accuracy: 20/24 (83%)


Test Epoch: 23	Accuracy: 20/24 (83%)


Test Epoch: 24	Accuracy: 20/24 (83%)


Test Epoch: 25	Accuracy: 20/24 (83%)


Test Epoch: 26	Accuracy: 21/24 (88%)


Test E

## **Saving the model**

In [17]:
# Save only the model's state_dict.
MODEL_STATE_PATH = 'model_state.pt'
torch.save(model.state_dict(), MODEL_STATE_PATH)

# Also save the whole model object.
# NOTE(review): loading this file back presumably requires the M5 class to be
# defined in the loading environment — confirm before relying on it.
MODEL_PATH = 'model.pt'
torch.save(model, MODEL_PATH)

## **Prediction using trained model**

In [18]:
def predict(tensor):
    """Predict the class name of a single waveform with the trained model.

    Uses the notebook-level ``model``, ``transform`` and ``device``. The model
    is put in evaluation mode for the forward pass and its previous mode is
    restored afterwards, so the result does not depend on which cell ran last.

    Args:
        tensor: One waveform as returned by the dataset, without a batch
            dimension.

    Returns:
        The predicted label (an entry of ``labels``).
    """
    was_training = model.training
    model.eval()
    try:
        # Inference only: no gradients are needed.
        with torch.no_grad():
            waveform = transform(tensor.to(device))
            # The model expects a batch dimension, so add one of size 1.
            log_probs = model(waveform.unsqueeze(0))
    finally:
        model.train(was_training)

    index = get_likely_index(log_probs)
    return index_to_label(index.squeeze())


# Try the model on the last training sample.
waveform, sample_rate, label = train_set[-1]
# The player is not the last expression of the cell, so it must be displayed
# explicitly; a bare `ipd.Audio(...)` here would be silently discarded.
display(ipd.Audio(waveform.numpy(), rate=sample_rate))

print(f"Expected: {label}. Predicted: {predict(waveform)}.")

Expected: 9. Predicted: 9.


## **Finding an example that isn't classified correctly**

In [19]:
# Walk through the test set and stop at the first sample whose prediction
# differs from its label.
for i, (waveform, sample_rate, utterance, *_) in enumerate(test_set):
    output = predict(waveform)
    if output != utterance:
        display(ipd.Audio(waveform.numpy(), rate=sample_rate))
        print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
        break
else:
    # The `else` of a `for` loop runs only when the loop finished without
    # `break`, i.e. when no misclassified sample was found. The loop variables
    # still hold the last sample that was visited.
    print("All examples in this dataset were correctly classified!")
    print("In this case, let's just look at the last data point")
    display(ipd.Audio(waveform.numpy(), rate=sample_rate))
    print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")

Data point #0. Expected: 3. Predicted: 13.
