# Notebook to load an audio dataset with transcriptions and fine-tune a Whisper model.

In [None]:
from google.colab import drive, runtime
drive.mount('/content/drive')

In [None]:
!pip install --upgrade pip
!pip install --upgrade datasets[audio] transformers accelerate evaluate jiwer

In [None]:
from huggingface_hub import notebook_login

notebook_login()

## Load feature extractor, tokenizer, processor and pretrained model

In [None]:
from transformers import WhisperFeatureExtractor, WhisperTokenizer, WhisperProcessor

# Checkpoint name shared by the feature extractor, tokenizer, processor and model below.
MODEL = "openai/whisper-small"

# The feature extractor is used later to turn raw audio arrays into `input_features`.
feature_extractor = WhisperFeatureExtractor.from_pretrained(MODEL)
# The tokenizer encodes transcripts into label ids and decodes predictions in compute_metrics.
tokenizer = WhisperTokenizer.from_pretrained(MODEL, language="English", task="transcribe")
# The processor is handed to the data collator, which uses its
# `.feature_extractor` and `.tokenizer` attributes for padding.
processor = WhisperProcessor.from_pretrained(MODEL, language="English", task="transcribe")

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the pretrained checkpoint named by MODEL.
model = WhisperForConditionalGeneration.from_pretrained(MODEL, device_map="auto")
# Pin the language and task used when generating during evaluation.
model.generation_config.language = 'english'
model.generation_config.task = "transcribe"
# https://stackoverflow.com/questions/69609401/suppress-huggingface-logging-warning-setting-pad-token-id-to-eos-token-id
model.generation_config.pad_token_id = tokenizer.pad_token_id

# Clear any forced decoder ids stored on the model config; language/task are
# set through generation_config above instead.
model.config.forced_decoder_ids = None

## Option 1: Load the data from Hugging Face (skip if the data is already saved to disk)

In [None]:
from datasets import load_dataset, DatasetDict

# Hub identifier of the dataset to fine-tune on.
DATASET_NAME = "divi212/india-supreme-court-audio"

# Collect the two splits under the 'train' / 'test' keys used by the rest of the notebook.
dataset = DatasetDict()

dataset['train'] = load_dataset(DATASET_NAME, split='train')
dataset['test'] = load_dataset(DATASET_NAME, split='test')

In [None]:
print(dataset['train'][0])

In [None]:
dataset.save_to_disk('/content/drive/MyDrive/india-supreme-court-asr/data/')

In [None]:
print(dataset['train'][0])

In [None]:
from datasets import Audio

dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))

In [None]:
print(dataset['train'][0])

In [None]:
# remove any None transcripts
dataset = dataset.filter(lambda x: x["transcript"] is not None)

In [None]:
print(dataset)

In [None]:
def prepare_dataset(batch):
  """Convert one raw example into the fields needed for Whisper fine-tuning.

  Args:
    batch: a single dataset example. It is read for an "audio" entry
      (indexed with "array" and "sampling_rate") and a "transcript" string.

  Returns:
    The same mapping with "input_length" (number of audio samples),
    "input_features", "labels" and "labels_length" added.
  """
  audio = batch["audio"]
  waveform = audio["array"]

  # Length in samples, to be compared against MAX_INPUT_LENGTH (seconds * 16000).
  # `len(audio)` would measure the audio container itself rather than the
  # waveform, which turns the duration filter into a no-op.
  batch["input_length"] = len(waveform)

  # log mel features; pass the example's own sampling rate so a dataset that
  # was not resampled is reported by the feature extractor instead of being
  # silently treated as 16 kHz.
  batch["input_features"] = feature_extractor(
      waveform, sampling_rate=audio["sampling_rate"]
  ).input_features[0]

  # encode transcript
  batch["labels"] = tokenizer(text=batch["transcript"]).input_ids
  # NOTE(review): this count excludes special tokens while "labels" includes
  # them, so the stored labels are slightly longer than labels_length —
  # confirm the label-length filter leaves enough headroom.
  batch["labels_length"] = len(tokenizer(batch["transcript"], add_special_tokens=False).input_ids)
  return batch

In [None]:
# Longest audio clip kept for training, in seconds.
MAX_DURATION_IN_SECONDS = 30.0
# The same limit expressed in samples, assuming 16 kHz audio (the rate the
# dataset is cast to earlier in the notebook).
MAX_INPUT_LENGTH = MAX_DURATION_IN_SECONDS * 16000

# this is the max tokens in the decoder block
MAX_LABELS_LENGTH = model.generation_config.max_length

def filter_inputs(input_length):
    """Return True when the audio is non-empty and shorter than MAX_INPUT_LENGTH.

    Used as a dataset filter predicate: examples for which this returns a
    falsy value (zero length, or at/over the 30s limit) are dropped.
    """
    is_non_empty = 0 < input_length
    return is_non_empty and input_length < MAX_INPUT_LENGTH

def filter_labels(labels_length):
    """Return True when the label sequence is strictly shorter than MAX_LABELS_LENGTH.

    Used as a dataset filter predicate: examples whose labels are too long
    are dropped.
    """
    return MAX_LABELS_LENGTH > labels_length

In [None]:
# Run prepare_dataset over every example and drop the original columns so only
# the fields it adds remain.
vectorized_dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names["train"])
# Keep only examples that pass the audio-length check, then discard the helper column.
vectorized_dataset = vectorized_dataset.filter(filter_inputs, input_columns=["input_length"]).remove_columns(["input_length"])
# Keep only examples that pass the label-length check, then discard the helper column.
vectorized_dataset = vectorized_dataset.filter(filter_labels, input_columns=["labels_length"]).remove_columns(["labels_length"])

In [None]:
vectorized_dataset['train']

In [None]:
vectorized_dataset['test']

In [None]:
vectorized_dataset.save_to_disk('/content/drive/MyDrive/india-supreme-court-asr/vectorized_data/')

In [None]:
dataset = vectorized_dataset
dataset['train']

In [None]:
dataset['test']

## Option 2: load in dataset from disk

In [None]:
from datasets import DatasetDict

# Reload the already-vectorized splits saved by Option 1. `load_from_disk` is
# called on the class directly; no empty DatasetDict needs to be built first.
dataset = DatasetDict.load_from_disk('/content/drive/MyDrive/india-supreme-court-asr/vectorized_data/')

In [None]:
dataset['train']

In [None]:
dataset['test']

## Create a data collator to pad the input data to consistent lengths

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate speech examples into a padded batch of inputs and labels.

    Attributes are documented inline below. Calling the instance with a list
    of examples (each holding "input_features" and "labels") returns the
    padded feature batch with a "labels" tensor added.
    """

    # Object exposing `.feature_extractor.pad` and `.tokenizer.pad`.
    processor: Any
    # Token id that, when it leads every label row, is stripped from the labels.
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # Audio features and token labels differ in length and padding rules,
        # so they are gathered and padded separately.
        audio_items = []
        token_items = []
        for example in features:
            audio_items.append({"input_features": example["input_features"]})
            token_items.append({"input_ids": example["labels"]})

        # Audio side: let the feature extractor build the tensor batch.
        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")

        # Label side: pad to the longest sequence in this batch.
        padded_tokens = self.processor.tokenizer.pad(token_items, return_tensors="pt")

        # Padded positions become -100 so the loss ignores them.
        is_padding = padded_tokens.attention_mask.ne(1)
        labels = padded_tokens["input_ids"].masked_fill(is_padding, -100)

        # If tokenization already put the start token at the front of every
        # row, remove it here because it is added again later.
        every_row_has_start = bool((labels[:, 0] == self.decoder_start_token_id).all())
        batch["labels"] = labels[:, 1:] if every_row_has_start else labels

        return batch

In [None]:
# Build the collator used by the trainer; the start-token id comes from the
# model config so the collator can strip it from labels when present.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

### Define evaluation metrics

In [None]:
!pip install torcheval
import evaluate
from torcheval.metrics.functional import word_information_lost

wer_metric = evaluate.load("wer")
cer_metric = evaluate.load("cer")

In [None]:
def compute_metrics(pred):
    """Compute word and character error rates for a set of predictions.

    Args:
        pred: prediction object exposing `predictions` (generated token ids)
            and `label_ids` (reference token ids, with -100 at ignored
            positions).

    Returns:
        Dict with "wer" and "cer" as returned by the loaded metrics.
    """
    import numpy as np

    predictions = pred.predictions
    if isinstance(predictions, tuple):
        # Some models return extra outputs; the token ids come first.
        predictions = predictions[0]

    pad_id = tokenizer.pad_token_id

    # -100 is not a valid token id and cannot be decoded, so swap it for the
    # pad token in both arrays. np.where builds new arrays, leaving the
    # caller's `pred.label_ids` / `pred.predictions` untouched.
    pred_ids = np.where(predictions != -100, predictions, pad_id)
    label_ids = np.where(pred.label_ids != -100, pred.label_ids, pad_id)

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = wer_metric.compute(predictions=pred_str, references=label_str)
    cer = cer_metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer, "cer": cer}

## Training Pipeline

In [None]:
from transformers import Seq2SeqTrainingArguments

# Hyperparameters for the fine-tuning run; checkpoints are written under
# output_dir on the mounted Drive.
training_args = Seq2SeqTrainingArguments(
    output_dir="/content/drive/MyDrive/india-supreme-court-asr",
    per_device_train_batch_size=16,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-5,
    warmup_steps=100,
    # Training length is fixed in optimizer steps rather than epochs.
    max_steps=1000,
    gradient_checkpointing=True,
    fp16=True,
    eval_strategy="steps",
    per_device_eval_batch_size=8,
    # Evaluate by generating text so compute_metrics receives token ids to decode.
    predict_with_generate=True,
    generation_max_length=255,
    # Save and evaluate on the same 100-step cadence.
    save_steps=100,
    eval_steps=100,
    logging_steps=25,
)

In [None]:
from transformers import Seq2SeqTrainer

# Use dropout to prevent overfitting
# NOTE(review): these values are set on the config of an already-instantiated
# model. Whisper layers appear to copy their dropout rates when they are
# constructed, so this likely does not change the live modules — confirm, and
# if dropout is really wanted pass these values to `from_pretrained` instead.
model.config.dropout = 0.3  # General dropout
model.config.attention_dropout = 0.3  # Dropout on attention layers
model.config.activation_dropout = 0.3  # Dropout on activation functions

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=dataset['train'],
    eval_dataset=dataset['test'],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    # `tokenizer=` is deprecated on Trainer in recent transformers releases;
    # `processing_class=` is its replacement and takes the same object.
    processing_class=processor.feature_extractor,
)
# Turn off the decoder key/value cache on the model config for training
# (presumably because gradient checkpointing is enabled above — confirm).
model.config.use_cache = False

In [None]:
baseline = trainer.evaluate()
baseline

In [None]:
trainer.train()

## Push the model to Hugging Face

In [None]:
from transformers import WhisperForConditionalGeneration
# Load a saved checkpoint from Drive (local files only) and upload it to the Hub.
# NOTE(review): the checkpoint step (400) is hard-coded — presumably the best
# step from the evaluation logs; confirm before pushing.
finetuned_model = WhisperForConditionalGeneration.from_pretrained("/content/drive/MyDrive/india-supreme-court-asr/checkpoint-400/", local_files_only=True, device_map="auto")
finetuned_model.push_to_hub("divi212/whisper-small-india-supreme-court-asr")

## Evaluate model on completely unseen data (from a different court hearing not in the dataset)



In [None]:
# Reference transcript for the held-out recording. The encoding is stated
# explicitly so the read does not depend on the runtime's locale default.
with open("/content/drive/MyDrive/india-supreme-court-asr/unseen_audio_transcript_actual.txt", "r", encoding="utf-8") as f:
    actual_transcript = f.read()

In [None]:
from transformers import pipeline
from datasets import Audio
from transformers import WhisperForConditionalGeneration

# Held-out recording that is not part of the training dataset.
test_file = "/content/drive/MyDrive/india-supreme-court-asr/unseen_audio.mp3"

# Baseline: the original pretrained checkpoint, run over the file in 30s chunks.
base_pipeline = pipeline(
    "automatic-speech-recognition",
    model=WhisperForConditionalGeneration.from_pretrained(MODEL, device_map="auto"),
    tokenizer=tokenizer,
    feature_extractor=feature_extractor,
    chunk_length_s=30
)
base_text = base_pipeline(test_file)["text"]
# Persist the baseline transcript; the encoding is explicit so the file does
# not depend on the runtime's locale default.
with open("/content/drive/MyDrive/india-supreme-court-asr/unseen_audio_transcript_whisper_small.txt", "w", encoding="utf-8") as f:
    f.write(base_text)

In [None]:
wer_base = wer_metric.compute(predictions=[base_text], references=[actual_transcript])
cer_base = cer_metric.compute(predictions=[base_text], references=[actual_transcript])

In [None]:
# Fine-tuned model: same pipeline settings as the baseline, but with weights
# loaded from the saved checkpoint on Drive.
finetuned_pipeline = pipeline(
    "automatic-speech-recognition",
    model=WhisperForConditionalGeneration.from_pretrained("/content/drive/MyDrive/india-supreme-court-asr/checkpoint-400/", local_files_only=True, device_map="auto"),
    tokenizer=tokenizer,
    feature_extractor=feature_extractor,
    chunk_length_s=30
)
finetuned_text = finetuned_pipeline(test_file)["text"]
# Persist the fine-tuned transcript; the encoding is explicit so the file does
# not depend on the runtime's locale default.
with open("/content/drive/MyDrive/india-supreme-court-asr/unseen_audio_transcript_finetuned_whisper_small.txt", "w", encoding="utf-8") as f:
    f.write(finetuned_text)

In [None]:
wer_finetuned = wer_metric.compute(predictions=[finetuned_text], references=[actual_transcript])
cer_finetuned = cer_metric.compute(predictions=[finetuned_text], references=[actual_transcript])

Summarize metrics

In [None]:
print(f"Base Model WER: {wer_base}, CER: {cer_base}")
print(f"Finetuned Model WER: {wer_finetuned}, CER: {cer_finetuned}")

Qualitative evaluation

In [None]:
actual_transcript[0:5000]

In [None]:
base_text[0:5000]

In [None]:
finetuned_text[0:5000]

In [None]:
runtime.unassign()