In [1]:
def clean_text(text: str) -> str:
    text = text.replace("\n", " ")
    text = text.replace("[ [ [", "[[[").replace("] ] ]", "]]]")
    return text

In [2]:
def get_syntactic_relations(doc, lemmatizer):
    """
    Извлекает синтаксические связи (именные группы, включая прилагательные и артикли) из текста.
    Обрабатывает сочетания глагола с предлогом как единое ребро.
    Обрабатывает сочинение (conj), создавая дополнительные связи.
    """
    chunks = []  # Список для хранения именных групп
    relations = []  # Список для хранения связей (субъект, глагол, объект)
    subjects = {}  # Словарь для хранения подлежащих
    conjunctions = {}  # Словарь для хранения связей conj (сочинение)
    chunk_to_text = {}  # Связываем root токены с текстом именных групп

    # Добавляем именные группы (NOUN CHUNKS) и нормализуем их
    for chunk in doc.noun_chunks:
        normalized_chunk = ' '.join([lemmatizer.lemmatize(token.text.lower(), pos='n') for token in chunk if token.text.lower() not in ['the', 'a', 'an']])
        chunks.append((chunk.start_char, chunk.end_char, chunk, normalized_chunk, chunk.root.head, chunk.root.dep_))
        chunk_to_text[chunk.root] = normalized_chunk  # Связываем root токен с нормализованным текстом

    # Обрабатываем сочинение (conj)
    for token in doc:
        if token.dep_ == "conj" and token.head in chunk_to_text:
            head_text = chunk_to_text[token.head]
            conj_text = chunk_to_text.get(token, None)  # Используем уже обработанный chunk

            if head_text and conj_text:
                conjunctions.setdefault(head_text, []).append(conj_text)

    # Добавляем подлежащие
    for chunk in chunks:
        if chunk[5] == 'nsubj':
            subject_text = chunk_to_text.get(chunk[2].root, chunk[3])  # Используем нормализованный текст
            subjects.setdefault(chunk[4], []).append(subject_text)

            # Добавляем conj-подлежащие
            if subject_text in conjunctions:
                subjects[chunk[4]].extend(conjunctions[subject_text])

    # Добавляем связи для глаголов и предлогов
    for i, chunk in enumerate(chunks):
        # Связи с глаголами
        if chunk[4].pos_ == 'VERB' and chunk[5] != 'nsubj':
            subject_list = subjects.get(chunk[4], [])
            object_text = chunk_to_text.get(chunk[2].root, chunk[3])  # Используем нормализованный текст

            for subject in subject_list:
                relations.append((subject, chunk[4].text, object_text))

                # Добавляем conj-объекты
                if object_text in conjunctions:
                    for conj in conjunctions[object_text]:
                        relations.append((subject, chunk[4].text, conj))

        # Обрабатываем сочетания глаголов и предлогов как единое ребро
        if chunk[4].pos_ == 'VERB' and i + 1 < len(chunks):
            next_chunk = chunks[i + 1]
            if next_chunk[4].pos_ == 'ADP':  # Если следующий элемент - предлог
                subject_list = subjects.get(chunk[4], [])
                relation_text = f"{chunk[4].text} {next_chunk[4].text}"
                object_text = chunk_to_text.get(next_chunk[2].root, next_chunk[3])

                for subject in subject_list:
                    relations.append((subject, relation_text, object_text))

                    # Добавляем conj-объекты
                    if object_text in conjunctions:
                        for conj in conjunctions[object_text]:
                            relations.append((subject, relation_text, conj))

    # Добавляем связи для предлогов и объектов
    for token in doc:
        if token.dep_ == "prep" and token.head.pos_ == "NOUN":
            prep_text = token.text
            object_text = None

            # Ищем объект предлога (pobj)
            for child in token.children:
                if child.dep_ == "pobj":
                    object_text = chunk_to_text.get(child, child.text.lower())

            if object_text:
                head_text = chunk_to_text.get(token.head, token.head.text.lower())  # Нормализуем ключ
                subject_list = [head_text]

                # Добавляем все сочинённые существительные
                if head_text in conjunctions:
                    subject_list.extend(conjunctions[head_text])

                # Добавляем рёбра для каждого существительного в subject_list
                for subject in subject_list:
                    relations.append((subject, prep_text, object_text))
                    # print(f"Adding edge: {subject} --[{prep_text}]--> {object_text}")

    # Удаляем связи, где субъект "that"
    relations = [rel for rel in relations if rel[0] != "that"]

    return relations

In [3]:
from nltk.stem import WordNetLemmatizer
import spacy
spacy.require_gpu()
nlp = spacy.load("en_core_web_trf")
lemmatizer = WordNetLemmatizer()

def get_triplets(text: str) -> list[str]:
    doc = nlp(text)
    triplets = get_syntactic_relations(doc, lemmatizer)
    return triplets

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
import re

def clean_word(word: str) -> str:
    reference = False
    if word.startswith("[ [ [ "):
        reference = True
    word = word.replace("[", "").replace("]", "")
    word = word.replace("(", "").replace(")", "")
    word = word.replace(",", "").replace(";", "")
    word = word.replace("-", "").replace(".", "")
    word = word.replace(":", "").replace(".", "")
    word = word.replace("‘", "").replace("’", "")
    word = word.strip()
    word = re.sub(r'\s{2,}', ' ', word).strip()
    if reference:
        word = "[[[" + word + "]]]"
    return word

def clean_triplet(triplet: tuple[str]) -> tuple[str]:
    triplet = (clean_word(word) for word in triplet)
    return triplet

In [9]:
import csv

def process_file(input_file: str, output_file: str):
    with open(input_file, 'r', encoding="utf-8") as f:
        text = f.read()

    text = clean_text(text)
    triplets = get_triplets(text)

    writer = csv.writer(open(output_file, 'w+', encoding="utf-8"), delimiter=";")
    for triplet in triplets:
        triplet = clean_triplet(triplet)
        writer.writerow(triplet)

In [10]:
import os
from pathlib import Path

input_directory = Path("/home/kdemyokhin_1/concept-tree-course-work/articles_anaphora_resolved/arxiv-txt-cs")
output_directory = Path("/home/kdemyokhin_1/concept-tree-course-work/articles_triples/arxiv-txt-cs")


# Получаем список всех txt файлов рекурсивно (включая поддиректории)
input_files = list(input_directory.rglob("*.txt"))

# Формируем список выходных файлов, сохраняя структуру поддиректорий
output_files = []
for file in input_files:
    # Вычисляем относительный путь файла относительно input_directory
    relative_path = file.relative_to(input_directory)
    # Формируем путь к файлу в выходной директории
    out_file = output_directory / relative_path
    # Создаем директорию, если её ещё нет
    out_file.parent.mkdir(parents=True, exist_ok=True)
    output_files.append(out_file)


In [None]:
import tqdm

for input_path, output_path in tqdm.tqdm(zip(input_files, output_files), total=len(input_files)):
    if os.path.exists(output_path):
        continue
    try:
        process_file(input_path, output_path)
    except Exception as e:
        print(f"Can't process {input_path}: {e}")
        with open(f"/home/kdemyokhin_1/concept-tree-course-work/articles_triples/unprocessed_files.txt","w+") as f:
            f.write(str(input_path)+'\n')

  1%|          | 122/10959 [00:43<2:14:03,  1.35it/s]