In [None]:
!pip3 install huggingface_hub datasets transformers librosa evaluate jiwer accelerate bitsandbytes gradio loralib
!pip3 install git+https://github.com/huggingface/peft

# import modules

In [1]:
import os

In [2]:

model_name_or_path = "openai/whisper-large-v2"
language = "English"
language_abbr = "en"
task = "transcribe"
dataset_name = "Tarakeshwaran/Whisper-train-data"

# load data set

In [3]:
from datasets import load_dataset, DatasetDict

data_dict= DatasetDict()

data_dict["train"] = load_dataset(dataset_name, split="train")
data_dict["test"] = load_dataset(dataset_name, split="test")

print(data_dict)

  from .autonotebook import tqdm as notebook_tqdm


DatasetDict({
    train: Dataset({
        features: ['audio', 'text', 'start', 'end'],
        num_rows: 80
    })
    test: Dataset({
        features: ['audio', 'text', 'start', 'end'],
        num_rows: 20
    })
})


In [4]:
data_dict = data_dict.remove_columns(["start","end"])

In [5]:
data_dict

DatasetDict({
    train: Dataset({
        features: ['audio', 'text'],
        num_rows: 80
    })
    test: Dataset({
        features: ['audio', 'text'],
        num_rows: 20
    })
})

## Prepare Feature Extractor, Tokenizer and Data

In [6]:
from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained(model_name_or_path)

In [7]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained(model_name_or_path, language=language, task=task)

In [8]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained(model_name_or_path, language=language, task=task)

In [9]:
print(data_dict["train"][0])

{'audio': {'path': 'sample-005811.mp3', 'array': array([-3.10192730e-25,  8.27180613e-25, -4.65289095e-24, ...,
       -1.07697597e-05,  2.15554501e-05, -2.88033916e-05]), 'sampling_rate': 16000}, 'text': "in alchemy it's called the soul of the world"}


In [10]:
def prepare_dataset(batch):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids
    batch["labels"] = tokenizer(batch["text"]).input_ids
    return batch

In [11]:
data_dict = data_dict.map(prepare_dataset, remove_columns=data_dict.column_names["train"], num_proc=1)

In [12]:
data_dict["train"]

Dataset({
    features: ['input_features', 'labels'],
    num_rows: 80
})

## Training and Evaluation

### Define a Data Collator

In [13]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union


@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [14]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

### evaluate

In [15]:
import evaluate

metric = evaluate.load("wer")

In [16]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

#### load pretrained check point

In [17]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained(model_name_or_path, load_in_8bit=True, device_map="auto")

The `load_in_4bit` and `load_in_8bit` arguments are deprecated and will be removed in the future versions. Please, pass a `BitsAndBytesConfig` object in `quantization_config` argument instead.


In [18]:
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

### Post-processing on the model

In [None]:

# Freeze all model parameters
for param in model.parameters():
    param.requires_grad = False

# Cast all LayerNorm layers to float32
def cast_layernorm_to_float32(model):
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.LayerNorm):
            module.float()  # Ensure LayerNorm operates in float32

cast_layernorm_to_float32(model)

# Wrap the model to cast output to float32
class Float32OutputModel(torch.nn.Module):
    def __init__(self, model):
        super().__init__()
        self.model = model

    def forward(self, *args, **kwargs):
        output = self.model(*args, **kwargs)
        # Cast logits or outputs to float32
        if isinstance(output, tuple):
            return tuple(o.float() if torch.is_floating_point(o) else o for o in output)
        return output.float()

    def __getattr__(self, name):
        # Delegate attribute access to the underlying model
        return getattr(self.model, name)

# Apply the wrapper
model = Float32OutputModel(model)

NameError: name 'nn' is not defined

### Apply LoRA

In [25]:
from peft import LoraConfig, PeftModel, LoraModel, LoraConfig, get_peft_model

config = LoraConfig(r=32, lora_alpha=64, target_modules=["q_proj", "v_proj"], lora_dropout=0.05, bias="none")

model = get_peft_model(model, config)
model.print_trainable_parameters()

trainable params: 15,728,640 || all params: 1,559,033,600 || trainable%: 1.0089


### define training configuration

In [26]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="temp",  # change to a repo name of your choice
    per_device_train_batch_size=8,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=1e-3,
    warmup_steps=50,
    num_train_epochs=3,
    evaluation_strategy="epoch",
    fp16=True,
    per_device_eval_batch_size=8,
    generation_max_length=128,
    logging_steps=25,
    remove_unused_columns=False,  # required as the PeftModel forward doesn't have the signature of the wrapped model's forward
    label_names=["labels"],  # same reason as above
)



In [28]:
from transformers import Seq2SeqTrainer

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=data_dict["train"],
    eval_dataset=data_dict["test"],
    data_collator=data_collator,
    # compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
)
model.config.use_cache = False  # silence the warnings. Please re-enable for inference!

  trainer = Seq2SeqTrainer(


AttributeError: 'Float32OutputModel' object has no attribute 'config'