In [None]:
!pip install jiwer openai-whisper torchcodec datasets
from transformers import WhisperForConditionalGeneration, WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor
from transformers import AutoFeatureExtractor, WhisperModel
from transformers import LogitsProcessorList, EpsilonLogitsWarper

from transformers import LlamaTokenizer
from datasets import load_dataset
import torch, torchaudio
from torch import nn
from torch.nn.utils.rnn import pad_sequence
import numpy as np
from jiwer import wer as calculate_wer
import pickle
from datasets import Dataset, Audio, Value
import os, random
from typing import Optional
from whisper.normalizers import EnglishTextNormalizer
import math
from sentencepiece import SentencePieceProcessor, SentencePieceTrainer
from pathlib import Path
import whisper
import copy, heapq
import pandas as pd
from collections import Counter
import torch.nn.functional as F

In [3]:
from google.colab import drive
from google.colab import userdata

# `userdata.get` only *returns* the Colab secret; on its own the value is
# discarded. Export it through the HF_TOKEN environment variable so the
# Hugging Face libraries used below can authenticate. The token is never
# printed and the temporary name is removed so it does not linger in the
# notebook namespace.
_hf_token = userdata.get('HF_TOKEN')
if _hf_token:
    os.environ['HF_TOKEN'] = _hf_token
del _hf_token

# Mount Google Drive; every data path in this notebook lives under /content/drive.
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
# One checkpoint name and one set of options shared by every pre-processing
# component, so the three loaders cannot drift apart.
_whisper_checkpoint = 'openai/whisper-small'
_whisper_options = dict(language='en', task='transcribe')

feature_extractor = WhisperFeatureExtractor.from_pretrained(_whisper_checkpoint, **_whisper_options)
tokenizer = WhisperTokenizer.from_pretrained(_whisper_checkpoint, **_whisper_options)
processor = WhisperProcessor.from_pretrained(_whisper_checkpoint, **_whisper_options)

# Use the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')

# English text normaliser shipped with the `whisper` package.
normalizer = EnglishTextNormalizer()


preprocessor_config.json: 0.00B [00:00, ?B/s]

tokenizer_config.json: 0.00B [00:00, ?B/s]

vocab.json: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

merges.txt: 0.00B [00:00, ?B/s]

normalizer.json: 0.00B [00:00, ?B/s]

added_tokens.json: 0.00B [00:00, ?B/s]

special_tokens_map.json: 0.00B [00:00, ?B/s]

In [5]:
df_test_set = pd.read_csv('/content/drive/MyDrive/data/final_test.csv')
df_train_set = pd.read_csv('/content/drive/MyDrive/data/final_train.csv')

# Union of the accent labels present in either split.
# - Rows without an accent label (NaN) are ignored: they are not a usable
#   directory name and cannot be ordered together with strings.
# - The result is sorted because iterating a `set` of strings follows the
#   per-process hash seed, which would make "the first accent" change from
#   one kernel restart to the next.
accents = sorted(
    set(df_test_set['accents'].dropna().unique().tolist())
    | set(df_train_set['accents'].dropna().unique().tolist())
)
assert accents, "no accent labels found in final_test.csv / final_train.csv"

# Only the first accent (in sorted order) is processed by the cells below.
accents = [accents[0]]

In [6]:
def data_preparation(csv_path, file_list_path, feature_extractor, tokenizer=None, base_audio_dir="", max_label_length=None):
    """Build an in-memory list of training examples from audio files and transcripts.

    Args:
        csv_path: CSV file with at least a ``path`` and a ``sentence`` column.
        file_list_path: Text file with one audio path per line; blank lines
            are ignored. Paths are matched against the CSV ``path`` column.
        feature_extractor: Callable invoked as
            ``feature_extractor(waveform, sampling_rate=16000, return_tensors="pt")``
            whose result is indexed with ``"input_features"``.
        tokenizer: Optional callable used to turn the transcript into label
            ids. When ``None`` no ``"labels"`` entry is produced.
        base_audio_dir: Directory joined in front of every listed path.
        max_label_length: Forwarded to the tokenizer as ``max_length``.

    Returns:
        A list of dicts with ``"input_features"`` and ``"text"`` (lower-cased,
        stripped transcript) and, when a tokenizer is given, ``"labels"``.
        Listed files that have no transcript in the CSV are skipped with a
        warning.
    """
    df_text = pd.read_csv(csv_path)

    # Build the path -> transcript mapping once instead of scanning the whole
    # frame for every file. `drop_duplicates` keeps the first row per path,
    # which is the row the previous `.values[0]` lookup selected.
    transcripts = (
        df_text.drop_duplicates(subset="path")
        .set_index("path")["sentence"]
        .to_dict()
    )

    with open(file_list_path, "r") as f:
        file_paths = [line.strip() for line in f if line.strip()]

    dataset = []
    total = len(file_paths)
    for position, file_path in enumerate(file_paths, start=1):
        if file_path not in transcripts:
            print(f"[WARN] No transcript found for: {file_path}")
            continue
        text = str(transcripts[file_path]).lower().strip()

        full_audio_path = os.path.join(base_audio_dir, file_path)

        audio, sr = torchaudio.load(full_audio_path)
        if sr != 16000:
            audio = torchaudio.functional.resample(audio, sr, 16000)

        # Average over the channel axis so the extractor always receives a
        # 1-D waveform. For single-channel audio this equals `squeeze(0)`;
        # for multi-channel audio `squeeze(0)` would leave a 2-D array.
        waveform = audio.mean(dim=0).numpy()

        feats = feature_extractor(
            waveform,
            sampling_rate=16000,
            return_tensors="pt"
        )["input_features"]

        item = {"input_features": feats, "text": text}

        if tokenizer is not None:
            labels = tokenizer(
                text,
                return_tensors="pt",
                padding="longest",
                truncation=True,
                max_length=max_label_length
            )["input_ids"][0]
            item["labels"] = labels

        # Position in the file list, so the figure reaches 100 % even when
        # some listed files were skipped.
        print(f"Progress: {position / total * 100:.1f} %")
        dataset.append(item)

    return dataset

In [7]:
def utt_score_with_weight_noise(model,base_state_dict,input_features,pseudo_text,tokenizer,device,n_mc_samples: int = 5,noise_scale: float = 0.1,forced_decoder_ids=None):
    """Score how stable a pseudo-label is when the model weights are perturbed.

    For each of ``n_mc_samples`` rounds, Gaussian noise scaled by
    ``noise_scale`` times the tensor's own standard deviation is added to
    every floating-point tensor of ``base_state_dict``, the noisy weights are
    loaded into ``model``, and a transcript is generated for
    ``input_features``. The word error rate of each transcript against
    ``pseudo_text`` is averaged, and the number of distinct transcripts is
    counted.

    Args:
        model: Module exposing ``load_state_dict``, ``to``, ``eval`` and
            ``generate``.
        base_state_dict: Clean weights. It should be a detached snapshot (not
            the live ``model.state_dict()``), because it is used to restore
            the model before returning.
        input_features: Passed to ``model.generate`` as ``inputs``.
        pseudo_text: Reference transcript produced by the unperturbed model.
        tokenizer: Object exposing ``batch_decode``.
        device: Device the model is moved to.
        n_mc_samples: Number of perturbed decodes.
        noise_scale: Relative noise magnitude.
        forced_decoder_ids: Forwarded to ``model.generate`` when not ``None``.

    Returns:
        ``(utt_score, avg_wer, diversity)`` where ``utt_score`` is
        ``avg_wer * diversity`` (or ``avg_wer`` when no sample was drawn).
        Lower means a more stable pseudo-label.
    """
    avg_wer = 0.0
    generated_texts = []

    gen_kwargs = {
        "inputs": input_features,
        "max_new_tokens": 150,
    }
    if forced_decoder_ids is not None:
        gen_kwargs["forced_decoder_ids"] = forced_decoder_ids

    reference_is_empty = not pseudo_text.split()

    try:
        for _ in range(n_mc_samples):
            # Build the perturbed weights in a fresh dict. `v + noise` already
            # allocates a new tensor, so deep-copying the clean weights first
            # is unnecessary and the clean tensors are never modified.
            noisy_state_dict = {}
            for k, v in base_state_dict.items():
                # Leave integer buffers untouched. One-element tensors are
                # skipped as well: their standard deviation is NaN, which
                # would turn the weight itself into NaN.
                if not torch.is_floating_point(v) or v.numel() < 2:
                    noisy_state_dict[k] = v
                    continue

                std = torch.std(v)
                if std == 0:
                    noisy_state_dict[k] = v
                    continue

                noise = torch.randn_like(v)
                noisy_state_dict[k] = v + noise * std * noise_scale

            model.load_state_dict(noisy_state_dict, strict=False)
            model.to(device)
            model.eval()

            with torch.no_grad():
                generated_ids = model.generate(**gen_kwargs)

            generated_text = tokenizer.batch_decode(
                generated_ids, skip_special_tokens=True
            )[0].lower().strip()

            generated_texts.append(generated_text)

            if reference_is_empty:
                # WER is undefined for an empty reference (and some jiwer
                # versions raise on it): count an equally empty hypothesis as
                # a perfect match and anything else as a full error.
                sample_wer = 0.0 if not generated_text.split() else 1.0
            else:
                sample_wer = calculate_wer([pseudo_text], [generated_text])
            avg_wer += sample_wer / n_mc_samples
    finally:
        # Never hand the model back with perturbed weights, even if decoding
        # failed part-way through.
        model.load_state_dict(base_state_dict, strict=False)

    diversity = len(set(generated_texts))

    # diversity is 0 only when no sample was drawn; keep avg_wer in that case.
    if diversity > 0:
        utt_score = avg_wer * diversity
    else:
        utt_score = avg_wer

    return utt_score, avg_wer, diversity

In [8]:
def generate_pseudo_labels(df_unlabeled,model_frozen,feature_extractor,tokenizer,device,base_dirs,drop_ratio: float = 0.2,n_mc_samples: int = 5,noise_scale: float = 0.1,forced_decoder_ids=None):
    """Transcribe unlabeled audio and keep the most stable pseudo-labels.

    Every path in ``df_unlabeled["path"]`` is resolved against ``base_dirs``
    (falling back to the path itself), transcribed with the clean model, and
    scored with ``utt_score_with_weight_noise``. The utterances with the
    lowest ``utt_score`` are kept.

    Args:
        df_unlabeled: DataFrame with a ``path`` column.
        model_frozen: Model used for decoding. Its weights are snapshotted on
            entry and restored before returning.
        feature_extractor: Callable invoked as
            ``feature_extractor(waveform, sampling_rate=16000, return_tensors="pt")``.
        tokenizer: Object exposing ``batch_decode``.
        device: Device used for the model and the input features.
        base_dirs: Directories searched, in order, for each audio path.
        drop_ratio: Fraction of utterances (highest ``utt_score``) to discard.
            At least one utterance is always kept.
        n_mc_samples: Forwarded to the scorer.
        noise_scale: Forwarded to the scorer.
        forced_decoder_ids: Forwarded to ``generate`` and the scorer when not
            ``None``.

    Returns:
        DataFrame with the columns ``path``, ``sentence``, ``source``,
        ``utt_score``, ``avg_wer`` and ``diversity``. It has zero rows (but
        the same columns) when no audio file could be processed.
    """
    output_columns = ["path", "sentence", "source", "utt_score", "avg_wer", "diversity"]

    # Snapshot of the clean weights; the scorer perturbs the live model.
    base_state_dict = copy.deepcopy(model_frozen.state_dict())

    records = []

    try:
        for path in df_unlabeled["path"].tolist():
            # First base directory that contains the file, else the raw path.
            full_path = next(
                (
                    candidate
                    for candidate in (os.path.join(d, path) for d in base_dirs)
                    if os.path.exists(candidate)
                ),
                None,
            )
            if full_path is None:
                if os.path.exists(path):
                    full_path = path
                else:
                    print(f"[WARN] Audio file not found for path: {path}")
                    continue

            audio, sr = torchaudio.load(full_path)
            if sr != 16000:
                audio = torchaudio.functional.resample(audio, sr, 16000)
            # Average over the channel axis so the extractor always receives
            # a 1-D waveform. For single-channel audio this equals
            # `squeeze(0)`; for multi-channel audio `squeeze(0)` would leave
            # a 2-D array.
            audio_np = audio.mean(dim=0).numpy()

            inputs = feature_extractor(audio_np,sampling_rate=16000,return_tensors="pt")
            input_features = inputs["input_features"].to(device)

            # Reset to the clean weights: the previous iteration's scoring
            # may have left the model perturbed.
            model_frozen.load_state_dict(base_state_dict, strict=True)
            model_frozen.to(device)
            model_frozen.eval()

            gen_kwargs = {"inputs": input_features,"max_new_tokens": 150,}
            if forced_decoder_ids is not None:
                gen_kwargs["forced_decoder_ids"] = forced_decoder_ids

            with torch.no_grad():
                base_ids = model_frozen.generate(**gen_kwargs)

            pseudo_text = tokenizer.batch_decode(base_ids, skip_special_tokens=True)[0].lower().strip()

            utt_score, avg_wer, diversity = utt_score_with_weight_noise(
                model=model_frozen,
                base_state_dict=base_state_dict,
                input_features=input_features,
                pseudo_text=pseudo_text,
                tokenizer=tokenizer,
                device=device,
                n_mc_samples=n_mc_samples,
                noise_scale=noise_scale,
                forced_decoder_ids=forced_decoder_ids,
            )

            records.append({
                "path": path,
                "sentence": pseudo_text,
                "utt_score": utt_score,     # ~ avg_wer * diversity
                "avg_wer": avg_wer,
                "diversity": diversity,
            })
    finally:
        # Hand the model back with its original weights. Otherwise a later
        # call would snapshot perturbed weights as its "clean" baseline.
        model_frozen.load_state_dict(base_state_dict, strict=True)

    df_scores = pd.DataFrame(records)

    if len(df_scores) == 0:
        print("[INFO] No pseudo-labels generated.")
        # Keep the schema so a saved CSV still has a header and can be read.
        return pd.DataFrame(columns=output_columns)

    keep_fraction = 1.0 - drop_ratio
    # The epsilon guards against products such as 70.99999999999999 being
    # truncated one below the intended count.
    n_keep = max(1, int(len(df_scores) * keep_fraction + 1e-9))

    df_kept = df_scores.nsmallest(n_keep, "utt_score").reset_index(drop=True)

    # Report the percentage from drop_ratio itself: (1.0 - 0.8) * 100 is
    # 19.999..., which int() would print as 19%.
    print(
        f"[INFO] Kept {len(df_kept)} / {len(df_scores)} "
        f"utterances (dropped bottom {round(drop_ratio * 100)}%)"
    )

    df_kept["source"] = "pseudo"
    return df_kept[output_columns]

In [None]:
# Load the Whisper checkpoint and place it on the selected device
# (`.to()` returns the module itself).
model_frozen = WhisperForConditionalGeneration.from_pretrained('openai/whisper-small').to(device)

df = pd.read_csv('/content/drive/MyDrive/data/final_train.csv')

for accent in accents:
    # The accent-specific training split is used as the unlabeled pool.
    df_unlabeled = df_train = pd.read_csv(f'/content/drive/MyDrive/data/{accent}/train.csv')

    # Directories searched for the audio files referenced by the split.
    base_dirs = ['/content/drive/MyDrive/data/train_data']

    pseudo_labels_df = generate_pseudo_labels(
        df_unlabeled=df_unlabeled,
        model_frozen=model_frozen,
        feature_extractor=feature_extractor,
        tokenizer=tokenizer,
        device=device,
        base_dirs=base_dirs,
    )

    # Persist the kept pseudo-labels next to the accent's training split.
    pseudo_labels_df.to_csv(
        f'/content/drive/MyDrive/data/{accent}/utt_training.csv',
        index=False,
    )

In [None]:
# Only the decoder's maximum target length is needed here, so read it from the
# checkpoint's configuration instead of instantiating (and downloading) the
# full model weights a second time.
model_config_for_len = WhisperForConditionalGeneration.config_class.from_pretrained('openai/whisper-small')
max_label_length = model_config_for_len.max_target_positions
train_audio_list = '/content/drive/MyDrive/data/train_files.txt'
del model_config_for_len

for accent in accents:
    print(accent)
    df_pseudo_training = pd.read_csv(f'/content/drive/MyDrive/data/{accent}/utt_training.csv')
    # Pair every listed training file that has a kept pseudo-label with its
    # features and token ids; files without one are skipped by
    # `data_preparation` with a warning.
    train_dataset = data_preparation(
        f'/content/drive/MyDrive/data/{accent}/utt_training.csv',
        train_audio_list,
        feature_extractor,
        tokenizer,
        base_audio_dir='/content/drive/MyDrive/data/train_data',
        max_label_length=max_label_length,
    )
    torch.save(train_dataset, f'/content/drive/MyDrive/data/{accent}/utt_training.pt')