# Importing necessary libraries

In [None]:
# Show the GPU(s) attached to this session and the installed CUDA compiler version.
!nvidia-smi
!nvcc --version


In [None]:
# Register the ubuntuhandbook1/ffmpeg8 PPA, refresh the apt package index and install ffmpeg.
# NOTE(review): presumably the PPA is added to get a newer ffmpeg build than the base image ships — confirm.

!add-apt-repository -y ppa:ubuntuhandbook1/ffmpeg8
!apt-get update -y
!apt-get install -y ffmpeg

In [None]:
# Print the ffmpeg version that is now on PATH to confirm the installation.
!ffmpeg -version

In [None]:
# Remove the preinstalled ML stack and empty pip's download cache so that the pinned
# versions installed in the following cells start from a clean slate.
!pip uninstall -y torch torchvision torchaudio transformers peft datasets
!pip cache purge


In [None]:
# Install a matching, pinned torch / torchvision / torchaudio trio.
# The uninstall is a no-op when the previous cell has already removed these packages;
# it only matters if this cell is run on its own.
!pip uninstall -y torch torchvision torchaudio
!pip install torch==2.8.0 torchvision==0.23.0 torchaudio==2.8.0


In [None]:
pip install --upgrade transformers==4.55.4 peft==0.17.1 "datasets[audio]>=2.21.0" accelerate evaluate jiwer tensorboard


In [None]:
# NOTE: whatever happens, make sure `datasets` is NOT 4.0.0.
# This cell force-reinstalls datasets 2.21.0 (with its audio extras) without using the pip cache.
# NOTE(review): the torchcodec/soundfile removal suggests the audio-decoding backend pulled in by
# newer `datasets` releases was the problem — confirm before changing the pin.

!pip uninstall -y datasets torchcodec soundfile
!pip install "datasets[audio]==2.21.0" --no-cache-dir


In [None]:
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from peft import prepare_model_for_kbit_training

from datasets import load_dataset

import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

import evaluate

from transformers import Seq2SeqTrainingArguments

from transformers import Seq2SeqTrainer

# Loading dataset

In [None]:
from huggingface_hub import login
from kaggle_secrets import UserSecretsClient

# Read the Hugging Face access token from this notebook's Kaggle secrets (label "HF_TOKEN"),
# so the token itself never appears in the notebook.
HF_TOKEN = UserSecretsClient().get_secret("HF_TOKEN")

# Authenticate this session with the Hugging Face Hub; later cells can then
# access gated repositories or push models.
login(token=HF_TOKEN)

In [None]:
from datasets import Audio

# Load the dataset, specifying the 'test' split since it's the only one available
# Use 'token=True' to automatically use the token you've logged in with
# If you didn't run 'huggingface-cli login', you can pass your token here: token="hf_..."
svarah_dataset = load_dataset("ai4bharat/Svarah", split="test", token=True)

svarah_dataset = svarah_dataset.cast_column('audio_filepath', Audio(decode=True))

# Split the dataset into a training and validation set
# We'll use 15% of the data for validation and the remaining 85% for training
# Setting seed for reproducibility
split_dataset = svarah_dataset.train_test_split(test_size=0.15, seed=42)

# Access the new splits
train_dataset = split_dataset['train']
validation_dataset = split_dataset['test']  # The 'test_size' data is named 'test' by this method

# Print the sizes to confirm the split
print(f"Total dataset size: {len(svarah_dataset)}")
print(f"Training set size: {len(train_dataset)}")
print(f"Validation set size: {len(validation_dataset)}")

In [None]:
#this is to test and check if array is being returned or not

import numpy as np

print(svarah_dataset.column_names)
print(svarah_dataset[0])

train_dataset = train_dataset.cast_column("audio_filepath", Audio(decode=True))

# now try accessing a row
sample = train_dataset[0]["audio_filepath"]

print(type(sample))   # should show ['path', 'array', 'sampling_rate']
print(sample["array"], sample["sampling_rate"])

# Loading Whisper model and processor

In [None]:
# Load the processor and the seq2seq model from the same "openai/whisper-small.en" checkpoint.
# Later cells use processor.feature_extractor for the audio and processor.tokenizer for the text.
processor = WhisperProcessor.from_pretrained("openai/whisper-small.en")
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-small.en")

# NOTE(review): the model above is loaded without any quantization arguments, while this PEFT
# helper is named for k-bit (quantized) training. Presumably it is kept to freeze the base
# weights before the LoRA adapters are attached — confirm it is still wanted.
model = prepare_model_for_kbit_training(model)

# Applying LoRA to the model

In [None]:
from peft import LoraConfig, get_peft_model

# LoRA adapter settings: rank-32 adapters with alpha 64 and 5% dropout, no bias training.
lora_config = LoraConfig(
    r=32,
    lora_alpha=64,
    target_modules=["q_proj", "v_proj"],  # adapters are attached only to modules with these names
    lora_dropout=0.05,
    bias="none",
)

# Wrap the model with the PEFT configuration; `model` is a PEFT model from here on.
model = get_peft_model(model, lora_config)

# Report how many parameters are trainable compared with the total.
model.print_trainable_parameters()

# Preprocessing of dataset

In [None]:
import torch

def prepare_dataset(batch):
    """Turn one dataset row into the model inputs used for training.

    Args:
        batch: A single (un-batched) row with an ``"audio_filepath"`` entry holding the
            decoded audio (a dict with ``"array"`` and ``"sampling_rate"``) and a
            ``"text"`` entry holding the reference transcript.

    Returns:
        The same row with two fields added:
        ``"input_features"`` - the log-mel features of the clip, and
        ``"labels"`` - the full list of token ids of the transcript.
    """
    audio = batch["audio_filepath"]

    # The feature extractor always returns a batch, even for one clip, so take element 0.
    batch["input_features"] = processor.feature_extractor(
        audio["array"],
        sampling_rate=audio["sampling_rate"],
    ).input_features[0]

    # For a single string the tokenizer returns a flat list of ids. Indexing it with [0]
    # (as was done before) keeps only the first token instead of the whole transcript.
    batch["labels"] = processor.tokenizer(batch["text"]).input_ids

    # No "attention_mask" is stored: Whisper's feature extractor does not return one by
    # default, and the data collator only reads "input_features" and "labels".
    return batch


# Featurise both splits with 4 worker processes. The raw columns are dropped so that only
# the fields written by prepare_dataset remain.
tds, vds = (
    split.map(prepare_dataset, remove_columns=split.column_names, num_proc=4)
    for split in (train_dataset, validation_dataset)
)

In [None]:
tds  # display the processed training set to check its columns and row count

# Data collator

In [None]:
@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate preprocessed speech examples into one padded training batch.

    Audio features and label token ids have different lengths and padding rules, so they
    are padded separately: features through ``processor.feature_extractor.pad`` and labels
    through ``processor.tokenizer.pad``.
    """

    # Object exposing `.feature_extractor.pad(...)` and `.tokenizer.pad(...)`.
    processor: Any
    # Token id that is stripped from the front of the labels when every row starts with it.
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Pad ``features`` into a batch.

        Args:
            features: Examples that each carry ``"input_features"`` and ``"labels"``.

        Returns:
            The padded feature batch with a ``"labels"`` tensor added, in which padded
            positions are set to -100.
        """
        audio_items = []
        token_items = []
        for example in features:
            audio_items.append({"input_features": example["input_features"]})
            token_items.append({"input_ids": example["labels"]})

        # Audio side: the extractor's pad() just stacks the features into torch tensors.
        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")

        # Text side: pad every label sequence to the longest one in the batch.
        padded_tokens = self.processor.tokenizer.pad(token_items, return_tensors="pt")

        # Padded positions become -100 so that the loss skips them.
        is_padding = padded_tokens.attention_mask.ne(1)
        labels = padded_tokens["input_ids"].masked_fill(is_padding, -100)

        # If tokenization already put the start token in front of every sequence, drop it
        # here because it gets added again later on.
        start_id = self.decoder_start_token_id
        if labels.dim() > 1:
            if (labels[:, 0] == start_id).all().cpu().item():
                labels = labels[:, 1:]
        elif labels.dim() == 1 and labels[0] == start_id:
            labels = labels[1:].unsqueeze(0)

        batch["labels"] = labels
        return batch


In [None]:
# Build the collator used by the trainer. It pads with the Whisper processor loaded above
# and takes the start token id from the model config.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)


# Evaluation

In [None]:
metric = evaluate.load("wer")

def compute_metrics(pred):
    """Compute the word error rate (in percent) for one evaluation pass.

    Args:
        pred: Prediction object exposing ``predictions`` (generated token ids) and
            ``label_ids`` (reference token ids, with ignored positions marked as -100).

    Returns:
        ``{"wer": <float>}`` - the WER multiplied by 100.
    """
    # The notebook never defines a standalone `tokenizer`; the one that belongs to the
    # loaded checkpoint lives on the processor.
    tokenizer = processor.tokenizer

    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # -100 marks ignored positions and is not a real token id, so swap it for the pad
    # token before decoding (this edits `pred.label_ids` in place).
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # Decode both sides to plain text, dropping special tokens.
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}


# Creating Trainer

In [None]:
# Training configuration for the LoRA fine-tuning run.
# NOTE(review): learning_rate=1e-6 looks very small for training LoRA adapters — confirm it is intended.
training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-small-en",
    per_device_train_batch_size=4,
    gradient_accumulation_steps=4,  # 4 x 4 = 16 examples per optimizer step on each device
    learning_rate=1e-6,
    max_steps=10000,                # preferred over num_train_epochs
    logging_steps=100,
    save_steps=500,
    eval_strategy="steps",          # needed for periodic evaluation: do_eval alone does not schedule it
    eval_steps=500,
    do_train=True,
    do_eval=True,
    predict_with_generate=True,     # evaluate on generated text so WER can be computed
    save_total_limit=2,
    report_to="none",               # disables Weights & Biases and other reporting integrations
    fp16=True,
)


In [None]:
# Wire model, data, collator and metric together.
# `processing_class` replaces the deprecated `tokenizer` argument of Trainer in the pinned
# transformers release. The feature extractor is passed so that it is saved alongside checkpoints.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=tds,
    eval_dataset=vds,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    processing_class=processor.feature_extractor,
)


# Training model

In [None]:
import torch
# Number of CUDA devices PyTorch can see; 0 means no GPU is visible to PyTorch.
print(torch.cuda.device_count())


In [None]:
# Training is currently disabled; uncomment `trainer.train()` to start the run.
# import os
# os.environ["WANDB_DISABLED"] = "true"  # W&B only tracks metrics, which is not needed here, and it cannot run without an API key

# trainer.train()

In [None]:
# Save the trained model and the tokenizer files into one folder (currently disabled).
# trainer.save_model("./whisper-small-en-final")
# NOTE(review): `tokenizer` is never assigned in the cells shown here; `processor.tokenizer`
# is presumably what was meant — fix before uncommenting.
# tokenizer.save_pretrained("./whisper-small-en-final")


In [None]:
# Zip the saved model folder so it can be downloaded as a single file (currently disabled).
# !zip -r whisper-small-en-final.zip ./whisper-small-en-final