In [None]:
from typing import List, Tuple
import pickle
import os
import time

import pandas as pd
import numpy as np
from sklearn.metrics import classification_report, f1_score, accuracy_score
import torch
from torch.utils.data import DataLoader
from transformers import AutoModel, AutoTokenizer, AutoModelForTokenClassification, Trainer, TrainingArguments
import evaluate
metric = evaluate.load("seqeval")

import sys
sys.path.insert(0, 'D:\\progamming\\va\\truecase\\ru-punctuation-truecase\\src')
from process_text import clean_text, clean_text_3times

# ========== Data global variables ==========
PATH_TO_DATA = "../data"

# ========== Model global variables ==========
MODEL_NAME = "DeepPavlov/rubert-base-cased-conversational"
# "DeepPavlov/rubert-base-cased-conversational" -> rubert-base-cased-conversational
SHORT_MODEL_NAME = MODEL_NAME.split('/')[1] if '/' in MODEL_NAME else MODEL_NAME
MODEL_MAX_LENGTH = 512

In [None]:
def encode_tags(tags, tag2id, encodings):
    labels = [[tag2id[tag] for tag in doc] for doc in tags]
    encoded_labels = []
    for doc_labels, doc_offset in zip(labels, encodings.offset_mapping):
        # create an empty array of -100
        doc_enc_labels = np.ones(len(doc_offset),dtype=int) * -100
        arr_offset = np.array(doc_offset)

        # set labels whose first offset position is 0 and the second is not 0
        doc_enc_labels[(arr_offset[:,0] == 0) & (arr_offset[:,1] != 0)] = doc_labels
        encoded_labels.append(doc_enc_labels.tolist())

    return encoded_labels


def compute_metrics(eval_preds, label_names):
    logits, labels = eval_preds
    predictions = np.argmax(logits, axis=-1)

    # Remove ignored index (special tokens) and convert to labels
    true_labels = [[label_names[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [label_names[p] for (p, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]
    all_metrics = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": all_metrics["overall_precision"],
        "recall": all_metrics["overall_recall"],
        "f1": all_metrics["overall_f1"],
        "accuracy": all_metrics["overall_accuracy"],
    }

In [None]:
def split_texts_into_words(texts: List[str]):
    texts_words = []
    for text in texts:
        texts_words.append(text.split(' '))
    return texts_words


def tokenize_texts(texts_words: List[List[str]], tokenizer):
    inputs = tokenizer(texts_words, is_split_into_words=True, padding=True, truncation=True, return_tensors="pt")
    return inputs


def predict_token_classification(inputs, token_classification_model):
    with torch.no_grad():
        logits = token_classification_model(**inputs).logits

    predictions = torch.argmax(logits, dim=2)
    return predictions


def restore_capitalization(texts: List[str], tokenizer, token_classification_model) -> List[str]:
    texts_words = split_texts_into_words(texts)
    inputs = tokenize_texts(texts_words, tokenizer)
    predictions = predict_token_classification(inputs, token_classification_model)

    truecase_texts = []
    for text, model_input, predict in zip(texts_words, inputs.encodings, predictions):
        predicted_token_class = [model.config.id2label[t.item()] for t in predict]
        
        word_class = {}
        for word_id, token_class in zip(model_input.word_ids, predicted_token_class):
            if (word_id != None) and (not word_id in word_class):
                word_class[word_id] = token_class
        
        truecase_words = []
        for i, word in enumerate(text):
            is_upper = word_class[i] == 'U'
            if is_upper:
                truecase_word = word.capitalize()
            else:
                truecase_word = word
            
            truecase_words.append(truecase_word)

        truecase_texts.append(' '.join(truecase_words))
    return truecase_texts

In [None]:
test_queries = [
    ("меня зовут сергей а как тебя", "Меня зовут Сергей а как тебя"),
    ("подскажи пожалуйста сегодня вторник или среда", "Подскажи пожалуйста сегодня вторник или среда"),
    ("закрой за мной дверь я ухожу", "Закрой за мной дверь я ухожу"),
    ("в каком году родилась алла пугачёва", "В каком году родилась Алла Пугачёва"),
    ("когда родилась алла пугачёва", "Когда родилась Алла Пугачёва"),
    ("когда родилась пугачёва", "Когда родилась Пугачёва"),
    ("скажи год рождения пугачевой", "Скажи год рождения Пугачевой"),
    ("год рождения аллы пугачевой", "Год рождения Аллы Пугачевой"),
    ("год рождения пугачевой", "Год рождения Пугачевой"),
    ("день рождения пугачевой", "День рождения Пугачевой"),
    ("когда день рождения у аллы пугачевой", "Когда день рождения у Аллы Пугачевой"),
    ("в каком году родилась алла пугачева", "В каком году родилась Алла Пугачева"),
    ("год рождения пугачевой", "Год рождения Пугачевой"),
    ("иван пятый это кто", "Иван Пятый это кто"),
    ("кто такой иван пятый", "Кто такой Иван Пятый"),
    ("кто такой иван пятый", "Кто такой Иван Пятый"),
    ("что такое лыжи", "Что такое лыжи"),
    ("швеция столица", "Швеция столица"),
    ("столица швеции", "столица Швеции"),
    ("какая столица швеции", "Какая столица Швеции"),
    ("как называется столица у швеции", "Как называется столица у Швеции"),
    ("расскажи когда родился пушкин", "Расскажи когда родился Пушкин"),
    ("когда дата рождения пушкина", "Когда дата рождения Пушкина"),
    ("скажи дату рождения пушкина", "Скажи дату рождения Пушкина"),
    ("сколько прожил пушкин", "Сколько прожил Пушкин"),
    ("когда родился пушкин", "Когда родился Пушкин"),
    ("когда родился александр сергеевич пушкин", "Когда родился Александр Сергеевич Пушкин"),
    ("когда родился александр пушкин", "Когда родился Александр Пушкин"),
    ("кто такой лев николаевич толстой", "Кто такой Лев Николаевич Толстой"),
    ("кто такой лев толстой", "Кто такой Лев Толстой"),
    ("сколько лет путину", "Сколько лет Путину"),
    ("возраст путина", "Возраст Путина"),
    ("какой возраст у владимира путина", "Какой возраст у Владимира Путина"),
    ("какой возраст у путина", "Какой возраст у Путина"),
    ("какой возраст у владимира владимировича путина", "Какой возраст у Владимира Владимировича Путина"),
    ("праздник благовещение какого числа", "Праздник Благовещение какого числа"),
    ("что такое благовещение пресвятой богородицы", "Что такое Благовещение Пресвятой Богородицы"),
    ("кому принадлежит компания газпром", "Кому принадлежит компания Газпром"),
    ("кто директор газпрома", "Кто директор Газпрома"),
    ("кто является директором компании газпром", "Кто является директором компании Газпром"),
    ("сколько лет москве", "Сколько лет Москве"),
    ("сколько лет городу москва", "Сколько лет городу Москва"),
    ("в каком году умер брежнев", "В каком году умер Брежнев"),
    ("в каком году скончался леонид брежнев", "В каком году скончался Леонид Брежнев"),
]

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, model_max_length=MODEL_MAX_LENGTH)

checkpoint = 'D:/progamming/va/truecase/ru-punctuation-truecase/src/results/rubert-base-cased-conversational-512-tatoeba_dataset/20-53-40/checkpoint-31505/'
# checkpoint = "D:/progamming/va/truecase/ru-punctuation-truecase/src/results/checkpoint-18903/"
# model = AutoModelForTokenClassification.from_pretrained(checkpoint)
model = AutoModelForTokenClassification.from_pretrained("D:\\progamming\\va\\truecase\\ru-punctuation-capitalization\\rubert-base-cased-conversational-512-tatoeba_dataset\\02-09-2023-11-01-00\\checkpoint-7482")

inference_results = restore_capitalization([q[0] for q in test_queries], tokenizer, model)
for query, result in zip(test_queries, inference_results):
    print(f'Query   : {query[0]}')
    print(f'Combined: {result.strip()}\n')

In [None]:
def get_capitalization_mask(text):
    return [int(ch.isupper()) for ch in text]

def capitalization_metrics_report(y_true, y_predict):
    scores_by_sentences = []
    scores_by_characters = {
        'f1_score': [],
        'accuracy_score': []
    }
    for true_text, predicted_text in zip(y_true, y_predict):
        true_text = true_text.strip()
        predicted_text = predicted_text.strip()

        true_text_mask = get_capitalization_mask(true_text)
        predicted_text_mask = get_capitalization_mask(predicted_text)
        
        # scores by characters
        f1score = f1_score(true_text_mask, predicted_text_mask)
        accuracy = accuracy_score(true_text_mask, predicted_text_mask)
        scores_by_characters['f1_score'].append(f1score)
        scores_by_characters['accuracy_score'].append(accuracy)

        # scores by sentences
        scores_by_sentences.append(int(true_text == predicted_text))
    
    mean_accuracy_score_by_sentences = np.mean(scores_by_sentences)
    mean_f1_score_by_characters = np.mean(scores_by_characters['f1_score'])
    mean_accuracy_score_by_characters = np.mean(scores_by_characters['accuracy_score'])

    return mean_accuracy_score_by_sentences, mean_f1_score_by_characters, mean_accuracy_score_by_characters

data = {
    'checkpoint': [],
    'accuracy_sentences': [],
    'f1_characters': [],
    'accuracy_characters': []
}
path_to_runs = "D:/progamming/va/truecase/ru-punctuation-capitalization/rubert-base-cased-conversational-512-tatoeba_dataset"
for run in os.listdir(path_to_runs):
    print(f"------------------------ RUN\t{run} ------------------------\n")
    path_to_run = os.path.join(path_to_runs, run)
    for checkpoint in os.listdir(path_to_run):
        path_to_checkpoint = os.path.join(path_to_run, checkpoint) 
        
        tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, model_max_length=MODEL_MAX_LENGTH)
        model = AutoModelForTokenClassification.from_pretrained(path_to_checkpoint)

        inference_results = restore_capitalization([q[0] for q in test_queries], tokenizer, model)

        print(path_to_checkpoint)
        true_texts = [q[1] for q in test_queries]
        mean_accuracy_score_by_sentences, mean_f1_score_by_characters, mean_accuracy_score_by_characters = capitalization_metrics_report(true_texts, inference_results)

        data['checkpoint'].append(path_to_checkpoint)
        data['accuracy_sentences'].append(mean_accuracy_score_by_sentences)
        data['f1_characters'].append(mean_f1_score_by_characters)
        data['accuracy_characters'].append(mean_accuracy_score_by_characters)

        print(f"mean_accuracy_score_by_sentences :\t{round(mean_accuracy_score_by_sentences, 4)}")
        print(f"mean_f1_score_by_characters      :\t{round(mean_f1_score_by_characters, 4)}")
        print(f"mean_accuracy_score_by_characters:\t{round(mean_accuracy_score_by_characters, 4)}\n")

        for query, result in zip(test_queries, inference_results):
            print(f'Query   : {query[0]}')
            print(f'Restored: {result.strip()}\n')
        print("================================================================\n")

df = pd.DataFrame(data)
df.head()
        

In [None]:
sorted_df = df.sort_values(by=['f1_characters'], ascending=False)
for idx, row in sorted_df.iterrows():
    print(row['checkpoint'], row['f1_characters'])

In [None]:
sorted_df

In [None]:
import time
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME, model_max_length=MODEL_MAX_LENGTH)
model = AutoModelForTokenClassification.from_pretrained("D:\\progamming\\va\\truecase\\ru-punctuation-capitalization\\rubert-base-cased-conversational-512-tatoeba_dataset\\02-09-2023-11-01-00\\checkpoint-7482")

s = ["в каком году родилась алла пугачёва"]
test = [q[0] for q in test_queries]
tic = time.time()
inference_results = inference_results = restore_capitalization(test, tokenizer, model)
toc = time.time() - tic

print(toc)