# Objective
In this notebook I am going to try and build a tool that can help me identify the flaws of my accent.
To do this, I'll need some labelled data: (voice recording and accent).

On HuggingFace I found this dataset https://huggingface.co/datasets/westbrook/English_Accent_DataSet which seems to be exactly what we needed!

Now let's see if we can classify some clips!

In [11]:
import torch as t
import torch.nn as nn
import torch.nn.functional as F

import pandas as pd
import numpy as np
import random

device = 'cuda' if t.cuda.is_available() else 'cpu'
print(f"using device: {device}")

# Reproducibility -> Also ensures same train / validation / test split every time
t.manual_seed(42)
random.seed(42)
np.random.seed(42)

t.cuda.empty_cache()

using device: cuda


In [12]:
from datasets import load_dataset
from encodec import EncodecModel
from encodec.utils import convert_audio
from torch.utils.data import Dataset
from sklearn.preprocessing import LabelEncoder
from IPython.display import Audio

class EnglishAccentDataset(Dataset):

    # List of accents, DO NOT CHANGE THE ORDER (taken from huggingface)
    accents = ['Dutch', 'German', 'Polish', 'French', 'Hungarian', 'Finnish', 'Romanian', 'Slovak', 'Spanish', 'Italian', 'Estonian', 'Lithuanian', 'Croatian', 'Slovene', 'English', 'Scottish', 'Irish', 'NorthernIrish', 'Indian', 'Vietnamese', 'Canadian', 'American']

    encodec = EncodecModel.encodec_model_24khz()
    encodec.eval()
    encodec.set_target_bandwidth(6.0)
    # Frozen Encodec Model
    for parameter in encodec.parameters():
        parameter.requires_grad_(False)

    def __init__(self, split = None):
        super().__init__()
        if split:
            assert split in ['train', 'validation', 'test']
        self.hf_dataset = load_dataset("westbrook/English_Accent_DataSet", split=split).with_format('torch')

    def get_accent_from_label(label: int):
        return EnglishAccentDataset.accents[label - 1]

    def get_label_from_accent(accent: str):
        return EnglishAccentDataset.accents.index(accent) + 1

    def __len__(self):
        return len(self.hf_dataset)
    
    def __getitem__(self, index):
        with t.no_grad():
            wav, sr = self.hf_dataset[index]['audio']['array'], self.hf_dataset[index]['audio']['sampling_rate'].item()
            target = self.hf_dataset[index]['accent'].item()
            wav = wav.unsqueeze(0).unsqueeze(0)
            wav = convert_audio(wav, sr, EnglishAccentDataset.encodec.sample_rate, EnglishAccentDataset.encodec.channels)
            frames = EnglishAccentDataset.encodec.encode(wav)
            codes = frames[0][0]
        return codes.squeeze(), target

    def decode_sequence(sequence):
        wav = EnglishAccentDataset.encodec.decode([(sequence, None)])
        wav = wav.squeeze().numpy()
        return Audio(wav, rate=EnglishAccentDataset.encodec.sample_rate)

train_dataset = EnglishAccentDataset('train')
valid_dataset = EnglishAccentDataset('validation')
test_dataset  = EnglishAccentDataset('test')

codes, label = test_dataset[0]
print(codes)
print(codes.shape)
print(t.max(codes))

  WeightNorm.apply(module, name, dim)
Repo card metadata block was not found. Setting CardData to empty.


Resolving data files:   0%|          | 0/28 [00:00<?, ?it/s]

Loading dataset shards:   0%|          | 0/18 [00:00<?, ?it/s]

Repo card metadata block was not found. Setting CardData to empty.


Resolving data files:   0%|          | 0/28 [00:00<?, ?it/s]

Repo card metadata block was not found. Setting CardData to empty.


Resolving data files:   0%|          | 0/28 [00:00<?, ?it/s]

tensor([[491, 837, 613,  ..., 408, 408, 408],
        [199, 722,  46,  ..., 518, 518, 913],
        [732, 908, 369,  ...,  36, 937, 937],
        ...,
        [ 64, 568, 455,  ..., 939, 435, 939],
        [356, 874, 725,  ..., 853, 570, 570],
        [334, 969, 145,  ..., 899, 948, 948]])
torch.Size([8, 933])
tensor(1023)


In [13]:
from torch.utils.data import DataLoader

# TODO: add padding and masking to allow batch sizes > 1
num_workers = 5
train_dataloader = DataLoader(train_dataset, batch_size=1, shuffle=True, num_workers=num_workers)
valid_dataloader = DataLoader(valid_dataset, batch_size=1, shuffle=True, num_workers=num_workers)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=True, num_workers=num_workers)

In [14]:
from IPython.display import display

for i in range(3):
    codes, target_label = next(iter(train_dataloader))
    print(target_label)
    print(EnglishAccentDataset.get_accent_from_label(target_label))
    print(codes.shape)
    display(EnglishAccentDataset.decode_sequence(codes))


tensor([15])
English
torch.Size([1, 8, 135])


tensor([1])
Dutch
torch.Size([1, 8, 356])


tensor([15])
English
torch.Size([1, 8, 321])


# The model

For the model, we opt for a transformer architecture.

For this particular case, we only need to use the encoder: basically like the **BERT** architecture.
Then we will get the embeddings of the final time step and feed them to a classifier that is going to predict the classes.

![transformer Architecture](./images/transformer.png "Transformer Architecture")

In [20]:

class AccentRecogniser(nn.Module):
    def __init__(self, input_dim, num_classes, num_heads=16, num_layers=12, ff_dim=512, dropout=0.2, device = 'cpu'):
        super().__init__()

        self.device = device
        self.input_dim = input_dim
        self.embedders = nn.ModuleList([nn.Embedding(1024, input_dim) for _ in range(8)]).to(device)
        
        # Transformer Encoder Layer
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=input_dim, nhead=num_heads, dim_feedforward=ff_dim, dropout=dropout, batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(
            self.encoder_layer, num_layers=num_layers
        ).to(device)
        
        # Classifier head (fully connected layers)
        self.fc = nn.Sequential(
            nn.Linear(input_dim, input_dim // 2),
            nn.ReLU(),
            nn.Linear(input_dim // 2, num_classes)
        ).to(device)

    def forward(self, x):
        # Note: For now the transformer has "infinite" window size because we are passing it the whole sequences.
        x = x.to(self.device)

        B, K, T = x.shape
        y = t.zeros([B, K, T, self.input_dim], device=self.device)
        for i in range(len(self.embedders)):
            y[:, i] = self.embedders[i](x[:, i])
        x = y # [B, K, T, input_dim]

        # Remove Codebook Dimension
        x = t.sum(x, dim=1) # [B, T, input_dim]

        x = self.transformer_encoder(x) # [B, T, input_dim]

        # Selecting the last element means we have the embedding that corresponds to the whole time series.
        x = x[:, -1, :] # [B, input_dim]

        x = self.fc(x) # [B, num_classes]

        return x
    
    def size(self):
        def human_format(num):
            magnitude = 0
            while abs(num) >= 1000:
                magnitude += 1
                num /= 1000.0
            # add more suffixes if you need them
            return "%.2f%s" % (num, ["", "K", "M", "G", "T", "P"][magnitude])

        return human_format(sum(p.numel() for p in self.parameters()))

model = AccentRecogniser(1024, num_classes=len(EnglishAccentDataset.accents), device = device)
print(model.size())

77.21M


In [21]:
def run_step(model, loss_fn, input, target):
    x = model(input)

    target = target.to(device)
    loss = loss_fn(x, target)
    return loss, x

In [25]:
# TODO Add weight for the dataset Imbalance problem

loss_fn = t.nn.CrossEntropyLoss()
learning_rate = 0.000001
optimizer = t.optim.Adam(model.parameters(), learning_rate)
codes, targets = next(iter(valid_dataloader))

loss, predictions = run_step(model, loss_fn, codes, targets)
print(f'Example distribution on example 0: {F.softmax(predictions[0], dim = -1)}')
print(f'Actual target for value 0: {targets[0]}')
optimizer.zero_grad()
loss.backward()
grad_norm = 0.0
for param in model.parameters():
    if param.grad is not None:
        grad_norm += param.grad.norm(2).item() ** 2  # L2 norm for each parameter
grad_norm = grad_norm ** 0.5  # Square root to get the L2 norm

print("Gradient Norm:", grad_norm)
loss

Example distribution on example 0: tensor([0.0401, 0.0321, 0.0498, 0.0554, 0.0282, 0.0348, 0.0501, 0.0556, 0.0600,
        0.0528, 0.0336, 0.0393, 0.0451, 0.0509, 0.0484, 0.0333, 0.0398, 0.0491,
        0.0579, 0.0433, 0.0475, 0.0530], device='cuda:0',
       grad_fn=<SoftmaxBackward0>)
Actual target for value 0: 1
Gradient Norm: 20.713073239002284


tensor(3.4387, device='cuda:0', grad_fn=<NllLossBackward0>)