# RAG Evaluation Experiment Loop

This notebook reproduces the workflow in [rag_experiment_evaluation.ipynb](rag_experiment_evaluation.ipynb) through a series of functions. 

These functions allow us to loop through a CSV of experiments in order to systematically change prompts and measure the results.


### 1. Import required packages

In [None]:
from uuid import UUID
import json
import pandas as pd
import pickle
from dataclasses import asdict
from pathlib import Path
import jsonlines
from elasticsearch import Elasticsearch

from redbox.models import Settings
from redbox.models.settings import ElasticLocalSettings
from redbox.storage.elasticsearch import hit_to_chunk
from redbox.models import Settings

from langchain_community.chat_models import ChatLiteLLM
from langchain.globals import set_verbose

from collections.abc import Callable

from elasticsearch import Elasticsearch
from langchain_community.chat_models import ChatLiteLLM
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import ConfigurableField

from redbox.models import Settings
from redbox.models.file import UUID
from redbox.storage.elasticsearch import hit_to_chunk


# Switch off LangChain's global verbose flag for the notebook.
set_verbose(False)

from dotenv import find_dotenv, load_dotenv

# Load variables from the .env file located by find_dotenv(); the return value is not needed.
_ = load_dotenv(find_dotenv())

# Show full cell contents when displaying DataFrames (prompts and answers are long strings).
pd.set_option("display.max_colwidth", None)

# Settings object with the MinIO and Elasticsearch hosts overridden to localhost.
ENV = Settings(minio_host="localhost", elastic=ElasticLocalSettings(host="localhost"))

### 2. Set evaluation data version

In [None]:
# DATA_VERSION = ""


### 3. Set paths and global variables

In [None]:
# Two levels above the working directory (presumably the repository root
# when the notebook is run from notebooks/evaluation -- confirm).
ROOT = Path.cwd().parents[1]
EVALUATION_DIR = ROOT / "notebooks/evaluation"

# All artefacts for one data version live under a single versioned folder.
V_ROOT = EVALUATION_DIR / f"data/{DATA_VERSION}"
V_RAW, V_SYNTHETIC, V_CHUNKS, V_RESULTS, V_EMBEDDINGS = (
    V_ROOT / name for name in ("raw", "synthetic", "chunks", "results", "embeddings")
)

# Create the folder tree up front; existing folders are left untouched.
for _directory in (V_ROOT, V_RAW, V_SYNTHETIC, V_CHUNKS, V_RESULTS, V_EMBEDDINGS):
    _directory.mkdir(parents=True, exist_ok=True)

In [None]:
# Embedding model name taken from the environment settings.
MODEL = ENV.embedding_model
# Index name is "<data version>-<model>", lower-cased.
INDEX = f"{DATA_VERSION}-{MODEL}".lower()

In [None]:
# Fixed placeholder user id; it is passed as the user_uuid of every query in this notebook.
USER_UUID = UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
# Storage and search clients built from the notebook's settings object.
S3_CLIENT = ENV.s3_client()
ES_CLIENT = ENV.elasticsearch_client()

### 4. Load embeddings into the index and get file UUIDs <a id="load-embeddings"></a>

In [None]:
def load_chunks_from_jsonl_to_index(file_path: Path, es_client: Elasticsearch, index: str) -> set:
    """Index every chunk stored in a JSON Lines file and collect the parent file UUIDs.

    Each chunk is written to ``index`` under its own ``uuid`` as the document id.

    Args:
        file_path: JSON Lines file with one chunk per line. A line may hold
            either a JSON object or a JSON string that itself encodes the
            object (double-encoded).
        es_client: Elasticsearch client used to index the chunks.
        index: Name of the index the chunks are written to.

    Returns:
        The set of ``parent_file_uuid`` values seen across all chunks.
    """
    file_uuids = set()

    with jsonlines.open(file_path, mode="r") as reader:
        for chunk_raw in reader:
            # jsonlines has already decoded the line. A second decode is only
            # needed when the line was a JSON string wrapping the real object;
            # calling json.loads on an already-decoded dict raises TypeError.
            chunk = json.loads(chunk_raw) if isinstance(chunk_raw, str) else chunk_raw

            es_client.index(
                index=index,
                id=chunk["uuid"],
                body=chunk,
            )

            file_uuids.add(chunk["parent_file_uuid"])

    return file_uuids

In [None]:
# Index the embeddings stored for this model and keep the parent file UUIDs returned by the loader.
FILE_UUIDS = load_chunks_from_jsonl_to_index(file_path=V_EMBEDDINGS / f"{MODEL}.jsonl", es_client=ES_CLIENT, index=INDEX)

### 5. Define function to get RAG outputs based on prompts in experiments 
Note: these can be made more efficient

In [None]:
from core_api.src.runnables import make_chat_prompt_from_messages_runnable
from core_api.src import dependencies
from core_api.src.format import format_chunks
from redbox.models import ChatRoute, Chunk, Settings
from core_api.src.dependencies import get_es_retriever
from redbox.models.chain import ChainInput

from typing import Annotated, Any
from fastapi import Depends
from langchain_core.runnables import Runnable, RunnableLambda, RunnablePassthrough, chain
from langchain_core.vectorstores import VectorStoreRetriever
from operator import itemgetter
from langchain.schema import StrOutputParser
from langchain_elasticsearch import ElasticsearchRetriever

# Chat model shared by every experiment in this notebook (passed as `llm` when the chain is built).
LLM = ChatLiteLLM(
    model="gpt-4o",
    streaming=True,
)


def get_es_retriever(
    env: Annotated[Settings, Depends(dependencies.get_env)], es: Annotated[Elasticsearch, Depends(dependencies.get_elasticsearch_client)]
) -> BaseRetriever:
    """Creates an Elasticsearch retriever runnable bound to this notebook's INDEX.

    Runnable takes input of a dict keyed to question, file_uuids and user_uuid.

    Runnable returns a list of Chunks.

    Note: this definition shadows the ``get_es_retriever`` name imported from
    ``core_api.src.dependencies`` earlier in the notebook; unlike that import it
    reads the module-level ``INDEX`` for the index name.

    Args:
        env: Settings supplying the embedding model and the default ``rag_k`` /
            ``rag_num_candidates`` values.
        es: Elasticsearch client the retriever queries.

    Returns:
        A retriever whose ``params`` field is exposed as a configurable field
        with id ``"params"``.
    """

    def es_query(query: dependencies.ESQuery, params: dependencies.ESParams) -> dict[str, Any]:
        # Build the search body: a text match and a kNN vector clause combined
        # under one bool query, both restricted by the same user/file filter.
        vector = dependencies.get_embedding_model(env).embed_query(query["question"])

        # Restrict to chunks created by the requesting user. The field is
        # matched both at the top level and under "metadata".
        query_filter = [
            {
                "bool": {
                    "should": [
                        {"term": {"creator_user_uuid.keyword": str(query["user_uuid"])}},
                        {"term": {"metadata.creator_user_uuid.keyword": str(query["user_uuid"])}},
                    ]
                }
            }
        ]

        # Only narrow by file when file UUIDs were supplied; an empty list means no file restriction.
        if len(query["file_uuids"]) != 0:
            query_filter.append(
                {
                    "bool": {
                        "should": [
                            {"terms": {"parent_file_uuid.keyword": [str(uuid) for uuid in query["file_uuids"]]}},
                            {
                                "terms": {
                                    "metadata.parent_file_uuid.keyword": [str(uuid) for uuid in query["file_uuids"]]
                                }
                            },
                        ]
                    }
                }
            )

        return {
            "size": params["size"],
            "query": {
                "bool": {
                    "should": [
                        {
                            "match": {
                                "text": {
                                    "query": query["question"],
                                    "boost": params["match_boost"],
                                }
                            }
                        },
                        {
                            "knn": {
                                "field": "embedding",
                                "query_vector": vector,
                                "num_candidates": params["num_candidates"],
                                # The filter is applied inside the kNN clause as well as on the outer bool query.
                                "filter": query_filter,
                                "boost": params["knn_boost"],
                                "similarity": params["similarity_threshold"],
                            }
                        },
                    ],
                    "filter": query_filter,
                }
            },
        }

    # Retriever subclass that carries a `params` dict and binds it into
    # `body_func` at construction time.
    class ParameterisedElasticsearchRetriever(ElasticsearchRetriever):
        params: dependencies.ESParams
        body_func: Callable[[str], dict]

        def __init__(self, **kwargs: Any) -> None:
            super().__init__(**kwargs)
            # NOTE(review): relies on `partial` being reachable as an attribute
            # of the `dependencies` module -- confirm it is imported there.
            self.body_func = dependencies.partial(self.body_func, params=self.params)

    # Defaults: result size and candidate count come from settings; both
    # boosts are 1 and the similarity threshold is 0.
    default_params = {
        "size": env.ai.rag_k,
        "num_candidates": env.ai.rag_num_candidates,
        "match_boost": 1,
        "knn_boost": 1,
        "similarity_threshold": 0,
    }

    return ParameterisedElasticsearchRetriever(
        es_client=es,
        index_name=INDEX,
        body_func=es_query,
        document_mapper=hit_to_chunk,
        params=default_params,
    ).configurable_fields(
        params=ConfigurableField(
            id="params", name="Retriever parameters", description="A dictionary of parameters to use for the retriever."
        )
    )

def build_retrieval_chain(
    llm: Annotated[ChatLiteLLM, Depends(dependencies.get_llm)],
    retriever: Annotated[VectorStoreRetriever, Depends(dependencies.get_es_retriever)],
    env: Annotated[Settings, Depends(dependencies.get_env)],
    RETRIEVAL_SYSTEM_PROMPT: str,
    RETRIEVAL_QUESTION_PROMPT: str,
) -> Runnable:
    """Assemble the retrieve -> format -> answer pipeline for one prompt pair.

    Args:
        llm: Chat model that generates the answer.
        retriever: Runnable whose output is stored under ``documents``.
        env: Accepted for signature compatibility; not read here.
        RETRIEVAL_SYSTEM_PROMPT: System prompt passed to the chat prompt builder.
        RETRIEVAL_QUESTION_PROMPT: Question prompt passed to the chat prompt builder.

    Returns:
        A runnable producing a dict with ``response`` (the parsed LLM output),
        ``source_documents`` (the retrieved documents) and ``route_name``.
    """
    # Stage 1: add the retriever output to the input under "documents".
    attach_documents = RunnablePassthrough.assign(documents=retriever)

    # Stage 2: add a formatted rendering of those documents for the prompt.
    attach_formatted_documents = RunnablePassthrough.assign(
        formatted_documents=(RunnablePassthrough() | itemgetter("documents") | format_chunks)
    )

    # Stage 3: generate the answer and carry the sources and route name alongside it.
    answer = (
        make_chat_prompt_from_messages_runnable(RETRIEVAL_SYSTEM_PROMPT, RETRIEVAL_QUESTION_PROMPT)
        | llm
        | StrOutputParser()
    )
    outputs = {
        "response": answer,
        "source_documents": itemgetter("documents"),
        "route_name": RunnableLambda(lambda _: ChatRoute.search.value),
    }

    return attach_documents | attach_formatted_documents | outputs
def write_rag_results(
    V_SYNTHETIC: str,
    EXPERIMENT_NAME: str,
    RETRIEVAL_SYSTEM_PROMPT: str,
    RETRIEVAL_QUESTION_PROMPT: str,
) -> None:
    """Run every synthetic question through the RAG chain and save the outputs.

    Reads ``{V_SYNTHETIC}/ragas_synthetic_data.csv``, answers each value of its
    ``input`` column with a chain built from the given prompts, and writes the
    data plus two new columns (``actual_output``, ``retrieval_context``) to
    ``{V_SYNTHETIC}/{EXPERIMENT_NAME}_complete_ragas_synthetic_data.csv``.
    Rows containing any missing value are dropped before writing.

    Args:
        V_SYNTHETIC: Folder holding the synthetic dataset (str or Path).
        EXPERIMENT_NAME: Prefix for the output file name.
        RETRIEVAL_SYSTEM_PROMPT: System prompt for the retrieval chain.
        RETRIEVAL_QUESTION_PROMPT: Question prompt for the retrieval chain.
    """
    df = pd.read_csv(f"{V_SYNTHETIC}/ragas_synthetic_data.csv")

    # The retriever and chain depend only on the prompts, not on the question,
    # so build them once per experiment instead of once per question (which
    # also opened a new Elasticsearch client every time).
    retriever = get_es_retriever(env=ENV, es=ENV.elasticsearch_client())
    rag_chain = build_retrieval_chain(
        llm=LLM,
        retriever=retriever,
        RETRIEVAL_SYSTEM_PROMPT=RETRIEVAL_SYSTEM_PROMPT,
        RETRIEVAL_QUESTION_PROMPT=RETRIEVAL_QUESTION_PROMPT,
        env=ENV,
    )

    # Only these chunk fields are kept in the saved retrieval context.
    kept_fields = ("text", "uuid", "parent_file_uuid")

    def get_rag_results(question: str) -> dict:
        """Answer one question; return the answer text and the trimmed source chunks."""
        response = rag_chain.invoke(
            input=ChainInput(
                question=question,
                chat_history=[{"text": "", "role": "user"}],
                file_uuids=list(FILE_UUIDS),
                user_uuid=USER_UUID,
            ).model_dump()
        )

        source_documents = [
            {field: chunk_fields[field] for field in kept_fields}
            for chunk_fields in map(dict, response["source_documents"])
        ]

        return {"output_text": response["response"], "source_documents": source_documents}

    results = [get_rag_results(question=question) for question in df["input"].tolist()]

    df_function = df.copy()
    df_function["actual_output"] = [result["output_text"] for result in results]
    # NOTE(review): each cell holds a list of dicts, which to_csv writes out
    # as its string form -- confirm this is the format the evaluation step expects.
    df_function["retrieval_context"] = [result["source_documents"] for result in results]

    df_function_clean = df_function.dropna()
    df_function_clean.to_csv(f"{V_SYNTHETIC}/{EXPERIMENT_NAME}_complete_ragas_synthetic_data.csv", index=False)

### 6. Define function to evaluate the test dataset
Note: these can be made more efficient

In [None]:
from deepeval.dataset import EvaluationDataset
from deepeval import evaluate
from deepeval.metrics import (
    ContextualPrecisionMetric,
    ContextualRecallMetric,
    ContextualRelevancyMetric,
    AnswerRelevancyMetric,
    FaithfulnessMetric,
    HallucinationMetric,
)

def do_evaluation(V_SYNTHETIC: str, V_RESULTS: str, EXPERIMENT_NAME: str) -> pd.DataFrame:
    """Score one experiment's RAG outputs with the DeepEval metrics and save the results.

    Loads ``{V_SYNTHETIC}/{EXPERIMENT_NAME}_complete_ragas_synthetic_data.csv``,
    evaluates it with three retrieval and three generation metrics, then writes
    the raw results as a pickle and the per-metric table as a CSV under
    ``V_RESULTS``.

    Args:
        V_SYNTHETIC: Folder holding the completed synthetic dataset (str or Path).
        V_RESULTS: Folder the result files are written to (str or Path).
        EXPERIMENT_NAME: Prefix of the input file and of both output files.

    Returns:
        One row per test case and metric, with ``metric_name``, ``score``,
        ``reason`` and ``metric_type`` columns added.
    """
    dataset = EvaluationDataset()
    # NOTE(review): both context columns are split on ";" -- confirm that
    # matches how the retrieval_context column was written to this CSV.
    dataset.add_test_cases_from_csv_file(
        file_path=f"{V_SYNTHETIC}/{EXPERIMENT_NAME}_complete_ragas_synthetic_data.csv",
        input_col_name="input",
        actual_output_col_name="actual_output",
        expected_output_col_name="expected_output",
        context_col_name="context",
        context_col_delimiter=";",
        retrieval_context_col_name="retrieval_context",
        retrieval_context_col_delimiter=";",
    )

    # Every metric uses the same judge configuration, so state it once.
    judge_settings = {
        "threshold": 0.5,  # default is 0.5
        "model": "gpt-4o",
        "include_reason": True,
    }
    retrieval_metric_classes = (
        ContextualPrecisionMetric,
        ContextualRecallMetric,
        ContextualRelevancyMetric,
    )
    generation_metric_classes = (
        AnswerRelevancyMetric,
        FaithfulnessMetric,
        HallucinationMetric,
    )
    metrics = [
        metric_class(**judge_settings)
        for metric_class in (*retrieval_metric_classes, *generation_metric_classes)
    ]

    eval_results = evaluate(test_cases=dataset, metrics=metrics)

    # Keep the raw results on disk. The in-memory object is used directly
    # below, so there is no need to read the pickle straight back in.
    with open(f"{V_RESULTS}/{EXPERIMENT_NAME}_generation_eval_results", "wb") as f:
        pickle.dump(eval_results, f)

    # Lookup used to label each metric as retrieval or generation.
    metric_type = {
        "metric_name": [
            "Contextual Precision",
            "Contextual Recall",
            "Contextual Relevancy",
            "Answer Relevancy",
            "Faithfulness",
            "Hallucination",
        ],
        "metric_type": ["retrieval", "retrieval", "retrieval", "generation", "generation", "generation"],
    }

    evaluation = (
        pd.DataFrame.from_records(asdict(result) for result in eval_results)
        # One row per (test case, metric) instead of one row per test case.
        .explode("metrics_metadata")
        .reset_index(drop=True)
        .assign(
            metric_name=lambda df: df.metrics_metadata.apply(getattr, args=["metric"]),
            score=lambda df: df.metrics_metadata.apply(getattr, args=["score"]),
            reason=lambda df: df.metrics_metadata.apply(getattr, args=["reason"]),
        )
        # Default (inner) merge: rows whose metric name is not in the lookup are dropped.
        .merge(pd.DataFrame(metric_type), on="metric_name")
        .drop(columns=["success", "metrics_metadata"])
    )

    evaluation.to_csv(f"{V_RESULTS}/{EXPERIMENT_NAME}_generation_eval_results.csv", index=False)

    return evaluation

### 7. Load CSV of experiments. See Google Drive folder 'experiment_parameters' for example.

In [None]:
# One row per experiment; the path is relative to the current working directory.
# The experiment loop reads the columns experiment_name, retrieval_system_prompt,
# retrieval_question_prompt and chunk_size from this table.
experiment_parameters = pd.read_csv('data/experiment_parameters/prompt_chunk_experiment_dataset.csv')

# Optional: uncomment to run only the named experiments.
# experiment_parameters = experiment_parameters[(experiment_parameters.experiment_name == 'original_prompt') |
#                                               (experiment_parameters.experiment_name == 'unhelpful_prompt')]

# Preview the first rows as a sanity check.
experiment_parameters.head()

### 8. Loop through experiments and pass parameters to each function, saving the evaluation results for each experiment (they are concatenated in step 9).

In [None]:
# Run the full generate-then-evaluate pipeline once per experiment row.
for index, row in experiment_parameters.iterrows():

    EXPERIMENT_NAME = row["experiment_name"]
    RETRIEVAL_SYSTEM_PROMPT = row["retrieval_system_prompt"]
    RETRIEVAL_QUESTION_PROMPT = row["retrieval_question_prompt"]
    # NOTE(review): CHUNK_SIZE is read but not passed to either function below.
    CHUNK_SIZE = row['chunk_size']

    # Writes {EXPERIMENT_NAME}_complete_ragas_synthetic_data.csv under V_SYNTHETIC.
    write_rag_results(V_SYNTHETIC, EXPERIMENT_NAME, RETRIEVAL_SYSTEM_PROMPT, RETRIEVAL_QUESTION_PROMPT)

    # The returned DataFrame is discarded here; results are re-read from the CSVs in V_RESULTS later.
    do_evaluation(V_SYNTHETIC, V_RESULTS, EXPERIMENT_NAME)

### 9. Load and visualise results
Note: there are some complexities that could require additional analysis, such as the uncertainty associated with each individual LLM judge score and the wide range of scores (0 to 1 for some metrics)

In [None]:
%config InlineBackend.figure_format = 'retina'
import scipy.stats as stats
import seaborn as sns

# Collects one results DataFrame per experiment before concatenation.
experiments = []

# Optional baseline: uncomment to include results stored in baseline.csv.
# baseline = pd.read_csv(f"{V_RESULTS}/baseline.csv")
# baseline['experiment_name'] = 'baseline'
# experiments.append(baseline)

# Comment out if you only want to view baseline statistics
# Populate with experiment names
experiment_names = ['original_prompt', 'unhelpful_prompt']
for experiment_name in experiment_names:
    # Each experiment's per-metric results CSV is read from V_RESULTS.
    experiment = pd.read_csv(f"{V_RESULTS}/{experiment_name}_generation_eval_results.csv")
    # Tag every row so experiments can be told apart after concatenation.
    experiment['experiment_name'] = experiment_name
    experiments.append(experiment)

# pd.concat raises if the list is empty, so at least one experiment (or the baseline) must be loaded.
experiments_df = pd.concat(experiments)

def empirical_ci(df: pd.DataFrame, confidence: float = 0.95) -> pd.DataFrame:
    """Summarise scores per experiment and metric with a t-based confidence interval.

    Args:
        df: Results with ``experiment_name``, ``metric_name`` and ``score`` columns.
        confidence: Confidence level of the interval, between 0 and 1.
            Defaults to 0.95.

    Returns:
        One row per (experiment_name, metric_name) with ``mean``, ``sem``,
        ``min``, ``max``, ``count``, ``ci_low`` and ``ci_high`` columns.
        Groups with a single score have an undefined standard error, so their
        interval bounds are NaN.
    """
    summary = (
        df.groupby(["experiment_name", "metric_name"])["score"]
        .agg(["mean", "sem", "min", "max", "count"])
        .reset_index()
    )

    # Student's t interval centred on the group mean, scaled by the standard
    # error, with n - 1 degrees of freedom.
    ci_low, ci_high = stats.t.interval(
        confidence=confidence,
        df=summary["count"] - 1,
        loc=summary["mean"],
        scale=summary["sem"],
    )

    summary["ci_low"] = ci_low
    summary["ci_high"] = ci_high

    return summary

# Note that the confidence intervals in sns.barplot are calculated by bootstrapping.
# See empirical_ci() above for the empirical confidence interval calculation.
sns.barplot(experiments_df, x="score", y="metric_name", hue='experiment_name', errorbar=('ci', 95))

experiment_metrics = empirical_ci(experiments_df)
# NOTE(review): `experiment_name` here is the variable left over from the loading loop,
# so the combined summary is saved under the name of the last experiment loaded -- confirm this is intended.
experiment_metrics.to_csv(f"{V_RESULTS}/{experiment_name}_eval_results_full.csv")
# Display the summary table as the cell output.
experiment_metrics