## **RAG-система**

Основные этапы:
1. **Энкодер и векториация**
2. **Получение эмбеддингов:**
Документы из базы знаний заранее преобразуются в эмбеддинги с помощью выбранного энкодера, а запрос преобразовывается при поступлении.
3. **Анализ сходства между запросом и документами:**
Вычисляется косинусное сходство между эмбеддингом запроса и эмбеддингами документов. Косинусное сходство измеряет угол между двумя векторами в пространстве и варьируется от -1 до 1, где 1 означает полное совпадение, а 0 – отсутствие сходства. Таким образом документы с наибольшим косинусным сходством по отношению к запросу считаются наиболее релевантными и извлекаются для дальнейшего использования.

In [1]:
from typing import List, Tuple, Union
from sentence_transformers import SentenceTransformer, util
import torch

class Encoder:
    """
    Encoder class for generating embeddings from textual data using a SentenceTransformer model.
    """
    def __init__(self, model_name: str = 'cointegrated/rubert-tiny2', use_gpu: bool = False):
        """
        Initializes the Encoder with the given model name and device configuration.
        """
        if not model_name:
            raise ValueError("Model name cannot be empty.")

        try:
            self.device = 'cuda' if (use_gpu and torch.cuda.is_available()) else 'cpu'
            self.model = SentenceTransformer(model_name, device=self.device)
        except Exception as e:
            raise RuntimeError(f"Failed to load model '{model_name}': {str(e)}")

    def encode(self, data: Union[List[str], str]) -> torch.Tensor:
        """
        Encodes text(s) into embeddings.
        """
        try:
            if isinstance(data, str):
                data = [data]
            embeddings = self.model.encode(data, convert_to_tensor=True, device=self.device)
            return embeddings
        except Exception as e:
            raise RuntimeError(f"Encoding failed: {str(e)}")

class RAG:
    """
    Retrieval-Augmented Generation (RAG) class.
    """
    def __init__(self, encoder: Encoder):
        """
        Initializes the RAG class with the given encoder.
        """
        if not isinstance(encoder, Encoder):
            raise ValueError("The encoder must be an instance of Encoder.")
        self.encoder = encoder
        self.documents = []
        self.doc_embeddings = None

    def fit(self, documents: List[str]):
        """
        Encodes and stores document embeddings.
        """
        if not documents:
            raise ValueError("Document list cannot be empty.")
        self.documents = documents
        try:
            self.doc_embeddings = self.encoder.encode(documents)
        except Exception as e:
            raise RuntimeError(f"Document encoding failed: {str(e)}")

    def retrieve(self, query: str, retrieval_limit: int = 5, similarity_threshold: float = 0.5) -> Tuple[List[int], List[str]]:
        """
        Retrieves top-k most relevant documents for the query.
        """
        if self.doc_embeddings is None:
            raise ValueError("You must call fit() before retrieve().")
        if not (1 <= retrieval_limit <= 10):
            raise ValueError("retrieval_limit must be between 1 and 10.")
        if retrieval_limit > len(self.documents):
            raise ValueError("retrieval_limit cannot exceed number of documents.")
        if not (0.0 <= similarity_threshold <= 1.0):
            raise ValueError("similarity_threshold must be between 0 and 1.")

        try:
            #Кодирование запроса в вектор
            query_embedding = self.encoder.encode(query)
            #Расчёт косинусной схожести - как logit
            cosine_scores = util.cos_sim(query_embedding, self.doc_embeddings)[0]  # shape: (num_docs)
            #Выбор топ-N документов
            top_results = torch.topk(cosine_scores, k=retrieval_limit)

            relevant_indices = top_results.indices.tolist()
            relevant_scores = top_results.values.tolist()

            filtered_indices = [
                idx for idx, score in zip(relevant_indices, relevant_scores)
                if score >= similarity_threshold
            ]

            retrieved_docs = [self.documents[idx] for idx in filtered_indices]
            return filtered_indices, retrieved_docs
        except Exception as e:
            raise RuntimeError(f"Retrieval failed: {str(e)}")

    def _create_prompt_template(self, query: str, retrieved_docs: List[str]) -> str:
        """
        Creates a prompt template for generation.
        """

        prompt = "Instructions: Based on the relevant documents, generate a comprehensive response to the user's query.\n\n"
        prompt += "Relevant Documents:\n"
        for i, doc in enumerate(retrieved_docs):
            prompt += f"Document {i+1}: {doc}\n"
        prompt += f"\nUser Query: {query}\n"
        prompt += "Answer:"
        return prompt

    def _generate(self, query: str, retrieved_docs: List[str]) -> str:
        """
        Placeholder for text generation logic.
        """
        prompt = self._create_prompt_template(query, retrieved_docs)

        generated_response = f"(Simulated Response based on documents and query: '{query}')"
        return generated_response

    def run(self, query: str) -> str:
        """
        Runs full RAG pipeline.
        """
        _, retrieved_docs = self.retrieve(query)
        return self._generate(query, retrieved_docs)

In [2]:
documents = [
    "Machine learning is a method of data analysis that automates analytical model building.",
    "Artificial intelligence is intelligence demonstrated by machines, in contrast to the natural intelligence displayed by humans.",
    "Natural language processing is a subfield of linguistics, computer science, and artificial intelligence concerned with the interactions between computers and human language.",
    "Deep learning is a class of machine learning algorithms that uses multiple layers to progressively extract higher-level features from the raw input."
]

encoder = Encoder()
rag = RAG(encoder)
rag.fit(documents)

query = "Tell me about deep learning."
result_indices, result_documents  = rag.retrieve(query, retrieval_limit=2, similarity_threshold=0.6)

print(f'Result indices: {result_indices}')
print(f'Result documents: {result_documents}')

# >> Output:
# >> Result indices: [3, 0]
# >> Result documents:
# >> >> 'Deep learning is a class of machine learning algorithms that uses multiple layers to progressively extract higher-level features from the raw input.',
# >> >> 'Machine learning is a method of data analysis that automates analytical model building.'


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Result indices: [3, 0]
Result documents: ['Deep learning is a class of machine learning algorithms that uses multiple layers to progressively extract higher-level features from the raw input.', 'Machine learning is a method of data analysis that automates analytical model building.']


## **Анализ базы знаний**

**RAG Retrieval Score**

In [3]:
from typing import List, Tuple
from sentence_transformers import SentenceTransformer, util
import torch
import json
import os


class Encoder:
    def __init__(self, model_name: str = 'cointegrated/rubert-tiny2', use_gpu: bool = False):
        if not model_name:
            raise ValueError("Model name cannot be empty.")

        try:
            self.device = 'cuda' if (use_gpu and torch.cuda.is_available()) else 'cpu'
            self.model = SentenceTransformer(model_name, device=self.device)
        except Exception as e:
            raise RuntimeError(f"Failed to load model '{model_name}': {str(e)}")

    def encode(self, data):
        try:
            if isinstance(data, str):
                data = [data]
            embeddings = self.model.encode(data, convert_to_tensor=True, device=self.device)
            return embeddings
        except Exception as e:
            raise RuntimeError(f"Encoding failed: {str(e)}")


class RAG:
    def __init__(self, encoder: Encoder):
        if not isinstance(encoder, Encoder):
            raise ValueError("The encoder must be an instance of Encoder.")
        self.encoder = encoder
        self.documents = []
        self.doc_embeddings = None

    def fit(self, documents: List[str]):
        if not documents:
            raise ValueError("Document list cannot be empty.")
        self.documents = documents
        try:
            self.doc_embeddings = self.encoder.encode(documents)
        except Exception as e:
            raise RuntimeError(f"Document encoding failed: {str(e)}")

    def retrieve(self, query: str, retrieval_limit: int = 5, similarity_threshold: float = 0.5) -> Tuple[List[int], List[str]]:
        if self.doc_embeddings is None:
            raise ValueError("You must call fit() before retrieve().")
        if not (1 <= retrieval_limit <= 10):
            raise ValueError("retrieval_limit must be between 1 and 10.")
        if retrieval_limit > len(self.documents):
            raise ValueError("retrieval_limit cannot exceed number of documents.")
        if not (0.0 <= similarity_threshold <= 1.0):
            raise ValueError("similarity_threshold must be between 0 and 1.")

        try:
            query_embedding = self.encoder.encode(query)
            cosine_scores = util.cos_sim(query_embedding, self.doc_embeddings)[0]
            top_results = torch.topk(cosine_scores, k=retrieval_limit)

            relevant_indices = top_results.indices.tolist()
            relevant_scores = top_results.values.tolist()

            filtered_indices = [
                idx for idx, score in zip(relevant_indices, relevant_scores) if score >= similarity_threshold
            ]
            retrieved_docs = [self.documents[idx] for idx in filtered_indices]
            return filtered_indices, retrieved_docs
        except Exception as e:
            raise RuntimeError(f"Retrieval failed: {str(e)}")


class RAGEval:
    def __init__(
        self,
        documents_path: str,
        questions_path: str,
        retrieval_limit: int = 5,
        similarity_threshold: float = 0.5
    ):
        self.documents = self.load_documents(documents_path)
        self.questions = self.load_questions(questions_path)
        self.retrieval_limit = retrieval_limit
        self.similarity_threshold = similarity_threshold

        if not self.documents:
            raise ValueError("The documents list is empty.")
        if not self.questions:
            raise ValueError("The questions list is empty.")

        self.encoder = Encoder()
        self.rag = RAG(self.encoder)
        self.rag.fit(self.documents)

    def load_documents(self, path: str) -> List[str]:
        if not os.path.exists(path):
            raise FileNotFoundError(f"File {path} does not exist")
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON file for documents")

        if not isinstance(data, list):
            raise ValueError("Documents JSON must contain a list")

        return [self.validate_document(doc) for doc in data]

    def validate_document(self, doc) -> str:
        if not isinstance(doc, dict):
            raise ValueError("Each document must be a dictionary.")
        if 'content' not in doc:
            raise ValueError("Each document must contain a 'content' field.")
        return doc['content']

    def load_questions(self, path: str) -> List[str]:
        if not os.path.exists(path):
            raise FileNotFoundError(f"File {path} does not exist")
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except json.JSONDecodeError:
            raise ValueError("Invalid JSON file for questions")

        if not isinstance(data, list):
            raise ValueError("Questions JSON must contain a list")

        return [self.validate_question(q) for q in data]

    def validate_question(self, question) -> str:
        if not isinstance(question, dict):
            raise ValueError("Each question must be a dictionary.")
        if 'question' not in question:
            raise ValueError("Each question must contain a 'question' field.")
        return question['question']

    def evaluate(self, threshold: int = 1) -> Tuple[float, List[int], List[int]]:
        doc_hit_count = [0] * len(self.documents)
        questions_wo_docs = []

        for q_idx, question in enumerate(self.questions):
            retrieved_indices, _ = self.rag.retrieve(
                query=question,
                retrieval_limit=self.retrieval_limit,
                similarity_threshold=self.similarity_threshold
            )

            if not retrieved_indices:
                questions_wo_docs.append(q_idx)
            else:
                for idx in retrieved_indices:
                    doc_hit_count[idx] += 1

        # Бесполезные документы
        useless_docs = [i for i, count in enumerate(doc_hit_count) if count < threshold]

        # Метрика RAG Retrieval Score
        portion_useless_docs = len(useless_docs) / len(self.documents)
        portion_unanswered_questions = len(questions_wo_docs) / len(self.questions)

        rag_score = 1 - portion_useless_docs - portion_unanswered_questions

        return rag_score, useless_docs, questions_wo_docs

In [4]:
eval = RAGEval('documents.json', 'questions.json', retrieval_limit=5, similarity_threshold=0.6)
rag_score, useless_docs, questions_wo_docs = eval.evaluate()

print(f'RAG Retrieval Score: {rag_score:0.2f}')
print(f'Useless documents [{len(useless_docs)}]: {useless_docs}')
print(f'Questions without relevant documents [{len(questions_wo_docs)}]: {questions_wo_docs}')

# >> Output:
# >> RAG Retrieval Score: 0.55
# >> Useless documents [19]: [20, 21, 37, 41, 44, 50, 61, 65, 68, 74, 85, 89, 92, 98, 102, 109, 113, 122, 126]
# >> Questions without relevant documents [48]: [98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 124, 125, 127, 129, 130, 131, 132, 133, 135, 137, 139, 140, 141, 142, 143, 146, 149, 151, 152, 153, 154, 155, 156, 157]

RAG Retrieval Score: 0.92
Useless documents [3]: [37, 84, 95]
Questions without relevant documents [8]: [114, 119, 122, 124, 127, 139, 146, 156]


**QCluster**

In [5]:
from typing import List, Dict, Union
from sklearn.cluster import KMeans
from sentence_transformers import SentenceTransformer
import torch

class Encoder:
    def __init__(self, model_name: str = 'cointegrated/rubert-tiny2', use_gpu: bool = False):
        if not model_name:
            raise ValueError("Model name cannot be empty.")

        try:
            self.device = 'cuda' if (use_gpu and torch.cuda.is_available()) else 'cpu'
            self.model = SentenceTransformer(model_name, device=self.device)
        except Exception as e:
            raise RuntimeError(f"Failed to load model '{model_name}': {str(e)}")

    def encode(self, data):
        try:
            if isinstance(data, str):
                data = [data]
            embeddings = self.model.encode(data, convert_to_tensor=True, device=self.device)
            return embeddings
        except Exception as e:
            raise RuntimeError(f"Encoding failed: {str(e)}")


class QCluster:
    """Класс для кластеризации вопросов с использованием k-means."""

    def __init__(self, questions_idx: List[int], questions: List[str]):
        """
        Инициализирует QCluster с индексами вопросов и самими вопросами.

        Args:
            questions_idx: Список индексов вопросов
            questions: Список текстов вопросов

        Raises:
            ValueError: Если списки разной длины или пустые
        """
        if len(questions_idx) != len(questions):
            raise ValueError("Списки индексов и вопросов должны быть одинаковой длины")
        if not questions_idx or not questions:
            raise ValueError("Списки вопросов не могут быть пустыми")

        self.questions_idx = questions_idx
        self.questions = questions
        self.encoder = Encoder()
        self.clusters = {}

    def cluster(self, n_clusters: int, show_results: bool = False) -> Dict[int, List[int]]:
        """
        Кластеризует вопросы с помощью k-means.

        Args:
            n_clusters: Количество кластеров (от 1 до 10)
            show_results: Показывать ли результаты (по умолчанию False)

        Returns:
            Словарь {метка_кластера: [индексы_вопросов]}

        Raises:
            ValueError: Если n_clusters не в диапазоне 1-10
        """
        if not 1 <= n_clusters <= 10:
            raise ValueError("Количество кластеров должно быть от 1 до 10")

        # Преобразуем вопросы в векторные представления
        embeddings = self.encoder.encode(self.questions).cpu().numpy()

        # Кластеризация K-means
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings)

        # Формируем словарь кластеров
        self.clusters = {}
        for idx, label in enumerate(labels):
          label = int(label)  # гарантируем тип int
          if label not in self.clusters:
            self.clusters[label] = []
          self.clusters[label].append(self.questions_idx[idx])

        # Вывод результатов при необходимости
        if show_results:
            self.print_clusters()

        return self.clusters

    def print_clusters(self):
        """Выводит кластеры с вопросами и их индексами."""
        if not self.clusters:
            print("Нет кластеров для отображения. Сначала выполните cluster().")
            return

        # Сортируем кластеры по метке
        for cluster_id in sorted(self.clusters.keys()):
            print(f"\nКластер {cluster_id}:")

            # Выводим вопросы для каждого кластера
            for q_idx in self.clusters[cluster_id]:
                try:
                    # Находим текст вопроса по оригинальному индексу
                    idx_in_list = self.questions_idx.index(q_idx)
                    question_text = self.questions[idx_in_list]
                    print(f"- {question_text} (Индекс: {q_idx})")
                except ValueError:
                    print(f"- [!] Вопрос с индексом {q_idx} не найден")

eval = RAGEval('documents.json', 'questions.json', retrieval_limit=5, similarity_threshold=0.6)
rag_score, useless_docs_idx, questions_wo_docs_idx = eval.evaluate()
questions_wo_docs = [eval.questions[i] for i in questions_wo_docs_idx]

qcluster = QCluster(questions_wo_docs_idx, questions_wo_docs)
clusters = qcluster.cluster(n_clusters=4, show_results=True)

print(f'Raw clusters: {clusters}')


Кластер 0:
- Как объединить ветки в GitHub? (Индекс: 119)
- Как создать релиз на GitHub? (Индекс: 122)
- Как разрешить конфликты при слиянии веток на GitHub? (Индекс: 127)
- Как развернуть Telegram-бота? (Индекс: 156)

Кластер 1:
- Производительность запросов в PostgreSQL? (Индекс: 146)

Кластер 2:
- Что такое GitHub и для чего он используется? (Индекс: 114)
- Что такое GitHub Pages и как их использовать? (Индекс: 124)

Кластер 3:
- Скользящее среднее PostgreSQL (Индекс: 139)
Raw clusters: {2: [114, 124], 0: [119, 122, 127, 156], 3: [139], 1: [146]}


**Обновление базы знаний**

In [6]:
# Загрузка базы знаний
with open('documents.json', 'r', encoding='utf-8') as f:
    documents = json.load(f)

# Удалим документы с индексами useless_docs_idx
filtered_documents = [doc for idx, doc in enumerate(documents) if idx not in useless_docs_idx]

# Пересохраним очищенный documents.json
with open('documents.json', 'w', encoding='utf-8') as f:
    json.dump(filtered_documents, f, ensure_ascii=False, indent=2)

print(f'Удалено {len(documents) - len(filtered_documents)} неиспользуемых документов.')

Удалено 3 неиспользуемых документов.


In [7]:
clusters = qcluster.clusters
cluster_texts = []

for cluster_id, question_ids in clusters.items():
    questions_texts = [
        qcluster.questions[qcluster.questions_idx.index(qid)]
        for qid in question_ids
    ]

    # Запоминаем все вопросы кластера
    cluster_texts.append((cluster_id, questions_texts))

In [8]:
cluster_texts

[(2,
  ['Что такое GitHub и для чего он используется?',
   'Что такое GitHub Pages и как их использовать?']),
 (0,
  ['Как объединить ветки в GitHub?',
   'Как создать релиз на GitHub?',
   'Как разрешить конфликты при слиянии веток на GitHub?',
   'Как развернуть Telegram-бота?']),
 (3, ['Скользящее среднее PostgreSQL']),
 (1, ['Производительность запросов в PostgreSQL?'])]

In [9]:
with open('documents.json', 'r', encoding='utf-8') as f:
    documents = json.load(f)

# Шаг 2: Примеры новых документов для добавления
new_docs = [
    {
        #"title": "Как создать Docker-образ из Dockerfile?",
        "content": "Для создания Docker-образа необходимо использовать команду docker build. Убедитесь, что у вас есть Dockerfile в корне проекта..."
    },
    {
        #"title": "Работа с ветками в GitHub",
        "content": "Вы можете создавать, переименовывать и удалять ветки в вашем репозитории. Это позволяет работать над разными функциями параллельно и удобно внедрять pull requests..."
    },
    {
        #"title": "Телеграм-бот: добавление кнопок",
        "content": "Чтобы добавить кнопки в Telegram-бота, используйте ReplyKeyboardMarkup или InlineKeyboardMarkup из библиотеки python-telegram-bot..."
    },
    {
        #"title": "Оконные функции в PostgreSQL",
        "content": "Оконные функции позволяют выполнять агрегатные вычисления поверх набора строк, разбитого на окна. Примеры функций: ROW_NUMBER(), RANK(), DENSE_RANK(), LEAD(), LAG()..."
    }
]

# Шаг 3: Добавление новых документов в основной список
documents.extend(new_docs)

# Шаг 4: Сохранение обновлённого списка в файл
with open('documents.json', 'w', encoding='utf-8') as f:
    json.dump(documents, f, ensure_ascii=False, indent=2)

In [10]:
eval = RAGEval('documents.json', 'questions.json', retrieval_limit=5, similarity_threshold=0.6)
rag_score, useless_docs_idx, questions_wo_docs_idx = eval.evaluate()

print('RAG Score:', rag_score)

RAG Score: 0.9405951587830335
