In [None]:
import torch
from datasets import load_dataset, Audio, DatasetDict
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from transformers import Seq2SeqTrainingArguments, Seq2SeqTrainer
from transformers.models.whisper.english_normalizer import BasicTextNormalizer

from dataclasses import dataclass
from typing import Any, Dict, List, Union

import evaluate
import numpy as np

from functools import partial

In [None]:
from huggingface_hub import notebook_login

notebook_login(new_session=False)

## Load data

In [None]:
%%time
minds = DatasetDict()
minds["train"] = load_dataset("PolyAI/minds14", "en-US", split="train[:450]")
minds["test"] = load_dataset("PolyAI/minds14", "en-US", split="train[450:]")
minds = minds.select_columns(["audio", "transcription"])
minds

## Preprocessing

In [None]:
# Load Whisper processor
processor = WhisperProcessor.from_pretrained("openai/whisper-tiny", language="english", task="transcribe")

In [None]:
# Preprocess data
sampling_rate = processor.feature_extractor.sampling_rate
minds = minds.cast_column("audio", Audio(sampling_rate=sampling_rate))

def prepare_dataset(example):
    audio = example["audio"]

    example = processor(
        audio=audio["array"],
        sampling_rate=audio["sampling_rate"],
        text=example["transcription"],
    )

    # compute input length of audio sample in seconds
    example["input_length"] = len(audio["array"]) / audio["sampling_rate"]

    return example

minds = minds.map(prepare_dataset, remove_columns=minds.column_names["train"], num_proc=1)

In [None]:
# Max length filtering
max_input_length = 30.0
def is_audio_in_length_range(length):
    return length < max_input_length

minds["train"] = minds["train"].filter(is_audio_in_length_range, input_columns=["input_length"])
minds["train"]

## Fine-tuning

In [None]:
# Define data collator
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: list[dict[str, Union[list[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"][0]} for feature in features] # (N, 80, 3000)
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch
    
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
# Evaluation metric
metric = evaluate.load("wer")
normalizer = BasicTextNormalizer()

def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

    # compute orthographic wer
    wer_ortho = metric.compute(predictions=pred_str, references=label_str)

    # compute normalised WER
    pred_str_norm = [normalizer(pred) for pred in pred_str]
    label_str_norm = [normalizer(label) for label in label_str]
    # filtering step to only evaluate the samples that correspond to non-zero references:
    pred_str_norm = [
        pred_str_norm[i] for i in range(len(pred_str_norm)) if len(label_str_norm[i]) > 0
    ]
    label_str_norm = [
        label_str_norm[i]
        for i in range(len(label_str_norm))
        if len(label_str_norm[i]) > 0
    ]

    wer = metric.compute(predictions=pred_str_norm, references=label_str_norm)

    return {"wer_ortho": wer_ortho, "wer": wer}

In [None]:
# Load model
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-tiny")

In [None]:
# disable cache during training since it's incompatible with gradient checkpointing
model.config.use_cache = False

# set language and task for generation and re-enable cache
model.generate = partial(model.generate, language="english", task="transcribe", use_cache=True)

In [None]:
# Training configs
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-tiny-dv",
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,
    learning_rate=1e-5,
    lr_scheduler_type="constant_with_warmup",
    warmup_steps=50,
    max_steps=500,
    gradient_checkpointing=True,
    fp16=True,
    fp16_full_eval=True,
    evaluation_strategy="steps",
    per_device_eval_batch_size=16,
    predict_with_generate=True,
    generation_max_length=225,
    save_steps=100,
    eval_steps=100,
    logging_steps=25,
#     report_to=["tensorboard"],
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
    push_to_hub=False,
)


In [None]:
# Train model
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=minds["train"],
    eval_dataset=minds["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor,
)

trainer.train()

In [None]:
# Push to hub
kwargs = {
     "dataset_tags": "PolyAI/minds14",
    "finetuned_from": "openai/whisper-tiny",
    "tasks": "automatic-speech-recognition",
}
trainer.push_to_hub(**kwargs)