In [None]:
from google.colab import drive
import os
# Mount Google Drive inside the Colab runtime at /content/drive.
drive.mount('/content/drive')
# Project root on the mounted drive; later cells build the dataset and
# output paths from this constant.
ROOT_FOLDER = '/content/drive/My Drive/Project/'

In [None]:
# Install ffmpeg
!apt-get update
!apt-get install -y ffmpeg

# # Verify installation
# !ffmpeg -version
!pip install evaluate
!pip install jiwer
!pip install torchcodec

In [None]:
!pip install --upgrade transformers accelerate

In [None]:
from transformers import WhisperForConditionalGeneration
from transformers import WhisperProcessor
import os
import pandas as pd
import json
from datasets import Dataset, Audio
from typing import Any, List, Dict, Union
import torch
import jiwer

from transformers import (
    Seq2SeqTrainingArguments,
    Seq2SeqTrainer
)

import evaluate

# Load the pretrained Whisper-small model and the processor from the same checkpoint.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small")
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-small",
    language="English",      # language passed to the processor (presumably consumed by its tokenizer — confirm)
    task="transcribe",        # task can be 'transcribe' or 'translate'
    feature_extractor_type="whisper",  # NOTE(review): extra kwarg; confirm the processor actually uses it
    feature_size=80 # requested feature size of 80 — NOTE(review): confirm it matches the checkpoint's default
)
# Configure generation for transcription and clear any forced decoder ids
# stored in the generation config.
# NOTE(review): generation_config.language is not set here — confirm whether
# generation should also be pinned to English.
model.generation_config.task = 'transcribe'
model.generation_config.forced_decoder_ids = None
from peft import LoraConfig, PeftModel

In [None]:
from datasets import load_from_disk
# Load the validation dataset from ROOT_FOLDER/data/data_sets/prepared_val.
data_set_path = os.path.join(ROOT_FOLDER, "data", "data_sets",'prepared_val')
val_dataset = load_from_disk(data_set_path)

In [None]:
from datasets import load_from_disk, concatenate_datasets
# Directory that holds the numbered training shards.
base_dir = os.path.join(ROOT_FOLDER, "data", "data_sets")

n_parts = 20
parts = []

# Load every shard directory that exists; a missing one is reported and skipped.
for i in range(1, n_parts + 1):
    data_set_path = os.path.join(base_dir, f"prepared_train_part{i}")
    if os.path.isdir(data_set_path):
        ds = load_from_disk(data_set_path)
        parts.append(ds)
        print(f"Loaded part {i}: {data_set_path} with {len(ds)} rows")
    else:
        print(f"Warning: missing directory {data_set_path}")

# Merge the loaded shards into one dataset; train_dataset stays None when
# no shard could be loaded.
train_dataset = concatenate_datasets(parts) if parts else None
print(
    f"Concatenated dataset rows: {len(train_dataset)}"
    if train_dataset is not None
    else "No parts loaded."
)


In [None]:
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate raw-audio examples into one padded training batch.

    Each example must provide ``feature['path']`` with ``'array'`` and
    ``'sampling_rate'`` entries, and ``feature['labels']`` with the target
    token ids.

    Args:
        processor: Object exposing ``feature_extractor`` (callable, with a
            ``pad`` method) and ``tokenizer`` (with a ``pad`` method).
        decoder_start_token_id: Token id that is stripped from the front of
            the labels when every row of the batch starts with it.
        max_label_length: Labels are truncated to this many tokens before
            padding. Defaults to 448, the value that used to be hard-coded;
            ``None`` disables truncation.
    """

    def __init__(self, processor: Any, decoder_start_token_id: int,
                 max_label_length: Union[int, None] = 448):
        self.processor = processor
        self.decoder_start_token_id = decoder_start_token_id
        self.max_label_length = max_label_length

    def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, torch.Tensor]:
        """Build the batch.

        Returns:
            The padded feature-extractor output with an added ``'labels'``
            tensor in which padded positions are set to -100.
        """
        extractor = self.processor.feature_extractor

        # Run the feature extractor on each raw waveform, then pad to a batch.
        input_features = []
        for feature in features:
            audio = feature['path']
            extracted = extractor(audio['array'], sampling_rate=audio['sampling_rate'])
            input_features.append({'input_features': extracted.input_features[0]})
        batch = extractor.pad(input_features, return_tensors='pt')

        # Truncate the label sequences, then pad them to a common length.
        label_features = [
            {'input_ids': feature['labels'][0:self.max_label_length]}
            for feature in features
        ]
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors='pt')

        # Padded positions (attention_mask != 1) become -100.
        labels = labels_batch['input_ids'].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # Drop the leading start token only when every row begins with it.
        if (labels[:, 0] == self.decoder_start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch['labels'] = labels

        return batch


# Collator instance handed to the trainer; it uses the model's configured
# decoder start token id to decide whether to strip the first label token.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

# metric = evaluate.load('wer')

def compute_metrics(pred):
    """Compute the corpus-level word error rate, in percent.

    Args:
        pred: Evaluation output exposing ``predictions`` (predicted token ids)
            and ``label_ids`` (reference token ids, with -100 at ignored
            positions).

    Returns:
        ``{'wer': value}`` where value is ``100 * jiwer.wer`` computed over
        all decoded references and predictions joined into one string each.
    """
    import numpy as np  # local import: numpy is not imported at the top of this notebook

    pad_id = processor.tokenizer.pad_token_id

    # Replace -100 with the pad token id on copies; the previous version wrote
    # into pred.label_ids in place and so modified the caller's array.
    label_ids = np.asarray(pred.label_ids)
    label_ids = np.where(label_ids == -100, pad_id, label_ids)

    # Same replacement for the predictions so a -100 filler can never reach
    # the tokenizer.
    # NOTE(review): only matters if the evaluation loop pads gathered
    # predictions with -100; it is a no-op otherwise.
    pred_ids = np.asarray(pred.predictions)
    pred_ids = np.where(pred_ids == -100, pad_id, pred_ids)

    pred_str = processor.tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = processor.tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    # Concatenate for MICRO / WEIGHTED WER
    concatenated_pred = " ".join(pred_str)
    concatenated_ref = " ".join(label_str)

    weighted_wer = 100 * jiwer.wer(
        concatenated_ref,
        concatenated_pred
    )

    return {'wer': weighted_wer}

# def compute_metrics(pred):
#     pred_ids = pred.predictions
#     label_ids = pred.label_ids

#     # Replace -100 with pad token
#     label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

#     # Decode
#     pred_str = processor.tokenizer.batch_decode(
#         pred_ids, skip_special_tokens=True
#     )
#     label_str = processor.tokenizer.batch_decode(
#         label_ids, skip_special_tokens=True
#     )

#     total_edits = 0
#     total_ref_words = 0

#     for hyp, ref in zip(pred_str, label_str):
#         hyp_words = hyp.strip().split()
#         ref_words = ref.strip().split()

#         # Skip empty references to avoid divide-by-zero
#         if len(ref_words) == 0:
#             continue

#         # Compute word-level edit distance
#         measures = jiwer.compute_measures(ref, hyp)

#         total_edits += measures["substitutions"] \
#                      + measures["deletions"] \
#                      + measures["insertions"]

#         total_ref_words += len(ref_words)

#     weighted_wer = 100.0 * total_edits / total_ref_words

#     return {
#         "wer": weighted_wer
#     }


In [None]:
# Training hyper-parameters
out_dir = os.path.join(ROOT_FOLDER, 'finetuned_model_files', 'Small_64_lr_4_10-4_correct_alpha')
batch_size = 16
gradient_accum = 1
epochs = 45
dataloader_num = 16

if train_dataset is None:
    raise ValueError("train_dataset is None: no training parts were loaded, so step_size cannot be derived.")

# Evaluate/save every 3 epochs, expressed in steps.
# Steps per epoch is ceil(rows / effective batch). The previous
# `int(rows / effective_batch + 1)` form counted one step too many whenever
# rows was an exact multiple of the effective batch, so the schedule drifted
# away from epoch boundaries.
# NOTE(review): assumes a single training device — confirm if that changes.
effective_batch = batch_size * gradient_accum
steps_per_epoch = (train_dataset.shape[0] + effective_batch - 1) // effective_batch
step_size = steps_per_epoch * 3



In [None]:
# Training configuration: fp16 training, evaluation and checkpointing every
# `step_size` steps, generation-based evaluation scored with the 'wer' metric.
training_args = Seq2SeqTrainingArguments(
    output_dir=out_dir,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    gradient_accumulation_steps= gradient_accum,
    learning_rate=4*10**-4,
    # NOTE(review): with lr_scheduler_type='constant' (set below) the warmup is
    # presumably not applied; 'constant_with_warmup' is the variant that uses
    # warmup_steps — confirm which behavior is intended.
    warmup_steps=300,
    bf16=False,
    fp16=True,
    num_train_epochs=epochs,
    logging_strategy='epoch',
    #save_strategy='epoch',
    predict_with_generate=True,
    generation_max_length=448,
    report_to=['tensorboard'],
    # The best checkpoint is not reloaded when training ends.
    load_best_model_at_end=False,
    metric_for_best_model='wer',
    greater_is_better=False,  # a lower WER is better
    dataloader_num_workers= dataloader_num,
    save_total_limit=10,
    lr_scheduler_type='constant',
    seed=42,
    data_seed=42,
    remove_unused_columns=False, # keep the raw 'path' column: the data collator reads feature['path']
    eval_strategy="steps",
    eval_steps=step_size,
    save_strategy='steps',
    save_steps=step_size

)

In [None]:
from peft import LoraConfig, PeftModel, get_peft_model

# LoRA adapter configuration applied to the attention projection modules.
lora_config = LoraConfig(
    r=64,             # Rank
    lora_alpha=128,   # Scaling
    target_modules=["q_proj", "k_proj", "v_proj", "out_proj"],  # Attention projection modules (important for Whisper)
    lora_dropout=0.1,
    bias="none",
    # task_type="SEQ_2_SEQ_LM"
)

# Wrap the base model exactly once. Re-running this cell used to pass an
# already wrapped model to get_peft_model again; the guard makes the cell
# safe to re-run.
if isinstance(model, PeftModel):
    print("model is already a PeftModel; skipping get_peft_model "
          "(reload the base model to apply a different LoraConfig).")
else:
    model = get_peft_model(model, lora_config)
model.print_trainable_parameters()   # Debug: shows % of params tuned


In [None]:
# Build the trainer from the model, datasets, collator and metric function.
# `processing_class` replaces the deprecated `tokenizer` keyword; this
# notebook upgrades transformers to the latest release, where the old keyword
# is no longer reliable.
trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    processing_class=processor.feature_extractor,
)

In [None]:
# Start training from the current model state.
# NOTE(review): the next cell also starts training (resuming from a
# checkpoint); run only the one you need.
trainer.train()

In [None]:
# Resume training from the checkpoint-4650 folder under out_dir.
# NOTE(review): the checkpoint name is hard-coded; update it to the
# checkpoint that should be resumed before running this cell.
trainer.train(
    resume_from_checkpoint=os.path.join(out_dir, "checkpoint-4650")
)


In [None]:
# Save the model currently held by the trainer to out_dir/best_model.
# NOTE(review): training_args sets load_best_model_at_end=False, so this is
# the model state at the end of training, not necessarily the best-scoring
# checkpoint despite the folder name.
trainer.save_model(os.path.join(out_dir, "best_model"))

# To continue from the latest state, use the most recent checkpoint-XXX folder
# in the output directory.
# To find the actual best checkpoint, look at the trainer_state.json file
# generated in the output directory (or inside any checkpoint-XXX folder),
# e.g. output_dir/checkpoint-XXX/trainer_state.json.
