In [1]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
    print('Not connected to a GPU')
else:
    print(gpu_info)

Fri Jun  6 17:17:58 2025       
+-----------------------------------------------------------------------------------------+
| NVIDIA-SMI 576.28                 Driver Version: 576.28         CUDA Version: 12.9     |
|-----------------------------------------+------------------------+----------------------+
| GPU  Name                  Driver-Model | Bus-Id          Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |           Memory-Usage | GPU-Util  Compute M. |
|                                         |                        |               MIG M. |
|   0  NVIDIA GeForce RTX 3070 ...  WDDM  |   00000000:01:00.0  On |                  N/A |
| N/A   62C    P8             17W /   80W |     508MiB /   8192MiB |      2%      Default |
|                                         |                        |                  N/A |
+-----------------------------------------+------------------------+----------------------+
                                                

In [2]:
from datasets import load_dataset, Audio, concatenate_datasets, DatasetDict
import os

clips_dir = "D:/projects/SPEECH/cv-corpus-21.0-2025-03-14/fa/clips"
data_files = {
    "train": "D:/projects/SPEECH/cv-corpus-21.0-2025-03-14/fa/train.csv",
    "test": "D:/projects/SPEECH/cv-corpus-21.0-2025-03-14/fa/test.csv",
    "validation": "D:/projects/SPEECH/cv-corpus-21.0-2025-03-14/fa/dev.csv"
}

dataset = load_dataset("csv", data_files=data_files)


def add_full_path(example):
    example["path"] = os.path.join(clips_dir, example["path"])
    return example


dataset = dataset.map(add_full_path)

dataset = dataset.cast_column("path", Audio(sampling_rate=16000))

for split in dataset:
    dataset[split] = dataset[split].rename_columns({
        "path": "audio",
        "sentence": "text"
    })

for split in dataset:
    dataset[split] = dataset[split].remove_columns(
        [col for col in dataset[split].column_names if col not in ["audio", "text"]]
    )

train_dataset = concatenate_datasets([dataset["train"], dataset["validation"]])
test_dataset = dataset["test"]

common_voice = DatasetDict()

common_voice["train"] = train_dataset
common_voice["test"] = test_dataset

print(common_voice)



DatasetDict({
    train: Dataset({
        features: ['audio', 'text'],
        num_rows: 40397
    })
    test: Dataset({
        features: ['audio', 'text'],
        num_rows: 10668
    })
})


In [3]:
checkpoint_path = "whisper-small-persian-v1/checkpoint-7575"

In [4]:
from transformers import WhisperFeatureExtractor

feature_extractor = WhisperFeatureExtractor.from_pretrained(checkpoint_path)

In [5]:
from transformers import WhisperTokenizer

tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-small", language="Persian", task="transcribe")

In [6]:
from transformers import WhisperProcessor

processor = WhisperProcessor.from_pretrained("openai/whisper-small", language="Persian", task="transcribe")

In [7]:
from datasets import Audio

common_voice = common_voice.cast_column("audio", Audio(sampling_rate=16000))

In [8]:
def prepare_dataset(batch):
    # load and resample audio data from 48 to 16kHz
    audio = batch["audio"]

    # compute log-Mel input features from input audio array
    batch["input_features"] = feature_extractor(audio["array"], sampling_rate=audio["sampling_rate"]).input_features[0]

    # encode target text to label ids
    batch["labels"] = tokenizer(batch["text"]).input_ids
    return batch

In [9]:
common_voice = common_voice.map(prepare_dataset, remove_columns=common_voice.column_names["train"])

In [10]:
from transformers import WhisperForConditionalGeneration

model = WhisperForConditionalGeneration.from_pretrained(checkpoint_path)

In [11]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    processor: Any
    decoder_start_token_id: int

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        # split inputs and labels since they have to be of different lengths and need different padding methods
        # first treat the audio inputs by simply returning torch tensors
        input_features = [{"input_features": feature["input_features"]} for feature in features]
        batch = self.processor.feature_extractor.pad(input_features, return_tensors="pt")

        # get the tokenized label sequences
        label_features = [{"input_ids": feature["labels"]} for feature in features]
        # pad the labels to max length
        labels_batch = self.processor.tokenizer.pad(label_features, return_tensors="pt")

        # replace padding with -100 to ignore loss correctly
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        # if bos token is appended in previous tokenization step,
        # cut bos token here as it's append later anyways
        if (labels[:, 0] == self.decoder_start_token_id).all().cpu().item():
            labels = labels[:, 1:]

        batch["labels"] = labels

        return batch

In [12]:
data_collator = DataCollatorSpeechSeq2SeqWithPadding(
    processor=processor,
    decoder_start_token_id=model.config.decoder_start_token_id,
)

In [13]:
import evaluate

metric = evaluate.load("wer")

In [14]:
def compute_metrics(pred):
    pred_ids = pred.predictions
    label_ids = pred.label_ids

    # replace -100 with the pad_token_id
    label_ids[label_ids == -100] = tokenizer.pad_token_id

    # we do not want to group tokens when computing the metrics
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_str = tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * metric.compute(predictions=pred_str, references=label_str)

    return {"wer": wer}

In [15]:
from transformers import Seq2SeqTrainingArguments

training_args = Seq2SeqTrainingArguments(
    output_dir="./whisper-small-persian-v1",  # change to a repo name of your choice
    per_device_train_batch_size=8,
    gradient_accumulation_steps=2,  # increase by 2x for every 2x decrease in batch size
    # group_by_length=True,
    learning_rate=4e-5,
    warmup_steps=500,
    # max_steps=4000,
    num_train_epochs=3,
    gradient_checkpointing=True,
    fp16=True,
    # eval_strategy="steps",
    eval_strategy="no",
    save_strategy="steps",
    save_total_limit=1,
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    # generation_max_length=48,
    save_steps=1000,
    # eval_steps=1000,
    logging_steps=50,
    logging_dir="./logs",
    report_to=["tensorboard"],
    load_best_model_at_end=False,
    # load_best_model_at_end=True,
    # metric_for_best_model="wer",
    # greater_is_better=False,
    push_to_hub=False,
    resume_from_checkpoint='last-checkpoint',
)

In [19]:
from transformers import Seq2SeqTrainer, EarlyStoppingCallback

trainer = Seq2SeqTrainer(
    args=training_args,
    model=model,
    train_dataset=common_voice["train"],
    eval_dataset=common_voice["test"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
    # callbacks=[EarlyStoppingCallback(early_stopping_patience=3)]
)

  trainer = Seq2SeqTrainer(


In [None]:
# trainer.args.per_device_eval_batch_size = 8
trainer.evaluate()

`generation_config` default values have been modified to match model-specific defaults: {'suppress_tokens': [1, 2, 7, 8, 9, 10, 14, 25, 26, 27, 28, 29, 31, 58, 59, 60, 61, 62, 63, 90, 91, 92, 93, 359, 503, 522, 542, 873, 893, 902, 918, 922, 931, 1350, 1853, 1982, 2460, 2627, 3246, 3253, 3268, 3536, 3846, 3961, 4183, 4667, 6585, 6647, 7273, 9061, 9383, 10428, 10929, 11938, 12033, 12331, 12562, 13793, 14157, 14635, 15265, 15618, 16553, 16604, 18362, 18956, 20075, 21675, 22520, 26130, 26161, 26435, 28279, 29464, 31650, 32302, 32470, 36865, 42863, 47425, 49870, 50254, 50258, 50360, 50361, 50362], 'begin_suppress_tokens': [220, 50257]}. If this is not desired, please set these values explicitly.
The attention mask is not set and cannot be inferred from input because pad token is same as eos token. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
A custom logits processor of type <class 'transformers.generation.logit

{'eval_loss': 0.33234187960624695,
 'eval_model_preparation_time': 0.0,
 'eval_wer': 31.930087051142547,
 'eval_runtime': 4676.1764,
 'eval_samples_per_second': 2.281,
 'eval_steps_per_second': 0.285}

In [20]:
from huggingface_hub import notebook_login

notebook_login()

VBox(children=(HTML(value='<center> <img\nsrc=https://huggingface.co/front/assets/huggingface_logo-noborder.sv…

In [21]:
model.push_to_hub("aliyzd95/whisper-small-persian-v1")
processor.push_to_hub("aliyzd95/whisper-small-persian-v1")

README.md:   0%|          | 0.00/5.17k [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


CommitInfo(commit_url='https://huggingface.co/aliyzd95/whisper-small-persian-v1/commit/bc099dbff80b84dc2c178947ab686a9265413586', commit_message='Upload processor', commit_description='', oid='bc099dbff80b84dc2c178947ab686a9265413586', pr_url=None, repo_url=RepoUrl('https://huggingface.co/aliyzd95/whisper-small-persian-v1', endpoint='https://huggingface.co', repo_type='model', repo_id='aliyzd95/whisper-small-persian-v1'), pr_revision=None, pr_num=None)