# Написать программу для построения обратного индекса с учетом векторного представления слов (Word2Vec, Bert). Необходимо реализовать возможность поиска по нескольким словам.

In [1]:
#!pip install transformers

In [2]:
import torch
import math
from transformers import BertTokenizer, BertModel, logging
import os
import unicodedata
import sys
import re
import warnings
import numpy as np
from scipy.spatial.distance import cosine
from itertools import chain
from pathlib import Path
logging.set_verbosity_error()
warnings.filterwarnings(action='ignore', category=RuntimeWarning) # setting ignore as a parameter and further adding category

In [3]:
if not sys.version_info >= (3, 5):
    print('Для работы программы требуется версия Pyhton не меньше 3.5')
    exit()

In [4]:
DATA_DIR='data'
TOP_TXT=3
tokenizer = BertTokenizer.from_pretrained("bert-base-uncased")
model = BertModel.from_pretrained('bert-base-uncased',output_hidden_states = True)
model.eval()

BertModel(
  (embeddings): BertEmbeddings(
    (word_embeddings): Embedding(30522, 768, padding_idx=0)
    (position_embeddings): Embedding(512, 768)
    (token_type_embeddings): Embedding(2, 768)
    (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
    (dropout): Dropout(p=0.1, inplace=False)
  )
  (encoder): BertEncoder(
    (layer): ModuleList(
      (0): BertLayer(
        (attention): BertAttention(
          (self): BertSelfAttention(
            (query): Linear(in_features=768, out_features=768, bias=True)
            (key): Linear(in_features=768, out_features=768, bias=True)
            (value): Linear(in_features=768, out_features=768, bias=True)
            (dropout): Dropout(p=0.1, inplace=False)
          )
          (output): BertSelfOutput(
            (dense): Linear(in_features=768, out_features=768, bias=True)
            (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
            (dropout): Dropout(p=0.1, inplace=False)
          

In [5]:
def file_to_tokens(filename) -> list[str]:
    """
    Функция, которая для каждого документа составляет список слов (токенов). 
    Лемматизируем, токенизируем, убираем знаки препинания.
    """
    text = Path(filename).read_text()
    text = text.replace('\n', '')
    #text = re.sub(r'[^\w\s]', '', text)
    text = re.sub(r'[^\w\s]','',unicodedata.normalize("NFKC",text))

    marked_text = "[CLS] " + text + " [SEP]"
    tokenized_text = tokenizer.tokenize(marked_text)
    indexed_tokens = tokenizer.convert_tokens_to_ids(tokenized_text)
    segments_ids = [1] * len(tokenized_text)
    tokens_tensor = torch.tensor([indexed_tokens])
    segments_tensors = torch.tensor([segments_ids])
    with torch.no_grad():
        hidden_states = model(tokens_tensor, segments_tensors)[2]
    token_embeddings = torch.stack(hidden_states, dim=0)
    token_embeddings = torch.squeeze(token_embeddings, dim=1)
    token_embeddings = token_embeddings.permute(1,0,2)
    token_vecs_sum = [torch.sum(x[-4:], dim=0) for x in token_embeddings]
    ret = [[tokenized_text[x], token_vecs_sum[x]] for x in range(len(tokenized_text))]
    return ret

In [6]:
class ReverseIndex:
    def __init__(self, documents) -> None:
        """
        Конструктор класса.
        documents -- список файлов.
        """
        self.documents = documents
        self.bert_data = dict()
        self.count_words_in_documents = dict()

    def similarity(self, word1, word2) -> float:
        """
        Косинусное сходство между словами.
        word1 -- первое слово.
        word2 -- второе слово.
        """
        return 1. - cosine(word1, word2)

    def find_vector(self, line) -> list:
        """
        Функция, получающая вектор для строки на основе модели Национального корпуса русского языка.
        line -- слово/строка, для которой нужно получить вектор/ы.
        """
        if(type(line) != str):
            single = False
            line = " ".join(line)
        else:
            single = True
        marked_text = "[CLS] " + line + " [SEP]"
        tokenized_text = tokenizer.tokenize(marked_text)
        indexed_tokens = tokenizer.convert_tokens_to_ids(tokenized_text)
        segments_ids = [1] * len(indexed_tokens) 
        tokens_tensor = torch.tensor([indexed_tokens])
        segments_tensors = torch.tensor([segments_ids])
        with torch.no_grad():
            hidden_states = model(tokens_tensor, segments_tensors)[2]
        token_embeddings = torch.stack(hidden_states, dim=0)
        token_embeddings = torch.squeeze(token_embeddings, dim=1)
        token_embeddings = token_embeddings.permute(1, 0, 2)
        token_vecs_sum = [torch.sum(x[-4:], dim=0) for x in token_embeddings]
        return token_vecs_sum[1] if single else token_vecs_sum

    def get_tokens_from_document(self, docId) -> dict():
        """
        Функция, получающая список токенов из документа по его идентификатору.
        docId -- идентификатор документа.
        """
        return [x for x in self.bert_data.keys() if docId in self.bert_data[x].keys()]

    def word_counter(self, docId, info) -> tuple:
        """
        Получает список слов, которые наиболее часто встречаются в документе, 
        и их косинусные расстояния с искомым вектором.
        docId -- идентификатор документа.
        info -- словарь, где ключи -- идентификаторы документов, 
        а значения -- список из словарей с ключами "слово_документ_позиция" и значениями "похожесть слова на искомое".
        """
        words = info[docId]
        words.sort(key=lambda x: x[1])
        best_3 = words[-3:]
        words = [x[0] for x in best_3 if x[1] > 0.3]
        sims = [x[1] for x in best_3 if x[1] > 0.3]
        return words, sims, math.fsum(sims)

    def multiple_words_counter(self, docId, info) -> tuple:
        """
        Функция, получающая список слов и позиций в документе, которые наиболее часто встречаются в документе, 
        и их косинусные расстояния с искомым словом.
        docId -- идентификатор документа.
        info -- словарь, где ключи -- идентификаторы документов, 
        а значения -- список из словарей с ключами "слово_документ_позиция" и значениями "похожесть слова на искомое".
        """
        words = info[docId]
        words.sort(key=lambda x: x[1])
        best_3 = words[-3:]
        words = [x[0].split("_")[0] for x in best_3 if x[1] > 0.3]
        sims  = [x[1] for x in best_3 if x[1] > 0.3]
        pos  = [x[0].split("_")[1] for x in best_3 if x[1] > 0.3]
        return words, pos, sims, math.fsum(sims)

    def find_single_word(self, word) -> dict():
        """
        Функция для поиска документов, содержащих одно слово.
        word -- слово, по которому нужно осуществить поиск.
        """
        results = dict()
        docs = dict()
        for i, document in enumerate(self.documents):
            similarity_of_words = []
            word_vector = self.find_vector(word)
            for w in self.get_tokens_from_document(i):
                for position in self.bert_data[w][i].keys():
                    sim_rate = self.similarity(word_vector, self.bert_data[w][i][position])
                    if i not in results.keys():  # добавляем индекс документа
                        results[i] = []
                    if w == word:
                        sim_rate += 2
                    new_w = w + "_" + str(position)
                    # Храним словарь слово, документ, позиция, его похожесть с искомым
                    results[i].append([new_w, sim_rate])  
            words, sims, sum = self.word_counter(i, results)
            # При таких результатах документ пропускаем
            if(sum == 0):
                continue
            docs[i] = [words, sum]
        output = dict()
        for i in dict(sorted(docs.items(), key=lambda item: item[1][1], reverse=True)).keys():
            output[self.documents[i]] = docs[i][0]  # позиция слова в документе
        return output

    def min_distance(self, word1, word2) -> int:
        """
        Функция для определения минимального расстояния между двумя словами.
        """
        # Используется лямбда-функция для вычисления расстояния между символами в соответствующих позициях в словах
        return min(map(lambda x, y: abs(int(x) - int(y)), word1, word2)) 

    def find(self, *words): # -> list(str), dict
        """
        Функция для нахождения слов в файлах. Если передано несколько слов, обрабатываем их внутри метода
        для каждого слова ищем похожие, анализируем их расположение и веса.
        Если передано одно слово - делегируем обработку в find_single_word().
        """
        awords = [a.lower() for a in words]
        # если передано одно слово, то вызываем метод find_single_word()
        if(len(awords) == 1):
            return self.find_single_word(awords[0])
        results = dict()
        docs = dict()
        results_tmp = dict()
        final_results = dict()
        word_vectors = self.find_vector(words)[1:-1] # получаем векторы слов
        for i, document in enumerate(self.documents): # итерируемся по каждому документу
            avg_sim = [] 
            similarity_of_words = dict()
            for idx, word_vec in enumerate(word_vectors):
                for w in self.get_tokens_from_document(i):
                    for position in self.bert_data[w][i].keys():
                        # считаем меру похожести для каждого слова, которое встречается в документе
                        sim_rate = self.similarity(word_vec, self.bert_data[w][i][position])
                        if i not in results.keys():
                            results[i] = []
                        if w in awords:
                            sim_rate += 2 # увеличиваем вес при совпадении слова с запросом
                        new_w = w + "_" + str(position)
                        results[i].append([new_w, sim_rate]) # сохраняем значение похожести для каждого слова
                # Получаем количество совпадающих слов, сумму их весов и значение средней похожести
                words, positions, sims, sum = self.multiple_words_counter(i, results)  
                # Сохраняем результаты для каждого слова. Используется для вычисления среднего расстояния между словами
                results_tmp[idx] = [words, positions, sims, sum]
                # Добавляем значение похожести для каждого слова в список avg_sim
                avg_sim.append(sims)
            # При таких результатах документ пропускаем
            if(len(results_tmp[0][0]) == 0):
                continue;
            # Вычисляем массив минимальных значений между парами слов
            avg_distance = [self.min_distance(results_tmp[x][1], results_tmp[x + 1][1]) for x in range(len(results_tmp.keys()) - 1)]
            # Получаем итоговое значение оптимальности для каждого документа
            final_results[i] = [np.mean(np.array(list(chain.from_iterable(avg_sim)))), np.mean(np.array(avg_distance))]
        # Возвращаем имена документов, отсортированные по оптимальности
        ret = [self.documents[x] for x in dict(sorted(final_results.items(), key=lambda item: (item[1][0], item[1][1]), reverse=True)).keys()]
        return ret
    
    def run(self) -> None:
        """
        Метод запускает процесс индексации файлов.
        """
        for i, document in enumerate(self.documents):
            words_and_vectors = file_to_tokens(document)
            self.count_words_in_documents[i] = len(words_and_vectors)

            for idx, el in enumerate(words_and_vectors):
                w = words_and_vectors[idx][0]
                vector = words_and_vectors[idx][1]
                if w not in self.bert_data: # добавляем слово
                    self.bert_data[w] = dict()
                if i not in self.bert_data[w].keys(): # добавляем индекс документа
                    self.bert_data[w][i] = dict()
                self.bert_data[w][i][idx] = vector # добавляем позицию слова и его вектор

In [7]:
files = os.listdir('./' + DATA_DIR + '/') # Фрагменты статей англоязычной Википедии
files[:] = [DATA_DIR + '\\' + x for x in files]
X = ReverseIndex(files)
X.run()

In [8]:
X.find('trees')

{'data3\\insect.txt': ['trees_256', 'trees_35', 'trees_281'],
 'data3\\willow.txt': ['species_61', 'species_40', 'trees_26'],
 'data3\\disney.txt': ['things_66', 'time_18', '##s_113'],
 'data3\\rainfall.txt': ['##ees_52', 'creating_50', 'sites_63'],
 'data3\\pushkin.txt': ['he_92', '##eral_70', '##on_109'],
 'data3\\wildfires.txt': ['and_101', 'acres_69', 'europe_113'],
 'data3\\jordan.txt': ['something_22', '10_74', 'who_75'],
 'data3\\tuttifrutti.txt': ['##t_101', 'a_327', '##boy_197']}

In [9]:
X.find_single_word('Symphyta')

{'data3\\insect.txt': ['ce_204', '##hy_224', 'sy_222'],
 'data3\\willow.txt': ['the_98', 'it_115', 'species_40'],
 'data3\\wildfires.txt': ['##wil_89']}

In [10]:
X.find_single_word('Ariel')

{'data3\\disney.txt': ['time_18', '##s_113', 'ariel_33'],
 'data3\\pushkin.txt': ['##eral_70', '##on_109'],
 'data3\\rainfall.txt': ['the_24', 'sites_63'],
 'data3\\wildfires.txt': ['europe_113'],
 'data3\\tuttifrutti.txt': ['boom_409'],
 'data3\\jordan.txt': ['lakers_86'],
 'data3\\willow.txt': ['##most_39']}

In [11]:
X.find_single_word("caterpillar")

{'data3\\insect.txt': ['wasp_145', 'larvae_274', 'cater_59'],
 'data3\\willow.txt': ['herb_103', 'willow_100']}

In [12]:
X.find('Alladdin', 'Beauty')

['data3\\disney.txt', 'data3\\pushkin.txt', 'data3\\willow.txt']

In [13]:
X.find('especially', 'willow')

['data3\\willow.txt',
 'data3\\insect.txt',
 'data3\\disney.txt',
 'data3\\jordan.txt',
 'data3\\pushkin.txt',
 'data3\\wildfires.txt',
 'data3\\tuttifrutti.txt',
 'data3\\rainfall.txt']

In [14]:
X.find('willow', 'especially')

['data3\\willow.txt',
 'data3\\insect.txt',
 'data3\\disney.txt',
 'data3\\pushkin.txt',
 'data3\\wildfires.txt',
 'data3\\jordan.txt',
 'data3\\tuttifrutti.txt']

Разный результат у тестов в зависимости от порядка слов. Словосочетание "willow especially" выглядит неграмотно и приоритетнее оказывается первое слово - willow.

In [15]:
X.find('especially')

{'data3\\insect.txt': ['especially_284'],
 'data3\\disney.txt': ['##s_113'],
 'data3\\pushkin.txt': ['##on_109'],
 'data3\\wildfires.txt': ['europe_113'],
 'data3\\tuttifrutti.txt': ['boom_409']}

In [16]:
X.find('over', 'the', 'lakers')

['data3\\jordan.txt',
 'data3\\disney.txt',
 'data3\\rainfall.txt',
 'data3\\pushkin.txt',
 'data3\\tuttifrutti.txt',
 'data3\\insect.txt',
 'data3\\willow.txt',
 'data3\\wildfires.txt']

In [17]:
X.find('trees', 'shrubs')

['data3\\insect.txt',
 'data3\\willow.txt',
 'data3\\rainfall.txt',
 'data3\\pushkin.txt',
 'data3\\wildfires.txt',
 'data3\\disney.txt',
 'data3\\jordan.txt',
 'data3\\tuttifrutti.txt']

In [18]:
X.find('have', 'burned')

['data3\\wildfires.txt',
 'data3\\insect.txt',
 'data3\\disney.txt',
 'data3\\jordan.txt',
 'data3\\pushkin.txt',
 'data3\\tuttifrutti.txt',
 'data3\\willow.txt']

In [19]:
X.find('Emperor', 'court')

['data3\\pushkin.txt']