# Retrieval-Augmented Generation (RAG)

## Introduction

The rapid growth of information and the complexity of modern queries have revealed a significant challenge for
traditional Natural Language Processing (NLP) systems. While large language models (LLMs) like GPT-4 and Gemini excel at
generating coherent and contextually relevant text, they often face a fundamental limitation: their knowledge is
confined to the data they were trained on. This can lead to "hallucinations" (factually incorrect information) and an
inability to answer questions about recent or private, domain-specific data.

A Retrieval-Augmented Generation (RAG) pipeline addresses these limitations by combining the strengths of two
approaches:

1. A powerful **retriever** that pulls relevant, up-to-date, or proprietary information from an external knowledge base.
2. A **generator** (an LLM) that synthesizes this retrieved information into a clear and natural-sounding response.

By implementing a RAG pipeline, we can build a system that is more accurate, up-to-date, and trustworthy.

### A Real-World Use Case: RAG for Software Engineering

Imagine you're a new developer at a large tech company. The company has thousands of microservices, extensive internal
libraries, and decades of documentation scattered across different platforms. Finding the right information (how to use a
specific API, the purpose of a legacy service, or the solution to a cryptic error) can be a significant bottleneck.

This is where a RAG pipeline becomes a superpower for developers. Instead of manually searching through wikis, code
repositories, and old design documents, you could ask a specialized assistant:

- *"What's the correct way to implement authentication for the 'Triton' service?"*
- *"Show me an example of how to use the `Unicorn` library's data processing module in Python."*
- *"What was the reasoning behind the last major change to the 'Phoenix' microservice?"*

A RAG system connected to the company's internal knowledge base (documentation, code, architectural records) can
retrieve the most relevant information and use it to generate a direct, accurate, and code-supported answer. This boosts
developer productivity, improves knowledge sharing, and accelerates onboarding. In this project, you will build the core
components of such a system.

## Implementation

### Imports

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import initialize_notebook

import abc
import dataclasses
import os
import pathlib
import random
from collections.abc import Sequence
from typing import Literal, Protocol, override

import jinja2
import numpy as np
import pandas as pd

import openai
import pydantic
import tqdm
from hslu.dlm03.rag import metrics
from hslu.dlm03.rag import util
from hslu.dlm03.util import ratelimit

### Data Structures

First, let's define the basic data structures that will be used for the pipeline.

1. **Query**: this data structure will hold the user's query text.

In [None]:
@dataclasses.dataclass()
class Query:
    text: str

2. **Answer**: this data structure will hold the model's or pipeline's answer text.

In [None]:
@dataclasses.dataclass()
class Answer:
    text: str

3. **Document**: this data structure will hold a document's content with its corresponding metadata and identifier.

In [None]:
@dataclasses.dataclass()
class Document:
    id: str
    text: str
    metadata: dict[str, str]

4. **Corpus**: this data structure will hold a sequence of documents.

In [None]:
Corpus = Sequence[Document]

### Dataset

A RAG pipeline requires a corpus of `Document` objects from which the model can retrieve information.

We also need a dataset of `Query` objects to pass to the model, each paired with its expected
`Answer` for evaluation.

Optionally, each query can be associated with the list of relevant `Document` objects that contain the information
needed to answer it, which lets us measure the performance of the internal pipeline components
independently. We therefore define data samples as triplets of `(Query, Answer, Sequence[Document])`, plus some additional metadata.

In [None]:
@dataclasses.dataclass()
class QueryAnswerDocument:
    query: Query
    answer: Answer
    relevant_documents: Sequence[Document]
    metadata: dict[str, str]

We will also define a sequence of these data samples as a `QuestionAnswerDataset`.

In [None]:
QuestionAnswerDataset = Sequence[QueryAnswerDocument]

We will be using the Kaggle Question-Answer dataset for this work, but feel free to load any other dataset to experiment with.

The dataset is available [here](https://www.kaggle.com/datasets/rtatman/questionanswer-dataset?resource=download).

In [None]:
class KaggleQADataset:
    @classmethod
    def load_qa(cls, row: pd.Series, source: pathlib.Path, document_index: dict[str, Document]) -> QueryAnswerDocument:
        relevant_document = row["ArticleFile"]
        relevant_documents = [document_index[relevant_document]] if isinstance(relevant_document, str) else []
        return QueryAnswerDocument(
            query=Query(text=row["Question"]),
            answer=Answer(text=row["Answer"]),
            relevant_documents=relevant_documents,
            metadata={
                "source": str(source),  # metadata values are declared as `str`
                "question_difficulty": row["DifficultyFromQuestioner"],
                "answer_difficulty": row["DifficultyFromAnswerer"],
            },
        )

    @classmethod
    def read_document(cls, path: pathlib.Path) -> Document:
        filename = path.name
        content = path.read_text(encoding="latin-1")
        title, content = content.split("\n", 1)
        document_id = filename.split(".", 1)[0]
        return Document(
            id=document_id,
            text=content.strip(),
            metadata={"source": filename, "title": title},
        )

    @classmethod
    def load(
            cls,
            path: pathlib.Path,
            sets: str | Sequence[str] = ("S08", "S09", "S10"),
            sample: int | None = None,
    ) -> tuple[QuestionAnswerDataset, Corpus]:
        if isinstance(sets, str):
            sets = (sets,)
        documents_path = path / "text_data"
        documents = []
        for document_path in documents_path.iterdir():
            # `iterdir` already yields full paths, so match on the file name and read the path directly.
            if any(s in document_path.name for s in sets):
                document = cls.read_document(document_path)
                documents.append(document)
        document_index = {document.id: document for document in documents}
        qa_dataset = []
        for filename in path.iterdir():
            # `iterdir` already yields full paths; skip directories such as `text_data`.
            if filename.is_file() and any(s in filename.name for s in sets):
                dataset = pd.read_csv(
                    filename, sep="\t", encoding="latin-1",
                )
                dataset = dataset.dropna(axis=0)
                for _, row in dataset.iterrows():
                    qa = cls.load_qa(row, filename, document_index)
                    qa_dataset.append(qa)

        if sample is not None:
            random.shuffle(qa_dataset)
            qa_dataset = qa_dataset[:sample]
        return qa_dataset, documents

We will load the entirety of the dataset, as well as a partial dataset to allow for faster evaluation.

In [None]:
DATA_PATH = pathlib.Path(os.environ["DATA_PATH"])  # raises a clear KeyError if DATA_PATH is unset
dataset_path = DATA_PATH / "datasets" / "questionanswer-dataset"

qa_dataset, documents = KaggleQADataset.load(dataset_path, sample=None, sets=("S08", "S09", "S10"))
partial_dataset, _ = KaggleQADataset.load(dataset_path, sample=50, sets=("S08", "S09", "S10"))
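
Because `load` shuffles with the global random state, the partial dataset will generally differ from one run to the next, which makes evaluation scores harder to compare. The following is a minimal sketch of reproducible sampling; the helper name `sample_dataset` and its `seed` parameter are assumptions made for illustration and are not part of the notebook's code.

```python
import random
from collections.abc import Sequence
from typing import TypeVar

T = TypeVar("T")


def sample_dataset(dataset: Sequence[T], size: int, seed: int = 0) -> list[T]:
    """Return `size` items drawn without replacement, reproducibly for a given seed."""
    # A local generator leaves the global random state untouched (hypothetical helper).
    rng = random.Random(seed)
    size = min(size, len(dataset))
    return rng.sample(list(dataset), size)


items = list(range(100))
first = sample_dataset(items, 5, seed=42)
second = sample_dataset(items, 5, seed=42)
print(first == second)  # the same seed yields the same subset
```

The same idea could be applied to the list of samples returned by `KaggleQADataset.load` with `sample=None`.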

Below is an example of a sample from the dataset:

In [None]:
sample = qa_dataset[35]
print("Query:", sample.query.text)
print("Answer:", sample.answer.text)
for i, document in enumerate(sample.relevant_documents):
    print(f"Document {i} ({document.id}):", document.text) 

Using this dataset, we can already highlight the weakness of LLMs for factual query answering, by probing an LLM with one of the queries.

In [None]:
@dataclasses.dataclass(frozen=True)
class LLM:
    system_prompt: str
    client: openai.Client
    model_name: str
    ratelimiter: ratelimit.RateLimiter
    
    def answer(self, query: Query) -> Answer:
        with self.ratelimiter:
            response = self.client.chat.completions.create(
                messages=[
                    {"role": "system", "content": self.system_prompt},
                    {"role": "user", "content": query.text},
                ],
                model=self.model_name,
            )
            choice = random.choice(response.choices)
            return Answer(text=choice.message.content)

    def __call__(self, query: Query) -> Answer:
        return self.answer(query)

In [None]:
MODEL_BASE_URL = "http://localhost:8080/v1"
MODEL_NAME = "..."
MODEL_API_KEY = "..."
MODEL_RPM = 1e6

MODEL_SYSTEM_PROMPT = """Please answer the given user query concisely (do not write full sentences, just provide a direct answer)."""

model_client = openai.Client(base_url=MODEL_BASE_URL, api_key=MODEL_API_KEY)
model = LLM(client=model_client, model_name=MODEL_NAME, ratelimiter=ratelimit.RateLimiter(rpm=MODEL_RPM), system_prompt=MODEL_SYSTEM_PROMPT)

In [None]:
answer = model.answer(sample.query)
print("Query:", sample.query.text)
print("---------------")
print("Generated:", answer.text)
print("---------------")
print("Expected:", sample.answer.text)

### Embedder

An `Embedding` is a numerical vector that holds a semantic representation of a given text.
We will use `numpy` to store this numerical representation.

In [None]:
Embedding = np.ndarray

We will define an `Embedder` as an interface that can transform a text into an embedding.

In [None]:
class Embedder(Protocol):
    def embed(self, text: str) -> Embedding:
        ...

    def embed_query(self, query: Query) -> Embedding:
        return self.embed(query.text)

    def embed_document(self, document: Document) -> Embedding:
        return self.embed(document.text)

    def embed_documents(
            self, documents: Sequence[Document], *, progress: bool = False,
    ) -> Sequence[Embedding]:
        if progress:
            documents = tqdm.tqdm(documents)
        return [self.embed_document(document) for document in documents]

As a first approach, we propose to use LLM-based embedding methods through the [OpenAI Embedding API](https://platform.openai.com/docs/guides/embeddings), by
defining an `LLMEmbedder`.

**Note**: Some embedding servers cannot handle inputs beyond a certain context window, so you might need to truncate your texts before sending them to the server. The `LLMEmbedder` below truncates each text to its first 5000 characters.

In [None]:
@dataclasses.dataclass(frozen=True)
class LLMEmbedder(Embedder):
    client: openai.Client
    model_name: str
    ratelimiter: ratelimit.RateLimiter

    def embed(self, text: str) -> Embedding:
        with self.ratelimiter:
            response = self.client.embeddings.create(
                input=text[:5000], model=self.model_name,
            )
            return np.array(response.data[0].embedding)

In [None]:
EMBEDDING_BASE_URL = "http://localhost:8081/v1"
EMBEDDING_MODEL_NAME = "..."
EMBEDDING_API_KEY = "..."
EMBEDDING_RPM = 1e6

embedding_client = openai.Client(base_url=EMBEDDING_BASE_URL, api_key=EMBEDDING_API_KEY)
embedder = LLMEmbedder(client=embedding_client, model_name=EMBEDDING_MODEL_NAME, ratelimiter=ratelimit.RateLimiter(rpm=EMBEDDING_RPM))

In [None]:
embedder.embed_query(sample.query)
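
When no embedding server is available, the rest of the pipeline can still be exercised with a stand-in embedder. The following is a minimal sketch of a deterministic toy embedder based on token hashing; the class name `HashingEmbedder` and its `dim` parameter are assumptions made for illustration. It does not capture semantics and is only meant for testing the plumbing, not for measuring retrieval quality.

```python
import hashlib

import numpy as np


class HashingEmbedder:
    """Toy bag-of-words embedder: hashes each token into one of `dim` buckets."""

    def __init__(self, dim: int = 64) -> None:
        self.dim = dim

    def embed(self, text: str) -> np.ndarray:
        vector = np.zeros(self.dim, dtype=float)
        for token in text.lower().split():
            # md5 serves only as a stable hash (the built-in hash() is salted per process).
            digest = hashlib.md5(token.encode("utf-8")).digest()
            bucket = int.from_bytes(digest[:4], "little") % self.dim
            vector[bucket] += 1.0
        norm = np.linalg.norm(vector)
        # Normalise to unit length so that dot products equal cosine similarities.
        return vector / norm if norm > 0 else vector


toy_embedder = HashingEmbedder(dim=64)
a = toy_embedder.embed("the quick brown fox")
b = toy_embedder.embed("the quick brown fox")
print(np.allclose(a, b))  # identical texts map to identical vectors
```

The class exposes the same `embed(text)` method as the `Embedder` interface, so it could be adapted to it with little effort.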

### Embedding Similarity

`Embeddings` can be used to create structured spaces, in which we can define distance/similarity metrics to efficiently navigate the space.

We will define `Similarity` between `Embedding` using `numpy`.

In [None]:
Similarity = np.ndarray

We will call a similarity function any function that computes the similarity between two `Embedding` objects and returns a `Similarity`.

In [None]:
class SimilarityFn(Protocol):
    def __call__(self, left: Embedding, right: Embedding, /,) -> Similarity:
        ...

**Note**: Please ensure that the similarity function you use for the Vector Store is the same as the one the embedding model
was trained with, or you risk retrieving irrelevant documents. Please refer to the model's official
documentation for closed-source models, or to the model card on Hugging Face for open-weights models.

The most commonly used similarity functions are:

- **Cosine Similarity**: $$\text{sim}(x, y) = \frac{x \cdot y}{\lVert x \rVert \, \lVert y \rVert}$$

In [None]:
def cosine_similarity(left: Embedding, right: Embedding, /) -> Similarity:
    if left.shape[-1] != right.shape[-1]:
        error_message = f"Expected embeddings to have same dimension, but got {left.shape[-1]} and {right.shape[-1]}"
        raise ValueError(error_message)
    left, right = util.expand_match_dims(left, right, sizes=(left.ndim - 1, right.ndim - 1))
    left_norm = np.linalg.norm(left, axis=-1)
    right_norm = np.linalg.norm(right, axis=-1)
    left, right = np.broadcast_arrays(left, right)
    dot_product = np.matmul(left[..., None, :], right[..., None]).squeeze((-1, -2))
    return dot_product / (left_norm * right_norm)

- **Euclidean Similarity**: $$\text{sim}(x, y) = \frac{1}{1 + d(x, y)},\qquad d(x, y) = \sqrt{\sum_i (x_i - y_i)^2}$$

In [None]:
def euclidean_similarity(left: Embedding, right: Embedding, /) -> Similarity:
    if left.shape[-1] != right.shape[-1]:
        error_message = f"Expected embeddings to have same dimension, but got {left.shape[-1]} and {right.shape[-1]}"
        raise ValueError(error_message)
    left, right = util.expand_match_broadcast(left, right, sizes=(left.ndim - 1, right.ndim - 1))
    euclidean_distance = np.sqrt(np.power(left - right, 2).sum(-1))
    return 1 / (1 + euclidean_distance)
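
To make the two formulas concrete, here is a small worked example that uses plain `numpy` only. It is a sketch that is independent of the helper functions above; the vectors are made up for illustration.

```python
import numpy as np

query = np.array([1.0, 0.0])
documents = np.array([
    [1.0, 0.0],  # same direction as the query
    [0.0, 1.0],  # orthogonal to the query
    [1.0, 1.0],  # 45 degrees away from the query
])

# Cosine similarity: dot product divided by the product of the norms.
cosine = documents @ query / (np.linalg.norm(documents, axis=-1) * np.linalg.norm(query))

# Euclidean similarity: 1 / (1 + distance).
distance = np.linalg.norm(documents - query, axis=-1)
euclidean = 1.0 / (1.0 + distance)

print(cosine)     # approximately 1, 0 and 0.707
print(euclidean)  # approximately 1, 0.414 and 0.5
```

Both functions rank the three documents in the same order here, but the values live on different scales, so a similarity threshold tuned for one function does not carry over to the other.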

### Retriever

The retriever's job is to find, within a given corpus, the documents that contain the information most
relevant to answering a given query.

We will define a `Retriever` as an interface that given a `Query` returns an (ordered) sequence of `Document`.
The `Retriever` should hold the corpus of `Document` to retrieve from.

In [None]:
DocumentIndex = dict[str, int]

@dataclasses.dataclass(frozen=True)
class Retriever(abc.ABC):
    documents: Corpus
    document_index: DocumentIndex

    @abc.abstractmethod
    def retrieve(self, query: Query, k: int | None = None) -> tuple[Sequence[Document], Similarity]:
        raise NotImplementedError

    def __call__(self, query: Query, k: int | None = None) -> Sequence[Document]:
        documents, _ = self.retrieve(query, k)
        return documents
    
    @staticmethod
    def index(documents: Corpus) -> DocumentIndex:
        document_index = {}
        for i, document in enumerate(documents):
            document_index[document.id] = i
        return document_index

    @classmethod
    def from_documents(cls, documents: Corpus, **kwargs) -> 'Retriever':
        document_index = cls.index(documents)
        return cls(documents=documents, document_index=document_index, **kwargs)

    def get_document_by_index(self, index: int) -> Document:
        return self.documents[index]

    def get_index_by_id(self, document_id: str) -> int:
        return self.document_index[document_id]

    def get_document_by_id(self, document_id: str) -> Document:
        index = self.get_index_by_id(document_id)
        return self.get_document_by_index(index)

A special case of `Retriever`, which uses `Embedding` to represent both `Query` and `Document` and computes the `Similarity` between them, is referred to as
a [Vector Store](https://en.wikipedia.org/wiki/Vector_database). A `VectorStore` is composed of 3 main components:

- A `Corpus` of documents to retrieve from (part of the `Retriever`).
- An `Embedder` to use to embed both documents and queries into a common vector space.
- A `SimilarityFn` used to compute the similarity between query embeddings and document embeddings.

A Vector Store precomputes the embeddings of all the documents in the corpus. When probed with a query, it
embeds the query, computes its similarity to every document in the corpus, and returns the top-$k$
most similar documents.
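
The lookup step can be sketched in isolation with plain `numpy`. The function name `top_k` and the random corpus are assumptions made for illustration. The sketch also uses `np.argpartition`, an alternative to a full sort that may be preferable for large corpora; it is not what the `VectorStore` in this notebook does.

```python
import numpy as np


def top_k(query: np.ndarray, matrix: np.ndarray, k: int) -> tuple[np.ndarray, np.ndarray]:
    """Return the indices and scores of the k rows of `matrix` most similar to `query`.

    Assumes `query` and every row of `matrix` are unit-normalised, so that the
    dot product equals the cosine similarity.
    """
    scores = matrix @ query
    k = min(k, len(scores))
    # argpartition isolates the k best rows without sorting the whole corpus ...
    candidates = np.argpartition(-scores, k - 1)[:k]
    # ... and only those k candidates are then sorted by decreasing score.
    order = candidates[np.argsort(-scores[candidates])]
    return order, scores[order]


rng = np.random.default_rng(0)
corpus = rng.normal(size=(1000, 16))
corpus /= np.linalg.norm(corpus, axis=-1, keepdims=True)  # precomputed document embeddings
query = corpus[123]  # a query identical to document 123
indices, scores = top_k(query, corpus, k=5)
print(indices[0])  # 123: the document identical to the query comes first
```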

In [None]:
@dataclasses.dataclass(frozen=True)
class VectorStore(Retriever):
    embedder: Embedder
    similarity_fn: SimilarityFn
    embeddings: Embedding
    
    
    @staticmethod
    def embeddings_matrix(
            documents: Corpus,
            embedder: Embedder,
            *,
            progress: bool = True,
    ) -> Embedding:
        document_embeddings = embedder.embed_documents(documents, progress=progress)
        return np.stack(document_embeddings, axis=0)

    def retrieve(
            self, query: Query, k: int | None = None,
    ) -> tuple[np.ndarray, Similarity]:
        if k is None:
            k = len(self.documents)
        embedding = self.embedder.embed(query.text)
        similarities = self.similarity_fn(embedding, self.embeddings)
        top_k_indices = np.argsort(-similarities)[..., :k]
        similarities = np.take_along_axis(similarities, top_k_indices, axis=-1)
        documents = np.array(
            [self.get_document_by_index(i) for i in np.ravel(top_k_indices)],
        ).reshape(top_k_indices.shape)
        return documents, similarities

    @classmethod
    def from_documents(cls, documents: Corpus, *, embedder: Embedder, **kwargs):
        return super().from_documents(documents=documents, embedder=embedder, embeddings=cls.embeddings_matrix(documents, embedder), **kwargs)

In [None]:
retriever = VectorStore.from_documents(documents, embedder=embedder, similarity_fn=cosine_similarity)

We have successfully completed the implementation of the first component of the RAG pipeline!

#### Evaluation

We can already evaluate the quality of our `Retriever` component (on which the overall quality of the RAG pipeline depends). It can be evaluated using standard retrieval metrics:

- **recall_at_k**: computes the fraction of the relevant documents that are present in the top-$k$ retrieved documents.
- **precision_at_k**: computes the fraction of the top-$k$ retrieved documents that are relevant.
- **mean_rank**: computes the average rank of the relevant documents.
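
The three metrics can be illustrated on a single query with plain Python. This is a sketch of the usual definitions, not the implementation of the `metrics` module used below, which may differ in details such as 0-based versus 1-based ranks. It assumes at least one relevant document, and that every relevant document appears somewhere in the ranking.

```python
def recall_at_k(ranking: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the relevant documents that appear in the top-k results."""
    hits = sum(1 for document_id in ranking[:k] if document_id in relevant)
    return hits / len(relevant)


def precision_at_k(ranking: list[str], relevant: set[str], k: int) -> float:
    """Fraction of the top-k results that are relevant."""
    hits = sum(1 for document_id in ranking[:k] if document_id in relevant)
    return hits / k


def mean_rank(ranking: list[str], relevant: set[str]) -> float:
    """Average 1-based position of the relevant documents in the ranking."""
    ranks = [position for position, document_id in enumerate(ranking, start=1) if document_id in relevant]
    return sum(ranks) / len(ranks)


ranking = ["d3", "d1", "d7", "d2", "d9"]  # retriever output, best first
relevant = {"d1", "d2"}                   # ground-truth documents

print(recall_at_k(ranking, relevant, k=3))     # 0.5: only d1 is in the top 3
print(precision_at_k(ranking, relevant, k=3))  # one relevant document among 3 retrieved
print(mean_rank(ranking, relevant))            # 3.0: the relevant documents sit at ranks 2 and 4
```

Note that when each query has a single relevant document, as is the case for the samples loaded above, precision@$k$ cannot exceed $1/k$, so recall@$k$ and the mean rank are the more informative numbers.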

In [None]:
def infer_retrieval(
        dataset: QuestionAnswerDataset,
        retriever: Retriever,
        *,
        progress: bool = True,
) -> list[list[str]]:
    if progress:
        dataset = tqdm.tqdm(dataset)
    return [[document.id for document in retriever(sample.query)] for sample in dataset]

def evaluate_retrieval(
    dataset: QuestionAnswerDataset,
    retriever: Retriever,
    k: int | Sequence[int],
    *,
    progress: bool = True,
) -> dict[str, float]:
    k = np.atleast_1d(k)  # accept a single int as well as a sequence of cut-offs
    rankings = infer_retrieval(dataset, retriever, progress=progress)
    target_documents = [
        [document.id for document in sample.relevant_documents] for sample in dataset
    ]
    rankings = np.array(rankings)
    target_size = max(len(indices) for indices in target_documents)
    targets = np.empty((len(target_documents), target_size), dtype=np.object_)
    masks = np.zeros((len(target_documents), target_size), dtype=float)
    for i, document_ids in enumerate(target_documents):
        targets[i, : len(document_ids)] = document_ids
        masks[i, : len(document_ids)] = 1
    target_ranks = metrics.rank(targets, rankings)
    results = {}

    recalls = metrics.recall_at_k(target_ranks=target_ranks, k=k, mask=masks)
    recalls = recalls[masks.sum(-1) > 0].mean(0)

    precisions = metrics.precision_at_k(target_ranks=target_ranks, k=k, mask=masks)
    precisions = precisions[masks.sum(-1) > 0].mean(0)

    mean_ranks = metrics.mean_rank(target_ranks=target_ranks, mask=masks)
    mean_ranks = mean_ranks[masks.sum(-1) > 0].mean(0)
    for k_value, precision_at_k in zip(k, precisions, strict=False):
        results[f"precision@{k_value}"] = precision_at_k.item()
    for k_value, recall_at_k in zip(k, recalls, strict=False):
        results[f"recall@{k_value}"] = recall_at_k.item()

    results["mean_rank"] = mean_ranks.item()
    return results

In [None]:
k = np.array([1, 5, 10, 100])
retrieval_results = evaluate_retrieval(qa_dataset, retriever, k=k)
retrieval_results = pd.Series(retrieval_results)
retrieval_results

### Generator

The second component of the RAG pipeline is the `Generator`.
It takes the user's query and the retrieved documents and produces a final, human-readable answer.

In [None]:
class Generator(Protocol):
    def __call__(self, query: Query, documents: Sequence[Document]) -> Answer:
        ...

LLMs are well suited for generating text, and can be used to synthesize information from
retrieved documents into a coherent answer. We will define an `LLMGenerator` class, which leverages the OpenAI API for
text generation. The generation will be parametrized using a prompt template, using the `jinja2` library.

In [None]:
@dataclasses.dataclass(frozen=True)
class LLMGenerator(Generator):
    client: openai.Client
    model_name: str
    prompt_template: jinja2.Template
    ratelimiter: ratelimit.RateLimiter

    def prompt(self, query: Query, documents: Sequence[Document]) -> str:
        return self.prompt_template.render(query=query, documents=documents)

    def generate(self, query: Query, documents: Sequence[Document]) -> tuple[str, Answer]:
        prompt = self.prompt(query, documents)
        # Respect the configured rate limit, as done in `LLM.answer` and `LLMEmbedder.embed`.
        with self.ratelimiter:
            response = self.client.chat.completions.create(
                messages=[{"role": "user", "content": prompt}],
                model=self.model_name,
            )
        choice = random.choice(response.choices)
        text = choice.message.content
        return prompt, Answer(text=text)

    def __call__(self, query: Query, documents: Sequence[Document]) -> Answer:
        _, answer = self.generate(query, documents)
        return answer

In [None]:
GENERATOR_BASE_URL = "http://localhost:8080/v1"
GENERATOR_MODEL_NAME = "..."
GENERATOR_API_KEY = "..."
GENERATOR_RPM = 1e6

GENERATOR_PROMPT_TEMPLATE = jinja2.Template("""
Given the following documents please answer the query below. Please give a concise and short answer in a few words. If none of the documents provide the answer, please output "Unknown".

{% for document in documents %}
Document {{ loop.index }}:
{{ document.text }}
{% endfor %}

Query: {{ query.text }}
""", undefined=jinja2.StrictUndefined)

generator_client = openai.Client(base_url=GENERATOR_BASE_URL, api_key=GENERATOR_API_KEY)
generator = LLMGenerator(prompt_template=GENERATOR_PROMPT_TEMPLATE, client=generator_client,
                                    model_name=GENERATOR_MODEL_NAME, ratelimiter=ratelimit.RateLimiter(rpm=GENERATOR_RPM))

In [None]:
prompt, answer = generator.generate(sample.query, sample.relevant_documents)
print(prompt)
print("---------------")
print("Generated:", answer.text)
print("---------------")
print("Expected:", sample.answer.text)

#### Evaluation

Text generation is inherently harder to evaluate due to its free-form nature, but LLMs are also a good choice for comparing two free-text answers and returning a signal of how well they match.

We will use an LLM-based `Rater` to judge the quality of the generated answers compared to the dataset's reference answer.

You can read more on this approach, called "LLM-as-a-Judge", [here](https://www.evidentlyai.com/llm-guide/llm-as-a-judge).

In [None]:
class Rate(Protocol):
    def to_float(self) -> float:
        ...

class Rater[T: Rate](Protocol):
    def rate(self, answer: Answer, expected: Answer) -> T:
        ...

    def __call__(self, answer: Answer, expected: Answer) -> float:
        return self.rate(answer, expected).to_float()



class Score(pydantic.BaseModel):
    """Class used to represent standard letter-scale scores."""
    score: Literal["A", "B", "C", "D", "E", "F"]

    def to_float(self) -> float:
        """Converts the score to a float value."""
        match self.score:
            case "A":
                return 1.0
            case "B":
                return 0.8
            case "C":
                return 0.6
            case "D":
                return 0.4
            case "E":
                return 0.2
            case "F":
                return 0.0


@dataclasses.dataclass(frozen=True)
class LLMRater[T: Rate](Rater):
    prompt_template: jinja2.Template
    client: openai.Client
    model_name: str
    response_format: type[T] | openai.NotGiven
    ratelimiter: ratelimit.RateLimiter

    def rate(self, answer: Answer, expected: Answer) -> T:
        prompt = self.prompt_template.render(
            answer=answer, expected=expected,
        )
        # Respect the configured rate limit, as done in `LLM.answer`.
        with self.ratelimiter:
            response = self.client.chat.completions.parse(
                messages=[{"role": "user", "content": prompt}],
                model=self.model_name,
                response_format=self.response_format,
            )
        choice = random.choice(response.choices)
        return choice.message.parsed

In [None]:
RATER_BASE_URL = "http://localhost:8080/v1"
RATER_MODEL_NAME = "..."
RATER_API_KEY = "..."
RATER_RPM = 1e6

RATER_FORMAT = Score
RATER_PROMPT_TEMPLATE = jinja2.Template("""
You will be provided with 2 answers, the first being the ground truth expected answer, and the second one being the actual generated answer, and you should rate the correctness of the generated answer from A to F (A being the best grade, F the worst).

[Expected Answer]
{{ expected.text }}

[Actual Answer]
{{ answer.text }}

[Score]
""", undefined=jinja2.StrictUndefined)

rater_client = openai.Client(base_url=RATER_BASE_URL, api_key=RATER_API_KEY)
rater = LLMRater(prompt_template=RATER_PROMPT_TEMPLATE, client=rater_client, model_name=RATER_MODEL_NAME,
                        ratelimiter=ratelimit.RateLimiter(rpm=RATER_RPM), response_format=RATER_FORMAT)
raters = {"accuracy": rater}
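
Since `raters` is a dictionary, the LLM judge could be complemented with cheaper, deterministic checks that help sanity-check its grades. The following is a minimal sketch of a token-level F1 score between two answers; the names `F1Rate` and `token_f1` are assumptions made for illustration, and the function takes plain strings rather than `Answer` objects to stay self-contained.

```python
import collections
import dataclasses


@dataclasses.dataclass(frozen=True)
class F1Rate:
    """Hypothetical rate object exposing the same `to_float` method as `Score`."""
    value: float

    def to_float(self) -> float:
        return self.value


def token_f1(answer: str, expected: str) -> F1Rate:
    """Token-level F1 between two answers (lowercased, whitespace-tokenised)."""
    answer_tokens = answer.lower().split()
    expected_tokens = expected.lower().split()
    if not answer_tokens or not expected_tokens:
        # Two empty answers count as a match, a single empty one as a miss.
        return F1Rate(float(answer_tokens == expected_tokens))
    # Multiset intersection: each token counts at most as often as it appears in both answers.
    overlap = collections.Counter(answer_tokens) & collections.Counter(expected_tokens)
    common = sum(overlap.values())
    if common == 0:
        return F1Rate(0.0)
    precision = common / len(answer_tokens)
    recall = common / len(expected_tokens)
    return F1Rate(2 * precision * recall / (precision + recall))


print(token_f1("Abraham Lincoln", "abraham lincoln").to_float())  # 1.0
print(token_f1("Lincoln", "Abraham Lincoln").to_float())          # partial credit, about 0.67
```

Such a score is stricter than an LLM judge on paraphrases ("yes" versus "correct" scores 0), which is why it is better suited as a complement than as a replacement.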

In [None]:
def infer_generator(
        dataset: QuestionAnswerDataset,
        generator: Generator,
        *,
        progress: bool = True,
) -> list[Answer]:
    if progress:
        dataset = tqdm.tqdm(dataset)
    return [
        generator(sample.query, sample.relevant_documents)
        for sample in dataset
    ]

def evaluate_answers(
    actual_answers: list[Answer],
    expected_answers: list[Answer],
    raters: dict[str, Rater],
    *,
    progress: bool = True,
) -> dict[str, float]:
    results = {}
    raters = raters.items()
    if progress:
        raters = tqdm.tqdm(raters, position=0)
    for rater_name, rater in raters:
        rater_values = []
        data = zip(actual_answers, expected_answers, strict=False)
        if progress:
            data = tqdm.tqdm(data, position=0, total=len(actual_answers))
        for actual_answer, expected_answer in data:
            value = rater(actual_answer, expected_answer)
            rater_values.append(value)
        results[rater_name] = sum(rater_values) / len(rater_values)
    return results

def evaluate_generator(
    dataset: QuestionAnswerDataset,
    generator: Generator,
    raters: dict[str, Rater],
    *,
    progress: bool = True,
) -> tuple[dict[str, float], list[Answer]]:
    answers = infer_generator(dataset, generator, progress=progress)
    expected_answers = [sample.answer for sample in dataset]
    results = evaluate_answers(answers, expected_answers, raters, progress=progress)
    return results, answers

In [None]:
generator_results, generator_answers = evaluate_generator(partial_dataset, generator, raters)
generator_results

### RAG Pipeline

Now that we have implemented all the components, we can implement the high level RAG wrapper pipeline.

The `RAG` pipeline is parametrized by a `Retriever`, a `Generator`, and the number `k` of documents to retrieve.

In [None]:
@dataclasses.dataclass(frozen=True)
class RAG:
    retriever: Retriever
    generator: Generator
    k: int

    def generate(self, query: Query) -> tuple[Sequence[Document], Answer]:
        documents = self.retriever(query, self.k)
        answer = self.generator(query, documents)
        return documents, answer

    def __call__(self, query: Query) -> Answer:
        _, answer = self.generate(query)
        return answer

In [None]:
K = 1
rag = RAG(retriever=retriever, generator=generator, k=K)

In [None]:
_, answer = rag.generate(sample.query)
print("Query:", sample.query.text)
print("---------------")
print("Generated:", answer.text)
print("---------------")
print("Expected:", sample.answer.text)

#### Evaluation

The full RAG pipeline can be evaluated using the same metrics as the generator, but this time the documents passed to the prompt will be provided by the retriever (instead of being taken from the ground truth documents). We can also evaluate the baseline model in a similar fashion.

In [None]:
def infer_model(
        dataset: QuestionAnswerDataset,
        model: LLM | RAG,
        *,
        progress: bool = True,
) -> list[Answer]:
    if progress:
        dataset = tqdm.tqdm(dataset)
    answers = [model(sample.query) for sample in dataset]
    return answers
    
def evaluate_model(
    dataset: QuestionAnswerDataset,
    pipeline: LLM | RAG,
    raters: dict[str, Rater],
    *,
    progress: bool = True,
) -> tuple[dict[str, float], list[Answer]]:
    answers = infer_model(dataset, pipeline, progress=progress)
    expected_answers = [sample.answer for sample in dataset]
    results = evaluate_answers(answers, expected_answers, raters, progress=progress)
    return results, answers

In [None]:
rag_results, rag_answers = evaluate_model(partial_dataset, rag, raters)
rag_results

In [None]:
model_results, model_answers = evaluate_model(partial_dataset, model, raters)
model_results

In [None]:
pd.DataFrame({"Vanilla LLM": model_results, "RAG": rag_results, "RAG (Generator only)": generator_results}).plot.bar(figsize=(10, 10))

In [None]:
sample.query

### Conclusion

This work provided a foundational implementation for each model component. For those interested in **further refinement**, significant improvements can be achieved by investigating the following individual components and techniques:

* **Document Chunking**: This technique involves splitting source documents into smaller text segments (chunks). The benefit is a **reduction in the context window size** required by the Generator, which improves efficiency. The trade-off, however, is an **increase in the overall corpus size**, which impacts the time and memory complexity of the retrieval pipeline.
* **Hybrid & Asymmetrical Embedding Methods**:
    * **Hybrid Methods** involve aggregating the outputs of several distinct embedding techniques (e.g., sparse and dense methods) to generate a more robust document embedding.
    * **Asymmetrical Methods** use different embedding models specifically optimized for the unique characteristics of the user query (short, conversational) versus the document chunks (long, factual).
* **Better Large Language Models (LLMs)**: The pipeline was designed to be **model-agnostic**. You can investigate and experiment with different combinations of LLMs for the various roles (embedding, generation, and evaluation) to determine which models are best suited for each specific retrieval, generation, or evaluation task.
* **"Agentic RAG"**: Standard RAG is limited because it must always pull information exclusively from the existing document corpus. A powerful extension is to integrate RAG into a broader **Agent** framework. In this approach, the Retrieval Component is treated as one **tool** an agent can choose to call. The agent can then use this tool (or other non-retrieval tools, like web searches or code execution) to gather information, implicitly driving the final generation based on the results of the tool calls.
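
Document chunking, the first item in the list above, can be sketched as a sliding window over words. The function name `chunk_text` and its default sizes are assumptions made for illustration; in practice, chunk boundaries are often aligned with sentences or paragraphs, and sizes are measured in tokens rather than words.

```python
def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    """Split `text` into chunks of at most `chunk_size` words that share `overlap` words."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must satisfy 0 <= overlap < chunk_size")
    words = text.split()
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(words), step):
        chunks.append(" ".join(words[start:start + chunk_size]))
        if start + chunk_size >= len(words):
            break  # the last chunk already reaches the end of the text
    return chunks


text = " ".join(f"w{i}" for i in range(10))
for chunk in chunk_text(text, chunk_size=4, overlap=2):
    print(chunk)
```

Each chunk would then become its own `Document` (for instance with the parent document's identifier stored in its metadata), which is what increases the corpus size mentioned above.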

### Bonus: Agentic RAG

In [None]:
from hslu.dlm03.common import agent, backend, chat, chat_display, tools, types

In [None]:
from mcp.server import FastMCP

SERVER = FastMCP()


@SERVER.tool()
def retrieve(query: str, k: int | None = None, threshold: float | None = None) -> list[Document]:
    """Retrieves the k most relevant documents for the given query (keeping only those above the given similarity threshold, if any)."""
    documents, similarities = retriever.retrieve(Query(text=query), k)
    # `is not None` keeps a threshold of 0.0 meaningful.
    if threshold is not None:
        return [document for document, similarity in zip(documents, similarities) if similarity > threshold]
    return list(documents)

In [None]:
import threading

import uvicorn

PORT = 5000
HOST = "localhost"

RUN_ARGS = {
    "app": SERVER.streamable_http_app(),  # build the ASGI app instead of passing the factory method
    "port": PORT,
    "host": HOST,
}

MCP_THREAD = threading.Thread(target=uvicorn.run, kwargs=RUN_ARGS, daemon=True)  # daemon: do not block kernel shutdown
MCP_THREAD.start()

In [None]:
MCP_SERVER_URL = f"http://{HOST}:{PORT}/mcp"
TOOL_MANAGER = tools.ToolManager.from_url(MCP_SERVER_URL)

In [None]:
AGENT_BACKEND = backend.LLamaCpp(base_url=MODEL_BASE_URL, ratelimit=MODEL_RPM).get_async_backend()

In [None]:
SYSTEM_INSTRUCTIONS = """You are a helpful assistant tasked with answering user questions.
You should use the tools provided to you to ensure the answer is factual by finding the answer in relevant documents.
Please give a concise final answer to the given user query (do not write full sentences, just provide a direct answer)."""

In [None]:
AGENT = agent.Agent(AGENT_BACKEND, TOOL_MANAGER)

In [None]:
async def infer_agent(
        dataset: QuestionAnswerDataset,
        agent: agent.Agent,
        system_instructions: str,
        *,
        progress: bool = True,
) -> list[Answer]:
    if progress:
        dataset = tqdm.tqdm(dataset)
    answers = []
    for sample in dataset:
        sample_chat = chat.Chat(messages=[{"role": "system", "content": system_instructions}, {"role": "user", "content": sample.query.text}])
        messages = await agent(sample_chat)
        # The last message of the conversation holds the agent's final answer.
        answers.append(Answer(text=messages[-1].content))
    return answers

async def evaluate_agent(
    dataset: QuestionAnswerDataset,
    agent: agent.Agent,
    system_instructions: str,
    raters: dict[str, Rater],
    *,
    progress: bool = True,
) -> tuple[dict[str, float], list[Answer]]:
    answers = await infer_agent(dataset, agent, system_instructions, progress=progress)
    expected_answers = [sample.answer for sample in dataset]
    results = evaluate_answers(answers, expected_answers, raters, progress=progress)
    return results, answers

In [None]:
agent_results, agent_answers = await evaluate_agent(partial_dataset, AGENT, SYSTEM_INSTRUCTIONS, raters)
agent_results

In [None]:
pd.DataFrame({"Vanilla LLM": model_results, "RAG": rag_results, "RAG (Generator only)": generator_results, "Agentic RAG": agent_results}).plot.bar(figsize=(10, 10))