In [26]:
import xml.etree.ElementTree as ET
import string
import json
import os

from jedi.inference.utils import to_list

Функция build_verb_to_inf по морфологическому словарю строит словарь, в котором ключами являются леммы глаголов, а значениями — соответствующие инфинитивы.

In [27]:
def build_verb_to_inf(dictionary_file):
    """Map every VERB lemma of a morphological dictionary to an infinitive.

    The XML is expected to hold a ``<lemmata>`` section with
    ``<lemma id="N">`` children; each lemma carries ``<l t="text">`` whose
    first ``<g v="TAG">`` child is used as the grammeme tag.  Lemmas with a
    missing or non-integer id, without ``<l>``, with an empty text or
    without ``<g>`` are ignored.

    For each VERB lemma the known lemmas with larger ids are scanned in
    ascending id order (gaps in the numbering are skipped):

    * the first INFN lemma reached through VERB lemmas only becomes the
      infinitive of the verb;
    * any other tag stops the scan and the verb maps to itself;
    * running out of lemmas also maps the verb to itself.

    Args:
        dictionary_file: file name or file object accepted by ``ET.parse``.

    Returns:
        dict: verb lemma text -> infinitive text (or the verb itself).
        Empty when the document has no ``<lemmata>`` section.
    """
    root = ET.parse(dictionary_file).getroot()

    lemmata_section = root.find("lemmata")
    if lemmata_section is None:
        return {}

    # id -> (lemma text, first grammeme tag)
    lemma_info = {}
    for lemma_elem in lemmata_section.findall("lemma"):
        try:
            lemma_id = int(lemma_elem.get("id"))
        except (TypeError, ValueError):
            # Missing (None) or non-numeric id.
            continue

        l_elem = lemma_elem.find("l")
        if l_elem is None:
            continue

        lemma_text = l_elem.get("t", "").strip()
        g_elem = l_elem.find("g")
        if not lemma_text or g_elem is None:
            continue

        lemma_info[lemma_id] = (lemma_text, g_elem.get("v", "").strip())

    ordered_ids = sorted(lemma_info)
    total = len(ordered_ids)
    verb_to_inf = {}

    for position, lem_id in enumerate(ordered_ids):
        lemma_text, gram = lemma_info[lem_id]
        if gram != "VERB":
            continue

        # Default: the verb stands for itself unless an INFN follows.
        infinitive = lemma_text
        # Walking the sorted id list (instead of incrementing the id without
        # bound) guarantees termination at the end of the dictionary.
        for next_position in range(position + 1, total):
            next_text, next_gram = lemma_info[ordered_ids[next_position]]
            if next_gram == "INFN":
                infinitive = next_text
                break
            if next_gram != "VERB":
                break
        verb_to_inf[lemma_text] = infinitive

    return verb_to_inf

In [28]:
# Morphological dictionary XML handed to build_verb_to_inf.
dict_path = "../../raw_data/dict.opcorpora.xml"

# Build the mapping from the dictionary only when no JSON copy of it
# exists yet; otherwise read the stored copy.
if not os.path.exists("../../processed_data/verb_to_inf.json"):
    verb_to_inf = build_verb_to_inf(dict_path)
else:
    with open("../../processed_data/verb_to_inf.json", 'r', encoding='utf-8') as f:
        verb_to_inf = json.load(f)


Подготовим xml-дерево и массивы для удобного взаимодействия с деревом

In [29]:
# Path of the annotated corpus XML that this notebook parses with ET.parse.
file_path = "../../raw_data/annot.opcorpora_upgraded.xml"

In [30]:
# Parse the corpus once; build_adjacency_array() reads this module-level tree.
tree = ET.parse(file_path)

In [54]:
def build_adjacency_array(root=None, min_size=5000):
    """Index the ``<text>`` nodes of the corpus by id and by parent id.

    Every direct ``<text>`` child of the root must carry integer ``id`` and
    ``parent`` attributes.

    Args:
        root: root element to index; defaults to the root of the
            module-level ``tree``.
        min_size: minimum length of both returned lists (the historical
            fixed size), so existing index-based lookups keep working.

    Returns:
        tuple ``(adjacency_array, vertices)``:

        * ``adjacency_array[i]`` is the list of child ids of text ``i``, or
          the sentinel ``[None]`` when text ``i`` has no children;
        * ``vertices[i]`` is the ``<text>`` element with id ``i`` or None.
    """
    if root is None:
        root = tree.getroot()

    links = [
        (int(text_elem.get("parent")), int(text_elem.get("id")), text_elem)
        for text_elem in root.findall("text")
    ]

    # Size the tables from the data so ids >= min_size no longer raise
    # IndexError.
    size = max([min_size] + [max(parent, child) + 1 for parent, child, _ in links])

    # One sentinel list per slot: `[[None]] * size` would alias a single list.
    adjacency_array = [[None] for _ in range(size)]
    vertices = [None] * size

    for parent, child, text_elem in links:
        vertices[child] = text_elem
        if adjacency_array[parent][0] is None:
            # First child seen: replace the leaf sentinel with a real list.
            adjacency_array[parent] = []
        adjacency_array[parent].append(child)

    return adjacency_array, vertices

In [55]:
# Build both lookup tables once; expand() and corpus_filtering() read these
# module-level names.
adjacency_array, vertices = build_adjacency_array()

In [56]:
def expand(type_id):
    """Return the filtered lemma lists of all sentences under a text node.

    For every ``<sentence>`` below ``vertices[type_id]`` that has a
    ``<tokens>`` child, the lemma (``t`` attribute of ``tfr/v/l``) of each
    token is collected, subject to these rules:

    * tokens without a lemma element or lemma text are skipped;
    * lemmas listed in the module-level ``stoplist`` are skipped;
    * lemmas whose first ``<g>`` tag is ``VERB`` are replaced through the
      module-level ``verb_to_inf`` mapping (unknown verbs are kept as is);
    * lemmas containing a digit are skipped.

    Sentences without a ``<tokens>`` element contribute nothing.

    Args:
        type_id: index of the text node in ``vertices``.

    Returns:
        list[list[str]]: one list of lemmas per sentence that has tokens.
    """
    sentences = []
    for sentence in vertices[type_id].findall(".//sentence"):
        tokens_elem = sentence.find("tokens")
        if tokens_elem is None:
            # Nothing to collect; appending here would reuse the previous
            # sentence's list (or fail on the very first sentence).
            continue

        tokens_list = []
        for token in tokens_elem.findall("token"):
            lemma_elem = token.find("tfr/v/l")
            if lemma_elem is None:
                continue

            word = lemma_elem.attrib.get("t")
            if not word or word in stoplist:
                continue

            # The tag is read from the same lemma element as the text.
            gram_elem = lemma_elem.find("g")
            if gram_elem is not None and gram_elem.attrib.get("v") == "VERB":
                # Fall back to the lemma itself for verbs absent from the map.
                word = verb_to_inf.get(word, word)

            if any(char.isdigit() for char in word):
                continue
            tokens_list.append(word)

        sentences.append(tokens_list)
    return sentences

In [57]:
# Typographic punctuation that string.punctuation does not contain.
additional_chars = ['…', '–', '—', '«', '»', '“', '”', '’', '‘']

# Function words excluded from the corpora.
# NOTE(review): the multi-word entries can only match a lemma that itself
# contains a space — confirm they are needed.
stop_words = [
    # Prepositions
    "в", "во", "без", "для", "до", "за", "из", "изо", "к", "ко",
    "на", "над", "о", "об", "обо", "от", "по", "под", "подо",
    "при", "про", "с", "со", "через", "сквозь", "среди", "около",

    # Conjunctions
    "и", "а", "но", "либо", "или", "да", "однако", "зато",
    "потому что", "так как", "поскольку", "как", "если", "когда",
    "хотя", "ведь", "поэтому", "так что", "ибо",

    # Particles
    "же", "бы", "ли", "разве", "только", "даже", "уж", "ну", "хоть", "-то"
]

# Full stop list: ASCII punctuation, then the extra characters, then the
# function words (order matches the incremental construction).
stoplist = [*string.punctuation, *additional_chars, *stop_words]

In [58]:
def corpus_filtering(type_id, begin, end, year = None):
    """Collect sentences of all leaf texts under ``type_id`` dated in a window.

    The year of a node is taken from its first ``<tag>`` whose text starts
    with "Год:"; when that tag is absent or not numeric, the year passed in
    by the parent call is kept.  A leaf (adjacency entry holding the
    ``None`` sentinel) is expanded through ``expand`` only if its year is
    known and ``begin <= year < end``.

    Args:
        type_id: id of the text node to start from.
        begin: first year of the window (inclusive).
        end: last year of the window (exclusive).
        year: year inherited from the ancestors, if any.

    Returns:
        list of token lists gathered from the matching leaves.
    """
    tags_elem = vertices[type_id].find("tags")
    tag_nodes = tags_elem.findall("tag") if tags_elem is not None else []

    for tag in tag_nodes:
        label = tag.text
        if not (label and label.startswith("Год:")):
            continue
        try:
            year = int(label.split("Год:")[-1].strip())
        except ValueError:
            # Unparseable year: keep the inherited value.
            pass
        # Only the first year tag is considered.
        break

    collected = []
    for child in adjacency_array[type_id]:
        if child is None:
            # Leaf node: either expand it or give back what was gathered.
            in_window = year is not None and (begin <= year < end)
            return expand(type_id) if in_window else collected
        collected.extend(corpus_filtering(child, begin, end, year))

    return collected

In [59]:
# Corpus type name -> id of the text node passed to corpus_filtering() as
# type_id (the root of that corpus type's subtree in adjacency_array).
corpus_types = {
    "ЧасКор (новости)" : 226,
    "Википедия" : 8,
    "Блоги" : 184,
    "Худож. литература" : 806,
    "Нон-фикшн" : 2037
}

In [60]:
def amount_of_words(corpus):
    """Return the total number of tokens over all sentences of ``corpus``.

    Args:
        corpus: iterable of sentences, each a sized collection of tokens.

    Returns:
        int: sum of the sentence lengths (0 for an empty corpus).
    """
    return sum(len(sentence) for sentence in corpus)

In [61]:
# corpus type name -> (decade start years, word count per decade)
corpora_stats = {}

# Decades [start_with, end) are scanned in steps of `shift` years.
start_with = 1800
end = 2020
shift = 10

decade_starts = list(range(start_with, end, shift))

# Loop names deliberately avoid `type` and `id`: rebinding those at module
# level would shadow the builtins for every later cell.
for corpus_name, corpus_id in corpus_types.items():
    words_per_decade = [
        amount_of_words(corpus_filtering(corpus_id, decade, decade + shift))
        for decade in decade_starts
    ]
    # Each entry gets its own copy of the decade list.
    corpora_stats[corpus_name] = (list(decade_starts), words_per_decade)


In [62]:
import os
import matplotlib.pyplot as plt
from typing import List, Union

In [63]:
def visualize_word_counts(
    x_data: List[Union[int, str]],
    y_data: List[int],
    output_path: str = "word_counts_visualization.png",
    figsize: tuple = (12, 6),
    dpi: int = 300,
    show_values: bool = True,
    rotation: int = 45
) -> None:
    """Draw a bar chart of word counts and save it as a PNG file.

    The chart is written to
    ``../../processed_data_upgraded/pics/распределения слов <name>.png``,
    where ``<name>`` is ``output_path`` without a trailing ``.png`` (so the
    default value no longer produces a doubled extension).  The directory is
    created when missing and the saved path is printed.

    Args:
        x_data: bar labels; when all of them are ints the bars are sorted by
            label in ascending order.
        y_data: bar heights, parallel to ``x_data``.
        output_path: name of the plotted data set; used in the title and in
            the file name.
        figsize: figure size passed to matplotlib.
        dpi: figure resolution passed to matplotlib.
        show_values: when True, write each value above its bar.
        rotation: rotation of the x tick labels in degrees.
    """
    suffix = ".png"
    name = output_path[:-len(suffix)] if output_path.lower().endswith(suffix) else output_path

    # Sort only non-empty all-int input: unpacking zip(*[]) raises ValueError.
    if len(x_data) > 0 and all(isinstance(x, int) for x in x_data):
        ordered = sorted(zip(x_data, y_data), key=lambda pair: pair[0])
        x_data, y_data = zip(*ordered)
    x_labels = [str(x) for x in x_data]

    target_path = "../../processed_data_upgraded/pics/распределения слов " + name + suffix
    target_dir = os.path.dirname(target_path)

    fig = plt.figure(figsize=figsize, dpi=dpi)
    try:
        bars = plt.bar(x_labels, y_data, color='#1f77b4')

        plt.title(f'Распределение количества слов типа текстов {name}', pad=20)
        plt.xlabel('Десятилетие', labelpad=10)
        plt.ylabel('Количество слов', labelpad=10)
        plt.xticks(rotation=rotation, ha='right')
        plt.grid(axis='y', linestyle=':', alpha=0.7)

        if show_values:
            for bar in bars:
                height = bar.get_height()
                plt.text(
                    bar.get_x() + bar.get_width() / 2,
                    height,
                    f'{int(height):,}',
                    ha='center',
                    va='bottom',
                    fontsize=8
                )

        plt.tight_layout()
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        plt.savefig(target_path, bbox_inches='tight')
    finally:
        # Release the figure even when plotting or saving fails.
        plt.close(fig)

    print(f"Visualization saved to {target_path}")

In [64]:
# One chart per corpus type.  The loop variable is not called `type`, so
# the builtin stays usable in later cells.
for corpus_name, (decades, words) in corpora_stats.items():
    visualize_word_counts(decades, words, output_path=corpus_name)

Visualization saved to ../../processed_data_upgraded/pics/распределения слов ЧасКор (новости).png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов Википедия.png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов Блоги.png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов Худож. литература.png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов Нон-фикшн.png


Изучив данные, мы видим, что для исследования семантических сдвигов следует взять тип «Худож. литература», так как другие типы не предоставляют достаточного количества данных до XXI века.


Чтобы выбрать вторую выборку, сравним суммарное количество слов для каждого типа в паре с типом «Худож. литература».

In [65]:
def sum_arr(first, second):
    """Add two sequences element by element.

    The result has the length of ``first``; ``second`` must be at least as
    long (extra trailing items of ``second`` are ignored).

    Args:
        first: indexable sequence that defines the result length.
        second: indexable sequence added position by position.

    Returns:
        list: ``first[i] + second[i]`` for every index of ``first``.
    """
    return [first[idx] + second[idx] for idx in range(len(first))]

In [66]:
# Reference corpus type whose per-decade counts are added to every other type.
lit_type = "Худож. литература"
lit_word = corpora_stats[lit_type][1]

# The loop variable is not called `type`, so the builtin is not shadowed.
for corpus_name, (decades, words) in corpora_stats.items():
    if corpus_name == lit_type:
        continue
    visualize_word_counts(
        decades,
        sum_arr(words, lit_word),
        output_path=("суммы " + lit_type + " и " + corpus_name),
    )

Visualization saved to ../../processed_data_upgraded/pics/распределения слов суммы Худож. литература и ЧасКор (новости).png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов суммы Худож. литература и Википедия.png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов суммы Худож. литература и Блоги.png
Visualization saved to ../../processed_data_upgraded/pics/распределения слов суммы Худож. литература и Нон-фикшн.png


Из полученных графиков видно, что разбиение на 3 периода осмысленно только в следующем случае:
1. до 2000 года
2. с 2000 по 2010
3. с 2010 по 2020

Для такой кластеризации подойдут наборы данных Худож. литература + Блоги и Худож. литература + ЧасКор (новости)

Составим соответствующие корпуса текстов

In [67]:
def periods_calc(type, periods):
    """Build one filtered corpus per period for a corpus type.

    Args:
        type: corpus type name, a key of the module-level ``corpus_types``.
        periods: sequence of ``(begin, end)`` pairs forwarded to
            ``corpus_filtering``.

    Returns:
        list: the ``corpus_filtering`` result for every period, in order.
    """
    return [corpus_filtering(corpus_types[type], *period) for period in periods]

In [68]:
# Corpus type names (keys of corpus_types) combined below.
fiction_type = "Худож. литература"
blogs_type = "Блоги"
news_type = "ЧасКор (новости)"

# (begin, end) year windows; corpus_filtering treats `end` as exclusive.
# NOTE(review): the preceding markdown describes three periods, while only
# two windows are defined here — confirm which split is intended.
periods = [(1800, 2000), (2000, 2015)]
times = len(periods)

# One corpus per period for each type.
fiction_data = periods_calc(fiction_type, periods)
blogs_data = periods_calc(blogs_type, periods)
news_data = periods_calc(news_type, periods)

# period -> fiction sentences followed by the sentences of the second type.
fiction_and_blogs = {}
fiction_and_news = {}
for i, period in enumerate(periods):
    fiction_and_blogs[period] = fiction_data[i] + blogs_data[i]
    fiction_and_news[period] = fiction_data[i] + news_data[i]


Сохраним полученные данные

In [69]:
import json

In [70]:
# 1-based period number -> word used as the suffix of the saved file names
# (looked up as digits[i + 1] when the training data is written).
digits = {1: "one", 2: "two", 3: "three"}

In [71]:
# Create the target directory first: open(..., 'w') does not create missing
# parent directories and would raise FileNotFoundError on a fresh checkout.
training_dir = "../../processed_data_upgraded/training_data"
os.makedirs(training_dir, exist_ok=True)

# Write one JSON file per (data set, period); the file names are
# <data set>_datum_<period word>.json.
for i in range(times):
    for prefix, dataset in (
        ("fiction_and_blogs", fiction_and_blogs),
        ("fiction_and_news", fiction_and_news),
    ):
        with open(f"{training_dir}/{prefix}_datum_{digits[i + 1]}.json", 'w', encoding='utf-8') as file:
            json.dump(dataset[periods[i]], file, ensure_ascii=False, indent=2)

In [49]:
# for i in range(times):
#     with open(f"../../processed_data_upgraded/training_data/fiction_and_blogs_datum_{digits[i + 1]}.cor", 'w', encoding='utf-8') as file:
#         for tokens in fiction_and_blogs[periods[i]]:
#             line = " ".join(tokens)
#             file.write(line + "\n")
#
#     with open(f"../../processed_data_/training_data/fiction_and_news_datum_{digits[i + 1]}.cor", 'w', encoding='utf-8') as file:
#         for tokens in fiction_and_news[periods[i]]:
#             line = " ".join(tokens)
#             file.write(line + "\n")

Также сохраним словарь verb_to_inf

In [50]:
# Make sure the cache directory exists; open(..., 'w') does not create it.
os.makedirs("../../processed_data", exist_ok=True)

# Store the verb -> infinitive mapping at the path the loading cell checks.
with open("../../processed_data/verb_to_inf.json", 'w', encoding='utf-8') as file:
    json.dump(verb_to_inf, file, ensure_ascii=False, indent=2)