In [None]:
import torch
import torch.nn.functional as F
from transformers import AutoTokenizer, AutoModel
import os
import json
from tqdm import tqdm
from openai import OpenAI
from enum import Enum
import numpy as np
from openai import OpenAI
import re

In [None]:
class Metric(Enum):
    '''
    Vector distance metrics enum
    '''
    COS = "_cos"
    SCALAR = "_scalar"

    def __call__(self, a, b):
        method = getattr(self, self.value)
        return method(a, b)

    def is_reversed(self):
        return self == Metric.COS or self == Metric.SCALAR

    def _cos(self, a, b):
        return torch.dot(a, b)/(torch.norm(a.to(torch.float32))*torch.norm(b.to(torch.float32)))

    def _scalar(self, a, b):
        return torch.dot(a.to(torch.float32), b.to(torch.float32))

class VectorStore:
    '''
    Class for working with vectors, recieving nearest neighbors
    '''
    def __init__(self, embedings, sort_metric: Metric):
        self.embedings = embedings
        self.sort_metric = sort_metric

    def get_k_nearest(self, query_embeding, k=None):
        sorted_items = sorted(
            self.embedings.items(),
            key=lambda x: self.sort_metric(query_embeding, torch.tensor(x[1])),
            reverse=self.sort_metric.is_reversed()
        )
        return list(dict(sorted_items[:k]).keys())


class DocumentRetriever:
    '''
    Class for working with files
    '''

    vectors = 'text_vectors.json'
    articles = 'articles'

    def __init__(self, root):
        self.root = root
        self.articles_dir = os.path.join(self.root, self.articles)
        self.vectors_path = os.path.join(self.root, self.vectors)

    def get_articles_by_filenames(self, filenames: list[str]) -> list[str]:
        articles = []
        for f in filenames:
            articles.append(self._get_article_text(f))
        return articles

    def get_all_articles(self) -> dict:
        articles = {}
        for f in tqdm(os.listdir(self.articles_dir)):
            articles[f] = self._get_article_text(f)
        return articles

    def _get_article_text(self, filename: str) -> str:
        with open(os.path.join(self.articles_dir, filename)) as file:
            raw = file.read()
            # splt = re.split('{{NAME}}|{{/NAME}}|{{DESC}}|{{/DESC}}', re.sub('\n(?:[ \t]*\n)+', '', raw))
            return raw #splt[1] + splt[2] + splt[3]

    def is_ready(self) -> bool:
        if(os.path.exists(os.path.join(self.vectors_path))):
            filenames = os.listdir(self.articles_dir)
            embedings = self.get_articles_embedings_json()
            for f in filenames:
                if(f not in embedings):
                    return False
            return True
        return False

    def get_articles_embedings_json(self) -> dict:
        try:
            with open(self.vectors_path) as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    def save_articles_embedings_json(self, embedings: dict) -> None:
        with open(self.vectors_path, 'w') as f:
            json.dump(embedings, f)

class PromptGenerator:
    '''
    Class for fromatting prompts for LLM
    '''
    @staticmethod
    def build_prompt(query, articles) -> str:
        prompt = f'Найди ответ на этот запрос: {query} \nОсновываясь на статьях далее, не следует в ответ включать информацию, которая в них не содержится: \n{articles}'
        return prompt

class DataProcessor:
    '''
    Class for turning texts into embeddings
    '''
    _instance = None

    def __new__(cls, model_name: str, document_retrivier: DocumentRetriever):
        if cls._instance is None:
            cls._instance = super(DataProcessor, cls).__new__(cls)
            cls._instance.tokenizer = AutoTokenizer.from_pretrained(model_name)
            cls._instance.model = AutoModel.from_pretrained(model_name)
            cls._instance.document_retrivier = document_retrivier
            if not cls._instance.document_retrivier.is_ready():
                cls._instance._prepare_data()
        return cls._instance

    def get_articles_embedings_json(self):
        return self.document_retrivier.get_articles_embedings_json()

    def _pool(self, hidden_state, mask, pooling_method="cls"):
        if pooling_method == "mean":
            s = torch.sum(hidden_state * mask.unsqueeze(-1).float(), dim=1)
            d = mask.sum(axis=1, keepdim=True).float()
            return s / d
        elif pooling_method == "cls":
            return hidden_state[:, 0]

    def get_embeding(self, text: str) -> torch.Tensor:
        tokenized_input = self.tokenizer(text, max_length=512, padding=True, truncation=True, return_tensors="pt")
        with torch.no_grad():
            outputs = self.model(**tokenized_input)
        embedding = self._pool(
            outputs.last_hidden_state,
            tokenized_input["attention_mask"],
            pooling_method="mean"
        )

        embedding = F.normalize(embedding, p=2, dim=1)
        return embedding.flatten()

    def _prepare_data(self):
        embeddings = {}
        articles = self.document_retrivier.get_all_articles()
        print(articles)
        for k, v in tqdm(articles.items()):
            embeddings[k] = self.get_embeding(v).tolist()
        self.document_retrivier.save_articles_embedings_json(embeddings)

class LLMWrapper:
    '''
    Class for interacting with LLM
    '''
    _instance = None

    def __new__(cls, model_name: str, api_key: str):
        if cls._instance is None:
            cls._instance = super(LLMWrapper, cls).__new__(cls)
            cls._instance.base_url = "https://openrouter.ai/api/v1"
            cls._instance.api_key = api_key
            cls._instance.model_name = model_name
            cls._instance.client = OpenAI(
                base_url="https://openrouter.ai/api/v1",
                api_key=cls._instance.api_key
            )
        return cls._instance

    def generate_response(self, prompt: str) -> str:
        try:
            completion = self.client.chat.completions.create(
                model=self.model_name,
                messages=[
                    {
                        "role": "user",
                        "content": prompt
                    }
                ]
            )
            if completion and completion.choices:
                return completion.choices[0].message.content
            else:
                return "Error: No response from the API"
        except Exception as e:
            return f"Error generating response: {str(e)}"

class Config:
    '''
    Class for configurating RAG

    Attributes:
        root (str): path to directory containing articles in folder 'articles'
        embedding_model (str): link to huggingface model for generating embeddings
        llm_model (str): link to openrouter model for generating answers
        api_key (str): openrouter api key
        k (int): number of nearest neighbors to use for generating answer
    '''
    def __init__(self, root: str, embedding_model: str, llm_model: str, api_key: str, k: str):
        self.root = root
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.api_key = api_key
        self.k = k


class RAGPipeline:
    def __init__(self, config: Config):
        self.config = config
        self.document_retriever = DocumentRetriever(self.config.root)
        self.data_processor = DataProcessor(self.config.embedding_model, self.document_retriever)
        self.vector_store = VectorStore(self.data_processor.get_articles_embedings_json(), Metric.COS)
        self.llm = LLMWrapper(self.config.llm_model, self.config.api_key)

    def run(self, query: str) -> str:
        files = self.vector_store.get_k_nearest(self.data_processor.get_embeding(query), k=self.config.k)
        articles = self.document_retriever.get_articles_by_filenames(files)
        prompt = PromptGenerator.build_prompt(query, articles)
        out = self.llm.generate_response(prompt)
        return out

In [None]:
config = Config(root='/articles',
                embedding_model='ai-forever/ru-en-RoSBERTa',
                llm_model='deepseek/deepseek-chat-v3-0324:free',
                api_key='',
                k=20)

In [None]:
pipe = RAGPipeline(config)

In [None]:
print(pipe.run('Что можно подарить Ване Транькову на день рождения'))

На основе предоставленных фрагментов переписки можно выделить несколько идей для подарка Ване Транькову на день рождения:

1. **Книга по алгоритмам**  
   Упоминание книги *«Грокаем алгоритмы»* (1 место среди часто краденых) намекает на его интерес к программированию. Такой подарок будет полезным и актуальным.  

2. **Тематический мерч или шутливый подарок**  
   - Футболка или кружка с надписью про «24 — всего лишь цифра» (отсылка к фразе Александра Правдина).  
   - Сувенир, связанный с «Тарасовым» (например, табличка «Не бояться Тарасова»), так как в переписке часто обсуждаются стрессовые моменты обучения.  

3. **Гаджеты или аксессуары для работы**  
   - USB-хаб или внешний жесткий диск, учитывая его активную работу с проектами и данными.  

4. **Настольная игра**  
   Например, *«Бирмингем»* (упоминается в контексте вечеринки) или другую стратегическую игру, поскольку ребята любят проводить время вместе.  

5. **Шуточный сертификат**  
   Например, «Сертификат на спасение от запр