This is a set of scripts that prepares a custom Russian dataset for training a punctuator/capitalizer
based on the BERT architecture using NeMo Python scripts.

The corpus combines Russian subtitles from OpenSubtitles with transcriptions from the Russian
Gosduma and the Moscow City Duma.

Originally, the Lenta news corpus was also planned as a source.
However, a detailed analysis showed that recent news texts may
affect the punctuator/capitalizer badly (mostly due to direct speech inside
the news texts), so using it is not recommended.

The scripts clean the texts and convert them into the NeMo training format: one file
of lowercased words without punctuation, and a separate file with a punctuation/capitalization label
for each word in the first file.

The source corpora are not that big, but their quality is quite high. Attempts to find other
good Russian corpora have not brought good results so far. For example, social media comments
are easily available but extremely illiterate. Mass media tend to include irrelevant information
in their news texts (like comments from social networks).
Large literature corpora are barely available and usually contain a lot of errors and
irregularities due to OCR mistakes. Besides, using originally written texts as
sources for punctuator training may give irrelevant results on spoken sentences, which
differ significantly from written Russian.
That's why professionally written transcriptions of speeches (including subtitles) seem
to be a good source of training data.

One important feature of the dataset is that it tries to concatenate as many sentences into
one training line as possible. To do that, it appends sentences to the current line
until the token count of its tokenized version reaches the model input limit. Tokenizing
each string every time makes the preparation procedure quite time-consuming, but without
it, it is almost impossible to train the punctuator to place dots (that is, to separate sentences).
Many punctuators available on the Internet are mostly incapable of handling multi-sentence
texts, because they were trained on datasets with one sentence per line.
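
The packing idea can be sketched as follows. This is a simplified illustration, not the exact loop used in this notebook: `count_tokens` is a hypothetical stand-in that counts whitespace-separated words (the notebook counts BERT subword tokens instead), and `pack_sentences` and `max_tokens` are illustrative names.

```python
from typing import Callable, List


def count_tokens(text: str) -> int:
    # Hypothetical stand-in for a real subword tokenizer: counts whitespace-separated words.
    return len(text.split())


def pack_sentences(sentences: List[str], max_tokens: int,
                   token_counter: Callable[[str], int] = count_tokens) -> List[str]:
    """Greedily join consecutive sentences into lines shorter than max_tokens.

    A single sentence that is already too long still becomes a line of its own.
    """
    packed: List[str] = []
    current: List[str] = []
    for sentence in sentences:
        candidate = ' '.join(current + [sentence])
        if current and token_counter(candidate) >= max_tokens:
            # The candidate would overflow, so flush the buffer and start a new line.
            packed.append(' '.join(current))
            current = [sentence]
        else:
            current.append(sentence)
    if current:
        packed.append(' '.join(current))
    return packed


sentences = ['Доброе утро.', 'Как дела?', 'Всё хорошо, спасибо.', 'Пока!']
packed_lines = pack_sentences(sentences, max_tokens=6)
```

With a limit of 6 "tokens", the four sentences end up in two training lines of two sentences each, so the model sees sentence boundaries inside a line.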

Another feature of the dataset is that it considers additional labels compared to the original
NeMo BERT procedure:
1. Punctuation labels, including complex modifiers like -, —, … etc.
2. A capitalization label for abbreviations: T (meaning "total uppercase", all letters in the word).

The original NeMo punctuator cannot apply some of the custom labels properly. For example,
the label "-" additionally means that there should be no space between this word and the next one.
The label "—" additionally means that there should be an extra space between the word itself and the "—" symbol.
The label "T" means that all letters should be uppercased, which the NeMo label-applying algorithm
does not understand.
So users of a punctuator trained on this dataset are advised to do manual post-processing
(including applying the labels manually) and to pass the "return_labels=True" parameter during inference.
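
A minimal sketch of such post-processing is shown below. It assumes two-character labels in the form produced by this dataset (punctuation mark first, capitalization mark second); the function name `apply_labels` is hypothetical, and the exact structure returned by NeMo with "return_labels=True" is not modelled here.

```python
from typing import List


def apply_labels(words: List[str], labels: List[str]) -> str:
    """Rebuild cased, punctuated text from lowercased words and two-character labels."""
    pieces: List[str] = []
    for word, label in zip(words, labels):
        punct_mark, capit_mark = label[0], label[1]
        if capit_mark == 'T':
            # Total uppercase, used for abbreviations.
            word = word.upper()
        elif capit_mark == 'U':
            word = word[:1].upper() + word[1:]
        if punct_mark == '-':
            # Hyphen: glue this word to the next one without a space.
            pieces.append(word + '-')
        elif punct_mark == '—':
            # Dash: separated from the word by a space on both sides.
            pieces.append(word + ' — ')
        elif punct_mark == 'O':
            pieces.append(word + ' ')
        else:
            # Any other mark is attached directly to the word.
            pieces.append(word + punct_mark + ' ')
    return ''.join(pieces).strip()


words = ['кто', 'то', 'приехал', 'из', 'сша', 'москва', 'столица', 'россии']
labels = ['-U', 'OO', 'OO', 'OO', '.T', '—U', 'OO', '.U']
restored = apply_labels(words, labels)
```

The "⁈" label is written out as the single "⁈" character in this sketch; replacing it with "?!" would be one more post-processing step.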

Imports:

In [None]:
import calendar
import csv
import datetime
import os
import random
import re
import subprocess
import tarfile
import time
import traceback
import shutil
import urllib.request
from bs4 import BeautifulSoup
from nemo.collections.common.tokenizers import AutoTokenizer
from typing import List, Dict
from urllib.request import urlopen

Constants:

In [None]:
RESULT_DATASET_OUTPUT_DIR = os.path.join(os.getcwd(), 'dataset')
DATASET_HEADER = ['source_type', 'source_entity', 'text', 'labels']
MAX_TOKENS_IN_MODEL_INPUT = 512
TOTAL_SET_FILE = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'punctuation_dataset.csv')
TRAIN_PERCENTS = 80
DEV_PERCENTS = 10

GOSDUMA_SOURCE_FOLDER = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_gosduma_source')
GOSDUMA_START_DATE = datetime.datetime(1994, 1, 1)
GOSDUMA_SUBSET_FILE = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_gosduma_dataset.csv')

MOSDUMA_SOURCE_FOLDER = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_mosduma_source')
PARAGRAPH_OR_HEADING_PATTERN = re.compile("^(p|h[1-6])$")
URL_FETCH_TIMEOUT = 5.0
MOSDUMA_SUBSET_FILE = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_mosduma_dataset.csv')

SUBTITLES_CORPUS_FOLDER = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'Subtitles', 'home', 'tsha', 'Subtitles')
TAGGED_SUBTITLES_FOLDER = os.path.join(SUBTITLES_CORPUS_FOLDER, 'tagged_texts')
RAW_SUBTITLES_FOLDER = os.path.join(SUBTITLES_CORPUS_FOLDER, 'texts')
SUBTITLES_SUBSET_FILE = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_subtitles_dataset.csv')

ROUND_BRACKETS_PATTERN = r'\([^)]*\)'
ANGLE_BRACKETS_PATTERN = r'\<[^>]*\>'
SQUARE_BRACKETS_PATTERN = r'\[.*?\]'
WORD_PATTERN = re.compile(r"[^\W_](?:[^\W_]|-|—|:|;|!|\?|\.|,|…)*")

LENTA_SOURCE_FOLDER = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_lenta_source')
LENTA_SOURCE_FILE_NAME = 'lenta-ru-news.csv'
LENTA_TEXT_ONLY_FILE = 'lenta_texts.txt'
# The original corpus contains too many news items, so some of the old texts are cut out.
LENTA_OBSOLETE_NEWS_COUNT_THRESHOLD = 500000
LENTA_SUBSET_FILE = os.path.join(RESULT_DATASET_OUTPUT_DIR, 'ru_lenta_dataset.csv')

Prepare the tokenizer to count the tokens in dataset lines (the model has a limited input layer, and each training row should not exceed it).

In [None]:
tokenizer = AutoTokenizer(
    pretrained_model_name="DeepPavlov/rubert-base-cased-conversational",
    vocab_file=None,
    use_fast=False
)

Method for splitting text into a raw uncased string without punctuation and a list of punctuation/capitalization labels.

In [None]:
def make_dataset_entity(text: str) -> Dict[str, str]:
    result: Dict[str, str] = dict()
    words = text.split()
    proper_words_with_punctuation = []
    for word in words:
        if WORD_PATTERN.match(word):
            proper_words_with_punctuation.append(word)
    raw_words = []
    labels_array = []
    for word in proper_words_with_punctuation:
        hyphen_words = word.split('-')
        hyphen_part_index = 0
        for dash_word in hyphen_words:
            cleaned_word = dash_word[:]
            cleaned_word = re.sub(r'[.,\-—:;!?…]+', '', cleaned_word)
            if not cleaned_word:
                hyphen_part_index += 1
                continue
            cleaned_word = cleaned_word.lower()

            capit_mark = 'U'
            if dash_word.upper() == dash_word:
                capit_mark = 'T'
            elif dash_word.lower() == dash_word:
                capit_mark = 'O'

            punct_mark = 'O'
            if hyphen_part_index < len(hyphen_words) - 1:
                punct_mark = '-'
            elif dash_word.endswith(('...', '…')):
                punct_mark = '…'
            elif dash_word.endswith('.'):
                punct_mark = '.'
            elif dash_word.endswith(','):
                punct_mark = ','
            elif dash_word.endswith(':'):
                punct_mark = ':'
            elif dash_word.endswith(';'):
                punct_mark = ';'
            elif dash_word.endswith('?!'):
                punct_mark = '⁈'
            elif dash_word.endswith('!?'):
                punct_mark = '⁈'
            elif dash_word.endswith('—'):
                punct_mark = '—'
            elif dash_word.endswith('?'):
                punct_mark = '?'
            elif dash_word.endswith('!'):
                punct_mark = '!'

            raw_words.append(cleaned_word)
            labels_array.append(f"{punct_mark}{capit_mark}")

            hyphen_part_index += 1

    result['text'] = ' '.join(raw_words)
    result['labels'] = ' '.join(labels_array)

    return result

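Taiga Subtitles corpus link (around 1 GB): http://bit.ly/2JLeujk , website: https://tatianashavrina.github.io/taiga_site/downloads.html .
Place the tar.gz archive into the RESULT_DATASET_OUTPUT_DIR folder (the "dataset" subfolder of the working directory, as defined in the constants) under the name Subtitles.tar.gz, which is the file name the extraction step expects.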

In [None]:
# Manually place the Subtitles archive from the Taiga corpus into RESULT_DATASET_OUTPUT_DIR (see the note above).

Extract the Taiga Subtitles archive.

In [None]:
# The context manager closes the archive even if extraction fails.
with tarfile.open(os.path.join(RESULT_DATASET_OUTPUT_DIR, 'Subtitles.tar.gz')) as subtitles_archive:
    subtitles_archive.extractall(os.path.join(RESULT_DATASET_OUTPUT_DIR, 'Subtitles'))

Delete redundant folders and files from the Subtitles corpus.

In [None]:
if os.path.exists(TAGGED_SUBTITLES_FOLDER):
    shutil.rmtree(TAGGED_SUBTITLES_FOLDER)
for _, film_dirs, _ in os.walk(RAW_SUBTITLES_FOLDER):
    for film_folder in film_dirs:
        for _, _, subtitle_raw_files in os.walk(os.path.join(RAW_SUBTITLES_FOLDER, film_folder)):
            for subtitle_file in subtitle_raw_files:
                subtitle_file_path = os.path.join(RAW_SUBTITLES_FOLDER, film_folder, subtitle_file)
                if not subtitle_file.endswith("ru.txt"):
                    os.remove(subtitle_file_path)
                elif os.stat(subtitle_file_path).st_size == 0:
                    os.remove(subtitle_file_path)

Method for pre-processing a subtitle line.

In [None]:
def preprocess_subtitle_line(line: str) -> str:
    processed_line = ''
    try:
        # The format is like 1\t00:00:01,200\t00:00:02,630\tДоброе утро.
        # So we need to skip the tab character 3 times before the actual text begins.
        processed_line = line[line.index('\t') + 1:]
        processed_line = processed_line[processed_line.index('\t') + 1:]
        processed_line = processed_line[processed_line.index('\t') + 1:]
        if processed_line.endswith('\n'):
            processed_line = processed_line[:-1]
        processed_line = re.sub(ROUND_BRACKETS_PATTERN, '', processed_line)
        processed_line = re.sub(SQUARE_BRACKETS_PATTERN, '', processed_line)
        if not processed_line:
            return ''

        if not processed_line or any([processed_line.find(c) != -1 for c in ['<', '>', '♪', '(', ')', '[', ']', '/']]):
            print(f"Skipped line: {processed_line}")
            return ''

        processed_line = processed_line.replace('\"', '')
        processed_line = processed_line.replace('\'', '')
        processed_line = processed_line.replace('«', '')
        processed_line = processed_line.replace('»', '')
        processed_line = processed_line.replace('%', ' процентов ')
        processed_line = processed_line.replace('№', ' номер ')
        processed_line = re.sub(' {2,}', ' ', processed_line)
        processed_line = processed_line.replace(' ...', '...')
        processed_line = processed_line.replace('...', '…')
        processed_line = re.sub(r'\.{2}', '.', processed_line)
        if processed_line.startswith('- '):
            processed_line = processed_line[2:]
        if processed_line.startswith('— '):
            processed_line = processed_line[2:]
        if processed_line.startswith('-'):
            processed_line = processed_line[1:]
        if processed_line.startswith('—'):
            processed_line = processed_line[1:]
        processed_line = processed_line.replace('. -', '. ')
        processed_line = processed_line.replace('? -', '? ')
        processed_line = processed_line.replace('! -', '! ')
        processed_line = processed_line.replace('. —', '. ')
        processed_line = processed_line.replace('? —', '? ')
        processed_line = processed_line.replace('! —', '! ')
        # Every '...' was already converted to '…' above, so match the single character here.
        processed_line = processed_line.replace('!…', '!')
        processed_line = processed_line.replace('?…', '?')
        processed_line = processed_line.replace('…!', '!')
        processed_line = processed_line.replace('…?', '?')
        processed_line = processed_line.replace(' - ', '— ')
        processed_line = processed_line.replace(' —', '—')
        processed_line = re.sub(' {2,}', ' ', processed_line)
        return processed_line
    except Exception as e:
        print(f"Could not parse a subtitle line {processed_line} due to {e}, {traceback.format_exc()}")
    return ''
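
The field extraction at the start of the method can also be written with a single split. The sketch below is an equivalent alternative rather than the code used above; `extract_subtitle_text` is a hypothetical name, and unlike the original it returns an empty string for a malformed line instead of raising an exception.

```python
def extract_subtitle_text(line: str) -> str:
    """Return the text field of a tab-separated subtitle line, or '' if the line is malformed."""
    # maxsplit=3 keeps any further tab characters inside the text field intact.
    fields = line.rstrip('\n').split('\t', 3)
    if len(fields) < 4:
        return ''
    return fields[3]


sample = '1\t00:00:01,200\t00:00:02,630\tДоброе утро.\n'
text = extract_subtitle_text(sample)
```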

Method to save a subset of dataset to a CSV file.

In [None]:
def save_entities_to_csv(dataset_entities: List[Dict[str, str]], file_path: str):
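    # newline='' is what the csv module expects; UTF-8 keeps Cyrillic text portable across platforms.
    with open(file_path, "w", newline='', encoding='utf-8') as target_file: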
        writer = csv.writer(target_file)
        writer.writerow(DATASET_HEADER)
        for dataset_entity in dataset_entities:
            writer.writerow([
                dataset_entity['source_type'],
                dataset_entity['source_entity'],
                dataset_entity['text'],
                dataset_entity['labels']
            ])
    print(f'File {file_path} saved.')

Prepare a punctuation/capitalization dataset from Subtitles corpus
(split into uncased words and punctuation/capitalization marks).

In [None]:
subtitles_dataset_entities: List[Dict[str, str]] = []
folder_counter = 0
for _, film_dirs, _ in os.walk(RAW_SUBTITLES_FOLDER):
    for film_folder in film_dirs:
        print(f"Entering {folder_counter} folder {film_folder}")

        for _, _, subtitle_raw_files in os.walk(
                os.path.join(RAW_SUBTITLES_FOLDER, film_folder)):
            for subtitle_file in subtitle_raw_files:
                subtitle_file_path = os.path.join(RAW_SUBTITLES_FOLDER, film_folder,
                                                  subtitle_file)
                with open(subtitle_file_path) as subtitles_file:
                    print(f"Entering file {subtitle_file_path}")
                    lines_candidates = []
                    last_lines_not_ended_with_stop = []
                    for subtitle_line in subtitles_file:
                        preprocessed_line = preprocess_subtitle_line(subtitle_line)
                        if not preprocessed_line:
                            continue
                        entity_text_candidate = ' '.join(lines_candidates) + \
                                                ' ' + preprocessed_line
                        entity_tokens = tokenizer.text_to_tokens(entity_text_candidate)
                        if len(entity_tokens) < MAX_TOKENS_IN_MODEL_INPUT:
                            if preprocessed_line.endswith('.') \
                                    or preprocessed_line.endswith('…') \
                                    or preprocessed_line.endswith('!') \
                                    or preprocessed_line.endswith('?'):
                                for line_not_ended_with_stop in last_lines_not_ended_with_stop:
                                    lines_candidates.append(line_not_ended_with_stop)
                                last_lines_not_ended_with_stop = []
                                lines_candidates.append(preprocessed_line)
                            else:
                                last_lines_not_ended_with_stop.append(preprocessed_line)
                        else:
                            text_for_entity = ' '.join(lines_candidates)
                            lines_candidates = last_lines_not_ended_with_stop[:]
                            lines_candidates.append(preprocessed_line)
                            last_lines_not_ended_with_stop = []
                            if text_for_entity:
                                dataset_entity = make_dataset_entity(text_for_entity)
                                if dataset_entity:
                                    dataset_entity['source_type'] = 'subtitles'
                                    dataset_entity['source_entity'] = f'{film_folder}; {subtitle_file}.'
                                    subtitles_dataset_entities.append(dataset_entity)

                    # If anything left in the string buffer and the token limit is not
                    # reached yet - add it to the dataset forcefully.
                    if lines_candidates:
                        text_for_entity = ' '.join(lines_candidates)
                        if text_for_entity:
                            dataset_entity = make_dataset_entity(text_for_entity)
                            if dataset_entity:
                                dataset_entity['source_type'] = 'subtitles'
                                dataset_entity['source_entity'] = f'{film_folder}; {subtitle_file}.'
                                subtitles_dataset_entities.append(dataset_entity)

        folder_counter += 1

save_entities_to_csv(subtitles_dataset_entities, SUBTITLES_SUBSET_FILE)

Method to detect the end of a speaker's introduction in a transcription line, assuming that the introduction contains at least one comma
(for complex introductions like “Иванов Иван Иванович, председатель законодательного собрания.”).
Since the introduction can be on the same line as the actual speech, its end is taken to be the nearest dot after the comma.

In [None]:
def detect_introduction_with_comma(line: str) -> int:
    line_after_comma = line[line.index(',') + 1:]
    sentence_separator_position = line_after_comma.find('. ')
    if sentence_separator_position != -1:
        return line.index(',') + 1 + sentence_separator_position + len('. ')
    else:
        return len(line)

Method to find the end of the speaker's introduction in the transcription. A typical introduction is
<last name> <first initial>.(<last initial>.)(, <title>.)
A hardcoded introduction is "Председательствующий", which means the chairman.
If the beginning of the string matches the introduction pattern, the method returns the position where the introduction ends; otherwise it returns 0.
When preparing the punctuation dataset, the introduction should be skipped, as it is not a part of the actual speech.

In [None]:
def detect_introduction(line: str) -> int:
    if line.find(' ') == -1:
        return 0

    if line.startswith('Председательствующий. '):
        return len('Председательствующий. ')

    first_word = line[:line.index(' ')]
    if not first_word or first_word.capitalize() != first_word \
            or not re.match(r'[^\W\d_](?:[^\W\d_]| |-)*[^\W\d_]', first_word, re.UNICODE):
        return 0

    line_after_first_word = line[line.index(' ') + 1:]
    if not line_after_first_word or line_after_first_word.find(' ') == -1:
        return 0
    second_word = line_after_first_word[:line_after_first_word.index(' ')]
    if not second_word:
        return 0
    if second_word[0].upper() == second_word[0] \
            and re.match(r'[^\W\d_]', second_word[0], re.UNICODE):
        if len(second_word) > 1 and second_word[1] == '.':
            if len(second_word) > 3:
                if second_word[2].upper() == second_word[2] \
                        and re.match(r'[^\W\d_]', second_word[2], re.UNICODE) \
                        and second_word[3] == '.':
                    if len(second_word) == 5:
                        if second_word[4] == ',':
                            return detect_introduction_with_comma(line)
                    elif len(second_word) == 4:
                        return len(first_word) + 1 + 4
            elif len(second_word) == 2:
                line_after_second_word = line_after_first_word[
                                         line_after_first_word.index(' ') + 1:]
                if not line_after_second_word or line_after_second_word.find(' ') == -1:
                    return 0
                third_word = line_after_second_word[:line_after_second_word.index(' ')]

                if not third_word:
                    return 0
                if len(third_word) == 2 \
                        and third_word[0].upper() == third_word[0] \
                        and re.match(r'[^\W\d_]', third_word[0], re.UNICODE) \
                        and third_word[1] == '.':
                    return len(first_word) + 1 + len(second_word) + 1 + len(third_word) + 1
                elif len(third_word) == 3 \
                        and third_word[0].upper() == third_word[0] \
                        and re.match(r'[^\W\d_]', third_word[0], re.UNICODE) \
                        and third_word[1] == '.' \
                        and third_word[2] == ',':
                    return detect_introduction_with_comma(line)
                else:
                    return len(first_word) + 1 + len(second_word) + 1
            else:
                # Here len(second_word) == 3 (e.g. "И.,"), so the comma sits at index 2.
                if second_word[2] == ',':
                    return detect_introduction_with_comma(line)

    return 0
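
For comparison, the simplest introductions (a last name followed by one or two initials, or the chairman prefix) could also be matched with a regular expression. The sketch below is a simplified alternative, not the method used above: it ignores introductions with a title after a comma, it assumes Cyrillic names, and the names `SPEAKER_PATTERN` and `strip_simple_introduction` are hypothetical.

```python
import re

CHAIRMAN_PREFIX = 'Председательствующий. '
# Last name (optionally double-barrelled), then one or two initials with dots, then a space.
SPEAKER_PATTERN = re.compile(
    r'^[А-ЯЁ][а-яё]+(?:-[А-ЯЁ][а-яё]+)? [А-ЯЁ]\.(?: ?[А-ЯЁ]\.)? '
)


def strip_simple_introduction(line: str) -> str:
    """Drop a leading speaker introduction if the line starts with one."""
    if line.startswith(CHAIRMAN_PREFIX):
        return line[len(CHAIRMAN_PREFIX):]
    match = SPEAKER_PATTERN.match(line)
    if match:
        return line[match.end():]
    return line


speech = strip_simple_introduction('Иванов И.И. Уважаемые коллеги!')
```

Unlike `detect_introduction`, this sketch returns the remaining text rather than an index, and it consumes the space after the introduction.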

Preprocesses a line of transcription or regular written text and removes punctuation signs that the punctuator cannot handle.
If the line shows signs of being malformed (unbalanced brackets etc.), returns an empty string so that the line is not included
in the dataset.

In [None]:
def clean_line(line: str) -> str:
    if not line:
        return ''

    if not line or any([line.find(c) != -1 for c in ['<', '>', '♪', '(', ')', '[', ']']]):
        print(f"Skipped line: {line}")
        return ''

    line = line.replace('/', ' ')
    line = line.replace('%', ' процентов ')
    line = line.replace('№', ' номер ')
    line = re.sub(r'(\d),(\d)', r'\1 и \2', line)
    line = line.replace('\"', '')
    line = line.replace('\'', '')
    line = line.replace('«', '')
    line = line.replace('»', '')
    line = re.sub(r' {2,}', ' ', line)
    line = line.replace(' ...', '...')
    line = line.replace('...', '…')
    line = re.sub(r'\.{2}', '.', line)

    if line.startswith('- '):
        line = line[2:]
    if line.startswith('— '):
        line = line[2:]
    if line.startswith('-'):
        line = line[1:]
    if line.startswith('—'):
        line = line[1:]
    line = line.replace('. -', '. ')
    line = line.replace('? -', '? ')
    line = line.replace('! -', '! ')
    line = line.replace('. —', '. ')
    line = line.replace('? —', '? ')
    line = line.replace('! —', '! ')
    # Every '...' was already converted to '…' above, so match the single character here.
    line = line.replace('!…', '!')
    line = line.replace('?…', '?')
    line = line.replace('…!', '!')
    line = line.replace('…?', '?')
    line = line.replace(' - ', '— ')
    line = line.replace(' —', '—')
    line = re.sub(' {2,}', ' ', line)
    line = line.strip()

    if not line.endswith('.') \
            and not line.endswith('!') \
            and not line.endswith('?'):
        line = line + '.'

    return line

Method to split a line into sentences separated by dots. Each sentence is then cleared of unwanted symbols,
and can be completely omitted if it looks malformed.
The split is needed because a line of transcription can be far longer than the model input limit, so we cannot
train the punctuator on such long lines.
Please note that we don't use this method for the Subtitles corpus, since it has only very short lines,
and also because its preprocessing looks a bit different due to a different syntax.

In [None]:
def get_cleaned_lines(line: str) -> List[str]:
    line = re.sub(ROUND_BRACKETS_PATTERN, '', line)
    line = re.sub(SQUARE_BRACKETS_PATTERN, '', line)
    line = re.sub(ANGLE_BRACKETS_PATTERN, '', line)

    processed_lines = line.split('. ')

    result_lines = []
    for processed_line in processed_lines:
        processed_line = clean_line(processed_line)
        if not processed_line:
            continue

        result_lines.append(processed_line)
    return result_lines
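
The splitting behaviour is easier to see on a small example. The sketch below reproduces only the split and the restoration of the final dot; it leaves out the bracket removal and the rest of the cleaning, and `split_on_dots` is a hypothetical name.

```python
from typing import List


def split_on_dots(line: str) -> List[str]:
    """Split a line on '. ' and restore the dot that the split consumed."""
    sentences: List[str] = []
    for part in line.split('. '):
        part = part.strip()
        if not part:
            continue
        # Put a full stop back unless the part already ends with a sentence-final mark.
        if not part.endswith(('.', '!', '?')):
            part += '.'
        sentences.append(part)
    return sentences


parts = split_on_dots('Заседание открыто. Кто за? Прошу голосовать.')
```

Only dots act as separators here, so a question or exclamation stays in the same chunk as the sentence that follows it.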

Preprocesses a line of Gosduma's transcription. Splits it by sentences, removes unnecessary symbols,
omits malformed sentences.

In [None]:
def preprocess_gosduma_transcription_line(line: str) -> List[str]:
    if not line or line == '\n':
        return []
    if line.endswith('\n'):
        line = line[:-1]
    try:
        index_after_introduction = detect_introduction(line)
        if index_after_introduction >= len(line):
            return []

        line_after_introduction = line[index_after_introduction:]

        return get_cleaned_lines(line_after_introduction)
    except Exception as e:
        print(f"Could not parse a transcription line {line} due to {e}, {traceback.format_exc()}")
    return []

Prepare raw data for the Gosduma sub-dataset. Download all available transcriptions from the
official website and place them in local folders named by month-year pairs.

In [None]:
start_date = GOSDUMA_START_DATE
if not os.path.exists(GOSDUMA_SOURCE_FOLDER):
    os.makedirs(GOSDUMA_SOURCE_FOLDER)
next_year = int(datetime.date.today().year) + 1
while start_date.year < next_year:
    month_subfolder = os.path.join(GOSDUMA_SOURCE_FOLDER, f"{start_date.month}_{start_date.year}")
    if not os.path.exists(month_subfolder):
        os.makedirs(month_subfolder)

    start_date = start_date.replace(day=1)
    last_day_in_month = calendar.monthrange(start_date.year, start_date.month)[1]
    end_date = start_date.replace(day=last_day_in_month)

    transcription_url_prefix = "http://transcript.duma.gov.ru"
    transcriptions_list_url = f"http://transcript.duma.gov.ru/search/?sessid=0&doctype=3&dt_start={start_date.strftime('%d.%m.%Y')}&dt_end={end_date.strftime('%d.%m.%Y')}&phrase1="
    print(f"Transcription list URL: {transcriptions_list_url}")
    list_page = urlopen(transcriptions_list_url)
    list_soup = BeautifulSoup(list_page, 'html.parser')
    transcription_hrefs = []
    for a in list_soup.find_all('a', href=True):
        if a['href'].startswith('/node/'):
            transcription_hrefs.append(a['href'])

    for transcription_href in transcription_hrefs:
        full_transcription_url = transcription_url_prefix + transcription_href
        transcription_page = urlopen(full_transcription_url)
        transcription_soup = BeautifulSoup(transcription_page, 'html.parser')

        header_div = transcription_soup.find_all("div", {"class": "header-bord"})[0]
        header_text = header_div.get_text().split('\n')[1]

        content_div = transcription_soup.find_all("div", {"id": "selectable-content"})[0]
        content_text = content_div.get_text()

        # The with statement closes the file, so no explicit close() is needed.
        with open(os.path.join(month_subfolder, f'{header_text}.txt'), 'w') as source_text_file:
            source_text_file.write(content_text)
        # Keep web-crawling responsible by limiting the requests count per minute.
        time.sleep(1)

    if start_date.month == 12:
        start_date = start_date.replace(year=start_date.year + 1)
        start_date = start_date.replace(month=1)
    else:
        start_date = start_date.replace(month=start_date.month + 1)
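
The month-by-month iteration in the loop above can be isolated into a small generator, which makes the date arithmetic easier to check separately from the crawling. This is a sketch only; `month_windows` and `end_year` are hypothetical names that do not appear in the notebook.

```python
import calendar
import datetime
from typing import Iterator, Tuple


def month_windows(start: datetime.date,
                  end_year: int) -> Iterator[Tuple[datetime.date, datetime.date]]:
    """Yield (first_day, last_day) for every month from start through December of end_year."""
    year, month = start.year, start.month
    while year <= end_year:
        last_day = calendar.monthrange(year, month)[1]
        yield datetime.date(year, month, 1), datetime.date(year, month, last_day)
        # Roll over to January of the next year after December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)


windows = list(month_windows(datetime.date(1994, 11, 15), 1995))
first_window_start = windows[0][0].strftime('%d.%m.%Y')
```

Each pair can be formatted with `strftime('%d.%m.%Y')`, which is the date format used in the search URL above.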

Splits the Gosduma raw dataset into uncased words and punctuation/capitalization labels.

In [None]:
gosduma_dataset_entities: List[Dict[str, str]] = []
folder_counter = 0
for _, month_dirs, _ in os.walk(GOSDUMA_SOURCE_FOLDER):
    for month_dir in month_dirs:
        print(f"Entering {folder_counter} folder {month_dir}. Count of entities so far: {len(gosduma_dataset_entities)}")

        for _, _, month_raw_files in os.walk(
                os.path.join(GOSDUMA_SOURCE_FOLDER, month_dir)):
            for transcription_raw_file in month_raw_files:
                transcription_file_path = os.path.join(GOSDUMA_SOURCE_FOLDER, month_dir,
                                                       transcription_raw_file)
                with open(transcription_file_path) as transcription_file:
                    print(f"Entering file {transcription_file_path}")
                    lines_candidates = []
                    last_lines_not_ended_with_stop = []
                    for transcription_line in transcription_file:
                        preprocessed_lines = preprocess_gosduma_transcription_line(transcription_line)
                        if not preprocessed_lines:
                            continue

                        for preprocessed_line in preprocessed_lines:
                            if not preprocessed_line:
                                continue

                            entity_text_candidate = ' '.join(lines_candidates) + \
                                                    ' ' + preprocessed_line
                            entity_tokens = tokenizer.text_to_tokens(entity_text_candidate)
                            if len(entity_tokens) < MAX_TOKENS_IN_MODEL_INPUT:
                                if preprocessed_line.endswith('.') \
                                        or preprocessed_line.endswith('…') \
                                        or preprocessed_line.endswith('!') \
                                        or preprocessed_line.endswith('?'):
                                    for line_not_ended_with_stop in last_lines_not_ended_with_stop:
                                        lines_candidates.append(line_not_ended_with_stop)
                                    last_lines_not_ended_with_stop = []
                                    lines_candidates.append(preprocessed_line)
                                else:
                                    last_lines_not_ended_with_stop.append(preprocessed_line)
                            else:
                                text_for_entity = ' '.join(lines_candidates)
                                lines_candidates = last_lines_not_ended_with_stop[:]
                                lines_candidates.append(preprocessed_line)
                                last_lines_not_ended_with_stop = []
                                if text_for_entity:
                                    dataset_entity = make_dataset_entity(text_for_entity)
                                    if dataset_entity:
                                        dataset_entity['source_type'] = 'gosduma'
                                        dataset_entity['source_entity'] = f'{transcription_raw_file}.'
                                        gosduma_dataset_entities.append(dataset_entity)

                    # If anything left in the string buffer and the token limit is not
                    # reached yet - add it to the dataset forcefully.
                    if lines_candidates:
                        text_for_entity = ' '.join(lines_candidates)
                        if text_for_entity:
                            dataset_entity = make_dataset_entity(text_for_entity)
                            if dataset_entity:
                                dataset_entity['source_type'] = 'gosduma'
                                dataset_entity['source_entity'] = f'{transcription_raw_file}.'
                                gosduma_dataset_entities.append(dataset_entity)

        folder_counter += 1

save_entities_to_csv(gosduma_dataset_entities, GOSDUMA_SUBSET_FILE)

Preprocesses a regular text line of speech (without a speaker's introduction). Suitable for the Mosduma and Lenta datasets.
Splits the line into sentences, deletes unnecessary symbols, and omits malformed sentences.

In [None]:
def preprocess_regular_line(line: str) -> List[str]:
    if not line or line == '\n':
        return []
    if line.endswith('\n'):
        line = line[:-1]
    try:
        return get_cleaned_lines(line)
    except Exception as e:
        print(f"Could not parse a text line {line} due to {e}, {traceback.format_exc()}")
    return []

Preprocesses a line from Mosduma transcription.

In [None]:
def preprocess_mosduma_transcription_line(line: str) -> List[str]:
    return preprocess_regular_line(line)

Download transcriptions from the official MosGorDuma website, convert doc files into HTML, then parse the raw transcription part
from it and save locally.
Converting from RTF/DOCX to HTML is needed because speaker's introductions in Mosduma transcriptions can be detected by the
bold formatting, which would be lost in case of straightforward doc->txt conversion.

In [None]:
if not os.path.exists(MOSDUMA_SOURCE_FOLDER):
    os.makedirs(MOSDUMA_SOURCE_FOLDER)
page_number = 1
last_page_doc_urls: List[str] = []
while True:
    page_subfolder = os.path.join(MOSDUMA_SOURCE_FOLDER, f"{page_number}")
    if os.path.exists(page_subfolder):
        page_number += 1
        continue

    transcriptions_list_url = f"https://duma.mos.ru/ru/60/conference?date%5Bfrom%5D=0&date%5Bto%5D=0&sort=date-desc&page={page_number}#results"
    print(f"Transcription list URL: {transcriptions_list_url}")
    ssl_browser_req = urllib.request.Request(
        transcriptions_list_url,
        data=None,
        headers={
            'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_3) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/35.0.1916.47 Safari/537.36',
            'Sec-Ch-Ua': '"Chromium";v="115", "Not/A)Brand";v="99"',
            'Sec-Ch-Ua-Mobile': '?0',
            'Sec-Ch-Ua-Platform': '"Linux"',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7'
        }
    )
    # The MosGorDuma website is unstable, so most pages need several retries before they load.
    while True:
        successful_read = False
        try:
            list_page = urlopen(ssl_browser_req, timeout=URL_FETCH_TIMEOUT).read()
            successful_read = True
        except Exception:
            print(f"Exception while reading URL: {transcriptions_list_url} ; trace: {traceback.format_exc()}")
        if successful_read:
            break

    os.makedirs(page_subfolder, exist_ok=True)

    list_soup = BeautifulSoup(list_page, 'html.parser')
    transcription_hrefs = []
    for a in list_soup.find_all('a', href=True):
        span = a.find('span')
        if span and span.text and span.text.startswith('Стенограмма '):
            transcription_hrefs.append(a['href'])

    if not transcription_hrefs:
        break
    # Encountering a doc URL that was already seen on the previous page means that the last
    # page has been passed and the website now just returns the content of the last page again,
    # hence we need to exit the cycle.
    # This is a "feature" of MosGorDuma's site only.
    if any([(transcription_href in last_page_doc_urls) for transcription_href in transcription_hrefs]):
        break

    for transcription_href in transcription_hrefs:
        transcription_request = urllib.request.Request(transcription_href, method='HEAD')
        transcription_request_info = urllib.request.urlopen(transcription_request)
        file_name = transcription_request_info.info().get_filename()
        print(f"Remote file name: {file_name}")
        raw_transcription_path = os.path.join(page_subfolder, file_name)
        # The MosGorDuma website is unstable, so most file downloads need several retries.
        while True:
            successful_download = False
            try:
                urllib.request.urlretrieve(transcription_href, raw_transcription_path)
                successful_download = True
            except Exception:
                print(f"Exception while downloading file from URL: {transcription_href} ; trace: {traceback.format_exc()}")
            if successful_download:
                break

        file_name_wo_extension = file_name[:file_name.rindex('.')]
        html_extension = 'html'

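        # Pass the arguments as a list (no shell), so that file names containing spaces
        # or shell metacharacters are handled safely.
        subprocess.run(["soffice", "--convert-to", html_extension, raw_transcription_path,
                        "--outdir", page_subfolder])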
        os.remove(raw_transcription_path)

        text_lines = []
        html_file_path = os.path.join(page_subfolder, f'{file_name_wo_extension}.{html_extension}')
        with open(html_file_path) as html_file:
            transcription_soup = BeautifulSoup(html_file, 'html.parser')
            start_paragraph = None
            for paragraph in transcription_soup.find_all(PARAGRAPH_OR_HEADING_PATTERN):
                if paragraph.get_text().replace('\n', '').replace(' ', '').replace('\t', '').strip() == "СТЕНОГРАММА":
                    start_paragraph = paragraph
                    print(f"{file_name_wo_extension}. Start paragraph is СТЕНОГРАММА.")
                    break
            if not start_paragraph:
                for paragraph in transcription_soup.find_all('p'):
                    if paragraph.get_text().replace('\n', ' ').strip().startswith('файл '):
                        start_paragraph = paragraph
                        print(f"{file_name_wo_extension}. Start paragraph is файл: {paragraph.get_text()}.")
                        break
            if not start_paragraph:
                for paragraph in transcription_soup.find_all(PARAGRAPH_OR_HEADING_PATTERN):
                    if paragraph.get_text().replace('\n', ' ').strip().startswith('Материалы к протоколу заседания') \
                            or paragraph.get_text().replace('\n', ' ').strip().startswith('Материалы к протоколу внеочередного'):
                        materials_paragraph = paragraph
                        for paragraph_candidate in materials_paragraph.find_all_next(PARAGRAPH_OR_HEADING_PATTERN):
                            if paragraph_candidate.get_text().replace('\n', ' ').strip().startswith(
                                    'Заседание №') \
                                    or paragraph_candidate.get_text().replace('\n', ' ').strip().startswith(
                                    'Внеочередное заседание №') \
                                    or paragraph_candidate.get_text().replace('\n', ' ').strip().startswith(
                                    'Утреннее заседание №') \
                                    or paragraph_candidate.get_text().replace('\n', ' ').strip().startswith(
                                    'Вечернее заседание №'):
                                start_paragraph = paragraph_candidate
                                print(f"{file_name_wo_extension}. Start paragraph is Заседание №: {start_paragraph.get_text()}.")
                                break
                    if start_paragraph:
                        break

            if not start_paragraph:
                print(f"page_number {page_number}, file {file_name_wo_extension}, "
                        f"could not find start of transcription.")
                html_file.close()
                os.remove(html_file_path)
                continue

            for paragraph in start_paragraph.find_all_next('p'):
                stripped_text = paragraph.get_text().strip().replace('\n', ' ')
                if stripped_text.startswith('файл '):
                    continue
                is_all_bold = False
                for bold in paragraph.find_all('b'):
                    if stripped_text == bold.get_text().strip().replace('\n', ' '):
                        is_all_bold = True
                        break

                # Adds a line to the result, except introductions, which are detected
                # by bold formatting.
                if stripped_text and not is_all_bold:
                    text_lines.append(stripped_text)

            html_file.close()
            os.remove(html_file_path)

        with open(os.path.join(page_subfolder, f'{file_name_wo_extension}.txt'), 'w') as source_text_file:
            source_text_file.write('\n'.join(text_lines))
        # Keep web-crawling responsible by limiting the requests count per minute.
        time.sleep(1)

    last_page_doc_urls = transcription_hrefs

    page_number += 1
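
The retry loops above repeat a request until it succeeds, which can spin forever if the website stays down. A bounded variant with exponential backoff could look like the sketch below. The helper name `fetch_with_retries` and its parameters are hypothetical and are not part of the notebook; the `sleep` argument exists only so that the waiting can be replaced in tests.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def fetch_with_retries(fetch: Callable[[], T],
                       max_attempts: int = 5,
                       base_delay_seconds: float = 1.0,
                       sleep: Callable[[float], None] = time.sleep) -> Optional[T]:
    """Calls fetch() until it succeeds or max_attempts is exhausted.

    Returns the result of fetch(), or None if every attempt raised.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except Exception as error:
            print(f"Attempt {attempt}/{max_attempts} failed: {error}")
            if attempt < max_attempts:
                # Wait 1x, 2x, 4x, ... the base delay between attempts.
                sleep(base_delay_seconds * 2 ** (attempt - 1))
    return None
```

With such a helper, the page download could be written as `fetch_with_retries(lambda: urlopen(ssl_browser_req, timeout=URL_FETCH_TIMEOUT).read())`, and a `None` result would signal that the page should be skipped.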

Split the Mosduma raw dataset into uncased words and punctuation/capitalization labels.

In [None]:
mosduma_dataset_entities: List[Dict[str, str]] = []
folder_counter = 0
for _, page_dirs, _ in os.walk(MOSDUMA_SOURCE_FOLDER):
    for page_dir in page_dirs:
        print(f"Entering {folder_counter} folder {page_dir}")

        for _, _, page_raw_files in os.walk(
                os.path.join(MOSDUMA_SOURCE_FOLDER, page_dir)):
            for transcription_raw_file in page_raw_files:
                transcription_file_path = os.path.join(MOSDUMA_SOURCE_FOLDER, page_dir,
                                                       transcription_raw_file)
                with open(transcription_file_path) as transcription_file:
                    print(f"Entering file {transcription_file_path}")
                    lines_candidates = []
                    last_lines_not_ended_with_stop = []
                    for transcription_line in transcription_file:
                        preprocessed_lines = preprocess_mosduma_transcription_line(
                            transcription_line)
                        if not preprocessed_lines:
                            continue

                        for preprocessed_line in preprocessed_lines:
                            if not preprocessed_line:
                                continue

                            entity_text_candidate = ' '.join(lines_candidates) + \
                                                    ' ' + preprocessed_line
                            entity_tokens = tokenizer.text_to_tokens(entity_text_candidate)
                            if len(entity_tokens) < MAX_TOKENS_IN_MODEL_INPUT:
                                if preprocessed_line.endswith('.') \
                                        or preprocessed_line.endswith('…') \
                                        or preprocessed_line.endswith('!') \
                                        or preprocessed_line.endswith('?'):
                                    for line_not_ended_with_stop in last_lines_not_ended_with_stop:
                                        lines_candidates.append(line_not_ended_with_stop)
                                    last_lines_not_ended_with_stop = []
                                    lines_candidates.append(preprocessed_line)
                                else:
                                    last_lines_not_ended_with_stop.append(preprocessed_line)
                            else:
                                text_for_entity = ' '.join(lines_candidates)
                                lines_candidates = last_lines_not_ended_with_stop[:]
                                lines_candidates.append(preprocessed_line)
                                last_lines_not_ended_with_stop = []
                                if text_for_entity:
                                    dataset_entity = make_dataset_entity(text_for_entity)
                                    if dataset_entity:
                                        dataset_entity['source_type'] = 'mosduma'
                                        dataset_entity['source_entity'] = f'{transcription_raw_file}'
                                        mosduma_dataset_entities.append(dataset_entity)

                    # If anything is left in the string buffer at the end of the file (that is,
                    # the token limit has not been reached yet), add it to the dataset as a final entity.
                    if lines_candidates:
                        text_for_entity = ' '.join(lines_candidates)
                        if text_for_entity:
                            dataset_entity = make_dataset_entity(text_for_entity)
                            if dataset_entity:
                                dataset_entity['source_type'] = 'mosduma'
                                dataset_entity['source_entity'] = f'{transcription_raw_file}'
                                mosduma_dataset_entities.append(dataset_entity)

        folder_counter += 1

save_entities_to_csv(mosduma_dataset_entities, MOSDUMA_SUBSET_FILE)
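
The sentence-packing logic used in the cell above can be isolated into a small generator. This is only a sketch of the same idea: `pack_sentences` is a hypothetical name, and `count_tokens` stands in for `len(tokenizer.text_to_tokens(...))`, so a simple whitespace counter is used in the example instead of the real tokenizer.

```python
from typing import Callable, Iterable, Iterator, List

SENTENCE_END = ('.', '…', '!', '?')


def pack_sentences(sentences: Iterable[str],
                   count_tokens: Callable[[str], int],
                   max_tokens: int) -> Iterator[str]:
    """Joins consecutive sentences into lines that stay under max_tokens."""
    buffer: List[str] = []   # sentences accepted for the current line
    pending: List[str] = []  # fragments waiting for a sentence-ending line
    for sentence in sentences:
        candidate = ' '.join(buffer + [sentence])
        if count_tokens(candidate) < max_tokens:
            if sentence.endswith(SENTENCE_END):
                # A complete sentence: flush waiting fragments, then accept it.
                buffer.extend(pending)
                pending = []
                buffer.append(sentence)
            else:
                pending.append(sentence)
        else:
            # The limit would be exceeded: emit the current line and start a new one.
            text = ' '.join(buffer)
            buffer = pending + [sentence]
            pending = []
            if text:
                yield text
    if buffer:
        yield ' '.join(buffer)


sentences = ["one two.", "three four.", "five six seven.", "eight."]
for line in pack_sentences(sentences, lambda s: len(s.split()), max_tokens=5):
    print(line)
```

Each yielded string corresponds to the `text_for_entity` value that the notebook passes to `make_dataset_entity`.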

Preprocesses a line from a news corpus.

In [None]:
def preprocess_news_line(line: str) -> List[str]:
    return preprocess_regular_line(line)

Makes a raw text-only dataset from the downloaded Lenta corpus.
Skips the first LENTA_OBSOLETE_NEWS_COUNT_THRESHOLD rows of the CSV (the header row is counted among them)
in order to keep the resulting dataset more balanced (more spoken sentences, fewer written ones).
The source archive can be found here: https://github.com/yutkin/Lenta.Ru-News-Dataset/releases
It should be downloaded, extracted, and placed in the LENTA_SOURCE_FOLDER folder as a CSV file.

In [None]:
with open(os.path.join(LENTA_SOURCE_FOLDER, LENTA_SOURCE_FILE_NAME), 'r') \
        as news_file:
    with open(os.path.join(LENTA_SOURCE_FOLDER, LENTA_TEXT_ONLY_FILE),
              'w') as lenta_texts_file:
        reader = csv.reader(news_file)
        news_counter = 0
        while news_counter < LENTA_OBSOLETE_NEWS_COUNT_THRESHOLD:
            next(reader)
            news_counter += 1

        while True:
            news_line = next(reader, None)

            if not news_line:
                break

            lenta_texts_file.write(news_line[2] + '\n')
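
The same "skip the first N rows, then take the text column" step can be expressed with `itertools.islice`. The sketch below is an alternative formulation, not the notebook's code: `iter_news_texts` is a hypothetical name, the text is assumed to be in column 2 as in the cell above, and, unlike the cell above, it skips empty rows instead of stopping at the first one and does not fail when the file has fewer rows than the threshold.

```python
import csv
import io
from itertools import islice
from typing import Iterator, TextIO


def iter_news_texts(news_file: TextIO, skip_rows: int,
                    text_column: int = 2) -> Iterator[str]:
    """Yields the text column of every CSV row after the first skip_rows rows."""
    reader = csv.reader(news_file)
    for row in islice(reader, skip_rows, None):
        # Ignore empty or truncated rows instead of failing on them.
        if len(row) > text_column and row[text_column]:
            yield row[text_column]


# A tiny in-memory stand-in for the Lenta CSV (header plus two news items).
sample_csv = io.StringIO("url,title,text\nu1,t1,old text\nu2,t2,new text\n")
print(list(iter_news_texts(sample_csv, skip_rows=2)))
```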

Prepares a CSV file with the Lenta dataset entities.

In [None]:
lenta_dataset_entities: List[Dict[str, str]] = []
with open(os.path.join(LENTA_SOURCE_FOLDER, LENTA_TEXT_ONLY_FILE)) as lenta_texts_file:
    lines_candidates = []
    last_lines_not_ended_with_stop = []
    for line in lenta_texts_file:
        preprocessed_lines = preprocess_news_line(line)
        if not preprocessed_lines:
            continue

        for preprocessed_line in preprocessed_lines:
            if not preprocessed_line:
                continue

            entity_text_candidate = ' '.join(lines_candidates) + ' ' + preprocessed_line
            entity_tokens = tokenizer.text_to_tokens(entity_text_candidate)
            if len(entity_tokens) < MAX_TOKENS_IN_MODEL_INPUT:
                if preprocessed_line.endswith('.') \
                        or preprocessed_line.endswith('…') \
                        or preprocessed_line.endswith('!') \
                        or preprocessed_line.endswith('?'):
                    for line_not_ended_with_stop in last_lines_not_ended_with_stop:
                        lines_candidates.append(line_not_ended_with_stop)
                    last_lines_not_ended_with_stop = []
                    lines_candidates.append(preprocessed_line)
                else:
                    last_lines_not_ended_with_stop.append(preprocessed_line)
            else:
                text_for_entity = ' '.join(lines_candidates)
                lines_candidates = last_lines_not_ended_with_stop[:]
                lines_candidates.append(preprocessed_line)
                last_lines_not_ended_with_stop = []
                if text_for_entity:
                    dataset_entity = make_dataset_entity(text_for_entity)
                    if dataset_entity:
                        dataset_entity['source_type'] = 'lenta'
                        dataset_entity['source_entity'] = 'lenta news'
                        lenta_dataset_entities.append(dataset_entity)

    # If anything is left in the string buffer at the end of the file (that is,
    # the token limit has not been reached yet), add it to the dataset as a final entity.
    if lines_candidates:
        text_for_entity = ' '.join(lines_candidates)
        if text_for_entity:
            dataset_entity = make_dataset_entity(text_for_entity)
            if dataset_entity:
                dataset_entity['source_type'] = 'lenta'
                dataset_entity['source_entity'] = 'lenta news'
                lenta_dataset_entities.append(dataset_entity)

save_entities_to_csv(lenta_dataset_entities, LENTA_SUBSET_FILE)

Adds a subset loaded from a CSV file to the total dataset.

In [None]:
def add_to_total_dataset(dataset_entities: List[Dict[str, str]], file_path: str):
    with open(file_path, 'r') as subset_file:
        reader = csv.reader(subset_file)
        # Skip the header
        next(reader)
        for row in reader:
            dataset_entities.append({
                'source_type': row[0],
                'source_entity': row[1],
                'text': row[2],
                'labels': row[3]
            })

Make total dataset entities.

In [None]:
total_dataset_entities: List[Dict[str, str]] = []
add_to_total_dataset(total_dataset_entities, SUBTITLES_SUBSET_FILE)
add_to_total_dataset(total_dataset_entities, MOSDUMA_SUBSET_FILE)
add_to_total_dataset(total_dataset_entities, GOSDUMA_SUBSET_FILE)

save_entities_to_csv(total_dataset_entities, TOTAL_SET_FILE)

Saves the text and labels files for a subset of the total dataset (train, dev, or test).

In [None]:
def write_text_and_labels_subset(folder_name: str, set_name: str, entities: List[Dict[str, str]]):
    with open(os.path.join(folder_name, f"text_{set_name}.txt"), "w") as text_file:
        with open(os.path.join(folder_name, f"labels_{set_name}.txt"), "w") as labels_file:
            for entity in entities:
                line_text = entity['text']
                line_labels = entity['labels']
                text_file.write(line_text + '\n')
                labels_file.write(line_labels + '\n')
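
For reference, the two files written here follow the NeMo punctuation/capitalization data format: each line of the text file holds lowercased words without punctuation, and the matching line of the labels file holds one two-symbol label per word (the punctuation mark that follows the word or `O` for none, then `U` if the word is capitalized or `O` otherwise). The sketch below shows how one such pair could be derived from a sentence. The function `to_text_and_labels` and its default punctuation set are assumptions made for illustration; the notebook's own `make_dataset_entity` may use a different label set and extra cleaning.

```python
from typing import Tuple


def to_text_and_labels(sentence: str, punctuation: str = ".,?") -> Tuple[str, str]:
    """Converts a sentence into a (text line, labels line) pair."""
    words, labels = [], []
    for token in sentence.split():
        # First label symbol: the punctuation mark after the word, or 'O'.
        punct = token[-1] if token[-1] in punctuation else 'O'
        word = token.rstrip(punctuation)
        if not word:
            continue  # the token consisted of punctuation only
        # Second label symbol: 'U' for a capitalized word, 'O' otherwise.
        capit = 'U' if word[0].isupper() else 'O'
        words.append(word.lower())
        labels.append(punct + capit)
    return ' '.join(words), ' '.join(labels)


text, labels = to_text_and_labels("Hello, world. How are you?")
print(text)
print(labels)
```

The number of labels in a line always equals the number of words in the corresponding text line, which is what the training script expects.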

Splits the resulting dataset into train/dev/test subsets, each stored as separate text and labels files.
The data is shuffled before splitting so that the original
corpora (Gosduma, Mosduma, Subtitles, etc.) are evenly distributed among the subsets.

In [None]:
total_dataset_entities: List[Dict[str, str]] = []
add_to_total_dataset(total_dataset_entities, TOTAL_SET_FILE)
random.shuffle(total_dataset_entities)

total = len(total_dataset_entities)
train, dev = (
    int(total * TRAIN_PERCENTS / 100),
    int(total * DEV_PERCENTS / 100),
)
write_text_and_labels_subset(RESULT_DATASET_OUTPUT_DIR, 'train', total_dataset_entities[:train])
write_text_and_labels_subset(RESULT_DATASET_OUTPUT_DIR, 'dev', total_dataset_entities[train:(train+dev)])
write_text_and_labels_subset(RESULT_DATASET_OUTPUT_DIR, 'test', total_dataset_entities[(train+dev):])
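
The shuffle above is not seeded, so every run produces a different split. If a reproducible split is needed, the same arithmetic can be wrapped into a function with a fixed seed, as in the sketch below. The name `split_dataset` and the `seed` parameter are hypothetical additions; the percentages play the role of TRAIN_PERCENTS and DEV_PERCENTS, and the test subset receives whatever remains.

```python
import random
from typing import List, Sequence, Tuple, TypeVar

T = TypeVar("T")


def split_dataset(entities: Sequence[T], train_percents: int, dev_percents: int,
                  seed: int = 42) -> Tuple[List[T], List[T], List[T]]:
    """Shuffles a copy of entities and splits it into train, dev and test parts."""
    shuffled = list(entities)
    # A dedicated Random instance keeps the global random state untouched.
    random.Random(seed).shuffle(shuffled)
    total = len(shuffled)
    train = int(total * train_percents / 100)
    dev = int(total * dev_percents / 100)
    return shuffled[:train], shuffled[train:train + dev], shuffled[train + dev:]


train_part, dev_part, test_part = split_dataset(list(range(10)), 80, 10)
print(len(train_part), len(dev_part), len(test_part))
```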

Set up environment variables with the paths used by the scripts.

In [None]:
# The NeMo scripts should be downloaded separately (they may be located in a different folder; adjust nemo_root in that case).
nemo_root = os.path.expanduser(os.path.join(os.getcwd(), 'nemo'))
config_root = os.path.expanduser(os.path.join(os.getcwd(), 'config'))
scripts_root = os.path.expanduser(os.getcwd())
dataset_root = os.path.expanduser(RESULT_DATASET_OUTPUT_DIR)

os.environ["NEMO_ROOT"] = nemo_root
os.environ["PUNCTUATOR_CONFIG_ROOT"] = config_root
os.environ["PUNCTUATOR_SCRIPTS_ROOT"] = scripts_root
os.environ["PUNCTUATOR_DATASET_ROOT"] = dataset_root

print("NeMo root:", nemo_root)
print("Config root:", config_root)
print("Scripts root:", scripts_root)
print("Dataset root:", dataset_root)

In [None]:
!python $NEMO_ROOT/examples/nlp/token_classification/punctuation_capitalization_train_evaluate.py \
    --config-name=trainer_ru_config.yaml --config-path=$PUNCTUATOR_CONFIG_ROOT

For inference scripts see "transcription_inference.ipynb" nearby.