In [3]:
import lancedb
from lancedb.pydantic import LanceModel
from lancedb.embeddings import get_registry
from typing import List
from pydantic_ai import Agent
from pydantic import BaseModel, Field
from pydantic_ai.models.gemini import GeminiModel
import os
import numpy as np
import enum

from openai import OpenAI
import openai

import re
import tiktoken
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd
from urllib.parse import quote
from uuid import UUID, uuid4
import cohere

import pandas as pd
import logging
from sqlalchemy import create_engine, Column, Integer, String, LargeBinary
from sqlalchemy.orm import declarative_base, sessionmaker, Session, scoped_session, Mapped, relationship
from sqlalchemy.dialects.postgresql import UUID, TSVECTOR
from pgvector.sqlalchemy import Vector
from sqlalchemy.sql import func

from sqlalchemy import (
    Column,
    Integer,
    String,
    Boolean,
    DateTime,
    ForeignKey,
    JSON,
    Text,
    VARCHAR,
    Enum,
    Index,
    desc,
    text

    
)



In [4]:

EMBEDDING_SIZE = 1536
Base = declarative_base()

class DocumentType(enum.Enum):
    MD = 'markdown'
    TXT = 'text'
    PDF = 'pdf'

class DocumentSubject(enum.Enum):
    FINANCE = "finance"
    HR = 'hr'
    TECH = 'tech'

# declare models
class Chunk(Base):
    __tablename__ = "chunks"

    id = Column(Integer, primary_key=True, autoincrement=True)
    document_id = Column(UUID(as_uuid=True), ForeignKey("documents.id"))
    chunk_metadata = Column(JSON)
    chunk = Column(String, nullable=False)
    embedding = Column(Vector(1536))  # Adjust vector dimension to match your embeddings
    document: Mapped["Document"] = relationship(back_populates="chunks") # creates a link to related document
    
    # ---- ADDED FOR FTS ----
    chunk_tsv = Column(TSVECTOR) # The new column for Full-Text Search vectors
    # ---- END ADDED FOR FTS ----

    # ---- ADDED/MODIFIED FOR INDEXES ----
    __table_args__ = (
        # Index for Full-Text Search on the tsvector column
        Index(
            'idx_gin_chunk_tsv',        # Index name
            'chunk_tsv',                # Column to index
            postgresql_using='gin'      # Index type (GIN is best for tsvector)
        ),
        # Index for Vector Search on the embedding column (choose ONE method)
        # Option 1: HNSW (Good balance, requires pgvector >= 0.5.0)
        Index(
            'idx_hnsw_embedding',       # Index name
            'embedding',                # Column to index
            postgresql_using='hnsw',    # Index type
            postgresql_with={'m': 16, 'ef_construction': 64}, # Example parameters (tune these)
            postgresql_ops={'embedding': 'vector_cosine_ops'} # Operator class (use cosine, l2, or ip based on your distance metric)
        ))
    
    class Config:
        orm_mode = True

class Document(Base):
    __tablename__ = "documents"

    pk = Column(Integer, primary_key=True, autoincrement=True)
    id = Column(UUID(as_uuid=True), nullable=False, unique=True)
    title = Column(String(255))
    type = Column(Enum(DocumentType), name = "filetype of document")
    subject = Column(Enum(DocumentSubject), name = "subject of document")
    location = Column(String(255))
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    chunks: Mapped[list["Chunk"]] = relationship()
    
    class Config:
        orm_mode = True

Create test database

In [5]:
test_db_url = "postgresql+psycopg://postgres:password@localhost:5432/test_db"
handbook_db_url = "postgresql+psycopg://postgres:password@localhost:5432/handbook_db"
engine = create_engine(
    url = handbook_db_url,
    )
sessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(engine)
       


In [23]:

DATA_PATH = "../data/handbook-main-content"
RESULT_PATH = "../data/rag-results/"
GOLDEN_TEST_PATH = "../data/gitlab-handbook-golden-test-set.csv"
EMBEDDING_MODEL = "text-embedding-3-small"
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")

def split_markdown_text(text: str, embedding_model: str, document_id: UUID, min_tokens: int = 200, max_tokens: int = 1000, overlap: int = 50):
    md_regex = r"(^#+\s*.*)" #regex which captures all levels of headers in markdown.
    tokenizer = tiktoken.encoding_for_model(embedding_model)
    chunks = []
    temp_chunk = ""
    temp_headers = ""
    splitter = RecursiveCharacterTextSplitter(chunk_size = max_tokens, chunk_overlap = overlap)


    # helper function to create and add chunk to list. Splits chunk if it exceeds max token size
    def add_chunk(chunk, context = ""):
        chunk = f"{context} \n {chunk}"
        if len(tokenizer.encode(chunk)) < max_tokens:
                chunks.append(Chunk(
                    document_id=document_id,
                    context=context.strip(),
                    chunk=chunk.strip()
                ))
        else:
            split_chunks = splitter.split_text(chunk)
            for part in split_chunks:
                chunks.append(Chunk(
                document_id=document_id,
                context=context,
                chunk=part
            ))
                
    
    # Helper function to add a merged chunk if present.
    def flush_temp_chunk():
        nonlocal temp_chunk, temp_headers
        if temp_chunk:
            add_chunk(temp_chunk, temp_headers)
            temp_chunk, temp_headers = "", ""

    #split text by headers
    sections = re.split(md_regex, text, flags=re.MULTILINE)

    #capture first text which often does not start with a header
    if len(tokenizer.encode(sections[0])) < min_tokens:
        temp_chunk += sections[0] + "\n"
    else:
        add_chunk(sections[0])

    for i in range(1, len(sections), 2): # loop through headers and text in sections
        header = sections[i].strip()
        content = sections[i+1].strip() if i + 1 <= len(sections) else ""

        token_count = len(tokenizer.encode(content))

        # add chunk to chunk list or to temporary chunk to combine with other chunks
        if token_count < min_tokens:
            temp_chunk += content + "\n"  
            temp_headers += header +"\n"        
        else:
            # add temp chunk if it exists
            flush_temp_chunk()

            add_chunk(content, header)
           
    # add remaining temp chunk if it exists
    flush_temp_chunk()
    
    return chunks

def parse_documents(folder_path, db, embed_model: str = EMBEDDING_MODEL, min_tokens: int = 200, max_tokens: int = 1000):
    chunks = []
    HANDBOOK_ROOT_URL = "https://gitlab.com/gitlab-com/content-sites/handbook/-/tree/main/content"

    # clear tables
    db.query(Chunk).delete()
    db.query(Document).delete()
    db.commit()

    # walk through all folders and subfolders
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.md'):                  #only extract text from markdown files
                file_path = os.path.join(root, file)

                # fix url to properly link to the handbook source
                file_url = file_path.replace("../data/handbook-trimmed\\content", HANDBOOK_ROOT_URL)
                file_url = file_url.replace('\\', '/')
                

                # add document to database
                doc = Document(
                    id = uuid4(),
                    title = file.split(".md")[0],
                    type = DocumentType.MD,
                    location = file_url)
                db.add(doc)

                with open(file_path, 'r', encoding="utf-8") as f:
                    content = f.read()
                    chunks.extend(split_markdown_text(content, embed_model, doc.id, min_tokens, max_tokens))

    print(f"Total amount of chunks: {len(chunks)}")
    db.commit()
    return chunks

def remove_small_chunks(chunks, min_tokens = 200, embedding_model = EMBEDDING_MODEL):
    tokenizer = tiktoken.encoding_for_model(embedding_model)
    total_count = 0
    trimmed_chunks = []
    for chunk in chunks:
        
        count = len(tokenizer.encode(chunk.chunk))
        if count > min_tokens:
            trimmed_chunks.append(chunk)

    print(f"amount of trimmed chunks: {len(trimmed_chunks)}")
    return trimmed_chunks

def create_embeddings(chunks, batch_size = 500, model=EMBEDDING_MODEL):
    client = OpenAI()
    for i in range(0, len(chunks), batch_size):
        batch = chunks[i:i+batch_size]
        texts = [chunk.chunk for chunk in batch]
        try:
            response = client.embeddings.create(input = texts, model=model)
            print("batch done")
                        # Extract the embeddings from the response
            embeddings = [entry.embedding for entry in response.data]
            for chunk, embedding in zip(batch, embeddings):
                chunk.embedding = embedding
        except Exception as e:
            print(f"Embedding failed with error: {e}")
    
def add_chunks_to_db(chunks, db):
   
    for chunk in chunks:
        db.add(chunk)

    db.commit()
    





In [7]:
def create_embedded_chunks(db: Session, data_path: str = DATA_PATH, embed_model: str = EMBEDDING_MODEL, min_tokens: int = 200, max_tokens: int = 1000, trim_chunks: bool = True):
    
    #parse chunks from documents
    chunks = parse_documents(DATA_PATH, db, embed_model, min_tokens, max_tokens)

    # remove short chunks
    if trim_chunks:
        chunks= remove_small_chunks(chunks, min_tokens)

    # create embeddings with openAI API
    create_embeddings(chunks, batch_size=500, model = embed_model)

    # add chunks to database
    add_chunks_to_db(chunks, db)


In [8]:
from scipy.stats import spearmanr
import statistics

def evaluate_rag(results, reranked_results, retrieval):
    """
    Evaluates RAG retrieval performance using normalized relevance scores,
    Spearman's rank correlation, and Mean Reciprocal Rank (MRR).
    
    :param results: List of retrieved chunks (original order)
    :param reranked_results: Cohere reranking results (ideal order)
    :return: Dictionary with evaluation metrics
    """
    relevance_scores = []
    normalized_scores = []
    scores_dict = {
        "dense": "similarity_score",
        "sparse": "rank",
        "hybrid": "hybrid_score"
    }
    

    # Extract Cohere relevance scores and normalize them
    for i, chunk in enumerate(results):
        relevance_score = next((item.relevance_score for item in reranked_results.results if item.index == i), 0)

         # Store scores for averaging later
        relevance_scores.append(relevance_score)
        
        if retrieval == "hybrid":
            print(f"Chunk {chunk['chunk'].id} - Relevance Score: {relevance_score} - dense_score: {chunk['dense_score']} - sparse_score: {chunk['sparse_score']} - hybrid_score: {chunk['hybrid_score']}")
        else:
            print(f"Chunk {chunk['chunk'].id} - Relevance Score: {relevance_score} - {scores_dict[retrieval]}: {chunk[scores_dict[retrieval]]}")


    # Compute Spearman Rank Correlation
    retrieved_ranks = list(range(len(results)))  # Original order (0, 1, 2, ...)
    cohere_ranks = [item.index for item in reranked_results.results]  # Ideal order
    spearman_corr, _ = spearmanr(retrieved_ranks, cohere_ranks)

    # Compute Mean Reciprocal Rank (MRR)
    def reciprocal_rank(retrieved, ideal):
        for i, chunk in enumerate(retrieved):
            if chunk['chunk'].id == ideal[0]['chunk'].id:  # Best Cohere chunk
                return 1 / (i + 1)
        return 0  # Not found

    best_chunk = results[reranked_results.results[0].index]  # Best chunk from Cohere
    mrr = reciprocal_rank(results, [best_chunk])
    
    # Return metrics
    evaluation_metrics = {
        "Spearman Rank Correlation": spearman_corr,
        "Mean Reciprocal Rank (MRR)": mrr,
        "Average Relevance Score": statistics.mean(relevance_scores),
        "Average Normalised Score": statistics.mean(relevance_scores)
    }
    
    print("\n🔍 Evaluation Metrics:")
    for key, value in evaluation_metrics.items():
        print(f"{key}: {value:.4f}")

    return evaluation_metrics

In [9]:
# def dense_search(query: str, query_method: str = "cosine_distance", n: int = 10, explain = False):
#     db = sessionLocal()
#     client = OpenAI()
           
#     # perform vector search on database
#     query_embedding = client.embeddings.create(
#         input = query,
#         model=EMBEDDING_MODEL
#         ).data[0].embedding

#     query_vector = np.array(query_embedding).tolist()
    
#     # TOO EXPENSIVE NEEDS ALTERNATIVE SOLUTION
#     # # rerank all chunks to get the chunk with highest relevance score 
#     # all_results = (
#     #     db.query(Chunk)
#     #     .all()
#     # )

#     # # Get list of strings to rerank with Cohere
#     # docs = [chunk.chunk for chunk in all_results]

#     # # rerank all results
#     # reranked_results = co.rerank(model="rerank-v3.5", query = query, documents = docs, top_n = 1)
#     # max_score = reranked_results.results[0].relevance_score

#     # Define comparator options
#     comparators = {
#         "l2_distance": Chunk.embedding.l2_distance(query_vector),
#         "l1_distance": Chunk.embedding.l1_distance(query_vector),
#         "cosine_distance": Chunk.embedding.cosine_distance(query_vector),
#         "dot_product": Chunk.embedding.max_inner_product(query_vector)  # Dot product needs descending order
#     }

#     if explain:
#         orm_query = db.query(Chunk, comparators[query_method].label("cosine_distance")).order_by(comparators[query_method]).limit(n)
#         print(f"\n--- Running EXPLAIN ANALYZE for query_method='{query_method}' (probes={100 if query_method=='cosine_distance' else 'N/A'}) ---")
        
#         # Compile the ORM query to SQL
#         # Use the session's dialect to ensure correct parameter handling

#         #db.execute(text("SET enable_indexscan = off;"))
#         # db.execute(text("SET ivf.probes = 100;"))
#         # db.execute(text("ANALYZE chunks;"))

#         compiled = orm_query.statement.compile(dialect=db.bind.dialect, compile_kwargs ={"literal_binds": True})
#         params = compiled.params
#         # Prepend EXPLAIN ANALYZE
#         explain_sql = "EXPLAIN ANALYZE " + str(compiled)

#         print(f"Compiled SQL for EXPLAIN: {explain_sql}")


#         # Execute EXPLAIN ANALYZE
#         explain_result = db.execute(text(explain_sql), params)

#         # Fetch and print the plan output
#         plan_output = "\n".join([row[0] for row in explain_result.fetchall()])
#         print("\n--- EXPLAIN ANALYZE Output: ---")
#         print(plan_output)
#         print("-------------------------------\n")
        
#         # Return an empty list or None when explaining, as we didn't fetch results
#         return None 

#     #db.execute(text("SET enable_indexscan = off;"))
#     query_results = (
#         db.query(Chunk, comparators[query_method].label("cosine_distance"))
#         .order_by(comparators[query_method])
#         .limit(n)
#         .all()
#     )
    
#     # Convert tuples to chunks with scores
#     scored_chunks = [
#         {
#             "chunk": chunk,
#             "similarity_score": 1 - cosine_distance # Attach similarity score
#         }
#         for chunk, cosine_distance in query_results
#     ]
        
#     return scored_chunks

# def sparse_search(query: str, n: int = 10):
#     db = sessionLocal()
#     try:
#         # query_tsquery = func.plainto_tsquery('english', query_text)
#         tsquery = func.websearch_to_tsquery('english', query)

#         rank_func = func.ts_rank_cd(Chunk.chunk_tsv, tsquery).label('rank')
        
#         results = (db.query(Chunk, rank_func)
#             .where(Chunk.chunk_tsv.op('@@')(tsquery))
#             .order_by(desc(rank_func)) # Higher rank is better
#             .limit(n).all()
#         )
#         # Convert tuples to chunks with scores
#         scored_chunks = [
#             {
#                 "chunk": chunk,
#                 "rank": rank_score  # Attach ranking score
#             }
#             for chunk, rank_score in results
#         ]
       
#         return scored_chunks
#     except Exception as e:
#         print(f"Something went wrong during sparse search: {e}")

    
# def hybrid_search(query: str, dense_comparator: str = "cosine_distance", alpha: float = 0.5, n: int = 10):
#     """
#     Performs a hybrid search combining dense and sparse retrieval methods.
    
#     :param query: Search query
#     :param dense_comparator: Similarity metric for dense search
#     :param alpha: Weight for combining scores (0.5 means equal weight to both)
#     :param n: Number of top results to return
#     :return: List of combined search results
#     """
    
#     # Perform both searches
#     dense_results = dense_search(query, dense_comparator, n)
#     sparse_results = sparse_search(query, n)

#     # Create a dictionary to store combined scores
#     result_dict = {}
#     results = []

#     # Normalize Dense Scores
#     max_dense_score = max(item["similarity_score"] for item in dense_results) if dense_results else 1
#     for item in dense_results:
#         norm_dense_score = item["similarity_score"] / max_dense_score
#         result_dict[item["chunk"].id] = {"chunk": item["chunk"], "dense_score": norm_dense_score, "sparse_score": 0}

#     # Normalize Sparse Scores
#     max_sparse_score = max(item["rank"] for item in sparse_results) if sparse_results else 1
#     for item in sparse_results:
#         norm_sparse_score = item["rank"] / max_sparse_score
#         if item["chunk"].id in result_dict:
#             result_dict[item["chunk"].id]["sparse_score"] = norm_sparse_score
#         else:
#             result_dict[item["chunk"].id] = {"chunk": item["chunk"], "dense_score": 0, "sparse_score": norm_sparse_score}

#     # Compute Hybrid Score
#     for chunk_id, values in result_dict.items():
#         values["hybrid_score"] = alpha * values["dense_score"] + (1 - alpha) * values["sparse_score"]
#         results.append({
#             "chunk": values["chunk"],
#             "hybrid_score": values["hybrid_score"],
#             "dense_score": values["dense_score"],
#             "sparse_score": values["sparse_score"]      
#         })


#     # Sort results by hybrid score (descending)
#     sorted_results = sorted(results, key=lambda x: x["hybrid_score"], reverse=True)
    
#     # Return top-N results
#     return sorted_results
 

In [19]:
def dense_search(
    query: str,
    db,
    query_method: str = "cosine_distance",
    n: int = 10
) -> list[dict]:
    """
    Performs dense vector similarity search using OpenAI embeddings and pgvector.

    Args:
        query (str): The natural language query to embed and search with.
        db (scoped_session): SQLAlchemy database session.
        query_method (str): Distance metric to use for similarity (e.g., cosine_distance).
        n (int): Number of top results to retrieve.
        explain (bool): Whether to return additional debug information.

    Returns:
        list[dict]: A list of chunks with their similarity scores.
    """
    try:
        # Get vector embedding for the query
        query_embedding = openai.embeddings.create(
            input=query,
            model=EMBEDDING_MODEL
        ).data[0].embedding
        query_vector = np.array(query_embedding).tolist()

        # Distance function map
        comparators = {
            "l2_distance": Chunk.embedding.l2_distance(query_vector),
            "l1_distance": Chunk.embedding.l1_distance(query_vector),
            "cosine_distance": Chunk.embedding.cosine_distance(query_vector),
            "dot_product": Chunk.embedding.max_inner_product(query_vector)
        }

        if query_method not in comparators:
            raise ValueError(f"Invalid query method '{query_method}'")

        comparator = comparators[query_method]

        # Run DB query with eager-loaded document
        query_results = (
            db.query(Chunk, comparator.label("score"))
            # .options(joinedload(Chunk.document))
            .order_by(comparator)
            .limit(n)
            .all()
        )

        # Normalize cosine distance (lower = better)
        scored_chunks = [
            {
                "chunk": chunk,
                "similarity_score": 1 - score if query_method == "cosine_distance" else score
            }
            for chunk, score in query_results
        ]

        return scored_chunks
    except Exception as e:
        print(f"[sparse_search] Search failed: {e}")
        return []


def sparse_search(query: str, db, n: int = 10) -> list[dict]:
    """
    Performs sparse full-text search using PostgreSQL's tsvector and ts_rank_cd.

    Args:
        query (str): The user query for text search.
        db (scoped_session): SQLAlchemy session.
        n (int): Number of results to return.

    Returns:
        list[dict]: Ranked chunks with relevance scores.
    """
    try:
        tsquery = func.websearch_to_tsquery('english', query)
        rank_func = func.ts_rank_cd(Chunk.chunk_tsv, tsquery).label('rank')

        results = (
            db.query(Chunk, rank_func)
            # .options(joinedload(Chunk.document))
            .where(Chunk.chunk_tsv.op('@@')(tsquery))
            .order_by(desc(rank_func))
            .limit(n)
            .all()
        )

        return [
            {"chunk": chunk, "rank": rank}
            for chunk, rank in results
        ]
    except Exception as e:
        print(f"[sparse_search] Search failed: {e}")
        return []

  
    
def hybrid_search(query: str, db, dense_comparator: str = "cosine_distance", alpha: float = 0.5, n: int = 10):
    """
    Performs a hybrid search combining dense and sparse retrieval methods.
    
    :param query: Search query
    :param dense_comparator: Similarity metric for dense search
    :param alpha: Weight for combining scores (0.5 means equal weight to both)
    :param n: Number of top results to return
    :return: List of combined search results
    """
    
    # Perform both searches
    dense_results = dense_search(query, db, dense_comparator, n)
    sparse_results = sparse_search(query, db, n)

    # Create a dictionary to store combined scores
    result_dict = {}
    results = []

    # Normalize Dense Scores
    max_dense_score = max(item["similarity_score"] for item in dense_results) if dense_results else 1
    for item in dense_results:
        norm_dense_score = item["similarity_score"] / max_dense_score
        result_dict[item["chunk"].id] = {"chunk": item["chunk"], "dense_score": norm_dense_score, "sparse_score": 0}

    # Normalize Sparse Scores
    max_sparse_score = max(item["rank"] for item in sparse_results) if sparse_results else 1
    for item in sparse_results:
        norm_sparse_score = item["rank"] / max_sparse_score
        if item["chunk"].id in result_dict:
            result_dict[item["chunk"].id]["sparse_score"] = norm_sparse_score
        else:
            result_dict[item["chunk"].id] = {"chunk": item["chunk"], "dense_score": 0, "sparse_score": norm_sparse_score}

    # Compute Hybrid Score
    for chunk_id, values in result_dict.items():
        values["hybrid_score"] = alpha * values["dense_score"] + (1 - alpha) * values["sparse_score"]
        results.append({
            "chunk": values["chunk"],
            "hybrid_score": values["hybrid_score"],
            "dense_score": values["dense_score"],
            "sparse_score": values["sparse_score"]      
        })

    # Sort results by hybrid score (descending)
    sorted_results = sorted(results, key=lambda x: x["hybrid_score"], reverse=True)
    
    # Return top-N results
    return sorted_results
 

In [12]:
def evaluate_search(query: str, db, retrieval: str = "dense", dense_comparator: str = "cosine_distance", n: int = 10):
    #initialise Cohere reranker
    co = cohere.Client(os.environ.get("COHERE_API_KEY"))
    docs = []
    
    print(f"Query: {query}")

    #perform search
    if retrieval.lower() == "dense":
        query_results = dense_search(query, db, dense_comparator)
    elif retrieval.lower() == "sparse":
        query_results = sparse_search(query, db)
    elif retrieval.lower() == "hybrid":
        query_results = hybrid_search(query, db, dense_comparator)
    else:
        raise ValueError("Invalid retrieval method. Choose 'dense', 'sparse', or 'hybrid'.")

    # rerank query results
    docs = [result['chunk'].chunk for result in query_results]
    reranked_results = co.rerank(model="rerank-v3.5", query = query, documents = docs, top_n = n * 2)

    #evaluate results
   

    metrics = evaluate_rag(query_results, reranked_results, retrieval)
    scored_chunks = []

    # for i, chunk in enumerate(query_results):
    #     score = next((item.relevance_score for item in reranked_results.results if item.index == i), None)
    #     normalised_score = score/max_score
    #     scored_chunks.append((chunk, score, normalised_score))
        
    return metrics




In [17]:
df_dense = pd.DataFrame(columns=["retrieval", "embedding_model", "chunk_size", "query", 'average_relevance_score', 'normalised_relevance_score', "Spearman Rank Correlation", "Mean Reciprocal Rank (MRR)"])
df_sparse = pd.DataFrame(columns=["retrieval", "embedding_model", "chunk_size", "query", 'average_relevance_score', 'normalised_relevance_score', "Spearman Rank Correlation", "Mean Reciprocal Rank (MRR)"])
df_hybrid = pd.DataFrame(columns=["retrieval", "embedding_model", "chunk_size", "query", 'average_relevance_score', 'normalised_relevance_score', "Spearman Rank Correlation", "Mean Reciprocal Rank (MRR)"])

In [13]:
def test_pipeline(
        df: pd.DataFrame,
        create_db: bool = False,
        data_path: str = DATA_PATH, 
        embed_model: str = EMBEDDING_MODEL, 
        min_tokens: int = 200, 
        max_tokens: int = 1000,
        trim_chunks: bool = True, 
        retrieval = "dense",
        dense_comparator = "cosine_distance"):
    
    db = sessionLocal()
    test_queries = [
    "setting up development environment?",
    "necessary software?",
    "Paid Time Off (PTO)",
    "How do I request time off",
    "Sick leave",
    "meal expanses limit",
    "set up meeting",
    "gitlabs coding standards"]
    # test_queries = [
    # "setting up development environment?"
    # ]
    if create_db:
        create_embedded_chunks(db, data_path, embed_model, min_tokens, max_tokens, trim_chunks)

    
    print(f"Showing results from {retrieval} search:")

    for query in test_queries:
        metrics = evaluate_search(query, db, retrieval, dense_comparator)
        new_row = {
            "retrieval": retrieval,
            "embedding_model": embed_model,
            "chunk_size": f"{min_tokens} - {max_tokens}",
            "query": query,
            "average_relevance_score": metrics["Average Relevance Score"],
            "normalised_relevance_score": metrics["Average Normalised Score"],
            "Spearman Rank Correlation": metrics["Spearman Rank Correlation"],
            "Mean Reciprocal Rank (MRR)": metrics["Mean Reciprocal Rank (MRR)"]
}       
        df.loc[len(df)] = new_row

    db.close()
  
    return df

In [14]:
db = sessionLocal()
dense_search("setting up development environment")

TypeError: dense_search() missing 1 required positional argument: 'db'

In [20]:
result_df = test_pipeline(df_hybrid, retrieval='hybrid')

Showing results from hybrid search:
Query: setting up development environment?
Chunk 5179 - Relevance Score: 0.756376 - dense_score: 0.9239201193548958 - sparse_score: 1.0 - hybrid_score: 0.9619600596774479
Chunk 5180 - Relevance Score: 0.7300017 - dense_score: 0.9052993162905258 - sparse_score: 1.0 - hybrid_score: 0.9526496581452629
Chunk 6242 - Relevance Score: 0.31154254 - dense_score: 1.0 - sparse_score: 0 - hybrid_score: 0.5
Chunk 5178 - Relevance Score: 0.80405265 - dense_score: 0 - sparse_score: 0.9459437207207368 - hybrid_score: 0.4729718603603684
Chunk 30031 - Relevance Score: 0.3554845 - dense_score: 0.9456348775261706 - sparse_score: 0 - hybrid_score: 0.4728174387630853
Chunk 10886 - Relevance Score: 0.1682824 - dense_score: 0.9412153779665121 - sparse_score: 0 - hybrid_score: 0.47060768898325606
Chunk 30030 - Relevance Score: 0.30062285 - dense_score: 0.9410965275813138 - sparse_score: 0 - hybrid_score: 0.4705482637906569
Chunk 11241 - Relevance Score: 0.35307175 - dense_sc

In [21]:
result_df

Unnamed: 0,retrieval,embedding_model,chunk_size,query,average_relevance_score,normalised_relevance_score,Spearman Rank Correlation,Mean Reciprocal Rank (MRR)
0,hybrid,text-embedding-3-small,200 - 1000,setting up development environment?,0.47356,0.47356,-0.007224,0.25
1,hybrid,text-embedding-3-small,200 - 1000,necessary software?,0.301512,0.301512,0.584962,0.5
2,hybrid,text-embedding-3-small,200 - 1000,Paid Time Off (PTO),0.643521,0.643521,-0.768421,0.058824
3,hybrid,text-embedding-3-small,200 - 1000,How do I request time off,0.501917,0.501917,0.890226,1.0
4,hybrid,text-embedding-3-small,200 - 1000,Sick leave,0.548409,0.548409,0.374436,0.083333
5,hybrid,text-embedding-3-small,200 - 1000,meal expanses limit,0.11924,0.11924,-0.428571,0.066667
6,hybrid,text-embedding-3-small,200 - 1000,set up meeting,0.237353,0.237353,-0.618045,0.076923
7,hybrid,text-embedding-3-small,200 - 1000,gitlabs coding standards,0.476001,0.476001,0.550376,1.0


In [62]:
query_method = "cosine_distance"
#results_df_dense = test_pipeline(df_dense, create_db=False, trim_chunks=True, retrieval= "dense", dense_comparator="cosine_distance")
#results_df_dense.to_csv(f"{RESULT_PATH}dense.csv")
#results_df_sparse = test_pipeline(df_sparse, create_db=False, trim_chunks=True, retrieval= "sparse")
#results_df_sparse.to_csv(f"{RESULT_PATH}sparse.csv")
results_df_hybrid = test_pipeline(df_hybrid, retrieval='hybrid')
#results_df_hybrid.to_csv(f"{RESULT_PATH}hybrid.csv")

Showing results from hybrid search:
Query: setting up development environment?
Value of enable_seqscan JUST BEFORE executing EXPLAIN: on

--- Running EXPLAIN ANALYZE for query_method='cosine_distance' (probes=100) ---
Compiled SQL for EXPLAIN: EXPLAIN ANALYZE SELECT chunks.id, chunks.document_id, chunks.context, chunks.chunk, chunks.embedding, chunks.chunk_tsv, chunks.embedding <=> '[-0.031588368117809296,-0.04046443849802017,0.09309431910514832,-0.008797752670943737,0.01267450675368309,-0.038271527737379074,0.09251998364925385,0.03453835844993591,-0.019657885655760765,-0.009678833186626434,0.032893672585487366,0.03187553584575653,-0.010266220197081566,-0.030283063650131226,0.05503163859248161,0.017765194177627563,0.004999316297471523,0.034198977053165436,-0.040699392557144165,0.0761253610253334,0.04383212700486183,-0.010305378586053848,-0.02327357977628708,0.0058118682354688644,-0.0006110456888563931,-0.005609546322375536,-0.03727949783205986,0.05518827587366104,0.016655685380101204,0

TypeError: 'NoneType' object is not iterable

In [109]:
results_df_hybrid

Unnamed: 0,retrieval,embedding_model,chunk_size,query,average_relevance_score,normalised_relevance_score,Spearman Rank Correlation,Mean Reciprocal Rank (MRR)
0,hybrid,text-embedding-3-small,200 - 1000,setting up development environment?,0.306525,0.306525,0.299248,0.083333
1,hybrid,text-embedding-3-small,200 - 1000,necessary software?,0.150516,0.150516,0.621053,0.5
2,hybrid,text-embedding-3-small,200 - 1000,Paid Time Off (PTO),0.558035,0.558035,0.457172,1.0
3,hybrid,text-embedding-3-small,200 - 1000,How do I request time off,0.252645,0.252645,0.605263,0.25
4,hybrid,text-embedding-3-small,200 - 1000,Sick leave,0.529405,0.529405,0.296569,0.166667
5,hybrid,text-embedding-3-small,200 - 1000,meal expanses limit,0.019019,0.019019,0.290909,0.5
6,hybrid,text-embedding-3-small,200 - 1000,set up meeting,0.311485,0.311485,0.384211,1.0
7,hybrid,text-embedding-3-small,200 - 1000,gitlabs coding standards,0.613979,0.613979,0.12807,1.0
8,hybrid,text-embedding-3-small,200 - 1000,setting up development environment?,0.306525,0.306525,0.299248,0.083333
9,hybrid,text-embedding-3-small,200 - 1000,necessary software?,0.150516,0.150516,0.621053,0.5


In [104]:
results_df_hybrid = test_pipeline(df_hybrid, retrieval='hybrid', dense_comparator="cosine_distance")

Showing results from hybrid search:
Query: setting up development environment?
Chunk 8110 - Relevance Score: 0.40797067 - dense_score: 1.0 - sparse_score: 0 - hybrid_score: 0.5
Chunk 3042 - Relevance Score: 0.7570233 - dense_score: 0 - sparse_score: 1.0 - hybrid_score: 0.5
Chunk 3324 - Relevance Score: 0.24588585 - dense_score: 0.9964826952650555 - sparse_score: 0 - hybrid_score: 0.49824134763252775
Chunk 3699 - Relevance Score: 0.30703637 - dense_score: 0.9848164048052285 - sparse_score: 0 - hybrid_score: 0.49240820240261424
Chunk 19473 - Relevance Score: 0.11931599 - dense_score: 0.9589768148543789 - sparse_score: 0 - hybrid_score: 0.47948840742718946
Chunk 14662 - Relevance Score: 0.3258213 - dense_score: 0.9337536748549158 - sparse_score: 0 - hybrid_score: 0.4668768374274579
Chunk 19607 - Relevance Score: 0.18866788 - dense_score: 0.9229720641365494 - sparse_score: 0 - hybrid_score: 0.4614860320682747
Chunk 18369 - Relevance Score: 0.22164577 - dense_score: 0.9199170150017324 - spa

In [94]:
results_df_hybrid

Unnamed: 0,retrieval,embedding_model,chunk_size,query,average_relevance_score,normalised_relevance_score,Spearman Rank Correlation,Mean Reciprocal Rank (MRR)
0,hybrid,text-embedding-3-small,200 - 1000,setting up development environment?,0.306525,0.306525,0.299248,0.083333
1,hybrid,text-embedding-3-small,200 - 1000,necessary software?,0.150516,0.150516,0.621053,0.5
2,hybrid,text-embedding-3-small,200 - 1000,Paid Time Off (PTO),0.558035,0.558035,0.457172,1.0
3,hybrid,text-embedding-3-small,200 - 1000,How do I request time off,0.252645,0.252645,0.605263,0.25
4,hybrid,text-embedding-3-small,200 - 1000,Sick leave,0.529405,0.529405,0.296569,0.166667
5,hybrid,text-embedding-3-small,200 - 1000,meal expanses limit,0.019019,0.019019,0.290909,0.5
6,hybrid,text-embedding-3-small,200 - 1000,set up meeting,0.311485,0.311485,0.384211,1.0
7,hybrid,text-embedding-3-small,200 - 1000,gitlabs coding standards,0.613979,0.613979,0.12807,1.0
8,hybrid,text-embedding-3-small,200 - 1000,setting up development environment?,0.306525,0.306525,0.299248,0.083333
9,hybrid,text-embedding-3-small,200 - 1000,necessary software?,0.150516,0.150516,0.621053,0.5


Testing with Golden Test set

In [36]:
import pandas as pd
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sentence_transformers import SentenceTransformer
import re
import csv
import json
from typing import List, Dict, Tuple, Any, Optional
import os

class RAGEvaluator:
    def __init__(self, golden_test_set_path: str, model_name: str = "all-MiniLM-L6-v2"):
        """
        Initialize the RAG evaluation system
        
        Args:
            golden_test_set_path: Path to the CSV file containing the golden test set
            model_name: The sentence transformer model to use for semantic similarity
        """
        self.golden_test_set_path = golden_test_set_path
        self.golden_df = self._load_golden_test_set()
        self.encoder = SentenceTransformer(model_name)
        
    def _load_golden_test_set(self) -> pd.DataFrame:
        """Load the golden test set from CSV"""
        try:
            # Try reading with common encodings
            for encoding in ['utf-8', 'latin-1', 'ISO-8859-1']:
                try:
                    df = pd.read_csv(self.golden_test_set_path, encoding=encoding, delimiter=';')
                    return df
                except UnicodeDecodeError:
                    continue
                    
            # If all encodings fail, try excel format
            return pd.read_excel(self.golden_test_set_path)
            
        except Exception as e:
            print(f"Error loading golden test set: {e}")
            # Create empty DataFrame with expected columns if loading fails
            return pd.DataFrame(columns=["ID", "Question", "Answer", "Source File", "Relevant Section"])
    
    def _normalize_path(self, path: str) -> str:
        """Normalize file paths for comparison"""
        # Remove leading/trailing whitespace, normalize slashes
        path = path.strip()
        path = path.replace('\\', '/')
        
        # Remove leading "/" if present
        if path.startswith('/'):
            path = path[1:]
            
        # Handle cases where the path might be in a different format
        path = re.sub(r'^.*?content/', 'content/', path)
        
        return path.lower()
    
    def evaluate_retrieval(self, question_id: int, retrieved_sources: List[str]) -> Dict[str, Any]:
        """
        Evaluate if the retrieved sources match the expected sources for a question
        
        Args:
            question_id: The ID of the question to evaluate
            retrieved_sources: List of source paths retrieved by the RAG system
            
        Returns:
            Dictionary with evaluation metrics
        """
        # Get the golden source(s) for this question
        question_row = self.golden_df[self.golden_df["ID"] == question_id]
        if question_row.empty:
            return {"error": f"Question ID {question_id} not found in golden test set"}
        
        golden_sources_raw = question_row["Source File"].iloc[0]
        golden_sources = [self._normalize_path(src.strip()) for src in golden_sources_raw.split(',')]
        
        # Normalize retrieved sources
        normalized_retrieved = [self._normalize_path(src) for src in retrieved_sources]
        
        # Check exact matches
        exact_matches = set(golden_sources).intersection(set(normalized_retrieved))
        
        # Calculate partial matches (if a retrieved source contains or is contained in a golden source)
        partial_matches = []
        for g_src in golden_sources:
            for r_src in normalized_retrieved:
                if g_src in r_src or r_src in g_src:
                    if (g_src, r_src) not in partial_matches and (r_src, g_src) not in partial_matches:
                        partial_matches.append((g_src, r_src))
        
        # Calculate metrics
        precision = len(exact_matches) / len(normalized_retrieved) if normalized_retrieved else 0
        recall = len(exact_matches) / len(golden_sources) if golden_sources else 0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            "question_id": question_id,
            "golden_sources": golden_sources,
            "retrieved_sources": normalized_retrieved,
            "exact_matches": list(exact_matches),
            "partial_matches": partial_matches,
            "precision": precision,
            "recall": recall,
            "f1_score": f1,
            "retrieval_success": len(exact_matches) > 0 or len(partial_matches) > 0
        }
    
    def evaluate_answer(self, question_id: int, generated_answer: str) -> Dict[str, Any]:
        """
        Evaluate the quality of a generated answer against the golden answer
        
        Args:
            question_id: The ID of the question to evaluate
            generated_answer: The answer generated by the RAG system
            
        Returns:
            Dictionary with evaluation metrics
        """
        # Get the golden answer for this question
        question_row = self.golden_df[self.golden_df["ID"] == question_id]
        if question_row.empty:
            return {"error": f"Question ID {question_id} not found in golden test set"}
        
        golden_answer = question_row["Answer"].iloc[0]
        question = question_row["Question"].iloc[0]
        
        # Calculate semantic similarity
        golden_embedding = self.encoder.encode([golden_answer])[0]
        generated_embedding = self.encoder.encode([generated_answer])[0]
        semantic_similarity = cosine_similarity([golden_embedding], [generated_embedding])[0][0]
        
        # Simple lexical overlap (Jaccard similarity)
        golden_tokens = set(re.findall(r'\b\w+\b', golden_answer.lower()))
        generated_tokens = set(re.findall(r'\b\w+\b', generated_answer.lower()))
        
        jaccard = len(golden_tokens.intersection(generated_tokens)) / len(golden_tokens.union(generated_tokens)) if golden_tokens or generated_tokens else 0
        
        return {
            "question_id": question_id,
            "question": question,
            "golden_answer": golden_answer,
            "generated_answer": generated_answer,
            "semantic_similarity": float(semantic_similarity),
            "lexical_overlap": jaccard
        }
    
    def run_evaluation(self, rag_results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Run a full evaluation on a batch of RAG results
        
        Args:
            rag_results: List of dictionaries, each containing:
                - question_id: The ID of the question
                - retrieved_sources: List of source paths retrieved by the RAG
                - generated_answer: The answer generated by the system
                
        Returns:
            Dictionary with aggregated evaluation metrics
        """
        retrieval_results = []
        answer_results = []
        
        for result in rag_results:
            question_id = result["question_id"]
            retrieved_sources = result.get("retrieved_sources", [])
            generated_answer = result.get("generated_answer", "")
            
            retrieval_eval = self.evaluate_retrieval(question_id, retrieved_sources)
            retrieval_results.append(retrieval_eval)
            
            answer_eval = self.evaluate_answer(question_id, generated_answer)
            answer_results.append(answer_eval)
        
        # Calculate aggregate metrics
        retrieval_success_rate = sum(1 for r in retrieval_results if r.get("retrieval_success", False)) / len(retrieval_results) if retrieval_results else 0
        avg_retrieval_precision = sum(r.get("precision", 0) for r in retrieval_results) / len(retrieval_results) if retrieval_results else 0
        avg_retrieval_recall = sum(r.get("recall", 0) for r in retrieval_results) / len(retrieval_results) if retrieval_results else 0
        avg_retrieval_f1 = sum(r.get("f1_score", 0) for r in retrieval_results) / len(retrieval_results) if retrieval_results else 0
        
        avg_semantic_similarity = sum(a.get("semantic_similarity", 0) for a in answer_results) / len(answer_results) if answer_results else 0
        avg_lexical_overlap = sum(a.get("lexical_overlap", 0) for a in answer_results) / len(answer_results) if answer_results else 0
        
        return {
            "num_questions_evaluated": len(rag_results),
            "retrieval_metrics": {
                "success_rate": retrieval_success_rate,
                "avg_precision": avg_retrieval_precision,
                "avg_recall": avg_retrieval_recall,
                "avg_f1": avg_retrieval_f1
            },
            "answer_metrics": {
                "avg_semantic_similarity": float(avg_semantic_similarity),
                "avg_lexical_overlap": avg_lexical_overlap
            },
            "detailed_results": {
                "retrieval": retrieval_results,
                "answers": answer_results
            }
        }
    
    def save_results(self, evaluation_results: Dict[str, Any], output_path: str = "rag_evaluation_results.json"):
        """Save evaluation results to a JSON file"""
        with open(output_path, 'w') as f:
            json.dump(evaluation_results, f, indent=2)
        print(f"Evaluation results saved to {output_path}")
        
        # Also save a CSV summary of per-question results
        with open(output_path.replace('.json', '_summary.csv'), 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                "Question ID", "Question", "Retrieval Success", "Precision", "Recall", 
                "F1 Score", "Semantic Similarity", "Lexical Overlap"
            ])
            
            retrieval_results = evaluation_results["detailed_results"]["retrieval"]
            answer_results = evaluation_results["detailed_results"]["answers"]
            
            for i in range(len(retrieval_results)):
                r = retrieval_results[i]
                a = answer_results[i]
                writer.writerow([
                    r.get("question_id", "N/A"),
                    a.get("question", "N/A"),
                    r.get("retrieval_success", False),
                    r.get("precision", 0),
                    r.get("recall", 0),
                    r.get("f1_score", 0),
                    a.get("semantic_similarity", 0),
                    a.get("lexical_overlap", 0)
                ])
        print(f"Summary CSV saved to {output_path.replace('.json', '_summary.csv')}")

# Example usage function to demonstrate how to use the evaluator with an existing RAG system
def evaluate_rag_system(golden_test_set_path: str, rag_function, llm_function):
    """
    Evaluate a RAG system against the golden test set
    
    Args:
        golden_test_set_path: Path to the CSV file containing the golden test set
        rag_function: Function that takes a question and returns retrieved sources
        llm_function: Function that takes a question and retrieved chunks and returns an answer
    """
    evaluator = RAGEvaluator(golden_test_set_path)
    golden_df = evaluator.golden_df
    
    rag_results = []
    
    # Process each question in the golden test set
    for _, row in golden_df.iterrows():
        question_id = row["ID"]
        question = row["Question"]
        
        print(f"Evaluating question {question_id}: {question[:50]}...")
        
        # Call your existing RAG retrieval function
        retrieved_chunks = rag_function(question)
        
        # Extract just the source paths from the retrieved chunks
        # Adjust this based on how your system returns sources
        retrieved_sources = [chunk["source"] for chunk in retrieved_chunks]
        
        # Call your existing LLM answer generation function
        generated_answer = llm_function(question, retrieved_chunks)
        
        rag_results.append({
            "question_id": question_id,
            "retrieved_sources": retrieved_sources,
            "generated_answer": generated_answer
        })
    
    # Run the evaluation
    evaluation_results = evaluator.run_evaluation(rag_results)
    
    # Save the results
    evaluator.save_results(evaluation_results)
    
    # Print summary metrics
    print("\nEvaluation Summary:")
    print(f"Number of questions evaluated: {evaluation_results['num_questions_evaluated']}")
    print(f"Retrieval success rate: {evaluation_results['retrieval_metrics']['success_rate']:.2f}")
    print(f"Average retrieval F1 score: {evaluation_results['retrieval_metrics']['avg_f1']:.2f}")
    print(f"Average answer semantic similarity: {evaluation_results['answer_metrics']['avg_semantic_similarity']:.2f}")
    
    return evaluation_results

# Example of how to integrate with your existing RAG system
if __name__ == "__main__":
    # Replace these functions with your actual implementations
    def my_rag_search(question):
        # This should call your existing RAG search function
        # Return format should be a list of dictionaries, each with at least a "source" field
        # Example: [{"source": "/content/handbook/values/index.md", "content": "..."}]
        db = sessionLocal()
        results = hybrid_search(question, db)
        chunks = [result['chunk'] for result in results]
        chunks_dict = [{"source": chunk.document.location, "content": chunk.chunk} for chunk in chunks]
        return chunks_dict
        
    
    def my_llm_answer(question, retrieved_chunks):
        # This should call your existing LLM answer generation function
        # It should return the generated answer as a string
        pass

    
    # Run the evaluation
    evaluation_results = evaluate_rag_system(GOLDEN_TEST_PATH, my_rag_search, my_llm_answer)

Evaluating question 1: What is GitLab's approach to compensation?...
Evaluating question 2: How does GitLab handle remote work?...
Evaluating question 3: What are GitLab's core values?...
Evaluating question 4: What is GitLab's approach to feature flags?...
Evaluating question 5: How does GitLab handle database changes and migrat...
Evaluating question 6: What is GitLab's Merge Request workflow?...
Evaluating question 7: How does GitLab's stock option program work?...
Evaluating question 8: What health benefits does GitLab offer to employee...
Evaluating question 9: What is GitLab's parental leave policy?...
Evaluating question 10: How does GitLab handle incident management?...
Evaluating question 11: What is GitLab's approach to product development?...
Evaluating question 12: How does GitLab handle security vulnerabilities?...
Evaluating question 13: What is GitLab's mission and vision?...
Evaluating question 14: How does GitLab approach pricing for its products?...
Evaluating questio

TypeError: 'NoneType' object is not subscriptable

In [34]:
df = pd.read_csv(GOLDEN_TEST_PATH, delimiter=';', index_col="ID")
df.head()

Unnamed: 0_level_0,Question,Answer,Source File,Relevant Section
ID,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
1,What is GitLab's approach to compensation?,"GitLab uses a competitive, market-based compen...",/content/handbook/total-rewards/compensation/i...,"Under ""Compensation Principles"" section: ""We u..."
2,How does GitLab handle remote work?,GitLab is an all-remote company where everyone...,/content/handbook/company/culture/all-remote/i...,"Introduction section: ""GitLab is an all-remote..."
3,What are GitLab's core values?,"GitLab's six core values are: Collaboration, R...",/content/handbook/values/index.md,"In the opening section: ""Our six values are Co..."
4,What is GitLab's approach to feature flags?,GitLab uses feature flags to separately deploy...,/content/handbook/product-development/feature-...,"In the introduction: ""Feature flags are a powe..."
5,How does GitLab handle database changes and mi...,GitLab uses migrations to handle database chan...,/content/handbook/engineering/infrastructure/d...,"In the ""Types of Migrations"" section: ""In GitL..."


In [None]:
# Initialize the OpenAI client
client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))

# Evaluation criteria models
class CriterionScore(BaseModel):
    score: float = Field(..., ge=0, le=10, description="Score from 0-10")
    explanation: str = Field(..., description="Brief explanation for the score")

class ResponseEvaluation(BaseModel):
    factual_accuracy: CriterionScore = Field(..., description="Measures correctness of information")
    completeness: CriterionScore = Field(..., description="Measures if all aspects of the question are addressed")
    relevance: CriterionScore = Field(..., description="Measures if information is directly relevant")
    clarity: CriterionScore = Field(..., description="Measures if the response is clear and understandable")
    conciseness: CriterionScore = Field(..., description="Measures if the response is appropriately concise")
    total_score: float = Field(..., ge=0, le=50, description="Sum of all scores")
    evaluation_summary: str = Field(..., description="Brief overall assessment")

class LLMJudge:
    def __init__(self, model="gpt-4"):
        self.model = model
    
    def evaluate_response(self, question: str, rag_response: str, golden_answer: str) -> ResponseEvaluation:
        prompt = f"""You are an expert evaluator of RAG system responses.
        
        Question: {question}
        
        RAG Response: {rag_response}
        
        Golden Answer: {golden_answer}
        
        Evaluate the RAG response against the golden answer based on factual accuracy, completeness, relevance, clarity, and conciseness.
        
        Each criterion should be scored from 0-10.
        
        Return a structured evaluation with scores and brief explanations for each category.
        """
        
        messages = [
            {"role": "system", "content": "You are an expert evaluator of AI assistant responses."},
            {"role": "user", "content": prompt}
        ]
        
        response = client.chat.completions.create(
            model=self.model,
            messages=messages,
            response_format={"type": "json_object"}
        )
        
        result = ResponseEvaluation.model_validate_json(response.choices[0].message.content)
        return result

    def batch_evaluate(self, evaluation_data: List[Dict]) -> List[ResponseEvaluation]:
        results = []
        for item in evaluation_data:
            result = self.evaluate_response(
                item["question"],
                item["rag_response"],
                item["golden_answer"]
            )
            results.append(result)
        return results