In [1]:
!pip install razdel

Collecting razdel
  Downloading razdel-0.5.0-py3-none-any.whl.metadata (10.0 kB)
Downloading razdel-0.5.0-py3-none-any.whl (21 kB)
Installing collected packages: razdel
Successfully installed razdel-0.5.0


In [2]:
import json

import pandas as pd
from transformers import BertModel, BertTokenizerFast

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
!ls drive/MyDrive/diploma/data

aligned_chekhov_75.000_sents_09_02.json
aligned_chekhov_chameleon_sents_09_02.json
aligned_chekhov_maria_ivanovna_sents_09_02.json
aligned_chekhov_person_in_case_sents_09_02.json
aligned_chekhov_prishibaev_sents_09_02.json
aligned_chekhov_self-delusion_sents_09_02.json
aligned_chekhov_trifon_sents_09_02.json
aligned_furmanov_chapaev_sents_09_02.json
aligned_gaydar_distant_countries_sents_09_02.json
aligned_gogol_dead_souls_sample_sents_09_02.json
aligned_gorkiy_happiness_sents_09_02.json
aligned_gorkiy_hero_sents_09_02.json
aligned_gorkiy_italian_11_sents_09_02.json
aligned_gorkiy_mother_part2_sents_09_02.json
aligned_gorkiy_mother_sents_09_02.json
aligned_gorkiy_russian_fairy_tales_sents_09_02.json
aligned_konstitution_sents_09_02.json
aligned_land_direct_sents_09_02.json
aligned_news_names_09_02.json
aligned_news_sents_09_02.json
aligned_shchedrin_konyaga_sents_09_02.json
aligned_shchedrin_vilage_fire_sents_09_02.json
all_dicts_data.tsv
all_phrases.tsv
e-mordovia
mdf_mono
moksha_bibl

In [6]:
from itertools import groupby
import re

import razdel

QUOTE_TYPE = '"'
DASH_TYPE = '-'


def remove_hyphenation(text: str) -> str:
    """
    Removes hyphenation from a given text by merging words split with hyphens or spaces.

    Example:
        "по-\ нимаемый иска- женный при- мер" -> "понимаемый искаженный пример"

    Args:
        text (str): The input text containing hyphenated words.

    Returns:
        str: The text with hyphenation removed.
    """
    return re.sub(
        rf'(\w)([\{DASH_TYPE}+]\s+)(\w)',
        lambda matchobj: matchobj.group(1) + matchobj.group(3),
        text
    )


def limit_repeated_chars(text: str, max_run: int = 3) -> str:
    """
    Limits consecutive repeated characters to a specified maximum number.

    Example:
        "[8_________________________ 2400 3 сядт, 4 дес. 6 един." -> "[8___ 2400 3 сядт, 4 дес. 6 един."

    Args:
        text (str): The input text containing repeated characters.
        max_run (int, optional): The maximum number of consecutive identical characters allowed. Default is 3.

    Returns:
        str: The text with excessive repeated characters trimmed.
    """
    return ''.join(''.join(list(group)[:max_run]) for _, group in groupby(text))


def clean_text(raw_text: str) -> str:
    """
    Cleans the input text by performing the following operations:
    - Replacing all quotes with the specified type.
    - Replacing all dashes with the specified type.
    - Removing hyphenation.
    - Limiting repeated characters.
    - Replacing multiple spaces with a single space.
    - Removing asterisks at the beginning of words.
    - Normalizing spacing around periods.

    Args:
        raw_text (str): The input raw text.

    Returns:
        str: The cleaned text.
    """
    text = re.sub(r'[“”„‟«»‘’‚‛]', QUOTE_TYPE, raw_text)
#     text = re.sub(r'[‐‑‒–—―]', DASH_TYPE, text)

    text = remove_hyphenation(text)
    text = limit_repeated_chars(text)

    text = re.sub('(\. )+', '. ', text)
    text = text.replace('\xa0', ' ')

    text = re.sub('\s+', ' ', text)

    text = text.replace('* ', '')
    return text.strip()


def split_into_sentences(text: str) -> list[str]:
    """
    Splits a given text into sentences using the Razdel library.

    Args:
        text (str): The input text to be split.

    Returns:
        list[str]: A list of sentences extracted from the text.
    """
    sents = []
    for sent in razdel.sentenize(text):
        sent_text = sent.text.replace('-\n', '').replace('\n', ' ').strip()
        sents.append(sent_text)
    return sents


def is_text_valid(text: str) -> bool:
    """
    Checks if the given text meets validity criteria:
    - Contains at least one word with two or more characters.
    - Contains at least one Cyrillic letter.
    - Has a length between 3 and 500 characters.

    Args:
        text (str): The input text to validate.

    Returns:
        bool: True if the text is valid, False otherwise.
    """
    if max(len(w) for w in text.split()) < 2:
        return False

    if not re.match('.*[а-яё].*', text.lower()):
        return False

    if len(text) < 3:
        return False

    if len(text) > 500:
        return False

    return True


In [8]:
import numpy as np
import torch
from transformers import BertModel, BertTokenizerFast


def align_sentences(
        lang1_text: str,
        lang2_text: str,
        model: BertModel,
        tokenizer: BertTokenizerFast,
        alpha: float = 0.2,
        penalty: float = 0.2,
        threshold: float = 0.45,
    ) -> list[list[str]]:
    """
    Aligns sentences between two languages using sentence embeddings and similarity metrics.

    Args:
        lang1_text (str): The text in the first language.
        lang2_text (str): The text in the second language.
        model (BertModel): The embedding model.
        tokenizer (BertTokenizerFast): The tokenizer for the model.
        alpha (float, optional): The alpha parameter for similarity adjustment. Default is 0.2.
        penalty (float, optional): The penalty for alignment. Default is 0.2.
        threshold (float, optional): The similarity threshold for alignment. Default is 0.45.

    Returns:
        list[list[str]]: A list of aligned sentence pairs.
    """
    cleaned_lang1_text = clean_text(lang1_text)
    cleaned_lang2_text = clean_text(lang2_text)

    sents_lang1 = split_into_sentences(cleaned_lang1_text)
    sents_lang2 = split_into_sentences(cleaned_lang2_text)

    if not sents_lang1 and not sents_lang2:
        return []

    emb_lang1 = np.stack([get_sentence_embedding(s, model, tokenizer) for s in sents_lang1])
    emb_lang2 = np.stack([get_sentence_embedding(s, model, tokenizer) for s in sents_lang2])

    length_ratio = np.array([[min(len(x), len(y)) / max(len(x), len(y)) for x in sents_lang2] for y in sents_lang1])
    sims = np.maximum(0, np.dot(emb_lang1, emb_lang2.T)) ** 1 * length_ratio

    sims_rel = (sims.T - compute_topk_mean(sims) * alpha).T - compute_topk_mean(sims.T) * alpha - penalty

    alignment = compute_alignment_path(sims_rel)

    aligned_pairs = []
    for i, j in alignment:
        if sims[i, j] >= threshold:
            aligned_pairs.append([sents_lang1[i], sents_lang2[j]])

    return aligned_pairs


def get_sentence_embedding(text: str, model: BertModel, tokenizer: BertTokenizerFast, max_length: int = 128) -> np.ndarray:
    """
    Computes the sentence embedding using a transformer model.

    Args:
        text (str): The input text.
        model (BertModel): The embedding model.
        tokenizer (BertTokenizerFast): The tokenizer for the model.
        max_length (int, optional): The maximum token length. Default is 128.

    Returns:
        np.ndarray: The sentence embedding vector.
    """
    encoded_input = tokenizer(
        text,
        padding=True,
        truncation=True,
        max_length=max_length,
        return_tensors='pt'
    ).to(model.device)

    with torch.inference_mode():
        model_output = model(**encoded_input)
    embeddings = torch.nn.functional.normalize(model_output.pooler_output)

    return embeddings[0].detach().cpu().numpy()


def compute_topk_mean(x: np.ndarray, k: int = 5) -> np.ndarray:
    """
    Computes the mean of the top-k values in each row of a matrix.

    Args:
        x (np.ndarray): The input matrix.
        k (int, optional): The number of top elements to consider. Default is 5.

    Returns:
        np.ndarray: The mean values for each row.
    """
    m, n = x.shape
    k = min(k, n)
    topk_indices = np.argpartition(x, -k, axis=1)[:, -k:]
    rows, _ = np.indices((m, k))
    return x[rows, topk_indices].mean(1)


def compute_alignment_path(sims: np.ndarray) -> list[list[int]]:
    """
    Computes the optimal alignment path between two sets of sentences based on similarity scores.

    Args:
        sims (np.ndarray): The similarity matrix.

    Returns:
        list[list[int]]: A list of aligned index pairs.
    """
    rewards = np.zeros_like(sims)
    choices = np.zeros_like(sims, dtype=int)

    for i in range(sims.shape[0]):
        for j in range(0, sims.shape[1]):
            score_add = sims[i, j]
            if i > 0 and j > 0:
                score_add += rewards[i-1, j-1]
                choices[i, j] = 1
            best = score_add
            if i > 0 and rewards[i-1, j] > best:
                best = rewards[i-1, j]
                choices[i, j] = 2
            if j > 0 and rewards[i, j-1] > best:
                best = rewards[i, j-1]
                choices[i, j] = 3
            rewards[i, j] = best

    alignment = []
    i = sims.shape[0] - 1
    j = sims.shape[1] - 1
    while i > 0 and j > 0:
        if choices[i, j] == 1:
            alignment.append([i, j])
            i -= 1
            j -= 1
        elif choices[i, j] == 2:
            i -= 1
        else:
            j -= 1

    alignment.reverse()
    return alignment


In [9]:
DATA_PATH_PREFIX = 'drive/MyDrive/diploma/data/'

In [10]:
MODEL_PATH = 'drive/MyDrive/diploma/labse_moksha_40k+5k'

# Load df with parallel texts

In [11]:
wikipedia_texts = pd.read_pickle(DATA_PATH_PREFIX + 'texts_for_align/wikipedia_dump.pkl')

In [12]:
wikipedia_texts.shape

(3164, 2)

In [13]:
wikipedia_texts.head()

Unnamed: 0,ru_text,mdf_text
0,Раде́хов (укр. Раде́хів) — город в Червоноград...,Радэхив (укр.: Радехів (няйф)) — ошсь Украинас...
1,Эпидендрум (лат. Epidendrum) — род многолетних...,Эпидендрум ( лат.: Epidendrum) — Орхидея тънал...
2,Соломо́н Кверкве́лия (груз. სოლომონ კვირკველია...,Соломон Квэрквэлия (груз.: სოლომონ კვირკველია;...
3,Донгхой — город провинциального подчинения во ...,"Донгхой (виет.: Đồng Hới) - ошсь Виетнамса, Ку..."
4,Старая Муравьёвка — опустевшая деревня в соста...,Сире Муравьевка (руз.: Старая Муравьёвка) — мо...


# Align each article

In [14]:
model = BertModel.from_pretrained(MODEL_PATH)
tokenizer = BertTokenizerFast.from_pretrained(MODEL_PATH)

In [15]:
model.cuda();

In [17]:
all_aligned_pairs = []

for idx, (mdf_text, ru_text) in enumerate(zip(
    wikipedia_texts['mdf_text'],
    wikipedia_texts['ru_text']
)):
    if not ru_text or not mdf_text:
        print(f"empty pair: ({idx}), {mdf_text}, {ru_text}")
        continue

    aligned_pairs = align_sentences(mdf_text, ru_text, model, tokenizer)
    all_aligned_pairs += aligned_pairs

    if not aligned_pairs:
        print(f"0 aligned pairs: {idx}")

0 aligned pairs: 0
0 aligned pairs: 1
0 aligned pairs: 3
0 aligned pairs: 4
0 aligned pairs: 5
0 aligned pairs: 6
0 aligned pairs: 7
0 aligned pairs: 9
0 aligned pairs: 11
0 aligned pairs: 14
0 aligned pairs: 15
0 aligned pairs: 16
0 aligned pairs: 18
0 aligned pairs: 20
0 aligned pairs: 21
0 aligned pairs: 23
0 aligned pairs: 24
0 aligned pairs: 25
0 aligned pairs: 26
0 aligned pairs: 28
0 aligned pairs: 29
0 aligned pairs: 30
0 aligned pairs: 34
0 aligned pairs: 36
0 aligned pairs: 38
0 aligned pairs: 39
0 aligned pairs: 40
0 aligned pairs: 41
0 aligned pairs: 42
0 aligned pairs: 44
0 aligned pairs: 45
0 aligned pairs: 48
0 aligned pairs: 49
0 aligned pairs: 50
0 aligned pairs: 53
0 aligned pairs: 54
0 aligned pairs: 55
0 aligned pairs: 56
0 aligned pairs: 57
0 aligned pairs: 59
0 aligned pairs: 61
0 aligned pairs: 62
0 aligned pairs: 63
0 aligned pairs: 64
0 aligned pairs: 65
0 aligned pairs: 66
0 aligned pairs: 67
0 aligned pairs: 68
0 aligned pairs: 70
0 aligned pairs: 72
0 aligne

# Save all parallel pairs

In [18]:
data = []
for mdf, ru in all_aligned_pairs:
    cleaned_mdf = clean_text(mdf)
    cleaned_ru = clean_text(ru)

    if not is_text_valid(cleaned_mdf) or not is_text_valid(cleaned_ru):
        continue

    data.append({'mdf': cleaned_mdf, 'ru': cleaned_ru})

In [19]:
len(data)

1400

In [20]:
with open(DATA_PATH_PREFIX + 'aligned_wikipedia_sents_09_02.json', "w") as file:
    json.dump(data, file, ensure_ascii=False, indent=4)