#### Organize imports

In [1]:
from decouple import config
from ibm_watsonx_ai.metanames import GenTextParamsMetaNames as GenParams
from instructor import Mode
from langchain_chroma import Chroma
from langchain_community.document_loaders import TextLoader
from langchain_core.documents import Document
from langchain_ibm import WatsonxEmbeddings, WatsonxLLM
from langchain_text_splitters.markdown import RecursiveCharacterTextSplitter
from langchain.prompts import PromptTemplate
from langgraph.graph import START, StateGraph
from litellm import completion
from pydantic import BaseModel, Field
from tqdm import tqdm
from typing import Literal, Any
from typing_extensions import TypedDict
import instructor
import litellm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

#### Retrieve secrets

In [2]:
# Credentials are looked up through python-decouple's `config`, so no secret
# is stored in the notebook itself.
WX_API_KEY = config("WX_API_KEY")
WX_PROJECT_ID = config("WX_PROJECT_ID")

# The watsonx.ai endpoint is region-specific. It can be overridden with a
# WX_API_URL setting; without one the original us-south endpoint is used.
WX_API_URL = config("WX_API_URL", default="https://us-south.ml.cloud.ibm.com")

#### Authenticate and initialize LLM

In [3]:
# Text-generation model used by the `generate` step of the RAG graph.
llm = WatsonxLLM(
    model_id="ibm/granite-3-8b-instruct",
    url=WX_API_URL,
    apikey=WX_API_KEY,
    project_id=WX_PROJECT_ID,
    params={
        # Greedy decoding is requested; temperature is pinned to 0 as well.
        GenParams.DECODING_METHOD: "greedy",
        GenParams.TEMPERATURE: 0,
        # Bounds on the number of generated tokens.
        GenParams.MIN_NEW_TOKENS: 5,
        GenParams.MAX_NEW_TOKENS: 1_000,
        GenParams.REPETITION_PENALTY: 1.2,
    },
)

#### Load documents

In [4]:
# Load structured CBS knowledge base
df = pd.read_json("data/chunked_courses.json")
all_rows = df.to_dict("records")

# Record keys that are copied into each Document's metadata. A missing key
# falls back to an empty string.
METADATA_FIELDS = (
    "programme",
    "course_title",
    "url",
    "language",
    "ects",
    "type",
    "level",
    "study_board",
)

# Prepare one LangChain Document per record.
# The comprehension keeps the per-row temporaries out of the notebook's global
# namespace. The previous loop bound a global called `type`, which shadowed
# the builtin for every later cell. Iterating the already materialised records
# also avoids building a Series per row with `iterrows()`.
chunks = [
    Document(
        page_content=row.get("chunk_text", "").strip(),
        metadata={field: row.get(field, "") for field in METADATA_FIELDS},
    )
    for row in all_rows
]

print(f"Loaded {len(chunks)} chunks.")

Loaded 4762 chunks.


#### Initialize the embedding model

In [5]:
# No embedding-specific parameters are set; an empty mapping is passed through.
embed_params = dict()

# Embedding model used for the vector index.
watsonx_embedding = WatsonxEmbeddings(
    model_id="ibm/granite-embedding-278m-multilingual",
    params=embed_params,
    url=WX_API_URL,
    apikey=WX_API_KEY,
    project_id=WX_PROJECT_ID,
)

#### Create vector index

In [6]:
# Open the persisted collection first (it is created empty if it does not exist).
local_vector_db = Chroma(
    collection_name="my_collection",
    embedding_function=watsonx_embedding,
    persist_directory="my_vector_db",
)

# Only embed and insert the chunks when the collection is still empty.
# `Chroma.from_documents` adds the documents under newly generated ids, so
# calling it unconditionally on every run (e.g. Restart & Run All) would store
# the whole corpus again in the persisted directory: retrieval would then
# return duplicate chunks, and the embedding cost would be paid each time.
# To rebuild the index after the source data changes, delete the
# "my_vector_db" directory and re-run this cell.
if not local_vector_db.get(limit=1)["ids"]:
    local_vector_db = Chroma.from_documents(
        collection_name="my_collection",
        embedding=watsonx_embedding,
        persist_directory="my_vector_db",
        documents=chunks,
    )

#### Create a RAG prompt template

In [7]:
# Prompt used by the generation step. `{question}` and `{context}` are the two
# placeholders; both are supplied when the prompt is invoked.
template = """You are a helpful assistant answering questions about CBS graduate programmes. Use the provided context to generate a clear and accurate response. If the answer is not in the context, say you don't know. Keep the answer concise—no more than three sentences.

Question:
{question}

Context: 
{context} 

Answer:
"""

# Wrap the raw string in a LangChain PromptTemplate.
prompt = PromptTemplate.from_template(template)

#### Create the RAG pipeline

In [None]:
# Define state for application
class State(TypedDict):
    """LangGraph state shared by the steps of the RAG application."""
    # The question the graph is invoked with.
    question: str
    # Documents written by the retrieval step and read by the generation step.
    context: list[Document]
    # Output of the generation step.
    answer: str


# Define application steps
def retrieve(state: State):
    """Retrieval step: look up the 13 chunks most similar to the question in the local vector database."""
    question = state["question"]
    matches = local_vector_db.similarity_search(question, k=13)
    # Only the `context` key is returned; it is this step's update to the graph state.
    return {"context": matches}


def generate(state: State):
    """Generation step: answer the question with the LLM, using the retrieved documents as context."""
    # Join the text of the retrieved documents, separated by blank lines.
    passages = [doc.page_content for doc in state["context"]]
    filled_prompt = prompt.invoke(
        {"question": state["question"], "context": "\n\n".join(passages)}
    )

    # The model's output becomes the `answer` entry of the state.
    return {"answer": llm.invoke(filled_prompt)}


# Assemble and compile the application: START -> retrieve -> generate
graph_builder = StateGraph(State)
graph_builder.add_sequence([retrieve, generate])  # nodes run in list order
graph_builder.add_edge(START, "retrieve")  # entry point is the retrieve step
graph = graph_builder.compile()

#### Create Question-Answer pairs (Gold standard examples)

In [9]:
# Test questions for the evaluation run. Each entry is paired by position with
# the entry at the same index in `expected_responses`.
sample_queries = [
    "What is the course Organizational Behaviour about?",
    "Which programme includes the course Organizational Behaviour?",
    "What are the learning objectives of the Organizational Behaviour course?",
    "What teaching methods are used in Organizational Behaviour?",
    "How is the Organizational Behaviour course assessed?",
    "What is the course Strategic Management about?",
    "Which programme includes the course Strategic Management?",
    "What are the learning objectives of the Strategic Management course?",
    "What teaching methods are used in Strategic Management?",
    "How is the Strategic Management course assessed?",
    "What is the course Entrepreneurship and Innovation in Context about?",
    "Which programme includes the course Entrepreneurship and Innovation in Context?",
    "What are the learning objectives of the Entrepreneurship and Innovation in Context course?",
    "What teaching methods are used in Entrepreneurship and Innovation in Context?",
    "How is the Entrepreneurship and Innovation in Context course assessed?",
    "What is the course Bioentrepreneurship about?",
    "What is the course Strategic Management of Innovation and Technology about?",
    "What is the course Organizational Behavior: Arts and Culture about?",
    "What is the course Financing Innovation and Entrepreneurship about?",
    "What is the course Entrepreneurship and Innovation - a Business Game about?"

]

# Gold-standard answers, index-aligned with `sample_queries`: the evaluation
# loop zips the two lists, so order and length must match.
expected_responses = [
    "It teaches students how organizations work based on organization and management scholarship.",
    "This course is part of the BSc in International Business and Politics programme.",
    "Students learn to understand organizational behavior concepts and apply them to various organizational settings.",
    "The course uses a range of teaching methods and course materials to introduce important approaches, concepts, and frameworks.",
    "Assessment is based on a home assignment - written product, with a maximum of 10 pages, to be completed individually within 72 hours.",
    "It introduces students to the key principles of strategic management, focusing on strategic analysis and contemporary topics in strategy.",
    "This course is part of the MSc in Business Administration and Commercial Law programme.",
    "Students learn to master main strategic models at both theoretical and applied levels and apply strategy to real-life cases.",
    "The course includes lectures, case studies, and discussions to engage students in the classroom.",
    "Assessment details are provided during the course, typically involving oral exams based on mini-projects and syllabus questions.",
    "It explores entrepreneurship and innovation and the conditions which economic, societal, and cultural context sets for entrepreneurial decision-making.",
    "This course is part of the MSc in Social Sciences in Organisational Innovation and Entrepreneurship programme.",
    "Students learn to understand the impact of context on entrepreneurial decisions and analyze different industries and transitions.",
    "The course uses case studies, discussions, and analysis of entrepreneurs acting in different industries.",
    "Assessment details are provided during the course, typically involving written assignments and participation.",
    "It teaches central skills in science-based entrepreneurship in the biotech sector, bridging science and business.",
    "It focuses on applying theoretical knowledge to specific problem situations, business challenges, and cases, emphasizing AI and digital platforms.",
    "It highlights particularities of organizational behavior in the specific case of arts and culture through global examples.",
    "It helps students analyze entrepreneurial and innovative projects associated with high ambiguity and discusses inequalities in financing.",
    "It uses a business game platform to apply theories of entrepreneurship and innovation in a practical setting, teaching students to generate and implement new solutions."

]

#### Create a vanilla RAG evaluation system

In [10]:

# Let litellm drop request parameters instead of failing on them
# (original note: watsonx.ai doesn't support `json_mode`).
litellm.drop_params = True

# instructor-wrapped litellm client, used by the judge to obtain structured responses
client = instructor.from_litellm(litellm.completion, mode=instructor.Mode.JSON)

# create a response model - LLM is forced to return an object of this type
# (it is passed as `response_model` in the judge call; the evaluator reads its
# `.score` and `.reasoning` attributes).
# NOTE(review): PEP 586 does not list float among the value kinds allowed in
# `Literal`, so static type checkers may reject `.5` here - confirm the runtime
# validation behaves as intended before relying on it.
class JudgeResponse(BaseModel):
    # Brief justification of the score.
    reasoning: str = Field(description="Short one-sentence reason for score")
    # Verdict on a three-point scale: 0, 0.5 or 1.
    score: Literal[0, .5, 1] = Field(description="Final score")

# define a function to call the judge
def call_judge(prompt: str) -> JudgeResponse:
    """Send `prompt` to the judge model as a single user message.

    Args:
        prompt: The complete evaluation prompt.

    Returns:
        Whatever the instructor client returns for `response_model=JudgeResponse`.
    """
    user_message = {"role": "user", "content": prompt}

    # NOTE(review): no decoding_method / temperature is passed, so sampling
    # settings are left to the backend - judge scores may differ between runs;
    # confirm whether that is intended.
    return client.chat.completions.create(
        model="watsonx/meta-llama/llama-3-3-70b-instruct",
        max_tokens=1024,
        messages=[user_message],
        project_id=WX_PROJECT_ID,
        apikey=WX_API_KEY,
        api_base=WX_API_URL,
        response_model=JudgeResponse,
    )

In [17]:
import time

In [24]:
class RAGEvaluator:
    """
    A streamlined evaluator for RAG systems focusing on three key dimensions:
    1. Retrieval Quality
    2. Answer Correctness
    3. Hallucination Detection

    Each dimension is scored by calling the judge callable supplied at
    construction time with a dedicated prompt.
    """
    def __init__(self, llm_func):
        """Initialize with an LLM.

        Args:
            llm_func: Callable that takes a prompt string and returns an object
                exposing `score` and `reasoning` attributes.
        """
        self.llm_func = llm_func

    @staticmethod
    def _format_context(response: dict[str, Any]) -> str:
        """Render the retrieved documents as numbered "Document N:" sections separated by blank lines."""
        return "\n\n".join(
            f"Document {number}:\n{doc.page_content}"
            for number, doc in enumerate(response.get('context', []), start=1)
        )

    def _ask_judge(self, tag: str, prompt: str, verbose: bool) -> "JudgeResponse":
        """Call the judge with `prompt`; when `verbose`, print its response prefixed with `[tag]`."""
        result = self.llm_func(prompt)
        if verbose:
            print(f"[{tag}] LLM response: {result}")
        return result

    def evaluate_retrieval_quality(self, response: dict[str, Any], expected_answer: str, verbose: bool = False) -> "JudgeResponse":
        """
        Ask LLM if retrieved documents contain information needed for the expected answer.

        Args:
            response: RAG output; its `context` entry (list of documents) is used.
            expected_answer: The fact that should be present in the documents.
            verbose: Print the judge's response when True.

        Returns:
            The judge's response (1 = present, 0.5 = partly present, 0 = absent).
        """
        # Combine all retrieved document contents with clear formatting
        retrieved_text = self._format_context(response)
        
        # The 0.5 instruction previously read "Responds with 0.5 ff only part ...".
        prompt = f"""You are given a set of documents about master programs at CBS and a fact. Can the fact be found in the documents? Judge by the information, not the exact wording of the fact.
        
        - Respond with 1 if the fact is present (also if the fact can be pieced together from multiple documents).
        - Respond with 0 if the fact is not present in any of the documents.
        - Respond with 0.5 if only part of the fact is present.
        
        Retrieved Documents: 
        {retrieved_text}

        Fact:
        {expected_answer}
        
        Can the fact be found in the documents? Respond as a JudgeResponse object with: 
        - a short reason (max 20 words)
        - a score of 1, 0.5, or 0.
        """
        
        return self._ask_judge("evaluation_retrieval_quality", prompt, verbose)
    
    def evaluate_answer_correctness(self, response: dict[str, Any], expected_answer: str, verbose: bool = False) -> "JudgeResponse":
        """
        Ask LLM to rate how correct/similar the generated answer is to the expected answer.

        Args:
            response: RAG output; its `question` and `answer` entries are used.
            expected_answer: The gold-standard answer.
            verbose: Print the judge's response when True.

        Returns:
            The judge's response (1 = correct, 0.5 = partially correct, 0 = incorrect).
        """
        generated_answer = response.get('answer', '')
        
        prompt = f"""You are evaluating a RAG system with academic knowledge base about CBS Master programs. You are given a question, an expected answer, and a generated answer. Is the generated answer as correct - or close to as correct - as the expected answer? 
        
        - Respond with 1 if the answer is yes (also if the answer is more detailed than expected)
        - Respond with 0 if the answer is no. 
        - respond with 0.5 if the generated answer is partially correct

        Question:
        {response.get('question', '')}
        
        Expected answer:
        {expected_answer}
        
        Generated answer:
        {generated_answer}
        
        Is the generated answer correct enough? Consider content correctness rather than exact wording. 
        Respond as a JudgeResponse object with: 
        - a short reason (max 20 words)
        - a score of 1, 0.5, or 0."""
        
        return self._ask_judge("evaluation_answer_correctness", prompt, verbose)
    
    def evaluate_hallucination(self, response: dict[str, Any], verbose: bool = False) -> "JudgeResponse":
        """
        Ask LLM to evaluate if the answer contains hallucinations.

        Args:
            response: RAG output; its `question`, `context` and `answer` entries are used.
            verbose: Print the judge's response when True.

        Returns:
            The judge's response (0 = no hallucination, 0.5 = partial, 1 = hallucinated),
            so lower is better.
        """
        generated_answer = response.get('answer', '')
        retrieved_text = self._format_context(response)
        
        prompt = f"""You are evaluating a RAG system with academic knowledge base about CBS Master programs. Your task is to determine if the generated answer contains hallucinations. Hallucinations are any information that is not directly supported by the retrieved documents. Does the generated answer contain hallucinations? 
        
        - If the answer is no, respond with 0. 
        - If the answer is yes, respond with 1.
        - If the answer is partially hallucinated, respond with 0.5. 
        - If the generated answer states that it does not know, respond with 0.

        Question: 
        {response.get('question', '')}
        
        Retrieved context (this is all the information the AI had access to):
        {retrieved_text}
        
        Generated answer:
        {generated_answer}
        
        Does the generated answer contain hallucinations? Respond as a JudgeResponse object with: 
        - a short reason (max 20 words)
        - a score of 1, 0.5, or 0.
        """
        
        return self._ask_judge("evaluation_hallucination", prompt, verbose)
    
    def evaluate(self, response: dict[str, Any], expected_answer: str, verbose: bool = False) -> dict[str, Any]:
        """
        Evaluate a RAG response across all three dimensions.

        Args:
            response: RAG output with `question`, `context` and `answer` entries
                (missing entries default to an empty string / list).
            expected_answer: The gold-standard answer for the question.
            verbose: Print each judge response when True.

        Returns:
            A dict holding the inputs, the three scores and the three reasonings.
        """
        # Get scores for each dimension (three judge calls, in this order)
        retrieval_score = self.evaluate_retrieval_quality(response, expected_answer, verbose=verbose)
        correctness_score = self.evaluate_answer_correctness(response, expected_answer, verbose=verbose)
        hallucination_score = self.evaluate_hallucination(response, verbose=verbose)
        
        return {
            "query": response.get("question", ""),
            "retrieved_context": response.get("context", []),
            "generated_answer": response.get("answer", ""),
            "expected_answer": expected_answer,
            "retrieval_quality": retrieval_score.score,
            "answer_correctness": correctness_score.score, 
            "hallucination_score": hallucination_score.score,  # Lower is better

            # keep the reasoning for manual inspection
            "retrieval_quality_reasoning": retrieval_score.reasoning,
            "answer_correctness_reasoning": correctness_score.reasoning,
            "hallucination_reasoning": hallucination_score.reasoning
        }


def evaluate_rag_system(graph, test_queries, expected_responses, evaluator, verbose=False, sleep_seconds=8):
    """
    Evaluate a RAG system containing information about CBS master programs on a test set.
    
    Args:
        graph: The LangGraph RAG system with invoke method
        test_queries: List of questions to test
        expected_responses: List of expected answers, index-aligned with test_queries
        evaluator: The RAG evaluator object (must provide an `evaluate` method)
        verbose: Forwarded to the evaluator; prints the judge responses when True
        sleep_seconds: Pause between consecutive queries to avoid hitting the API
            too fast. Use 0 to disable. No pause is made after the last query.
        
    Returns:
        Evaluation results: a dict with the per-query results
        ("individual_results"), the averaged metrics ("scores") and the number
        of evaluated queries ("num_queries").

    Raises:
        ValueError: If test_queries and expected_responses differ in length.
    """
    # zip() would silently drop the surplus items, leaving some questions
    # unevaluated while num_queries still reported the full count.
    if len(test_queries) != len(expected_responses):
        raise ValueError(
            f"Got {len(test_queries)} test queries but "
            f"{len(expected_responses)} expected responses; they must be paired one-to-one."
        )

    results = []
    last_index = len(test_queries) - 1
    
    pairs = tqdm(zip(test_queries, expected_responses), total=len(test_queries))
    for index, (query, expected) in enumerate(pairs):

        # Get RAG response
        response = graph.invoke({"question": query})
        
        # Evaluate
        eval_result = evaluator.evaluate(response, expected, verbose=verbose)
        results.append(eval_result)

        # Avoid hitting the API too fast (nothing left to throttle after the last query)
        if sleep_seconds and index < last_index:
            time.sleep(sleep_seconds)
    
    def _average(key):
        """Mean of `key` over all results; NaN (without a NumPy warning) when there are none."""
        values = [r[key] for r in results]
        return np.mean(values) if values else float("nan")

    # Calculate average scores
    avg_metrics = {
        "retrieval_quality": _average("retrieval_quality"),
        "answer_correctness": _average("answer_correctness"),
        "hallucination": _average("hallucination_score")
    }

    return {
        "individual_results": results,
        "scores": avg_metrics,
        "num_queries": len(test_queries)
    }

In [None]:
# Smoke test: run the graph on a single question and display the resulting state.
response = graph.invoke(
    {"question": "What is the course Organizational Behaviour about?"}
)

response

In [None]:
# Run the full evaluation over the gold-standard question/answer pairs,
# using `call_judge` as the LLM judge.
results = evaluate_rag_system(
    graph=graph,
    test_queries=sample_queries,
    expected_responses=expected_responses,
    evaluator=RAGEvaluator(llm_func=call_judge),
    verbose=True,
)

# Averaged metrics
results["scores"]

#### Plot the evaluation results

In [None]:
def plot_scores(scores):
    """
    Draw a bar chart of the three averaged evaluation metrics.

    `scores` must provide the keys "retrieval_quality", "answer_correctness"
    and "hallucination"; the y-axis is fixed to the 0-1 range.
    """
    # (key in `scores`, bar label) pairs, in plotting order
    metrics = (
        ("retrieval_quality", "Retrieval Quality"),
        ("answer_correctness", "Answer Correctness"),
        ("hallucination", "Hallucination"),
    )
    bar_labels = [label for _, label in metrics]
    bar_heights = [scores[key] for key, _ in metrics]

    _, ax = plt.subplots()
    ax.bar(bar_labels, bar_heights)
    ax.set_title('RAG Evaluation Scores')
    ax.set_xlabel('Metric')
    ax.set_ylabel('Score')
    ax.set_ylim(0, 1)  # set y range to 0-1
    plt.show()

# Plot the averaged metrics from the evaluation run stored in `results`.
plot_scores(results["scores"])