In [None]:
!pip install khmer-nltk

In [None]:
!pip install transformers==4.44.2

In [None]:
!pip install evaluate

In [None]:
!pip install jiwer

In [None]:
from datasets import load_dataset, Dataset, Audio
# Load the `rinabuoy/khm-asr-open` dataset; its "train" and "test" splits are used below.
raw_dataset = load_dataset("rinabuoy/khm-asr-open")

In [None]:
!pip install librosa

In [None]:
# Show the column schema of the training split.
print(raw_dataset["train"].features)

In [None]:
# Keep one handle per split; every preprocessing step below rebinds these names.
train_dataset, test_dataset = raw_dataset["train"], raw_dataset["test"]

In [None]:
from transformers import WhisperFeatureExtractor
# Feature extractor of the openai/whisper-small checkpoint; prepare_dataset uses it
# to turn audio arrays into the model's `input_features`.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-small")

In [None]:
from transformers import WhisperTokenizer
# Tokenizer of the same checkpoint, configured for Khmer transcription; it encodes
# the label text in prepare_dataset and decodes ids in compute_metrics.
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="khmer", task="transcribe")

In [None]:
from transformers import WhisperProcessor

# Processor for the same checkpoint; the data collator reaches its
# `.feature_extractor` and `.tokenizer` attributes for padding.
processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="khmer", task="transcribe")

In [None]:
from transformers import WhisperForConditionalGeneration

# Pretrained openai/whisper-small weights; this is the model fine-tuned below.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")

In [None]:
# Turn the cache off on the model config. Gradient checkpointing is enabled in the
# training arguments further down (presumably the reason — confirm).
model.config.use_cache = False

# Fix decoding to Khmer transcription and clear any forced decoder ids.
model.generation_config.forced_decoder_ids = None
model.generation_config.task = "transcribe"
model.generation_config.language = "khmer"

In [None]:
# Report the number of examples per split: test on the first line, train on the second.
print(len(test_dataset), len(train_dataset), sep="\n")

In [None]:
import re
from khmernltk import word_tokenize

def clean_text(dataset):
    """Normalise the transcript of one example.

    Despite the parameter name, ``dataset`` is a single example: a mapping
    with a ``"sentence"`` key, as supplied by ``Dataset.map``.

    Returns:
        ``{"sentence": None}`` when the transcript is missing, blank, or
        contains any ASCII letter; otherwise ``{"sentence": <text>}`` where
        the text has been re-segmented by ``word_tokenize`` with a single
        space between words.
    """
    text = dataset["sentence"]

    # Reject unusable transcripts before touching the regex or the tokenizer:
    # re.search() raises TypeError on None, and a blank string has nothing
    # to segment.
    if text is None or not text.strip():
        return {"sentence": None}

    # Drop transcripts that mix in Latin script.
    if re.search(r"[A-Za-z]", text):
        return {"sentence": None}

    return {"sentence": word_tokenize(text, return_tokens=False, separator=" ")}

In [None]:
# Run clean_text over every example of both splits (test first, then train).
test_dataset, train_dataset = (
    split.map(clean_text) for split in (test_dataset, train_dataset)
)

In [None]:
# Recast the audio column with decoding switched off on both splits.
train_dataset, test_dataset = (
    split.cast_column("audio", Audio(decode=False))
    for split in (train_dataset, test_dataset)
)

In [None]:
# Drop training examples whose transcript was set to None by clean_text.
train_dataset = train_dataset.filter(lambda example: example["sentence"] is not None)

In [None]:
# Drop test examples whose transcript was set to None by clean_text.
test_dataset = test_dataset.filter(lambda example: example["sentence"] is not None)

In [None]:
# Spot-check one cleaned transcript from each split.
# NOTE(review): index 500 assumes the filtered test split still holds more than
# 500 examples — confirm, otherwise this raises IndexError.
print(test_dataset[500]["sentence"])
print(train_dataset[1]['sentence'])

In [None]:
# Recast the audio column with a 16 kHz target sampling rate on both splits.
# A later cell recasts the same column again with decode=False.
train_dataset, test_dataset = (
    split.cast_column("audio", Audio(sampling_rate=16000))
    for split in (train_dataset, test_dataset)
)

In [None]:
# Display the current column schema of the training split.
train_dataset.features

In [None]:
# Final audio cast: decoding is disabled, so prepare_dataset reads the stored
# bytes itself and handles resampling to 16 kHz on its own.
train_dataset, test_dataset = (
    split.cast_column("audio", Audio(sampling_rate=16000, decode=False))
    for split in (train_dataset, test_dataset)
)

In [None]:
import io
import soundfile as sf
import numpy as np

def prepare_dataset(batch):
    """Convert one example into model inputs and label ids.

    ``batch`` is a single example whose ``"audio"`` column is undecoded (a
    dict carrying ``"bytes"`` and/or ``"path"``) and whose ``"sentence"`` is
    the transcript. The function adds ``"input_features"`` (first item of
    ``feature_extractor(...).input_features``) and ``"labels"`` (token ids
    from ``tokenizer``) and returns the same mapping.

    Raises:
        ValueError: if the example carries neither audio bytes nor a path.
    """
    audio = batch["audio"]

    # Prefer the embedded bytes; fall back to the file path when the dataset
    # stores only a reference to the audio file.
    audio_bytes = audio.get("bytes")
    if audio_bytes is not None:
        source = io.BytesIO(audio_bytes)
    elif audio.get("path"):
        source = audio["path"]
    else:
        raise ValueError("audio example has neither 'bytes' nor 'path'")

    audio_array, sampling_rate = sf.read(source, dtype="float32")

    # soundfile returns (frames, channels) for multi-channel files. Mix down
    # to mono first so resampling runs along the time axis and the feature
    # extractor receives a 1-D waveform.
    if audio_array.ndim > 1:
        audio_array = audio_array.mean(axis=1)

    # Resample if needed (soundfile loads at the file's original sample rate)
    if sampling_rate != 16000:
        import librosa
        audio_array = librosa.resample(audio_array, orig_sr=sampling_rate, target_sr=16000)
        sampling_rate = 16000

    # Compute the model input features for this single waveform
    batch["input_features"] = feature_extractor(
        audio_array,
        sampling_rate=sampling_rate
    ).input_features[0]

    # Encode the transcript as label token ids
    batch["labels"] = tokenizer(batch['sentence']).input_ids

    return batch

# Featurise both splits; the raw audio and text columns are dropped once converted.
train_dataset, test_dataset = (
    split.map(prepare_dataset, remove_columns=["audio", "sentence"])
    for split in (train_dataset, test_dataset)
)

In [None]:
import evaluate
# Word error rate metric; compute_metrics calls metric.compute() on decoded text.
metric = evaluate.load("wer")
# Unused alternative kept for reference: cer_metric = load_metric("cer")
def compute_metrics(pred):
    """Compute word error rate (in percent) for one evaluation pass.

    Args:
        pred: object exposing ``predictions`` (predicted token ids) and
            ``label_ids`` (reference token ids in which -100 marks ignored
            positions).

    Returns:
        ``{"wer": value}`` where ``value`` is 100 times the result of
        ``metric.compute``.
    """
    pred_ids = pred.predictions
    # Predictions can arrive as a tuple of outputs; the token ids come first.
    if isinstance(pred_ids, tuple):
        pred_ids = pred_ids[0]

    pad_id = tokenizer.pad_token_id

    # Work on copies so the caller's arrays keep their -100 markers.
    label_ids = pred.label_ids.copy()
    label_ids[label_ids == -100] = pad_id

    # Predictions may also contain -100 padding (e.g. after batches of
    # different lengths are concatenated); -100 is not a valid token id.
    pred_ids = pred_ids.copy()
    pred_ids[pred_ids == -100] = pad_id

    # decode predictions and labels
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # print the first 2 for inspection
    print("Example predictions vs references:")
    for i in range(min(2, len(pred_str))):
        print(f"Prediction: {pred_str[i]}")
        print(f"Reference : {label_str[i]}")
        print("---")

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Batch collator that pads audio features and label ids separately.

    Calling the instance on a list of examples (each with ``"input_features"``
    and ``"labels"``) returns the padded feature batch with a ``"labels"``
    tensor added, in which padded positions are set to -100.
    """

    # Object exposing `.feature_extractor.pad` and `.tokenizer.pad`.
    processor: Any
    # Token id that, when it starts every label row, is stripped from the labels.
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Audio and text have different lengths and padding rules, so they
        # are padded by their own component.
        audio_items = [{"input_features": item["input_features"]} for item in features]
        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")

        token_items = [{"input_ids": item["labels"]} for item in features]
        padded = self.processor.tokenizer.pad(token_items, return_tensors="pt")

        # Positions outside the attention mask become -100 so the loss skips them.
        is_padding = padded.attention_mask.ne(1)
        labels = padded["input_ids"].masked_fill(is_padding, -100)

        # When every row already begins with the decoder start token, drop
        # that column since the token is added again later.
        all_start_with_bos = (labels[:, 0] == self.decoder_start_token_id).all().cpu().item()
        batch["labels"] = labels[:, 1:] if all_start_with_bos else labels

        return batch

# Collator instance passed to the trainer. The decoder start id lets it strip a
# leading start token from the labels when every row begins with it.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

In [None]:
# Pick the GPU when one is visible, otherwise stay on the CPU, and move the model there.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
model.to(device)

In [None]:
from transformers import Seq2SeqTrainingArguments


training_args = Seq2SeqTrainingArguments(
    # Output location and logging.
    output_dir="./whisper-small-kh-v2",  # change to a repo name of your choice
    report_to=["tensorboard"],
    logging_steps=50,
    # Optimisation schedule.
    num_train_epochs=25,
    learning_rate=1e-4,
    warmup_steps=80,
    per_device_train_batch_size=16,
    gradient_accumulation_steps=2,  # increase by 2x for every 2x decrease in batch size
    # Memory and precision.
    gradient_checkpointing=True,
    bf16=True,  # NOTE(review): needs bf16-capable hardware — confirm on the target GPU
    # Evaluation by generation, every 500 steps.
    eval_strategy="steps",
    eval_steps=500,
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    # Checkpointing and best-model selection (lower WER wins).
    save_steps=500,
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)

In [None]:
from transformers import Seq2SeqTrainer

# Wire the model, data, collator and metric function into the seq2seq trainer.
# The feature extractor is passed in the `tokenizer` slot.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    tokenizer=processor.feature_extractor,
    compute_metrics=compute_metrics,
)

In [None]:
# Write the processor and tokenizer files into the training output directory
# before training starts.
# NOTE(review): the processor save presumably already includes the tokenizer
# files, which would make the second call redundant — confirm.
processor.save_pretrained(training_args.output_dir)
tokenizer.save_pretrained(training_args.output_dir)

In [None]:
# Run fine-tuning; per training_args, evaluation and checkpointing happen every 500 steps.
trainer.train()

In [None]:
import os

from huggingface_hub import login
from huggingface_hub import HfApi

# SECURITY: never commit an access token to a notebook. The token that used to
# be hardcoded here must be treated as leaked and revoked in the Hugging Face
# account settings. It is now read from the HF_TOKEN environment variable;
# when that is unset, login() falls back to its interactive prompt.
login(token=os.environ.get("HF_TOKEN"))

repo_name = "dynann/whisper-small-khmer-v2"
# Trainer.push_to_hub takes a commit message as its first positional argument,
# not a repository id. The destination repository is chosen through
# args.hub_model_id, so set it before pushing.
trainer.args.hub_model_id = repo_name
trainer.push_to_hub()


In [None]:

import soundfile as sf
from transformers import Wav2Vec2ForCTC, Wav2Vec2Processor

# Sanity-check the fine-tuned Whisper model on a single audio file.
# The Whisper `processor` and `model` created earlier in this notebook are
# reused as they are. Loading a Wav2Vec2 processor and taking an argmax over
# CTC logits does not apply here: Whisper is a sequence-to-sequence model and
# is decoded with generate().

model.eval()
def transcribe_audio(audio_path):
    """Transcribe a single audio file with the Whisper model.

    Args:
        audio_path: path of an audio file readable by soundfile.

    Returns:
        The decoded transcription string.
    """
    # Imported locally: librosa is not imported at notebook level.
    import librosa

    # Load audio as float32; mix multi-channel files down to mono so that
    # resampling and feature extraction see a 1-D waveform.
    audio, sr = sf.read(audio_path, dtype="float32")
    if audio.ndim > 1:
        audio = audio.mean(axis=1)
    if sr != 16000:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=16000)

    # Compute input features and move them to the model's device and dtype
    inputs = processor(audio, sampling_rate=16000, return_tensors="pt")
    input_features = inputs.input_features.to(device=model.device, dtype=model.dtype)

    # Autoregressive decoding; language and task come from model.generation_config
    with torch.no_grad():
        predicted_ids = model.generate(input_features)

    # Decode the ids to text
    transcription = processor.batch_decode(predicted_ids, skip_special_tokens=True)[0]
    print("predicted_ids => ",predicted_ids[0])
    return transcription

# Test on a single file
if __name__ == '__main__':
    # Test on a sample from your dataset

    test_audio_path = "/kaggle/input/asr-large-km/data/wavs/00000.wav"
    print("🎤 Testing audio transcription...")
    print(f"   Audio file: {test_audio_path}")
    transcription = transcribe_audio(test_audio_path)
    print(f"\n📝 Predicted transcription => ", transcription)

In [None]:
!pip freeze requirement.txt

In [None]:
import torch, gc
# Run a garbage-collection pass, then ask PyTorch to empty its CUDA cache.
gc.collect()
torch.cuda.empty_cache()
