In [1]:
import re
import unicodedata
import pandas as pd
import math

from pathlib import Path
from collections import Counter, defaultdict
from typing import List, Tuple

Функционал для загрузки и очистки текста от лишних символов.

1. Нормализация Unicode
2. Удаление управляющих символов
3. Удаление эмодзи и пиктограмм
4. Приведение любой пробельной последовательности к одному пробелу

In [2]:
# Управляющие символы: все категории Unicode, начинающиеся на 'C' (Cc, Cf, Cs, Co, Cn)
RE_CONTROL = re.compile(r'[\x00-\x1F\x7F]')

# Эмодзи и пиктографика: покрывает основные блоки Unicode, ZWJ, вариационные селекторы и пр.
RE_EMOJI = re.compile(
    '['                    
    '\U0001F600-\U0001F64F'
    '\U0001F300-\U0001F5FF'
    '\U0001F680-\U0001F6FF'
    '\U0001F1E0-\U0001F1FF'
    '\U00002500-\U00002BEF'
    '\U00002702-\U000027B0'
    '\U000024C2-\U0001F251'
    '\U0001F926-\U0001F937'
    '\U00010000-\U0010FFFF'
    '\u2640-\u2642'
    '\u2600-\u2B55'
    '\u200d'
    '\u23cf'
    '\u23e9'
    '\u231a'
    '\ufe0f'
    '\u3030'
    ']+',
    flags = re.UNICODE
)

# Все виды пробелов в один:
RE_SPACES = re.compile(r'\s+')

# Стандартная нормализация Unicode
def normalize_unicode(string: str, form: str = 'NFC') -> str:
    return unicodedata.normalize(form, string)

# Удаляем ASCII-контроли через диапазон, остальные — через категорию
def remove_control_chars(string: str) -> str:
    string = RE_CONTROL.sub('', string)
    string = ''.join(ch for ch in string if unicodedata.category(ch)[0] != 'C')
    return string

# Удаляем эмодзи
def remove_emojis(string: str) -> str:
    return RE_EMOJI.sub('', string)

# Приводим все виды пробелов к обычному пробелу, режем по краям
def normalize_spaces(string: str) -> str:
    string = RE_SPACES.sub(' ', string)
    return string.strip()

def clean_line(string: str) -> str:
    string = normalize_unicode(string, 'NFC')         # Unicode нормализация
    string = remove_control_chars(string)             # Удаление control chars
    string = remove_emojis(string)                    # Удаление эмодзи
    string = normalize_spaces(string)                 # Нормализация пробелов
    return string

Функционал для загрузки текста

1. Загружаем и очищаем текст для тренировки модели и сохраняем в один файл
2. Загружаем файл очищенного текста и преобразовываем его в массив строк для обучения модели
3. Загружаем тестовые текстовые данные в формат pd.DataFrame

In [3]:
# Загружаем тексты с необработанными данными, очищаем и сохраняем в один файл
def load_and_clean_txt(paths: str | Path, out_path: str | Path, encoding: str = 'utf-8', newline: str = '\n'):

    file_out_path = Path(out_path)

    with file_out_path.open('w', encoding = encoding, newline = newline) as out_file:
        for path in paths:
            file_path = Path(path)
            with file_path.open('r', encoding = encoding, errors = 'replace') as file:
                for line in file:
                    cleaned = clean_line(line)
                    if cleaned:
                        out_file.write(cleaned + '\n')

# Загружаем очищенные тексты для, преобразуя его в массив строк
def load_cleaned_text(path: str | Path, encoding: str = 'utf-8'):
    file_path = Path(path)
    text = file_path.read_text(encoding = encoding)
    lines = text.splitlines()
    return lines

# Загружаем тексты для теста в формате pd.DataFrame
def load_test_text(path: str | Path, encoding: str = 'utf-8'):
    rows_data = []

    file_path = Path(path)
    with file_path.open('r', encoding = encoding) as file:
        first_string = True

        for line in file:
            if first_string:
                first_string = False
                continue
            if not line:
                continue

            line = line.rstrip("\n\r")
            parts = line.split(",", 1)

            if len(parts) != 2:
                continue

            id, text_no_spaces = parts[0].strip(), parts[1].strip()
            rows_data.append((int(id), text_no_spaces))

    return pd.DataFrame(rows_data, columns = ['id', 'text_no_spaces'])

Загружаем исходные тексты и обрабатываем их

In [7]:
raw_text_paths = ['./raw_datasets/articles.txt', 
                  './raw_datasets/books-A.txt', 
                  './raw_datasets/poems.txt', 
                  './raw_datasets/books-B.txt',
                  './raw_datasets/fanfiction.txt']

load_and_clean_txt(paths = raw_text_paths, out_path = 'processed_text.txt')

Реализация модели Char-level n-gram LM с beam search

1. Английские буквы и цифры заменяются на специальные токены
2. Параметры модели: 
    
    · n - количество предыдущих символов для
    
    · add_k - параметр сглаживания, для избежания нулевых вероятностей у редких n-gramm
    
    · lower - приводить ли текст к нижнему регистру
3. Функция _prep() превращает строку в последовательность символов-токенов с нормализацией (буквы → ENG, цифры → DIG)
4. Функция fit() строит разреженную условную частотную таблицу «контекст → распределение по следующим символам» и создает словарь символов
5. Функция logprob_next() рассчитывает вероятность следующего символа после контекста с использованием сглаживания
6. Функция segment_with_beam() реализует beam search учитывая вероятности модели

In [None]:
ENG = "<ENG>"
DIG = "<DIG>"

# Нормализация символов для LM: латиница и цифры становятся классами
def to_lm_token(char: str) -> str:
    oc = char
    if oc == " ":
        return " "
    if ("A" <= oc <= "Z") or ("a" <= oc <= "z"):
        return ENG
    if "0" <= oc <= "9":
        return DIG
    return oc

class CharNGramLM:
    def __init__(self, n: int = 5, add_k: float = 0.1, lower: bool = True):
        self.n = n
        self.add_k = add_k
        self.lower = lower

        # context_counts: словарь {контекст(tuple из n-1 токенов) -> Counter следующих символов}
        self.context_counts = defaultdict(Counter)
        # vocab: множество всех наблюдавшихся символов-выходов (включая спецтокены)
        self.vocab = set()
        # Спецтокены начала/конца последовательности
        self.BOS = "<s>"
        self.EOS = "</s>"

    def _prep(self, text: str) -> List[str]:
        s = text.lower() if self.lower else text
        # для LM заменяем англ. буквы и цифры на классы
        return [to_lm_token(c) for c in s]

    def fit(self, corpus: List[str]):
        for line in corpus:
            # Добавляем (n-1) BOS в начало и EOS в конец
            toks = [self.BOS] * (self.n - 1) + self._prep(line) + [self.EOS]
            for i in range(self.n - 1, len(toks)):
                # Контекст — окно из n-1 предыдущих токенов
                ctx = tuple(toks[i - (self.n - 1):i])
                # Следующий токен после контекста   
                nxt = toks[i]
                # Увеличиваем счётчик для пары (контекст -> следующий токен)
                self.context_counts[ctx][nxt] += 1
                # Добавляем наблюдавшийся токен в словарь
                self.vocab.add(nxt)
        self.vocab.update([self.BOS, self.EOS])
        # Размер словаря (нужен для знаменателя при add-k)
        self.V = len(self.vocab)

    def logprob_next(self, context, nxt: str) -> float:
        # Приводим переданный контекст к длине n-1:
        # - если контекст короче, добиваем слева BOS
        # - если длиннее, берём правые n-1 токенов
        if len(context) < self.n - 1:
            context = (self.BOS,) * (self.n - 1 - len(context)) + tuple(context)
        else:
            context = tuple(context[-(self.n - 1):])

        # Получаем счётчики для этого контекста
        cnts = self.context_counts.get(context)


        if cnts is None:
            # P(x|контекст_невиден) = k / (k*V) = 1/V
            return math.log(self.add_k / (self.add_k * self.V))
        
        num = cnts.get(nxt, 0) + self.add_k # сглаженный числитель
        den = sum(cnts.values()) + self.add_k * self.V # сглаженный знаменатель

        # Возвращаем логарифм условной вероятности следующего символа
        return math.log(num / den)

def segment_with_beam(text: str,
                      lm: CharNGramLM,
                      beam_size: int = 32,
                      max_len: int = 100000,
                      space_penalty: float = 0.2,
                      min_word_len: int = 1,
                      max_consec_spaces: int = 1) -> Tuple[str, List[int]]:
    """
    Возвращает (текст с пробелами, позиции пробелов относительно исходной строки без пробелов).
    """

    # Элемент beam — это кортеж состояния гипотезы:
    # (lp, out_chars, lwlen, ctx, srun, spos), где:
    # - lp: накопленный логарифм вероятности гипотезы (чем больше, тем лучше);
    # - out_chars: уже собранные выходные символы (список строк);
    # - lwlen: длина последнего текущего слова (счётчик символов с момента последнего пробела);
    # - ctx: текущий контекст для LM (tuple из n-1 последних токенов LM);
    # - srun: сколько пробелов подряд уже поставлено (для ограничения max_consec_spaces);
    # - spos: список позиций, где мы вставили пробелы относительно индексов входного текста.
    beam = [ (0.0, [], 0, tuple([lm.BOS] * (lm.n - 1)), 0, []) ]

    def extend_with_symbol(hyp, real_ch: str, is_space: bool):
        # Расширяет гипотезу добавлением либо пробела, либо реального символа входа
        lp, out_chars, lwlen, ctx, srun, spos = hyp

        lm_sym = " " if is_space else to_lm_token(real_ch)
        # Добавляем лог-вероятность символа по LM в текущем контексте
        new_lp = lp + lm.logprob_next(ctx, lm_sym)
         # Обновляем контекст LM: сдвигаем окно и добавляем текущий символ LM
        new_ctx = (ctx + (lm_sym,))[-(lm.n - 1):]

        if is_space:
            new_srun = srun + 1
            new_lwlen = 0
            new_out = out_chars + [" "]
        else:
            new_srun = 0
            new_lwlen = lwlen + 1
            new_out = out_chars + [real_ch]
        return (new_lp, new_out, new_lwlen, new_ctx, new_srun, spos)

    for i, ch in enumerate(text):
        new_beam = []
        for hyp in beam:
            lp, out_chars, lwlen, ctx, srun, spos = hyp

            # Без пробела: просто добавить текущий символ
            cont = extend_with_symbol(hyp, ch, is_space=False)
            new_beam.append(cont)

            # Вставить пробел перед текущим символом (если допустимо)
            can_space = (lwlen >= min_word_len) and (srun < max_consec_spaces)
            if can_space:
                # Сначала расширяем гипотезу пробелом
                sp = extend_with_symbol(hyp, " ", is_space=True)
                lp2, out2, lw2, ctx2, sr2, sp2 = sp
                lp2 -= space_penalty
                # Добавляем индекс i в список позиций пробелов
                sp_after = (lp2, out2, lw2, ctx2, sr2, sp2 + [i])
                # После пробела добавляем сам текущий символ
                cont2 = extend_with_symbol(sp_after, ch, is_space=False)
                new_beam.append(cont2)

        # Сортируем новые гипотезы по лог-скорингу (убывание) и обрезаем до ширины луча (beam_size)
        new_beam.sort(key = lambda x: x[0], reverse = True)
        beam = new_beam[:beam_size]

        # Удаляем гипотезы, превысившие лимит длины выходной строки
        if any(len(h[1]) > max_len for h in beam):
            beam = [h for h in beam if len(h[1]) <= max_len]

    # Завершаем последовательность добавлением вероятности EOS к каждой гипотезе
    finalized = []
    for hyp in beam:
        lp, out_chars, lwlen, ctx, srun, spos = hyp
        lp_end = lp + lm.logprob_next(ctx, lm.EOS)
        finalized.append((lp_end, out_chars, lwlen, ctx, srun, spos))

    # Выбираем лучшую финальную гипотезу
    finalized.sort(key = lambda x: x[0], reverse = True)
    best = finalized[0]
    
    out_text = "".join(best[1])
    out_text = " ".join(out_text.split())
    return out_text, best[5]


Обучение ngramm LM на корпусе очищенных текстов и расстановка пробелов для тестовых данных

Подобраны оптимальные параметры для модели и beam search, позволящие достигнуть наилучшего результата

results_for_view_data - датасет с восстановленными строками

results_data - датасет для тестирования, содержащий только позиции пробелов

In [None]:
# Загружаем корпус тренировочных текстов и обучаем LM
corpus = load_cleaned_text('processed_text.txt')
lm = CharNGramLM(n = 6, add_k = 0.1, lower = True)
lm.fit(corpus)

# Загружаем тестовые данные и создаем датафремы для сохранения результатов
test_text_data = load_test_text('./test_datasets/dataset_1937770_3.txt')

results_for_view_data = pd.DataFrame(columns = ['id', 'text_no_spaces', 'text_with_spaces', 'predicted_positions'])
results_data = pd.DataFrame(columns = ['id', 'predicted_positions'])

# Проходимся по каждой строке и вставлем пробелы с помощью beam search
for index, row in test_text_data.iterrows():
    id = row['id']
    text_no_spaces = row['text_no_spaces']

    segmented, space_positions = segment_with_beam(
    text = text_no_spaces,
    lm = lm,
    beam_size = 128,
    space_penalty = 0.5,
    min_word_len = 1,
    max_consec_spaces = 1
    )

    results_for_view_data.loc[len(results_for_view_data)] = [id, text_no_spaces, segmented, space_positions]
    results_data.loc[len(results_data)] = [id, space_positions]

Сохранение результатов в файлы csv

In [44]:
results_for_view_data.to_csv('./results/results_for_view_ngram.csv', index = False)
results_data.to_csv('./results/results_for_test_ngram.csv', index = False)