In [None]:
# !pip install datasets==3.6.0

In [None]:
import logging
import torch
import warnings

# Notebook-wide logging: INFO by default, with the chattiest third-party
# libraries turned down to ERROR so cell output stays readable.
logging.basicConfig(level=logging.INFO)
for _noisy_library in ("pyngrok", "transformers", "torch"):
    logging.getLogger(_noisy_library).setLevel(logging.ERROR)
logger = logging.getLogger(__name__)

# Silence every Python warning for the rest of the session.
warnings.filterwarnings('ignore')

In [None]:
# Run on the GPU when PyTorch can see one; otherwise stay on the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f'Using device: {device}')
print(f"PyTorch version: {torch.__version__}")
if device.type == "cuda":
    # Only query the device name when a CUDA device was actually selected.
    print(f"CUDA device: {torch.cuda.get_device_name(0)}")

# Fixed seed so PyTorch random number generation is repeatable across runs.
torch.manual_seed(42)

In [None]:
import os
import re
import zipfile

import jiwer
import librosa
import numpy as np
import torch
from datasets import load_dataset, Audio, Dataset
from jiwer import wer, cer
from transformers import (
    AutoProcessor,
    AutoTokenizer,
    AutoModelForMaskedLM,
    Trainer,
    Wav2Vec2Processor,
    Wav2Vec2ForCTC,
    TrainingArguments,
    DataCollatorWithPadding
    )

In [None]:
from datasets import load_dataset

# Load the Cypriot-Greek thesaurus dataset from the Hugging Face Hub by its
# repository id (presumably served from the local cache on repeat runs).
ds_cy = load_dataset("Elormiden/Thesaurus-Cypriot-Greek-Dialect")

In [None]:
##### Old (not fine-tuned) baseline model #######
old_tokenizer = AutoTokenizer.from_pretrained("google-bert/bert-base-multilingual-cased")
old_model = AutoModelForMaskedLM.from_pretrained("google-bert/bert-base-multilingual-cased")
#######################

checkpoint_path = '/content/cypriot_bert_checkpoint_2000.zip'
checkpoint_dir = '/content/checkpoint_2000'

#### Loading the fine-tuned model from the checkpoint ####
# Unpack the archive only when the target directory is missing: the cell then
# works on a fresh runtime (Restart & Run All) and is cheap to re-run once the
# checkpoint has been extracted.
if not os.path.isdir(checkpoint_dir):
    with zipfile.ZipFile(checkpoint_path, 'r') as zip_ref:
        zip_ref.extractall(checkpoint_dir)

tokenizer = AutoTokenizer.from_pretrained(checkpoint_dir)
model = AutoModelForMaskedLM.from_pretrained(checkpoint_dir)

In [None]:
# print(tokenizer)
# print(model)

In [None]:
# Keep handles to the two dataset splits this notebook works with.
train_cy = ds_cy['train']
val_cy = ds_cy['validation']

In [None]:
def compute_heavy_metrics(eval_pred):
    """Compute per-example WER/CER statistics for an evaluation pass.

    Args:
        eval_pred: A ``(predictions, labels)`` pair. ``predictions`` holds
            per-token scores whose last axis is arg-maxed into token ids;
            ``labels`` holds token ids with ``-100`` marking ignored positions.

    Returns:
        dict with the mean (``"wer"``, ``"cer"``) and standard deviation
        (``"wer_std"``, ``"cer_std"``) of the per-example scores.
    """
    predictions, labels = eval_pred
    pred_ids = np.argmax(predictions, axis=-1)
    # -100 is not a real token id, so swap it for the pad token before decoding.
    labels = np.where(labels != -100, labels, tokenizer.pad_token_id)

    pred_texts = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    label_texts = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Compare texts case-insensitively, without punctuation or extra spaces.
    transformation = jiwer.Compose([
        jiwer.ToLowerCase(),
        jiwer.RemovePunctuation(),
        jiwer.RemoveMultipleSpaces(),
        jiwer.Strip()
    ])

    wer_scores = []
    cer_scores = []

    for pred, truth in zip(pred_texts, label_texts):
        norm_pred = transformation(pred)
        norm_truth = transformation(truth)

        if norm_truth.strip():
            wer_score = wer(norm_truth, norm_pred)
            cer_score = cer(norm_truth, norm_pred)
        else:
            # Empty reference: jiwer is not called. An empty prediction is an
            # exact match (0.0); any other prediction counts as total error.
            wer_score = cer_score = 1.0 if norm_pred.strip() else 0.0

        wer_scores.append(wer_score)
        cer_scores.append(cer_score)

    return {
        "wer": np.mean(wer_scores),
        "cer": np.mean(cer_scores),
        "wer_std": np.std(wer_scores),
        "cer_std": np.std(cer_scores)
    }

In [None]:
from torch.nn.utils.rnn import pad_sequence
from tqdm import tqdm
import torch

def tokenize_multilingual_text(text, max_length=256):
    """Convert text into a list of token ids.

    Example: "hello world" → [101, 1234, 5678, 102]

    Args:
        text: String to encode with the notebook-level ``tokenizer``.
        max_length: Upper bound passed to the tokenizer; truncation is enabled.
            Defaults to 256, the value that used to be hard-coded.

    Returns:
        The ``input_ids`` entry of the tokenizer output.
    """
    return tokenizer(text, max_length=max_length, truncation=True)['input_ids']

def tokenize_text_pairs(batch):
    """Build padded model inputs and labels from Cypriot/Greek thesaurus rows.

    Steps:
    1. Build one prompt pair per row:
       input_text  = f"correct cypriot to greek: {word} - {description}"
       target_text = f"{greek_word} - {greek_desc}"
    2. Tokenize both texts with ``tokenize_multilingual_text``.
    3. Right-pad inputs (pad token) and labels (-100) to ONE shared length.
    4. Derive the attention mask from the non-pad input positions.
    5. Turn any pad-token id left in the labels into -100 so those positions
       are not counted.

    Args:
        batch: Mapping with equally long "word", "description", "greek_word"
            and "greek_description" sequences.

    Returns:
        dict with long tensors "input_ids", "attention_mask" and "labels",
        all shaped (number of rows, shared padded length).
    """
    tokenized_input_texts = []
    tokenized_target_texts = []

    for word, description, greek_word, greek_desc in tqdm(
        zip(batch['word'], batch['description'], batch['greek_word'], batch['greek_description']),
        total=len(batch['word']), desc="Tokenizing batch"):

        input_text = f"correct cypriot to greek: {word} - {description}"
        target_text = f"{greek_word} - {greek_desc}"

        tokenized_input_texts.append(tokenize_multilingual_text(input_text))
        tokenized_target_texts.append(tokenize_multilingual_text(target_text))

    # A masked-LM head produces one prediction per input position, so the
    # labels need the same sequence length as input_ids. Padding each side to
    # its own maximum (as before) yields mismatched widths.
    shared_len = max(
        (len(ids) for ids in tokenized_input_texts + tokenized_target_texts),
        default=0,
    )
    num_rows = len(tokenized_input_texts)
    pad_id = tokenizer.pad_token_id

    input_ids_padded = torch.full((num_rows, shared_len), pad_id, dtype=torch.long)
    labels_padded = torch.full((num_rows, shared_len), -100, dtype=torch.long)
    for row, (source_ids, target_ids) in enumerate(zip(tokenized_input_texts, tokenized_target_texts)):
        input_ids_padded[row, :len(source_ids)] = torch.tensor(source_ids, dtype=torch.long)
        labels_padded[row, :len(target_ids)] = torch.tensor(target_ids, dtype=torch.long)

    attention_mask = (input_ids_padded != pad_id).long()
    labels_padded[labels_padded == pad_id] = -100

    return {
        "input_ids": input_ids_padded,
        "attention_mask": attention_mask,
        "labels": labels_padded
    }

In [None]:
"""
Run tokenize_text_pairs over the train and validation splits.
"""
train_cyprus_tokenized = tokenize_text_pairs(train_cy)
val_cyprus_tokenized = tokenize_text_pairs(val_cy)

In [None]:
"""
Convert the tokenized dicts into Hugging Face Dataset objects.
"""
train_hf = Dataset.from_dict(train_cyprus_tokenized)
val_hf = Dataset.from_dict(val_cyprus_tokenized)

In [None]:
# Show the validation Dataset's repr as the cell output (quick sanity check).
val_hf

In [None]:
def calculate_wer_cer_separately(model, tokenizer, eval_dataset, batch_size=4, max_samples=50):
    """Estimate WER/CER of a model on the first ``max_samples`` dataset rows.

    NOTE: this function does not work with this MLM; another approach will be
    explored.

    Pipeline:
    1. Put the model in eval mode and use whatever device it lives on.
    2. Walk the subset in slices of ``batch_size`` rows.
    3. Turn each slice's token lists into tensors.
    4. Take the arg-max token at every position as the prediction.
    5. Decode predictions and labels back to text.
    6. Score the decoded pairs with jiwer.

    Args:
        model: Model exposing ``eval()``, ``device`` and a forward call whose
            output has ``logits``.
        tokenizer: Tokenizer used to decode token ids.
        eval_dataset: Dataset with "input_ids", "labels" and "attention_mask"
            columns that supports ``select`` and slicing.
        batch_size: Number of rows per forward pass.
        max_samples: Maximum number of leading rows to evaluate.

    Returns:
        dict with "wer", "cer" and the first five "predictions"/"references".
    """
    model.eval()
    device = model.device

    decoded_predictions = []
    decoded_references = []

    print(f"Исходный размер датасета: {len(eval_dataset)}")
    subset_size = min(max_samples, len(eval_dataset))
    eval_subset = eval_dataset.select(range(subset_size))
    print(f"Samples {len(eval_subset)} taken")
    print("-"*50)

    # Number of slices, counting a trailing partial slice as one more.
    full_batches, leftover = divmod(len(eval_subset), batch_size)
    total_batches = full_batches + (1 if leftover != 0 else 0)

    def column_to_tensor(rows, column):
        # Stack one tensor per row and move the result to the model's device.
        return torch.stack([torch.tensor(x) for x in rows[column]]).to(device)

    for start in tqdm(range(0, len(eval_subset), batch_size),
                      desc="Calculating WER/CER",
                      total=total_batches):
        batch = eval_subset[start:start+batch_size]

        input_ids = column_to_tensor(batch, 'input_ids')
        labels = column_to_tensor(batch, 'labels')
        attention_mask = column_to_tensor(batch, 'attention_mask')

        with torch.no_grad():
            outputs = model(input_ids=input_ids, attention_mask=attention_mask)
            pred_ids = torch.argmax(outputs.logits, dim=-1)

        for pred, label in zip(pred_ids, labels):
            # Drop ignored label positions and keep as many leading
            # predictions as there are real label tokens.
            label_clean = label[label != -100]
            pred_clean = pred[:len(label_clean)]

            decoded_predictions.append(tokenizer.decode(pred_clean, skip_special_tokens=True))
            decoded_references.append(tokenizer.decode(label_clean, skip_special_tokens=True))

    print("="*50)
    print("ОТЛАДОЧНАЯ ИНФОРМАЦИЯ")
    print("="*50)
    print(f"Обработано примеров: {len(decoded_predictions)}")

    # Show up to ten decoded pairs for manual inspection.
    preview = zip(decoded_predictions[:10], decoded_references[:10])
    for number, (guess, truth) in enumerate(preview, start=1):
        print(f"\nПример {number}:")
        print(f"  Предсказание: '{guess}'")
        print(f"  Правильный:   '{truth}'")
        print(f"  Совпадение:   {guess == truth}")

    wer_score = jiwer.wer(decoded_references, decoded_predictions)
    cer_score = jiwer.cer(decoded_references, decoded_predictions)

    print("="*50)
    print(f"WER: {wer_score:.3f} ({wer_score*100:.1f}% неправильных слов)")
    print(f"CER: {cer_score:.3f} ({cer_score*100:.1f}% неправильных символов)")
    print("="*50)

    return {
        "wer": wer_score,
        "cer": cer_score,
        "predictions": decoded_predictions[:5],
        "references": decoded_references[:5]
    }

In [None]:
# Evaluate the fine-tuned checkpoint. The summary cell that follows reads
# `results`, so this call has to run for Restart & Run All to succeed.
results = calculate_wer_cer_separately(model, tokenizer, val_hf, batch_size=1, max_samples=50)
# old_results = calculate_wer_cer_separately(old_model, old_tokenizer, val_hf, batch_size=4)

In [None]:
############ 2000 Steps Fine-tuned model results ############
# Expects `results` to hold the dict produced by calculate_wer_cer_separately
# (keys "wer" and "cer"); both values are printed with three decimals.
print("NEW MODEL")
print(f"WER: {results['wer']:.3f}")
print(f"CER: {results['cer']:.3f}")
#############################################################

############# Clean/Old model results ######################
# print("OLD MODEL:")
# print(f"WER: {old_results['wer']:.3f}")
# print(f"CER: {old_results['cer']:.3f}")
############################################################

In [None]:
# from transformers import TrainingArguments, Trainer

# eval_args = TrainingArguments(
#     output_dir="./evaluation_results",

#     per_device_eval_batch_size=1,

#     fp16=True,
#     dataloader_num_workers=0,
#     dataloader_pin_memory=False,

#     remove_unused_columns=True,
#     logging_steps=50,
#     report_to=[],

#     save_strategy="no",
#     eval_strategy="no",
# )

# evaluator = Trainer(
#     model=model,
#     args=eval_args,
#     eval_dataset=val_hf,
#     compute_metrics=compute_heavy_metrics,
#     tokenizer=tokenizer,
# )

# torch.cuda.empty_cache()

In [None]:
# eval_results = evaluator.evaluate()