<a href="https://colab.research.google.com/github/AliHassan-019/accent-detection/blob/main/notebook3328d452ea.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# IMPORTANT: RUN THIS CELL IN ORDER TO IMPORT YOUR KAGGLE DATA SOURCES,
# THEN FEEL FREE TO DELETE THIS CELL.
# NOTE: THIS NOTEBOOK ENVIRONMENT DIFFERS FROM KAGGLE'S PYTHON
# ENVIRONMENT SO THERE MAY BE MISSING LIBRARIES USED BY YOUR
# NOTEBOOK.
import kagglehub
# Download both Kaggle datasets and keep the values returned by kagglehub.
# NOTE(review): the visible cells read from hard-coded "/kaggle/input/..." paths and
# never use these two variables -- confirm the download location matches those paths.
mozillaorg_common_voice_path = kagglehub.dataset_download('mozillaorg/common-voice')
edolele_speech_commends_path = kagglehub.dataset_download('edolele/speech-commends')

print('Data source import complete.')


# Importing libraries, loading and transforming data

In [None]:
# Hugging Face stack used by the accent-recognition part (transformers is pinned).
!pip install -q evaluate transformers==4.28.1
!pip install -U -q datasets
# NOTE(review): torchaudio is pinned to a CUDA 11.3 build while torch itself is not
# pinned in this cell -- confirm the installed torch version is compatible.
!pip install -q torchaudio==0.12.0+cu113 -f https://download.pytorch.org/whl/cu113/torch_stable.html
# Presumably ffmpeg is needed to decode the .mp3 clips loaded later -- TODO confirm.
!add-apt-repository -y ppa:savoury1/ffmpeg4
!apt-get -qq install -y ffmpeg
# mlflow is the reporting backend selected in TrainingArguments (report_to="mlflow").
!pip install -q mlflow

# Torch audio tutorial

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm


In [None]:
from torchaudio.datasets import SPEECHCOMMANDS
import os


class SubsetSC(SPEECHCOMMANDS):
    """SPEECHCOMMANDS restricted to one of its list-file based splits.

    ``subset`` may be ``"training"``, ``"validation"`` or ``"testing"``.
    Any other value (including the default ``None``) leaves the walker
    built by the parent class untouched.
    """

    def __init__(self, subset: str = None):
        super().__init__(
            root="/kaggle/input/",
            url='.',
            download=False,
            folder_in_archive='speech-commends/',
        )

        def load_list(filename):
            # Every line of a list file is joined onto the dataset path and
            # normalised so it can be compared with the walker entries.
            with open(os.path.join(self._path, filename)) as fileobj:
                entries = []
                for line in fileobj:
                    joined = os.path.join(self._path, line.strip())
                    entries.append(os.path.normpath(joined))
                return entries

        split_files = {
            "validation": "validation_list.txt",
            "testing": "testing_list.txt",
        }

        if subset in split_files:
            # These two splits are exactly the files named in their list file.
            self._walker = load_list(split_files[subset])
        elif subset == "training":
            # Training is everything that is in neither held-out list.
            held_out = set(load_list("validation_list.txt"))
            held_out.update(load_list("testing_list.txt"))
            self._walker = [path for path in self._walker if path not in held_out]


# Create training and testing split of the data. We do not use validation in this tutorial.
train_set = SubsetSC("training")
test_set = SubsetSC("testing")

# Unpack the first training example to inspect its fields; `waveform` and
# `sample_rate` are reused by the cells that follow.
waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]


In [None]:
# Show the dataset path that SubsetSC joins its list-file entries onto.
train_set._path

In [None]:
# Report size and sample rate of the example unpacked from train_set[0].
print("Shape of waveform: {}".format(waveform.size()))
print("Sample rate of waveform: {}".format(sample_rate))

# Transpose before plotting; the trailing semicolon suppresses the cell's text output.
plt.plot(waveform.t().numpy());

In [None]:
# Hard-coded class vocabulary for the speech-commands model. The position of a
# word in this list is its class index (see label_to_index / index_to_label),
# so the order must not change once a model has been trained.
# NOTE(review): presumably mirrors the label set of the dataset -- the list is
# not derived from train_set here, so verify it is complete.
labels = ['no',
 'two',
 'backward',
 'four',
 'five',
 'nine',
 'right',
 'follow',
 'visual',
 'off',
 'yes',
 'six',
 'dog',
 'learn',
 'left',
 'bird',
 'forward',
 'wow',
 'zero',
 'eight',
 'bed',
 'go',
 'house',
 'tree',
 'seven',
 'on',
 'three',
 'one',
 'down',
 'stop',
 'up',
 'happy',
 'marvin',
 'cat',
 'sheila']

In [None]:
# Listen to the first training example (only the waveform field is kept).
waveform_first, *_ = train_set[0]
ipd.Audio(waveform_first.numpy(), rate=sample_rate)



In [None]:
# Listen to the last training example; `sample_rate` still comes from train_set[0].
waveform_second, *_ = train_set[-1]
ipd.Audio(waveform_second.numpy(), rate=sample_rate)


In [None]:
# Resample the example to 8 kHz. `transform` is kept as a global: the training,
# testing and predict code applies it to every batch, and `transformed` supplies
# the input channel count when the M5 model is created.
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)
transformed = transform(waveform_second)

ipd.Audio(transformed.numpy(), rate=new_sample_rate)


In [None]:
def label_to_index(word):
    """Encode ``word`` as a tensor holding its position in the global ``labels`` list."""
    position = labels.index(word)
    return torch.tensor(position)


def index_to_label(index):
    """Decode ``index`` back into the word stored at that position of ``labels``.

    Inverse of ``label_to_index``.
    """
    word = labels[index]
    return word


# Round-trip one word through both helpers as a sanity check.
word_start = "yes"
index = label_to_index(word_start)
word_recovered = index_to_label(index)

print(word_start, "-->", index, "-->", word_recovered)


为了将由录音和话语（标签）组成的数据点列表转换为供模型使用的两个批量张量，我们实现了一个 collate 函数，供 PyTorch 的 DataLoader 使用，从而可以按批次遍历数据集。有关 collate 函数用法的更多信息，请参阅 PyTorch 的 `torch.utils.data.DataLoader` 文档。



在 collate 函数中，我们还会把各段音频用零填充到相同长度，并将文本标签编码为索引；重采样则在训练和测试循环中对整个批次执行。



In [None]:
def pad_sequence(batch):
    """Zero-pad every tensor in ``batch`` to a common length and stack them.

    Each item is transposed before padding and the last two axes of the
    stacked result are swapped back afterwards.
    """
    transposed = [item.t() for item in batch]
    stacked = torch.nn.utils.rnn.pad_sequence(
        transposed,
        batch_first=True,
        padding_value=0.,
    )
    return stacked.permute(0, 2, 1)


def collate_fn(batch):
    """Turn a list of dataset samples into ``(waveforms, targets)`` tensors.

    Every sample has the form
    ``(waveform, sample_rate, label, speaker_id, utterance_number)``;
    only the waveform and the label are used.
    """
    waveforms = []
    encoded_labels = []
    for sample in batch:
        audio, _, word, *_ = sample
        waveforms.append(audio)
        # Labels are encoded as their index in the global label list.
        encoded_labels.append(label_to_index(word))

    # Pad the waveforms to a common length and stack both lists into tensors.
    return pad_sequence(waveforms), torch.stack(encoded_labels)


batch_size = 256

# Use the GPU only when one is actually available; otherwise fall back to the
# CPU so the notebook still runs. `device` stays a plain string so that the
# comparison below keeps working.
device = "cuda" if torch.cuda.is_available() else "cpu"

if device == "cuda":
    # Background loading and pinned host memory are only enabled on GPU.
    num_workers = 1
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False

# Training batches are reshuffled every epoch.
train_loader = torch.utils.data.DataLoader(
    train_set,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)
# Test batches keep dataset order and include the final partial batch.
test_loader = torch.utils.data.DataLoader(
    test_set,
    batch_size=batch_size,
    shuffle=False,
    drop_last=False,
    collate_fn=collate_fn,
    num_workers=num_workers,
    pin_memory=pin_memory,
)


上面我们已经通过 collate 函数和 PyTorch 的 DataLoader 实现了按批次遍历数据集。在本教程中，我们将使用卷积神经网络（CNN）来处理原始音频数据。通常会先对音频数据做更高级的变换，但 CNN 也可以直接、准确地处理原始波形。具体的网络结构参照相关论文中描述的 M5 网络结构构建。对于处理原始音频的模型，一个重要因素是第一层滤波器的感受野。我们模型第一层滤波器的长度为 80，因此在处理 8 kHz 采样的音频时，感受野约为 10 ms（在 4 kHz 时约为 20 ms）。这一大小与语音处理应用中常用的 20 ms 至 40 ms 的感受野相近。

In [None]:
class M5(nn.Module):
    """1-D convolutional classifier applied directly to waveforms.

    Four ``Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(4)`` stages are followed
    by average pooling over the whole remaining time axis and one linear
    layer. ``forward`` returns log-probabilities of shape
    ``(batch, 1, n_output)``.
    """

    def __init__(self, n_input=1, n_output=35, stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # Stage 1: long kernel with a configurable stride.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=80, stride=stride)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(4)

        # Stage 2: channel count unchanged.
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(4)

        # Stage 3: channel count doubles.
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(4)

        # Stage 4: stays at the doubled width.
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(4)

        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Collapse whatever is left of the last axis into a single step.
        x = F.avg_pool1d(x, x.shape[-1])
        # Move channels last so the linear layer acts on them.
        x = self.fc1(x.permute(0, 2, 1))
        return F.log_softmax(x, dim=2)


# The input channel count is read from the resampled example (`transformed`);
# there is one output per entry in `labels`.
model = M5(n_input=transformed.shape[0], n_output=len(labels))
model.to(device)
print(model)


def count_parameters(model):
    """Return the total number of elements in the trainable parameters of ``model``."""
    total = 0
    for param in model.parameters():
        # Parameters with requires_grad=False are not counted.
        if param.requires_grad:
            total += param.numel()
    return total


# Report how many trainable parameters the model has.
n = count_parameters(model)
print("Number of parameters: %s" % n)


In [None]:
# Adam with weight decay over all model parameters.
optimizer = optim.Adam(model.parameters(), lr=0.01, weight_decay=0.0001)
# NOTE(review): the training cell below runs only n_epoch = 2 epochs and steps the
# scheduler once per epoch, so a step_size of 20 is never reached in this notebook.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=20, gamma=0.1)  # reduce the learning rate by a factor of 10 every 20 scheduler steps

In [None]:
def train(model, epoch, log_interval):
    """Run one training epoch over the global ``train_loader``.

    Reads the notebook globals ``train_loader``, ``device``, ``transform``,
    ``optimizer``, ``pbar``, ``pbar_update`` and ``losses``.

    Args:
        model: network returning log-probabilities of shape
            ``(batch, 1, n_output)``.
        epoch: epoch number, used only in the log line.
        log_interval: print a status line every ``log_interval`` batches.

    Side effects:
        Updates the model through ``optimizer``, advances ``pbar`` once per
        batch and appends every batch loss to ``losses``.
    """
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):

        data = data.to(device)
        target = target.to(device)

        # apply transform and model on whole batch directly on device
        data = transform(data)
        output = model(data)

        # negative log-likelihood for a tensor of size (batch x 1 x n_output).
        # Only the singleton middle axis is removed: a bare squeeze() would also
        # drop the batch axis when the last batch holds a single sample and
        # make nll_loss fail on the shape mismatch.
        loss = F.nll_loss(output.squeeze(1), target)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # print training stats
        if batch_idx % log_interval == 0:
            print(f"Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)} ({100. * batch_idx / len(train_loader):.0f}%)]\tLoss: {loss.item():.6f}")

        # update progress bar
        pbar.update(pbar_update)
        # record loss
        losses.append(loss.item())

In [None]:
def number_of_correct(pred, target):
    """Return how many entries of ``pred`` (singleton axes removed) equal ``target``."""
    flattened = pred.squeeze()
    matches = flattened.eq(target)
    return matches.sum().item()


def get_likely_index(tensor):
    """Return the index of the largest value along the last axis of ``tensor``."""
    last_axis = -1
    return tensor.argmax(dim=last_axis)


def test(model, epoch):
    """Evaluate ``model`` on the global ``test_loader`` and print its accuracy.

    Reads the notebook globals ``test_loader``, ``device``, ``transform``,
    ``pbar`` and ``pbar_update``.

    Args:
        model: network to evaluate; it is switched to eval mode and left there.
        epoch: epoch number, used only in the printed summary.
    """
    model.eval()
    correct = 0
    # Evaluation needs no gradients; disabling autograd avoids building a graph
    # for every test batch.
    with torch.no_grad():
        for data, target in test_loader:

            data = data.to(device)
            target = target.to(device)

            # apply transform and model on whole batch directly on device
            data = transform(data)
            output = model(data)

            pred = get_likely_index(output)
            correct += number_of_correct(pred, target)

            # update progress bar
            pbar.update(pbar_update)

    print(f"\nTest Epoch: {epoch}\tAccuracy: {correct}/{len(test_loader.dataset)} ({100. * correct / len(test_loader.dataset):.0f}%)\n")

In [None]:
log_interval = 20
n_epoch = 2

# Each processed batch (train or test) advances the bar by this fraction, so one
# full epoch of training plus testing adds up to one unit of the bar.
pbar_update = 1 / (len(train_loader) + len(test_loader))
# Filled by train() with one loss value per training batch.
losses = []

# The transform needs to live on the same device as the model and the data.
transform = transform.to(device)
# `pbar` is read as a global by train() and test().
with tqdm(total=n_epoch) as pbar:
    for epoch in range(1, n_epoch + 1):
        train(model, epoch, log_interval)
        test(model, epoch)
        scheduler.step()

# Let's plot the training loss versus the number of iteration.
plt.plot(losses);
plt.title("training loss");


In [None]:
def predict(tensor):
    """Return the label the global ``model`` predicts for one waveform.

    Reads the notebook globals ``device``, ``transform`` and ``model``.

    Args:
        tensor: a single waveform without a batch axis; the batch axis is
            added here before the model is called.

    Returns:
        The predicted entry of ``labels``.
    """
    # NOTE(review): the model's train/eval mode is left as the caller set it.
    tensor = tensor.to(device)
    # Inference only: skip autograd bookkeeping.
    with torch.no_grad():
        tensor = transform(tensor)
        tensor = model(tensor.unsqueeze(0))
    tensor = get_likely_index(tensor)
    tensor = index_to_label(tensor.squeeze())
    return tensor


# Predict the label of the last training example and compare with the truth.
waveform, sample_rate, utterance, *_ = train_set[-1]
# A bare expression is only rendered when it is the last statement of a cell,
# so the player has to be displayed explicitly here.
ipd.display(ipd.Audio(waveform.numpy(), rate=sample_rate))

print(f"Expected: {utterance}. Predicted: {predict(waveform)}.")

In [None]:
# Walk the test set until the first misclassified example is found.
for i, (waveform, sample_rate, utterance, *_) in enumerate(test_set):
    output = predict(waveform)
    if output != utterance:
        # Inside a loop body a bare Audio(...) expression is discarded, so the
        # player is displayed explicitly.
        ipd.display(ipd.Audio(waveform.numpy(), rate=sample_rate))
        print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")
        break
else:
    # Reached only when the loop finished without hitting `break`.
    print("All examples in this dataset were correctly classified!")
    print("In this case, let's just look at the last data point")
    ipd.display(ipd.Audio(waveform.numpy(), rate=sample_rate))
    print(f"Data point #{i}. Expected: {utterance}. Predicted: {output}.")

# Accent recognition

In [None]:
#imports
import pandas as pd
import gc
import re
import numpy as np

import warnings
warnings.filterwarnings("ignore")

from tqdm import tqdm
tqdm.pandas()
from imblearn.over_sampling import RandomOverSampler
from imblearn.under_sampling import RandomUnderSampler
import torch
import torchaudio
import datasets
import transformers
print(transformers.__version__)

In [None]:
# use the highest quality data sample for training and validation (leave the test set apart)
dd = pd.read_csv("/kaggle/input/common-voice/cv-valid-train.csv").drop_duplicates()
# keep only the rows that carry an accent annotation
dd = dd[~dd['accent'].isnull()]
print(dd.shape)
dd.sample(5)

In [None]:
# Number of clips per accent.
dd['accent'].value_counts()

In [None]:
from collections import Counter
# Keep the five most frequent accents as the classification targets.
# NOTE: this rebinds the global `labels` used by the speech-commands cells above.
labels = [lang for lang, _ in Counter(dd['accent']).most_common(5)]
labels

In [None]:
RATE_HZ = 16000 # resampling rate in Hz
MAX_LENGTH = 40000 # maximum audio interval length to consider (= RATE_HZ * SECONDS)

# Two lookup tables between accent names and integer class ids; the id of an
# accent is its position in `labels`.
id2label = dict(enumerate(labels))
label2id = {name: idx for idx, name in id2label.items()}

print(id2label, '\n\n', label2id)

# Load and preprocess data

In [None]:
# Drop clips whose accent is not one of the selected labels.
dd = dd[dd['accent'].isin(labels)]
# Encode the accent name as its integer class id.
dd['label'] = dd['accent'].apply(lambda x: label2id[x])
# Only the file name and the target are needed from here on.
dd = dd[['filename', 'label']]
print(dd.shape)
dd.sample(5)

In [None]:
# random undersampling of all but minority class
rus = RandomUnderSampler(random_state=83, sampling_strategy='not minority')
# split features and target, resample them together, then rebuild the frame
y = dd[['label']]
dd = dd.drop(['label'], axis=1)
dd, y_resampled = rus.fit_resample(dd, y)
del y
dd['label'] = y_resampled
del y_resampled
# release the temporaries right away
gc.collect()

In [None]:
# Class distribution after undersampling.
dd['label'].value_counts()

In [None]:
# Load the first clip to obtain an example waveform and its native sample rate.
audio,rate = torchaudio.load("/kaggle/input/common-voice/cv-valid-train/"+dd['filename'].iloc[0])

In [None]:
# Resampler from the clip's native rate to RATE_HZ.
# NOTE: this rebinds the global `transform` that the speech-commands train/test/predict code reads.
transform = torchaudio.transforms.Resample(rate,RATE_HZ)

In [None]:
# Resample, drop the leading axis if it has size 1, and convert to a numpy array.
audio = transform(audio).squeeze(0).numpy()

In [None]:
def get_transform_audio(file):
    """Load one training clip, resample it to ``RATE_HZ`` and truncate it.

    Args:
        file: file name relative to the ``cv-valid-train`` input directory.

    Returns:
        A 1-D numpy array with at most ``MAX_LENGTH`` samples.
    """
    audio,rate = torchaudio.load("/kaggle/input/common-voice/cv-valid-train/"+str(file))
    # Downmix multi-channel clips to mono. Without this, squeeze(0) below would
    # leave a 2-D array and the truncation would slice channels, not samples.
    if audio.dim() == 2 and audio.shape[0] > 1:
        audio = audio.mean(dim=0, keepdim=True)
    transform = torchaudio.transforms.Resample(rate,RATE_HZ)
    audio = transform(audio).squeeze(0).numpy()
    audio = audio[:MAX_LENGTH] # truncate to first part of audio to save RAM
    return audio
# Decode, resample and truncate every clip eagerly (with a progress bar); the
# resulting arrays are stored in the new 'audio' column.
dd['audio'] = dd['filename'].progress_apply(get_transform_audio)
gc.collect()

In [None]:
%%time
# The file names are no longer needed once the audio has been loaded.
dd = dd.drop(['filename'], axis=1)
gc.collect()

In [None]:
# Peek at a few random rows (label + audio array).
dd.sample(5)

In [None]:
from datasets import Dataset
# Convert the DataFrame to a Hugging Face Dataset; `dd` is rebound to it.
dd = Dataset.from_pandas(dd)
gc.collect()

In [None]:
from collections import Counter
# Class distribution of the converted dataset.
Counter(dd['label']).items()

In [None]:
# Hold out 25% of the examples for evaluation. The split is seeded (same value
# as the undersampler's random_state) so that re-running the notebook gives the
# same train/test partition and comparable metrics.
dd = dd.train_test_split(test_size=0.25, seed=83)
gc.collect()
dd

# Load facebook/wav2vec2-base-960h model

In [None]:
from transformers import AutoFeatureExtractor, AutoModelForAudioClassification

# Pretrained checkpoint reused as the backbone of the accent classifier.
model_str = "facebook/wav2vec2-base-960h"
feature_extractor=AutoFeatureExtractor.from_pretrained(model_str)
# NOTE: this rebinds the global `model` used by the speech-commands cells above.
model=AutoModelForAudioClassification.from_pretrained(model_str,num_labels=len(labels))
# Store both directions of the label mapping in the config so they stay
# consistent with each other when the model is saved and reloaded.
model.config.id2label = id2label
model.config.label2id = label2id
# number of trainable parameters
print(model.num_parameters(only_trainable=True)/1e6)

In [None]:
def preprocess_function(batch):
    """Run the global ``feature_extractor`` on the ``'audio'`` field of one example.

    The extractor is called with ``RATE_HZ`` as sampling rate and truncates to
    ``MAX_LENGTH``. Its ``'input_values'`` entry is replaced by its first
    element before the result is returned.
    """
    encoded = feature_extractor(
        batch['audio'],
        sampling_rate=RATE_HZ,
        max_length=MAX_LENGTH,
        truncation=True,
    )
    # Called with batched=False below, so keep only the first (single) entry.
    first_entry = encoded['input_values'][0]
    encoded['input_values'] = first_entry
    return encoded

# Apply the feature extractor example by example and drop the raw 'audio'
# column from both splits.
dd['train'] = dd['train'].map(preprocess_function, remove_columns="audio", batched=False)
gc.collect()
dd['test'] = dd['test'].map(preprocess_function, remove_columns="audio", batched=False)
gc.collect()

In [None]:
import evaluate

# Accuracy metric object used inside compute_metrics.
accuracy = evaluate.load("accuracy")

from sklearn.metrics import roc_auc_score
def compute_metrics(eval_pred):
    """Compute macro one-vs-rest ROC AUC and accuracy for one evaluation pass.

    Args:
        eval_pred: object exposing ``predictions`` (per-class scores, one row
            per example) and ``label_ids`` (integer targets).

    Returns:
        A dict with the keys ``"roc_auc"`` and ``"accuracy"``.
    """
    logits = eval_pred.predictions
    # Numerically stable softmax: subtracting the row maximum leaves the result
    # unchanged but keeps np.exp from overflowing on large scores.
    shifted = logits - logits.max(axis=1, keepdims=True)
    exponentials = np.exp(shifted)
    predictions = exponentials / exponentials.sum(axis=1, keepdims=True)

    label_ids = eval_pred.label_ids
    roc_auc = roc_auc_score(label_ids, predictions, average='macro', multi_class='ovr')

    # Calculate accuracy using the loaded accuracy metric
    acc_score = accuracy.compute(predictions=predictions.argmax(axis=1), references=label_ids)['accuracy']

    return {
        "roc_auc": roc_auc,
        "accuracy": acc_score
    }

# Training and validation

In [None]:
from transformers import TrainingArguments, Trainer
# Hyper-parameters of the fine-tuning run.
# NOTE: `batch_size` rebinds the global of the same name used for the speech-commands loaders.
batch_size=8
warmup_steps=50
weight_decay=0.02
num_train_epochs=10
# Also used as output directory, and later as the model path given to pipeline().
model_name = "english_accents_classification"
training_args = TrainingArguments(
    output_dir=model_name,
    logging_dir='./logs',
    num_train_epochs=num_train_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    learning_rate=1e-5, # 3e-5
    logging_strategy='steps',
    logging_first_step=True,
    # NOTE(review): no metric_for_best_model is given, so "best" is presumably
    # judged by the library default rather than by roc_auc/accuracy -- confirm.
    load_best_model_at_end=True,
    logging_steps=1,
    evaluation_strategy='epoch',
    warmup_steps=warmup_steps,
    weight_decay=weight_decay,
    # NOTE(review): presumably has no effect while evaluation_strategy is 'epoch' -- confirm.
    eval_steps=1,
    gradient_accumulation_steps=1,
    gradient_checkpointing=True,
    save_strategy='epoch',
    save_total_limit=1, # save fewer checkpoints to limit used space
    report_to="mlflow",  # log to mlflow
)

# The feature extractor is passed in the `tokenizer` slot of the Trainer.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dd["train"],
    eval_dataset=dd["test"],
    tokenizer=feature_extractor,
    compute_metrics=compute_metrics,
)

In [None]:
# Baseline metrics on the held-out split before any fine-tuning.
trainer.evaluate()

In [None]:
# Fine-tune the model with the arguments configured above.
trainer.train()

In [None]:
# Metrics on the held-out split after fine-tuning.
trainer.evaluate()

In [None]:
# Save the model; no path is given, so presumably it goes to the configured output_dir -- confirm.
trainer.save_model()

In [None]:
from transformers import pipeline

# Build an audio-classification pipeline from the `model_name` directory.
# NOTE(review): device=0 presumably selects the first GPU, so this cell needs one -- confirm.
pipe=pipeline('audio-classification',model=model_name,device=0)

In [None]:
# us example
audio,rate=torchaudio.load('/kaggle/input/common-voice/cv-valid-test/cv-valid-test/sample-000003.mp3')
# resample from the file's native rate to RATE_HZ, the rate used for the training clips
transform=torchaudio.transforms.Resample(rate,RATE_HZ)
# flatten to a 1-D numpy array
audio=transform(audio).numpy().reshape(-1)
# run the classification pipeline on the waveform
pipe(audio)

In [None]:
from IPython.display import Audio
# Play back the resampled clip that was just classified.
Audio(audio,rate=RATE_HZ)

In [None]:
# england example
audio,rate=torchaudio.load('/kaggle/input/common-voice/cv-valid-test/cv-valid-test/sample-000008.mp3')
# resample from the file's native rate to RATE_HZ, the rate used for the training clips
transform=torchaudio.transforms.Resample(rate,RATE_HZ)
# flatten to a 1-D numpy array
audio=transform(audio).numpy().reshape(-1)
# run the classification pipeline on the waveform
pipe(audio)

In [None]:
from IPython.display import Audio
# Play back the resampled clip that was just classified.
Audio(audio,rate=RATE_HZ)

In [None]:
# indian example
audio,rate=torchaudio.load('/kaggle/input/common-voice/cv-valid-test/cv-valid-test/sample-000033.mp3')
# resample from the file's native rate to RATE_HZ, the rate used for the training clips
transform=torchaudio.transforms.Resample(rate,RATE_HZ)
# flatten to a 1-D numpy array
audio=transform(audio).numpy().reshape(-1)
# run the classification pipeline on the waveform
pipe(audio)

In [None]:
from IPython.display import Audio
# Play back the resampled clip that was just classified.
Audio(audio,rate=RATE_HZ)

In [None]:
# australia example
audio,rate=torchaudio.load('/kaggle/input/common-voice/cv-valid-test/cv-valid-test/sample-000065.mp3')
# resample from the file's native rate to RATE_HZ, the rate used for the training clips
transform=torchaudio.transforms.Resample(rate,RATE_HZ)
# flatten to a 1-D numpy array
audio=transform(audio).numpy().reshape(-1)
# run the classification pipeline on the waveform
pipe(audio)

In [None]:
from IPython.display import Audio
# Play back the resampled clip that was just classified.
Audio(audio,rate=RATE_HZ)

In [None]:
# canada example
audio,rate=torchaudio.load('/kaggle/input/common-voice/cv-valid-test/cv-valid-test/sample-000037.mp3')
# resample from the file's native rate to RATE_HZ, the rate used for the training clips
transform=torchaudio.transforms.Resample(rate,RATE_HZ)
# flatten to a 1-D numpy array
audio=transform(audio).numpy().reshape(-1)
# run the classification pipeline on the waveform
pipe(audio)

In [None]:
from IPython.display import Audio
# Play back the resampled clip that was just classified.
Audio(audio,rate=RATE_HZ)