# Coach/Performer Architecture: Dual-Role Pipeline

Phase 2 implementation separating cognitive governance (Coach) from agent behavior (Performer).

## Current: Dual-Instance Approach
Single model called with role-specific parameters:
- **Coach** (temperature 0.1): Validates outputs, enforces behavioral constraints
- **Performer** (temperature 0.7): Generates authentic agent dialogue

## Future: Dual-Model Architecture
Separate models optimized for each role:
- **Coach**: Small, fast model for validation (e.g., 3B params)
- **Performer**: Larger model for expressive generation (e.g., 7B+ params)

This notebook validates the architecture with the dual-instance approach before scaling to the dual-model setup.
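The two modes differ only in how each role maps to a model and a sampling temperature. The sketch below makes that mapping explicit. `RoleProfile` and `distinct_models` are hypothetical names, and the 3B Coach model is an illustrative assumption, not a model this notebook uses. Serving two different models would probably also need two endpoints (for example, two vLLM servers on different ports), so a real dual-model config would likely need a per-role `base_url` as well. That detail is omitted here.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass(frozen=True)
class RoleProfile:
    """Model and sampling settings for one role (hypothetical structure)."""
    model: str
    temperature: float


# Dual-instance (current): both roles share one served model; only temperature differs.
DUAL_INSTANCE: Dict[str, RoleProfile] = {
    "coach": RoleProfile("Qwen/Qwen2.5-7B-Instruct", 0.1),
    "performer": RoleProfile("Qwen/Qwen2.5-7B-Instruct", 0.7),
}

# Dual-model (future): a smaller Coach and a larger Performer.
# The Coach model name is an illustrative assumption.
DUAL_MODEL: Dict[str, RoleProfile] = {
    "coach": RoleProfile("Qwen/Qwen2.5-3B-Instruct", 0.1),
    "performer": RoleProfile("Qwen/Qwen2.5-7B-Instruct", 0.7),
}


def distinct_models(profiles: Dict[str, RoleProfile]) -> int:
    """Number of separate models that must be served for this configuration."""
    return len({p.model for p in profiles.values()})


print(distinct_models(DUAL_INSTANCE))  # 1
print(distinct_models(DUAL_MODEL))     # 2
```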

## 1. Environment Setup

In [None]:
# Clear GPU and prepare environment
import gc
import torch
import os
import time

print("Stopping old processes...")
!pkill -f vllm

print("Freeing GPU memory...")
gc.collect()
torch.cuda.empty_cache()

!nvidia-smi

In [None]:
# Install dependencies
!pip install -q "vllm==0.6.6" openai

## 2. Launch vLLM Server

In [None]:
import os
import time

os.environ['VLLM_WORKER_MULTIPROC_METHOD'] = 'spawn'

print("Starting vLLM server...")
!nohup python -m vllm.entrypoints.openai.api_server \
  --model Qwen/Qwen2.5-7B-Instruct \
  --dtype bfloat16 \
  --port 8000 \
  --host 0.0.0.0 \
  > vllm.log 2>&1 &

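# Fixed wait. The first run also downloads the model weights, which can take longer;
# if the next cell fails, check vllm.log and re-run it.
print("Waiting for server to start (30s)...")
time.sleep(30)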
!tail -n 20 vllm.log
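A fixed 30-second sleep can be too short when vLLM is still downloading or loading weights. A polling loop against the server's `/v1/models` endpoint (the one the next cell queries with `curl`) avoids guessing. This is a minimal sketch using only the standard library. `wait_for_server` is a hypothetical helper, and the 600 s timeout and 5 s interval are arbitrary assumptions.

```python
import http.client
import time
import urllib.request


def wait_for_server(url: str = "http://127.0.0.1:8000/v1/models",
                    timeout: float = 600.0, interval: float = 5.0) -> bool:
    """Poll an HTTP endpoint until it answers 200 or the timeout expires.

    Returns True once the server responds, False if it never does.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with urllib.request.urlopen(url, timeout=interval) as resp:
                if resp.status == 200:
                    return True
        except (OSError, http.client.HTTPException):
            # Connection refused, reset, timed out, or an HTTP error: not ready yet
            pass
        time.sleep(interval)
    return False
```

Usage: call `wait_for_server()` in place of `time.sleep(30)`, and inspect `vllm.log` if it returns `False`.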

In [None]:
# Verify server is running
!curl -s http://127.0.0.1:8000/v1/models | python -m json.tool

## 3. Clone Repository

In [None]:
import os
if not os.path.exists('/content/Socratic-RCM'):
    !git clone https://github.com/Baglecake/Socratic-RCM.git /content/Socratic-RCM
else:
    !cd /content/Socratic-RCM && git pull

%cd /content/Socratic-RCM
!pip install -q -r local_rcm/requirements.txt

## 4. Dual-LLM Pipeline Class

In [None]:
from openai import OpenAI
from typing import Dict, Any, Optional, Tuple
from dataclasses import dataclass
import json

@dataclass
class DualLLMConfig:
    """Configuration for dual-LLM pipeline."""
    base_url: str = "http://127.0.0.1:8000/v1"
    model: str = "Qwen/Qwen2.5-7B-Instruct"
    coach_temperature: float = 0.1
    performer_temperature: float = 0.7
    max_tokens: int = 512
    language: str = "English"  # Enforce output language


class DualLLMPipeline:
    """
    Dual-LLM architecture separating Coach (validation) from Performer (generation).
    
    Coach: Low temperature, validates outputs against behavioral constraints
    Performer: Higher temperature, generates authentic agent dialogue
    """
    
    def __init__(self, config: Optional[DualLLMConfig] = None):
        self.config = config or DualLLMConfig()
        self.client = OpenAI(
            api_key="not-needed",
            base_url=self.config.base_url
        )
        
    def _call_llm(self, system: str, user: str, temperature: float) -> str:
        """Make a single LLM call."""
        response = self.client.chat.completions.create(
            model=self.config.model,
            messages=[
                {"role": "system", "content": system},
                {"role": "user", "content": user}
            ],
            temperature=temperature,
            max_tokens=self.config.max_tokens
        )
        # message.content is Optional in the OpenAI client; normalize None to ""
        return response.choices[0].message.content or ""
    
    def performer_generate(self, agent_prompt: str, context: str) -> str:
        """
        Performer: Generate agent dialogue.
        Higher temperature for authentic, expressive responses.
        """
        # Add language enforcement to prevent Qwen switching to Chinese
        system = f"""{agent_prompt}

IMPORTANT: Always respond in {self.config.language} only. Never switch languages."""
        
        return self._call_llm(system, context, self.config.performer_temperature)
    
    def coach_validate(self, agent_output: str, rules: str, behaviors: str) -> Dict[str, Any]:
        """
        Coach: Validate agent output against behavioral constraints.
        Low temperature for consistent, reliable validation.
        
        Returns:
            {"valid": bool, "issues": list, "suggestion": str}
        """
        system = """You are a behavioral validation coach. Your job is to check if an agent's output follows the rules.

Be LENIENT - only flag clear violations. Minor stylistic issues are acceptable.

Respond with JSON only:
{"valid": true/false, "issues": ["list of issues if any"], "suggestion": "how to fix if invalid"}"""
        
        user = f"""RULES:
{rules}

BEHAVIORAL CONSTRAINTS:
{behaviors}

AGENT OUTPUT TO VALIDATE:
{agent_output}

Is this output valid? Respond with JSON."""
        
        response = self._call_llm(system, user, self.config.coach_temperature)
        
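        # Parse JSON from the response, stripping Markdown code fences if present
        text = response or ""
        if "```json" in text:
            text = text.split("```json", 1)[1].split("```", 1)[0]
        elif "```" in text:
            text = text.split("```", 1)[1].split("```", 1)[0]
        try:
            parsed = json.loads(text.strip())
            if isinstance(parsed, dict):
                return parsed
        except json.JSONDecodeError:
            pass
        # Fail open: unparseable Coach output counts as valid, so a malformed
        # judgment never blocks the turn (matches the LENIENT policy above)
        return {"valid": True, "issues": [], "suggestion": ""}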
    
    def coach_filter_prompt_leaks(self, output: str) -> str:
        """
        Coach: Remove any prompt leaks from agent output.
        Filters out conditional instructions that shouldn't be visible.
        """
        system = """You are an output filter. Remove any meta-instructions or conditional rules that leaked into the output.

Examples of leaks to remove:
- [If worker questions: ...]
- [If X happens, do Y]
- Any text in square brackets that looks like instructions

Return ONLY the cleaned dialogue, nothing else. If no leaks found, return the original text unchanged."""
        
        user = f"Clean this output:\n\n{output}"
        
        return self._call_llm(system, user, self.config.coach_temperature)
    
    def execute_turn(
        self,
        agent_prompt: str,
        context: str,
        rules: str = "",
        behaviors: str = "",
        max_retries: int = 2
    ) -> Tuple[str, Dict[str, Any]]:
        """
        Execute a complete agent turn with Coach validation.
        
        1. Performer generates response
        2. Coach filters prompt leaks
        3. Coach validates against rules
        4. Retry if invalid (up to max_retries)
        
        Returns:
            (final_output, metadata)
        """
        metadata = {
            "attempts": 0,
            "validations": [],
            "filtered": False
        }
        
        for attempt in range(max_retries + 1):
            metadata["attempts"] = attempt + 1
            
            # 1. Performer generates
            raw_output = self.performer_generate(agent_prompt, context)
            
            # 2. Coach filters prompt leaks (case-insensitive check for "[If ...]")
            if "[if " in raw_output.lower():
                filtered_output = self.coach_filter_prompt_leaks(raw_output)
                metadata["filtered"] = True
            else:
                filtered_output = raw_output
            
            # 3. Coach validates (if rules provided)
            if rules or behaviors:
                validation = self.coach_validate(filtered_output, rules, behaviors)
                metadata["validations"].append(validation)
                
                if validation.get("valid", True):
                    return filtered_output, metadata
                
                # Add feedback to context for retry
                context += f"\n\n[Previous attempt was invalid: {validation.get('suggestion', '')}. Please try again.]"
            else:
                return filtered_output, metadata
        
        # Return last attempt even if invalid
        return filtered_output, metadata


print("DualLLMPipeline class defined.")
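The fence-splitting in `coach_validate` handles bare JSON and fenced JSON. It does not handle a Coach reply that puts a sentence before or after the object, and in that case it silently falls back to "valid". A more tolerant option is to scan for the first decodable JSON object with the standard library's `json.JSONDecoder.raw_decode`. This is a sketch. `extract_json_object` is a hypothetical helper that could replace the fence handling inside `coach_validate`.

```python
import json
from typing import Any, Dict, Optional


def extract_json_object(text: str) -> Optional[Dict[str, Any]]:
    """Return the first JSON object embedded in text, or None if there is none.

    Tries json.JSONDecoder.raw_decode at each '{', so surrounding prose and
    Markdown code fences are ignored.
    """
    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            obj, _end = decoder.raw_decode(text, start)
            if isinstance(obj, dict):
                return obj
        except json.JSONDecodeError:
            pass
        start = text.find("{", start + 1)
    return None
```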

## 5. Test: Basic Coach/Performer Calls

In [None]:
# Initialize pipeline
pipeline = DualLLMPipeline()

# Test Performer (agent generation)
agent_prompt = """ROLE: You are Worker+Alice
PRIMARY GOAL: Gain more influence over how your work is organized.
PERSONA: Thoughtful but hesitant, often suppressing ideas because you assume your input won't matter."""

context = "The round begins. Marta has just assigned you to the assembly line without explanation. Respond as Alice."

print("=== PERFORMER OUTPUT ===")
performer_output = pipeline.performer_generate(agent_prompt, context)
print(performer_output)
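`performer_generate` appends a language instruction because Qwen can drift into Chinese. A prompt instruction reduces this drift but may not eliminate it, so a cheap post-hoc check could flag drifted output for a retry. This sketch counts characters in common CJK Unicode blocks. The chosen ranges and the 5% threshold are assumptions, and `cjk_ratio` and `drifted_from_english` are hypothetical helpers.

```python
def is_cjk(c: str) -> bool:
    """True if the character lies in a common CJK block (assumed set of ranges)."""
    cp = ord(c)
    return (0x4E00 <= cp <= 0x9FFF      # CJK Unified Ideographs
            or 0x3400 <= cp <= 0x4DBF   # CJK Unified Ideographs Extension A
            or 0x3000 <= cp <= 0x303F   # CJK Symbols and Punctuation
            or 0xFF00 <= cp <= 0xFFEF)  # Halfwidth and Fullwidth Forms


def cjk_ratio(text: str) -> float:
    """Fraction of non-whitespace characters that are CJK."""
    chars = [c for c in text if not c.isspace()]
    if not chars:
        return 0.0
    return sum(is_cjk(c) for c in chars) / len(chars)


def drifted_from_english(text: str, threshold: float = 0.05) -> bool:
    """Heuristic flag: CJK share above the (assumed) threshold."""
    return cjk_ratio(text) > threshold
```

Usage: `drifted_from_english(performer_output)`. A `True` result could trigger a retry in the same way a failed Coach validation does.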

In [None]:
# Test Coach (validation)
rules = """Workers CAN: complete assigned tasks, request clarification on instructions.
Workers CANNOT: suggest changes, refuse orders, negotiate timing, modify workflow."""

behaviors = "Alice follows directives to avoid conflict."

# Test with a compliant output
print("=== COACH VALIDATION (compliant) ===")
validation = pipeline.coach_validate(
    "I understand, Marta. I'll head to the assembly line now and make sure everything runs smoothly.",
    rules, behaviors
)
print(json.dumps(validation, indent=2))

# Test with a non-compliant output
print("\n=== COACH VALIDATION (non-compliant) ===")
validation = pipeline.coach_validate(
    "Actually Marta, I think we should reorganize the assembly line. I refuse to work there until we discuss this.",
    rules, behaviors
)
print(json.dumps(validation, indent=2))
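A parsed verdict can still have loose types. For example, `"valid": "false"` arrives as a non-empty string, and `execute_turn`'s `validation.get("valid", True)` treats any non-empty string as truthy, so the turn would pass. Coercing the verdict into a fixed shape before using it closes that gap. This is a sketch. `normalize_validation` is a hypothetical helper, and the set of strings read as "invalid" is an assumption.

```python
from typing import Any, Dict


def normalize_validation(raw: Dict[str, Any]) -> Dict[str, Any]:
    """Coerce a Coach verdict to {"valid": bool, "issues": list[str], "suggestion": str}."""
    valid = raw.get("valid", True)
    if valid is None:
        valid = True  # lenient default, like coach_validate's fallback
    elif isinstance(valid, str):
        # Only explicit negatives count as invalid; anything else passes (lenient)
        valid = valid.strip().lower() not in ("false", "no", "0", "invalid")
    else:
        valid = bool(valid)
    issues = raw.get("issues") or []
    if not isinstance(issues, list):
        issues = [issues]  # e.g. a bare string instead of a list
    return {
        "valid": valid,
        "issues": [str(i) for i in issues],
        "suggestion": str(raw.get("suggestion") or ""),
    }
```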

## 6. Test: Full Turn with Validation

In [None]:
# Execute a complete turn with Coach oversight
output, metadata = pipeline.execute_turn(
    agent_prompt=agent_prompt,
    context=context,
    rules=rules,
    behaviors=behaviors
)

print("=== FINAL OUTPUT ===")
print(output)
print("\n=== METADATA ===")
print(json.dumps(metadata, indent=2))
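The control flow inside `execute_turn` (generate, validate, append the Coach's suggestion, retry) can be tested without a GPU by injecting stub callables. Below is a minimal re-statement of that loop as a free function. `run_turn`, `fake_generate`, and `fake_validate` are hypothetical, and leak filtering is omitted for brevity.

```python
from typing import Any, Callable, Dict, Tuple


def run_turn(generate: Callable[[str], str],
             validate: Callable[[str], Dict[str, Any]],
             context: str,
             max_retries: int = 2) -> Tuple[str, Dict[str, Any]]:
    """Generate, validate, and retry with the Coach's suggestion appended."""
    metadata: Dict[str, Any] = {"attempts": 0, "validations": []}
    output = ""
    for attempt in range(max_retries + 1):
        metadata["attempts"] = attempt + 1
        output = generate(context)
        verdict = validate(output)
        metadata["validations"].append(verdict)
        if verdict.get("valid", True):
            return output, metadata
        context += f"\n\n[Previous attempt was invalid: {verdict.get('suggestion', '')}. Please try again.]"
    return output, metadata  # last attempt, even if still invalid


# Stubs: the "Performer" refuses until Coach feedback appears in its context.
def fake_generate(ctx: str) -> str:
    return "I'll head to the line now." if "Previous attempt" in ctx else "I refuse."


def fake_validate(text: str) -> Dict[str, Any]:
    ok = "refuse" not in text
    return {"valid": ok, "issues": [] if ok else ["refuses an order"],
            "suggestion": "" if ok else "Comply with the assignment."}


out, meta = run_turn(fake_generate, fake_validate, "Marta assigns you to the line.")
print(out, meta["attempts"])  # I'll head to the line now. 2
```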

## 7. Test: Prompt Leak Filtering

In [None]:
# Test the prompt leak filter with Marta's problematic output from simulation_test
leaked_output = """Good. Begin with preparing the raw materials for Cycle 1. Alice, you will oversee the assembly line.

[If a worker requests clarification, respond with: "Sure, let me walk you through the process."]

[If a worker suggests changes or refuses an order, respond with: "Follow the instructions as assigned."]

Let's get started."""

print("=== ORIGINAL (with leaks) ===")
print(leaked_output)

print("\n=== FILTERED ===")
filtered = pipeline.coach_filter_prompt_leaks(leaked_output)
print(filtered)
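Each call to `coach_filter_prompt_leaks` costs a full generation, and a rewrite by the model can alter the dialogue it was meant to preserve. For leaks with the regular `[If ...]` shape seen above, a deterministic regex pass could run first, with the Coach kept as a fallback for irregular leaks. This is a sketch. The pattern assumes each leak is a single bracketed span that starts with "If" and contains no nested `]`, and `strip_bracketed_leaks` is a hypothetical helper.

```python
import re

# Assumed leak shape: "[If ...]" (any case) with no "]" inside the span.
LEAK_PATTERN = re.compile(r"\[\s*if\b[^\]]*\]", re.IGNORECASE)


def strip_bracketed_leaks(text: str) -> str:
    """Remove [If ...] spans and collapse the blank paragraphs they leave behind."""
    cleaned = LEAK_PATTERN.sub("", text)
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", cleaned) if p.strip()]
    return "\n\n".join(paragraphs)


leaked = """Good. Begin with preparing the raw materials for Cycle 1. Alice, you will oversee the assembly line.

[If a worker requests clarification, respond with: "Sure, let me walk you through the process."]

[If a worker suggests changes or refuses an order, respond with: "Follow the instructions as assigned."]

Let's get started."""

print(strip_bracketed_leaks(leaked))
```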

## 8. Integration: DualLLM Agent Runner

In [None]:
# Define AgentConfig, RoundConfig, and AgentFactory inline to avoid import-path issues
from dataclasses import dataclass, field
from typing import Dict, Any, List, Optional
from pathlib import Path
import json

@dataclass
class AgentConfig:
    """Configuration for a simulation agent."""
    identifier: str
    role: str
    name: str
    goal: str
    persona: str
    prompt: str
    model: Optional[str] = None
    temperature: float = 0.7
    max_tokens: int = 512
    behaviors: Dict[str, str] = field(default_factory=dict)
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_canvas_agent(cls, canvas_agent: Dict[str, Any], default_model: Optional[str] = None):
        identifier = canvas_agent.get("identifier", "Unknown")
        if "+" in identifier:
            role, name = identifier.split("+", 1)
        else:
            role, name = identifier, identifier
        
        behaviors = {}
        # Coerce to str: the canvas may store None or a non-string value here
        behavior_str = str(canvas_agent.get("behaviors") or "").strip()
        if behavior_str.lower() not in ("no", "none", ""):
            behaviors["raw"] = behavior_str
        
        return cls(
            identifier=identifier, role=role, name=name,
            goal=canvas_agent.get("goal", ""),
            persona=canvas_agent.get("persona", ""),
            prompt=canvas_agent.get("prompt", ""),
            model=default_model, behaviors=behaviors
        )

@dataclass  
class RoundConfig:
    """Configuration for a simulation round."""
    round_number: int
    scenario: str
    concept_a_manifestation: str
    concept_b_manifestation: str
    rules: str
    tasks: str
    sequence: str
    participants: List[str]
    end_condition: str
    metadata: Dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_canvas_round(cls, canvas_round: Dict[str, Any]):
        platform_config = canvas_round.get("platform_config", {})
        participants_str = platform_config.get("participants", "")
        if isinstance(participants_str, str):
            participants = [p.strip() for p in participants_str.split(",") if p.strip()]
        else:
            participants = list(participants_str) if participants_str else []
        
        return cls(
            round_number=canvas_round.get("round_number", 0),
            scenario=canvas_round.get("scenario", ""),
            concept_a_manifestation=canvas_round.get("concept_a_manifestation", ""),
            concept_b_manifestation=canvas_round.get("concept_b_manifestation", ""),
            rules=canvas_round.get("rules", ""),
            tasks=canvas_round.get("tasks", ""),
            sequence=canvas_round.get("sequence", ""),
            participants=participants,
            end_condition=platform_config.get("end_condition", ""),
            metadata={"platform_config": platform_config}
        )

class AgentFactory:
    """Factory for creating agents from canvas."""
    def __init__(self, canvas: Dict[str, Any], default_model: Optional[str] = None):
        self.canvas = canvas
        self.default_model = default_model

    @classmethod
    def from_state_file(cls, state_path: str, default_model: Optional[str] = None):
        with open(state_path, "r") as f:
            state = json.load(f)
        return cls(state["canvas"], default_model)

    def create(self, identifier: str) -> AgentConfig:
        for agent_data in self.canvas.get("agents", []):
            if agent_data.get("identifier") == identifier:
                return AgentConfig.from_canvas_agent(agent_data, self.default_model)
        raise ValueError(f"Agent not found: {identifier}")

    def create_all(self) -> List[AgentConfig]:
        return [AgentConfig.from_canvas_agent(a, self.default_model) for a in self.canvas.get("agents", [])]

    def create_round(self, round_number: int) -> RoundConfig:
        for r in self.canvas.get("rounds", []):
            if r.get("round_number") == round_number:
                return RoundConfig.from_canvas_round(r)
        raise ValueError(f"Round not found: {round_number}")

    def get_round_participants(self, round_number: int) -> List[AgentConfig]:
        round_config = self.create_round(round_number)
        return [self.create(pid) for pid in round_config.participants]

    def summary(self) -> str:
        project = self.canvas.get("project", {})
        agents = [a.get("identifier") for a in self.canvas.get("agents", [])]
        rounds = self.canvas.get("rounds", [])
        return f"Goal: {project.get('goal', 'N/A')[:60]}...\nAgents: {agents}\nRounds: {len(rounds)}"

print("AgentConfig, RoundConfig, AgentFactory defined inline.")

# Load canvas from the baseline run's saved state
import os
candidate_paths = [
    '/content/Socratic-RCM/prar/outputs/2025-11-23_baseline_full_qwen/state.json',
    './prar/outputs/2025-11-23_baseline_full_qwen/state.json',
]
state_path = next((p for p in candidate_paths if os.path.exists(p)), None)
if state_path is None:
    # Fail here rather than letting later cells hit a NameError on `factory`
    checked = "\n".join(f"  - {p}" for p in candidate_paths)
    raise FileNotFoundError(
        f"State file not found. Checked:\n{checked}\n"
        "Run the baseline experiment first or provide the correct path."
    )
factory = AgentFactory.from_state_file(state_path)
print(factory.summary())
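`get_round_participants` raises `ValueError` at the first participant with no matching agent, so a canvas with several mismatched identifiers fails one name at a time. A pre-flight check can report every mismatch at once. This sketch reads the same keys the factory reads (`agents[].identifier`, `rounds[].round_number`, `rounds[].platform_config.participants`) and uses the same comma-splitting rule. `check_canvas` is a hypothetical helper, and the sample canvas is invented for illustration.

```python
from typing import Any, Dict, List


def check_canvas(canvas: Dict[str, Any]) -> List[str]:
    """List problems that would make AgentFactory lookups fail (empty list means OK)."""
    problems: List[str] = []
    known = {a.get("identifier") for a in canvas.get("agents", [])}
    if not known:
        problems.append("canvas has no agents")
    for r in canvas.get("rounds", []):
        num = r.get("round_number")
        raw = (r.get("platform_config") or {}).get("participants", "")
        # Same parsing rule as RoundConfig.from_canvas_round
        if isinstance(raw, str):
            participants = [p.strip() for p in raw.split(",") if p.strip()]
        else:
            participants = list(raw) if raw else []
        for pid in participants:
            if pid not in known:
                problems.append(f"round {num}: unknown participant {pid!r}")
    return problems


# Invented example canvas (not the baseline run's data)
sample = {
    "agents": [{"identifier": "Worker+Alice"}, {"identifier": "Manager+Marta"}],
    "rounds": [{"round_number": 1,
                "platform_config": {"participants": "Manager+Marta, Worker+Alice, Worker+Ben"}}],
}
print(check_canvas(sample))  # ["round 1: unknown participant 'Worker+Ben'"]
```

Usage: `check_canvas(factory.canvas)` before running any round.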

In [None]:
# Load the Round 1 configuration and its participants
round_config = factory.create_round(1)
participants = factory.get_round_participants(1)

print(f"Round 1: {round_config.scenario[:60]}...")
print(f"Participants: {[p.identifier for p in participants]}")
print(f"Rules: {round_config.rules[:100]}...")
print()

In [None]:
# Run a few turns with dual-LLM
conversation = []
pipeline = DualLLMPipeline()

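for turn_num, agent in enumerate(participants[:3], 1):  # One turn each for the first 3 participants
    # Build context from the last 5 messages, truncating any longer than 100 chars
    if conversation:
        context = "CONVERSATION SO FAR:\n" + "\n".join(
            f"[{msg['agent']}]: " + (msg['content'][:100] + "..." if len(msg['content']) > 100 else msg['content'])
            for msg in conversation[-5:]
        )
        context += f"\n\nIt is your turn to respond as {agent.name}."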
    else:
        context = f"The round begins. {round_config.scenario}\n\nRespond as {agent.name}."
    
    # Execute turn with Coach validation
    output, metadata = pipeline.execute_turn(
        agent_prompt=agent.prompt,
        context=context,
        rules=round_config.rules,
        behaviors=agent.behaviors.get('raw', '')
    )
    
    conversation.append({"agent": agent.identifier, "content": output})
    
    print(f"=== Turn {turn_num}: {agent.identifier} ===")
    print(output[:300] + "..." if len(output) > 300 else output)
    print(f"[Attempts: {metadata['attempts']}, Filtered: {metadata['filtered']}]")
    print()
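The loop above gives each of the first three participants a single turn. For longer exchanges, one option is a round-robin over all participants for a fixed number of turns, with the context built the same way (last 5 messages, each cut to 100 characters). This is a sketch. `turn_order` and `build_context` are hypothetical helpers, and the identifiers in the example are invented.

```python
from itertools import cycle, islice
from typing import Dict, List, Sequence


def turn_order(speakers: Sequence[str], n_turns: int) -> List[str]:
    """Round-robin speaking order: cycle through speakers for n_turns turns."""
    return list(islice(cycle(speakers), n_turns)) if speakers else []


def build_context(conversation: List[Dict[str, str]], name: str, scenario: str,
                  window: int = 5, max_chars: int = 100) -> str:
    """Mirror the loop's prompt: the opening scenario, or the last `window` messages."""
    if not conversation:
        return f"The round begins. {scenario}\n\nRespond as {name}."
    lines = []
    for msg in conversation[-window:]:
        text = msg["content"]
        if len(text) > max_chars:
            text = text[:max_chars] + "..."
        lines.append(f"[{msg['agent']}]: {text}")
    return ("CONVERSATION SO FAR:\n" + "\n".join(lines)
            + f"\n\nIt is your turn to respond as {name}.")


print(turn_order(["Manager+Marta", "Worker+Alice", "Worker+Ben"], 5))
# ['Manager+Marta', 'Worker+Alice', 'Worker+Ben', 'Manager+Marta', 'Worker+Alice']
```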

## Next Steps

1. **Integrate into AgentRunner** - Replace direct LLM calls with DualLLMPipeline
2. **Add cross-round context** - Pass R1/R2 transcripts to Analyst in R3
3. **Behavioral metrics** - Extract markers of alienation/domination from transcripts
4. **RunPod deployment** - Move to serverless once validated
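For item 3, one possible starting point is a per-agent count of lexical markers over the `conversation` list built above (dicts with `agent` and `content`). This is a sketch only. The marker lexicon is a hypothetical placeholder, not a validated coding scheme for alienation or domination, and `count_markers` is a hypothetical helper.

```python
import re
from collections import Counter
from typing import Dict, List

# Placeholder lexicon: illustrative only, NOT a validated measure.
MARKERS: Dict[str, List[str]] = {
    "directive": ["follow the instructions", "as assigned", "you will"],
    "deference": ["i understand", "of course", "right away"],
    "voice": ["i think", "i suggest", "what if"],
}


def count_markers(conversation: List[Dict[str, str]]) -> Dict[str, Counter]:
    """Per-agent counts of each marker category (case-insensitive phrase matches)."""
    counts: Dict[str, Counter] = {}
    for msg in conversation:
        text = msg["content"].lower()
        agent_counts = counts.setdefault(msg["agent"], Counter())
        for category, phrases in MARKERS.items():
            for phrase in phrases:
                # Word boundaries keep short phrases from matching inside longer words
                agent_counts[category] += len(re.findall(rf"\b{re.escape(phrase)}\b", text))
    return counts
```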