# Install Necessary Libraries [Check README > System Requirements]

### Run the command below only if you are not using a virtual environment and want to install the packages into your local (system) Python

In [None]:
# %pip install pandas torch torchaudio librosa transformers==4.41.2 ipywidgets datasets --quiet

In [None]:
# Show which Python interpreter this notebook kernel is running on
import sys
print(sys.executable)

In [None]:
# Print the installed transformers version (the optional install command above pins 4.41.2)
import transformers
print(transformers.__version__)

In [None]:
# Print the CUDA version reported by PyTorch and whether a CUDA device is usable
import torch
print(torch.version.cuda)
print(torch.cuda.is_available())

# Importing Libraries and Modules

In [None]:
import os
import torch
import torchaudio
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
from transformers import (
    Wav2Vec2Processor, 
    Wav2Vec2Model, 
    Trainer, 
    TrainingArguments
)

import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

# Data Preprocessing

In [None]:
# Use the current working directory as the project root
# (assumes the kernel was started from the folder that contains "Data" — TODO confirm)
project_folder = os.path.abspath(os.getcwd())
print("Project Folder : ", project_folder)

# Build the path to Data/validated.tsv under the project root
# (absolute, because project_folder is absolute)
tsv_file_path = os.path.join(project_folder, "Data", "validated.tsv")

print("TSV Data Path : ", tsv_file_path)

# Read the tab-separated metadata file into a DataFrame
validated_df = pd.read_csv(tsv_file_path, sep='\t')

# Process audio files
def load_audio(file_path):
    """Load an audio file with torchaudio.

    Args:
        file_path: Path of the audio file to read.

    Returns:
        The ``(waveform, sample_rate)`` pair produced by ``torchaudio.load``.
    """
    audio, rate = torchaudio.load(file_path)
    return audio, rate

# Try load_audio on one hard-coded clip under Data/clips
# (the resulting path is absolute because project_folder is absolute)
audio_file_path = os.path.join(project_folder, "Data", "clips", "common_voice_en_34925860.mp3")
waveform, sample_rate = load_audio(audio_file_path)


# Balance the Dataset

In [None]:
from sklearn.utils import resample

selected_accents = ["Canadian English", "England English"]
filtered_df = validated_df[validated_df['accents'].isin(selected_accents)]

# Number of clips to keep for every accent class
samples_per_accent = 300

sampled_frames = []
for accent in selected_accents:
    accent_data = filtered_df[filtered_df['accents'] == accent]
    if accent_data.empty:
        raise ValueError(f"No clips found for accent: {accent!r}")

    # Draw without replacement whenever the class has enough clips: sampling
    # with replacement (resample's default) duplicates rows, and a duplicated
    # clip can then end up in both the train and the test split.
    needs_oversampling = len(accent_data) < samples_per_accent
    sampled_data = resample(
        accent_data,
        replace=needs_oversampling,
        n_samples=samples_per_accent,
        random_state=42,
    )
    sampled_frames.append(sampled_data)

# Concatenate once instead of growing a DataFrame inside the loop
balanced_data = pd.concat(sampled_frames)

# Print the class distribution in the balanced dataset
print("Balanced Class Distribution:\n", balanced_data['accents'].value_counts())


# Data Exploration

In [None]:
# Show the per-class counts and play one clip for each accent

from IPython.display import Audio, display

class_column = 'accents'
class_distribution = balanced_data[class_column].value_counts()
print("Class Distribution:\n", class_distribution)

# Accents to listen to
selected_classes = ["Canadian English", "England English"]

for label in selected_classes:
    class_df = balanced_data[balanced_data[class_column] == label]

    # Guard clause: report and move on when the class has no rows
    if class_df.empty:
        print(f"\nNo samples found for class: {label}")
        continue

    # Use the first row of the class and resolve its clip under Data/clips
    sample = class_df.iloc[0]
    audio_path = os.path.join(project_folder, "Data", "clips", sample['path'])

    # Decode the clip and render an inline audio player for it
    waveform, sr = torchaudio.load(audio_path)

    print(f"\nPlaying audio for class: {label}")
    display(Audio(waveform.numpy(), rate=sr))


# Split the Data

In [None]:
# Drop rows whose accent label (class_column) is missing
balanced_data = balanced_data.dropna(subset=[class_column])

from sklearn.model_selection import train_test_split

# 80/20 split with a fixed seed.
# NOTE: train_df and test_df are assigned again by the later
# "Train - Test Split" cell, which replaces the result of this split.
train_df, test_df = train_test_split(
    balanced_data, 
    test_size=0.2, 
    stratify=balanced_data[class_column],  # Preserve class distribution
    random_state=42
)

In [None]:
# Take independent copies of the split frames; the held-out part is also
# made available under the name valid_df

train_df = train_df.copy()

valid_df = test_df.copy()

# Model Selection code

In [None]:
# Enable Developer Mode in the system settings before running this model
# Run this cell so that Hugging Face can use symlinks
# (installs huggingface_hub together with its hf_xet extra)

%pip install huggingface_hub[hf_xet] --quiet

In [None]:
# Load the checkpoint's feature extractor to read the sampling rate it is configured with
from transformers import AutoFeatureExtractor

extractor = AutoFeatureExtractor.from_pretrained("facebook/wav2vec2-large-960h")

# The value printed is the extractor's configured rate, not a property of the audio files
print("Dataset Sampling Rate (in Hz) : ", extractor.sampling_rate)  # Usually 16000

In [None]:
# Load Wav2Vec2 Processor

sampling_rate = 16000

processor = Wav2Vec2Processor.from_pretrained(
    "facebook/wav2vec2-large-960h", 
    feature_extractor_kwargs={"sampling_rate": sampling_rate}
    )

# Encode Accent Labels: each accent name gets its position in the list as its id

selected_accents = ['Canadian English', 'England English']
label2id = {label: idx for idx, label in enumerate(selected_accents)}
id2label = {v: k for k, v in label2id.items()}

# --------------------------------------------------------------------------------------------------------------------------------

# Sanity checks: the number of target classes should match the number of
# distinct accents present in the training frame
print("Number of Classes:", len(selected_accents))
print("Unique Accents:", train_df['accents'].nunique())
print("Selected Classes:", selected_classes)

validated_df.head()

In [None]:
# Add an integer "label" column to both frames by mapping each accent name
# through label2id

train_df["label"] = train_df["accents"].map(label2id)
valid_df["label"] = valid_df["accents"].map(label2id)

# Checking Data Shapes and Imbalance

In [None]:
# Preview the first rows of the balanced dataset
balanced_data.head()

In [None]:
# Count the missing values in each column of the balanced dataset

print(balanced_data.isna().sum())

In [None]:
# Drop the unnecessary 'segment' column.
# errors='ignore' keeps this cell re-runnable: without it a second run (or a
# metadata file that has no 'segment' column) raises KeyError.

balanced_data = balanced_data.drop(columns=['segment'], errors='ignore')

In [None]:
# Drop rows with a missing gender or age, then renumber the index from 0.
# NOTE(review): this runs after the 300-per-accent sampling, so the two
# classes may no longer be equal in size afterwards — confirm this is intended.

balanced_data = balanced_data.dropna(subset=['gender', 'age']).reset_index(drop=True)

In [None]:
# Select and print every row that still has a NaN in at least one column

missing_rows = balanced_data[balanced_data.isna().any(axis=1)]
print(missing_rows)

In [None]:
# (rows, columns) of the full validated metadata
validated_df.shape

In [None]:
# (rows, columns) of the balanced subset
balanced_data.shape

In [None]:
# Check for missing values: number of columns that contain at least one NaN
balanced_data.isna().any().sum()

In [None]:
# Check if we are dealing with an imbalanced dataset: count the clips per
# accent, the column the class labels are derived from ('locale' is not the label)
balanced_data['accents'].value_counts()

In [None]:
#  Encode Labels + Load Processor
#  (selected_accents, label2id, id2label and processor are rebuilt here with
#  the same values as in the earlier "Load Wav2Vec2 Processor" cell)

from transformers import Wav2Vec2Processor

# Accent name -> integer id, and the reverse mapping
selected_accents = ['Canadian English', 'England English']
label2id = {label: idx for idx, label in enumerate(selected_accents)}
id2label = {v: k for k, v in label2id.items()}

# Integer class column; the following train/test split stratifies on it
balanced_data["label"] = balanced_data["accents"].map(label2id)

sampling_rate = 16000
processor = Wav2Vec2Processor.from_pretrained(
    "facebook/wav2vec2-large-960h", 
    feature_extractor_kwargs={"sampling_rate": sampling_rate}
)

# Train - Test Split

In [None]:
# Train - Test Split
# 80/20 split of the labeled, cleaned balanced_data, stratified on the integer
# label with a fixed seed. This reassigns train_df and test_df.

from sklearn.model_selection import train_test_split

train_df, test_df = train_test_split(
    balanced_data,
    test_size=0.2,
    stratify=balanced_data["label"],
    random_state=42
)

# Renumber both frames from 0 so positional and label-based indexing agree
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)


# Check Dataset Audio Length

In [None]:
import os
import torchaudio
import pandas as pd
from tqdm import tqdm  # For progress bar

# Step 1: Set project folder path
project_folder = os.path.abspath(os.getcwd())

# Step 2: Function to get full audio path
def get_full_audio_path(relative_path):
    """Return the path of a clip stored under ``Data/clips`` in the project.

    Args:
        relative_path: Clip file name (the ``path`` column of the metadata).

    Returns:
        ``project_folder/Data/clips/relative_path`` joined with ``os.path.join``.
    """
    return os.path.join(project_folder, "Data", "clips", relative_path)

# Step 3: Filter only selected accent classes
selected_classes = ["Canadian English", "England English"]
# Keep the filtered rows in their own DataFrame. Assigning them to
# `balanced_data` would overwrite the balanced, labeled dataset built earlier
# with every (unbalanced, unlabeled) row of the selected accents.
duration_df = validated_df[validated_df['accents'].isin(selected_classes)].reset_index(drop=True)


# Step 4: Function to get duration in seconds
def get_audio_duration(file_path):
    """Return the duration of an audio file in seconds, rounded to 2 decimals.

    Args:
        file_path: Path of the audio file to measure.

    Returns:
        Number of samples per channel divided by the sample rate, or ``None``
        when the file cannot be loaded (the error is printed, not raised, so
        one bad clip does not abort the scan of the whole dataset).
    """
    try:
        waveform, sample_rate = torchaudio.load(file_path)
        return round(waveform.shape[1] / sample_rate, 2)
    except Exception as e:
        print(f"Error loading {file_path}: {e}")
        return None

# Step 5: Apply to all rows (only the 'path' column is needed)
durations = [
    get_audio_duration(get_full_audio_path(clip_path))
    for clip_path in tqdm(duration_df['path'], total=len(duration_df))
]

# Step 6: Add durations to DataFrame
duration_df['duration_sec'] = durations

# Step 7: Basic stats and check
print("\n📊 Audio Duration Summary:")
print(duration_df['duration_sec'].describe())

print("\n🎧 Audio files shorter than 5 seconds:")
print(duration_df[duration_df['duration_sec'] < 5][['path', 'duration_sec']])

print("\n🎧 Audio files longer than 5 seconds:")
print(duration_df[duration_df['duration_sec'] > 5][['path', 'duration_sec']])


# Custom Dataset Class

In [None]:
# Custom Dataset Class

import torch
import torchaudio
from torch.utils.data import Dataset
import os

class AccentDataset(Dataset):
    """Map-style dataset that turns metadata rows into model inputs.

    Each item is read from disk on access, down-mixed to a single channel,
    resampled to ``target_sample_rate`` when the file uses another rate, and
    padded/truncated by the processor to a fixed number of samples.

    Args:
        dataframe: Rows to serve; ``__getitem__`` reads the ``path`` and
            ``label`` columns.
        processor: Callable applied to the 1-D waveform; its result must
            expose ``input_values``.
        label2id: Accent-name to class-id mapping. Stored on the instance;
            ``__getitem__`` does not consult it because rows already carry
            an integer ``label``.
        audio_base_path: Directory the ``path`` column is relative to.
        target_sample_rate: Sample rate (Hz) passed to the processor.
        max_duration_sec: Clip length (seconds) every item is padded or
            truncated to.
    """

    def __init__(self, dataframe, processor, label2id, audio_base_path,
                 target_sample_rate=16000, max_duration_sec=4):
        self.df = dataframe
        self.processor = processor
        self.label2id = label2id
        self.audio_base_path = audio_base_path
        self.target_sample_rate = target_sample_rate
        self.max_duration_sec = max_duration_sec

    def __len__(self):
        """Return the number of rows in the backing DataFrame."""
        return len(self.df)

    @staticmethod
    def _to_mono(waveform):
        """Collapse a ``(channels, samples)`` waveform to a 1-D signal.

        ``squeeze()`` only removes a channel axis of size 1, so a stereo clip
        would stay 2-D and be treated as two separate examples. Averaging
        over the channel axis handles any channel count and leaves a
        single-channel waveform's values unchanged.
        """
        if waveform.dim() == 1:
            return waveform
        return waveform.mean(dim=0)

    def __getitem__(self, idx):
        """Load, normalise and encode the clip at position ``idx``.

        Returns:
            Dict with ``input_values`` (processor output with the leading
            batch axis removed) and ``label`` (the row's class id as a plain
            Python int).
        """
        row = self.df.iloc[idx]
        audio_path = os.path.join(self.audio_base_path, row['path'])

        waveform, sr = torchaudio.load(audio_path)
        waveform = self._to_mono(waveform)
        if sr != self.target_sample_rate:
            resampler = torchaudio.transforms.Resample(sr, self.target_sample_rate)
            waveform = resampler(waveform)

        input_values = self.processor(
            waveform.numpy(),
            sampling_rate=self.target_sample_rate,
            return_tensors="pt",
            padding="max_length",
            truncation=True,
            # Fixed length in samples, e.g. 4 seconds at 16 kHz = 64000
            max_length=int(self.target_sample_rate * self.max_duration_sec),
        ).input_values.squeeze(0)

        return {
            "input_values": input_values,
            # Cast the value read from the row to a built-in int
            "label": int(row["label"]),
        }

# Define Classification Model

In [None]:
# Define Classification Model

import torch.nn as nn
from transformers import Wav2Vec2Model

# Define Classification Model
class Wav2Vec2Classifier(nn.Module):
    """Wav2Vec2 encoder followed by mean pooling and a linear classifier.

    Args:
        num_labels: Number of output classes.
        model_name: Checkpoint passed to ``Wav2Vec2Model.from_pretrained``.
        dropout: Dropout probability applied to the pooled representation
            before the linear layer.
    """

    def __init__(self, num_labels, model_name="facebook/wav2vec2-large-960h", dropout=0.2):
        super().__init__()
        self.wav2vec2 = Wav2Vec2Model.from_pretrained(model_name)
        self.dropout = nn.Dropout(dropout)
        # The head's input width follows the encoder's hidden size
        self.classifier = nn.Linear(self.wav2vec2.config.hidden_size, num_labels)

    def forward(self, input_values, labels=None):
        """Classify a batch of waveforms.

        Args:
            input_values: Batch passed straight to the encoder.
            labels: Optional class ids; when given, a cross-entropy loss is
                computed against the logits.

        Returns:
            Dict with ``loss`` (``None`` when ``labels`` is omitted) and
            ``logits``.
        """
        hidden_states = self.wav2vec2(input_values).last_hidden_state
        # Mean pooling over axis 1 of the encoder output
        pooled_output = hidden_states.mean(dim=1)
        logits = self.classifier(self.dropout(pooled_output))

        loss = None
        if labels is not None:
            loss = nn.functional.cross_entropy(logits, labels)

        return {"loss": loss, "logits": logits}

# Prepare Data Loaders

In [None]:
# Prepare Data Loaders

from datasets import Dataset as HFDataset

def convert_to_dict(dataset):
    """Materialise an indexable dataset into two parallel column lists.

    Args:
        dataset: Object supporting ``len()`` and integer indexing whose items
            are dicts with an ``input_values`` tensor and a ``label``.

    Returns:
        Dict with ``input_values`` (one NumPy array per item) and ``label``
        (one label per item), both in dataset order.
    """
    # Fetch every item exactly once, in index order
    items = [dataset[position] for position in range(len(dataset))]
    return {
        "input_values": [entry['input_values'].numpy() for entry in items],
        "label": [entry['label'] for entry in items],
    }

# Directory that the 'path' column of the DataFrames is relative to
audio_base_path = os.path.join(os.getcwd(), "Data", "clips")

# Datasets that load and encode each clip when it is indexed
train_dataset = AccentDataset(train_df, processor, label2id, audio_base_path)
test_dataset = AccentDataset(test_df, processor, label2id, audio_base_path)

# Eagerly encode every clip of both splits into in-memory lists
train_dict = convert_to_dict(train_dataset)
test_dict = convert_to_dict(test_dataset)

# Wrap the lists as Hugging Face datasets.
# NOTE(review): hf_train / hf_test are not referenced again in the visible
# part of this notebook — confirm whether this eager pass is still needed.
hf_train = HFDataset.from_dict(train_dict)
hf_test = HFDataset.from_dict(test_dict)

# Run this command in the VS Code terminal (CMD) to install a CUDA-enabled PyTorch build for GPU use:

### pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121

In [None]:
import torch

# Report GPU availability, the CUDA version reported by PyTorch, and the
# name of device 0 (or "No GPU" when CUDA is unavailable)
print("CUDA Available:", torch.cuda.is_available())
print("CUDA Version : ", torch.version.cuda)
print("Device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU")

# Initialize Trainer

In [None]:
# Define train_dataset & eval_dataset

import torch
from torch.utils.data import random_split

# 80% train, 20% eval split
train_size = int(0.8 * len(train_dataset))
eval_size = len(train_dataset) - train_size

# Seed the split so the same clips land in each subset on every run,
# consistent with the random_state=42 used for the DataFrame-level splits.
# NOTE: train_dataset is rebound to the 80% subset, so re-running this cell
# without re-creating train_dataset splits the already-reduced subset again.
split_generator = torch.Generator().manual_seed(42)
train_dataset, eval_dataset = random_split(
    train_dataset, [train_size, eval_size], generator=split_generator
)

# Define Training Arguments 

In [None]:
# Define Training Arguments 

from transformers import TrainingArguments

training_args = TrainingArguments(
    output_dir="./Model",
    # `evaluation_strategy` is the deprecated spelling of this argument in
    # the transformers release this notebook pins (4.41.2); `eval_strategy`
    # is its replacement.
    eval_strategy="epoch",
    # Must match the evaluation schedule for load_best_model_at_end to work
    save_strategy="epoch",
    learning_rate=2e-5,
    per_device_train_batch_size=4,
    per_device_eval_batch_size=4,
    num_train_epochs=3,
    weight_decay=0.01,
    logging_dir="./logs",
    logging_steps=10,
    save_total_limit=1,
    load_best_model_at_end=True,
    # "accuracy" is one of the keys returned by compute_metrics
    metric_for_best_model="accuracy",
    report_to="none",
)


# Define Compute Metrics

In [None]:
# Define Compute Metrics

from sklearn.metrics import accuracy_score, precision_recall_fscore_support

def compute_metrics(pred):
    """Compute accuracy and weighted precision/recall/F1 for a prediction batch.

    Args:
        pred: Object exposing ``predictions`` (scores whose last axis is
            arg-maxed into class ids) and ``label_ids`` (true class ids).

    Returns:
        Dict with the keys ``accuracy``, ``f1``, ``precision`` and ``recall``.
    """
    y_hat = pred.predictions.argmax(-1)
    y_ref = pred.label_ids

    # Index 3 of the result (support) is not needed
    weighted = precision_recall_fscore_support(y_ref, y_hat, average="weighted")
    precision, recall, f1 = weighted[:3]

    scores = {"accuracy": accuracy_score(y_ref, y_hat)}
    scores["f1"] = f1
    scores["precision"] = precision
    scores["recall"] = recall
    return scores

In [None]:
# Instantiate model with one output per accent class (derived from label2id
# instead of a hard-coded 2, so the head follows the label set)

model = Wav2Vec2Classifier(num_labels=len(label2id))

# Now call the Trainer
from transformers import Trainer

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    compute_metrics=compute_metrics,
    # NOTE(review): passing the feature extractor here presumably affects how
    # batches are collated and what is saved with checkpoints — confirm
    # against the Trainer documentation before removing it.
    tokenizer=processor.feature_extractor,
)

# Train & Evaluate the Model

In [None]:
# Train & Evaluate the Model
# Runs the training loop using the schedule configured in training_args

trainer.train()

# Save the Model

In [None]:
# Save the Model
# Both the trained model and the processor are written to the "Model" directory

trainer.save_model("Model")
processor.save_pretrained("Model")

In [None]:
# Evaluate on test_dataset (built from test_df) and print the returned metrics
test_results = trainer.evaluate(test_dataset)

print(test_results)

# Export Predictions

In [None]:
import os
import pandas as pd

# Create the outputs directory if it doesn't exist
os.makedirs("Output", exist_ok=True)

# Predict on eval_dataset (the 20% subset produced by random_split);
# argmax over axis 1 turns the per-class scores into class ids
preds = trainer.predict(eval_dataset)
predicted_labels = preds.predictions.argmax(axis=1)
true_labels = preds.label_ids

# One row per evaluated clip: true vs. predicted class id
df = pd.DataFrame({
    "True Label": true_labels,
    "Predicted Label": predicted_labels
})
df.to_csv("Output/eval_predictions.csv", index=False)

print(predicted_labels)

# Accuracy, Precision, Recall & F1 Score

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Run prediction on eval_dataset; argmax over axis 1 gives the class ids
preds_output = trainer.predict(eval_dataset)
y_pred = preds_output.predictions.argmax(axis=1)
y_true = preds_output.label_ids

# Calculate metrics (precision/recall/F1 use scikit-learn's default averaging,
# unlike compute_metrics, which passes average="weighted")
accuracy = accuracy_score(y_true, y_pred)
precision = precision_score(y_true, y_pred)
recall = recall_score(y_true, y_pred)
f1 = f1_score(y_true, y_pred)

# Print (all four values use the same ".4f" spec; a space inside the spec
# would prepend an extra blank to positive numbers and misalign the column)
print(f"Accuracy:  {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall:    {recall:.4f}")
print(f"F1 Score:  {f1:.4f}")

In [None]:
import os

# Create "Graphs" folder if it doesn't exist (the plotting cells save their figures there)
os.makedirs("Graphs", exist_ok=True)

In [None]:
import time
import pandas as pd
import matplotlib.pyplot as plt

# Define metrics: the scores computed in the previous metrics cell, as
# percentages rounded to 2 decimals
metrics = {
    "Accuracy": round(accuracy * 100, 2),
    "Precision": round(precision * 100, 2),
    "Recall": round(recall * 100, 2),
    "F1 Score": round(f1 * 100, 2),
}

# Create a DataFrame with one row per metric and a single "Score (%)" column
df = pd.DataFrame(metrics, index=["Score (%)"]).T
print(df)

# Written into the "Output" directory, which the "Export Predictions" cell creates
df.to_csv("Output/model_metrics.csv")       # For CSV

# Plot clean bar graph
plt.figure(figsize=(6, 4))
bars = plt.bar(df.index, df["Score (%)"], color='skyblue')

# Add value labels on top of bars
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2.0, height + 1, f'{height}%', ha='center', va='bottom', fontsize=10)

# Leave headroom above 100 so the value labels fit inside the axes
plt.ylim(0, 110)
plt.ylabel("Score (%)")
plt.title("Model Evaluation Metrics")
plt.grid(axis='y', linestyle='--', alpha=0.5)
plt.tight_layout()

# Save the plot under a timestamped name so earlier figures are not overwritten
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Final_Metrics_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

# Plotting GRAPHS

In [None]:
# Bar Chart

import time

import matplotlib.pyplot as plt
import seaborn as sns

# Set style
sns.set(style="whitegrid")

# Data: use the scores computed in the metrics cell (as percentages) rather
# than numbers copied from an earlier run, so the chart always matches the
# model that was just evaluated
metrics = {
    "Accuracy": round(accuracy * 100, 2),
    "Precision": round(precision * 100, 2),
    "Recall": round(recall * 100, 2),
    "F1 Score": round(f1 * 100, 2),
}

# Bar Plot
plt.figure(figsize=(8, 5))
sns.barplot(x=list(metrics.keys()), y=list(metrics.values()), palette="Blues_d")

plt.title("Model Performance Metrics", fontsize=16)
plt.ylabel("Percentage (%)", fontsize=12)
# Headroom above 100 for the value labels
plt.ylim(0, 110)

# Value label above each bar
for i, v in enumerate(metrics.values()):
    plt.text(i, v + 1, f"{v:.2f}%", ha='center', fontweight='bold')
plt.tight_layout()

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Bar_Graph_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# Pie Chart Graph
# Uses the `metrics` dict left by the previously executed plotting cell.
# NOTE(review): a pie chart shows each value as a share of the sum of all
# values, so the percentages printed on the slices are shares of that sum,
# not the metric scores themselves.

plt.figure(figsize=(6, 6))
plt.pie(metrics.values(), labels=metrics.keys(), autopct='%1.1f%%', colors=sns.color_palette("pastel"))
plt.title("Model Metric Distribution")

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Pie_Chart_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# Radar (Spider) Chart

import time

import matplotlib.pyplot as plt
import numpy as np

# Labels and values: take the scores computed in the metrics cell (as
# percentages) instead of numbers copied from an earlier run
labels = ['Accuracy', 'Precision', 'Recall', 'F1 Score']
values = [round(score * 100, 2) for score in (accuracy, precision, recall, f1)]
values += values[:1]  # Repeat first value to close the circle

# Angle for each axis (evenly spaced; first angle repeated to close the outline)
angles = np.linspace(0, 2 * np.pi, len(labels), endpoint=False).tolist()
angles += angles[:1]

# Radar plot
fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
ax.plot(angles, values, color='blue', linewidth=2)
ax.fill(angles, values, color='skyblue', alpha=0.4)

# One tick per metric (the duplicated closing angle is skipped)
ax.set_xticks(angles[:-1])
ax.set_xticklabels(labels)
ax.set_yticks([20, 40, 60, 80, 100])
ax.set_title("Model Performance Radar Chart", y=1.1)

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Spider_Chart_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# Horizontal Bar Plot

import time

import seaborn as sns
import matplotlib.pyplot as plt

# Use the scores computed in the metrics cell (as percentages) rather than
# numbers copied from an earlier run
metrics = {
    "Accuracy": round(accuracy * 100, 2),
    "Precision": round(precision * 100, 2),
    "Recall": round(recall * 100, 2),
    "F1 Score": round(f1 * 100, 2),
}

plt.figure(figsize=(7, 4))
sns.barplot(y=list(metrics.keys()), x=list(metrics.values()), palette='magma')
plt.xlabel("Percentage (%)")
plt.title("Model Evaluation Metrics")
# Value label just to the right of each bar
for i, v in enumerate(metrics.values()):
    plt.text(v + 0.5, i, f"{v:.2f}%", va='center')
# Headroom beyond 100 for the value labels
plt.xlim(0, 110)
plt.tight_layout()

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Horizontal_Bar_chart_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# Line Plot (Comparison Curve)
# Uses the `metrics` dict left by the previously executed plotting cell

import time

import matplotlib.pyplot as plt

plt.figure(figsize=(7, 4))
plt.plot(list(metrics.keys()), list(metrics.values()), marker='o', linestyle='-', color='green')
plt.title("Model Metrics Comparison")
plt.ylabel("Percentage (%)")
# Keep the 90-105 window when every score fits in it, but widen the lower
# bound when a score falls below 90 so no point is drawn outside the axes
y_lower = min(90, min(metrics.values()) - 5)
plt.ylim(y_lower, 105)
# Value label above each point
for i, v in enumerate(metrics.values()):
    plt.text(i, v + 0.5, f"{v:.2f}%", ha='center')
plt.grid(True)
plt.tight_layout()

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Line_chart_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, roc_curve, auc, precision_recall_curve

# 1. Confusion Matrix
# labels=[0, 1] pins the matrix to 2x2 in class-id order even when one class
# is missing from y_true and y_pred, so it always lines up with the two tick
# labels below (id 0 = Canadian, id 1 = England per label2id)
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
plt.figure(figsize=(6, 5))
sns.heatmap(cm, annot=True, fmt="d", cmap="Blues", xticklabels=["Canadian", "England"], yticklabels=["Canadian", "England"])
plt.xlabel("Predicted Label")
plt.ylabel("True Label")
plt.title("Confusion Matrix")

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Confusion_Matrix_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# 2. ROC Curve

import torch

# preds_output.predictions holds the classifier's raw logits, not
# probabilities. The class-1 logit alone ignores the class-0 logit, so it
# does not rank examples the way the model's class-1 probability does.
# A softmax over the class axis yields the actual probability of class 1.
logits = torch.from_numpy(preds_output.predictions)
y_probs = torch.softmax(logits, dim=-1)[:, 1].numpy()  # Probability for positive class (class 1)
fpr, tpr, _ = roc_curve(y_true, y_probs)
roc_auc = auc(fpr, tpr)

plt.figure()
plt.plot(fpr, tpr, color="darkorange", lw=2, label=f"ROC curve (AUC = {roc_auc:.2f})")
# Diagonal reference line (a classifier that ranks at random)
plt.plot([0, 1], [0, 1], color="navy", lw=2, linestyle="--")
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.title("Receiver Operating Characteristic")
plt.legend(loc="lower right")

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/ROC_curve_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
# 3. Precision-Recall Curve
# Reuses y_true from the metrics cell and y_probs (the class-1 scores)
# computed in the ROC cell, so that cell must be run first

precision_vals, recall_vals, _ = precision_recall_curve(y_true, y_probs)
plt.figure()
plt.plot(recall_vals, precision_vals, color="purple", lw=2)
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall Curve")
plt.grid(True)

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Precision_Recall_curve_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

In [None]:
import matplotlib.pyplot as plt

log_history = trainer.state.log_history
# Keep only the entries that carry a 'loss' value
train_logs = [entry for entry in log_history if 'loss' in entry]
loss_values = [entry['loss'] for entry in train_logs]
# Plot against the recorded global step so the "Steps" axis shows real
# optimisation steps; plotting the bare list would put the log-entry index
# (one entry per `logging_steps` steps) on that axis instead
step_values = [entry['step'] for entry in train_logs]
plt.plot(step_values, loss_values)
plt.title("Training Loss")
plt.xlabel("Steps")
plt.ylabel("Loss")
plt.grid()

# Save the plot
timestamp = time.strftime("%d%m%Y-%H%M%S")
plt.savefig(f"Graphs/Training_Loss_curve_{timestamp}.png", dpi=300, bbox_inches='tight')

plt.show()

# ----------------------------------------------------------------------------------------