In [None]:
import os
from typing import Dict, Any
import datasets
import requests
import json
import numpy as np
import tqdm
import time

# Get API key
## We rely on https://openrouter.ai/ to run inference on multiple models. The API key is read from the `OPENROUTER_API_KEY` environment variable.

In [None]:
# Read the OpenRouter key from the environment; the value is None when the variable is unset.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")

# Pre-processors

## QA processor

In [None]:
class QAEvaluator:
    """Callable that asks an OpenRouter-hosted chat model a question about a context.

    Calling an instance with a dataset example builds a prompt from the example's
    context and question, sends it to the OpenRouter chat-completions endpoint, and
    returns the raw answer, a cleaned answer, and the model name as a dict.
    """

    def __init__(
        self,
        model_name: str,
        openrouter_api_key: str,
        n_trials: int = 1,
        sleep_time_between_retrials: float = 1.0,
        max_sleep_time_between_retrials: float = 600.0,
    ) -> None:
        """Store the model name, API key, retry settings, and prompt templates.

        Args:
            model_name: Model identifier sent in the request payload.
            openrouter_api_key: Key sent as a bearer token in the Authorization header.
            n_trials: Maximum number of requests attempted per example.
            sleep_time_between_retrials: Base delay in seconds; it is doubled for
                each further failed attempt.
            max_sleep_time_between_retrials: Upper bound in seconds on a single delay.
        """

        self.model_name = model_name
        self._base_system_prompt = (
            "You are an expert research assistant, skilled in answering questions "
            "concisely and precisely, using information provided by the user. "
        )
        # NOTE(review): the first two fragments are joined without a separating space
        # ("...provided.I'll give..."). The text is left untouched so prompts stay
        # identical to the ones used so far.
        self._base_user_prompt = (
            "I'd like for you to answer questions about a context text that will be provided."
            "I'll give you a pair with the form:\nContext: 'context text'\nQuestion: 'a question about the context'.\n"
            "First, tell me about your knowledge of the context and what information it contains, "
            "then, create an analysis of the context strictly using information contained in the text provided. "
            "Your knowledge about the context and the analysis must not be output. "
            "Finally, generate an explicit answer to the question that will be output. "
            "Make sure that the answer is the only output you provide, and the analysis of the context should be kept to yourself. "
            "Answer directly and do not prefix the answer with anything such as 'Answer:' nor 'The answer is:'. "
            "The answer has to be the only output you explicitly provide. "
            "The answer has to be as short, direct, and concise as possible. "
            "If the answer to the question can not be obtained from the provided context paragraph, output 'UNANSWERABLE'. "
            "Here's the context and question for you to reason about and answer:\n"
        )

        self.n_trials = n_trials
        self.sleep_time_between_retrials = sleep_time_between_retrials
        self.max_sleep_time_between_retrials = max_sleep_time_between_retrials

        self._openrouter_api_key = openrouter_api_key

    def __call__(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Answer the example's question and return the prediction fields.

        Args:
            example: Mapping with a "question" entry plus either a
                "document_extracted" string or an "entity_pages" mapping whose
                "wiki_context" entry is a list of strings.

        Returns:
            Dict with "answer_pred" (raw model output, or "ERROR" when no attempt
            succeeded), "cleaned_answer_pred", and "model_name".

        Raises:
            ValueError: If the example has neither supported context field.
        """

        context_str = self._extract_context(example)
        question_str = example["question"]
        user_prompt_str = (
            self._base_user_prompt + f"Context: {context_str}\nQuestion: {question_str}?\n"
        )

        raw_response = self._query_model(self._base_system_prompt, user_prompt_str)

        return {
            "answer_pred": raw_response,
            "cleaned_answer_pred": self.process_answer(raw_response),
            "model_name": self.model_name,
        }

    def _extract_context(self, example: Dict[str, Any]) -> str:
        """Return the context text of an example in one of the two supported layouts."""
        if "document_extracted" in example:
            return example["document_extracted"]
        # The membership test matters: a bare string literal here is always truthy,
        # which would make the ValueError below unreachable.
        if "entity_pages" in example:
            return ("\n\n").join(example["entity_pages"]["wiki_context"])
        raise ValueError(
            "Unknown data format. Can't read 'document_extracted' or 'entity_pages' fields."
        )

    def _backoff_delay(self, trial: int) -> float:
        """Return the capped, exponentially growing delay that follows failed attempt `trial`."""
        return min(
            self.max_sleep_time_between_retrials,
            self.sleep_time_between_retrials * (2 ** (trial + 1)),
        )

    def _query_model(self, system_prompt_str: str, user_prompt_str: str) -> str:
        """Send the prompts to the model, retrying on failure; return "ERROR" if all attempts fail."""
        # Pre-set so the result is defined even when n_trials is 0 and the loop never runs.
        raw_response = "ERROR"

        for trial in range(self.n_trials):

            try:
                model_response = requests.post(
                    url="https://openrouter.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {self._openrouter_api_key}",
                    },
                    data=json.dumps(
                        {
                            "model": self.model_name,
                            "messages": [
                                {
                                    "role": "system",
                                    "content": system_prompt_str,
                                },
                                {"role": "user", "content": user_prompt_str},
                            ],
                        }
                    ),
                )

                raw_response = model_response.json()["choices"][0]["message"]["content"]
                break
            except (
                KeyError,
                IndexError,
                json.JSONDecodeError,
                # Base class of ConnectionError and ChunkedEncodingError; it also covers
                # requests' own JSON decoding error regardless of the JSON backend in use.
                requests.exceptions.RequestException,
            ) as e:
                print(f"Trial: {trial}: {e}")
                # Only wait when another attempt will actually follow.
                if trial + 1 < self.n_trials:
                    time.sleep(self._backoff_delay(trial))

        return raw_response

    def process_answer(self, raw_answer: str) -> str:
        """Return the text after the last ':' in `raw_answer`, stripped of surrounding whitespace.

        This drops prefixes such as 'Answer:'. Note that an answer which itself
        contains a colon (e.g. a time of day) is truncated as well.
        """
        return raw_answer.split(":")[-1].strip()




## Text classification processor

In [None]:
class TopicRetrievalEvaluator:
    """Callable that asks an OpenRouter-hosted chat model to pick the topic of a text.

    Calling an instance with a dataset example builds a prompt from the example's
    "document_extracted" text and the configured topic list, sends it to the
    OpenRouter chat-completions endpoint, and returns the raw answer, a cleaned
    answer, and the model name as a dict.
    """

    def __init__(
        self,
        model_name: str,
        topics: Any,
        openrouter_api_key: str,
        n_trials: int = 1,
        sleep_time_between_retrials: float = 1.0,
        max_sleep_time_between_retrials: float = 600.0,
    ) -> None:
        """Store the model name, API key, retry settings, and prompt templates.

        Args:
            model_name: Model identifier sent in the request payload.
            topics: Candidate topics, either as a ready-made string or as an
                iterable of topic names; it is embedded in the user prompt.
            openrouter_api_key: Key sent as a bearer token in the Authorization header.
            n_trials: Maximum number of requests attempted per example.
            sleep_time_between_retrials: Base delay in seconds; it is doubled for
                each further failed attempt.
            max_sleep_time_between_retrials: Upper bound in seconds on a single delay.
        """

        self.model_name = model_name

        self._base_system_prompt = (
            "You are an expert research assistant, skilled in answering questions "
            "concisely and precisely, using information provided by the user. "
        )
        # NOTE(review): the prompt text has small typos ("provided.I'll" without a
        # space, "ceontext"). It is left untouched so prompts stay identical to the
        # ones used so far.
        self._base_user_prompt = (
            "I'd like for you to determine the topic of some text context that will be provided."
            "I'll give you the text with the form:\nContext: 'text'.\n"
            "First, tell me about your knowledge of the context and what information it contains, "
            "then, create an analysis of the context strictly using information contained in the text provided. "
            "Your knowledge about the context and the analysis must not be output. "
            "Finally, generate an explicit topic of the ceontext by choosing from the following list of topics: "
            f"{self._format_topics(topics)}. "
            "Make sure that the topic is the only output you provide, and the analysis of the context should be kept to yourself. "
            "Answer directly with the topic from the list, and do not prefix the answer with anything such as 'Answer:' nor 'Topic:'. "
            "The topic has to be the only output you explicitly provide. "
            "your output has to be as short, direct, and concise as possible. "
            "The answer strictly has to be one of the topics provided to you. "
            "If the topic cannot be obtained from the provided context paragraph, output 'UNANSWERABLE'. "
            "Here's the context for you to determine the topic:\n"
        )

        self.n_trials = n_trials
        self.sleep_time_between_retrials = sleep_time_between_retrials
        self.max_sleep_time_between_retrials = max_sleep_time_between_retrials

        self._openrouter_api_key = openrouter_api_key

    @staticmethod
    def _format_topics(topics: Any) -> str:
        """Render `topics` for the prompt.

        A string is used verbatim. Any other iterable is rendered like a list of
        plain Python strings, so elements whose repr carries a wrapper (such as
        NumPy string scalars) do not leak that wrapper into the prompt.
        """
        if isinstance(topics, str):
            return topics
        return str([str(topic) for topic in topics])

    def __call__(self, example: Dict[str, Any]) -> Dict[str, Any]:
        """Predict the topic of the example's document and return the prediction fields.

        Args:
            example: Mapping with a "document_extracted" entry holding the text.

        Returns:
            Dict with "full_answer_retrieved_topic" (raw model output, or "ERROR"
            when no attempt succeeded), "cleaned_answer_retrieved_topic", and
            "model_name".
        """

        context_str = example["document_extracted"]
        user_prompt_str = self._base_user_prompt + f"Context: {context_str}\n"

        raw_response = self._query_model(self._base_system_prompt, user_prompt_str)

        return {
            "full_answer_retrieved_topic": raw_response,
            "cleaned_answer_retrieved_topic": self.process_answer(raw_response),
            "model_name": self.model_name,
        }

    def _backoff_delay(self, trial: int) -> float:
        """Return the capped, exponentially growing delay that follows failed attempt `trial`."""
        return min(
            self.max_sleep_time_between_retrials,
            self.sleep_time_between_retrials * (2 ** (trial + 1)),
        )

    def _query_model(self, system_prompt_str: str, user_prompt_str: str) -> str:
        """Send the prompts to the model, retrying on failure; return "ERROR" if all attempts fail."""
        # Pre-set so the result is defined even when n_trials is 0 and the loop never runs.
        raw_response = "ERROR"

        for trial in range(self.n_trials):

            try:
                model_response = requests.post(
                    url="https://openrouter.ai/api/v1/chat/completions",
                    headers={
                        "Authorization": f"Bearer {self._openrouter_api_key}",
                    },
                    data=json.dumps(
                        {
                            "model": self.model_name,
                            "messages": [
                                {
                                    "role": "system",
                                    "content": system_prompt_str,
                                },
                                {"role": "user", "content": user_prompt_str},
                            ],
                        }
                    ),
                )

                raw_response = model_response.json()["choices"][0]["message"]["content"]
                break
            except (
                KeyError,
                IndexError,
                json.JSONDecodeError,
                # Base class of ConnectionError and ChunkedEncodingError; it also covers
                # requests' own JSON decoding error regardless of the JSON backend in use.
                requests.exceptions.RequestException,
            ) as e:
                print(f"Trial: {trial}: {e}")
                # Only wait when another attempt will actually follow.
                if trial + 1 < self.n_trials:
                    time.sleep(self._backoff_delay(trial))

        return raw_response

    def process_answer(self, raw_answer: str) -> str:
        """Return the text after the last ':' in `raw_answer`, stripped of surrounding whitespace.

        This drops prefixes such as 'Topic:'. Note that a topic which itself
        contains a colon is truncated as well.
        """
        return raw_answer.split(":")[-1].strip()

# Evaluation

## List models to evaluate

In [None]:
# OpenRouter identifiers of the models to evaluate.
# The full catalogue is listed at: https://openrouter.ai/models
models = [
    "mistralai/mistral-7b-instruct",
    "meta-llama/llama-3-8b-instruct",
]

## Get set of topics

In [None]:
# Load the "repliqa_0" split and collect the sorted, de-duplicated document topics.
repliqa = datasets.load_dataset("ServiceNow/repliqa")["repliqa_0"]
# .tolist() converts the NumPy string scalars returned by np.unique into plain Python
# str, so the list displays (and later formats into prompts) without NumPy wrappers.
topics = np.unique(repliqa["document_topic"]).tolist()
topics

## Actual evaluation

In [None]:
inference_datasets = {}
N_RETRIALS = 5  # Maximum number of attempts per inference request (passed as `n_trials`)
N_EXAMPLES = 10  # Number of leading examples of the split evaluated for each model

# Fail fast: with no key the Authorization header would read "Bearer None", and every
# example would only burn through its retries and back-off sleeps.
if not OPENROUTER_API_KEY:
    raise RuntimeError(
        "OPENROUTER_API_KEY is not set; export it before running the evaluation."
    )

# The evaluated subset is the same for every model, so it is loaded once. A separate
# name keeps the full `repliqa` split defined earlier from being overwritten.
repliqa_subset = datasets.load_dataset("ServiceNow/repliqa")["repliqa_0"].select(
    range(N_EXAMPLES)
)

for model in tqdm.tqdm(models, total=len(models), desc="Models"):

    qa_pre_processor = QAEvaluator(model, OPENROUTER_API_KEY, n_trials=N_RETRIALS)
    topic_retriever_pre_processor = TopicRetrievalEvaluator(
        model, topics, OPENROUTER_API_KEY, n_trials=N_RETRIALS
    )

    # Each map call adds that evaluator's prediction columns to the examples.
    prediction_dataset = repliqa_subset.map(qa_pre_processor)
    prediction_dataset = prediction_dataset.map(topic_retriever_pre_processor)

    inference_datasets[model] = prediction_dataset


In [None]:
# Display the mapping from model name to its prediction dataset.
inference_datasets