In [None]:
pip -q install evaluate==0.3.0 jiwer==2.5.1

In [None]:
# Log in to Weights & Biases.
# The API key is read from the WANDB_API_KEY environment variable so that no
# credential is ever stored in the notebook source; when the variable is unset,
# `key=None` is passed to wandb.login.
import os
import wandb
wandb.login(key=os.environ.get("WANDB_API_KEY"))

In [1]:
import os
# Set CUDA_VISIBLE_DEVICES to "0" before `import torch` below, so the process is
# limited to the first GPU.
os.environ["CUDA_VISIBLE_DEVICES"]="0"

import torch
from transformers import WhisperProcessor, WhisperForConditionalGeneration
from transformers import TrainingArguments, Trainer

import datasets
from datasets import load_dataset, DatasetDict
from dataclasses import dataclass

from typing import Dict, List, Optional, Union
import numpy as np
import evaluate

# Metric objects used by the evaluation cells: word error rate, BLEU, character error rate.
wer_metric = evaluate.load(path="wer")
bleu_metric = evaluate.load(path="bleu")
cer_metric = evaluate.load(path="cer")

2024-05-30 07:35:28.583485: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-30 07:35:28.583557: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-30 07:35:28.585167: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


# Build data

In [2]:
@dataclass
class DataCollatorWhisperCTCEncoder:
    """Collate (audio, transcript) examples into a padded Whisper batch.

    Every incoming feature must provide the audio array under the key
    ``'input_features'`` and the transcript string under ``'label'``.
    The audio goes through ``processor.feature_extractor`` and the transcripts
    through ``processor.tokenizer``; padded label positions are replaced by
    ``-100``.
    """
    processor: WhisperProcessor
    # Kept for interface compatibility; not read by __call__.
    padding: Union[bool, str] = True
    # Kept for interface compatibility; not read by __call__.
    max_length: Optional[int] = None
    # Forwarded to the feature extractor.
    truncation: Optional[bool] = True
    # Maximum number of label tokens; None falls back to 448.
    max_length_labels: Optional[int] = None
    # Forwarded to the feature extractor.
    pad_to_multiple_of: Optional[int] = None
    # Forwarded to tokenizer.pad for the labels.
    pad_to_multiple_of_labels: Optional[int] = None

    def __call__(self, features: List[Dict[str, np.ndarray]]) -> Dict[str, torch.Tensor]:
        """Build one batch from a list of examples.

        Args:
            features: examples, each holding ``'input_features'`` (audio array)
                and ``'label'`` (transcript text).

        Returns:
            The feature-extractor output with an added ``"labels"`` tensor in
            which padding positions are set to ``-100``.
        """
        batch_audio = [feature['input_features'] for feature in features]
        batch_label = [feature['label'] for feature in features]

        batch = self.processor.feature_extractor(
            batch_audio,
            truncation=self.truncation,
            sampling_rate=16000,
            pad_to_multiple_of=self.pad_to_multiple_of,
            return_tensors="pt",
        )

        # Honour the configured label length; 448 was the previously hard-coded value.
        max_label_tokens = 448 if self.max_length_labels is None else self.max_length_labels

        tokenizer = self.processor.tokenizer
        label_features = [
            {
                "input_ids": np.asarray(
                    tokenizer(text, truncation=True, max_length=max_label_tokens)['input_ids']
                )
            }
            for text in batch_label
        ]
        labels_batch = tokenizer.pad(
            label_features,
            pad_to_multiple_of=self.pad_to_multiple_of_labels,
            return_tensors="pt",
        )

        # Positions whose attention mask is not 1 are padding: mark them with -100.
        labels = labels_batch["input_ids"].masked_fill(labels_batch.attention_mask.ne(1), -100)

        batch["labels"] = labels
        return batch

In [3]:
# Only the test split of the corpus is needed for evaluation.
youtube = DatasetDict()
youtube["test"] = load_dataset(
    path='linhtran92/viet_youtube_asr_corpus_v2',
    split="test",
)

Resolving data files:   0%|          | 0/33 [00:00<?, ?it/s]

In [4]:
def prepare_dataset(examples):
    """Add the columns consumed by the data collator to one example.

    Copies ``examples["transcription"]`` to ``"label"`` and the audio array
    (``examples["audio"]["array"]``) to ``"input_features"``, then returns the
    same (mutated) mapping.
    """
    examples['label'] = examples["transcription"]
    audio = examples["audio"]
    examples["input_features"] = audio['array']
    return examples

# Apply prepare_dataset to the test split and drop every original column,
# leaving only the columns the function adds.
all_dataset_test_vectorized = youtube["test"].map(
    function=prepare_dataset,
    remove_columns=youtube["test"].column_names,
    num_proc=8,
)

In [5]:
# Optional: uncomment to split the test set into smaller chunks if evaluation runs out of memory (OOM).
# num_rows = len(all_dataset_test_vectorized)
# batch_size = 700
# num_splits = num_rows // batch_size + (1 if num_rows % batch_size != 0 else 0)
# splits = []

# for i in range(num_splits):
#     start_idx = i * batch_size
#     end_idx = start_idx + batch_size
#     split = all_dataset_test_vectorized.select(range(start_idx, min(end_idx, num_rows)))
#     splits.append(split)

# split_dataset_dict = DatasetDict({f'split_{i}': splits[i] for i in range(num_splits)})

# Reload model

In [6]:
# Fine-tuned checkpoint, moved to the GPU.
model = WhisperForConditionalGeneration.from_pretrained(
    "/kaggle/input/whisper-utube-5ep/model_final"
).to('cuda')  # vinai/PhoWhisper-base

# Processor configured for Vietnamese transcription.
processor = WhisperProcessor.from_pretrained(
    "openai/whisper-base",
    language="vi",
    task="transcribe",
)  # vinai/PhoWhisper-base
processor.tokenizer.pad_token = processor.tokenizer.eos_token
processor.tokenizer.max_length = 448
processor.tokenizer.set_prefix_tokens(language="vi", task="transcribe")

# Force the Vietnamese/transcribe decoder prompt on both the model config and
# the generation config.
model.config.forced_decoder_ids = processor.tokenizer.get_decoder_prompt_ids(language="vi", task="transcribe")
model.generation_config.forced_decoder_ids = processor.tokenizer.get_decoder_prompt_ids(language="vi", task="transcribe")

# Clear the suppressed-token lists on both configs.
model.config.suppress_tokens = []
model.generation_config.suppress_tokens = []

Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.


# Evaluation settings

In [7]:
# Evaluation-only configuration for the Trainer.
batch_size = 1
eval_accumulation_steps = 100

training_args = TrainingArguments(
    output_dir='/kaggle/working/',
    logging_dir='/kaggle/working/',
    per_device_eval_batch_size=batch_size,
    eval_accumulation_steps=eval_accumulation_steps,
    dataloader_num_workers=2,
    fp16=True,
    label_names=["labels"],
    metric_for_best_model='wer',
    greater_is_better=False,
    ignore_data_skip=True,
)

# Collator that turns dataset examples into model-ready batches.
data_collator = DataCollatorWhisperCTCEncoder(processor=processor)

In [8]:
def compute_wer(eval_prediction):
    """Compute WER, CER and BLEU (all scaled to 0-100) for an evaluation run.

    Args:
        eval_prediction: object exposing ``predictions`` (its first element is
            the logits, shape (total eval samples, max_length, vocab size)) and
            ``label_ids`` (shape (total eval samples, max_length), padded
            with -100).

    Returns:
        dict with the keys ``'wer'``, ``'cer'`` and ``'bleu'``.
    """
    logits = eval_prediction.predictions[0]
    pred_ids = np.argmax(logits, axis=-1)  # -> (total eval samples, max_length)

    # Work on a copy so the caller's label array keeps its -100 padding markers.
    label_ids = eval_prediction.label_ids.copy()
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

    pred_str = processor.tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    # Remove '!' characters from the decoded predictions.
    # NOTE(review): presumably decoding artifacts of padded positions — confirm.
    pred_text = [s.replace('!', '') for s in pred_str]
    label_text = processor.tokenizer.batch_decode(label_ids, skip_special_tokens=True)

    wer = 100 * wer_metric.compute(predictions=pred_text, references=label_text)
    cer = 100 * cer_metric.compute(predictions=pred_text, references=label_text)
    bleu = 100 * bleu_metric.compute(predictions=pred_text, references=label_text)['bleu']
    return {'wer': wer,
            'cer': cer,
            'bleu': bleu}

In [None]:
import numpy as np

# Count word-level errors
def calculate_wer(ref_words, hyp_words):
    """Count substitutions, deletions and insertions between two word lists.

    Builds the Levenshtein distance table between ``ref_words`` and
    ``hyp_words`` and walks it back from the bottom-right corner to classify
    each edit.

    Args:
        ref_words: reference words (sequence of comparable items).
        hyp_words: hypothesis words (sequence of comparable items).

    Returns:
        Tuple ``(substitutions, deletions, insertions)`` of ints.
    """
    n_ref = len(ref_words)
    n_hyp = len(hyp_words)

    # Plain Python ints are used for the table: the former uint8 array wrapped
    # around as soon as a sequence length or distance exceeded 255.
    d = [[0] * (n_hyp + 1) for _ in range(n_ref + 1)]
    for i in range(1, n_ref + 1):
        d[i][0] = i
    for j in range(1, n_hyp + 1):
        d[0][j] = j

    for i in range(1, n_ref + 1):
        ref_word = ref_words[i - 1]
        row = d[i]
        prev_row = d[i - 1]
        for j in range(1, n_hyp + 1):
            cost = 0 if ref_word == hyp_words[j - 1] else 1
            row[j] = min(prev_row[j] + 1,         # deletion
                         row[j - 1] + 1,          # insertion
                         prev_row[j - 1] + cost)  # substitution

    i = n_ref
    j = n_hyp
    substitutions = 0
    deletions = 0
    insertions = 0

    # Backtrace; preference order is match, substitution, deletion, insertion.
    while i > 0 and j > 0:
        if ref_words[i - 1] == hyp_words[j - 1]:
            i -= 1
            j -= 1
        elif d[i][j] == d[i - 1][j - 1] + 1:
            substitutions += 1
            i -= 1
            j -= 1
        elif d[i][j] == d[i - 1][j] + 1:
            deletions += 1
            i -= 1
        else:
            # Only remaining possibility in a valid table; the unconditional
            # branch also guarantees that the loop always makes progress.
            insertions += 1
            j -= 1

    # Whatever is left on one side once the other is exhausted.
    deletions += i
    insertions += j

    return substitutions, deletions, insertions

def calculate_wer_for_lists(reference_list, hypothesis_list):
    """Sum word-level error counts over paired reference/hypothesis strings.

    Each pair is split on whitespace and passed to ``calculate_wer``; pairing
    uses ``zip``, so extra items in the longer list are ignored.

    Returns:
        Tuple ``(total_substitutions, total_deletions, total_insertions)``.
    """
    sub_total = 0
    del_total = 0
    ins_total = 0

    for ref_text, hyp_text in zip(reference_list, hypothesis_list):
        n_sub, n_del, n_ins = calculate_wer(ref_text.split(), hyp_text.split())
        sub_total = sub_total + n_sub
        del_total = del_total + n_del
        ins_total = ins_total + n_ins

    return sub_total, del_total, ins_total

# Count words
def count_words(strings):
    """Return the total number of whitespace-separated words in ``strings``."""
    return sum(len(text.split()) for text in strings)

# Evaluate

In [9]:
# The Trainer is used for inference only here; metrics come from compute_wer.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    compute_metrics=compute_wer,
)

trainer.evaluate(eval_dataset=all_dataset_test_vectorized)
# Alternative that keeps the raw predictions:
# pred = trainer.predict(all_dataset_test_vectorized)

dataloader_config = DataLoaderConfiguration(dispatch_batches=None, split_batches=False, even_batches=True, use_seedable_sampler=True)


In [None]:
# Substitution / deletion / insertion breakdown from trainer.predict.
# The prediction is run in this cell so that it does not rely on a `pred`
# variable left over from a previous kernel session.
pred = trainer.predict(all_dataset_test_vectorized)

pred_ids = pred.predictions[0]
label_ids = pred.label_ids

pred_ids = np.argmax(pred_ids, axis=-1)
# Replace the -100 padding markers so the labels can be decoded.
label_ids[label_ids == -100] = processor.tokenizer.pad_token_id

pred_str = processor.tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
pred_text = [s.replace('!', '') for s in pred_str]
label_text = processor.tokenizer.batch_decode(label_ids, skip_special_tokens=True)

total_substitutions, total_deletions, total_insertions = calculate_wer_for_lists(label_text, pred_text)

print(f"Tổng số lỗi thay thế (S): {total_substitutions}")
print(f"Tổng số lỗi xóa (D): {total_deletions}")
print(f"Tổng số lỗi chèn thêm (I): {total_insertions}")
print(f"Tổng số từ (N): {count_words(label_text)}")

# DEMO

In [None]:
# Processor for the demo, configured for Vietnamese transcription.
processor = WhisperProcessor.from_pretrained(
    "vinai/PhoWhisper-base",
    language="vi",
    task="transcribe",
)  # vinai/PhoWhisper-base
processor.tokenizer.pad_token = processor.tokenizer.eos_token  # openai/whisper-base
processor.tokenizer.max_length = 448

processor.tokenizer.set_prefix_tokens(language="vi", task="transcribe")

# Decoder prompt ids passed to generate() in the demo.
forced_decoder_ids = processor.get_decoder_prompt_ids(
    language="vi",
    task="transcribe",
)

In [None]:
import librosa

# Load the demo clip, resampled to 16 kHz.
input_speech, rate = librosa.load(
    '/kaggle/input/youtube-test2/youtube_5.wav',
    sr=16000,
)

# Convert the waveform into the processor's input features (PyTorch tensor).
input_features = processor(
    input_speech,
    sampling_rate=rate,
    return_tensors="pt",
).input_features

In [None]:
# Transcribe the demo clip with the vinai/PhoWhisper-base checkpoint.
model_trained = WhisperForConditionalGeneration.from_pretrained(
    'vinai/PhoWhisper-base'
)
predicted_ids_model_trained = model_trained.generate(
    input_features,
    forced_decoder_ids=forced_decoder_ids,
)
pho = processor.batch_decode(
    predicted_ids_model_trained,
    skip_special_tokens=True,
)

In [None]:
# Compare the reference transcript with each model's output.
# Only transcripts that were actually computed in this session are printed;
# the others get an explicit placeholder instead of raising NameError.
print("True: chị vào viện tất cả với chị đều xúc động")
for result_name in ('pho_fine', 'pho', 'whisper_fine', 'whisper'):
    if result_name in globals():
        print(f'{result_name}:', globals()[result_name])
    else:
        print(f'{result_name}:', '<not computed in this session>')