# Toy Autonomous Agent Platform - Iterative Planning Demo

This is a toy implementation of an Autonomous Agent Platform that demonstrates **sophisticated iterative planning**. The system showcases:

1. **Intent Detection** - Understanding complex user requests
2. **Multi-phase Planning** - Initial sketches, detailed plans, and dynamic replanning
3. **Adaptive Execution** - Handling failures, missing data, and changing requirements
4. **Multiple Planning Iterations** - True iterative replanning based on execution results

## Use Case: Customer Health Assessment Platform

This demo involves a **Customer Health Assessment** system that analyzes account health through multiple data sources and may require several planning iterations due to:
- **Data dependencies** between different systems
- **Conditional workflows** based on intermediate results  
- **Failure recovery** when services are unavailable
- **Dynamic requirement changes** discovered during execution

## LLM Integration Points in Advanced System

This implementation shows where an LLM would be called in a production system:

1. **Enhanced Intent Detection** - Multi-turn conversation understanding
2. **Dynamic Agent Selection** - Context-aware team assembly
3. **Sophisticated Planning** - Multi-iteration adaptive planning
4. **Intelligent Error Recovery** - Failure analysis and replanning strategies
5. **Conditional Logic** - Runtime decision making based on intermediate results
6. **Result Synthesis** - Comprehensive report generation and recommendations

In [5]:
import re

# Extract account from query helper
def extract_account_from_query(query: str) -> str:
    """Extract account name from user query"""
    # Simple pattern matching for common formats
    patterns = [
        r"for\s+(\w+)",
        r"account\s+(\w+)", 
        r"customer\s+(\w+)",
        r"(\w+(?:\s+\w+)*?)(?:\s+account|$)"
    ]
    
    for pattern in patterns:
        match = re.search(pattern, query, re.IGNORECASE)
        if match:
            account = match.group(1).strip()
            if account.lower() not in ['the', 'a', 'an', 'this', 'that', 'account', 'customer']:
                return account
    
    return "DefaultAccount"


In [6]:
import random
from typing import List, Dict, Any


In [7]:
# Mock Tools for Customer Health Assessment
import time

# Core data retrieval tools
def fetch_account_basics(account: str) -> Dict[str, Any]:
    """Basic account information - always available"""
    return {
        "account": account, 
        "tier": "Enterprise",
        "contract_value": "$500K",
        "start_date": "2023-01-15"
    }

def fetch_usage_metrics(account: str) -> Dict[str, Any]:
    """Usage analytics - may fail for some accounts"""
    # Simulate occasional failures
    if random.random() < 0.3:
        raise Exception(f"Usage Analytics API unavailable for {account}")
    
    return {
        "daily_active_users": 1250,
        "feature_adoption": {"feature_a": 0.85, "feature_b": 0.62, "feature_c": 0.34},
        "api_calls_last_30d": 45000
    }

def fetch_support_tickets(account: str) -> Dict[str, Any]:
    """Support ticket analysis"""
    tickets = [
        {"id": "T001", "severity": "high", "status": "open", "days_old": 5},
        {"id": "T002", "severity": "medium", "status": "closed", "days_old": 2},
        {"id": "T003", "severity": "low", "status": "open", "days_old": 12}
    ]
    return {"tickets": tickets, "open_count": 2, "high_severity_count": 1}

def fetch_financial_data(account: str) -> Dict[str, Any]:
    """Financial metrics - requires account basics first"""
    return {
        "payment_status": "current",
        "invoice_count_overdue": 0,
        "expansion_revenue": "$75K",
        "churn_risk_score": 0.15
    }

def check_compliance_status(account: str) -> Dict[str, Any]:
    """Compliance and security audit - slow operation"""
    time.sleep(1)  # Simulate slow operation
    return {
        "security_score": 0.92,
        "compliance_violations": [],
        "last_audit_date": "2024-08-15",
        "soc2_compliant": True
    }

def analyze_engagement_trends(account: str, usage_data: Dict) -> Dict[str, Any]:
    """Advanced analytics requiring usage data"""
    if not usage_data:
        raise Exception("Cannot analyze engagement without usage data")
    
    dau = usage_data.get("daily_active_users", 0)
    trend = "increasing" if dau > 1000 else "stable" if dau > 500 else "declining"
    
    return {
        "engagement_trend": trend,
        "health_score": 0.85 if trend == "increasing" else 0.65 if trend == "stable" else 0.35,
        "recommendation": f"Account shows {trend} engagement pattern"
    }

def generate_health_report(account: str, **kwargs) -> Dict[str, Any]:
    """Generate comprehensive health assessment"""
    # Calculate overall health score based on available data
    health_factors = []
    
    if "churn_risk_score" in kwargs:
        health_factors.append(1 - kwargs["churn_risk_score"])
    if "health_score" in kwargs:
        health_factors.append(kwargs["health_score"])
    if "security_score" in kwargs:
        health_factors.append(kwargs["security_score"])
    
    overall_health = sum(health_factors) / len(health_factors) if health_factors else 0.5
    
    status = "Healthy" if overall_health > 0.8 else "At Risk" if overall_health > 0.5 else "Critical"
    
    return {
        "overall_health_score": overall_health,
        "status": status,
        "report": f"Customer Health Report for {account}",
        "recommendations": [
            "Monitor engagement trends closely",
            "Schedule customer success check-in",
            "Review feature adoption metrics"
        ]
    }

def escalate_to_csm(account: str, health_data: Dict) -> Dict[str, Any]:
    """Escalate critical accounts to Customer Success Manager"""
    return {
        "escalation_created": True,
        "csm_assigned": "Sarah Johnson",
        "priority": "High" if health_data.get("overall_health_score", 1) < 0.4 else "Medium",
        "follow_up_date": "2024-09-30"
    }


# Enhanced Agent System
class Agent:
    def __init__(self, name, capabilities, specialization=None):
        self.name = name
        self.capabilities = capabilities  # list of tool names
        self.specialization = specialization

    def __repr__(self):
        return f"Agent({self.name})"


# Specialized agents for different aspects of customer health
data_retrieval_agent = Agent("DataRetriever", 
    ["fetch_account_basics", "fetch_usage_metrics", "fetch_support_tickets", "fetch_financial_data"],
    "Retrieves core customer data from various systems")

analytics_agent = Agent("AnalyticsEngine", 
    ["analyze_engagement_trends", "check_compliance_status"],
    "Performs advanced analytics and compliance checks")

reporting_agent = Agent("ReportGenerator", 
    ["generate_health_report"],
    "Creates comprehensive reports and assessments")

escalation_agent = Agent("EscalationManager", 
    ["escalate_to_csm"],
    "Handles escalations and customer success workflows")

AGENTS = [data_retrieval_agent, analytics_agent, reporting_agent, escalation_agent]

# Tool registry for easy lookup
TOOL_REGISTRY = {
    "fetch_account_basics": fetch_account_basics,
    "fetch_usage_metrics": fetch_usage_metrics,
    "fetch_support_tickets": fetch_support_tickets,
    "fetch_financial_data": fetch_financial_data,
    "check_compliance_status": check_compliance_status,
    "analyze_engagement_trends": analyze_engagement_trends,
    "generate_health_report": generate_health_report,
    "escalate_to_csm": escalate_to_csm
}

## LLM Integration - Enhanced Intent Detection

The advanced system's `detect_intent()` would make sophisticated LLM calls:

```python
response = llm.chat([{
    "role": "system",
    "content": """You are an enterprise intent classifier. Analyze user queries for customer success workflows.
    Available intents: customer_health_assessment, usage_analysis, compliance_check, escalation_workflow, support_analysis"""
}, {
    "role": "user", 
    "content": f"Classify this request with confidence scores: {query}"
}])
```

**Advanced capabilities:**
- **Multi-intent detection** - Handle complex requests spanning multiple workflows
- **Confidence scoring** - Uncertainty handling for ambiguous requests  
- **Context awareness** - Consider conversation history and user role
- **Intent refinement** - Clarification questions for unclear requests


## LLM Integration - Dynamic Agent Selection & High-Level Planning

### Agent Selection with Specialization Awareness

```python
response = llm.chat([{
    "role": "system",
    "content": f"""Select optimal agents for this task. Available agents:
    {[(agent.name, agent.specialization) for agent in AGENTS]}
    Consider task complexity, required capabilities, and potential parallelization."""
}, {
    "role": "user",
    "content": f"Intent: {intent}, Query: {query}"
}])
```

### Sophisticated High-Level Planning

```python
response = llm.chat([{
    "role": "system", 
    "content": """Create adaptive task breakdowns considering:
    - Data dependencies and prerequisites
    - Failure scenarios and optional steps
    - Conditional workflows based on intermediate results
    - Resource optimization and parallelization opportunities"""
}, {
    "role": "user",
    "content": f"Plan tasks for {intent} with potential complications"
}])
```


In [8]:
# Enhanced Intent Detection for Customer Health Assessment
def detect_intent(query: str) -> str:
    query_lower = query.lower()
    
    if any(term in query_lower for term in ["health", "assessment", "analyze customer", "account health"]):
        return "customer_health_assessment"
    elif any(term in query_lower for term in ["usage", "engagement", "analytics"]):
        return "usage_analysis"
    elif any(term in query_lower for term in ["compliance", "security", "audit"]):
        return "compliance_check"
    elif any(term in query_lower for term in ["escalate", "csm", "customer success"]):
        return "escalation_workflow"
    elif "support" in query_lower:
        return "support_analysis"
    else:
        return "generic_task"


def select_agents(intent: str) -> List[Agent]:
    """Select agents based on intent and potential planning needs"""
    if intent == "customer_health_assessment":
        # Complex workflow requiring all agents potentially
        return [data_retrieval_agent, analytics_agent, reporting_agent, escalation_agent]
    elif intent == "usage_analysis":
        return [data_retrieval_agent, analytics_agent]
    elif intent == "compliance_check":
        return [analytics_agent, reporting_agent]
    elif intent == "escalation_workflow":
        return [reporting_agent, escalation_agent]
    elif intent == "support_analysis":
        return [data_retrieval_agent]
    return AGENTS

## LLM Integration - Advanced Detailed Planning

The `generate_plan()` function demonstrates sophisticated planning with multiple LLM calls:

### 1. Tool Selection & Dependency Analysis
```python
response = llm.chat([{
    "role": "system",
    "content": f"""You are an expert system planner. Available tools: {list(TOOL_REGISTRY.keys())}
    Map high-level tasks to executable steps considering:
    - Data dependencies (some tools need outputs from others)
    - Optional vs critical steps
    - Failure recovery strategies
    - Context injection requirements"""
}, {
    "role": "user",
    "content": f"Planning iteration {planning_iteration}. Tasks: {sketch}. Context: {context}. Account: {account}"
}])
```

### 2. Conditional Logic Generation
```python
response = llm.chat([{
    "role": "user",
    "content": f"""Analyze conditional steps:
    - Skip 'analyze engagement' if no usage data: {bool(context.get('daily_active_users'))}
    - Escalate if health score < 0.6: {context.get('overall_health_score', 0.5)}
    - Run compliance check based on: {list(context.keys())}
    Generate appropriate conditional logic."""
}])
```

**Key advanced features:**
- **Dynamic replanning** based on execution results
- **Context-aware planning** using accumulated data
- **Conditional step generation** with runtime decision points
- **Failure scenario planning** with optional steps marked


In [9]:
# -----------------------------
# Enhanced Planner with Dynamic Planning Capabilities
# -----------------------------
def generate_high_level_sketch(intent: str, context: Dict[str, Any] = None) -> List[str]:
    print(f"PLANNING PHASE: Generating high-level sketch for intent '{intent}'")
    
    if intent == "customer_health_assessment":
        sketch = [
            "Fetch account basics",
            "Gather core metrics",
            "Analyze data quality",
            "Generate health assessment",
            "Determine next actions"
        ]
    elif intent == "usage_analysis":
        sketch = [
            "Fetch usage metrics",
            "Analyze engagement trends",
            "Generate insights"
        ]
    elif intent == "compliance_check":
        sketch = [
            "Check compliance status",
            "Generate compliance report"
        ]
    elif intent == "escalation_workflow":
        sketch = [
            "Assess situation",
            "Create escalation"
        ]
    elif intent == "support_analysis":
        sketch = [
            "Fetch support tickets",
            "Analyze ticket patterns"
        ]
    else:
        sketch = ["Perform generic analysis", "Return results"]
    
    print(f"INITIAL PLAN CREATED: {len(sketch)} tasks identified")
    for i, task in enumerate(sketch, 1):
        print(f"   {i}. {task}")
    
    return sketch


## LLM Integration - Intelligent Execution & Multi-Iteration Replanning

### 1. Error Analysis & Recovery Planning
```python
# When execution failures occur
response = llm.chat([{
    "role": "system",
    "content": "You are an execution recovery specialist. Analyze failures and suggest recovery strategies."
}, {
    "role": "user", 
    "content": f"""Execution failures: {execution_failures}
    Available tools: {list(TOOL_REGISTRY.keys())}
    Current context: {context}
    Determine: Should we retry, skip, use alternatives, or abort?"""
}])
```

### 2. Replanning Decision Logic
```python
# Multi-iteration replanning decisions
response = llm.chat([{
    "role": "user",
    "content": f"""Iteration {planning_iteration} of {max_iterations} complete.
    Status: {len(completed_tasks)} done, {len(pending_tasks)} pending
    Failures: {execution_failures}
    New data available: {[k for k in context.keys() if not k.startswith('_')]}
    
    Should we:
    A) Continue with new iteration using available data
    B) Stop - sufficient results achieved  
    C) Change strategy completely
    
    Reason your decision."""
}])
```

### 3. Terminal Condition Assessment
```python
# Intelligent stopping conditions
response = llm.chat([{
    "role": "user",
    "content": f"""Assess if we've achieved the goal for intent '{intent}':
    Results so far: {results}
    Original query: {query}
    Have we satisfied the user's request sufficiently?"""
}])
```

### 4. Comprehensive Result Synthesis
```python
# Final response generation
response = llm.chat([{
    "role": "system",
    "content": "Synthesize complex multi-system results into actionable insights."
}, {
    "role": "user",
    "content": f"""Create executive summary for customer health assessment:
    Data gathered: {client_results}
    Execution summary: {planning_iteration} iterations, {completed_tasks}/{total_tasks} completed
    Include: Key findings, health score interpretation, recommended actions"""
}])
```


## Simplified Interface Demonstration

The code above can be significantly simplified by removing explicit account parameters and instead:

1. **Extract account information from queries** using pattern matching
2. **Pass context objects** to tools instead of individual parameters  
3. **Maintain state** through a shared context dictionary
4. **Provide defaults** when information is missing

This simplification makes the system more natural to use while maintaining all the core functionality.


In [10]:
# SIMPLIFIED VERSION - Context-based Interface Demo

# Simplified Tools - Context-based interface
def fetch_account_basics_simple(context: Dict[str, Any]) -> Dict[str, Any]:
    """Basic account information - simplified interface"""
    account = context.get("account", "Unknown Account")
    return {
        "account": account, 
        "tier": "Enterprise",
        "contract_value": "$500K",
        "start_date": "2023-01-15"
    }

def fetch_usage_metrics_simple(context: Dict[str, Any]) -> Dict[str, Any]:
    """Usage analytics - simplified interface"""
    account = context.get("account", "Unknown Account")
    # Simulate occasional failures
    if random.random() < 0.3:
        raise Exception(f"Usage Analytics API unavailable for {account}")
    
    return {
        "daily_active_users": 1250,
        "feature_adoption": {"feature_a": 0.85, "feature_b": 0.62, "feature_c": 0.34},
        "api_calls_last_30d": 45000
    }

def generate_health_report_simple(context: Dict[str, Any], **kwargs) -> Dict[str, Any]:
    """Generate comprehensive health assessment - simplified interface"""
    account = context.get("account", "Unknown Account")
    
    # Calculate overall health score based on available data
    health_factors = []
    
    if "churn_risk_score" in kwargs:
        health_factors.append(1 - kwargs["churn_risk_score"])
    if "health_score" in kwargs:
        health_factors.append(kwargs["health_score"])
    if "security_score" in kwargs:
        health_factors.append(kwargs["security_score"])
    
    overall_health = sum(health_factors) / len(health_factors) if health_factors else 0.5
    
    status = "Healthy" if overall_health > 0.8 else "At Risk" if overall_health > 0.5 else "Critical"
    
    return {
        "overall_health_score": overall_health,
        "status": status,
        "report": f"Customer Health Report for {account}",
        "recommendations": [
            "Monitor engagement trends closely",
            "Schedule customer success check-in",
            "Review feature adoption metrics"
        ]
    }

# Simplified Tools Registry
SIMPLE_TOOLS = {
    "fetch_account_basics": fetch_account_basics_simple,
    "fetch_usage_metrics": fetch_usage_metrics_simple,
    "generate_health_report": generate_health_report_simple,
}

# Simplified execution function
def execute_tool_simple(tool_name: str, context: Dict[str, Any]) -> Dict[str, Any]:
    """Execute a tool with context-based parameters"""
    tool = SIMPLE_TOOLS.get(tool_name)
    if not tool:
        raise Exception(f"Tool {tool_name} not found")
    
    print(f"   Executing {tool_name} with context: {list(context.keys())}")
    try:
        result = tool(context)
        print(f"   ✓ Success: {tool_name}")
        return result
    except Exception as e:
        print(f"   ✗ Failed: {tool_name} - {e}")
        return {}

# Simplified run function without account parameter
def simplified_run(query: str) -> Dict[str, Any]:
    """Run a simplified agent workflow"""
    print(f"SIMPLIFIED AGENT STARTED")
    print(f"User Query: {query}")
    
    # Extract account from query
    account = extract_account_from_query(query)
    print(f"Extracted Account: {account}")
    print("=" * 60)
    
    # Initialize context with extracted account
    context = {"account": account, "query": query}
    
    # Simple workflow: fetch basics -> usage -> report
    workflow = ["fetch_account_basics", "fetch_usage_metrics", "generate_health_report"]
    
    for tool_name in workflow:
        result = execute_tool_simple(tool_name, context)
        context.update(result)  # Add results to context
    
    print("=" * 60)
    print("Final Results:")
    # Filter out internal data for client response
    client_results = {k: v for k, v in context.items() 
                     if not k.startswith("_") and k not in ["query"]}
    
    for key, value in client_results.items():
        if isinstance(value, dict) and len(str(value)) > 100:
            print(f"   {key}: [Complex object with {len(value)} fields]")
        else:
            print(f"   {key}: {value}")
    
    return context


In [11]:
# Demo of Simplified Interface

print("DEMO: Simplified Autonomous Agent Platform")
print("=" * 60)
print("Key simplifications:")
print("1. No explicit account parameters")
print("2. Account extracted from query")
print("3. Context-based tool interface")
print("4. Simplified workflow")
print("=" * 60)

# Demo queries
queries = [
    "Analyze customer health for TechCorp",
    "Health assessment for StartupXYZ",  
    "Check account status"
]

for i, query in enumerate(queries, 1):
    print(f"\nDEMO {i}:")
    try:
        random.seed(42 + i)  # For reproducible results
        simplified_run(query)
    except Exception as e:
        print(f"Error: {e}")
    print()


DEMO: Simplified Autonomous Agent Platform
Key simplifications:
1. No explicit account parameters
2. Account extracted from query
3. Context-based tool interface
4. Simplified workflow

DEMO 1:
SIMPLIFIED AGENT STARTED
User Query: Analyze customer health for TechCorp
Extracted Account: TechCorp
   Executing fetch_account_basics with context: ['account', 'query']
   ✓ Success: fetch_account_basics
   Executing fetch_usage_metrics with context: ['account', 'query', 'tier', 'contract_value', 'start_date']
   ✗ Failed: fetch_usage_metrics - Usage Analytics API unavailable for TechCorp
   Executing generate_health_report with context: ['account', 'query', 'tier', 'contract_value', 'start_date']
   ✓ Success: generate_health_report
Final Results:
   account: TechCorp
   tier: Enterprise
   contract_value: $500K
   start_date: 2023-01-15
   overall_health_score: 0.5
   status: Critical
   report: Customer Health Report for TechCorp
   recommendations: ['Monitor engagement trends closely', 'Sc

In [12]:
def generate_plan(sketch: List[str], agents: List[Agent], account: str, context: Dict[str, Any] = None, planning_iteration: int = 1) -> List[Dict]:
    """Generate detailed execution plan with dynamic tool mapping"""
    print(f"DETAILED PLANNING: Converting {len(sketch)} high-level tasks into executable plan (iteration {planning_iteration})")
    
    if context is None:
        context = {}
    
    plan = []
    for i, step in enumerate(sketch, 1):
        step_lower = step.lower()
        print(f"   Planning step {i}: '{step}'")
        
        # Dynamic tool mapping based on step content and context
        tool_plan = None
        
        if "account basics" in step_lower or "basic" in step_lower:
            tool_plan = {"tool": "fetch_account_basics", "args": {"account": account}}
            print(f"     -> Tool: fetch_account_basics (always available)")
            
        elif "core metrics" in step_lower or "gather" in step_lower:
            # Plan multiple data gathering steps
            metrics_plan = []
            
            # Always try financial data
            metrics_plan.append({"tool": "fetch_financial_data", "args": {"account": account}})
            
            # Try usage metrics (may fail)
            metrics_plan.append({"tool": "fetch_usage_metrics", "args": {"account": account}, "optional": True})
            
            # Get support tickets
            metrics_plan.append({"tool": "fetch_support_tickets", "args": {"account": account}})
            
            print(f"     -> Multi-step plan: {len(metrics_plan)} data sources")
            print(f"     -> Including optional step that may require replanning")
            
            # Add all metrics steps to plan
            for metrics_step in metrics_plan:
                plan.append(metrics_step)
            continue
            
        elif "data quality" in step_lower or "analyze data" in step_lower:
            # Only plan this if we have some data
            if context.get("daily_active_users") or context.get("churn_risk_score"):
                tool_plan = {"tool": "analyze_engagement_trends", "args": {"account": account, "usage_data": context}}
                print(f"     -> Tool: analyze_engagement_trends (using available context)")
            else:
                print(f"     -> CONDITIONAL SKIP: No usage data available for analysis")
                print(f"     -> This may trigger replanning if usage data becomes available")
                continue
                
        elif "health assessment" in step_lower or "generate health" in step_lower:
            # Health report generation using all available context
            tool_plan = {"tool": "generate_health_report", "args": {"account": account}}
            print(f"     -> Tool: generate_health_report")
            print(f"     -> Will use all available context: {list(context.keys())}")
            
        elif "next actions" in step_lower or "determine" in step_lower:
            # Conditional logic based on health score
            health_score = context.get("overall_health_score", 0.5)
            if health_score < 0.6:  # At risk or critical
                tool_plan = {"tool": "escalate_to_csm", "args": {"account": account, "health_data": context}}
                print(f"     -> Tool: escalate_to_csm (health score: {health_score:.2f})")
                print(f"     -> ESCALATION TRIGGERED: Account needs attention")
            else:
                print(f"     -> CONDITIONAL SKIP: Account healthy (score: {health_score:.2f}), no escalation needed")
                continue
                
        elif "compliance" in step_lower:
            tool_plan = {"tool": "check_compliance_status", "args": {"account": account}}
            print(f"     -> Tool: check_compliance_status (slow operation)")
            
        elif "usage metrics" in step_lower:
            tool_plan = {"tool": "fetch_usage_metrics", "args": {"account": account}, "optional": True}
            print(f"     -> Tool: fetch_usage_metrics (may fail, will trigger replanning)")
            
        elif "engagement trends" in step_lower:
            if context.get("daily_active_users"):
                tool_plan = {"tool": "analyze_engagement_trends", "args": {"account": account, "usage_data": context}}
                print(f"     -> Tool: analyze_engagement_trends")
            else:
                print(f"     -> DEPENDENCY MISSING: Need usage data first")
                print(f"     -> This will trigger replanning")
                continue
                
        elif "support" in step_lower:
            tool_plan = {"tool": "fetch_support_tickets", "args": {"account": account}}
            print(f"     -> Tool: fetch_support_tickets")
            
        else:
            print(f"     -> UNMAPPED STEP: No specific tool found for '{step}'")
            print(f"     -> May require manual intervention or replanning")
            continue
        
        if tool_plan:
            plan.append(tool_plan)
    
    print(f"PLAN GENERATED: {len(plan)} executable steps ready")
    
    if len(plan) == 0:
        print(f"WARNING: No executable steps generated - may need replanning with different approach")
    
    return plan


In [13]:
def execute_plan(plan: List[Dict], context: Dict[str, Any], todo_list: List[str]) -> Dict[str, Any]:
    print(f"EXECUTION PHASE: Starting execution of {len(plan)} planned steps")
    print(f"TODO LIST STATUS: {len([t for t in todo_list if '[DONE]' not in t])} pending, {len([t for t in todo_list if '[DONE]' in t])} completed")
    
    results = {}
    execution_failures = []
    step_index = 0
    
    for i, step in enumerate(plan):
        tool_name = step["tool"]
        tool = TOOL_REGISTRY[tool_name]
        args = step["args"].copy()
        is_optional = step.get("optional", False)
        
        print(f"\nEXECUTING STEP {i+1}/{len(plan)}")
        print(f"   Tool: {tool_name}")
        print(f"   Optional: {is_optional}")

        # Dynamically inject context values for tools that need them
        if tool_name == "analyze_engagement_trends":
            # Inject latest usage data from context
            usage_data = {k: v for k, v in context.items() 
                         if k in ["daily_active_users", "feature_adoption", "api_calls_last_30d"]}
            args["usage_data"] = usage_data
            print(f"   Context injected: usage_data keys = {list(usage_data.keys())}")
            
        elif tool_name == "generate_health_report":
            # Inject all relevant health metrics
            health_kwargs = {k: v for k, v in context.items() 
                           if k in ["churn_risk_score", "health_score", "security_score", 
                                   "overall_health_score", "engagement_trend"]}
            args.update(health_kwargs)
            print(f"   Context injected: {list(health_kwargs.keys())}")
            
        elif tool_name == "escalate_to_csm":
            args["health_data"] = context
            print(f"   Full context passed for escalation decision")

        print(f"   Args: {args}")
        
        try:
            # Execute the tool
            output = tool(**args)
            print(f"   SUCCESS: {tool_name} completed")
            
            # Store results in context for downstream use
            results.update(output)
            context.update(output)
            
            # Find corresponding todo item and mark as done
            # Handle multi-step plans where todo list may not align 1:1 with plan steps
            if step_index < len(todo_list) and "[DONE]" not in todo_list[step_index]:
                old_task = todo_list[step_index]
                todo_list[step_index] = f"{old_task} [DONE]"
                print(f"TODO UPDATED: '{old_task}' -> '{todo_list[step_index]}'")
                step_index += 1
            
        except Exception as e:
            error_msg = str(e)
            print(f"   FAILURE: {tool_name} failed - {error_msg}")
            
            failure_info = {
                "tool": tool_name,
                "error": error_msg,
                "step_number": i + 1,
                "is_optional": is_optional
            }
            execution_failures.append(failure_info)
            
            if is_optional:
                print(f"   OPTIONAL FAILURE: Continuing execution despite failure")
                continue
            else:
                print(f"   CRITICAL FAILURE: This will require replanning")
                # Don't break immediately - let's see if we can continue with other steps
                continue
        
        # Show current progress
        pending_count = len([t for t in todo_list if '[DONE]' not in t])
        completed_count = len([t for t in todo_list if '[DONE]' in t])
        print(f"PROGRESS: {completed_count}/{len(todo_list)} tasks completed, {pending_count} remaining")
    
    # Add execution summary to results
    results["_execution_summary"] = {
        "total_steps": len(plan),
        "successful_steps": len(plan) - len(execution_failures),
        "failed_steps": len(execution_failures),
        "failures": execution_failures
    }
    
    if execution_failures:
        print(f"\nEXECUTION SUMMARY: {len(execution_failures)} failures occurred")
        for failure in execution_failures:
            status = "OPTIONAL" if failure["is_optional"] else "CRITICAL"
            print(f"   {status} FAILURE: {failure['tool']} - {failure['error']}")
        
        if any(not f["is_optional"] for f in execution_failures):
            print(f"   REPLANNING REQUIRED: Critical failures detected")
            results["_replanning_needed"] = True
            results["_replanning_reason"] = "Critical execution failures"
    else:
        print(f"\nEXECUTION COMPLETE: All {len(plan)} steps finished successfully")
    
    return results

In [14]:
def run(query: str, account: str):
    print(f"AUTONOMOUS AGENT STARTED")
    print(f"User Query: {query}")
    print(f"Account: {account}")
    print("=" * 80)

    # Step 1: Intent Detection
    print(f"\nSTEP 1: INTENT DETECTION")
    intent = detect_intent(query)
    print(f"Detected Intent: {intent}")

    # Step 2: Agent Selection
    print(f"\nSTEP 2: AGENT SELECTION")
    agents = select_agents(intent)
    print(f"Selected Agents: {[agent.name for agent in agents]}")
    for agent in agents:
        print(f"   {agent.name}: {agent.specialization}")

    # Step 3: High-Level Sketch / To-Do
    print(f"\nSTEP 3: INITIAL PLANNING")
    sketch = generate_high_level_sketch(intent)
    todo_list = sketch.copy()
    print(f"Initial To-Do List Created: {len(todo_list)} tasks")

    # Step 4: Iterative Planning + Execution
    print(f"\nSTEP 4: ITERATIVE EXECUTION LOOP")
    context = {}
    planning_iteration = 0
    max_iterations = 5
    
    while todo_list and planning_iteration < max_iterations:
        planning_iteration += 1
        
        # Check current state and decide on planning approach
        pending_tasks = [task for task in todo_list if "[DONE]" not in task]
        completed_tasks = [task for task in todo_list if "[DONE]" in task]
        
        print(f"\n{'REPLANNING' if planning_iteration > 1 else 'INITIAL'} ITERATION {planning_iteration}")
        print(f"Current Status: {len(completed_tasks)} completed, {len(pending_tasks)} pending")
        
        # Determine replanning reasons
        replanning_reasons = []
        
        # Check for execution failures from previous iteration
        if context.get("_replanning_needed"):
            replanning_reasons.append(context.get("_replanning_reason", "Unknown failure"))
        
        # Check for missing dependencies
        if planning_iteration > 1:
            if not context.get("daily_active_users") and any("analyze" in task.lower() for task in pending_tasks):
                replanning_reasons.append("Missing usage data for analysis")
            
            if not context.get("overall_health_score") and any("next actions" in task.lower() for task in pending_tasks):
                replanning_reasons.append("Missing health score for decision making")
        
        if planning_iteration == 1:
            print(f"INITIAL PLANNING: First-time planning for all {len(pending_tasks)} tasks")
        else:
            print(f"REPLANNING TRIGGERED")
            print(f"   Iteration: {planning_iteration} of {max_iterations}")
            if replanning_reasons:
                print(f"   Reasons: {', '.join(replanning_reasons)}")
            print(f"   Previous context: {len(context)} data points available")
            print(f"   Strategy: Adaptive planning based on current state")
            
            # Show what data we have for replanning
            available_data = [k for k in context.keys() if not k.startswith("_")]
            if available_data:
                print(f"   Available data: {available_data}")
        
        # Clear execution metadata for fresh planning
        context.pop("_replanning_needed", None)
        context.pop("_replanning_reason", None)
        context.pop("_execution_summary", None)
        
        # Generate plan for current pending tasks with iteration context
        plan = generate_plan(pending_tasks, agents, account, context, planning_iteration)
        
        if not plan:
            print(f"\nWARNING: NO EXECUTABLE PLAN GENERATED")
            print(f"   This may indicate missing dependencies or completed workflow")
            break
        
        # Execute the plan
        results = execute_plan(plan, context, todo_list)
        
        # Analyze results for terminal conditions
        terminal_conditions = {
            "customer_health_assessment": "overall_health_score" in results or "report" in results,
            "usage_analysis": "engagement_trend" in results,
            "compliance_check": "security_score" in results,
            "escalation_workflow": "escalation_created" in results,
            "support_analysis": "tickets" in results,
            "generic_task": len(pending_tasks) == 0
        }
        
        if terminal_conditions.get(intent, False):
            print(f"\nTERMINAL CONDITION REACHED for intent '{intent}'")
            if intent == "customer_health_assessment":
                health_score = results.get("overall_health_score", 0)
                status = results.get("status", "Unknown")
                print(f"   Customer health assessment completed: {status} (score: {health_score:.2f})")
                
                if results.get("escalation_created"):
                    print(f"   Escalation created: {results.get('csm_assigned', 'Unknown CSM')}")
                    
            elif intent == "usage_analysis":
                trend = results.get("engagement_trend", "unknown")
                print(f"   Usage analysis completed: {trend} engagement trend")
            elif intent == "compliance_check":
                score = results.get("security_score", 0)
                print(f"   Compliance check completed: security score {score}")
            elif intent == "escalation_workflow":
                print(f"   Escalation workflow completed successfully")
            elif intent == "support_analysis":
                ticket_count = len(results.get("tickets", []))
                print(f"   Support analysis completed: {ticket_count} tickets analyzed")
            else:
                print(f"   Generic task completed")
            break

        # Sophisticated replanning decision logic
        still_pending = [task for task in todo_list if "[DONE]" not in task]
        needs_replanning = False
        replanning_reasons = []
        
        if still_pending:
            replanning_reasons.append(f"{len(still_pending)} tasks incomplete")
            needs_replanning = True
        
        if results.get("_replanning_needed"):
            replanning_reasons.append("Execution failures detected")
            needs_replanning = True
            
        # Check if new data enables previously blocked tasks
        if planning_iteration > 1:
            if context.get("daily_active_users") and any("analyze" in task.lower() for task in still_pending):
                replanning_reasons.append("Usage data now available for analysis")
                needs_replanning = True
        
        if needs_replanning and planning_iteration < max_iterations:
            print(f"\nREPLANNING DECISION:")
            print(f"   Reasons: {', '.join(replanning_reasons)}")
            print(f"   Pending tasks: {still_pending}")
            print(f"   Context available: {[k for k in context.keys() if not k.startswith('_')]}")
            print(f"   Will attempt iteration {planning_iteration + 1}")
        elif not still_pending:
            print(f"\nALL TASKS COMPLETED - No replanning needed")
            break
        else:
            print(f"\nMAXIMUM ITERATIONS REACHED - Stopping execution")
            break

    # Step 5: Return response to client
    print(f"\nSTEP 5: FINAL RESULTS")
    print(f"Agent execution completed after {planning_iteration} planning iteration(s)")
    
    total_tasks = len(todo_list)
    completed_tasks = len([t for t in todo_list if '[DONE]' in t])
    print(f"Final todo status: {completed_tasks}/{total_tasks} completed")
    
    if completed_tasks < total_tasks:
        print(f"WARNING: INCOMPLETE EXECUTION: {total_tasks - completed_tasks} tasks remain")
        incomplete_tasks = [t for t in todo_list if '[DONE]' not in t]
        for task in incomplete_tasks:
            print(f"   - {task}")
    
    print("=" * 80)
    print("Final Response to Client:")
    
    # Filter out internal execution metadata
    client_results = {k: v for k, v in context.items() if not k.startswith("_")}
    
    if client_results:
        for key, value in client_results.items():
            if isinstance(value, dict) and len(str(value)) > 100:
                print(f"   {key}: [Complex object with {len(value)} fields]")
            else:
                print(f"   {key}: {value}")
    else:
        print("   No results available - execution may have failed")
    
    print("=" * 80)
    
    return context


In [15]:
# Example Runs to Demonstrate Iterative Planning

print("DEMO 1: Customer Health Assessment (Multiple Planning Iterations)")
print("=" * 90)
print("This demo shows how the system handles:")
print("- Failed API calls that trigger replanning")
print("- Conditional workflows based on intermediate results")  
print("- Dynamic escalation based on health scores")
print("=" * 90)
print()

# Set a seed for reproducible demo
random.seed(42)

if __name__ == "__main__":
    # Demo 1: Complex customer health assessment
    context = run("Analyze customer health for TechCorp", account="TechCorp")
    
    print("\n" + "="*90)
    print("DEMO 2: Let's try another account that might trigger different paths")
    print("=" * 90)
    
    # Demo 2: Another account with potentially different outcomes
    random.seed(123)  # Different seed for different failure patterns
    context2 = run("Perform comprehensive customer health assessment for StartupXYZ", account="StartupXYZ")

DEMO 1: Customer Health Assessment (Multiple Planning Iterations)
This demo shows how the system handles:
- Failed API calls that trigger replanning
- Conditional workflows based on intermediate results
- Dynamic escalation based on health scores

AUTONOMOUS AGENT STARTED
User Query: Analyze customer health for TechCorp
Account: TechCorp

STEP 1: INTENT DETECTION
Detected Intent: customer_health_assessment

STEP 2: AGENT SELECTION
Selected Agents: ['DataRetriever', 'AnalyticsEngine', 'ReportGenerator', 'EscalationManager']
   DataRetriever: Retrieves core customer data from various systems
   AnalyticsEngine: Performs advanced analytics and compliance checks
   ReportGenerator: Creates comprehensive reports and assessments
   EscalationManager: Handles escalations and customer success workflows

STEP 3: INITIAL PLANNING
PLANNING PHASE: Generating high-level sketch for intent 'customer_health_assessment'
INITIAL PLAN CREATED: 5 tasks identified
   1. Fetch account basics
   2. Gather co

In [16]:
# Additional Demo: Edge Cases and Complex Scenarios

print("\n" + "="*90)
print("DEMO 3: Edge Cases - Forced Failure and Recovery")
print("=" * 90)
print("This demo shows how the system handles:")
print("- Multiple consecutive failures")
print("- Recovery through replanning") 
print("- Maximum iteration limits")
print("=" * 90)

# Temporarily increase failure rate to demonstrate replanning
original_random = random.random

def always_fail_usage():
    """Force usage metrics to always fail for this demo"""
    return 1.0  # Always > 0.3, so always fails

# Monkey patch for demo
random.random = always_fail_usage

print("WARNING: FORCING USAGE METRICS FAILURES FOR DEMO...")
try:
    context3 = run("Complete health assessment with forced failures", account="FailureDemo Inc")
finally:
    # Restore original random function
    random.random = original_random
    print("SUCCESS: Restored normal operation")

print("\n" + "="*90) 
print("DEMO 4: Different Intent - Usage Analysis Only")
print("=" * 90)

# Demo different intent path
random.seed(999)
context4 = run("Analyze usage and engagement trends", account="EngagementCorp")

print("\n" + "="*90)
print("SUMMARY: All Demos Completed")
print("=" * 90)
print("The demos illustrated:")
print("1. SUCCESS: Successful multi-iteration planning with conditional workflows")
print("2. SUCCESS: Failure handling and replanning mechanisms")  
print("3. SUCCESS: Different execution paths based on available data")
print("4. SUCCESS: Intent-based agent selection and planning")
print("5. SUCCESS: Dynamic escalation based on health metrics")
print("=" * 90)



DEMO 3: Edge Cases - Forced Failure and Recovery
This demo shows how the system handles:
- Multiple consecutive failures
- Recovery through replanning
- Maximum iteration limits
AUTONOMOUS AGENT STARTED
User Query: Complete health assessment with forced failures
Account: FailureDemo Inc

STEP 1: INTENT DETECTION
Detected Intent: customer_health_assessment

STEP 2: AGENT SELECTION
Selected Agents: ['DataRetriever', 'AnalyticsEngine', 'ReportGenerator', 'EscalationManager']
   DataRetriever: Retrieves core customer data from various systems
   AnalyticsEngine: Performs advanced analytics and compliance checks
   ReportGenerator: Creates comprehensive reports and assessments
   EscalationManager: Handles escalations and customer success workflows

STEP 3: INITIAL PLANNING
PLANNING PHASE: Generating high-level sketch for intent 'customer_health_assessment'
INITIAL PLAN CREATED: 5 tasks identified
   1. Fetch account basics
   2. Gather core metrics
   3. Analyze data quality
   4. Generat