In [1]:
import numpy as np
import math

In [2]:
import io
import stanza
import networkx as nx
from tqdm import tqdm
from nltk.tokenize import sent_tokenize

Косинусные расстояния

In [3]:
import torch
from transformers import BertModel, BertTokenizer
from sklearn.metrics.pairwise import cosine_similarity

In [8]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
bert_version = 'DeepPavlov/rubert-base-cased'
tokenizer = BertTokenizer.from_pretrained(bert_version)
model = BertModel.from_pretrained(bert_version)
model = model.eval()
model = model.to(device)

Some weights of the model checkpoint at DeepPavlov/rubert-base-cased were not used when initializing BertModel: ['cls.predictions.transform.LayerNorm.weight', 'cls.predictions.transform.dense.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.bias', 'cls.predictions.decoder.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


Средства для парсинга текста

In [5]:
nlp = stanza.Pipeline(lang='ru', processors='tokenize,pos,lemma,ner,depparse')

2024-02-15 11:36:10 INFO: Checking for updates to resources.json in case models have been updated.  Note: this behavior can be turned off with download_method=None or download_method=DownloadMethod.REUSE_RESOURCES
Widget Javascript not detected.  It may not be installed or enabled properly. Reconnecting the current kernel may help.





2024-02-15 11:36:14 INFO: Loading these models for language: ru (Russian):
| Processor | Package            |
----------------------------------
| tokenize  | syntagrus          |
| pos       | syntagrus_charlm   |
| lemma     | syntagrus_nocharlm |
| depparse  | syntagrus_charlm   |
| ner       | wikiner            |

2024-02-15 11:36:14 INFO: Using device: cpu
2024-02-15 11:36:14 INFO: Loading: tokenize
2024-02-15 11:36:15 INFO: Loading: pos
2024-02-15 11:36:15 INFO: Loading: lemma
2024-02-15 11:36:15 INFO: Loading: depparse
2024-02-15 11:36:17 INFO: Loading: ner
2024-02-15 11:36:23 INFO: Done loading processors!


Подгрузка текстов

In [9]:
texts = {0 : {'fname' : "text.txt"}}
# тут должен быть словарь с названиями файлов

In [10]:
text_n = 0 # задаем номер текста для обработки
text = ''
with io.open(texts[text_n]['fname'] , encoding='utf-8' ) as inp:
    text += inp.read()
    texts[text_n]['len'] = len(text)

sentences = [sent for sent in sent_tokenize(text, language="russian")]
current_document = sentences

Структуры для хранения элементов графа

In [11]:
class Subtree_:
    def __init__(self, parse_d, text_n, sent_s):
        self.dict = parse_d
        
        start_pos = None
        end_pos = None
        for elm in self.dict:
            if start_pos is None or self.dict[elm]['start'] < start_pos:
                start_pos = self.dict[elm]['start']
            if end_pos is None or self.dict[elm]['end'] < end_pos:
                end_pos = self.dict[elm]['end']
        start_pos += sent_s
        end_pos += sent_s
        
        self.positions = [{'t' : text_n, 's' : start_pos, 'e' : end_pos, 'm' : (start_pos + end_pos) / 2}]

    def absorb(self, other):
        # считаем, что пересечений в позициях не бывает
        self.positions += other.positions

    def print(self):
        print(f"=============== {self.start_pos} = {self.end_pos} ===============")
        for elm in self.dict:
            print(elm, self.dict[elm])
            
    def dist(self, other):
        dist = 1
        for pos in self.positions:
            for other_pos in other.positions:
                if pos['t'] == other_pos['t']:
                    dist = abs(pos['m'] - other_pos['m']) / texts[text_n]['len']
        return dist
    
    def text(self):
        text = ""
        for elm in self.dict:
            text += elm + ' '
        return text
    
    def __repr__(self):
        res = f"{self.text()} \t|\t {self.positions}" + "\n"
        return res

In [12]:
def extract_subtree(parse_d, root): # вернуть поддерево, содержащие элементы, зависящие от корня
    subtree_d = {root : parse_d[root]}
    while True:
        changed = False
        for elm in parse_d:
            if parse_d[elm]['head'] in subtree_d.keys() and elm not in subtree_d.keys():
                subtree_d[elm] = parse_d[elm]
                changed = True
        if not changed:
            break
            
    return subtree_d

def trim_tree(parse_d, root): # удалить из дерева все зависимости, что не являются obj и subj
    trimmed_d = {}
    for elm in parse_d:
        if parse_d[elm]['dep'] in ['obj', 'subj', 'nsubj'] or elm == root:
            trimmed_d[elm] = parse_d[elm]
            
    return trimmed_d            

In [17]:
class graph_base:
    def __init__(self, subtrees_list, dist_measure = calc_weight):
        self.nodes_list = subtrees_list
        self.nodes = {subtree.text() : subtree for subtree in subtrees_list}
        self.dist_measure = dist_measure

    def find_weight_txt(self, txt_1, txt_2):
        return self.dist_measure(self.nodes[txt_1],self.nodes[txt_2])
        
    def find_weight_pos(self, num_1, num_2):
        return self.dist_measure(self.nodes_list[num_1],self.nodes_list[num_2])
    
    def find_all_distances(self, elm_pos):
        dist_list = {}
        for pos in range(len(self.nodes_list)):
            dist_list[self.nodes_list[pos].text()] = self.find_weight_pos(elm_pos, pos)
        return dist_list
    
    def find_closest(self, elm_pos, amount = 1):
        dist_list = {}
        for pos in range(len(self.nodes_list)):
            dist_list[self.nodes_list[pos].text()] = self.find_weight_pos(elm_pos, pos)

        return list(reversed(sorted(dist_list.items(), key=lambda item: item[1])))[0 : amount]

Функции для измерения весов ребер

In [14]:
def shanon_entropy(parse_d):
    subtree_str = ''
    for elm in parse_d:
        subtree_str += ' ' + elm
    str_elements = set(subtree_str)
    entropy = 0
    for elm in str_elements:
        prob = subtree_str.count(elm) / len(subtree_str)
        entropy -= prob * math.log2(prob)
        
    return entropy

In [15]:
def calculate_cosine_similarity(sent_1, sent_2):
    texts = [sent_1, sent_2]
    encodings = tokenizer(texts, padding=True, return_tensors='pt').to(device)
    with torch.no_grad():
        embed_1, embed_2 = model(**encodings)[0].cpu()
    return cosine_similarity(embed_1, embed_2).mean()    

In [16]:
def calc_weight(lhs : Subtree_, rhs : Subtree_):
    return 1 - lhs.dist(rhs) + calculate_cosine_similarity(lhs.text(), rhs.text()) + shanon_entropy(rhs.text()) / 2

Собственно, парсинг текста

In [18]:
# добавление документа к графу
selected_subtrees = []
sent_start = 0

current_document_num = 0 #my_knowledge_graph.lst_doc_index + 1
current_document_graph = nx.DiGraph() # граф, который будет строиться во время обработки документа, а затем сольется с исходным графом
# альтернативный подъод - искать в сузествующем графе слова из обрабатываемого документа, если их нет, то делать новые вершины прям в нем
# но мы это и делаем при слиянии (по крайней мере планируем делать)

current_sentence_num = -1
# проход по предложениям в документе
for s in tqdm(current_document):
    current_sentence_num += 1

    doc = nlp(s)
    sent = doc.sentences[0]

    temp_d = dict()
    
    # преобразование вывода пайплайна в словарь, содержащий только необходимые ключи
    current_sentence_pos = -1
    for word in sent.words:
        current_sentence_pos += 1
        temp_d[word.text] = {"head": sent.words[word.head-1].text, 
                             "dep": word.deprel, 
                             "id": word.id, 
                             "upos": word.upos,
                             "lem": word.lemma.lower(),
                             "start" : word.start_char,
                             "end" : word.end_char}
    
    
    cur_sent_len = temp_d[list(temp_d.keys())[-1]]['end']
    
    for elm in temp_d.keys():
        sent_start += len(elm) + 1
        # определяем, делаем ли мы из этого вершины и ребра
        if     temp_d[elm]['upos'] == 'NOUN'\
            or temp_d[elm]['upos'] == 'NUM':
            
            sub_d = extract_subtree(temp_d, elm)
            trimmed_sub_d = trim_tree(sub_d, elm)
            subtree = Subtree_(trimmed_sub_d, 0, sent_start)
            
            selected_subtrees.append(subtree)
           



100%|█████████████████████████████████████████| 244/244 [04:26<00:00,  1.09s/it]


Тут убираются дубликаты, созраняются списки упоминаний каждого уникального элемента знаний

In [19]:
merged_subtrees = []
absorbed_ids = []
for ind in range(len(selected_subtrees)):
    if ind in absorbed_ids:
        continue
    for other_ind in range(ind + 1, len(selected_subtrees)):
        if selected_subtrees[ind].text() == selected_subtrees[other_ind].text():
            selected_subtrees[ind].absorb(selected_subtrees[other_ind])
            absorbed_ids.append(other_ind)
    merged_subtrees.append(selected_subtrees[ind])

Собатвенно, пример работы с графом

In [20]:
graph = graph_base(merged_subtrees)

In [21]:
n = 100
print(graph.nodes_list[n])
graph.find_closest(n,4)

лекции  	|	 [{'t': 0, 's': 3315, 'e': 3321, 'm': 3318.0}]



[('сожалению ', 2.5114902473806398),
 ('изображении ', 2.474484403074112),
 ('учреждения ', 2.460029080098655),
 ('роста Первый очки шляпу ', 2.4575713087864135)]