# RAG - Search evaluation
The quality of a RAG result depends heavily on the output of the "retrieval" step. This notebook shows how to use IBM watsonx and the LangChain evaluation framework to assess whether the result of a search is relevant to the search query.
You can find the "RETRIEVAL_RELEVANCE" criterion used to perform the evaluation in [./utils/custom_criteria.py](./utils/custom_criteria.py).

Please note that this criterion targets the most common RAG use case, in which you are searching for an answer to a question. If your use case is different, customise the criterion to match your problem domain.
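As a rough illustration of what a customised criterion could look like: LangChain's criteria evaluators accept a custom criterion as a mapping from a name to a natural-language description. The sketch below only builds such a mapping and renders it as text. The criterion name, its wording, and the `describe_criteria` helper are hypothetical, and how `utils/custom_criteria.py` actually registers new criteria is not shown here, so check that file before adapting this.

```python
# Hypothetical criterion for a product-search use case (not part of this repository).
PRODUCT_SEARCH_RELEVANCE = {
    "product_search_relevance": (
        "Does the search result describe a product that matches the category, "
        "brand, and attributes named in the search query?"
    )
}


def describe_criteria(criteria: dict[str, str]) -> str:
    """Render criteria as 'name: description' lines (hypothetical helper)."""
    return "\n".join(f"{name}: {text}" for name, text in criteria.items())


print(describe_criteria(PRODUCT_SEARCH_RELEVANCE))
```

Phrasing the description as a yes/no question tends to make the evaluator's verdict easier to interpret.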




## Create the evaluator factory

In [1]:
import os
from dotenv import load_dotenv
from utils.wx_evaluator import wx_EvalFactory
from utils.custom_criteria import CustomCriteria
from langchain.evaluation import Criteria

load_dotenv()
url = os.environ.get("WATSONX_API_URL")
apikey = os.environ.get("WATSONX_API_KEY")
project_id = os.environ.get("WATSONX_PROJECT_ID")

credentials = {
    "url": url,
    "apikey": apikey
}

factory = wx_EvalFactory(credentials, project_id)


## Configure logging

In [2]:
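import logging
import os
import sys

from langchain.globals import set_debug

log_levels = {
    "CRITICAL": logging.CRITICAL,
    "ERROR": logging.ERROR,
    "WARNING": logging.WARNING,
    "INFO": logging.INFO,
    "DEBUG": logging.DEBUG,
}

# Fall back to INFO if LOG_LEVEL is unset or is not a recognised level name
LOG_LEVEL = log_levels.get(os.environ.get("LOG_LEVEL", "INFO").upper(), logging.INFO)

# Turn on LangChain's own debug output together with DEBUG logging
if LOG_LEVEL == logging.DEBUG:
    set_debug(True)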


logging.basicConfig(
    level=LOG_LEVEL,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        #logging.FileHandler("app.log"), 
        logging.StreamHandler(sys.stdout)
    ],
)


## Load search results
This implementation assumes that the search results to evaluate are stored in a .csv file in the ./data folder (`./data/search_results.csv` in the cell below). The file has two columns: "Input" (the search query) and "Output" (the search result). Saving your searches to a file before evaluating them decouples the execution of the search from the evaluation. This is useful during the first evaluation cycles, when you might want to test multiple evaluation approaches.

When you are comfortable with the evaluation results you can extend the notebook to include the execution of the search too. 
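To make the expected file layout concrete, here is a minimal sketch that parses a two-row file with the "Input" and "Output" columns. The sample rows are invented for illustration, and the sketch uses the standard library `csv` module on an in-memory string so that it runs without any data file. The notebook itself loads the real file with pandas.

```python
import csv
import io

# Hypothetical sample rows; real files live in ./data and hold your own queries.
sample_csv = io.StringIO(
    "Input,Output\n"
    '"What is the return policy?","Items can be returned within 30 days of purchase."\n'
    '"How do I reset my password?","Our offices are closed on public holidays."\n'
)

rows = list(csv.DictReader(sample_csv))
for row in rows:
    # Mirror the whitespace stripping applied when the notebook loads the file
    query, result = row["Input"].strip(), row["Output"].strip()
    print(f"{query!r} -> {result!r}")
```

Values that contain commas must be quoted, as in the sample above, otherwise they are split into extra columns.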

In [None]:
import pandas as pd

csv_file_path = './data/search_results.csv'
def strip_function(x):
    # Remove surrounding whitespace from string values; leave other types untouched
    return x.strip() if isinstance(x, str) else x
columns_to_strip = ['Input', 'Output']
df = pd.read_csv(csv_file_path, converters={col: strip_function for col in columns_to_strip})
df = df.dropna(axis=0, how='all')
df = df.dropna(axis=1, how='all')
records = df.to_dict('records')
logging.debug(df.head())  # call head() so the first rows are logged, not the bound method

## Evaluate search results relevancy
This section runs an evaluation chain for every example in the dataset. The results are saved in the ./data folder, in a file named after the input file with an evaluation suffix and a timestamp: "search_results_evaluation_<YYYYMMDD_HHMMSS>.csv".

The chains are run in parallel to reduce the total execution time. You can control the level of parallelism through the "n" variable defined below (set to 10 by default).
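The parallel pattern can be tried in isolation with the sketch below. It replaces the watsonx evaluator with a stub, `fake_evaluate`, which is purely hypothetical and only returns a dictionary with the same "value", "score" and "reasoning" keys. Unlike the cell that follows, this variation stores each result by row position rather than by "Input", so that rows sharing the same query each keep their own result.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_evaluate(record):
    """Hypothetical stand-in for evaluator.evaluate_strings (no model call)."""
    first_word = record["Input"].split()[0].lower()
    relevant = first_word in record["Output"].lower()
    return {"value": "Y" if relevant else "N", "score": int(relevant), "reasoning": "stub"}


def evaluate_in_parallel(records, max_workers=10):
    """Evaluate records concurrently and return results in input order."""
    results = [None] * len(records)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future to the position of its record
        future_to_index = {
            executor.submit(fake_evaluate, record): i for i, record in enumerate(records)
        }
        for future in as_completed(future_to_index):
            # future.result() re-raises any exception raised inside the task
            results[future_to_index[future]] = future.result()
    return results


records = [
    {"Input": "refund policy", "Output": "Refund requests are accepted for 30 days."},
    {"Input": "refund policy", "Output": "Our office is closed on Sundays."},
]
results = evaluate_in_parallel(records, max_workers=2)
print([r["value"] for r in results])
```

Threads suit this workload because each evaluation mostly waits on a remote model call. A sensible upper bound for the number of workers is whatever rate limit applies to your watsonx plan.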



In [None]:
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

evaluator = factory.load_evaluator(CustomCriteria.RETRIEVAL_RELEVANCE)
results = []

# max number of parallel execution threads
n = 10

def evaluate_record(record):
    logging.debug(record)
    eval_result = evaluator.evaluate_strings(
        prediction=record["Output"],
        input=record["Input"],
    )
    logging.debug(eval_result)
    return record, eval_result

def evaluate_records_in_parallel(records, n):
    results = []
    with ThreadPoolExecutor(max_workers=n) as executor:
        # Submit all tasks to the executor
        future_to_record = {executor.submit(evaluate_record, record): record for record in records}
        try:
            # As each task completes, process its result
            for future in as_completed(future_to_record):
                eval_record, result = future.result()  # This will raise any exceptions caught during the task execution
                results.append({
                    "Input": eval_record["Input"],  
                    "value": result["value"],
                    "score": result["score"],
                    "reasoning": result["reasoning"]
                })
        except Exception as exc:
            # If any task fails, log the error and stop processing
            logging.error("A record generated an exception: %s", exc)
            # Cancel tasks that have not started yet (cancel_futures needs Python 3.9+);
            # tasks that are already running cannot be interrupted and will finish
            executor.shutdown(wait=False, cancel_futures=True)
            raise  # Re-raise the exception to indicate failure

    return results


results = evaluate_records_in_parallel(records, n)
results_dict = {result["Input"]: result for result in results}
    
# Initialize new columns with default values or NaNs
df['Eval value'] = pd.NA
df['Eval score'] = pd.NA
df['Eval reasoning'] = pd.NA
    
# Iterate over DataFrame to set values based on "Input"
for idx, row in df.iterrows():
    input_val = row["Input"]
    if input_val in results_dict:
        df.at[idx, 'Eval value'] = results_dict[input_val]["value"]
        df.at[idx, 'Eval score'] = results_dict[input_val]["score"]
        df.at[idx, 'Eval reasoning'] = results_dict[input_val]["reasoning"]
  
# Getting the current date and time
current_datetime = datetime.now().strftime("%Y%m%d_%H%M%S")

# Constructing the new file name with the current date and time and the original csv_file_path
eval_file_path = csv_file_path.replace('.csv', f"_evaluation_{current_datetime}.csv")
df.to_csv(eval_file_path, index=False)


