In [50]:
import numpy as np
import os
import  torch
import torchaudio
from torch.utils.data import DataLoader, Dataset, random_split
from torchaudio import datasets
import matplotlib.pyplot as plt
from torch import nn
import torch.optim as optim
from torchvision.transforms import ToTensor
import tensorflow as tf
from sklearn.preprocessing import LabelEncoder
import librosa
from torchaudio.datasets import SPEECHCOMMANDS
from tqdm import tqdm
import torch.nn.functional as F

In [51]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [52]:
import gc

# Collect Python garbage first so tensors that were only kept alive by
# reference cycles are released before the CUDA cache is emptied.
gc.collect()

# Only touch the CUDA allocator when a GPU is actually present: the statistics
# reset initialises CUDA and raises on a CPU-only machine, which would defeat
# the CPU fallback chosen for `device`.
if torch.cuda.is_available():
    torch.cuda.empty_cache()
    # reset_max_memory_allocated() / reset_max_memory_cached() are deprecated
    # aliases that both forward to reset_peak_memory_stats(), so one call
    # replaces the two original ones.
    torch.cuda.reset_peak_memory_stats()




In [53]:
class Block(nn.Module):
    """Basic 1-D residual block: two 3-wide convolutions plus a skip connection.

    Args:
        in_channels: Number of channels of the input feature map.
        out_channels: Number of channels produced by both convolutions.
        i_downsample: Optional module applied to the block input to build the
            skip branch. It is required whenever the main branch changes the
            tensor shape (``stride != 1`` or ``in_channels != out_channels``),
            because the two branches are added element-wise.
        stride: Stride of the first convolution.
    """

    expansion = 1

    def __init__(self, in_channels, out_channels, i_downsample=None, stride=1):
        super().__init__()

        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1, stride=stride, bias=False)
        self.batch_norm1 = nn.BatchNorm1d(out_channels)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size=3, padding=1, stride=1, bias=False)
        self.batch_norm2 = nn.BatchNorm1d(out_channels)

        self.i_downsample = i_downsample
        self.stride = stride
        self.relu = nn.ReLU()

    def forward(self, x):
        """Return ``relu(main_branch(x) + skip(x))``.

        ``skip`` is ``i_downsample`` when one was supplied, otherwise the
        identity.
        """
        residual = x

        out = self.conv1(x)
        out = self.batch_norm1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.batch_norm2(out)

        # Compare against None explicitly: truth-testing an object falls back
        # to ``__len__`` when it defines one, so a module reporting length 0
        # would otherwise be skipped without any warning.
        if self.i_downsample is not None:
            residual = self.i_downsample(x)

        out += residual
        out = self.relu(out)
        return out

class ResNet_audio(nn.Module):
    """1-D residual network producing class scores from a waveform batch.

    A strided 7-wide stem convolution is followed by 18 ``Block`` units at
    64 channels, a global average over the last (time) axis and a linear
    classifier.

    Args:
        in_channels: Number of channels of the input signal.
        num_classes: Number of output scores per example.
    """

    def __init__(self, in_channels=1, num_classes=35):
        super().__init__()
        self.in_channels = in_channels
        self.num_classes = num_classes
        self.conv1 = nn.Conv1d(self.in_channels, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.batch_norm1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU()

        self.blocks = nn.Sequential(*[Block(64, 64) for _ in range(18)])
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(64, self.num_classes)

    def forward(self, x):
        """Map ``(batch, in_channels, time)`` to ``(batch, num_classes)``.

        The returned values are raw scores: no softmax or log-softmax is
        applied here.
        """
        x = self.conv1(x)
        x = self.batch_norm1(x)
        x = self.relu(x)
        x = self.blocks(x)

        # Use the registered pooling layer instead of re-implementing it with
        # the functional API: (batch, 64, time) -> (batch, 64, 1).
        x = self.avgpool(x)
        # Drop the singleton time axis: (batch, 64, 1) -> (batch, 64).
        x = x.flatten(1)
        return self.fc(x)


In [54]:
class SubsetSC(SPEECHCOMMANDS):
    """Speech Commands dataset restricted to one of its predefined splits.

    Args:
        subset: ``"training"``, ``"validation"``, ``"testing"``, or ``None``
            to keep every utterance found by the parent class.

    Raises:
        ValueError: If ``subset`` is any other value. Without this check a
            typo such as ``"train"`` would silently select the full dataset,
            test utterances included.
    """

    def __init__(self, subset: str = None):
        # Validate before calling the parent so a bad argument fails fast,
        # before the parent constructor (called with download=True) runs.
        if subset not in (None, "training", "validation", "testing"):
            raise ValueError(
                f"subset must be None, 'training', 'validation' or 'testing', got {subset!r}"
            )

        super().__init__("./", download=True)

        def load_list(filename):
            """Read a split file and return its entries as normalised paths."""
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return [
                    os.path.normpath(os.path.join(self._path, line.strip()))
                    for line in fileobj
                    # A blank line would otherwise turn into the dataset
                    # directory itself.
                    if line.strip()
                ]

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            # Training is everything that is in neither of the two list files.
            excludes = set(load_list("validation_list.txt") + load_list("testing_list.txt"))
            self._walker = [w for w in self._walker if w not in excludes]


# Build the training and testing splits of the data.
train_set = SubsetSC(subset="training")
test_set = SubsetSC(subset="testing")

# Unpack the first training example; `waveform` and `sample_rate` are used
# when the resampling transform is configured.
(
    waveform,
    sample_rate,
    label,
    speaker_id,
    utterance_number,
) = train_set[0]

In [55]:
# Load the class vocabulary: one label per line, surrounding whitespace removed.
with open("labels.txt", "r") as f:
    labels = list(map(str.strip, f))

In [56]:
# Pre-processing on audio files: resample from the rate reported by the first
# training example to `new_sr`.
new_sr = 8000
transform = torchaudio.transforms.Resample(sample_rate, new_sr)
tranformed = transform(waveform)


In [57]:
# Each word is encoded as its position in the `labels` list.
def label_to_index(word):
    """Return the position of ``word`` in ``labels`` wrapped in a tensor."""
    position = labels.index(word)
    return torch.tensor(position)


def index_to_label(index):
    """Inverse of ``label_to_index``: return the word stored at ``index`` in ``labels``."""
    word = labels[index]
    return word

def pad_sequence(batch):
    """Zero-pad the tensors in ``batch`` to a common length and stack them.

    Each item is transposed, padded along its (new) first axis with zeros,
    stacked batch-first, and the last two axes are swapped back.
    """
    time_major = []
    for item in batch:
        time_major.append(item.t())
    padded = torch.nn.utils.rnn.pad_sequence(
        time_major,
        batch_first=True,
        padding_value=0.,
    )
    return padded.permute(0, 2, 1)

def custom_collate_fn(batch):
    """Collate dataset examples into ``(padded_waveforms, label_indices)``.

    From every example only the first field (waveform) and the third field
    (label word) are used; the remaining fields are ignored.
    """
    waveforms = []
    encoded_labels = []
    for wav, _rate, word, _speaker, *_rest in batch:
        waveforms.append(wav)
        encoded_labels.append(label_to_index(word))

    padded = pad_sequence(waveforms)
    stacked = torch.stack(encoded_labels)
    return padded, stacked

In [58]:
batch_size = 64

# Creating data loaders: both share the batch size and the collate function;
# only the training loader shuffles its examples.
shared_loader_args = dict(batch_size=batch_size, collate_fn=custom_collate_fn)

train_loader = DataLoader(train_set, shuffle=True, **shared_loader_args)
test_loader = DataLoader(test_set, shuffle=False, drop_last=False, **shared_loader_args)

In [59]:
# Run-level settings.
n_epoch = 2
log_interval = 20

# Network, objective and optimiser; the model is moved to `device`.
model = ResNet_audio().to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=model.parameters(), lr=0.01)

# Fraction of one epoch represented by a single batch (training and testing
# batches combined); used as the per-batch progress-bar increment.
pbar_update = 1 / (len(train_loader) + len(test_loader))

def count_parameters(model):
    """Return the total element count of the model's trainable parameters."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Progress bar sized to the number of training batches.
# NOTE(review): the run cell rebinds `pbar` with `with tqdm(total=n_epoch) as pbar`,
# so this instance appears to be displayed but never advanced or closed;
# confirm it is still wanted.
pbar = tqdm(total=len(train_loader), desc=f"Train Epoch: {n_epoch}")
# Per-batch training losses; train() appends to this list.
losses = []
# Report how many trainable parameters the model has.
n = count_parameters(model)
print("Number of parameters: %s" % n)

Train Epoch: 2:   0%|          | 0/1326 [00:00<?, ?it/s]

Number of parameters: 449827


In [60]:
def train(model, epoch, log_interval):
    """Run one training epoch over ``train_loader``.

    Uses the module-level ``device``, ``transform``, ``loss_func``,
    ``optimizer``, ``pbar``, ``pbar_update`` and ``losses``.

    Args:
        model: Network to optimise; put into training mode here.
        epoch: Epoch number, used only in the log line.
        log_interval: Print a status line every ``log_interval`` batches.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        data = transform(data)
        output = model(data)

        # ResNet_audio.forward ends in a plain Linear layer, i.e. it returns
        # raw scores. F.nll_loss expects log-probabilities, so applying it to
        # those scores optimises the wrong quantity; the CrossEntropyLoss held
        # in `loss_func` applies log-softmax itself.
        # reshape() instead of squeeze(): squeeze() would also remove the batch
        # axis when the final batch holds a single example.
        logits = output.reshape(target.shape[0], -1)
        loss = loss_func(logits, target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # update progress bar
        pbar.update(pbar_update)
        # record loss
        losses.append(loss.item())

def number_of_correct(pred, target):
    """Return how many entries of ``pred`` (after squeezing) equal ``target``."""
    matches = torch.eq(pred.squeeze(), target)
    return matches.sum().item()


def get_likely_index(tensor):
    """Return the index of the largest value along the last axis."""
    return torch.argmax(tensor, dim=-1)


def test(model, epoch):
    """Evaluate ``model`` on ``test_loader`` and print its accuracy.

    Uses the module-level ``device``, ``transform``, ``pbar`` and
    ``pbar_update``.

    Args:
        model: Network to evaluate; put into evaluation mode here.
        epoch: Epoch number, used only in the printed summary.
    """
    model.eval()
    correct = 0
    # Evaluation needs no gradients; disabling autograd avoids building a
    # graph for every batch and the memory that would go with it.
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            # update progress bar
            pbar.update(pbar_update)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

In [61]:

# Move the resampler to the same device the batches are sent to.
transform = transform.to(device)
# One progress bar for the whole run, measured in epochs; train() and test()
# advance the global `pbar` by `pbar_update` after every batch.
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        test(model, epoch)
    

Train Epoch: 2:   0%|          | 0/1326 [00:00<?, ?it/s]




























































































































































































 31%|███       | 0.6228304405874554/2 [23:54<52:52, 2303.71s/it]  


KeyboardInterrupt: 