<a href="https://colab.research.google.com/github/bearbearyu1223/llm-fine-tuning-playground/blob/main/fine_tune_whisper_common_voice_zh_tw.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Fine-Tune Whisper

## Introduction


## Prepare the Dev Enviroment

First of all, secure a decent compute instance with GPU for training. The script below can be used to veriy if GPU is assigned and also what is its specifications.

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
!pip install -q peft transformers datasets accelerate evaluate jiwer bitsandbytes wandb

In [None]:
import os
import wandb
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
os.environ["WANDB_SILENT"] = "true"
wandb.init(project="whisper-large-peft-lora-zh_TW-v0.0")

Since we will push the model checkpoints to  Hugging Face Hub, we need link this notebook to a Hugging Face account, which can be achieved by entering the Hugging Face aunthentication token via the scripts below.

In [None]:
from huggingface_hub import notebook_login

notebook_login()

In [None]:
!huggingface-cli login

## Prepare the dataset for fine-tuning

We use the low-resource language zh-TW from [mozilla-foundation/common_voice_16_0](https://huggingface.co/datasets/mozilla-foundation/common_voice_16_0/viewer/yue)  for the fine-tuning task.

In [None]:
from datasets import load_dataset, DatasetDict
common_voice = DatasetDict()
common_voice["train"] = load_dataset("mozilla-foundation/common_voice_16_0", "zh-TW", split="train", trust_remote_code=True)
common_voice["eval"] = load_dataset("mozilla-foundation/common_voice_16_0", "zh-TW", split="validation", trust_remote_code=True)
common_voice["test"] = load_dataset("mozilla-foundation/common_voice_16_0", "zh-TW", split="test", trust_remote_code=True)

print(common_voice)

In [None]:
# remove the columns that are not needed for training: we will only use the 'audio' and 'sentence' columns for training
common_voice = common_voice.remove_columns(
    ["accent", "age", "client_id", "down_votes", "gender", "locale", "path", "segment", "up_votes", "variant"]
)
print(common_voice)

In [None]:
model_name_or_path = "openai/whisper-large-v2"
task = "transcribe"
language = "Chinese"

In [None]:
from transformers import AutoFeatureExtractor, AutoTokenizer, AutoProcessor
# get the feature extractor from the model checkpoints
feature_extractor = AutoFeatureExtractor.from_pretrained(pretrained_model_name_or_path=model_name_or_path)
# get the tokentizer from the model checkpoints
tokenizer = AutoTokenizer.from_pretrained(pretrained_model_name_or_path=model_name_or_path, language=language, task=task)
# get the processor from the model checkpoints
processor = AutoProcessor.from_pretrained(pretrained_model_name_or_path=model_name_or_path, language=language, task=task)

In [None]:
from datasets import Audio

# signals to datasets to resample audio samples using 16000Hz on the fly the first time that they are loaded
common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))
common_voice["train"][0]

In [None]:
def prepare_dataset(batch):
  # load and resample audio data and resample in 16kHz
  audio = batch["audio"]

  # compute log-Mel input features from input audio array
  batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

  # encode target text to label ids
  batch["labels"] = tokenizer(batch["sentence"]).input_ids
  return batch

In [None]:
common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["train"], num_proc=2)

## Training and Evaluation

### Define a Data Collator

In [None]:
import torch
from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        # pad the input to max length
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        label_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = label_batch["input_ids"].masked_fill(label_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step, cut bos token as it is appended later
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch


In [None]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

### Load a Pre-Trained Checkpoint

In [None]:
from transformers import AutoModelForSpeechSeq2Seq
# load the pre-trained model in 8 bit quantization
model = AutoModelForSpeechSeq2Seq.from_pretrained(model_name_or_path, load_in_8bit=True, device_map="auto")

In [None]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [None]:
from peft import prepare_model_for_kbit_training

model = prepare_model_for_kbit_training(model)

In [None]:
print(model)

### Define the Training Configuration

In [None]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model

config = LoraConfig(r=32, lora_alpha=64, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")

In [None]:
model = get_peft_model(model, config)
model.print_trainable_parameters()

In [None]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="bearbearyu1223/whisper-large-peft-lora-zh_TW-v0.0",  # change to a repo name of your choice
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-3,
    warmup_steps=50,
    num_train_epochs=3,
    evaluation_strategy="epoch",
    save_strategy="epoch",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    logging_steps=25,
    report_to=["wandb"],
    metric_for_best_model="wer",
    load_best_model_at_end=True,
    greater_is_better=False,
    push_to_hub=True,
    remove_unused_columns=False
)

In [None]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [None]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=common_voice["train"],
    eval_dataset=common_voice["eval"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)

model.config.use_cache = False

In [None]:
trainer.train()

In [None]:
trainer.push_to_hub()

### Evaluate the WER on Testset

In [None]:
import evaluate

metric = evaluate.load("wer")

In [None]:
from torch.utils.data import DataLoader
from tqdm import tqdm
import numpy as np
import gc

eval_dataloader = DataLoader(common_voice["test"], batch_size=8, collate_fn=data_collator)

model.eval()
for step, batch in enumerate(tqdm(eval_dataloader)):
    with torch.cuda.amp.autocast():
        with torch.no_grad():
            generated_tokens = (
                model.generate(
                    input_features=batch["input_features"].to("cuda"),
                    decoder_input_ids=batch["labels"][:, :4].to("cuda"),
                    max_new_tokens=225,
                )
                .cpu()
                .numpy()
            )
            labels = batch["labels"].cpu().numpy()
            labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
            decoded_preds = tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
            decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)
            metric.add_batch(
                predictions=decoded_preds,
                references=decoded_labels,
            )
    del generated_tokens, labels, batch
    gc.collect()
wer = 100 * metric.compute()
print(f"WER: {wer:.2f}")