# Fine-Tune Whisper For Multilingual ASR with 🤗 Transformers

In [None]:
First, check which GPU (if any) this runtime is connected to:

In [None]:
gpu_info = !nvidia-smi
gpu_info = '\n'.join(gpu_info)
if gpu_info.find('failed') >= 0:
  print('Not connected to a GPU')
else:
  print(gpu_info)

In [None]:
!pip install datasets>=2.6.1
!pip install git+https://github.com/huggingface/transformers
!pip install librosa
!pip install evaluate>=0.30
!pip install jiwer
!pip install gradio
!pip install accelerate -U
!pip install ipywidgets
!pip install tensorboardX

In [None]:
from huggingface_hub import notebook_login

# Authenticate against the Hugging Face Hub; later cells call `push_to_hub`.
notebook_login()

In [None]:
import os
# Select which GPUs are exposed to this process through CUDA_VISIBLE_DEVICES.
# NOTE(review): "..." looks like a placeholder rather than a device list —
# replace it with real device indices before running.
os.environ["CUDA_VISIBLE_DEVICES"] = "..."  # or "0,1" for multiple GPUs

## Load Dataset

In [None]:
# from datasets import load_dataset, DatasetDict
# import numpy as np
# from datasets import load_dataset, DatasetDict
# import pandas as pd
# import json


# dataset = DatasetDict()
# train = load_dataset('csv', data_files={'test':"/path/", },)
#                                    # data_dir="/path/")


# eval = load_dataset('csv', data_files={'test':"/path/", },)
#                                    # data_dir="/path/")

In [None]:
# import numpy as np
# from sklearn.model_selection import KFold
# from datasets import load_dataset, DatasetDict
# import pandas as pd

# # Load your dataset
# trainset = DatasetDict()
# train = load_dataset('csv', data_files={'test': "/path/"})
# trainset['train'] = train["test"]

# # Assuming 'label_column' is the column containing class labels
# label_column = 'Sentence'

# # Access the dataset using the key and convert it to a DataFrame
# train_df = pd.DataFrame(trainset['train'])

# # Get the labels
# labels = train_df[label_column]


# # Create the KFold object (plain K-fold; the split below is not stratified)
# n_splits = 10  # or any other number of splits you want
# kfold = KFold(n_splits=n_splits, shuffle=True, random_state=42)

# # Now make your splits based on the labels
# splits = list(kfold.split(np.zeros(len(labels)), labels))

# # Finally, do what you want with it
# # In this case, I'm overriding the train/val/test

# df = pd.DataFrame(splits)
# df.to_json("/path/file.json", index=False)

In [None]:
# import numpy as np
# from datasets import load_dataset, DatasetDict
# import pandas as pd
# import json

# trainset = DatasetDict()
# file = open("/path/file.json")
# spl = json.load(file)
# train = load_dataset('csv', data_files={'test': "/path/file.csv"})

# trainset['train'] = train["test"]

# dataset = DatasetDict()
# test_set = trainset["train"].select(spl['1']['9']) #first [] if 0 train and if 1 test, the second [] goes from 0 to 9 for the 10 splits
# dataset["train"] = trainset["train"].select(spl['0']['9'])

# test_df = pd.DataFrame(test_set)
# train_df = pd.DataFrame(dataset["train"])
# # print(test_df[test_df['Path']])
# print(train_df[train_df["Path"].isin(test_df["Path"])])
# # print(dataset["train"])
# print(test_df[test_df["Path"].isin(train_df["Path"])])

In [None]:
# import pandas as pd
# train_dev = pd.DataFrame(dataset["train"])
# dev_df = train_dev.sample(frac=0.1111)
# index_names = dev_df.index
# dataset["dev"] = dataset["train"].select(index_names)
# dev_set = pd.DataFrame(dataset["dev"])
# dev_set.to_csv("/path/file.csv", index=False)
# train_df = train_dev.drop(index_names, inplace = False)
# index_names = train_df.index
# dataset["train"] = dataset["train"].select(index_names)
# tra_set = pd.DataFrame(dataset["train"])
# tra_set.to_csv("/path/file.csv", index=False)

In [None]:
from datasets import load_dataset, DatasetDict

# Each CSV is read separately; the key given in `data_files` is the key used
# to pull the resulting split back out below.
train = load_dataset('csv', data_files={'test': "/path/file.csv"})
dev = load_dataset('csv', data_files={'dev': "/path/file.csv"})

# Collect both splits under the names the rest of the notebook uses.
dataset = DatasetDict({"train": train["test"], "dev": dev["dev"]})

In [None]:
# test_df = pd.DataFrame(test_set)
# train_df = pd.DataFrame(dataset["train"])
# print(test_df[test_df["Path"].isin(train_df["Path"])])
# print(train_df[train_df["Path"].isin(test_df["Path"])])

In [None]:
# np1 = np.array(spl['1']['9'])
# np2 = np.array(spl['0']['9'])

# any(np.isin(np2,np1))

In [None]:
# Inspect the assembled DatasetDict before any preprocessing is applied.
print(dataset)
# print(test_set)

In [None]:
# Drop the metadata columns in every split; the cells below read only
# "Path" and "Sentence".
dataset = dataset.remove_columns(
    [
        "Unnamed: 0",
        "Root",
        "DialectRegion",
        "Age",
        "SpeakerID",
        "Group",
        "Gender",
        "Duration (seconds)",
        "Duration (days)",
        "CEF",
        "Component",
    ]
)

In [None]:
import re

# Punctuation and quote characters stripped from transcripts.
# Written as a raw string: the previous plain literal relied on invalid escape
# sequences (backslash-comma, backslash-question-mark, ...) that Python flags
# with a warning. Inside a character class only `-` needs escaping; `\'` is
# kept so the single-quoted literal can contain an apostrophe.
chars_to_remove_regex = r'[,?.!\-;:"“%‘”�\']'

def remove_special_characters(batch):
    """Normalise the transcript of one example.

    Removes every character matched by ``chars_to_remove_regex`` from
    ``batch["Sentence"]`` and lowercases the result.

    Args:
        batch: mapping with a string under the key ``"Sentence"``; other keys
            are left untouched.

    Returns:
        The same ``batch`` object, modified in place.
    """
    batch["Sentence"] = re.sub(chars_to_remove_regex, '', batch["Sentence"]).lower()
    return batch

In [None]:
# Clean the transcripts of both splits with the same normaliser.
for split_name in ('train', 'dev'):
    dataset[split_name] = dataset[split_name].map(remove_special_characters)
# test_set = test_set.map(remove_special_characters)

In [None]:
from transformers import WhisperFeatureExtractor


# Feature extractor loaded from the whisper-large-v2 checkpoint;
# `prepare_dataset` calls it on the raw audio arrays.
feature_extractor = WhisperFeatureExtractor.from_pretrained("openai/whisper-large-v2")

In [None]:
from transformers import WhisperTokenizer

# Tokenizer configured for Dutch transcription. It encodes transcripts in
# `prepare_dataset` and decodes ids in `compute_metrics`.
tokenizer = WhisperTokenizer.from_pretrained("openai/whisper-large-v2", language="Dutch", task="transcribe")

In [None]:
from transformers import WhisperProcessor

# Processor loaded with the same checkpoint, language and task. It is handed to
# the data collator and saved to the training output directory further down.
processor = WhisperProcessor.from_pretrained("openai/whisper-large-v2", language="Dutch", task="transcribe")

In [None]:
# Peek at one training row (index 2) before the "Path" column is cast to audio.
print(dataset["train"][2])

In [None]:
from datasets import Audio

# Treat the "Path" column as audio at a 16 kHz sampling rate, so rows expose the
# "array" and "sampling_rate" fields that `prepare_dataset` reads.
dataset = dataset.cast_column("Path", Audio(sampling_rate=16000))

In [None]:
# Look at the first training row again now that "Path" has been cast to audio.
print(dataset["train"][0])

In [None]:
def prepare_dataset(batch):
    """Convert one example into model inputs.

    Reads the audio under ``batch["Path"]`` and the transcript under
    ``batch["Sentence"]``, then adds two keys to ``batch``:
    ``"input_features"`` (first entry of the feature extractor's output for the
    audio array) and ``"labels"`` (token ids of the transcript).

    Returns the same ``batch`` object.
    """
    decoded_audio = batch["Path"]

    # Feature extraction is given the array together with its own sampling rate.
    extracted = feature_extractor(
        decoded_audio["array"],
        sampling_rate=decoded_audio["sampling_rate"],
    )
    batch["input_features"] = extracted.input_features[0]

    # The target text becomes a sequence of label ids.
    batch["labels"] = tokenizer(batch["Sentence"]).input_ids
    return batch

In [None]:
# Run `prepare_dataset` over every split using 5 worker processes. The columns
# listed for the train split are removed, leaving what `prepare_dataset` adds.
dataset = dataset.map(prepare_dataset, remove_columns=dataset.column_names["train"], num_proc=5)

In [None]:
# Hub repository name used when pushing the tokenizer below.
repo_name = "whisper-9-dutch"

In [None]:
# Upload the tokenizer to the Hub repository named by `repo_name`
# (requires the login performed earlier in the notebook).
tokenizer.push_to_hub(repo_name)

In [None]:
import torch

from dataclasses import dataclass
from typing import Any, Dict, List, Union

@dataclass
class DataCollatorSpeechSeq2SeqWithPadding:
    """Collate speech examples into one padded batch.

    Audio features and label ids are padded separately, through
    ``processor.feature_extractor.pad`` and ``processor.tokenizer.pad``
    respectively, because the two sequences have different lengths.
    """

    # Object exposing `.feature_extractor` and `.tokenizer`, each with a `pad` method.
    processor: Any

    def __call__(self, features: List[Dict[str, Union[List[int], torch.Tensor]]]) -> Dict[str, torch.Tensor]:
        """Build a batch from ``features``.

        Each feature must provide ``"input_features"`` and ``"labels"``.
        Returns the padded audio batch with an added ``"labels"`` entry in which
        padded positions are set to -100.
        """
        # Split every example into its audio part and its label part in one pass.
        audio_items, label_items = [], []
        for example in features:
            audio_items.append({"input_features": example["input_features"]})
            label_items.append({"input_ids": example["labels"]})

        batch = self.processor.feature_extractor.pad(audio_items, return_tensors="pt")
        padded_labels = self.processor.tokenizer.pad(label_items, return_tensors="pt")

        # Positions whose attention mask is not 1 are padding; -100 marks them
        # so they are ignored by the loss.
        is_padding = padded_labels.attention_mask.ne(1)
        labels = padded_labels["input_ids"].masked_fill(is_padding, -100)

        # When every sequence starts with the BOS token, drop that first column.
        every_row_starts_with_bos = (labels[:, 0] == self.processor.tokenizer.bos_token_id).all().cpu().item()
        if every_row_starts_with_bos:
            labels = labels[:, 1:]

        batch["labels"] = labels
        return batch

In [None]:
# Collator instance passed to the trainer; it pads using the processor's
# feature extractor and tokenizer.
data_collator = DataCollatorSpeechSeq2SeqWithPadding(processor=processor)

In [None]:
import evaluate

# Load the "wer" metric; `compute_metrics` calls `metric.compute` on decoded text.
metric = evaluate.load("wer")

In [None]:
def compute_metrics(pred):
    """Score a prediction object with the loaded "wer" metric.

    ``pred`` must expose ``predictions`` and ``label_ids``. Label positions
    equal to -100 are overwritten in place with the tokenizer's pad token id so
    they can be decoded. Returns ``{"wer": value}`` where the metric result is
    multiplied by 100.
    """
    reference_ids = pred.label_ids
    # -100 is not a decodable id; swap it for the pad token before decoding.
    reference_ids[reference_ids == -100] = tokenizer.pad_token_id

    # Special tokens are skipped on both sides so only text is compared.
    hypothesis_text = tokenizer.batch_decode(pred.predictions, skip_special_tokens=True)
    reference_text = tokenizer.batch_decode(reference_ids, skip_special_tokens=True)

    return {"wer": 100 * metric.compute(predictions=hypothesis_text, references=reference_text)}

In [None]:
import torch

# One fixed seed so PyTorch's random number generation is repeatable.
seed_value = 42

# Seed the CUDA devices when a GPU is usable, then the default generator.
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(seed_value)
torch.manual_seed(seed_value)

In [None]:
from transformers import WhisperForConditionalGeneration

# Load the pretrained whisper-large-v2 weights as the starting point for fine-tuning.
model = WhisperForConditionalGeneration.from_pretrained("openai/whisper-large-v2")

In [None]:
# Clear the forced decoder ids and empty the suppressed-token list on the model config.
# NOTE(review): newer transformers releases appear to read these settings from
# `model.generation_config` — confirm these assignments still take effect with
# the installed version.
model.config.forced_decoder_ids = None
model.config.suppress_tokens = []

In [None]:
from transformers import Seq2SeqTrainingArguments

# Keyword arguments are grouped by concern; values are unchanged.
training_args = Seq2SeqTrainingArguments(
    # --- output / Hub ---
    output_dir="./whisper-9-dutch",  # change to a repo name of your choice
    save_total_limit=2,
    push_to_hub=True,
    # --- optimisation ---
    per_device_train_batch_size=12,
    gradient_accumulation_steps=1,  # increase by 2x for every 2x decrease in batch size
    learning_rate=3e-05,
    warmup_steps=20,
    num_train_epochs=5,
    gradient_checkpointing=True,
    bf16=True,
    # --- evaluation ---
    # NOTE(review): newer transformers releases name this argument `eval_strategy`;
    # the install cell pulls transformers from git, so confirm the spelling
    # accepted by the installed version.
    evaluation_strategy="steps",
    per_device_eval_batch_size=8,
    predict_with_generate=True,
    generation_max_length=225,
    # --- cadence: saving, evaluation and logging all happen every 30 steps ---
    save_steps=30,
    eval_steps=30,
    logging_steps=30,
    report_to=["tensorboard"],
    # --- best-checkpoint selection: lower "wer" is better ---
    load_best_model_at_end=True,
    metric_for_best_model="wer",
    greater_is_better=False,
)

In [None]:
from transformers import Seq2SeqTrainer, EarlyStoppingCallback

# Wire the model, data and metric together. The feature extractor (not the text
# tokenizer) is what gets passed as `tokenizer` here.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    eval_dataset=dataset["dev"],
    data_collator=data_collator,
    compute_metrics=compute_metrics,
    tokenizer=processor.feature_extractor,
    # Early stopping is available but currently disabled:
    # callbacks = [EarlyStoppingCallback(early_stopping_patience=2, early_stopping_threshold=0.01)],
)

In [None]:
# Write the processor files into the training output directory, next to the checkpoints.
processor.save_pretrained(training_args.output_dir)

In [None]:
# Attach the seed to the trainer object.
# NOTE(review): Trainer does not appear to read a `random_seed` attribute; its
# seeding is presumably driven by `TrainingArguments.seed` — confirm whether
# this line has any effect.
trainer.random_seed = seed_value 

In [None]:
# from numba import cuda 
# device = cuda.get_current_device()
# device.reset()

In [None]:
# Start fine-tuning. Swap in the commented call to resume from a saved
# checkpoint instead of starting over.
trainer.train()
# trainer.train(resume_from_checkpoint=True)

In [None]:
# Metadata forwarded to `trainer.push_to_hub` in the next cell.
kwargs = dict(
    # dataset_tags="Jasmin-CGN",
    # dataset="Group 5: native adults above 65",  # a 'pretty' name for the training dataset
    # dataset_args="config: hi, split: test",
    language="nl",
    model_name="Whisper Large V2",  # a 'pretty' name for our model
    finetuned_from="openai/whisper-large-v2",
    tasks="automatic-speech-recognition",
    # tags="hf-asr-leaderboard",
)

In [None]:
# Upload the training result to the Hub, passing the metadata assembled in `kwargs`.
trainer.push_to_hub(**kwargs)

In [None]:
# # # Download a static FFmpeg build and add it to PATH.
# !curl https://johnvansickle.com/ffmpeg/releases/ffmpeg-release-amd64-static.tar.xz -o ffmpeg.tar.xz \
#  && tar -xf ffmpeg.tar.xz && rm ffmpeg.tar.xz
# ffmdir = !find . -iname ffmpeg-*-static
# path = %env PATH
# path = path + ':' + ffmdir[0]
# %env PATH $path

# print('')
# !which ffmpeg
# print('Done!')

In [None]:
# from transformers import pipeline
# from datasets import load_dataset, load_metric, Audio, ClassLabel, load_from_disk, Features, Value
# import evaluate

# # IMPORTANT: for openai/whisper models: You can change the "language" attribute to transcribe in a different language.
# # If "language" is not mentioned, Whisper will translate the audio to English by default (or transcribe to English if the audio
# # is in English)
# transcriber = pipeline("automatic-speech-recognition", model='modddddddel/whisper-native-elderly-9-dutch', device=0,
#                       generate_kwargs = {"language":"<|nl|>","task": "transcribe"},)

# wer = evaluate.load("wer")
# labels = []
# preds = []
# i = 0
# print(dataset)
# for recording in test_set:
#     label = labels.append(recording['Sentence'])
#     pred = preds.append(transcriber(recording['Path'])['text'])
#     i += 1
#     print(str(i) + '/' + str(len(test_set)))
#     print('predicted: ' + preds[i-1])
#     print('actual: ' + labels[i-1])

# # Recommended: save the results in a CSV file to use later for comparison
# # (to avoid having to run the model(s) again)
# df = pd.DataFrame({'reference': labels, 'hypothesis': preds})
# df.to_csv('/path/file.csv')

# print( 100 * wer.compute(predictions=preds, references=labels))

In [None]:
# import pandas as pd

# # Load the CSV file into a DataFrame
# file_path = "/path/file.csv"
# df = pd.read_csv(file_path)

# # Apply the regex operation to the 'hypothesis' column
# df['hypothesis'] = df['hypothesis'].replace(r'�\s*', '', regex=True)

# # Save the modified DataFrame to a new CSV file
# output_file_path = "/path/file.csv"
# df.to_csv(output_file_path, index=False)


In [None]:
# import jiwer
# import pandas as pd

# df = pd.read_csv('/path/file.csv')

# out = jiwer.process_words(
#     df['reference'].values.tolist(),
#     df['hypothesis'].values.tolist(),
# )

# print(jiwer.visualize_alignment(out))