# Ragas evaluation
Tests batch-mode RAG question answering and Ragas evaluation capability.

Uses this article as a model: https://towardsdatascience.com/visualize-your-rag-data-evaluate-your-retrieval-augmented-generation-system-with-ragas-fc2486308557

Ragas repository: https://github.com/explodinggradients/ragas/tree/main

In [1]:
import os, sys
import json
from pathlib import Path
import pickle

from ragas.testset import TestsetGenerator
from ragas import RunConfig
from dotenv import load_dotenv,find_dotenv
import chromadb
from chromadb import PersistentClient
from pinecone import Pinecone as pinecone_client, ServerlessSpec
from langchain_pinecone import PineconeVectorStore
from langchain_community.vectorstores import Chroma
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_voyageai import VoyageAIEmbeddings
from langchain_community.embeddings import HuggingFaceInferenceAPIEmbeddings
from langchain_core.documents import Document
import pandas as pd
import random

from ragas import evaluate
from ragas.metrics import answer_correctness
from datasets import Dataset

from renumics import spotlight
from renumics.spotlight import Embedding
import pandas as pd

from umap import UMAP
import numpy as np

# Import local packages
sys.path.append('../src/aerospace_chatbot')
import queries
from data_processing import _stable_hash_meta, archive_db, get_docs_questions_df

# Set environment variables with .env
load_dotenv(find_dotenv(), override=True)

True

## Utility functions

In [2]:
def write_dict_to_file(data_dict, filename):
    """Append ``data_dict`` to ``filename`` as one JSON-encoded line.

    The file is opened in append mode, so repeated calls build up a
    JSON-lines file holding one dictionary per line.
    """
    with open(filename, "a") as sink:
        encoded = json.dumps(data_dict)
        sink.write(f"{encoded}\n")

def read_dicts_from_file(filename):
    """Lazily yield one dictionary per line of a JSON-lines file.

    This is a generator: nothing is read until iteration starts. Wrap the
    call in ``list(...)`` to load every record at once.
    """
    with open(filename, "r") as source:
        yield from map(json.loads, source)

In [3]:
def add_cached_column_from_file(df, file_name, merge_on, column):
    """Return ``df`` extended with ``column`` taken from a JSON-lines cache file.

    When ``file_name`` exists, its records are loaded, reduced to the first
    record per ``merge_on`` value, stripped of rows with missing values and
    left-merged onto ``df`` using ``merge_on`` as the key. When the file does
    not exist, a copy of ``df`` is returned with ``column`` set to ``None``
    and placed as the last column. The input ``df`` is never modified.
    """
    if not Path(file_name).exists():
        # No cache yet: hand back a copy carrying an all-None column.
        df_out = df.copy()
        df_out[column] = None

        # Move the new column to the end of the column order.
        ordered = list(df_out.columns)
        ordered.append(ordered.pop(ordered.index(column)))
        return df_out[ordered]

    records = pd.DataFrame(list(read_dicts_from_file(file_name)))
    first_per_key = records.drop_duplicates(subset=[merge_on])
    cached = first_per_key[[column, merge_on]].dropna().reset_index(drop=True)

    merged = df.merge(cached, on=merge_on, how="left")
    return merged.reset_index(drop=True)

In [4]:
def synthetic_dataset_loop(lcdocs,eval_size,n_questions,fname):
    """Work out which document chunks still need synthetic questions.

    Chunks start at every ``eval_size``-th position of ``lcdocs``. If
    ``fname`` does not exist, every chunk start is returned together with an
    empty DataFrame. Otherwise the csv is loaded and a chunk start is
    scheduled when it either has no rows in the csv or has a row count
    different from ``n_questions``.

    Returns:
        Tuple ``(df_testset, index_loop)``: the rows read from ``fname`` (or
        an empty DataFrame) and the sorted list of chunk starts to process.
    """
    if not os.path.exists(fname):
        # Nothing exported yet: every chunk has to be generated.
        return pd.DataFrame(), list(range(0, len(lcdocs), eval_size))

    df_testset = pd.read_csv(fname)
    all_starts = list(range(0, len(lcdocs), eval_size))
    seen_starts = df_testset['index'].unique()

    # Chunks present in the export but with the wrong number of questions.
    incomplete = [
        start
        for start in seen_starts
        if (df_testset['index'] == start).sum() != n_questions
    ]
    # Chunks that never made it into the export at all.
    missing = list({start for start in all_starts if start not in seen_starts})

    return df_testset, sorted(incomplete + missing)

In [5]:
def generate_testset(lcdocs,generator,eval_size,n_questions,fname,run_config):
    """Generate a synthetic testset chunk by chunk, checkpointing to ``fname``.

    ``lcdocs`` is processed in slices of ``eval_size`` documents. Which slices
    are processed is decided by ``synthetic_dataset_loop`` from what is
    already stored in ``fname``. After every slice the complete testset is
    written back to ``fname``, so an interrupted run can be resumed.

    Args:
        lcdocs: Sequence of documents passed to the generator in slices.
        generator: Object exposing ``generate_with_langchain_docs``.
        eval_size: Number of documents per slice.
        n_questions: Number of questions requested per slice.
        fname: Path of the csv checkpoint file.
        run_config: Run configuration forwarded to the generator.

    Returns:
        DataFrame with all testset rows (previously exported and newly
        generated), carrying a fresh 0..n-1 index.
    """
    # Initialize the DataFrame to store the testset, determine what parts of the dataset need to be looped over
    df_testset, index_loop = synthetic_dataset_loop(lcdocs,eval_size,n_questions,fname)
    print(f"Index loop: {index_loop}")

    # Make sure the checkpoint directory exists before the first export.
    os.makedirs(os.path.dirname(fname) or '.', exist_ok=True)

    for i in index_loop:
        print(f"Processing index {i} to {i+eval_size}...")
        lcdocs_eval = lcdocs[i:i+eval_size]

        testset = generator.generate_with_langchain_docs(lcdocs_eval,
                                                        test_size=n_questions,
                                                        with_debugging_logs=True,
                                                        is_async=True,
                                                        run_config=run_config,
                                                        raise_exceptions=False)

        df_testset_new = testset.to_pandas()
        df_testset_new['index'] = i
        df_testset_new['eval_size'] = eval_size

        # A slice that is regenerated replaces its earlier (incomplete) rows.
        # Keeping them would leave the slice with a wrong row count forever
        # and make every later run regenerate it again.
        if 'index' in df_testset.columns:
            df_testset = df_testset[df_testset['index'] != i]

        # ignore_index gives every row a unique label; downstream question ids
        # are derived from the index and must not repeat per slice.
        frames = [frame for frame in (df_testset, df_testset_new) if not frame.empty]
        df_testset = pd.concat(frames, ignore_index=True) if frames else df_testset_new

        # Rewrite the whole checkpoint so the file always mirrors df_testset.
        df_testset.to_csv(fname, index=False)
    return df_testset

In [6]:
def _split_source_ids(cell):
    """Normalize one ``source_documents`` cell into a list of document ids."""
    if isinstance(cell, (list, tuple)):
        # The JSON response cache stores the ids as a list.
        return list(cell)
    if not isinstance(cell, str):
        # None / NaN: no source documents recorded for this question.
        return []
    # In-memory format: ids joined with ', ' (optionally wrapped in brackets).
    return [doc_id for doc_id in cell.strip('[]').split(', ') if doc_id]


def rag_responses(index_type, index_name, query_model, llm, QA_model_params, df_qa, df_docs):
    """Answer all unanswered questions with RAG and attach contexts and embeddings.

    ``df_qa`` is modified in place and also returned. For every row whose
    ``answer`` is missing or empty, a new ``queries.QA_Model`` is queried; the
    answer text and the ', '-joined ids of the referenced documents are stored
    in the row and appended to ``output/rag_response_cache_<index_name>.json``.
    Afterwards a ``contexts`` column (document texts looked up in ``df_docs``)
    and an ``embedding`` column (question embeddings, cached as a pickle in
    ``output``) are added.

    Args:
        index_type: Forwarded to ``queries.QA_Model``.
        index_name: Forwarded to ``queries.QA_Model``; also used in cache file names.
        query_model: Embedding model; forwarded to ``queries.QA_Model`` and
            used through ``embed_query`` for the question embeddings.
        llm: Forwarded to ``queries.QA_Model``.
        QA_model_params: Extra keyword arguments for ``queries.QA_Model``.
        df_qa: DataFrame with ``question`` and ``answer`` columns and
            optionally ``source_documents``.
        df_docs: DataFrame with ``id`` and ``document`` columns.

    Returns:
        ``df_qa`` with ``answer``, ``source_documents``, ``contexts`` and
        ``embedding`` filled in.

    Raises:
        KeyError: If a referenced document id is not present in ``df_docs``.
    """
    os.makedirs('output', exist_ok=True)
    response_cache = os.path.join('output',f'rag_response_cache_{index_name}.json')
    embedding_cache = os.path.join('output',f'question_embeddings_{index_name}.pickle')

    if "source_documents" not in df_qa.columns:
        df_qa["source_documents"] = None
    # Object dtype so string results can be stored even if a column only held NaN.
    df_qa["answer"] = df_qa["answer"].astype(object)
    df_qa["source_documents"] = df_qa["source_documents"].astype(object)
    answer_col = df_qa.columns.get_loc("answer")
    sources_col = df_qa.columns.get_loc("source_documents")

    # Generate responses using RAG with input parameters.
    # Rows are addressed by position: index labels may be non-consecutive or
    # duplicated, so a label must not be used as a row number.
    for pos, (_, row) in enumerate(df_qa.iterrows()):
        answer = row['answer']
        if answer is None or pd.isnull(answer) or answer=='':
            print(f"Processing question {pos+1}/{len(df_qa)}")

            # Use the QA model to query the documents (fresh model per question)
            qa_obj=queries.QA_Model(index_type,
                            index_name,
                            query_model,
                            llm,
                            **QA_model_params)
            qa_obj.query_docs(row['question'])
            response=qa_obj.result

            answer_text = response['answer'].content
            ids=[_stable_hash_meta(source_document.metadata)
                for source_document in response['references']]

            df_qa.iloc[pos, answer_col] = answer_text
            df_qa.iloc[pos, sources_col] = ', '.join(ids)

            # Save the response to cache file
            response_dict = {
                "question": row['question'],
                "answer": answer_text,
                "source_documents": ids,
            }
            write_dict_to_file(response_dict, response_cache)

    # Get the context documents content for each question.
    # One id -> text lookup table replaces a full scan of df_docs per id; the
    # first row wins when an id occurs more than once.
    doc_lookup = {}
    for doc_id, document in zip(df_docs["id"], df_docs["document"]):
        doc_lookup.setdefault(doc_id, document)

    df_qa["contexts"] = [
        [doc_lookup[doc_id] for doc_id in _split_source_ids(cell)]
        for cell in df_qa['source_documents']
    ]

    # Additionally get embeddings for questions, reusing the pickle cache when possible.
    question_embeddings = None
    if Path(embedding_cache).exists():
        # NOTE: pickle must only be used on trusted files; this cache is
        # written by this function itself.
        with open(embedding_cache, "rb") as f:
            question_embeddings = pickle.load(f)

    # Recompute when there is no cache or it does not match the current
    # number of questions (a stale cache could not be assigned as a column).
    if question_embeddings is None or len(question_embeddings) != len(df_qa):
        question_embeddings = [
            query_model.embed_query(question)
            for question in df_qa["question"]
        ]
        with open(embedding_cache, "wb") as f:
            pickle.dump(question_embeddings, f)

    df_qa["embedding"] = question_embeddings
    return df_qa

In [7]:
def eval_rag(index_name, df_qa):
    """Score RAG answers with the Ragas ``answer_correctness`` metric.

    Previously computed scores are loaded from
    ``output/ragas_result_cache_<index_name>.json`` (merged on ``question``).
    Rows whose ``ground_truth`` is not a string are dropped. Every remaining
    row without a score is evaluated individually and the new score is
    appended to the cache file.

    Args:
        index_name: Used to build the cache file name.
        df_qa: DataFrame with ``question``, ``answer``, ``contexts`` and
            ``ground_truth`` columns.

    Returns:
        Tuple ``(df_qa_eval, df_qa)``: both hold the filtered rows including
        the ``answer_correctness`` column.
    """
    os.makedirs('output', exist_ok=True)
    cache_file = os.path.join('output',f'ragas_result_cache_{index_name}.json')

    # Add answer correctness column, fill in if it exists
    df_qa = add_cached_column_from_file(
        df_qa, cache_file, "question", "answer_correctness"
    )

    # Sometimes ground_truth does not provide a response. Just filter those out.
    # Copy so the score column can be assigned later without writing into a slice.
    df_qa = df_qa[df_qa['ground_truth'].apply(lambda x: isinstance(x, str))].copy()

    # Prepare the dataframe for evaluation
    df_qa_eval = df_qa.copy()
    score_col = df_qa_eval.columns.get_loc("answer_correctness")

    # Evaluate the answer correctness if not already done.
    # Rows are addressed by position: after the filtering above the index
    # labels no longer match row numbers.
    fields = ["question", "answer", "contexts", "ground_truth"]
    for pos, (label, row) in enumerate(df_qa_eval.iterrows()):
        print(label, row["question"])
        # TODO add multiple eval criteria
        if row["answer_correctness"] is None or pd.isnull(row["answer_correctness"]):
            evaluation_result = evaluate(
                Dataset.from_pandas(df_qa_eval.iloc[pos : pos + 1][fields]),
                [answer_correctness],
            )
            score = evaluation_result["answer_correctness"]
            df_qa_eval.iloc[pos, score_col] = score

            # Save the score to the cache so a rerun can skip this question
            response_dict = {
                "question": row["question"],
                "answer_correctness": score,
            }
            write_dict_to_file(response_dict, cache_file)

    # write the answer correctness to the original dataframe (same rows, same order)
    df_qa["answer_correctness"] = df_qa_eval["answer_correctness"].to_numpy()

    return df_qa_eval, df_qa

In [8]:
def _source_ids_as_list(cell):
    """Normalize one ``source_documents`` cell into a list of document ids."""
    if isinstance(cell, (list, tuple, np.ndarray)):
        return list(cell)
    if not isinstance(cell, str):
        # None / NaN: the question references no documents.
        return []
    # Ids stored as one ', '-joined string (optionally wrapped in brackets).
    return [doc_id for doc_id in cell.strip('[]').split(', ') if doc_id]


def data_viz_prep(index_name,df_qa_eval,df_docs):
    """Combine questions and documents into one DataFrame prepared for visualization.

    Steps:
      1. For every document, collect the number and the ids of the questions
         that list it in ``source_documents``.
      2. Stack the question rows and the document rows into one frame.
      3. Add UMAP projections of the embeddings fitted on questions only
         (``umap_questions``), on documents only (``umap_docs``) and on all
         rows (``umap``).
      4. Add ``nearest_question_dist``, the smallest Euclidean distance from
         each row's embedding to any question embedding.
      5. Write the result to ``df_<index_name>.parquet`` in the working directory.

    Args:
        index_name: Used to build the parquet file name.
        df_qa_eval: Question rows with ``id``, ``question``,
            ``source_documents`` and ``embedding`` columns.
        df_docs: Document rows with ``id`` and ``embedding`` columns.

    Returns:
        The combined DataFrame.
    """
    # Explode 'source_documents' so each document ID is in its own row alongside the question ID.
    # The ids are first normalized to lists: explode leaves plain strings
    # untouched, so ', '-joined ids would otherwise never match a document id.
    df_questions_exploded = df_qa_eval.assign(
        source_documents=df_qa_eval["source_documents"].apply(_source_ids_as_list)
    ).explode("source_documents")

    # Group by exploded 'source_documents' (document IDs) and aggregate
    agg = (
        df_questions_exploded.groupby("source_documents")
        .agg(
            num_questions=("id", "count"),  # Count of questions referencing the document
            question_ids=(
                "id",
                lambda x: list(x),
            ),  # List of question IDs referencing the document
        )
        .reset_index()
        .rename(columns={"source_documents": "id"})
    )

    # Merge the aggregated information back into the documents
    df_documents_agg = pd.merge(df_docs, agg, on="id", how="left")

    # Documents that no question referenced get an empty list / a count of 0
    df_documents_agg["question_ids"] = df_documents_agg["question_ids"].apply(
        lambda x: x if isinstance(x, list) else []
    )
    df_documents_agg["num_questions"] = df_documents_agg["num_questions"].fillna(0)

    # Stack question rows on top of document rows
    df_visualize = pd.concat([df_qa_eval, df_documents_agg], axis=0)

    all_embeddings = df_visualize["embedding"].values.tolist()
    is_question = df_visualize["question"].notna()

    def _project(fit_rows):
        # Fit UMAP on the given rows only, then project every row with it.
        reducer = UMAP(n_neighbors=20, min_dist=0.15, metric="cosine", random_state=42).fit(
            fit_rows["embedding"].values.tolist()
        )
        return reducer.transform(all_embeddings).tolist()

    # The question-only projection used to be computed and then discarded;
    # it is now stored like the other two projections.
    df_visualize["umap_questions"] = _project(df_visualize[is_question])
    df_visualize["umap_docs"] = _project(df_visualize[~is_question])
    df_visualize["umap"] = _project(df_visualize)

    # find the nearest question (by embedding) for each row
    question_embeddings = np.array(df_visualize[is_question]["embedding"].tolist())

    df_visualize["nearest_question_dist"] = [  # brute force, could be optimized using ChromaDB
        np.min(np.linalg.norm(np.array(doc_emb) - question_embeddings, axis=1))
        for doc_emb in df_visualize["embedding"].values
    ]

    # write the dataframe to parquet for later use
    df_visualize.to_parquet(f'df_{index_name}.parquet')

    return df_visualize

# ChromaDB

## Connect to database

In [9]:
# Open the on-disk Chroma store located under LOCAL_DB_PATH and display its collections
persistent_client = chromadb.PersistentClient(
    path=os.path.join(os.getenv('LOCAL_DB_PATH'), 'chromadb'),
)
collections = persistent_client.list_collections()
collections

[Collection(name=text-embedding-3-large-2merge-0),
 Collection(name=text-embedding-3-large-0merge-400),
 Collection(name=text-embedding-3-large-0merge-400-parent-child),
 Collection(name=text-embedding-3-large-2merge-0-queries)]

In [10]:
# One entry per Chroma collection, each paired with its own embedding model instance
dbs = [
    {
        'index_name': collection_name,
        'query_model': OpenAIEmbeddings(model='text-embedding-3-large',
                                        openai_api_key=os.getenv('OPENAI_API_KEY')),
    }
    for collection_name in (
        'text-embedding-3-large-2merge-0',
        'text-embedding-3-large-0merge-400',
        'text-embedding-3-large-0merge-400-parent-child',
        'text-embedding-3-large-2merge-0-queries',
    )
]

## Export pickles

In [11]:
# Export all collections to pickles to store them. Uncomment if desired, takes a while.
# for db in dbs:
#     df_temp_chroma=archive_db('ChromaDB',db['index_name'],db['query_model'],export_pickle=True)

# df_temp_chroma.head(5)

## Create data for synthetic dataset

In [12]:
# The first configured database is the one the synthetic dataset is built from
db = dbs[0]

In [13]:
# Read every chunk of the selected collection and wrap it as a LangChain Document;
# these documents are the input for the synthetic test dataset.
docs_vectorstore = Chroma(
    client=persistent_client,
    collection_name=db['index_name'],
    embedding_function=db['query_model'],
)
all_docs = docs_vectorstore.get(include=["metadatas", "documents", "embeddings"])
lcdocs_chroma = [
    Document(page_content=text, metadata=meta)
    for text, meta in zip(all_docs['documents'], all_docs['metadatas'])
]

print(len(lcdocs_chroma))

2222


In [14]:
# Tabulate the collection: one row per chunk with a stable id, its source/page
# metadata (page falls back to -1), the chunk text and its embedding.
all_docs = docs_vectorstore.get(include=["metadatas", "documents", "embeddings"])
df_docs = pd.DataFrame(
    {
        "id": list(map(_stable_hash_meta, all_docs["metadatas"])),
        "source": [meta.get("source") for meta in all_docs["metadatas"]],
        "page": [meta.get("page", -1) for meta in all_docs["metadatas"]],
        "document": all_docs["documents"],
        "embedding": all_docs["embeddings"],
    }
)

## Generate synthetic dataset

Good article on how models/embeddings are used in the `TestsetGenerator`: https://www.pondhouse-data.com/blog/evaluate-rag-performance-using-ragas

Sometimes you'll get a tricky threading error. Fully close vs studio, open a new window, restart the kernel, and it'll clear.

In [15]:
# --- Generator LLM: passed to TestsetGenerator as the generator model ---
generator_model = "gpt-3.5-turbo-0125"
synthetic_generator_llm = ChatOpenAI(
    model=generator_model,
    tags=[generator_model],
)
# Alternative via the Hugging Face inference endpoint:
# synthetic_generator_llm = ChatOpenAI(base_url='https://api-inference.huggingface.co/v1',
#                                     model='meta-llama/Meta-Llama-3-8B-Instruct',
#                                     api_key=os.getenv('HUGGINGFACEHUB_API_TOKEN'),
#                                     temperature=0.1,
#                                     max_tokens=500)

# --- Critic LLM: passed to TestsetGenerator as the critic model ---
critic_model = 'gpt-4o'
synthetic_critic_llm = ChatOpenAI(
    model=critic_model,
    tags=[critic_model],
)
# Alternative via the Hugging Face inference endpoint:
# synthetic_critic_llm = ChatOpenAI(base_url='https://api-inference.huggingface.co/v1',
#                                     model='meta-llama/Meta-Llama-3-8B-Instruct',
#                                     api_key=os.getenv('HUGGINGFACEHUB_API_TOKEN'),
#                                     temperature=0.1,
#                                     max_tokens=500)

# --- Embeddings: reuse the model of the selected database ---
# embedding_model='text-embedding-3-large'
# synthetic_embeddings = OpenAIEmbeddings(model=embedding_model,api_key=os.getenv('OPENAI_API_KEY'))
synthetic_embeddings = db['query_model']

# --- Run parameters for testset generation (a single worker) ---
run_config = RunConfig(
    timeout=1000,
    max_retries=50,
    max_wait=1000,
    max_workers=1,
)
# run_config=None

# --- Assemble the testset generator from the pieces above ---
generator = TestsetGenerator.from_langchain(
    synthetic_generator_llm,
    synthetic_critic_llm,
    synthetic_embeddings,
    run_config=run_config,
)

In [16]:
# Inputs for the chunked testset generation
eval_size = 50     # Documents handed to the generator per chunk; intended to circumvent OpenAI API rate limits.
n_questions = 5    # Questions generated per chunk.
fname = os.path.join('output', f"testset_{db['index_name']}.csv")  # csv checkpoint of the generated testset
lcdocs = lcdocs_chroma

In [17]:
# Generate (or resume generating) the synthetic testset, checkpointed to fname
df_testset = generate_testset(
    lcdocs=lcdocs,
    generator=generator,
    eval_size=eval_size,
    n_questions=n_questions,
    fname=fname,
    run_config=run_config,
)

Exception in thread Thread-5:
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 1038, in _bootstrap_inner
    self.run()
  File "/Users/danmueller/Documents/GitHub/aerospace_chatbot/.venv/lib/python3.11/site-packages/ragas/executor.py", line 87, in run
    results = self.loop.run_until_complete(self._aresults())
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 629, in run_until_complete
    self._check_running()
  File "/Library/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/base_events.py", line 588, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running


Index loop: [0, 50, 100, 150, 200, 250, 300, 350, 400, 450, 500, 550, 600, 650, 700, 750, 800, 850, 900, 950, 1000, 1050, 1100, 1150, 1200, 1250, 1300, 1350, 1400, 1450, 1500, 1550, 1600, 1650, 1700, 1750, 1800, 1850, 1900, 1950, 2000, 2050, 2100, 2150, 2200]
Processing index 0 to 50...


  self._invoke_excepthook(self)


ExceptionInRunner: The runner thread which was running the jobs raised an exeception. Read the traceback above to debug it. You can also pass `raise_exceptions=False` incase you want to show only a warning message instead.

  await self._event_pipe_gc()


## RAG questions/answers (batch mode)

### Format dataset and database for RAG

In [None]:
# Build the question table. The index is reset first because the testset is
# assembled from several chunks and its index labels can repeat; the ids
# derived from the index below must be unique per question.
df_questions = df_testset[['question', 'ground_truth']].reset_index(drop=True)
df_questions['id'] = 'Question ' + df_questions.index.astype(str)
df_questions['question_by'] = generator_model
df_questions = df_questions[['id', 'question', 'ground_truth', 'question_by']]

In [None]:
# Load the cached RAG answers and source_documents ids from a file - or create an empty column.
# The path must be the file rag_responses appends to: output/rag_response_cache_<index_name>.json
rag_cache_file = os.path.join('output', f"rag_response_cache_{db['index_name']}.json")

df_questions_answers = add_cached_column_from_file(
    df_questions, rag_cache_file, "question", "answer")

df_questions_answers = add_cached_column_from_file(
    df_questions_answers, rag_cache_file, "question", "source_documents")

# The cache stores the document ids as a JSON list, while rag_responses keeps
# them in the table as one ', '-joined string; convert cached values to that format.
df_questions_answers["source_documents"] = df_questions_answers["source_documents"].apply(
    lambda ids: ', '.join(ids) if isinstance(ids, list) else ids
)

### Use RAG to generate responses

In [None]:
# Settings for the batch RAG run: same database, embeddings and LLM as used for the testset
index_type = 'ChromaDB'
index_name = db['index_name']
query_model = synthetic_embeddings
llm = synthetic_generator_llm

# Extra keyword arguments handed to the QA model
QA_model_params = {
    'rag_type': 'Standard',
    'k': 4,
    'search_type': 'similarity',
    'local_db_path': os.getenv('LOCAL_DB_PATH'),
}


In [None]:
# Answer every question that has no cached answer yet and attach contexts and embeddings
df_questions_answers_rag = rag_responses(
    index_type=index_type,
    index_name=index_name,
    query_model=query_model,
    llm=llm,
    QA_model_params=QA_model_params,
    df_qa=df_questions_answers,
    df_docs=df_docs,
)

## Ragas eval, visualize

In [None]:
# Score the RAG answers; both returned frames carry the answer_correctness column
df_qa_eval, df_questions_answers_rag = eval_rag(
    index_name=index_name,
    df_qa=df_questions_answers_rag,
)

In [None]:
# Display the evaluated question/answer table as the cell output
df_questions_answers_rag

In [None]:
# Link each document to the questions that used it as a source and add the
# UMAP columns needed for visualization.
df_visualize = data_viz_prep(
    index_name=index_name,
    df_qa_eval=df_qa_eval,
    df_docs=df_docs,
)

In [None]:
# Reload the combined questions + documents table written by data_viz_prep
df = pd.read_parquet(f'df_{index_name}.parquet')

# Open it in Spotlight; every column with "umap" in its name is declared as an Embedding
spotlight.show(
    df,
    layout="https://spotlightpublic.blob.core.windows.net/docs-data/rag_demo/layout_rag_3.json",
    dtype={column: Embedding for column in df.columns if "umap" in column},
)

## UMAP visualization forms clusters of the questions; workaround: fit UMAP only on documents

# Pinecone

## Connect to database

In [None]:
# Connect to Pinecone and display the available indexes.
# The class is imported here under its real name: the notebook-level alias
# `pinecone_client` is overwritten by the client instance below, so calling the
# alias again on a re-run of this cell would raise a TypeError.
from pinecone import Pinecone

pinecone_client = Pinecone(api_key=os.getenv('PINECONE_API_KEY'))
indexes = pinecone_client.list_indexes()
indexes

In [None]:
# One entry per Pinecone index, each paired with its own Voyage embedding model instance
dbs = [
    {
        'index_name': pinecone_index_name,
        'query_model': VoyageAIEmbeddings(model='voyage-large-2-instruct',
                                          voyage_api_key=os.getenv('VOYAGE_API_KEY'),
                                          truncation=False),
    }
    for pinecone_index_name in (
        'voyage-large-2-instruct-2merge-0',
        'voyage-large-2-instruct-0merge-400',
    )
]

In [None]:
# Inspect the first db, save for synthetic test dataset
db = dbs[0]
index = pinecone_client.Index(db['index_name'])

# Collect all vector ids. Each item yielded by index.list() is added with
# extend, i.e. it is treated as a batch of ids. The loop variable is not
# called `id`, which would shadow the builtin for the rest of the notebook.
ids = []
for id_batch in index.list():
    ids.extend(id_batch)

# Fetch the stored vectors in chunks
docs = []
chunk_size = 200  # Tune to whatever doesn't error out, 200 won't for serverless
for start in range(0, len(ids), chunk_size):
    print(f"Fetching {start} to {start+chunk_size}")
    fetched = index.fetch(ids[start:start+chunk_size])['vectors']
    docs.extend(fetched.values())

# Rebuild LangChain documents from the metadata stored with each vector
lcdocs_pinecone = [
    Document(page_content=record['metadata']['page_content'],
             metadata={'page': record['metadata']['page'],
                       'source': record['metadata']['source']})
    for record in docs
]

print(len(lcdocs_pinecone))

In [None]:
# Archive every configured Pinecone index to a pickle.
# NOTE: the loop reuses the notebook-level name `db`; afterwards it refers to the last entry of `dbs`.
for db in dbs:
    df_temp_pinecone = archive_db(
        'Pinecone',
        db['index_name'],
        db['query_model'],
        export_pickle=True,
    )