In [1]:
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset
import torchaudio
import numpy as np
from tqdm import tqdm

In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device}")

if device == "cuda":
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

Using cuda


In [3]:
epochs = 40
lr = 0.001
batch_size = 4

In [4]:
# Make all tensor in a batch the same length by padding with zeros

def pad_sequence(batch):
    batch = [item.t() for item in batch]
    batch = torch.nn.utils.rnn.pad_sequence(batch, batch_first=True, padding_value=0.)
    return batch.permute(0, 2, 1)


def collate_fn(batch):
    tensors, targets = [], []

    for waveform, label in batch:
        tensors += [waveform]
        targets += torch.tensor([label])

    tensors = pad_sequence(tensors)
    targets = torch.stack(targets)

    return tensors, targets

In [5]:
class AudioDataset(Dataset):
    def __init__(self, dir_path):
        self.dir_path = dir_path
        self.classes = os.listdir(self.dir_path)
        self.data_paths = []
        self.labels = []

        for root, dirs, files in os.walk(self.dir_path):
            for file in files:
                label = os.path.basename(root)
                data_path = os.path.join(root, file)
                self.data_paths.append(data_path)
                self.labels.append(self.classes.index(label))

        print("classes: ", self.classes)
        print(f"{len(self.labels)} datas loaded from {len(set(self.labels))} classes")

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, index):
        data_path = self.data_paths[index]
        label = self.labels[index]
        signal, sample_rate = torchaudio.load(data_path)
        signal_mono = torch.mean(signal, dim=0, keepdim=True)

        new_sample_rate = 8000
        transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
        signal_mono_transformed = transform(signal_mono)

        return signal_mono_transformed, label

In [6]:
dataset = AudioDataset("dataset")

train_size = int(len(dataset)*0.8)
test_size = len(dataset) - train_size

train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
train_data_loader = torch.utils.data.DataLoader(train_dataset, 
                                                batch_size=batch_size, 
                                                shuffle=True, 
                                                collate_fn=collate_fn,
                                                num_workers=num_workers, 
                                                pin_memory=pin_memory)

test_data_loader = torch.utils.data.DataLoader(test_dataset, 
                                                batch_size=batch_size, 
                                                shuffle=False,
                                                collate_fn=collate_fn,
                                                num_workers=num_workers, 
                                                pin_memory=pin_memory)

classes:  ['alireza', 'benyamin', 'maryam', 'mohammadali', 'morteza', 'nahid', 'parisa', 'sajjad', 'zahra', 'zeynab']
558 datas loaded from 10 classes


In [7]:
class M5(nn.Module):
    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)
        self.conv3 = nn.Conv1d(n_channel, 2 * n_channel, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(2 * n_channel)
        self.pool3 = nn.MaxPool1d(4)
        self.conv4 = nn.Conv1d(2 * n_channel, 2 * n_channel, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(2 * n_channel)
        self.pool4 = nn.MaxPool1d(4)
        self.fc1 = nn.Linear(2 * n_channel, n_output)

    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(self.bn1(x))
        x = self.pool1(x)
        x = self.conv2(x)
        x = F.relu(self.bn2(x))
        x = self.pool2(x)
        x = self.conv3(x)
        x = F.relu(self.bn3(x))
        x = self.pool3(x)
        x = self.conv4(x)
        x = F.relu(self.bn4(x))
        x = self.pool4(x)
        x = F.avg_pool1d(x, x.shape[-1])
        x = torch.flatten(x, start_dim=1)
        x = self.fc1(x)
        x = F.softmax(x, dim=1)
        return x

    def accuracy(self, preds, labels):
        maxs, indices = torch.max(preds, 1)
        acc = torch.sum(indices == labels) / len(preds)
        return acc.cpu()

In [8]:
model = M5(n_output=10).to(device)
print(model)

# count_parameters
n = sum(p.numel() for p in model.parameters() if p.requires_grad)
print("Number of parameters: %s" % n)

M5(
  (conv1): Conv1d(1, 32, kernel_size=(80,), stride=(16,))
  (bn1): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool1): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv2): Conv1d(32, 32, kernel_size=(3,), stride=(1,))
  (bn2): BatchNorm1d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool2): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv3): Conv1d(32, 64, kernel_size=(3,), stride=(1,))
  (bn3): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool3): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (conv4): Conv1d(64, 64, kernel_size=(3,), stride=(1,))
  (bn4): BatchNorm1d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (pool4): MaxPool1d(kernel_size=4, stride=4, padding=0, dilation=1, ceil_mode=False)
  (fc1): Linear(in_features=64, out_features=10, bias=True)
)
Numbe

In [9]:
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_function = nn.CrossEntropyLoss()

In [None]:
# train
model.train()

for epoch in range(epochs):
    train_loss = 0.0
    train_acc = 0.0
    for audios, labels in tqdm(train_data_loader):
        audios, labels = audios.to(device), labels.to(device)
        labels_one_hot = torch.nn.functional.one_hot(labels, num_classes=10).type(torch.FloatTensor).to(device)

        preds = model(audios)
        loss = loss_function(preds, labels_one_hot)
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        train_loss += loss
        train_acc += model.accuracy(preds, labels)
    
    total_loss = train_loss / len(train_data_loader)
    total_acc = train_acc / len(train_data_loader)

    print(f"Epoch: {epoch}, Loss: {total_loss}, Acc: {total_acc}")

In [11]:
model.eval()

test_loss = 0.0
test_acc = 0.0
for audios, labels in tqdm(test_data_loader):
    audios, labels = audios.to(device), labels.to(device)
    labels_one_hot = torch.nn.functional.one_hot(labels, num_classes=10).type(torch.FloatTensor).to(device)

    preds = model(audios)
    loss = loss_function(preds, labels_one_hot)

    test_loss += loss
    test_acc += model.accuracy(preds, labels)

total_loss = test_loss / len(test_data_loader)
total_acc = test_acc / len(test_data_loader)

print(f"Loss: {total_loss}, Acc: {total_acc}")

100%|██████████| 28/28 [00:00<00:00, 128.96it/s]

Loss: 1.681485891342163, Acc: 0.7946428656578064





In [12]:
torch.save(model.state_dict(), "weights.pth")

In [12]:
# Inference

signal, sample_rate = torchaudio.load("input/test.ogg")

# preprocess
signal = torch.mean(signal, dim=0, keepdim=True)
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
signal = transform(signal)
signal = signal[:, 32000:40000]
signal = signal.unsqueeze(0).to(device)

# process
preds = model(signal)

# postprocess
preds = preds.cpu().detach().numpy()
output = np.argmax(preds)
print(output)

3
