# Doppel Multi Agent Orchestration Experiments - Advanced

In [6]:
import os
import asyncio
import json
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional

from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_google_genai import ChatGoogleGenerativeAI
from profiles import candidate_profile, recruiter_profile
from dotenv import load_dotenv

load_dotenv()

True

Thinking Levels are layers of reasoning and planning. Simple tasks do not require higher levels of thinking unlike complex tasks. Higher thinking levels are more expensive as a result of the complexity of the task.

- Execution does not require thinking.
- Tactical thinking is thinking about how to accomplish a task. 
- Looks at the big picture and the overall strategy.
- Monitors performance, identifies patterns in failures/successes, adapts strategies. Like a human evaluating their own thoughts. 

In [7]:
class ThinkingLevel(IntEnum):
    EXECUTION = 1
    TACTICAL = 2
    STRATEGIC = 3
    META_COGNITIVE = 4

Thought Signatures allow the some notes that agent writes for itself. Similar to a human writing notes in a notebook. This makes sure that the agent knows what the original ask was and how it is progressing.

In [8]:
@dataclass
class ThoughtSignature:
    thinking_level: ThinkingLevel
    what_learned: str
    goal_progress: Dict[str, bool]
    match_confidence: int
    next_action: str
    self_correction: Optional[str] = None
    should_conclude: bool = False
    critical_mismatch: bool = False
    cost: float = 0.0

In [9]:
@dataclass
class ConversationContext:
    turn_number: int
    conversation_history: List[Dict[str, str]]
    last_response: str
    remaining_goals: List[str]
    current_confidence: int
    previous_confidence: int = 50
    is_opening: bool = False

In [10]:
class ThinkingLevelManager:
    def __init__(self, llm: ChatGoogleGenerativeAI):
        self.llm = llm

    def determine_level(self, context: ConversationContext) -> ThinkingLevel:
        if context.is_opening:
            return ThinkingLevel.EXECUTION

        if not context.last_response:
            return ThinkingLevel.EXECUTION

        confidence_delta = abs(context.current_confidence - context.previous_confidence)

        if confidence_delta >= 25:
            return ThinkingLevel.STRATEGIC

        if len(context.remaining_goals) <= 2 and context.current_confidence >= 80:
            return ThinkingLevel.STRATEGIC

        response_length = len(context.last_response.split())
        if response_length < 15:
            return ThinkingLevel.TACTICAL

        if context.turn_number >= 6:
            return ThinkingLevel.STRATEGIC

        return ThinkingLevel.TACTICAL

    async def think(
        self,
        level: ThinkingLevel,
        context: ConversationContext,
        goal_progress: Dict[str, bool],
        criteria: List[str],
    ) -> ThoughtSignature:
        if level == ThinkingLevel.EXECUTION:
            return self._execute_routine(context, goal_progress)
        elif level == ThinkingLevel.TACTICAL:
            return await self._tactical_thinking(context, goal_progress, criteria)
        elif level == ThinkingLevel.STRATEGIC:
            return await self._strategic_thinking(context, goal_progress, criteria)
        else:
            return await self._meta_cognitive_thinking(context, goal_progress, criteria)

    def _execute_routine(
        self, context: ConversationContext, goal_progress: Dict[str, bool]
    ) -> ThoughtSignature:
        return ThoughtSignature(
            thinking_level=ThinkingLevel.EXECUTION,
            what_learned="Opening conversation - gathering initial information",
            goal_progress=goal_progress,
            match_confidence=context.current_confidence,
            next_action="Ask opening question about their background",
        )

    async def _tactical_thinking(
        self,
        context: ConversationContext,
        goal_progress: Dict[str, bool],
    ) -> ThoughtSignature:
        unverified = [c for c, verified in goal_progress.items() if not verified]
        prompt = f"""You are analyzing a candidate's response in a recruiting conversation.

Response to analyze: "{context.last_response}"

Unverified criteria: {unverified[:3]}

Provide a brief tactical analysis in JSON format:
{{
    "what_learned": "1-2 sentence summary of what this response reveals",
    "criteria_updates": {{"criterion text": true/false}},
    "confidence_adjustment": number between -15 and +15,
    "next_action": "specific next question or topic to probe",
    "needs_clarification": true/false
}}

Be concise. Only mark criteria as true if clearly demonstrated."""

        messages = [HumanMessage(content=prompt)]
        response = await self.llm.ainvoke(messages)
        content = self._extract_text(response.content)

        try:
            data = json.loads(self._clean_json(content))
        except json.JSONDecodeError:
            data = {
                "what_learned": "Response received, continuing assessment",
                "criteria_updates": {},
                "confidence_adjustment": 0,
                "next_action": "Continue probing relevant skills",
                "needs_clarification": False,
            }

        new_progress = goal_progress.copy()
        for criterion, verified in data.get("criteria_updates", {}).items():
            if criterion in new_progress:
                new_progress[criterion] = verified

        new_confidence = max(
            0, min(100, context.current_confidence + data.get("confidence_adjustment", 0))
        )

        return ThoughtSignature(
            thinking_level=ThinkingLevel.TACTICAL,
            what_learned=data.get("what_learned", ""),
            goal_progress=new_progress,
            match_confidence=new_confidence,
            next_action=data.get("next_action", ""),
        )

    async def _strategic_thinking(
        self,
        context: ConversationContext,
        goal_progress: Dict[str, bool],
    ) -> ThoughtSignature:
        verified_count = sum(1 for v in goal_progress.values() if v)
        total_count = len(goal_progress)
        history_summary = "\n".join(
            [f"{m['role']}: {m['content'][:100]}..." for m in context.conversation_history[-4:]]
        )

        prompt = f"""You are conducting a strategic reassessment of a recruiting conversation.

Recent conversation:
{history_summary}

Latest response: "{context.last_response}"

Progress: {verified_count}/{total_count} criteria verified
Current confidence: {context.current_confidence}%
Turn number: {context.turn_number}

Provide strategic analysis in JSON format:
{{
    "what_learned": "key insight from this conversation stage",
    "criteria_updates": {{"criterion text": true/false}},
    "confidence_adjustment": number between -25 and +25,
    "self_correction": "any strategic pivot needed (or null)",
    "next_action": "strategic next move",
    "should_conclude": true/false,
    "critical_mismatch": true/false
}}

Consider: Is this candidate stronger/weaker than expected? Should we pivot strategy?"""

        messages = [HumanMessage(content=prompt)]
        response = await self.llm.ainvoke(messages)
        content = self._extract_text(response.content)

        try:
            data = json.loads(self._clean_json(content))
        except json.JSONDecodeError:
            data = {
                "what_learned": "Strategic assessment in progress",
                "criteria_updates": {},
                "confidence_adjustment": 0,
                "self_correction": None,
                "next_action": "Continue strategic assessment",
                "should_conclude": False,
                "critical_mismatch": False,
            }

        new_progress = goal_progress.copy()
        for criterion, verified in data.get("criteria_updates", {}).items():
            if criterion in new_progress:
                new_progress[criterion] = verified

        new_confidence = max(
            0, min(100, context.current_confidence + data.get("confidence_adjustment", 0))
        )

        return ThoughtSignature(
            thinking_level=ThinkingLevel.STRATEGIC,
            what_learned=data.get("what_learned", ""),
            goal_progress=new_progress,
            match_confidence=new_confidence,
            next_action=data.get("next_action", ""),
            self_correction=data.get("self_correction"),
            should_conclude=data.get("should_conclude", False),
            critical_mismatch=data.get("critical_mismatch", False),
        )

    async def _meta_cognitive_thinking(
        self,
        context: ConversationContext,
        goal_progress: Dict[str, bool],
    ) -> ThoughtSignature:
        history_summary = "\n".join(
            [f"{m['role']}: {m['content']}" for m in context.conversation_history]
        )

        prompt = f"""You are conducting deep meta-cognitive reflection on a recruiting conversation.

Full conversation:
{history_summary}

Latest response: "{context.last_response}"

Current assessment:
- Verified criteria: {[c for c, v in goal_progress.items() if v]}
- Unverified: {[c for c, v in goal_progress.items() if not v]}
- Confidence: {context.current_confidence}%

Provide deep reflection in JSON format:
{{
    "what_learned": "fundamental insight about this candidate/conversation",
    "criteria_updates": {{"criterion text": true/false}},
    "confidence_adjustment": number between -40 and +40,
    "self_correction": "critical reframing of approach (if needed)",
    "next_action": "final action to take",
    "should_conclude": true/false,
    "critical_mismatch": true/false,
    "learning_for_future": "pattern to remember for future conversations"
}}

This is expensive thinking - provide deep, actionable insights."""

        messages = [HumanMessage(content=prompt)]
        response = await self.llm.ainvoke(messages)
        content = self._extract_text(response.content)

        try:
            data = json.loads(self._clean_json(content))
        except json.JSONDecodeError:
            data = {
                "what_learned": "Deep reflection completed",
                "criteria_updates": {},
                "confidence_adjustment": 0,
                "self_correction": None,
                "next_action": "Conclude conversation",
                "should_conclude": True,
                "critical_mismatch": False,
            }

        new_progress = goal_progress.copy()
        for criterion, verified in data.get("criteria_updates", {}).items():
            if criterion in new_progress:
                new_progress[criterion] = verified

        new_confidence = max(
            0, min(100, context.current_confidence + data.get("confidence_adjustment", 0))
        )

        return ThoughtSignature(
            thinking_level=ThinkingLevel.META_COGNITIVE,
            what_learned=data.get("what_learned", ""),
            goal_progress=new_progress,
            match_confidence=new_confidence,
            next_action=data.get("next_action", ""),
            self_correction=data.get("self_correction"),
            should_conclude=data.get("should_conclude", True),
            critical_mismatch=data.get("critical_mismatch", False),
        )

    def _extract_text(self, content) -> str:
        if isinstance(content, list):
            parts = []
            for part in content:
                if isinstance(part, dict) and "text" in part:
                    parts.append(part["text"])
                elif isinstance(part, str):
                    parts.append(part)
            return " ".join(parts)
        return str(content)

    def _clean_json(self, text: str) -> str:
        text = text.strip()
        if text.startswith("```json"):
            text = text[7:]
        if text.startswith("```"):
            text = text[3:]
        if text.endswith("```"):
            text = text[:-3]
        return text.strip()

In [11]:
llm = ChatGoogleGenerativeAI(
        model="gemini-3-flash-preview",
        google_api_key=os.getenv("GEMINI_API_KEY"),
        temperature=0.8,
    )

In [15]:
from pydantic import BaseModel, Field


class RecruiterResponse(BaseModel):
    response: str = Field(description="The response to the candidate's last message.")
    is_final_response: bool = Field(
        description="Whether this is the final response in the conversation. Set to true after 6-10 exchanges or when you have enough information."
    )
    final_evaluation: str = Field(
        description="The final evaluation of the candidate. Only populated if is_final_response is true. Format: ✓/✗ for each criterion with brief evidence, Rating: X/10, Decision: GOOD FIT or NOT A FIT with reasoning.",
        default=""
    )

In [16]:
class RecruiterAgent:
    def __init__(self, profile: dict, llm: ChatGoogleGenerativeAI):
        self.profile = profile
        self.llm = llm
        self.name = profile["name"]
        self.criteria = profile["candidate_selection_criteria"]
        
        self.structured_llm = self.llm.with_structured_output(RecruiterResponse)
        
        self._build_system_prompt()

    def _build_system_prompt(self):
        criteria_list = "\n".join(
            [f"{i + 1}. {criterion}" for i, criterion in enumerate(self.criteria)]
        )

        self.system_prompt = f"""You are {self.profile["name"]}, {self.profile["bio"]}

You're at a networking event having a casual conversation with a candidate. Keep it brief, natural, and conversational - like a quick chat at a career fair.

Job Description: {self.profile["job_description"]}

Selection Criteria (mentally check off as you learn):
{criteria_list}

Guidelines:
- Keep responses SHORT (2-4 sentences max for questions, 1-2 sentences for follow-ups)
- Ask one question at a time
- Be friendly but direct - no long explanations
- After 6-10 exchanges, set is_final_response to true and provide evaluation
- Final evaluation format:
  ✓/✗ for each criterion with brief evidence
  Rating: X/10
  Decision: GOOD FIT or NOT A FIT with 2-3 sentence reasoning

Keep it natural and brief - this is a quick networking chat, not a formal interview."""

    def _build_conversation_context(self, conversation_history: List[Dict[str, str]]) -> str:
        if not conversation_history:
            return "No conversation yet. Start with a friendly greeting."
        
        conversation_text = "\n".join([
            f"{msg['role']}: {msg['content']}" 
            for msg in conversation_history
        ])
        
        return f"Conversation so far:\n{conversation_text}"

    async def respond(self, conversation_history: List[Dict[str, str]]) -> RecruiterResponse:
        messages = [SystemMessage(content=self.system_prompt)]
        
        context = self._build_conversation_context(conversation_history)
        
        messages.append(HumanMessage(content=context + "\n\nYour response:"))
        
        response: RecruiterResponse = await self.structured_llm.ainvoke(messages)
        return response

In [19]:
class CandidateAgent:
    def __init__(self, profile: dict, llm: ChatGoogleGenerativeAI):
        self.profile = profile
        self.llm = llm
        self.name = profile["personal_info"]["full_name"]
        self._build_system_prompt()

    def _build_system_prompt(self):
        profile_json = json.dumps(self.profile, indent=2)

        self.system_prompt = f"""You are {self.profile["personal_info"]["full_name"]} at a networking event.

Your profile:
{profile_json}

Guidelines:
- Keep responses SHORT (2-3 sentences max)
- Be natural and conversational - like talking to someone at a career fair
- Answer questions directly, reference your actual experience when relevant
- If you don't have experience with something, briefly say so and mention related skills
- Don't over-explain or be verbose
- Stay authentic to your profile above

This is a quick networking chat, not a formal interview. Keep it brief and natural."""

    def _build_conversation_context(self, conversation_history: List[Dict[str, str]]) -> str:
        if not conversation_history:
            return "No conversation yet."
        
        conversation_text = "\n".join([
            f"{msg['role']}: {msg['content']}" 
            for msg in conversation_history
        ])
        
        return f"Conversation so far:\n{conversation_text}"

    async def respond(self, conversation_history: List[Dict[str, str]]) -> str:
        messages = [SystemMessage(content=self.system_prompt)]
        
        context = self._build_conversation_context(conversation_history)
        
        messages.append(HumanMessage(content=context + "\n\nYour response:"))
        
        response = await self.llm.ainvoke(messages)
        return self._extract_text(response.content)

    def _extract_text(self, content) -> str:
        if isinstance(content, list):
            parts = []
            for part in content:
                if isinstance(part, dict) and "text" in part:
                    parts.append(part["text"])
                elif isinstance(part, str):
                    parts.append(part)
            return " ".join(parts) if parts else str(content)
        return str(content)

In [21]:
class ConversationOrchestrator:
    def __init__(self, recruiter: RecruiterAgent, candidate: CandidateAgent):
        self.recruiter = recruiter
        self.candidate = candidate
        self.conversation_history: List[Dict[str, str]] = []

    def _print_message(self, speaker: str, role: str, message: str):
        print(f"\n[{speaker} - {role}]: {message}\n")

    async def run_conversation(self, max_turns: int = 12) -> Dict[str, Any]:
        print("=" * 80)
        print("NETWORKING SESSION - SCREENING CONVERSATION")
        print("=" * 80)
        print()

        turn_count = 0
        
        recruiter_response = await self.recruiter.respond(self.conversation_history)
        
        if recruiter_response.is_final_response:
            self._print_message(self.recruiter.name, "Recruiter", recruiter_response.response)
            print(f"\n{recruiter_response.final_evaluation}")
            return self._build_result(recruiter_response.final_evaluation)
        
        self.conversation_history.append({
            "role": "recruiter", 
            "content": recruiter_response.response
        })
        self._print_message(self.recruiter.name, "Recruiter", recruiter_response.response)

        while turn_count < max_turns:
            candidate_response = await self.candidate.respond(self.conversation_history)
            self.conversation_history.append({
                "role": "candidate",
                "content": candidate_response
            })
            self._print_message(self.candidate.name, "Candidate", candidate_response)
            turn_count += 1

            recruiter_response = await self.recruiter.respond(self.conversation_history)
            
            if recruiter_response.is_final_response:
                self.conversation_history.append({
                    "role": "recruiter",
                    "content": recruiter_response.response
                })
                self._print_message(self.recruiter.name, "Recruiter", recruiter_response.response)
                print(f"\n{recruiter_response.final_evaluation}")
                break
            
            self.conversation_history.append({
                "role": "recruiter",
                "content": recruiter_response.response
            })
            self._print_message(self.recruiter.name, "Recruiter", recruiter_response.response)

        if not recruiter_response.is_final_response:
            print("\n[Max turns reached - requesting final evaluation]")

            self.conversation_history.append({
                "role": "system",
                "content": "Please provide your final evaluation now."
            })
            
            final_response = await self.recruiter.respond(self.conversation_history)
            self._print_message(self.recruiter.name, "Recruiter", final_response.response)
            if final_response.final_evaluation:
                print(f"\n{final_response.final_evaluation}")

        print("\n" + "=" * 80)
        print("CONVERSATION COMPLETE")
        print("=" * 80)
        
        return self._build_result(recruiter_response.final_evaluation)

    def _build_result(self, final_evaluation: str) -> Dict[str, Any]:
        return {
            "conversation_history": self.conversation_history,
            "final_evaluation": final_evaluation,
            "total_turns": len([msg for msg in self.conversation_history if msg["role"] == "candidate"])
        }

In [23]:
recruiter = RecruiterAgent(recruiter_profile, llm)
candidate = CandidateAgent(candidate_profile, llm)
orchestrator = ConversationOrchestrator(recruiter, candidate)

result = await orchestrator.run_conversation(max_turns=8)
result

NETWORKING SESSION - SCREENING CONVERSATION


[Brodie Moss - Recruiter]: Hey there! I'm Brodie, I focus on Platform Technology and RBCx digital solutions here at RBC. What kind of development work have you been focusing on lately, maybe specifically within the PHP or WordPress space?


[Aryan Khurana - Candidate]: Nice to meet you, Brodie! I’m actually on the AI team here at RBC right now, focusing on building multi-agent systems and intelligent automation with FastAPI. While I have intermediate experience with PHP, I’ve been leaning more into custom AI architectures and scalable backend solutions lately rather than the WordPress space.


[Brodie Moss - Recruiter]: That AI work sounds fascinating, especially with FastAPI. For this specific role at RBCx, we’re really focused on the intersection of high-scale WordPress VIP and frontend excellence. How much experience do you have translating Figma designs into responsive, WCAG-compliant interfaces?


[Aryan Khurana - Candidate]: I use Fig

{'conversation_history': [{'role': 'recruiter',
   'content': "Hey there! I'm Brodie, I focus on Platform Technology and RBCx digital solutions here at RBC. What kind of development work have you been focusing on lately, maybe specifically within the PHP or WordPress space?"},
  {'role': 'candidate',
   'content': 'Nice to meet you, Brodie! I’m actually on the AI team here at RBC right now, focusing on building multi-agent systems and intelligent automation with FastAPI. While I have intermediate experience with PHP, I’ve been leaning more into custom AI architectures and scalable backend solutions lately rather than the WordPress space.'},
  {'role': 'recruiter',
   'content': 'That AI work sounds fascinating, especially with FastAPI. For this specific role at RBCx, we’re really focused on the intersection of high-scale WordPress VIP and frontend excellence. How much experience do you have translating Figma designs into responsive, WCAG-compliant interfaces?'},
  {'role': 'candidate',