In [None]:
import os
import pprint

import dotenv

dotenv.load_dotenv(dotenv_path="../.env")

In [None]:
import pydantic_ai
from pydantic_ai.models import bedrock as bedrock_models

claude_haiku = bedrock_models.BedrockConverseModel(
    model_name="anthropic.claude-3-haiku-20240307-v1:0"
)

claude_sonnet = bedrock_models.BedrockConverseModel(
    model_name="anthropic.claude-3-sonnet-20240229-v1:0"
)

bedrock_model_settings = bedrock_models.BedrockModelSettings(
    bedrock_guardrail_config={"trace": "enabled"}
)

In [None]:
from dataclasses import dataclass, field
from pydantic_ai.messages import ModelMessage, ModelRequest, ModelResponse, TextPart, UserPromptPart


@dataclass
class AgentDependencies:
    source_policy: str
    messages: list[ModelMessage] = field(default_factory=list)


common_tools = pydantic_ai.FunctionToolset()

In [None]:
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.trace import set_tracer_provider

def setup_tracing():
    if not os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"):
        return print("OTEL_EXPORTER_OTLP_ENDPOINT not set, skipping tracing setup")

    os.environ["OTEL_SERVICE_NAME"] = "managed-agent-swarm"

    exporter = OTLPSpanExporter()
    span_processor = BatchSpanProcessor(exporter)
    tracer_provider = TracerProvider()
    tracer_provider.add_span_processor(span_processor)

    set_tracer_provider(tracer_provider)

setup_tracing()

```mermaid
flowchart LR
    User[User] --> Orch[Orchestrator<br/>+ Message History]
    
    Orch <-->|ask/respond| Critique[Critique<br/>Agent]
    Orch <-->|ask/respond| Gap[Gap<br/>Agent]
    Orch <-->|ask/respond| Ambig[Ambiguity<br/>Agent]
    
    Orch ==>|handoff| Writer[Report<br/>Writer]
    Writer --> Report[Final<br/>Report]
    
    style Orch fill:#e1f5ff,stroke:#0066cc,stroke-width:3px
    style Writer fill:#e8f5e9,stroke:#4caf50,stroke-width:2px
    style Report fill:#f3e5f5,stroke:#9c27b0,stroke-width:2px
```

In [None]:
from pydantic import BaseModel


# Final report model
class FinalReport(BaseModel):
    """Final comprehensive report"""
    executive_summary: str
    key_findings: str
    detailed_analysis: str
    recommendations: str

    def __repr__(self) -> str:
        return (
            f"FinalReport(\n"
            f"  Executive Summary: {self.executive_summary}\n"
            f"  Key Findings: {self.key_findings}\n"
            f"  Detailed Analysis: {self.detailed_analysis}\n"
            f"  Recommendations: {self.recommendations}\n"
            f")"
        )


instrumentation_settings = pydantic_ai.InstrumentationSettings(
    include_binary_content=False,
    include_content=False,
)

critique_agent = pydantic_ai.Agent(
    model=claude_haiku,
    model_settings=bedrock_model_settings,
    deps_type=AgentDependencies,
    output_type=str,
    system_prompt="""You are a critique specialist that analyzes how farming policy documents are written.
    Focus on clarity, structure, tone, accessibility, and communication effectiveness.
    
    IMPORTANT: You are part of an active group discussion. Review ALL previous messages carefully:
    
    When responding to questions:
    - ALWAYS acknowledge and reference specific points made by other agents
    - Build directly on their findings with your unique perspective
    - Challenge or validate their observations explicitly
    - Ask follow-up questions when you need clarity from them
    - Point out connections between your analysis and theirs
    
    Examples:
    - "I agree with Gap Analysis Agent's point about X. The missing content creates structural issues because..."
    - "Ambiguity Agent, you mentioned Y was unclear. I've noticed this also affects the document flow in section Z..."
    - "Building on what Gap Analysis found, the lack of X makes the tone inconsistent when..."
    
    Be conversational and engaged. This is a dialogue, not a report.
    
    Use all the tools at your disposal to reference the target policy document as needed.""",
    instrument=instrumentation_settings,
    toolsets=[common_tools],
)

gap_analysis_agent = pydantic_ai.Agent(
    model=claude_haiku,
    model_settings=bedrock_model_settings,
    deps_type=AgentDependencies,
    output_type=str,
    system_prompt="""You are a gap analysis specialist for farming policies.
    Identify missing information, overlooked areas, and gaps in coverage.
    Consider what's not addressed that should be for comprehensive policy.
    
    IMPORTANT: You are part of an active group discussion. Review ALL previous messages carefully:
    
    When responding to questions:
    - ALWAYS reference specific observations from other agents
    - Explain how gaps you find relate to issues they've raised
    - Directly address their concerns with evidence from the policy
    - Ask them for clarification or additional perspective
    - Build collaborative understanding
    
    Examples:
    - "Critique Agent, your concern about structure is spot-on. The document is missing..."
    - "Responding to Ambiguity Agent's point about vague language - I see the same pattern in missing definitions..."
    - "This gap I found might explain why Critique Agent observed X. Can we explore that connection?"
    
    Engage actively with the conversation. Reference others' names and specific points.
    
    Use all the tools at your disposal to reference the target policy document as needed.""",
    instrument=instrumentation_settings,
    toolsets=[common_tools],
)

ambiguity_agent = pydantic_ai.Agent(
    model=claude_haiku,
    model_settings=bedrock_model_settings,
    deps_type=AgentDependencies,
    output_type=str,
    system_prompt="""You are an ambiguity detection specialist.
    Identify unclear language, vague requirements, and areas open to interpretation.
    Highlight potential confusion points and suggest clarifications.
    
    IMPORTANT: You are part of an active group discussion. Review ALL previous messages carefully:
    
    When responding to questions:
    - ALWAYS connect your findings to what other agents have said
    - Explain how vague language might be causing the issues they identified
    - Directly address their specific concerns
    - Ask for their input on whether ambiguities explain their observations
    - Create dialogue by referencing their names and points
    
    Examples:
    - "Gap Analysis Agent found X is missing. I wonder if that's because the language around it is so vague..."
    - "Critique Agent, the structure issue you noted - could it stem from this ambiguous terminology?"
    - "I agree with both of you. The vagueness I'm seeing in section Y connects to both the gaps and structure problems..."
    
    Be conversational and collaborative. Make this a real discussion.
    
    Use all the tools at your disposal to reference the target policy document as needed.""",
    instrument=instrumentation_settings,
    toolsets=[common_tools],
)

report_writer_agent = pydantic_ai.Agent(
    model=claude_sonnet,
    model_settings=bedrock_model_settings,
    deps_type=AgentDependencies,
    output_type=str,
    system_prompt="""You are a report writing specialist.
    Synthesize all the group discussion into a comprehensive, well-structured report.
    Review the entire conversation history and create a cohesive analysis.
    
    Use all the tools at your disposal to reference the target policy document as needed.""",
    instrument=instrumentation_settings,
    toolsets=[common_tools],
)


# Output function for handoff to report writer
async def handoff_to_report_writer(ctx: pydantic_ai.RunContext[AgentDependencies]) -> str:
    """Hand off to the report writer to synthesize all discussion into a final report.
    Use this when the group discussion has produced sufficient insights."""
    
    print("\nüîÑ Handing off to Report Writer Agent...")
    
    # Pass the conversation history to the report writer
    response = await report_writer_agent.run(
        user_prompt="Review the entire conversation history and create a comprehensive report synthesizing all findings.",
        deps=ctx.deps,
        output_type=FinalReport,
        message_history=ctx.deps.messages
    )
    
    print("‚úÖ Report Writer completed\n")
    return response.output


# Orchestrator agent with tools to coordinate the group chat
orchestrator_agent = pydantic_ai.Agent(
    model=claude_haiku,
    model_settings=bedrock_model_settings,
    deps_type=AgentDependencies,
    output_type=handoff_to_report_writer,
    instrument=instrumentation_settings,
    system_prompt="""You are an orchestrator coordinating a group discussion about farming policy analysis.
    
    You have three specialist agents available:
    - Critique Agent: analyzes how the document is written
    - Gap Analysis Agent: identifies missing information and coverage gaps
    - Ambiguity Agent: finds unclear or vague language
    
    Your role is to FACILITATE dialogue between the agents:
    
    1. Start by getting perspectives from the agents who are most relevant to the analysis
    2. When an agent makes an observation, consider whether other agents should respond:
       - Ask agents to build on each other's findings
       - Direct follow-up questions when connections emerge
       - Example: "Ambiguity Agent, Critique Agent mentioned unclear structure. Does vague language contribute to this?"
    3. Create natural back-and-forth by having agents respond to each other's specific points
    4. Push for concrete examples when agents make general observations
    
    IMPORTANT PRINCIPLES:
    - ALWAYS include the agent's name in your question
    - Reference specific points from other agents when creating dialogue
    - Make agents talk TO each other, not just to you
    - If an agent doesn't reference others, consider asking them to engage with prior observations
    
    When you feel the discussion has produced sufficient insights to answer the analysis comprehensively, hand off to the report writer. Don't force unnecessary rounds - quality over quantity.""",
)


@orchestrator_agent.tool
async def ask_critique_agent(ctx: pydantic_ai.RunContext[AgentDependencies], question: str) -> str:
    """Ask the critique agent to analyze how the document is written."""
    
    print(f"\nüìù Critique Agent: {question[:100]}...")
    
    response = await critique_agent.run(
        user_prompt=[
            ctx.deps.source_policy,
            question
        ],
        deps=ctx.deps,
        message_history=ctx.deps.messages
    )
    
    # Add to message history as a request/response pair
    ctx.deps.messages.append(
        ModelRequest(
            parts=[UserPromptPart(content=f"[Critique Agent] {question}")],
            timestamp=None
        )
    )

    formatted = f"[Critique Agent] {response.output}"

    ctx.deps.messages.append(
        ModelResponse(
            parts=[TextPart(content=formatted)],
            timestamp=None
        )
    )
    
    print(f"‚úÖ Response received ({len(response.output)} chars)\n")
    return formatted


@orchestrator_agent.tool
async def ask_gap_analysis_agent(ctx: pydantic_ai.RunContext[AgentDependencies], question: str) -> str:
    """Ask the gap analysis agent to identify missing information or coverage gaps."""
    
    print(f"\nüîç Gap Analysis Agent: {question[:100]}...")
    
    response = await gap_analysis_agent.run(
        user_prompt=[
            ctx.deps.source_policy,
            question
        ],
        deps=ctx.deps,
        message_history=ctx.deps.messages
    )
    
    # Add to message history as a request/response pair
    ctx.deps.messages.append(
        ModelRequest(
            parts=[UserPromptPart(content=f"[Gap Analysis Agent] {question}")],
            timestamp=None
        )
    )

    formatted = f"[Gap Analysis Agent] {response.output}"
    
    ctx.deps.messages.append(
        ModelResponse(
            parts=[TextPart(content=formatted)],
            timestamp=None
        )
    )
    
    print(f"‚úÖ Response received ({len(response.output)} chars)\n")
    return formatted


@orchestrator_agent.tool
async def ask_ambiguity_agent(ctx: pydantic_ai.RunContext[AgentDependencies], question: str) -> str:
    """Ask the ambiguity agent to identify unclear or vague language."""
    
    print(f"\n‚ùì Ambiguity Agent: {question[:100]}...")
    
    response = await ambiguity_agent.run(
        user_prompt=[
            ctx.deps.source_policy,
            question
        ],
        deps=ctx.deps,
        message_history=ctx.deps.messages
    )

    # Add to message history as a request/response pair
    ctx.deps.messages.append(
        ModelRequest(
            parts=[UserPromptPart(content=f"[Ambiguity Agent] {question}")],
            timestamp=None
        )
    )

    formatted = f"[Ambiguity Agent] {response.output}"
    
    ctx.deps.messages.append(
        ModelResponse(
            parts=[TextPart(content=formatted)],
            timestamp=None
        )
    )
    
    print(f"‚úÖ Response received ({len(response.output)} chars)\n")
    return formatted

In [None]:
import pathlib

policy_name = "AGF1"
policy_path = pathlib.Path(f"../scrapers/outputs/{policy_name.upper()}.md")

# Initialize dependencies
deps = AgentDependencies(
    source_policy=policy_path.read_text()
)

# Run the orchestrator to coordinate the group chat
orchestrator_response = await orchestrator_agent.run(
    user_prompt=[
        """Facilitate a group discussion to comprehensively analyze this farming policy document.
    Let the agents respond to each other's findings and build on the discussion.
    When you've gathered sufficient insights (aim for thorough coverage), generate the final report.""",
    deps.source_policy
    ],
    deps=deps,
    usage_limits=pydantic_ai.UsageLimits(tool_calls_limit=25)
)


In [None]:
import pprint

# View the orchestrator's final response
report: FinalReport = orchestrator_response.output

pprint.pprint(report)

# print (deps.messages)

# print(f"Total messages: {len(orchestrator_response.all_messages())}")

In [None]:
# Optional: View the complete message history and tool calls
print("=== COMPLETE MESSAGE TRACE ===\n")
pprint.pprint(str(orchestrator_response.all_messages_json()))
