In [None]:
# Imports
import os
import re

import librosa
import numpy as np
import torch
from datasets import Dataset
from jiwer import wer, cer
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
from transformers import WhisperProcessor, WhisperForConditionalGeneration, Seq2SeqTrainingArguments, Seq2SeqTrainer

In [None]:
# Mount Google Drive (Colab only); Colab asks for log-in and access permission when this runs
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Use the GPU when one is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# whisper-small is the checkpoint size that suits our hardware
processor = WhisperProcessor.from_pretrained("openai/whisper-small")
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
model.to(device)

In [None]:
# Dataset root folders on the mounted Drive, keyed by dialect name
ROOT_PATHS = {
    "swedia_kristianstad": "/content/drive/MyDrive/ml-computers/Swedia_Kristianstad",
    "swedia_sormland": "/content/drive/MyDrive/ml-computers/Swedia_Sormland"
}
MAX_SEGMENTS_PER_DIALECT = 20 # cap on wav/TextGrid pairs collected per dialect; pick a value the hardware can handle (e.g. 20)
# Transcripts whose token count exceeds MAX_LABEL_LENGTH are cut down to their
# first TOKEN_SLICE_LENGTH tokens (see read_textgrid)
# NOTE(review): 440 presumably leaves headroom below Whisper's decoder length limit - confirm
MAX_LABEL_LENGTH = 440
TOKEN_SLICE_LENGTH = 440
# Integer class id stored as "dialect_label" for each dialect
LABEL_MAP = {"swedia_kristianstad": 1, "swedia_sormland": 0}


In [None]:
# Helper functions, e.g. for finding audio + TextGrid pairs and reading transcripts
def find_wav_textgrid_pairs(root_path, max_segments):
    """Collect up to ``max_segments`` (wav, TextGrid) path pairs below ``root_path``.

    Only directories whose path contains "spontaneous" are considered. A wav file
    is paired with the TextGrid of the same base name in the same directory;
    macOS resource-fork files (prefix "._") are ignored.

    Directories and files are visited in sorted order so that the selected
    subset is the same on every run and every filesystem.

    Args:
        root_path: Directory to search recursively.
        max_segments: Maximum number of pairs to return.

    Returns:
        A list of ``(wav_path, textgrid_path)`` tuples, at most ``max_segments`` long.
    """
    pairs = []
    for subdir, dirs, files in os.walk(root_path):
        # Stop walking the tree altogether once the cap is reached
        if len(pairs) >= max_segments:
            break
        # In-place sort makes os.walk descend in a deterministic order
        dirs.sort()
        if "spontaneous" not in subdir:
            continue

        visible = [f for f in files if not f.startswith("._")]
        textgrid_files = {f for f in visible if f.endswith(".TextGrid")}
        for wav_file in sorted(f for f in visible if f.endswith(".wav")):
            if len(pairs) >= max_segments:
                break
            # splitext removes only the final extension; str.replace would also
            # delete any ".wav" occurring earlier in the file name
            base = os.path.splitext(wav_file)[0]
            textgrid_file = base + ".TextGrid"
            if textgrid_file in textgrid_files:
                pairs.append((os.path.join(subdir, wav_file), os.path.join(subdir, textgrid_file)))
    return pairs

def read_textgrid(file_path, processor):
    """Read the transcript out of a TextGrid file and cap its token length.

    Every line of the form ``text = "..."`` contributes its non-empty, stripped
    content; the pieces are joined with single spaces. If the tokenized
    transcript is longer than ``MAX_LABEL_LENGTH`` tokens, only the first
    ``TOKEN_SLICE_LENGTH`` tokens are kept and decoded back to text.

    Encoding handling: a UTF-16 byte-order mark is honoured first, then UTF-8,
    Windows-1252 and finally ISO-8859-1 are tried. ISO-8859-1 accepts any byte
    sequence, so it has to come last to act as the catch-all.

    Args:
        file_path: Path to the TextGrid file.
        processor: Object exposing ``tokenizer(text).input_ids`` and
            ``tokenizer.decode(ids, skip_special_tokens=True)``.

    Returns:
        The (possibly truncated) transcript as a single string; empty when the
        file contains no non-empty ``text`` entries.
    """
    text_pattern = re.compile(r'text = "(.*)"')

    # Sniff the byte-order mark: a UTF-16 file read as ISO-8859-1 decodes
    # "successfully" into NUL-interleaved text in which nothing would match.
    with open(file_path, "rb") as raw:
        bom = raw.read(2)
    encodings = ["utf-8", "windows-1252", "iso-8859-1"]
    if bom in (b"\xff\xfe", b"\xfe\xff"):
        encodings.insert(0, "utf-16")

    intervals = []
    for enc in encodings:
        try:
            with open(file_path, 'r', encoding=enc) as f:
                lines = f.readlines()
        except UnicodeError:
            # Wrong guess: try the next candidate encoding
            continue

        for line in lines:
            match = text_pattern.search(line)
            if match:
                # A literal double quote is stored doubled inside TextGrid strings
                text = match.group(1).replace('""', '"').strip()
                if text:
                    intervals.append(text)
        break

    full_text = " ".join(intervals)
    tokenized = processor.tokenizer(full_text).input_ids
    if len(tokenized) <= MAX_LABEL_LENGTH:
        return full_text

    short_tokens = tokenized[:TOKEN_SLICE_LENGTH]
    return processor.tokenizer.decode(short_tokens, skip_special_tokens=True)

def extract_data(root_paths, max_segments, processor):
    """Build one record per wav/TextGrid pair for every dialect.

    Args:
        root_paths: Mapping of dialect name to the directory holding its recordings.
        max_segments: Maximum number of pairs collected per dialect.
        processor: Passed through to ``read_textgrid`` for tokenization.

    Returns:
        A list of dicts with the keys "path", "sentence", "dialect" and
        "dialect_label" (the latter looked up in ``LABEL_MAP``).
    """
    records = []
    for dialect_name, dialect_root in root_paths.items():
        matched = find_wav_textgrid_pairs(dialect_root, max_segments)
        print(f"Found {len(matched)} pairs for {dialect_name}")
        records.extend(
            {
                "path": wav_path,
                "sentence": read_textgrid(grid_path, processor),
                "dialect": dialect_name,
                "dialect_label": LABEL_MAP[dialect_name],
            }
            for wav_path, grid_path in matched
        )
    return records


In [None]:
# Preprocessing for the ASR-training
def preprocess_for_training(batch):
    """Turn one record into model inputs for ASR fine-tuning.

    Loads the audio at ``batch["path"]`` at 16 kHz, runs it through the global
    ``processor`` and tokenizes ``batch["sentence"]``.

    Args:
        batch: A single example with at least the keys "path" and "sentence".

    Returns:
        The same mapping with "input_features" (first item of the processor
        output) and "labels" (token ids of the sentence) added.
    """
    waveform, _ = librosa.load(batch["path"], sr=16000)
    encoded = processor(waveform, sampling_rate=16000)
    token_ids = processor.tokenizer(batch["sentence"]).input_ids
    batch["input_features"] = encoded.input_features[0]
    batch["labels"] = token_ids
    return batch

In [None]:
# Preprocessing for our classification task
def preprocess_for_classification(batch):
    """Turn one record into features for dialect classification and evaluation.

    Loads the audio at ``batch["path"]`` at 16 kHz, runs it through the global
    ``processor`` and tokenizes ``batch["sentence"]``. The existing
    "dialect_label" entry is left as it is (it needs no re-assignment).

    Args:
        batch: A single example with at least the keys "path", "sentence"
            and "dialect_label".

    Returns:
        The same mapping with "input_features" (first item of the processor
        output) and "labels" (token ids of the sentence) added.
    """
    audio, _ = librosa.load(batch["path"], sr=16000)
    inputs = processor(audio, sampling_rate=16000)
    batch["input_features"] = inputs.input_features[0]
    batch["labels"] = processor.tokenizer(batch["sentence"]).input_ids
    return batch


In [None]:
# Extract the records for all dialects and split them 80/20 into train/test.
# random_state is fixed so the split is the same on every run.
# NOTE(review): the split is not stratified on "dialect_label"; with this few samples the
# test set can end up unbalanced - consider passing stratify=... if that matters.
data = extract_data(ROOT_PATHS, MAX_SEGMENTS_PER_DIALECT, processor)
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42)

In [None]:
# Preprocess each split twice:
#  - *_for_training keeps every column and feeds the ASR fine-tuning
#  - train_dataset / test_dataset drop the raw columns and are used for dialect
#    classification and for the final evaluation
train_dataset_for_training = Dataset.from_list(train_data).map(preprocess_for_training)
test_dataset_for_training = Dataset.from_list(test_data).map(preprocess_for_training)

train_dataset = Dataset.from_list(train_data).map(preprocess_for_classification, remove_columns=["path", "sentence", "dialect"])
test_dataset = Dataset.from_list(test_data).map(preprocess_for_classification, remove_columns=["path", "sentence", "dialect"])


In [None]:
# Toggle fine-tuning on/off: set to False to skip training
TRAIN_MODEL = True

In [None]:
# Custom collator for padding labels with -100
def custom_data_collator(features):
    """Collate examples into a batch for Whisper fine-tuning.

    Input features are stacked into one tensor. Label sequences are padded to
    the longest one in the batch with -100 so the padded positions are ignored
    by the loss.

    If every label sequence starts with the decoder start token (the tokenizer
    puts it there), that first column is removed: the model prepends the
    decoder start token itself when it derives the decoder inputs from the
    labels, so keeping it would feed the token twice.

    Args:
        features: List of examples, each with "input_features" and "labels".

    Returns:
        Dict with "input_features" (stacked tensor) and "labels" (padded tensor).
    """
    input_features = torch.stack([torch.tensor(f["input_features"]) for f in features])
    labels = [torch.tensor(f["labels"]) for f in features]
    padded_labels = pad_sequence(labels, batch_first=True, padding_value=-100)

    # `model` is the notebook-level Whisper model that is being fine-tuned
    start_id = model.config.decoder_start_token_id
    if (
        start_id is not None
        and padded_labels.size(1) > 0
        and bool((padded_labels[:, 0] == start_id).all())
    ):
        padded_labels = padded_labels[:, 1:]

    return {"input_features": input_features, "labels": padded_labels}

In [None]:
# Fine-tune Whisper only when TRAIN_MODEL is True
if TRAIN_MODEL:
    training_args = Seq2SeqTrainingArguments(
        # Where checkpoints and logs are written
        output_dir="./whisper_finetuned_skanska",
        logging_dir="./logs",
        report_to="none",
        # Log every single step
        logging_strategy="steps",
        logging_steps=1,
        # Optimisation settings
        learning_rate=1e-5,
        num_train_epochs=3,
        per_device_train_batch_size=2,
        per_device_eval_batch_size=2,
        fp16=torch.cuda.is_available(),
        # Evaluate and checkpoint once per epoch
        eval_strategy="epoch",
        save_strategy="epoch",
        predict_with_generate=True,
        # The custom collator picks the columns it needs itself
        remove_unused_columns=False,
    )

    trainer = Seq2SeqTrainer(
        model=model,
        args=training_args,
        data_collator=custom_data_collator,
        train_dataset=train_dataset_for_training,
        eval_dataset=test_dataset_for_training,
        tokenizer=processor,
    )

    trainer.train()

In [None]:
    # Save the fine-tuned model and processor, but only when training actually ran;
    # otherwise the untouched base checkpoint would be written under the fine-tuned name
    if TRAIN_MODEL:
        model.save_pretrained("./whisper_finetuned_skanska")
        processor.save_pretrained("./whisper_finetuned_skanska")

In [None]:
# Dialect classification: flatten the input features of every example,
# reduce them to 2 principal components and fit a logistic regression on top
X_train = [np.array(x).flatten() for x in train_dataset["input_features"]]
X_test = [np.array(x).flatten() for x in test_dataset["input_features"]]
y_train = train_dataset["dialect_label"]
y_test = test_dataset["dialect_label"]

# random_state pins the randomized SVD solver that PCA may select for inputs with
# few samples and very many features, so the projection is reproducible across runs
pca = PCA(n_components=2, random_state=42)
X_train_pca = pca.fit_transform(X_train)
X_test_pca = pca.transform(X_test)

clf = LogisticRegression(max_iter=1000)
clf.fit(X_train_pca, y_train)

# Score on the held-out split
y_pred = clf.predict(X_test_pca)
acc = np.mean([p == r for p, r in zip(y_pred, y_test)])
f1 = f1_score(y_test, y_pred, average="macro")

print("\n Dialect Classification with PCA + LogisticRegression")
print(f" Accuracy: {acc:.2%}")
print(f" F1 Score: {f1:.2%}")

In [None]:
# Function for evaluation
def compute_metrics(dataset, model, processor, clf, pca):
    """Evaluate transcription and dialect classification on ``dataset`` and print the scores.

    For every example the model generates a transcription, which is compared
    (lower-cased) with the decoded reference labels, and the flattened input
    features are projected with ``pca`` and classified with ``clf``.

    Prints the first two reference/prediction pairs followed by WER, CER,
    dialect accuracy and macro-averaged dialect F1. Returns nothing.

    Args:
        dataset: Iterable of examples with "input_features", "labels" and "dialect_label".
        model: Model providing ``generate`` and ``device``.
        processor: Object whose ``tokenizer`` decodes token ids to text.
        clf: Fitted classifier with ``predict``.
        pca: Fitted transformer with ``transform``.
    """
    hypotheses, targets = [], []
    predicted_dialects, true_dialects = [], []

    for sample in dataset:
        # --- speech recognition ---
        model_input = torch.tensor(sample["input_features"]).unsqueeze(0).to(model.device)
        generated_ids = model.generate(model_input)
        hypothesis = processor.tokenizer.decode(generated_ids[0], skip_special_tokens=True)
        target = processor.tokenizer.decode(sample["labels"], skip_special_tokens=True)
        hypotheses.append(hypothesis.lower())
        targets.append(target.lower())

        # --- dialect classification on the same features ---
        flat_features = np.array(sample["input_features"]).flatten().reshape(1, -1)
        projected = pca.transform(flat_features)
        predicted_dialects.append(clf.predict(projected)[0])
        true_dialects.append(sample["dialect_label"])

    # Show up to two examples for a quick sanity check
    for target, hypothesis in list(zip(targets, hypotheses))[:2]:
        print(f"\n Reference   : {target}\n Prediction : {hypothesis}")

    wer_score = wer(targets, hypotheses)
    cer_score = cer(targets, hypotheses)
    hits = [guess == truth for guess, truth in zip(predicted_dialects, true_dialects)]
    acc = np.mean(hits)
    f1 = f1_score(true_dialects, predicted_dialects, average="macro")

    print(f"\n WER: {wer_score:.2%}")
    print(f" CER: {cer_score:.2%}")
    print(f" Dialect Accuracy: {acc:.2%}")
    print(f" Dialect F1 Score: {f1:.2%}")


In [None]:
# Run the evaluation on the held-out test split; the scores are printed, nothing is returned
compute_metrics(test_dataset, model, processor, clf, pca)
