# 🎯 Answer Improver: The Evaluator-Optimizer Pattern

Welcome to our final notebook! The evaluator optimizer technique works by having another prompt "interrogate" the answer previously given. That feedback is then used to improve the result. The evaluator-optimizer pattern shines when:
- We have clear evaluation criteria
- Iterative refinement adds measurable value
- LLM feedback can meaningfully improve responses

Perfect for improving OpenSearch answers! 🚀

In [None]:
import boto3

from utils.retrieval_utils import get_chroma_os_docs_collection, ChromaDBRetrievalClient

# Initialize the Bedrock client
session = boto3.Session()
bedrock = session.client(service_name='bedrock-runtime')

# We've pushed the retrieval client from the prompt chaining notebook to the retrieval utils for simplicity
chroma_os_docs_collection: ChromaDBRetrievalClient = get_chroma_os_docs_collection()

print("✅ Client setup and retrieval client complete!")

# Create Helpers
Reuse the same helpers from our previous workshops

In [9]:
from typing import Type, Dict, Any, List

# We pushed the base propmt from the previous lab to a a base prompt file.
from utils.base_prompt import BasePrompt
from utils.retrieval_utils import RetrievalResult

def call_bedrock(prompt: BasePrompt) -> str:
    kwargs = {
        "modelId": prompt.model_id,
        "inferenceConfig": prompt.hyperparams,
        "messages": prompt.to_bedrock_messages(),
        "system": prompt.to_bedrock_system(),
    }

    # Call Bedrock
    converse_response: Dict[str, Any] = bedrock.converse(**kwargs)
    # Get the model's text response
    return converse_response['output']['message']['content'][0]['text']

# Helper function to call bedrock
# def do_rag(user_input: str, rag_prompt: Type[BasePrompt]) -> str:
#     # Retrieve the context from the vector store
#     retrieval_results: List[RetrievalResult] = chroma_os_docs_collection.retrieve(user_input, n_results=2)
#     # Format the context into a string
#     context: str = "\n\n".join([result.document for result in retrieval_results])

#     print("Retrieval done")
#     # Create the RAG prompt
#     inputs: Dict[str, Any] = {"question": user_input, "context": context}
#     rag_prompt: BasePrompt = rag_prompt(inputs=inputs)
#     # Call Bedrock with the RAG prompt

#     print("Calling Bedrock")
#     return call_bedrock(rag_prompt)

def do_retrieve(query: str) -> str:
    """Retrieves the context from the vector store"""
    retrieval_results: List[RetrievalResult] = chroma_os_docs_collection.retrieve(query, n_results=2)
    # Format the context into a string
    return  "\n\n".join([result.document for result in retrieval_results])

## 1. Creating Our Answer Improvement System

We'll build a system that:
1. Generates initial answers
2. Evaluates them for quality
3. Provides specific improvement feedback
4. Iteratively refines the answer

First lets create our prompts

In [10]:
from utils.base_prompt import BasePrompt

# Define system prompt
SYSTEM_PROMPT = """
You are an expert OpenSearch troubleshooter who provides accurate, comprehensive solutions.
"""

DECISION_SYSTEM_PROMPT = "You make clear decisions based on feedback quality."

# Define prompt templates as constants
GENERATE_SOLUTION_TEMPLATE = """
Provide a comprehensive troubleshooting solution for this OpenSearch issue:

<query>
{question}
</query>

<documentation>
{context}
</documentation>

Include:
- Potential root causes
- Diagnostic steps
- Resolution instructions
- Verification methods
"""

EVALUATE_SOLUTION_TEMPLATE = """
Evaluate if this OpenSearch troubleshooting solution fully addresses the problem:

<question>
{question}
</question>

<context>
{context}
</context>

<answer>
{answer}
</answer>

Assess:
- Completeness
- Technical accuracy
- Practical applicability
- Clarity of explanation

Provide specific feedback for improvement.
"""

IMPROVE_SOLUTION_TEMPLATE = """
Improve this OpenSearch troubleshooting solution based on feedback:

<problem>
{question}
</problem>

<context>
{context}
</context>

<current_solution>
{answer}
</current_solution>

<feedback>
{feedback}
</feedback>

Provide an improved solution that addresses all feedback points.
"""

DECISION_PROMPT_TEMPLATE = """
Based on this feedback, does the OpenSearch troubleshooting solution need improvement?

<feedback>
{feedback}
</feedback>

Reply with ONLY with 'IMPROVE' or 'DONE'.
"""

# Define prompt classes
class GenerateSolutionPrompt(BasePrompt):
    system_prompt: str = SYSTEM_PROMPT
    user_prompt: str = GENERATE_SOLUTION_TEMPLATE

class EvaluateSolutionPrompt(BasePrompt):
    system_prompt: str = SYSTEM_PROMPT
    user_prompt: str = EVALUATE_SOLUTION_TEMPLATE

class ImproveSolutionPrompt(BasePrompt):
    system_prompt: str = SYSTEM_PROMPT
    user_prompt: str = IMPROVE_SOLUTION_TEMPLATE

class DecisionPrompt(BasePrompt):
    system_prompt: str = DECISION_SYSTEM_PROMPT
    user_prompt: str = DECISION_PROMPT_TEMPLATE

### Create our Nodes
Use plain python functions like we've done in the previous labs to create our node logic

In [11]:
from typing import Type, Dict, Any, List, TypedDict
from langgraph.graph import StateGraph, END, START
from utils.base_prompt import BasePrompt

# Define the WorkflowState using TypedDict.
class WorkflowState(TypedDict):
    question: str
    answer: str = None
    context: str = None
    feedback: str = None
    iteration: int = 0  
    final_answer: str = None

def generate_answer(state: WorkflowState) -> WorkflowState:
    """Generates an initial answer using RAG"""
    
    # Make sure we have a question in the state
    question: str = state.get('question')

    # Get context from VectorDB
    context: str = do_retrieve(question)

    # Build inputs
    inputs: Dict[str, any] = {
        'question': question,
        'context': context
    }

    rag_prompt: BasePrompt = GenerateSolutionPrompt(inputs=inputs)
    
    # Use the do_rag helper function with the GenerateSolutionPrompt
    answer: str = call_bedrock(rag_prompt)

    # Update the state dictionary.
    state["answer"] = answer
    state["context"] = context
    state['iteration'] = state['iteration'] + 1
    
    # Return updated state with the answer and initialize iteration counter
    return state

def evaluate_answer(state: WorkflowState) -> WorkflowState:
    """Evaluates the quality of the answer"""

    print(f"Evaluating answer to question: {state['question']}")
    inputs: Dict[str, Any] = {
        "question": state["question"],
        "answer": state["answer"],
        "context": state["context"]
    }

    evaluate_prompt: BasePrompt = EvaluateSolutionPrompt(inputs=inputs)
    
    # Call Bedrock to get evaluation feedback
    feedback: str = call_bedrock(evaluate_prompt)
    state['feedback'] = feedback
    
    # Return updated state with feedback
    return state

def should_improve(state: WorkflowState) -> str:
    """Decides whether to improve or finalize based on evaluation"""
    
    # Limit to maximum 2 improvement iterations
    if state["iteration"] >= 2:
        return "DONE"
        
    # Create a simple prompt to decide if improvement is needed
    inputs: Dict[str, Any] = {
        'feedback': state['feedback']
    }
    decision_prompt: BasePrompt = DecisionPrompt(inputs=inputs)
    
    # Get the decision
    decision: str = call_bedrock(decision_prompt).strip()
    
    # Default to DONE if decision isn't clearly IMPROVE
    return "IMPROVE" if "IMPROVE" in decision else "DONE"

def improve_answer(state: WorkflowState) -> WorkflowState:
    """Improves the answer based on feedback using RAG"""

    # Just do the retrieve portion of RAG.
    context: str = do_retrieve(state["question"])

    # Build inputs.
    inputs={
        "question": state["question"],
        "answer": state["answer"],
        "feedback": state["feedback"],
        "context": context
    }
    
    # Create the improvement prompt
    improve_prompt = ImproveSolutionPrompt(inputs=inputs)
    
    # Call Bedrock to get improved answer
    improved_answer: str = call_bedrock(improve_prompt)

    state["answer"] = improved_answer
    state["iteration"] = state["iteration"] + 1

    return state

def finalize_answer(state: WorkflowState) -> WorkflowState:
    """Finalizes the answer"""
    # The final answer is the current answer
    state["final_answer"] = state["answer"]
    return state



## Compile Our Graph
Build and compile the evaluator optimizer graph

In [12]:
def build_evaluator_optimizer_workflow():
    """Builds the evaluator-optimizer workflow"""
    workflow = StateGraph(WorkflowState)

    # Add nodes to the graph
    workflow.add_node("generate", generate_answer)
    workflow.add_node("evaluate", evaluate_answer)
    workflow.add_node("improve", improve_answer)
    workflow.add_node("finalize", finalize_answer)
    
    # Connect the workflow
    workflow.add_edge(START, "generate")
    workflow.add_edge("generate", "evaluate")

    # Build decision map. 
    decision_map: Dict[str, str] = {
        "IMPROVE": "improve",
        "DONE": "finalize"
    }

    # Add conditional edges.
    workflow.add_conditional_edges("evaluate", should_improve, decision_map)

    # Create the improvement loop
    workflow.add_edge("improve", "evaluate")

    # Finalize the workflow
    workflow.add_edge("finalize", END)
    
    return workflow.compile()

# Create our workflow
evaluator_optimizer: StateGraph = build_evaluator_optimizer_workflow()


In [None]:
# Lets visualize the graph to get a sense of what we're about to run
from IPython.display import Image, display
from langchain_core.runnables.graph import CurveStyle, MermaidDrawMethod, NodeStyles
display(
    Image(
        evaluator_optimizer.get_graph().draw_mermaid_png(
            draw_method=MermaidDrawMethod.API,
        )
    )
)

## Test our Optimizer

In [None]:
def init_state(question: str) -> WorkflowState:
    """Initialize the workflow state with a question."""
    return WorkflowState(
        question=question,
        answer="",
        context="",
        feedback="",
        iteration=0,
        final_answer=None
    )

# Initialize with a test question - make sure to use "question" as the key
question: str = "OpenSearch cluster showing red health status and not responding to queries"
initial_state: WorkflowState = init_state(question)

# Run the evaluator-optimizer workflow
result: WorkflowState = evaluator_optimizer.invoke(initial_state)

print("🔍 Optimized Solution\n")
print(result["final_answer"])

## 3. Benefits of the Evaluator-Optimizer Pattern

Our iterative improvement approach provides several advantages:

✅ Systematic quality improvement through feedback loops

✅ Clear evaluation criteria for consistency

✅ Automatic refinement without human intervention

✅ Better answers through multiple improvement iterations

Remember: While this pattern is powerful, use it judiciously where the extra compute cost is justified by measurable quality improvements! 🚀