In [4]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [5]:
import math
import os
import re
import shutil
from collections import Counter

import nltk
from gensim.models import Word2Vec
from gensim.similarities.annoy import AnnoyIndexer
from nltk.corpus import stopwords

from utils import from_current_file, load_json, round_float, save_json

nltk.download("stopwords")
nltk.download("punkt_tab")

[nltk_data] Error loading stopwords: <urlopen error [WinError 10061]
[nltk_data]     Подключение не установлено, т.к. конечный компьютер
[nltk_data]     отверг запрос на подключение>
[nltk_data] Error loading punkt_tab: <urlopen error [WinError 10061]
[nltk_data]     Подключение не установлено, т.к. конечный компьютер
[nltk_data]     отверг запрос на подключение>


False

In [6]:
class Word2VecIndexer:
    _stop_words = set(stopwords.words("english"))

    def __init__(
        self,
        index_dir: str = "../data/embedding_directory",
        documents_dir: str = "../data/scrapped/class_data_function__1_1",
        top_similar: int = 10,
        force: bool = False,
    ):
        self._index_dir = from_current_file(index_dir)
        self._documents_dir = from_current_file(documents_dir)
        self.top_similar = top_similar

        self._word2vec_model_path = os.path.join(self._index_dir, "word2vec.model")
        self._annoy_index_path = os.path.join(self._index_dir, "annoy_index")
        self._doc_id_path = os.path.join(self._index_dir, "documents.json")
        self.documents: dict[int, str] = {}

        self.model: Word2Vec = None
        self.annoy_indexer: AnnoyIndexer = None

        if force or not os.path.exists(self._index_dir):
            print("Index is not found, creating new...")
            if force:
                try:
                    shutil.rmtree(self._index_dir)
                except FileNotFoundError:
                    pass
            os.mkdir(path=self._index_dir)
            self.build_index()
            print("Complete!")

        self.load_index()

    def _tokenize(self, text: str) -> list[str]:
        return [w for w in re.findall(r"\w+", text.lower()) if w not in self._stop_words]

    def _get_similar_words(self, word: str) -> set[tuple[str, float]]:
        matches = set()
        if self.model and word in self.model.wv:
            for similar_word, similarity in self.model.wv.most_similar(
                word, topn=self.top_similar, indexer=self.annoy_indexer
            ):
                if similar_word in self.index:
                    matches.add((similar_word, similarity))
        return matches

    def build_index(self):
        sentences = []
        for document_id, filename in enumerate(os.listdir(self._documents_dir)):
            if filename.endswith(".txt"):
                with open(
                    os.path.join(self._documents_dir, filename), "r", encoding="utf-8"
                ) as f:
                    text = f.read()
                    self.documents[document_id] = filename[:-4]
                    words = self._tokenize(text)
                    sentences.append(words)

        self.model = Word2Vec(
            sentences=sentences,
            vector_size=100,
            window=5,
            min_count=1,
            workers=4,
        )
        # Initialize Annoy indexer
        self.annoy_indexer = AnnoyIndexer(self.model, num_trees=100)

        # Persist model and index
        self.model.save(self._word2vec_model_path)
        self.annoy_indexer.save(self._annoy_index_path)

        save_json(self._doc_id_path, self.documents)

    def load_index(self):
        self.documents = {int(k): v for k, v in load_json(self._doc_id_path).items()}
        self.model = Word2Vec.load(self._word2vec_model_path)
        self.annoy_indexer = AnnoyIndexer()
        self.annoy_indexer.load(self._annoy_index_path)
        self.annoy_indexer.model = self.model

    def find(self, query: str, top_k: int = 10) -> list:
        query_words = self._tokenize(query)
        document_scores = Counter()
        total_documents = len(self.documents)

        for word in query_words:
            matching_words = self._get_similar_words(word) | {(word, 1)}

            for match, distance_coef in matching_words:
                if match in self.index:
                    doc_freq = len(self.index[match])
                    idf = math.log(total_documents / (1 + doc_freq))

                    for doc_id in self.index[match]:
                        tf = (
                            self.document_word_count[doc_id][match]
                            / self.document_lengths[doc_id]
                        )
                        document_scores[doc_id] += tf * idf * distance_coef  # type: ignore

        ranked_docs = sorted(document_scores.items(), key=lambda x: -x[1])[:top_k]
        return [
            (self.documents[doc_id], round_float(score, 5))
            for doc_id, score in ranked_docs
        ]

    def find_semantic(self, query: str, top_k: int = 10) -> list:
        query_words = self._tokenize(query)
        query_vectors = []
        for word in query_words:
            if word in self.model.wv:
                query_vectors.append(self.model.wv[word])

        if not query_vectors:
            return []

        query_embedding = sum(query_vectors) / len(query_vectors)

        doc_ids, distances = self.annoy_indexer.get_nns_by_vector(
            query_embedding, top_k, include_distances=True
        )

        results = []
        for doc_id, distance in zip(doc_ids, distances):
            cosine_sim = (
                1 - (distance**2) / 2
            )  # Convert angular distance to cosine similarity
            results.append((doc_id, cosine_sim))

        return [
            (self.documents[doc_id], round_float(score, 5))
            for doc_id, score in sorted(results, key=lambda x: -x[1])
        ]


indexer = Word2VecIndexer()
results = indexer.find_semantic("sin")
for doc, score in results:
    print(f"Score: {score}\tFile: {doc}")

Index is not found, creating new...


ImportError: Annoy not installed. To use the Annoy indexer, please run `pip install annoy`.