In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, accuracy_score
import numpy as np
import os
from tqdm import tqdm
from sklearn.preprocessing import LabelEncoder
import matplotlib.pyplot as plt
import seaborn as sns

In [2]:
# Custom Dataset to load .npy files
class AccentDataset(Dataset):
    """Dataset of feature arrays stored as ``.npy`` files, one folder per accent.

    Expected layout is ``data_dir/<accent_name>/<file>.npy``. The name of each
    sub-directory is used as the class label of every ``.npy`` file directly
    inside it. Entries of ``data_dir`` that are not directories, and files
    that do not end in ``.npy``, are ignored.

    Args:
        data_dir: Root folder whose sub-directories are the accent classes.
        label_encoder: Already-fitted encoder exposing
            ``transform(list_of_folder_names)`` and returning one integer
            class id per name (for example a fitted
            ``sklearn.preprocessing.LabelEncoder``).

    Attributes:
        files: Paths of all discovered ``.npy`` files, in sorted order.
        labels: Encoded class id for each entry of ``files``.
    """

    def __init__(self, data_dir, label_encoder):
        self.data_dir = data_dir
        self.label_encoder = label_encoder
        self.files = []
        folder_names = []

        # os.listdir() returns entries in arbitrary, platform-dependent order.
        # Sorting both levels keeps sample indices stable between runs, which
        # a seeded random split needs in order to pick the same files again.
        for accent_folder in sorted(os.listdir(data_dir)):
            folder_path = os.path.join(data_dir, accent_folder)
            if not os.path.isdir(folder_path):
                continue
            for file_name in sorted(os.listdir(folder_path)):
                if file_name.endswith('.npy'):
                    self.files.append(os.path.join(folder_path, file_name))
                    folder_names.append(accent_folder)

        # Map folder names to integer class ids.
        self.labels = self.label_encoder.transform(folder_names)

    def __len__(self):
        """Return the number of ``.npy`` files found."""
        return len(self.files)

    def __getitem__(self, idx):
        """Load sample ``idx`` from disk.

        Returns:
            ``(features, label)`` where ``features`` is the stored array as a
            float32 tensor and ``label`` is a 0-d int64 tensor.
        """
        features = np.load(self.files[idx])
        # as_tensor converts straight to float32 without the extra
        # intermediate copy made by torch.tensor(...).float().
        features = torch.as_tensor(features, dtype=torch.float32)
        label = torch.tensor(int(self.labels[idx]), dtype=torch.long)
        return features, label

In [3]:
# "Conformer" model architecture (as implemented: a two-layer 1-D CNN followed by a linear classifier)
class ConformerModel(nn.Module):
    """Two-layer 1-D convolutional encoder followed by a linear classifier.

    Despite the name, the network contains no attention or feed-forward
    modules: it is Conv1d -> ReLU -> MaxPool1d applied twice, then a single
    fully-connected layer over the flattened feature map.

    Args:
        input_dim: Number of features per time step (Conv1d input channels).
        num_classes: Number of output logits.
        seq_len: Number of time steps in every input sequence. It fixes the
            width of the classifier layer. Defaults to 230, the value that
            was previously hard-coded.

    Raises:
        ValueError: If ``seq_len`` is smaller than 4, in which case nothing
            would be left after the two pooling layers.
    """

    def __init__(self, input_dim, num_classes, seq_len=230):
        super().__init__()
        if seq_len < 4:
            raise ValueError(
                f"seq_len must be at least 4 to survive two 2x poolings, got {seq_len}"
            )
        self.seq_len = seq_len
        self.conformer = nn.Sequential(
            nn.Conv1d(input_dim, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)
        )
        # The convolutions keep the length (kernel 3, padding 1); each pooling
        # floor-halves it, so seq_len // 4 time steps remain per channel.
        self.fc = nn.Linear(128 * (seq_len // 4), num_classes)

    def forward(self, x):
        """Compute class logits.

        Args:
            x: Tensor of shape ``[batch, sequence_len, input_dim]``.

        Returns:
            Tensor of shape ``[batch, num_classes]``.
        """
        x = x.transpose(1, 2)  # Conv1d expects [batch, input_dim, sequence_len]
        x = self.conformer(x)
        x = x.flatten(start_dim=1)  # [batch, 128 * (sequence_len // 4)]
        return self.fc(x)

In [4]:
def compute_class_weights(labels):
    """Return scikit-learn 'balanced' class weights as a float32 tensor.

    Args:
        labels: 1-D array-like of integer class ids, one per sample.

    Returns:
        1-D ``torch.float`` tensor with one weight per distinct value found
        in ``labels``, ordered by ascending class id. A class that never
        occurs in ``labels`` gets no entry.
    """
    present_classes = np.unique(labels)
    balanced = compute_class_weight('balanced', classes=present_classes, y=labels)
    return torch.tensor(balanced, dtype=torch.float)


In [5]:
# Load datasets and split into train/test
def load_data_and_split(data_dir, label_encoder, batch_size=16, test_split=0.2, seed=None):
    """Build the accent dataset and wrap a random train/test split in loaders.

    Args:
        data_dir: Root folder handed to ``AccentDataset``.
        label_encoder: Fitted label encoder handed to ``AccentDataset``.
        batch_size: Batch size used by both loaders.
        test_split: Fraction of the samples held out for testing; must lie
            in ``[0, 1]``.
        seed: Optional integer. When given, the split is drawn from a
            dedicated ``torch.Generator`` seeded with it, so the same samples
            land in each subset on every run. When ``None`` (the default) the
            global torch RNG is used, as before.

    Returns:
        ``(train_loader, test_loader, labels)``. ``labels`` holds the encoded
        labels of the WHOLE dataset (train and test together), not only of
        the training subset.

    Raises:
        ValueError: If ``test_split`` is outside ``[0, 1]``.
    """
    if not 0 <= test_split <= 1:
        raise ValueError(f"test_split must be within [0, 1], got {test_split}")

    dataset = AccentDataset(data_dir, label_encoder)
    train_size = int((1 - test_split) * len(dataset))
    test_size = len(dataset) - train_size  # remainder, so the sizes always add up

    # Only pass a generator when a seed was requested; otherwise random_split
    # keeps using its own default generator.
    split_kwargs = {}
    if seed is not None:
        split_kwargs["generator"] = torch.Generator().manual_seed(seed)
    train_dataset, test_dataset = torch.utils.data.random_split(
        dataset, [train_size, test_size], **split_kwargs
    )

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    return train_loader, test_loader, dataset.labels

In [6]:
# Early stopping class
class EarlyStopping:
    """Signal when a higher-is-better metric has stopped improving.

    Call the instance once per epoch with the current score. A score counts
    as an improvement unless it is strictly below ``best_score + min_delta``.
    After ``patience`` consecutive calls without improvement, ``early_stop``
    is set to ``True``; it is never reset afterwards.

    Args:
        patience: Number of consecutive non-improving calls tolerated.
        min_delta: Margin the score must reach above the best score to count
            as an improvement.
    """

    def __init__(self, patience=5, min_delta=0):
        self.patience, self.min_delta = patience, min_delta
        self.best_score = None   # no score recorded yet
        self.counter = 0         # consecutive non-improving calls
        self.early_stop = False

    def __call__(self, accuracy):
        # The very first score only establishes the baseline.
        if self.best_score is None:
            self.best_score = accuracy
            return

        target = self.best_score + self.min_delta
        if not (accuracy < target):
            # Improvement: adopt the new best and start counting afresh.
            self.best_score = accuracy
            self.counter = 0
            return

        self.counter += 1
        if self.counter >= self.patience:
            self.early_stop = True

In [7]:
# Training loop
def train_model(model, train_loader, criterion, optimizer, device):
    """Run one training epoch and return the mean per-batch loss.

    Args:
        model: Network to optimise; switched to training mode here.
        train_loader: Iterable of ``(features, labels)`` batches that also
            supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer bound to the model's parameters.
        device: Device each batch is moved to before the forward pass.

    Returns:
        Sum of the per-batch loss values divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for inputs, targets in tqdm(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)

        # Clear gradients left over from the previous step.
        optimizer.zero_grad()
        batch_loss = criterion(model(inputs), targets)
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()

    num_batches = len(train_loader)
    return running_loss / num_batches


In [8]:
# Evaluation loop with confusion matrix
def evaluate_model(model, test_loader, criterion, device):
    """Evaluate the model on a data loader.

    Args:
        model: Network to evaluate; switched to eval mode here.
        test_loader: Iterable of ``(features, labels)`` batches that also
            supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device each batch is moved to before the forward pass.

    Returns:
        ``(accuracy, mean_loss, conf_matrix)`` where ``mean_loss`` is the sum
        of the per-batch losses divided by the number of batches and
        ``conf_matrix`` is a square matrix (rows are true classes, columns
        are predicted classes) with one row and column per model output, even
        for classes that do not occur in this loader.
    """
    model.eval()
    all_preds = []
    all_labels = []
    total_loss = 0
    num_classes = None
    with torch.no_grad():
        for features, labels in test_loader:
            features, labels = features.to(device), labels.to(device)

            outputs = model(features)
            total_loss += criterion(outputs, labels).item()

            # The width of the logits is the number of classes the model knows.
            num_classes = outputs.size(1)

            # Gradients are already disabled here, so no `.data` detour is needed.
            predicted = outputs.argmax(dim=1)
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)

    # Without an explicit label list, confusion_matrix only covers classes that
    # appear in y_true or y_pred, so its shape would change from split to split
    # and stop matching a fixed list of class names. Pin it to every class id.
    class_ids = None if num_classes is None else list(range(num_classes))
    conf_matrix = confusion_matrix(all_labels, all_preds, labels=class_ids)
    return accuracy, total_loss / len(test_loader), conf_matrix

In [9]:
# Plot confusion matrix
def plot_confusion_matrix(cm, labels):
    """Draw a confusion matrix as an annotated heatmap and show it.

    Args:
        cm: Square matrix of integer counts (cells are formatted with 'd').
        labels: Class names used as tick labels on both axes.
    """
    fig, ax = plt.subplots(figsize=(10, 7))
    sns.heatmap(
        cm,
        annot=True,
        fmt='d',
        cmap='Blues',
        xticklabels=labels,
        yticklabels=labels,
        ax=ax,
    )
    ax.set_ylabel('True Labels')
    ax.set_xlabel('Predicted Labels')
    ax.set_title('Confusion Matrix')
    plt.show()

In [None]:
if __name__ == "__main__":
    data_dir = 'main'  # Path to the main folder with accents

    # Seed the RNGs so the train/test split, the weight initialisation and the
    # batch shuffling start from the same state on every run.
    seed = 42
    np.random.seed(seed)
    torch.manual_seed(seed)

    # Resolve the device once and reuse it everywhere below.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # An accent class is a sub-folder that holds at least one .npy file.
    # Stray files and empty folders in data_dir must not become classes,
    # otherwise the class-weight vector and the model output size disagree.
    accents = []
    for entry in sorted(os.listdir(data_dir)):
        entry_path = os.path.join(data_dir, entry)
        if os.path.isdir(entry_path) and any(
            name.endswith('.npy') for name in os.listdir(entry_path)
        ):
            accents.append(entry)
    if not accents:
        raise ValueError(f"No accent folders containing .npy files found under '{data_dir}'")

    # Label encoding for accents
    label_encoder = LabelEncoder()
    label_encoder.fit(accents)
    class_names = list(label_encoder.classes_)
    num_classes = len(class_names)  # derived from the data instead of hard-coded

    # Load train/test data
    train_loader, test_loader, labels = load_data_and_split(data_dir, label_encoder)

    # Compute class weights
    class_weights = compute_class_weights(labels).to(device)

    # Define model, loss function, optimizer
    model = ConformerModel(input_dim=64, num_classes=num_classes)  # input_dim = 64 features per frame
    model.to(device)

    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Early stopping object
    early_stopping = EarlyStopping(patience=5, min_delta=0.01)

    # Training the model
    num_epochs = 20
    best_accuracy = 0
    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        train_loss = train_model(model, train_loader, criterion, optimizer, device)
        print(f"Training Loss: {train_loss:.4f}")

        accuracy, val_loss, conf_matrix = evaluate_model(model, test_loader, criterion, device)
        print(f"Validation Accuracy: {accuracy * 100:.2f}%, Validation Loss: {val_loss:.4f}")

        # Plot confusion matrix
        plot_confusion_matrix(conf_matrix, labels=class_names)

        # Save BEFORE the early-stopping check. An epoch can beat the best
        # accuracy by less than min_delta and still be the one that exhausts
        # the patience; breaking first would discard that better checkpoint.
        if accuracy > best_accuracy:
            torch.save(model.state_dict(), "best_conformer_model.pth")
            best_accuracy = accuracy

        # Check for early stopping
        early_stopping(accuracy)
        if early_stopping.early_stop:
            print("Early stopping triggered.")
            break

    print(f"Best Validation Accuracy: {best_accuracy * 100:.2f}%")