## retrievers

In [None]:
# interface imports
from abc import ABC, abstractmethod
from typing import Iterable

# bm25 imports
from rank_bm25 import BM25Okapi
import re
import pymorphy3
import numpy as np

# embedding imports
from sentence_transformers import SentenceTransformer, util
import torch

# hybrid imports
from collections import defaultdict


class RetrieverInterface(ABC):
    """Интерфейс для поисковиков по базе знаний.

    Все добавляемые поисковики должны наследоваться от него
    и имплементировать метод `make_query`.
    """

    @abstractmethod
    def make_query(self):
        pass


class BM25Retriever(RetrieverInterface):
    """Статистический поиск по базе знаний.

    Поиск с использованием разреженных векторов.
    Алгоритм ранжирования: Okapi BM25.
    """

    def __init__(self,
                 stopwords_path: str,
                 documents: Iterable[str],
                 token_pattern: str) -> None:
        """
        Args:
            stopwords_path: путь к стоп-словам для языка поиска.
            documents: коллекция ключей, среди которых производится поиск.
            token_pattern: паттерн для регулярного выражения.
        """

        #: экстракция стоп-слов из файла
        with open(stopwords_path, 'r') as file:
            self._stopwords = {word.strip() for word in file}

        self._lemmatizer = pymorphy3.MorphAnalyzer()
        self._TOKEN_PATTERN = re.compile(token_pattern)

        #: предобработка коллекции ключей, среди которых производится поиск
        self._documents_tokenized = [
            self._preprocess_one_sentence(doc, self._stopwords)
            for doc in documents
        ]

        self._bm25_index = BM25Okapi(self._documents_tokenized)

    def _preprocess_one_sentence(self, sentence: str, stopwords: set) -> list:
        """Предобрабатывает одно предложение.

        Приводит к нижнему регистру, убирает мусорные слова,
        оставляет слова, подходящиие под regex, лемматизирует.

        Args:
            sentence: предложение в формате строки (ключ для поиска).
            stopwords: слова, убираемые из предложения.

        Returns:
            result: предобработанное предложение или пустой список.
        """
        if isinstance(sentence, str):
            regex_words = re.findall(self._TOKEN_PATTERN, sentence.lower())
            clear_words = [
                self._lemmatizer.parse(token.strip())[0].normal_form
                for token in regex_words
                if token not in stopwords
            ]
            return clear_words
        return []

    def make_query(
        self,
        query: str,
        top: int,
        threshold: int | float | None = None
    ) -> list[tuple[int, float]]:
        """Делает запрос к базе знаний.

        Args:
            query: текстовый запрос к базе знаний.
            top: количество возвращаемых документов.
            threshold: трешхолд.

        Returns:
            Список кортежей в формате: [(индекс документа, релевантность), ...].
            релевантность - значение Okapi BM25.
        """

        top = min(top, len(self._documents_tokenized))
        query_tokenized = self._preprocess_one_sentence(query, self._stopwords)

        scores = self._bm25_index.get_scores(query_tokenized)

        top_k = np.argsort(scores)[::-1][:top]

        result = [(i, scores[i]) for i in top_k]
        if threshold is not None:
            result = [elem for elem in result if elem[1] > threshold]

        return result


class EmbeddingRetriever(RetrieverInterface):
    """Семантический поиск по базе знаний.

    Поиск с использованием эмбеддингов.
    Модель для эмбеддингов должна быть совместима с `SentenceTransformers`.
    """

    def __init__(self,
                 embedder_path: str,
                 documents: Iterable[str],
                 device: torch.device | str) -> None:
        """
        Args:
            embedder_path: путь к модели для эмбеддингов.
            documents: коллекция ключей, среди которых производится поиск.
            device: девайс.
        """
        self._embedder = SentenceTransformer(
            model_name_or_path=embedder_path,
            device=device,
            local_files_only=True
        )
        self._documents_embeddings = self._embedder.encode(
            documents,
            convert_to_tensor=True
        )
        self._documents_embeddings = self._documents_embeddings.to(device)
        self._device = device

    def make_query(
        self,
        query: str,
        top: int,
        threshold: int | None = None
    ) -> list[tuple[int, float]]:
        """Делает запрос к базе знаний.

        Args:
            query: текстовый запрос к базе знаний.
            top: количество возвращаемых документов.
            threshold: трешхолд.

        Returns:
            Список кортежей в формате: [(индекс документа, релевантность), ...].
            релевантность - косинусная близость.
        """
        top = min(top, len(self._documents_embeddings))

        query_emb = self._embedder.encode(query, convert_to_tensor=True)
        query_emb = query_emb.to(self._device)

        similarity_scores = util.semantic_search(
            query_emb,
            self._documents_embeddings,
            top_k=top
        )[0]

        result = [(dct['corpus_id'], dct['score']) for dct in similarity_scores]

        if threshold is not None:
            result = [elem for elem in result if elem[1] > threshold]

        return result


class HybridRetriever(RetrieverInterface):
    """Гибридный/ансамблевый поиск по базе знаний.

    Ранжирует и объединяет результаты произвольного
    количества поисковиков, используя алгоритм RRF.

    метод `make_query()` каждого из поисковиков должен
    возвращать результаты в формате:
        - [(doc_index, score), ...]
    """

    def __init__(self,
                 retrievers: list,
                 weights: list[float],
                 documents: Iterable[str],
                 thresholds: list | None) -> None:
        """
        Args:
            retrievers: список заранее созданных поисковиков.
            weights: веса поисковиков.
            documents: коллекция ключей, среди которых производится поиск.
            thresholds: трешхолды поисковиков.
        """

        assert len(retrievers) == len(weights), "Количество поисковиков и весов должно совпадать"
        assert sum(weights) - 1 <= 0.0001, "Сумма весов поисковиков должна быть равна 1"
        assert not sum(weights) - 1 > 0.0001, "Сумма весов поисковиков не может быть больше 1"
        assert all(x >= 0 for x in weights), "Значения весов не могут быть отрицательными"

        if thresholds is not None:
            assert len(thresholds) == len(retrievers), "Количество поисковиков и порогов должно совпадать"
            self._thresholds = thresholds
        else:
            self._thresholds = [None] * len(retrievers)

        self._retrievers = retrievers
        self._weights = weights

    def _reciprocal_rank_fusion(
        self,
        scores: list[list[tuple[int, float]]],
        weights: list[float]
    ) -> list[tuple[int, float]]:
        """Объединяет результаты нескольких поисковиков.

        Args:
            scores: результаты индивидуальных поисковиков.
            weights: веса поисковиков.
        """
        assert len(scores) == len(weights), "Количество поисковиков и весов должно совпадать"

        final_scores = defaultdict(float)

        for idx, (score_list, weight) in enumerate(zip(scores, weights)):
            for rank, (doc_index, score) in enumerate(score_list):
                final_scores[doc_index] += weight / (rank + 1)

        final_score_list = [
            (doc_index, score)
            for doc_index, score in final_scores.items()
        ]
        final_score_list.sort(key=lambda x: x[1], reverse=True)
        return final_score_list

    def make_query(self,
                   query: str,
                   top: int) -> list[tuple[int, float]]:
        """Делает запрос к базе знаний.

        Args:
            query: текстовый запрос к базе знаний.
            top: количество возвращаемых документов.

        Returns:
            Список кортежей в формате: [(индекс документа, релевантность), ...].
            релевантность - reciprocal rank fusion score.
        """
        scores = []
        for retriever, threshold in zip(self._retrievers, self._thresholds):
            scores.append(retriever.make_query(query, top, threshold))

        fused_scores = self._reciprocal_rank_fusion(scores, self._weights)
        return fused_scores

In [None]:
import pandas as pd


def squeeze_retrieved(
    retriever_result: list[tuple[int, float]],
    dataframe: pd.DataFrame,
    df_col_index: int = 1
) -> list[tuple[int, float]]:
    """Из нескольких услуг с одинаковым кодом
    оставляет одну с наибольшим скором.

    Args:
        retriever_result: результаты поиска.
        dataframe: датафрейм с услугами и кодами.
        df_col_index: индекс колонки с кодом услуги в датафрейме.

    Returns:
        Список кортежей в формате:
        [
            (индекс документа, релевантность, КОД УСЛУГИ),
            ...
        ].
    """
    result = {}
    for elem in retriever_result:
        # получаем словарь вида {'КОД УСЛУГИ': (score, index), ...}
        key = dataframe.iloc[elem[0], df_col_index]
        if key not in result:
            result[key] = elem
        else:
            _, old_value = result[key]
            new_value = elem[1]
            if new_value > old_value:
                result[key] = elem

    # распаковка
    result = [(value[0], value[1], key) for key, value in result.items()]

    # сортировка по итоговому значению скора
    result = sorted(result, key=lambda item: -item[1])
    return result

## LLM

### Mistral, Vikhr, Mistral Q, Vikhr Q

In [None]:
from vllm import LLM, SamplingParams


class AnswerGenerator:
    """Класс для использования LLM через vLLM.

    Работает также с квантизованными моделями GGUF.
    """

    def __init__(self,
                 model_path,
                 gpu_memory_utilization,
                 max_model_len=8192):
        """
        Args:
            model_path: путь к LLM в файловой системе.
        """
        self.model = LLM(
            model=model_path,
            gpu_memory_utilization=gpu_memory_utilization,
            max_model_len=max_model_len
        )

    def get_answer(
        self,
        params,
        query: str,
        contexts: list[str] | None = None,
        system_prompt: str | None = None
    ) -> tuple[str, list[dict[str, str]]]:
        """Генерирует ответ от LLM, переданной в __init__.

        Args:
            params: параметры, задаются через SamplingParams.
            query: запрос.
            contexts: контексты, которые необходимо учесть.
            system_prompt: системный промпт.

        Returns:
            answer: ответ.
            conversation: история диалога.
        """

        if contexts is None:
            contexts = ['']

        combined_context = '\n'.join(
            [
                f"Контекст {i + 1}: {ctx}"
                for i, ctx in enumerate(contexts)
            ]
        )

        if system_prompt is None:
            system_prompt = (
                "Ты - интеллектуальный ассистент, отвечающий вежливо на языке запроса. " 
                "Для ответа на вопросы ты всегда учитываешь и используешь переданный Контекст."
            )

        prompt = f"Контекст: {combined_context}\nВопрос: {query}:"

        conversation = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": prompt},
        ]

        outputs = self.model.chat(conversation, params)

        answer = ''
        for output in outputs:
            answer += output.outputs[0].text
            conversation.append({
                "role": "assistant",
                "content": output.outputs[0].text
            })

        return answer, conversation

    def chat_endlessly(
        self,
        params,
        system_prompt: str | None = None
    ) -> list[dict[str, str]]:
        """Бесконечный чат с LLM, переданной в __init__.

        Args:
            params: параметры, задаются через SamplingParams.
            system_prompt: системный промпт.

        Returns:
            conversation: история диалога.
        """
        if system_prompt is None:
            system_prompt = "Ты - интеллектуальный ассистент, отвечающий вежливо на языке запроса."

        conversation = [{"role": "system", "content": system_prompt}]

        while True:
            query = input(">>> ")
            if not query:
                break
            if len(conversation) >= 13:
                conversation = [conversation[0]] + conversation[-6:]

            conversation.append({"role": "user", "content": query})

            outputs = self.model.chat(conversation, params)

            for output in outputs:
                answer = output.outputs[0].text

                conversation.append({
                    "role": "assistant",
                    "content": answer
                })

                print(answer)

        return conversation

In [None]:
params = SamplingParams(
    temperature=0.25,
    max_tokens=1024,
    logprobs=20,
)

In [None]:
import time


def get_answer_and_metrics_from_mistral(
    model,
    params,
    query: str,
    contexts: list[str] | None = None,
    system_prompt: str | None = None
) -> tuple:
    """docstring
    
    Доступно для мистраля и вихря
    """

    # context processing
    if contexts is None:
        contexts = ['']

    context = '\n'.join(
        [
            f"Контекст {i + 1}: {ctx}"
            for i, ctx in enumerate(contexts)
        ]
    )

    # system_prompt processing
    if system_prompt is None:
        system_prompt = (
            "Ты - интеллектуальный ассистент, отвечающий вежливо на языке запроса. "
            "Для ответа на вопросы ты всегда учитываешь и используешь переданный [CONTEXT]. "
        )

    prompt = f"[CONTEXT]{context}\n[QUESTION]{query}"

    conversation = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": prompt},
    ]

    # generating answer
    start_time = time.time()
    outputs = model.chat(conversation, params)
    total_time = time.time() - start_time

    answer = outputs[0].outputs[0].text
    logprobs = outputs[0].outputs[0].logprobs
    token_ids = outputs[0].outputs[0].token_ids

    metrics = GeneratorMetrics(logprobs, token_ids)

    perplexity = metrics.calculate_perplexity()
    mean_token_probability = metrics.calculate_mean_token_probability()
    mean_token_entropy = metrics.calculate_mean_token_entropy()

    return (
        query,
        contexts,
        answer,
        perplexity,
        mean_token_probability,
        mean_token_entropy,
        total_time
    )

In [None]:
import time


def rerank_get_answer_and_metrics_from_vikhr(
    model,
    params,
    query: str,
    contexts: list[str] | None = None,
    system_prompt: str | None = None
) -> tuple:
    """docstring

    Доступно только для вихря
    """

    # context processing
    if contexts is None:
        contexts = ['']

    documents = [
        {"doc_id": i, "content": v}
        for i, v in enumerate(contexts)
    ]

    # system_prompt processing
    if system_prompt is None:
        system_prompt = (
            "Your task is to answer the user's questions "
            "using only the information from the provided documents."
            "Give two answers to each question: "
            "one with a list of relevant document identifiers "
            "and the second with the answer to the question itself, "
            "using documents with these identifiers."
        )

    conversation = [
        {'role': 'system', 'content': system_prompt},
        {'role': 'documents', 'content': json.dumps(documents, ensure_ascii=False)},
        {'role': 'user', 'content': query}
    ]

    # reranking documents
    relevant_indices = model.chat(conversation, params)[0].outputs[0].text

    # generating answer
    start_time = time.time()
    outputs = model.chat(
        conversation + [{'role': 'assistant', 'content': relevant_indices}],
        params
    )
    total_time = time.time() - start_time

    answer = outputs[0].outputs[0].text
    logprobs = outputs[0].outputs[0].logprobs
    token_ids = outputs[0].outputs[0].token_ids

    metrics = GeneratorMetrics(logprobs, token_ids)

    perplexity = metrics.calculate_perplexity()
    mean_token_probability = metrics.calculate_mean_token_probability()
    mean_token_entropy = metrics.calculate_mean_token_entropy()

    return (
        query,
        contexts,
        answer,
        perplexity,
        mean_token_probability,
        mean_token_entropy,
        total_time
    )

### Gemma, Saiga

In [None]:
import json
import requests


with open('uri_saiga_mistral.txt', 'r') as file:
    MODEL_URI = file.read()


# подгрузим из конфига
with open('config_saiga_mistral.json', 'r') as openfile:
    json_data = json.load(openfile)

json_data['dataframe_records'][0]['kwargs']['temperature'] = 0.25
json_data['dataframe_records'][0]['kwargs']['max_tokens'] = 1024

In [None]:
def post_to_saiga_mistral(data: dict) -> str:
    """Отправляет POST-запрос с данными на предсказание.

    Args:
        data: JSON-объект с массивом записей.

    Returns:
        Ответ от LLM Saiga-Mistral-7B-Instruct-v0.2.
    """

    response = requests.post(
        MODEL_URI,
        json=data,
    )

    answer = ''
    if response.status_code == 200:
        response_data = response.json()
        if 'predictions' in response_data:
            check_text = response_data['predictions'][0]
            answer = check_text['answer']
            return answer
        else:
            print(f'Ошибка в ответе: {response_data}')
            return answer
    else:
        print(f'Ошибка: {response.status_code} - {response.json()}')
    return answer

In [None]:
import time


def get_answer_from_saiga(
    dct: dict,
    query: str,
    contexts: list[str] | None = None,
    system_prompt: str | None = None
) -> tuple:
    """docstring
    """

    # context processing
    if contexts is None:
        contexts = ['']

    context = '\n'.join(
        [
            f"Контекст {i + 1}: {ctx}"
            for i, ctx in enumerate(contexts)
        ]
    )

    # system_prompt processing
    if system_prompt is None:
        system_prompt = (
            "Ты - интеллектуальный ассистент, отвечающий вежливо на языке запроса. "
            "Для ответа на вопросы ты всегда учитываешь и используешь переданный контекст. "
        )

    dct['dataframe_records'][0]['system_prompt'] = system_prompt
    dct['dataframe_records'][0]['query'] = query
    dct['dataframe_records'][0]['context'] = context

    # generating answer
    start_time = time.time()
    answer = post_to_saiga_mistral(dct)
    total_time = time.time() - start_time

    return answer, total_time

In [None]:
import json
import requests


with open('uri_gemma.txt', 'r') as file:
    MODEL_URI = file.read()

# подгрузим из конфига
with open('config_gemma.json', 'r') as openfile:
    data_json = json.load(openfile)

data_json['dataframe_records'][0]['config']['kwargs']['temperature'] = 0.25
data_json['dataframe_records'][0]['config']['kwargs']['max_tokens'] = 1024

In [None]:
def post_to_gemma(data: dict) -> str:
    """Отправляет POST-запрос с данными на предсказание.

    Args:
        data: JSON-объект с массивом записей.

    Returns:
        Ответ от LLM Gemma-2-9b-it.
    """

    response = requests.post(
        MODEL_URI,
        json=data,
    )

    answer = ''
    if response.status_code == 200:
        response_data = response.json()
        if 'predictions' in response_data:
            check_text = response_data['predictions'][0]
            answer = check_text['response'][0]
            return answer
        else:
            print(f'Ошибка в ответе: {response_data}')
            return answer
    else:
        print(f'Ошибка: {response.status_code} - {response.json()}')
    return answer

In [None]:
import time


def get_answer_from_gemma(
    dct: dict,
    query: str,
    contexts: list[str] | None = None,
    system_prompt: str | None = None
) -> tuple:
    """docstring
    """

    # context processing
    if contexts is None:
        contexts = ['']

    context = '\n'.join(
        [
            f"Контекст {i + 1}: {ctx}"
            for i, ctx in enumerate(contexts)
        ]
    )

    # system_prompt processing
    if system_prompt is None:
        system_prompt = (
            "Ты - интеллектуальный ассистент, отвечающий вежливо на языке запроса. "
            "Для ответа на вопросы ты всегда учитываешь и используешь переданное поле 'Контекст'. "
        )

    template = (
        f'Системный промпт: {system_prompt}\n'
        f'Контекст: {context}\n'
        f'Вопрос пользователя: {query}'
    )

    dct['dataframe_records'][0]['query'] = template

    # generating answer
    start_time = time.time()
    answer = post_to_gemma(dct)
    total_time = time.time() - start_time

    return answer, total_time

## metrics

### retriever metrics

In [None]:
import numpy as np
from typing import Iterable


class RetrieverMetrics:
    def __init__(self,
                 y_true: Iterable,
                 y_pred: Iterable,
                 top: int):

        self.y_true = y_true
        self.y_pred = y_pred
        self.top = top

    def p_at_k(self, y_true: Iterable, y_pred: Iterable) -> float:
        """Computes precision at k.

        Args:
            y_true: A collection of all relevant labels.
            y_pred: A collection of predicted labels.

        Returns:
            score: precision at k score.
        """
        if not y_pred:
            return 0.0

        count = 0
        actual = set(y_true)
        for elem in y_pred:
            if elem in actual:
                count += 1

        score = count / len(y_pred)
        return score

    def compute_theo_max(self, top: int) -> float:
        """Computes theoretical maximum for AP@K.

        Args:
            top: The maximum number of predicted elements.

        Returns:
            score: theoretical_maximum for AP@K.
        """
        theo_array = [0 for i in range(top)]
        theo_array[0] = 1
        theo_true = [1]
        score = 0.0
        for i in range(1, top + 1):
            score += self.p_at_k(theo_true, theo_array[:i])

        score /= top
        return score

    def ap_at_k(self,
                y_true: Iterable,
                y_pred: Iterable,
                top: int) -> tuple[float]:
        """Computes average precision at k.

        Args:
            y_true: A collection of all relevant labels.
            y_pred: Iterable: A collection of predicted labels.
            top: The maximum number of predicted elements.

        Returns:
            score: AP@K score.
            norm_score: normalized score.
        """
        score = 0.0
        actual_length = min(top, len(y_pred))

        for i in range(1, actual_length + 1):
            score += self.p_at_k(y_true, y_pred[:i])

        theo_max = self.compute_theo_max(actual_length)
        norm_score = score / theo_max

        norm_score /= actual_length
        score /= actual_length
        return score, norm_score

    def map_at_k(self,
                 y_true: Iterable,
                 y_pred: Iterable,
                 top) -> tuple[float]:
        """Computes mean average precision at k.

        Args:
            y_true: A collection of collections of all relevant labels.
            y_pred: A collection of collections of predicted labels.
            top: The maximum number of predicted elements.

        Returns:
            score: MAP@K score.
            norm_score: normalized score.
        """

        aps_at_k = [
            self.ap_at_k(actual, pred, top)
            for actual, pred in zip(y_true, y_pred)
        ]

        score = np.mean([elem[0] for elem in aps_at_k])
        norm_score = np.mean([elem[1] for elem in aps_at_k])

        return score, norm_score

    def rr_at_k(self, y_true: Iterable, y_pred: Iterable) -> float:
        """Computes reciprocal rank at k.

        Args:
            y_true: A collection of all relevant labels.
            y_pred: A collection of predicted labels.

        Returns:
            score: RR@K score.
        """
        if not y_pred:
            return 0.0

        actual = set(y_true)
        score = 0.0
        for idx, elem in enumerate(y_pred):
            if elem in actual:
                score = 1 / (idx + 1)
                return score
        return score

    def mrr_at_k(self, y_true: Iterable, y_pred: Iterable) -> float:
        """Computes reciprocal rank at k.

        Args:
            y_true: A collection of collections of all relevant labels.
            y_pred: A collection of collections of predicted labels.

        Returns:
            score: MRR@K score.
        """
        score = np.mean([
            self.rr_at_k(actual, pred)
            for actual, pred in zip(self.y_true, self.y_pred)
            ]
        )
        return score

    def report(self) -> tuple[float]:
        map_k, n_map_k = self.map_at_k(self.y_true, self.y_pred, self.top)
        mrr_k = self.mrr_at_k(self.y_true, self.y_pred)
        return map_k, n_map_k, mrr_k

### LLM metrics

In [None]:
import numpy as np
from typing import Iterable


class GeneratorMetrics:
    """Считает метрики ответа LLM.

    Точно работает с фреймворком vLLM.
    """

    def __init__(self, logprobs: list[dict], token_ids: tuple[int]) -> None:
        """
        Args:
            logprobs: лог-вероятности токенов ответа.
            token_ids: индексы токенов ответа.
        """
        self.logprobs = logprobs
        self.token_ids = token_ids
        self.decoded_logprobs = [
            dct[key].logprob for dct, key in zip(logprobs, token_ids)
        ]
        self.probs = np.exp(self.decoded_logprobs)

    def calculate_perplexity(self) -> float:
        """Считает перплексию ответа LLM.

        Returns:
            perplexity: перплексия.
        """
        dtype = np.float64
        perplexity = np.exp(
            -np.mean(self.decoded_logprobs, dtype=dtype),
            dtype=dtype
        )
        return perplexity

    def calculate_mean_token_probability(self) -> float:
        """Считает среднюю вероятность токенов ответа LLM.

        Returns:
            mean_token_probability: средняя вероятность токенов ответа.
        """
        dtype = np.float64
        mean_token_probability = np.mean(self.probs, dtype=dtype)
        return mean_token_probability

    def calculate_sequence_probability(self) -> float:
        """Считает вероятность последовательности ответа LLM.

        Returns:
            sequence_probability: вероятность последовательности ответа.
        """
        dtype = np.float64
        sequence_probability = np.prod(self.probs, dtype=dtype)
        return sequence_probability

    @staticmethod
    def decode_logprob_dict(dct: dict) -> Iterable[float]:
        """Декодирует словарь лог-вероятностей.

        Args:
            dct: написать потом.

        Returns:
            result: написать потом.
        """
        dtype = np.float64
        result = np.array([v.logprob for _, v in dct.items()], dtype=dtype)
        return result

    def calculate_mean_token_entropy(self) -> float:
        """Считает среднюю энтропию токенов ответа LLM.

        Returns:
            mean_token_entropy: средняя энтропия ответа.
        """
        dtype = np.float64
        sum_entropy = 0.0

        for elem in self.logprobs:
            log_p = GeneratorMetrics.decode_logprob_dict(elem)
            sum_entropy -= np.sum(log_p * np.exp(log_p), dtype=dtype)

        mean_token_entropy = np.mean(sum_entropy, dtype=dtype)
        return mean_token_entropy

In [None]:
import torch
from bert_score import BERTScorer


device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model_path = # model_path

scorer = BERTScorer(model_type=model_path, num_layers=24, device=device, lang="ru")

ref_answers = question_answer['Ответ'].tolist()


def calculate_bertscore(llm_answers) -> dict[str, list[float]]:
    llm_answers = llm_answers['answer'].tolist()
    P, R, F1 = scorer.score(cands=llm_answers, refs=ref_answers)

    result = {
        'precision': P.tolist(),
        'recall': R.tolist(),
        'f1_score': F1.tolist(),
    }
    return result

### LLM-as-a-Judge

In [None]:
import json
import json5
from openai import OpenAI


# грузим конфиг
with open('config_ruadapt_qwen.json', 'r') as file:
    json_file = json.load(file)


# конфиг в формате: {'BASE_URL': value, 'API_KEY': value, 'MODEL': value}
client = OpenAI(
    base_url=json_file['BASE_URL'],
    api_key=json_file['API_KEY'],
)

MODEL = json_file['MODEL']

In [None]:
# JSON schema for Structured Output
output_format = {
    "type": "object",
    "properties": {
        "Правильность": {
            "type": "array",
            "items": {"type": "string"},
            "minItems": 2,
            "maxItems": 2
        },
        "Точность": {
            "type": "array",
            "items": {"type": "integer", "minimum": 0, "maximum": 5},
            "minItems": 2,
            "maxItems": 2
        },
        "Полнота": {
            "type": "array",
            "items": {"type": "integer", "minimum": 0, "maximum": 5},
            "minItems": 2,
            "maxItems": 2
        },
        "Релевантность вопросу": {
            "type": "array",
            "items": {"type": "integer", "minimum": 0, "maximum": 5},
            "minItems": 2,
            "maxItems": 2
        },
        "Соответствие контекстам": {
            "type": "array",
            "items": {"type": "integer", "minimum": 0, "maximum": 1},
            "minItems": 2,
            "maxItems": 2
        },
        "Соответствие ответу эксперта": {
            "type": "array",
            "items": {"type": "integer", "minimum": 0, "maximum": 1},
            "minItems": 2,
            "maxItems": 2
        },
        "Лучший ответ": {"type": "integer", "minimum": 0, "maximum": 1}
    },
    "required": [
        "Правильность",
        "Точность",
        "Полнота",
        "Релевантность вопросу",
        "Соответствие контекстам",
        "Соответствие ответу эксперта",
        "Лучший ответ"
    ]
}

In [None]:
# критерии оценки
criteria = """Критерии оценки:
1. Правильность ('верно' - верный ответ, 'неверно' - верный ответ, 'отказ' - отказ от ответа).
2. Точность (0-5 баллов).
3. Полнота (0-5 баллов).
4. Релевантность вопросу (0-5 баллов).
5. Соответствие контекстам (0 - не соответствует, 1 - соответствует).
6. Соответствие ответу эксперта (0 - не соответствует, 1 - соответствует).
7. Лучший ответ (0 или 1).
"""

In [None]:
# function for Structured Output
def evaluate_one_pair_of_answers(
    question: str,
    reference: str,
    contexts: str,
    answers: list[str]
) -> str:
    """Оценивает ответы с помощью RuadaptQwen2.5-32B-Pro-Beta.

    Args:
        question: вопрос.
        reference: ответ эксперта.
        contexts: контексты для ответа.
        answers: возможные ответы.

    Returns:
        judgement - ответ от LLM.
    """

    SYSTEM_PROMPT = f"""Ты - эксперт по оценке качества ответов.
    Тебе будет предоставлен: вопрос, контексты к вопросу, ответ эксперта и два варианта возможных ответов на вопрос.
    Оцени каждый из ответов по следующим критериям.
    {criteria}
    Обязательно, верни оценки в формате JSON, не добавляй символы Markdown:
    {json.dumps(output_format, ensure_ascii=False, indent=2)}

    Не добавляй никаких объяснений. Верни только чистый JSON.
    """

    PROMPT = f"ВОПРОС: {question}\nКОНТЕКСТЫ: {contexts}\nОТВЕТ ЭКСПЕРТА: {reference}"

    MESSAGES = [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": PROMPT},
    ]

    for i, answer in enumerate(answers, start=1):
        MESSAGES.append({"role": "user", "content": f"{i}. {answer}"})

    response = client.chat.completions.create(
        model=MODEL,
        messages=MESSAGES,
        max_tokens=256,
        temperature=0.01,
        functions=[
            {
                "name": "evaluate_responses",
                "description": "Оценка ответов по заданным критериям.",
                "parameters": output_format
            }
        ],
        function_call={"name": "evaluate_responses"}
    )

    judgement = response.choices[0].message.content
    return judgement

In [None]:
def evaulate_two_llms(llm_one, llm_two):
    assert len(llm_one) == len(llm_two)

    scores_llm_one = {
        'Правильность': [],
        'Точность': [],
        'Полнота': [],
        'Релевантность вопросу': [],
        'Соответствие контекстам': [],
        'Соответствие ответу эксперта': [],
        'Лучший ответ': []
    }

    scores_llm_two = {
        'Правильность': [],
        'Точность': [],
        'Полнота': [],
        'Релевантность вопросу': [],
        'Соответствие контекстам': [],
        'Соответствие ответу эксперта': [],
        'Лучший ответ': []
    }

    errors = []

    for i in range(len(llm_one)):
        print(f"{i + 1}/{len(llm_one)}")

        question = question_answer['Вопрос'][i]
        reference = question_answer['Ответ'][i]

        answers = [llm_one['answer'][i], llm_two['answer'][i]]

        result = evaluate_one_pair_of_answers(
            question,
            reference,
            contexts[i],
            answers
        )

        try:
            json_data = json5.loads(result.strip("`").strip("json"))
            for k, v in json_data.items():
                if k == 'Лучший ответ':
                    scores_llm_one[k].append(v)
                    scores_llm_two[k].append(v)
                else:
                    scores_llm_one[k].append(v[0])
                    scores_llm_two[k].append(v[1])

        except Exception as err:
            print(f"{err}")
            index_error = (i, json_data)
            errors.append(index_error)

    return scores_llm_one, scores_llm_two, errors