# WORKER–VALIDATOR | Agentic Pattern

A two-agent pattern that introduces a quality control loop. A Worker agent attempts the task, and a Validator agent reviews the result against the original instructions. If the result is inadequate, the Validator provides feedback, and the Worker revises the output. This loop continues until the Validator approves the result or a termination condition is reached.

## Problem Addressed
This pattern improves the reliability and quality of output for tasks that may require iterative refinement, quality assurance, or structured reasoning. It is especially useful when the cost of error is high or task success criteria are non-trivial to evaluate.

## Pattern Structure

**Agents |**
- `Worker`: Interprets the task, produces an initial output, and revises it in response to Validator feedback.
- `Validator`: Evaluates whether the Worker’s output satisfies the task requirements, and provides structured feedback if not.

**Coordination Topology |**
- Sequential feedback loop.
- `Caller → Worker → Validator → (loop: Worker ← Validator)` until the Validator approves the result or a max iteration limit is reached.

## Assumptions

Worker:
- Has all the assumptions of the Worker pattern (tools, reasoning, etc.).
- Can **interpret and apply feedback** to revise prior outputs.

Validator:
- Possesses the capability to **interpret task requirements**, **evaluate the Worker’s output**, and **generate feedback or approval**.
- Can compare against a reference prompt or desired criteria.

Agents may or may not share memory. Coordination state must persist across iterations.

## Inputs
- A task description (e.g., instructions or goals).
- Optionally: a rubric or evaluation criteria for the Validator.

## Outputs
- A final validated result.
- Optionally: Validator’s rationale or evaluation score.

## Behavioural Flow

1. Input task is passed to the Worker.
2. Worker generates initial output.
3. Output and task are passed to the Validator.
4. Validator evaluates the output:
    - If acceptable, the result is returned.
    - If not, Validator returns structured feedback.
5. Worker revises output based on feedback.
6. Loop returns to step 3. Repeat until approval or max iterations reached.
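
The steps above can be sketched without any LLM calls. This is a minimal illustration, not the implementation used later in this notebook: `Verdict`, `run_loop`, `toy_worker` and `toy_validator` are hypothetical names, and the two toy agents are plain functions standing in for model-backed agents.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Verdict:
    """Hypothetical stand-in for the Validator's structured reply."""
    approved: bool
    feedback: Optional[str] = None


def run_loop(
    task: str,
    worker: Callable[[str, Optional[str]], str],
    validator: Callable[[str, str], Verdict],
    max_iterations: int = 3,
) -> tuple[str, bool, int]:
    """Return (last output, approved?, iterations used)."""
    feedback = None
    output = ""
    for iteration in range(1, max_iterations + 1):
        output = worker(task, feedback)      # steps 2 and 5: produce or revise
        verdict = validator(task, output)    # steps 3 and 4: evaluate
        if verdict.approved:
            return output, True, iteration
        feedback = verdict.feedback          # carried into the next round
    return output, False, max_iterations


# Toy agents: the worker only adds a full stop once it has been told to.
def toy_worker(task: str, feedback: Optional[str]) -> str:
    return "Hello world." if feedback else "Hello world"


def toy_validator(task: str, output: str) -> Verdict:
    if output.endswith("."):
        return Verdict(approved=True)
    return Verdict(approved=False, feedback="End the sentence with a full stop.")


result = run_loop("Greet the world.", toy_worker, toy_validator)
print(result)  # ('Hello world.', True, 2)
```

Returning the approval flag alongside the output lets the caller distinguish a validated result from one that merely ran out of iterations.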

## Strengths
- Higher output quality and reliability.
- Supports more complex, ambiguous, or high-stakes tasks.
- Validator role introduces an externalized standard of correctness or fit.

## Weaknesses
- Higher latency and compute cost.
- Requires a well-aligned Validator to avoid false negatives or noisy feedback.
- Coordination complexity increases with the number of iterations.
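
To make the cost concern concrete, the rough estimate below counts model calls and the number of history messages each call has to read. It assumes the full history is replayed to both agents on every round, as the implementation later in this notebook does; `worst_case_cost` is a hypothetical helper, and message counts are only a proxy for token usage.

```python
def worst_case_cost(max_loops: int) -> dict:
    """Estimate calls and messages read if the Validator never approves."""
    model_calls = 2 * max_loops           # one Worker and one Validator call per loop
    messages_read = 0
    history_length = 1                    # the initial task message
    for _ in range(max_loops):
        messages_read += history_length   # Worker reads the whole history
        history_length += 1               # Worker output is appended
        messages_read += history_length   # Validator reads the whole history
        history_length += 1               # Validator assessment is appended
    return {"model_calls": model_calls, "messages_read": messages_read}


print(worst_case_cost(3))  # {'model_calls': 6, 'messages_read': 21}
```

Calls grow linearly with the iteration limit, while the replayed history grows roughly quadratically, which is one reason to keep `max_loops` small.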

## Variations and Extensions
- **Multi-round Validator**: Uses temperature annealing or scoring thresholds to control the feedback loop.
- **Rotating Roles**: Worker and Validator swap roles periodically to simulate peer review.
- **Planner-Worker-Validator**: Validator ensures both planning and execution meet the goal.
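
One possible reading of the Multi-round Validator variation is sketched below: the sampling temperature is lowered linearly from round to round, and a numeric Validator score is compared with a threshold. The source does not specify a schedule or a scoring scale, so `annealed_temperature`, `is_approved`, the 0.8 to 0.2 range and the 0.8 threshold are all assumptions made for illustration.

```python
def annealed_temperature(round_index: int, total_rounds: int,
                         start: float = 0.8, end: float = 0.2) -> float:
    """Linearly interpolate the sampling temperature from start to end."""
    if total_rounds <= 1:
        return end
    fraction = round_index / (total_rounds - 1)
    return start + (end - start) * fraction


def is_approved(score: float, threshold: float = 0.8) -> bool:
    """Approve once the Validator's score reaches the threshold."""
    return score >= threshold


schedule = [round(annealed_temperature(i, 4), 2) for i in range(4)]
print(schedule)            # [0.8, 0.6, 0.4, 0.2]
print(is_approved(0.75))   # False
print(is_approved(0.9))    # True
```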

## Example Use Cases
- Generate a product description from specifications.
- Answer a factual question using embedded knowledge.
- Write a function to perform a specific operation.

---

## Implementation

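Create a `.env` file in the root directory of this project for private configuration variables (API keys, etc.). Running this notebook requires a valid OpenAI API key, set as `OPENAI_API_KEY=sk-proj-XXXX`. The optional `CONF_OPENAI_DEFAULT_MODEL` variable names the model used for any agent whose configuration omits `model`.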

In [None]:
# Load the environment variables
from dotenv import load_dotenv
import os

load_dotenv(override=True)

agent_model_DEFAULT = os.getenv('CONF_OPENAI_DEFAULT_MODEL')
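
Once the environment is loaded, a small check can fail fast with a readable message instead of a later authentication error. This is an optional sketch; `require_env` is a hypothetical helper, not part of `python-dotenv` or the notebook.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise a clear error."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set. Add it to your .env file.")
    return value
```

Calling `require_env("OPENAI_API_KEY")` right after `load_dotenv(override=True)` would surface a missing key before any agent is created.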

Generate the `agents-config.yaml` configuration file that specifies the agents to be created for this pattern. The file includes the instructions prompt, model and other specifications for each agent.

In [None]:
!mkdir -p tmp

In [None]:
%%writefile ./tmp/agents-config.yaml
# agents-config.yaml
worker:
  name: Basic Worker
  instructions: |
    You are a single-step Worker. Execute the given task directly and return the best possible result.
    Do not ask clarifying questions. Do not handoff, delegate, or call tools unless explicitly provided.
    Prefer concise, correct outputs. Use Markdown formatting when helpful.
  model: openai/gpt-4.1-nano
  has_memory: False
  temperature: 0.4
  max_tokens: 1000

validator:
  name: Basic Validator
  instructions: |
    You are a Validator agent in a Worker–Validator loop. Your role is to **assess whether 
    the Worker’s output meets the task objective and success criteria**.
    
    Your job is **not to redo the task yourself**, but to:
    1. Decide if the output is acceptable given the task and whatever success criteria are provided.
    2. If not, give precise, actionable feedback so the Worker can correct it.
  model: openai/gpt-4o-mini
  temperature: 0.2
  max_tokens: 1000
  output_type: ValidatorResponse

In [None]:
# Define a structured output type for Validator

from pydantic import BaseModel, Field
from typing import Literal, Optional

class ValidatorResponse(BaseModel):
    status: Literal["approved", "needs_revision"]  = Field(..., description='Must be either "approved" or "needs_revision"')
    rationale: str = Field(..., description='Summarise your evaluation in 10 words, referencing specific success criteria.')
    feedback: Optional[str] = Field(None, description='Only include if status is "needs_revision". Provide concrete, actionable instructions for the Worker to correct or improve the output. Be explicit about which facts, quotes, or sections fail to meet audit-level verification standards.')

    def is_valid(self) -> bool:
        return self.status == "approved"


In [None]:
# Load the agents-config.yaml file

import json
import yaml

with open('./tmp/agents-config.yaml', 'r') as file:
    agent_config_data = yaml.safe_load(file)

formatted_json = json.dumps(agent_config_data, indent=4)
print(formatted_json)
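
Before creating agents, the loaded dictionary can be checked for obvious omissions. The sketch below is an assumption-laden example rather than part of the pattern: `find_config_problems` is a hypothetical helper, the choice of `instructions` as the only required key is ours, and the 0 to 2 temperature range reflects the range accepted by OpenAI chat models.

```python
EXPECTED_AGENTS = ("worker", "validator")   # agent types this pattern needs
REQUIRED_KEYS = ("instructions",)           # assumption: only instructions are mandatory


def find_config_problems(config: dict) -> list[str]:
    """Return a list of human-readable problems found in the loaded config."""
    problems = []
    for agent_type in EXPECTED_AGENTS:
        entry = config.get(agent_type)
        if not isinstance(entry, dict):
            problems.append(f"missing agent definition: {agent_type}")
            continue
        for key in REQUIRED_KEYS:
            if not entry.get(key):
                problems.append(f"{agent_type}: missing '{key}'")
        temperature = entry.get("temperature")
        if temperature is not None and not 0 <= temperature <= 2:
            problems.append(f"{agent_type}: temperature {temperature} outside 0-2")
    return problems


sample = {
    "worker": {"instructions": "Do the task.", "temperature": 0.4},
    "validator": {"temperature": 0.2},
}
print(find_config_problems(sample))  # ["validator: missing 'instructions'"]
```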

In [None]:
from datetime import datetime
from agents import Agent, ModelSettings, SQLiteSession

def create_agent(agent_type: str = None):
    """ 
    Creates and returns an Agent that matches the given definition in the agents-config 
    YAML file. Optionally returns a memory Session if agent configuration calls for it.
    """

    if agent_type is None or not agent_type.strip():
        raise ValueError("agent_type must be a valid type of agent defined in agents-config.yaml.")

    agent_config = agent_config_data.get(agent_type)
    if agent_config is None:
        raise ValueError(f"'{agent_type}' does not match an agent defined in agents-config.yaml.")

    # Generate a timestamp string (with microseconds) for unique naming.
    # The original "%s" directive is platform-specific, so the portable "%f" is used instead.
    now_string = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")

    # Build agent based on YAML specification
    agent_model_settings = ModelSettings(
        temperature=agent_config.get('temperature'),
        max_tokens=agent_config.get('max_tokens'),
    )

    agent_name = agent_config.get('name') or f"{agent_type}_{now_string}"

    # Resolve the output type by name from this notebook's globals (e.g. ValidatorResponse).
    # Agents without an output_type return plain text.
    output_type_name = agent_config.get('output_type')

    new_agent = Agent(
        name=agent_name,
        instructions=agent_config['instructions'],
        model=agent_config.get('model') or agent_model_DEFAULT,
        output_type=globals().get(output_type_name) if output_type_name else None,
        model_settings=agent_model_settings,
    )

    # Create memory session for agent if configured
    agent_has_memory = agent_config.get('has_memory') or False
    agent_session_name = f"{agent_name}__SESSION_{now_string}" if agent_has_memory else None
    agent_session = SQLiteSession(agent_session_name) if agent_session_name else None

    return (new_agent, agent_session)

worker, worker_sess = create_agent('worker')
validator, validator_sess = create_agent('validator')


In [None]:
# Dialog loop for a Worker agent and a Validator agent.
# This assigns a task to the Worker-Validator pair. The Worker attempts to complete the task to the best of its
# ability. The Validator determines whether the Worker's output meets the success criteria for the
# assigned task. If so, it sets status to "approved" and the Worker's output is returned. If not, it gives
# feedback to the Worker agent, which must attempt the task again using the feedback.

from agents import Runner, trace

async def assign_task(task: str, success_criteria: str = None, max_loops: int = 5) -> str:
    """
    Assign a task to the Worker-Validator pair and receive their response in return.
    Returns the Worker's output if approved, otherwise the Validator's last assessment as JSON.
    """

    # Check that both agents exist
    if worker is None:
        return "Worker agent has not been created."
    if validator is None:
        return "Validator agent has not been created."

    # Only include success criteria when they were actually provided
    assignment = f"task: {task}"
    if success_criteria:
        assignment += f"\nsuccess_criteria: {success_criteria}"
    history = [{"role": "user", "content": assignment}]

    count_loop = 0
    output_validated = False
    result = None

    with trace(f"{worker.name}_{validator.name}"):
        while (not output_validated) and (count_loop < max_loops):
            count_loop+=1

            try:
                worker_output = await Runner.run(starting_agent=worker, input=history)
                history.append({"role": "assistant", "content": worker_output.final_output})

                validator_output = await Runner.run(starting_agent=validator, input=history)
                assessment = validator_output.final_output
                history.append({"role": "assistant", "content": assessment.model_dump_json()})

            except Exception as e:
                return f"Error: {e}"
        
            output_validated = assessment.is_valid()
            result = worker_output.final_output if output_validated else assessment.model_dump_json()

    return result 

In [None]:
# Class that implements the Worker-Validator pattern

import json
import os
import yaml
from collections import deque
from datetime import datetime
from agents import Agent, ModelSettings
from agents import Runner, trace
from pydantic import BaseModel, Field
from typing import Literal, Optional

class ValidatorResponse(BaseModel):
    """
    Represents the structured response from a validator agent.

    This Pydantic model ensures the validator's output is consistent and machine-readable.
    It includes a status indicating approval or a need for revision, a brief rationale,
    and detailed feedback if the output requires changes.
    """

    status: Literal["approved", "needs_revision"]  = Field(..., description='Must be either "approved" or "needs_revision"')
    rationale: str = Field(..., description='Summarise your evaluation in 10 words, referencing specific success criteria.')
    feedback: Optional[str] = Field(None, description='Only include if status is "needs_revision". Provide concrete, actionable instructions for the Worker to correct or improve the output. Be explicit about which facts, quotes, or sections fail to meet audit-level verification standards.')

    def is_valid(self) -> bool:
        return self.status == "approved"

class WorkerValidatorPattern:
    """
    A class that implements the Worker-Validator pattern for AI agent collaboration.

    This pattern orchestrates a loop where a 'worker' agent performs a task and a 'validator'
    agent evaluates the worker's output against a set of success criteria. The validator provides
    feedback to the worker if the output needs revision. The process repeats until the
    validator approves the output or a maximum number of loops is reached.

    The class uses a YAML configuration file to define and initialize the worker and validator
    agents. It maintains a history of the conversation, allowing agents to have context
    from previous turns.
    """

    def __init__(self, agent_config_filename: str = './tmp/agents-config.yaml') -> None:
        """
        Initialize the pattern from the agents-config.yaml file
        """

        self.__agent_model_DEFAULT = os.getenv('CONF_OPENAI_DEFAULT_MODEL')

        # Load the agents-config.yaml file
        with open(agent_config_filename, 'r') as file:
            self.__agent_config_data = yaml.safe_load(file)
        
        # Create an empty history log
        self.__max_history = 5
        self.__history_log = deque(maxlen=self.__max_history)

        # Create the agents for this pattern
        self.__init_agents() 


    def __new_history(self) -> list:
        """
        Add a new empty conversation history (a list of messages) to the log and return it.
        """

        history = []
        self.__history_log.append(history)

        return history
    
    def get_history(self, age: int = 0) -> Optional[list]:
        """
        Retrieves a stored conversation history, using a zero-based index for age (0 is most recent).
        Ages beyond the oldest stored run return the oldest history. Returns None if the log is empty.
        """
        if not self.__history_log:
            return None
        
        index = -1 * min((age + 1), len(self.__history_log))
        try:
            return self.__history_log[index]
        except IndexError:
            return None
        
    
    def __create_agent(self, agent_type: str = None) -> Agent:
        """
        Creates and returns an Agent that matches the given definition in the agents-config YAML file.
        """

        if agent_type is None or not agent_type.strip():
            raise ValueError("agent_type must be a valid type of agent defined in agents-config.yaml.")
        
        agent_config = self.__agent_config_data.get(agent_type)
        if agent_config is None:
            raise ValueError(f"'{agent_type}' does not match an agent defined in agents-config.yaml.")

        # Generate a timestamp string (with microseconds) for unique naming.
        # The original "%s" directive is platform-specific, so the portable "%f" is used instead.
        now_string = datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")

        # Build agent based on YAML specification
        agent_model_settings=ModelSettings(
            temperature=agent_config.get('temperature'),
            max_tokens=agent_config.get('max_tokens')
        )
        
        new_agent = Agent(
            name=agent_config.get('name') or f"{agent_type}_{now_string}",
            instructions=agent_config.get('instructions'),
            model=agent_config.get('model') or self.__agent_model_DEFAULT,
            output_type=globals().get(agent_config.get('output_type')),
            model_settings=agent_model_settings
        )

        return new_agent
    
    
    def __init_agents(self) -> int:
        """
        Creates an instance of each of the agent types loaded from agents-config.yaml.
        Returns the number of agents created.
        """
        
        self.__agents = {}

        for agent_type in self.__agent_config_data:
            new_agent = self.__create_agent(agent_type=agent_type)
        
            if new_agent is not None:
                self.__agents[agent_type] = new_agent

        return len(self.__agents)
    

    async def assign_task(self, 
                          task: str, 
                          success_criteria: str = None, 
                          max_loops: int = 5) -> tuple[str, bool]:
        """
        Assigns a task to a Worker-Validator pair and awaits a final response.

        Args:
            task (str): The primary task for the agents to execute.
            success_criteria (str, optional): Describes the criteria to be used in validating the output.
                                              Defaults to None.
            max_loops (int, optional): The maximum number of validation loops to attempt.
                                       Defaults to 5.

        Returns:
            tuple[str, bool]: A tuple containing:
                - str: The final, validated output from the Worker agent, or validator's failure 
                       assessment if output was not validated after maximum turns.
                - bool: A flag indicating if the output was successfully validated.
        """

        # Obtain worker and validator agents
        worker = self.__agents.get("worker")
        validator = self.__agents.get("validator")
 
        if worker is None or validator is None:
            raise RuntimeError("One or more agents were not created. Are they correctly defined in agents-config.yaml?")

        # Seed a new conversation history (a list of messages)
        history = self.__new_history()
        assignment = f"task: {task}"
        if success_criteria is not None and len(success_criteria) > 0:
            assignment += f"\nsuccess_criteria: {success_criteria}"
        history.append({"role": "user", "content": assignment})

        # Loop between worker completing the task and validator checking it. Exit when validator approves
        # or the number of iterations reaches max_loops.
        count_loop = 0
        output_validated = False
        result = None

        with trace(f"{worker.name}_{validator.name}"):
            while (not output_validated) and (count_loop < max_loops):
                count_loop+=1

                try:
                    worker_output = await Runner.run(starting_agent=worker, input=history)
                    history.append({"role": "assistant", "content": worker_output.final_output})
                except Exception as e:
                    # Without a Worker output there is nothing to validate: log the error and retry.
                    history.append({"role": "user", "content": f"Error: {e}"})
                    result = f"Error: {e}"
                    continue

                try:
                    validator_output = await Runner.run(starting_agent=validator, input=history)
                    assessment = validator_output.final_output_as(ValidatorResponse, raise_if_incorrect_type=True)
                    history.append({"role": "assistant", "content": assessment.model_dump_json()})

                    output_validated = assessment.is_valid()
                    result = worker_output.final_output if output_validated else assessment.model_dump_json()
                except Exception as e:
                    history.append({"role": "user", "content": f"Error: {e}"})
                    output_validated = False
                    result = f"Error: {e}"
            
        return (result, output_validated)
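
The bounded history log in the class above relies on `collections.deque` with `maxlen`, which silently discards the oldest entry once the limit is reached. The standalone sketch below mirrors the indexing used by `get_history` on a module-level deque; the seven fake runs are illustrative data only.

```python
from collections import deque

# Keep at most five conversation histories, as the class does.
history_log = deque(maxlen=5)
for run_number in range(1, 8):
    history_log.append([{"role": "user", "content": f"task {run_number}"}])


def get_history(age: int = 0):
    """Return the history that is `age` runs old (0 is the most recent)."""
    if not history_log:
        return None
    index = -1 * min(age + 1, len(history_log))  # clamp to the oldest entry
    return history_log[index]


print(len(history_log))               # 5
print(get_history(0)[0]["content"])   # task 7
print(get_history(99)[0]["content"])  # task 3
```

Runs 1 and 2 have been evicted, so an out-of-range age falls back to the oldest surviving history rather than raising.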
    

In [None]:
# Create a gradio interface to interact with the Worker-Validator pattern

import gradio as gr

use_pattern = False

with gr.Blocks(title="Worker-Validator — Agentic Design Patterns") as demo:

    pattern = WorkerValidatorPattern(agent_config_filename='./tmp/agents-config.yaml') if use_pattern else None

    gr.Markdown("""
    # 🛠️ Worker-Validator Pattern
    
    This is a simple worker-validator that can be used to perform single-step execution tasks whose output is validated before being returned.
    """)

    with gr.Row():
        with gr.Column(scale=9):
            task = gr.Textbox(label="Task", placeholder="What task do you want completed?", lines=4)
            success_criteria = gr.Textbox(label="Success Criteria", placeholder="Describe what success looks like.", lines=4)
        with gr.Column(scale=1):
            max_loops = gr.Number(label="Maximum loops", value=3, precision=0)
            run_btn = gr.Button("Run Task", variant="primary")
    out = gr.Markdown(label="Result", show_label=True)
    history = gr.JSON(label="History", show_label=True)

    async def on_run(task: str, success_criteria: str, max_loops: int):
        max_loops = int(max_loops)  # gr.Number may deliver a float
        if use_pattern:
            output, _ = await pattern.assign_task(task=task, success_criteria=success_criteria, max_loops=max_loops)
            hist = pattern.get_history()
        else:
            output = await assign_task(task, success_criteria, max_loops)
            hist = {}
        return (output, hist)

    # Assigning run_btn.interactive inside a handler does not update the rendered button,
    # so the button is disabled and re-enabled through chained events instead.
    for trigger in (run_btn.click, task.submit):
        trigger(lambda: gr.update(interactive=False), outputs=run_btn) \
            .then(on_run, inputs=[task, success_criteria, max_loops], outputs=[out, history]) \
            .then(lambda: gr.update(interactive=True), outputs=run_btn)

demo.launch(inline=True)

In [None]:
task = """ 
Write a three-sentence executive summary of the financial performance of Tesla's Q1 2024 earnings. Your summary must focus on revenue, and it must explicitly state whether the Q1 revenue was higher or lower than the Q4 2023 revenue.
"""

criteria = """ 
The output must be a summary of Tesla's Q1 2024 earnings.
The summary must be exactly three sentences long.
The summary must explicitly mention the exact Q1 2024 revenue figure in USD.
The summary must contain a clear statement comparing the Q1 2024 revenue to the Q4 2023 revenue (i.e., whether it was higher or lower).
The summary must not contain the word "profit" or "loss".
All factual data, including the revenue numbers and the comparison, must be accurate and verifiable from official sources.
"""

await assign_task(task, criteria)