# Advanced Agent Patterns with Strand Agents

This notebook explores advanced patterns and techniques for building sophisticated multi-agent systems with Amazon Strand Agents and Ollama.

## Topics Covered
1. Agent Orchestration Patterns
2. Memory and Context Management
3. Error Handling and Resilience
4. Performance Monitoring and Optimization
5. Advanced Agent Composition

In [None]:
# Setup and imports
# Standard-library imports
import sys
import asyncio
import time
import json
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from enum import Enum

# Put the parent directory ('..', resolved against the current working
# directory) on the import path before the project imports below
sys.path.append('..')

# Project-local modules.
# NOTE(review): the wildcard imports presumably provide the math_agent,
# code_agent, creative_agent and analysis_agent callables used in later
# cells — confirm against the agents/tools packages
from agents.ollama_agent import OllamaStrandAgent
from agents.specialized_agents import *
from tools.custom_tools import *
from config.ollama_config import ollama_config

print("Advanced Agent Patterns - Setup Complete!")

## 1. Agent Orchestration Patterns

Let's implement different patterns for orchestrating multiple agents.

In [None]:
class AgentOrchestrator:
    """
    Orchestrates multiple agents for complex workflows.

    Agents are registered under a name and referenced by that name from
    workflow step dictionaries (``{'agent': ..., 'prompt': ...}``).
    Sequential and parallel runs append one entry per executed step to
    ``workflow_history``.
    """

    def __init__(self):
        self.agents = {}
        self.workflow_history = []

    def register_agent(self, name: str, agent: "OllamaStrandAgent"):
        """Register an agent under ``name``, replacing any previous entry."""
        self.agents[name] = agent
        print(f"Registered agent: {name}")

    def _validate_steps(self, workflow_steps: List[Dict[str, Any]]) -> None:
        """Raise ValueError for the first step that names an unregistered agent."""
        for step in workflow_steps:
            agent_name = step['agent']
            if agent_name not in self.agents:
                raise ValueError(f"Agent {agent_name} not registered")

    def sequential_workflow(self, workflow_steps: List[Dict[str, Any]]) -> List[str]:
        """
        Execute a sequential workflow where each step depends on the previous.

        Each step may set ``use_context`` to False to receive only its own
        prompt; otherwise a snippet (first 200 characters) of every earlier
        response is prepended to the prompt.

        Args:
            workflow_steps: Step dictionaries with ``agent`` and ``prompt`` keys.

        Returns:
            The full responses, one per step, in execution order.

        Raises:
            ValueError: If any step names an agent that is not registered.
                All steps are checked before the first agent is called, so
                no work is wasted on a workflow that cannot finish.
        """
        steps = list(workflow_steps)
        self._validate_steps(steps)

        results = []
        context_entries = []

        for step_number, step in enumerate(steps, start=1):
            agent_name = step['agent']
            prompt = step['prompt']

            # Include context from previous steps
            if context_entries and step.get('use_context', True):
                context = "".join(context_entries)
                full_prompt = f"Previous context:\n{context}\n\nNew request:\n{prompt}"
            else:
                full_prompt = prompt

            print(f"\nStep {step_number}: {agent_name}")
            print(f"Prompt: {prompt[:100]}...")

            response = self.agents[agent_name].chat(full_prompt)
            results.append(response)

            # Carry a snippet forward; mark it with an ellipsis only when the
            # response really was cut, so later agents are not told that a
            # complete short answer is truncated
            snippet = response[:200]
            if len(response) > 200:
                snippet += "..."
            context_entries.append(f"\nStep {step_number} ({agent_name}): {snippet}")

            # Store in history
            self.workflow_history.append({
                'step': step_number,
                'agent': agent_name,
                'prompt': prompt,
                'response': response,
                'timestamp': time.time()
            })

        return results

    async def parallel_workflow(self, workflow_steps: List[Dict[str, Any]]) -> List[str]:
        """
        Execute multiple agents in parallel via their ``async_chat`` method.

        Args:
            workflow_steps: Step dictionaries with ``agent`` and ``prompt`` keys.

        Returns:
            The responses in the same order as ``workflow_steps``.

        Raises:
            ValueError: If any step names an agent that is not registered.
        """
        steps = list(workflow_steps)
        # Validate everything before creating any coroutine: raising halfway
        # through would leave the already-created coroutines un-awaited
        self._validate_steps(steps)
        calls = [(step['agent'], step['prompt']) for step in steps]

        tasks = [self.agents[agent_name].async_chat(prompt) for agent_name, prompt in calls]
        results = await asyncio.gather(*tasks)

        # Store in history
        for step_number, ((agent_name, prompt), result) in enumerate(zip(calls, results), start=1):
            self.workflow_history.append({
                'step': step_number,
                'agent': agent_name,
                'prompt': prompt,
                'response': result,
                'timestamp': time.time(),
                'type': 'parallel'
            })

        return results

    def consensus_workflow(self, prompt: str, agent_names: List[str]) -> Dict[str, Any]:
        """
        Get responses to the same prompt from multiple agents.

        Unregistered agent names are skipped rather than raising, and are
        reported under ``missing_agents`` so the caller can tell that the
        result covers fewer agents than requested.

        Returns:
            A dict with ``prompt``, ``responses`` (agent name -> response),
            ``agent_count``, ``missing_agents`` and ``timestamp``.
        """
        responses = {}
        missing_agents = []

        for agent_name in agent_names:
            if agent_name not in self.agents:
                missing_agents.append(agent_name)
                continue

            responses[agent_name] = self.agents[agent_name].chat(prompt)

        # Analyze consensus (simplified)
        consensus_analysis = {
            'prompt': prompt,
            'responses': responses,
            'agent_count': len(responses),
            'missing_agents': missing_agents,
            'timestamp': time.time()
        }

        return consensus_analysis

# Build the orchestrator, then one instance of each specialized agent
orchestrator = AgentOrchestrator()

math_agent_instance = math_agent()
code_agent_instance = code_agent()
creative_agent_instance = creative_agent()
analysis_agent_instance = analysis_agent()

# Register every instance under the short name the workflow steps refer to
for _agent_key, _agent_instance in (
    ('math', math_agent_instance),
    ('code', code_agent_instance),
    ('creative', creative_agent_instance),
    ('analysis', analysis_agent_instance),
):
    orchestrator.register_agent(_agent_key, _agent_instance)

print("\nOrchestrator setup complete!")

In [None]:
# Example: Sequential workflow for building a simple game
# Three steps: concept (creative) -> code skeleton (code) -> review (analysis)
game_workflow = [
    dict(
        agent='creative',
        prompt='Create a concept for a simple text-based adventure game. Include theme, setting, and basic storyline.',
    ),
    dict(
        agent='code',
        prompt='Based on the game concept, write Python code structure for a simple text adventure game with classes and methods.',
    ),
    dict(
        agent='analysis',
        prompt='Analyze the game concept and code structure. Suggest improvements and potential features to add.',
    ),
]

print("=== SEQUENTIAL WORKFLOW: GAME DEVELOPMENT ===")
game_results = orchestrator.sequential_workflow(game_workflow)

print("\n=== WORKFLOW COMPLETE ===")
for step_number, result in enumerate(game_results, start=1):
    print(f"\nStep {step_number} Result:")
    # Show at most the first 300 characters of each response
    if len(result) > 300:
        print(result[:300] + "...")
    else:
        print(result)

In [None]:
# Example: Parallel workflow for different perspectives
parallel_workflow = [
    {
        'agent': 'creative',
        'prompt': 'Write a creative story about AI and robotics in 2030 (100 words)'
    },
    {
        'agent': 'analysis',
        'prompt': 'Provide a data-driven analysis of AI and robotics trends for 2030'
    },
    {
        'agent': 'code',
        'prompt': 'Describe the technical architecture for AI systems in 2030'
    }
]

print("\n=== PARALLEL WORKFLOW: AI IN 2030 ===")
parallel_results = await orchestrator.parallel_workflow(parallel_workflow)

print("\n=== PARALLEL RESULTS ===")
for i, result in enumerate(parallel_results):
    agent_name = parallel_workflow[i]['agent']
    print(f"\n{agent_name.upper()} PERSPECTIVE:")
    print(result[:400] + "..." if len(result) > 400 else result)

## 2. Memory and Context Management

Let's implement context-aware agents with memory.

In [None]:
class ContextualAgent:
    """
    Agent wrapper that maintains conversation context and memory.

    Every exchange is stored in ``conversation_history``. The last three
    exchanges (plus a running summary of older ones) are prepended to each
    new message. Once more than ten exchanges are stored, everything but the
    last five is folded into ``context_summary`` by asking the base agent
    for a short summary.
    """

    def __init__(self, base_agent: "OllamaStrandAgent", max_context_length: int = 4000):
        """
        Args:
            base_agent: Object exposing ``chat(prompt) -> str``.
            max_context_length: Upper bound, in characters, for the context
                text prepended to a message. A value of ``None`` or <= 0
                disables the limit.
        """
        self.base_agent = base_agent
        self.conversation_history = []
        self.max_context_length = max_context_length
        self.context_summary = ""

    @staticmethod
    def _shorten(text: str, limit: int) -> str:
        """Return ``text`` cut to ``limit`` characters, with '...' only if it was cut."""
        if len(text) > limit:
            return text[:limit] + "..."
        return text

    def chat_with_context(self, message: str) -> str:
        """
        Chat with the agent while maintaining conversation context.

        Returns:
            The base agent's response to ``message``.
        """
        # Build context from conversation history
        context = self._build_context()

        # Create full prompt with context
        if context:
            full_prompt = f"Previous conversation context:\n{context}\n\nCurrent message:\n{message}"
        else:
            full_prompt = message

        # Get response from base agent
        response = self.base_agent.chat(full_prompt)

        # Store in conversation history
        self.conversation_history.append({
            'user': message,
            'assistant': response,
            'timestamp': time.time()
        })

        # Manage context length
        self._manage_context_length()

        return response

    def _build_context(self) -> str:
        """
        Build the context string from the summary and the last 3 exchanges.

        The result never exceeds ``max_context_length`` characters; when it
        would, the oldest text is dropped so the most recent exchange stays.
        """
        if not self.conversation_history and not self.context_summary:
            return ""

        context_parts = []
        if self.context_summary:
            context_parts.append(f"Summary: {self.context_summary}")

        # Add recent conversations
        for entry in self.conversation_history[-3:]:  # Last 3 exchanges
            context_parts.append(f"User: {entry['user']}")
            context_parts.append(f"Assistant: {self._shorten(entry['assistant'], 200)}")

        context = "\n".join(context_parts)

        # Enforce the configured limit, keeping the tail (most recent text)
        limit = self.max_context_length
        if limit and limit > 0 and len(context) > limit:
            context = context[-limit:]

        return context

    def _manage_context_length(self):
        """
        Fold older exchanges into ``context_summary`` once history exceeds 10.

        The previous summary is fed into the new summary request so that
        information from earlier rounds of summarization is not discarded.
        If the summary request fails, history is left untouched (and the
        failure is printed) so the reply already obtained is not lost.
        """
        if len(self.conversation_history) <= 10:
            return

        # Summarize older conversations
        summary_lines = []
        if self.context_summary:
            summary_lines.append(f"Earlier summary: {self.context_summary}")
        for conv in self.conversation_history[:-5]:
            summary_lines.append(f"User asked about: {conv['user'][:100]}")
            summary_lines.append(f"Assistant discussed: {conv['assistant'][:100]}")
        summary_text = "\n".join(summary_lines) + "\n"

        # Create summary
        summary_prompt = f"Summarize this conversation in 2-3 sentences:\n{summary_text}"
        try:
            new_summary = self.base_agent.chat(summary_prompt)
        except Exception as exc:
            # Housekeeping must not turn a successful chat into a failure
            print(f"Context summarization failed, keeping full history: {exc}")
            return

        self.context_summary = new_summary

        # Keep only recent conversations
        self.conversation_history = self.conversation_history[-5:]

    def get_conversation_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the conversation currently held in history.

        Timestamps are ``None`` when no exchange is stored.
        """
        history = self.conversation_history
        return {
            'total_exchanges': len(history),
            'context_summary_length': len(self.context_summary) if self.context_summary else 0,
            'first_interaction': history[0]['timestamp'] if history else None,
            'last_interaction': history[-1]['timestamp'] if history else None
        }

    def clear_context(self):
        """
        Clear conversation history and the running summary.
        """
        self.conversation_history = []
        self.context_summary = ""
        print("Context cleared")

# Wrap a fresh math agent and a fresh creative agent with conversation memory
contextual_math = ContextualAgent(
    base_agent=math_agent(),
)
contextual_creative = ContextualAgent(
    base_agent=creative_agent(),
)

print("Contextual agents created!")

In [None]:
# Test contextual conversation
print("=== CONTEXTUAL CONVERSATION TEST ===")

# The opening question defines the rectangle the follow-ups refer back to
opening_question = "I have a rectangle with width 5 and height 8. What's the area?"
perimeter_question = "What about the perimeter of the same rectangle?"
scaling_question = "If I scale this rectangle by a factor of 2, what would be the new area?"

print("\n1. First math question:")
response1 = contextual_math.chat_with_context(opening_question)
print(f"Response: {response1}")

print("\n2. Follow-up question (using context):")
response2 = contextual_math.chat_with_context(perimeter_question)
print(f"Response: {response2}")

print("\n3. Another follow-up:")
response3 = contextual_math.chat_with_context(scaling_question)
print(f"Response: {response3}")

# Report what the wrapper recorded for the three exchanges
stats = contextual_math.get_conversation_stats()
print(f"\nConversation stats: {stats}")

## 3. Error Handling and Resilience

Let's implement robust error handling for agent operations.

In [None]:
class ResilientAgent:
    """
    Agent wrapper with error handling and retry logic.

    ``safe_chat`` never raises for agent failures; it returns a result dict
    whose ``success`` flag tells the caller what happened. Every failed
    attempt is appended to ``error_log``.
    """

    def __init__(self, base_agent: "OllamaStrandAgent", max_retries: int = 3, timeout: int = 30,
                 backoff_factor: float = 1.0):
        """
        Args:
            base_agent: Object exposing ``chat(message, **kwargs)`` and ``name``.
            max_retries: Maximum number of attempts per ``safe_chat`` call.
                Values below 1 are treated as 1 so a call is always tried.
            timeout: Seconds after which a completed response is rejected.
                This is checked after the call returns; it does not
                interrupt a running request.
            backoff_factor: Multiplier for the exponential wait between
                attempts (``backoff_factor * 2 ** attempt`` seconds).
                The default of 1.0 waits 1s, 2s, 4s, ...; 0 disables waiting.
        """
        self.base_agent = base_agent
        self.max_retries = max_retries
        self.timeout = timeout
        self.backoff_factor = backoff_factor
        self.error_log = []
        # Counts every attempt (successful or not) so an error rate can be derived
        self.total_attempts = 0

    def safe_chat(self, message: str, **kwargs) -> Dict[str, Any]:
        """
        Chat with retry logic and error handling.

        Returns:
            On success: ``{'success': True, 'response', 'attempt', 'duration'}``.
            After the last failed attempt:
            ``{'success': False, 'error', 'attempts', 'error_log'}`` where
            ``error_log`` holds the entries of this call's attempts.
        """
        # Guarantee at least one attempt; range(0) would fall through and
        # return None instead of a result dict
        attempts_allowed = max(1, self.max_retries)
        last_error = None

        for attempt in range(attempts_allowed):
            self.total_attempts += 1
            # Monotonic clock: durations must not jump with wall-clock changes
            start_time = time.perf_counter()

            try:
                # Attempt to get response
                response = self.base_agent.chat(message, **kwargs)
                duration = time.perf_counter() - start_time

                # Check for timeout (post-hoc: the call has already finished)
                if duration > self.timeout:
                    raise TimeoutError(f"Response took {duration:.2f}s, exceeding {self.timeout}s timeout")

            except Exception as e:
                last_error = e
                self.error_log.append({
                    'attempt': attempt + 1,
                    'error': str(e),
                    'error_type': type(e).__name__,
                    'timestamp': time.time(),
                    'message': message[:100] + "..." if len(message) > 100 else message
                })

                print(f"Attempt {attempt + 1} failed: {e}")

                if attempt < attempts_allowed - 1:
                    wait_time = self.backoff_factor * (2 ** attempt)  # Exponential backoff
                    print(f"Retrying in {wait_time:g} seconds...")
                    if wait_time > 0:
                        time.sleep(wait_time)

            else:
                return {
                    'success': True,
                    'response': response,
                    'attempt': attempt + 1,
                    'duration': duration
                }

        return {
            'success': False,
            'error': str(last_error),
            'attempts': attempts_allowed,
            'error_log': self.error_log[-attempts_allowed:]
        }

    def get_health_status(self) -> Dict[str, Any]:
        """
        Get agent health and error statistics.

        ``error_rate`` is failed attempts divided by all attempts made
        since construction (or the last ``clear_error_log``); it is 0 when
        nothing has been attempted yet.
        """
        now = time.time()
        total_errors = len(self.error_log)
        recent_errors = sum(1 for entry in self.error_log if now - entry['timestamp'] < 3600)  # Last hour

        return {
            'agent_name': self.base_agent.name,
            'total_errors': total_errors,
            'recent_errors_1h': recent_errors,
            'total_attempts': self.total_attempts,
            'error_rate': total_errors / self.total_attempts if self.total_attempts else 0,
            'last_error': self.error_log[-1] if self.error_log else None
        }

    def clear_error_log(self):
        """
        Clear the error log and restart the attempt counter.

        Both are reset together so ``error_rate`` keeps comparing errors
        and attempts from the same period.
        """
        self.error_log = []
        self.total_attempts = 0
        print("Error log cleared")

# Wrap fresh math and code agents with retry handling (two attempts each)
resilient_math = ResilientAgent(
    base_agent=math_agent(),
    max_retries=2,
    timeout=15,
)
resilient_code = ResilientAgent(
    base_agent=code_agent(),
    max_retries=2,
    timeout=20,
)

print("Resilient agents created!")

In [None]:
# Test resilient agent
print("=== RESILIENT AGENT TEST ===")

# A plain arithmetic request through the math wrapper
result1 = resilient_math.safe_chat("Calculate 15 * 24 + 36")
print(f"\nResult 1: {result1}")

# A longer request through the code wrapper; 'duration' is absent on failure
result2 = resilient_code.safe_chat("Write a Python function to implement binary search")
succeeded = result2['success']
elapsed = result2.get('duration', 'N/A')
print(f"\nResult 2: {succeeded}, Duration: {elapsed}s")

# Collect both health reports before printing them
health_math = resilient_math.get_health_status()
health_code = resilient_code.get_health_status()

print(f"\nMath agent health: {health_math}")
print(f"Code agent health: {health_code}")

## 4. Performance Monitoring and Optimization

Let's implement performance monitoring for our agents.

In [None]:
class PerformanceMonitor:
    """
    Monitor and analyze agent performance metrics.

    ``metrics`` holds one dict per logged interaction; ``agent_stats`` holds
    running totals per agent name. "Tokens" are approximated by
    whitespace-separated words of the response.
    """

    def __init__(self):
        self.metrics = []
        self.agent_stats = {}

    def log_interaction(self, agent_name: str, prompt: str, response: str, duration: float, success: bool = True):
        """
        Log an agent interaction for performance analysis.

        Args:
            agent_name: Key under which per-agent totals are accumulated.
            prompt: Text sent to the agent (only its length is stored).
            response: Text returned by the agent (or an error message).
            duration: Elapsed seconds for the interaction.
            success: Whether the interaction produced a real response.
        """
        now = time.time()
        word_count = len(response.split())

        self.metrics.append({
            'agent_name': agent_name,
            'prompt_length': len(prompt),
            'response_length': len(response),
            'duration': duration,
            'success': success,
            'timestamp': now,
            # Durations are floored at 0.1s to avoid dividing by ~0
            'tokens_per_second': word_count / max(duration, 0.1)
        })

        # Update agent stats
        stats = self.agent_stats.setdefault(agent_name, {
            'total_requests': 0,
            'successful_requests': 0,
            'total_duration': 0,
            'total_tokens': 0,
            'first_timestamp': now
        })
        stats['total_requests'] += 1
        stats['total_duration'] += duration
        if success:
            stats['successful_requests'] += 1
            # Only real responses count as output tokens: the per-request
            # average divides by successful requests, so words of error
            # messages from failed calls would inflate it
            stats['total_tokens'] += word_count

    def get_performance_summary(self) -> Dict[str, Any]:
        """
        Get performance summary across all agents.

        Returns ``{'error': 'No metrics recorded'}`` when nothing was logged.
        """
        if not self.metrics:
            return {'error': 'No metrics recorded'}

        total_requests = len(self.metrics)
        successful_requests = sum(1 for m in self.metrics if m['success'])
        average_duration = sum(m['duration'] for m in self.metrics) / total_requests
        average_tokens_per_sec = sum(m['tokens_per_second'] for m in self.metrics) / total_requests

        return {
            'total_requests': total_requests,
            'success_rate': successful_requests / total_requests,
            'average_duration': round(average_duration, 2),
            'average_tokens_per_second': round(average_tokens_per_sec, 2),
            'agents_active': len(self.agent_stats)
        }

    def get_agent_performance(self, agent_name: str) -> Dict[str, Any]:
        """
        Get performance metrics for a specific agent.

        ``requests_per_minute`` is measured from this agent's first logged
        interaction, with the elapsed time floored at one minute.
        Returns ``{'error': ...}`` when the agent has no logged interactions.
        """
        if agent_name not in self.agent_stats:
            return {'error': f'No data for agent {agent_name}'}

        stats = self.agent_stats[agent_name]
        total_requests = stats['total_requests']

        # Prefer the agent's own first timestamp; fall back to the global
        # first metric for stats entries that lack it
        first_seen = stats.get('first_timestamp')
        if first_seen is None and self.metrics:
            first_seen = self.metrics[0]['timestamp']

        if first_seen is None:
            requests_per_minute = 0
        else:
            elapsed_minutes = (time.time() - first_seen) / 60
            requests_per_minute = total_requests / max(elapsed_minutes, 1)

        return {
            'agent_name': agent_name,
            'total_requests': total_requests,
            'success_rate': stats['successful_requests'] / max(total_requests, 1),
            'average_duration': round(stats['total_duration'] / max(total_requests, 1), 2),
            'average_tokens_per_request': round(stats['total_tokens'] / max(stats['successful_requests'], 1), 2),
            'requests_per_minute': requests_per_minute
        }

    def get_slowest_requests(self, top_n: int = 5) -> List[Dict[str, Any]]:
        """
        Get the slowest requests for optimization analysis.

        Returns at most ``top_n`` metric dicts, slowest first. A non-positive
        ``top_n`` yields an empty list.
        """
        sorted_metrics = sorted(self.metrics, key=lambda x: x['duration'], reverse=True)
        # Clamp: a negative slice bound would drop items from the end instead
        return sorted_metrics[:max(top_n, 0)]

# Create performance monitor
# One shared instance: the MonitoredAgent wrappers created further down in
# this cell all receive it, so their metrics end up in the same collection
perf_monitor = PerformanceMonitor()

class MonitoredAgent:
    """
    Agent wrapper that logs performance metrics.

    ``chat`` times the wrapped agent's ``chat`` call and reports it to the
    monitor. Any other attribute is looked up on the wrapped agent.
    """

    def __init__(self, base_agent: "OllamaStrandAgent", monitor: "PerformanceMonitor"):
        self.base_agent = base_agent
        self.monitor = monitor

    def chat(self, message: str, **kwargs) -> str:
        """
        Forward ``message`` to the base agent and log the interaction.

        Exceptions raised by the base agent are not propagated: they are
        logged with ``success=False`` and returned as an ``"Error: ..."``
        string, so callers always get text back.
        """
        # Monotonic clock so the measured duration cannot go negative
        start_time = time.perf_counter()

        try:
            response = self.base_agent.chat(message, **kwargs)
            success = True
        except Exception as e:
            response = f"Error: {str(e)}"
            success = False

        # Logged after the try block rather than in ``finally`` so that an
        # interrupted call (e.g. KeyboardInterrupt) is not recorded as an
        # empty successful response
        duration = time.perf_counter() - start_time
        self.monitor.log_interaction(
            self.base_agent.name,
            message,
            response,
            duration,
            success
        )

        return response

    def __getattr__(self, name):
        # __getattr__ only runs when normal lookup fails. If the wrapper's own
        # attributes are missing (instance not initialised yet, e.g. during
        # copy/unpickle), delegating would call __getattr__ again for
        # 'base_agent' and recurse until RecursionError.
        if name in ('base_agent', 'monitor'):
            raise AttributeError(name)
        # Delegate other attributes to the base agent
        return getattr(self.base_agent, name)

# Wrap fresh math, creative and code agents; all three report to perf_monitor
monitored_math = MonitoredAgent(
    base_agent=math_agent(),
    monitor=perf_monitor,
)
monitored_creative = MonitoredAgent(
    base_agent=creative_agent(),
    monitor=perf_monitor,
)
monitored_code = MonitoredAgent(
    base_agent=code_agent(),
    monitor=perf_monitor,
)

print("Performance monitoring setup complete!")

In [None]:
# Test performance monitoring
print("=== PERFORMANCE MONITORING TEST ===")

# Run various requests to generate performance data
test_requests = [
    (monitored_math, "Calculate the factorial of 10"),
    (monitored_creative, "Write a haiku about programming"),
    (monitored_code, "Write a function to reverse a string"),
    (monitored_math, "What is the derivative of x^3 + 2x^2 - 5x + 1?"),
    (monitored_creative, "Create a short story about a robot learning to paint"),
    (monitored_code, "Implement a binary tree in Python")
]

print("Running test requests...")
for i, (agent, prompt) in enumerate(test_requests):
    print(f"\nRequest {i+1}: {agent.name} - {prompt[:50]}...")
    response = agent.chat(prompt)
    print(f"Response length: {len(response)} characters")

print("\n=== PERFORMANCE ANALYSIS ===")

# Overall performance summary
summary = perf_monitor.get_performance_summary()
print(f"\nOverall Performance:")
for key, value in summary.items():
    print(f"  {key}: {value}")

# Individual agent performance.
# Iterate over the names the monitor actually recorded (in first-seen order)
# instead of a hard-coded list: metrics are keyed by each agent's ``name``,
# and a list that does not match those names would print nothing at all.
for agent_name in list(perf_monitor.agent_stats):
    agent_perf = perf_monitor.get_agent_performance(agent_name)
    if 'error' not in agent_perf:
        print(f"\n{agent_name} Performance:")
        for key, value in agent_perf.items():
            if key != 'agent_name':
                print(f"  {key}: {value}")

# Slowest requests
slowest = perf_monitor.get_slowest_requests(3)
print(f"\nSlowest Requests:")
for i, request in enumerate(slowest):
    print(f"  {i+1}. {request['agent_name']}: {request['duration']:.2f}s ({request['prompt_length']} chars prompt)")

## 5. Advanced Agent Composition

Let's create complex agent compositions and workflows.

In [None]:
class AgentPipeline:
    """
    Create complex pipelines of agent operations.

    Stages run in the order they were added. The output of each successful
    stage becomes the input of the next one; the first failing stage stops
    the run.
    """

    def __init__(self, name: str):
        self.name = name
        self.stages = []
        self.results = []

    def add_stage(self, agent: "OllamaStrandAgent", transform_fn=None, condition_fn=None):
        """
        Add a stage to the pipeline

        Args:
            agent: The agent to use in this stage (needs ``chat`` and ``name``)
            transform_fn: Function ``(current_input, results) -> str`` to
                transform input before sending to agent
            condition_fn: Function ``(current_input, results) -> bool`` to
                determine if this stage should run

        Returns:
            The pipeline itself, so calls can be chained.
        """
        stage = {
            'agent': agent,
            'transform_fn': transform_fn,
            'condition_fn': condition_fn
        }
        self.stages.append(stage)
        return self

    def run(self, initial_input: str) -> Dict[str, Any]:
        """
        Run the pipeline with initial input.

        A stage fails when its condition function, transform function or
        agent raises; the failure is recorded in ``results`` and the run
        stops. Skipped stages (condition returned a falsy value) leave no
        entry in ``results``.

        Returns:
            A dict with ``pipeline_name``, ``initial_input``, ``final_output``
            (output of the last successful stage, or the initial input if
            none succeeded), ``stages_completed``, ``total_stages``,
            ``success`` (False if any stage failed) and ``results``.
        """
        current_input = initial_input
        self.results = []

        for i, stage in enumerate(self.stages):
            agent = stage['agent']
            agent_name = agent.name
            # Until a transform succeeds, the stage's input is the raw input
            stage_input = current_input

            # The condition and transform callbacks run inside the same try
            # as the agent call, so an exception from user-supplied code is
            # recorded as a failed stage instead of escaping unrecorded
            try:
                # Check condition if provided
                if stage['condition_fn'] and not stage['condition_fn'](current_input, self.results):
                    print(f"Stage {i+1} skipped due to condition")
                    continue

                # Transform input if provided
                if stage['transform_fn']:
                    stage_input = stage['transform_fn'](current_input, self.results)

                print(f"\nStage {i+1}: {agent_name}")
                print(f"Input: {stage_input[:100]}...")

                # Run the agent
                stage_output = agent.chat(stage_input)

            except Exception as e:
                self.results.append({
                    'stage': i+1,
                    'agent': agent_name,
                    'input': stage_input,
                    'error': str(e),
                    'success': False,
                    'timestamp': time.time()
                })
                print(f"Stage {i+1} failed: {e}")
                break

            self.results.append({
                'stage': i+1,
                'agent': agent_name,
                'input': stage_input,
                'output': stage_output,
                'success': True,
                'timestamp': time.time()
            })
            current_input = stage_output  # Output becomes input for next stage

        return {
            'pipeline_name': self.name,
            'initial_input': initial_input,
            'final_output': current_input,
            'stages_completed': sum(1 for r in self.results if r['success']),
            'total_stages': len(self.stages),
            'success': all(r['success'] for r in self.results),
            'results': self.results
        }

# Example: Research and Content Creation Pipeline
# Starts empty; the three stages are attached with add_stage() further down
content_pipeline = AgentPipeline("Content Creation Pipeline")

# Stage 1: Research (Analysis Agent)
def research_transform(input_text, results):
    """Wrap the incoming topic in a research request; ``results`` is not used."""
    research_request = f"Research this topic and provide key insights: {input_text}"
    return research_request

# Stage 1: a new analysis agent receives the prompt built by research_transform
content_pipeline.add_stage(
    analysis_agent(),
    transform_fn=research_transform
)

# Stage 2: Structure (Code Agent for logical structure)
def structure_transform(input_text, results):
    """Ask for an article outline from the first 500 characters of the research; ``results`` is not used."""
    research_excerpt = input_text[:500]
    return f"Based on this research, create a logical outline for an article: {research_excerpt}"

# Stage 2: a new code agent receives the prompt built by structure_transform
content_pipeline.add_stage(
    code_agent(),
    transform_fn=structure_transform
)

# Stage 3: Write Content (Creative Agent)
def content_transform(input_text, results):
    """Build the writing prompt from the outline and the first recorded stage output.

    ``input_text`` is used as the outline. The research excerpt is the first
    300 characters of ``results[0]['output']``, or empty when ``results`` is
    empty.
    """
    if results:
        research = results[0]['output'][:300]
    else:
        research = ""
    return f"Write an engaging article using this outline and research:\nOutline: {input_text}\nResearch: {research}"

# Stage 3: a new creative agent receives the prompt built by content_transform
content_pipeline.add_stage(
    creative_agent(),
    transform_fn=content_transform
)

print("Content creation pipeline configured!")

In [None]:
# Run the content creation pipeline
print("=== RUNNING CONTENT CREATION PIPELINE ===")

topic = "The impact of artificial intelligence on remote work productivity"
pipeline_result = content_pipeline.run(topic)

# Header: which pipeline ran, on what topic, and how far it got
completed = pipeline_result['stages_completed']
total = pipeline_result['total_stages']
print("\n=== PIPELINE RESULTS ===")
print(f"Pipeline: {pipeline_result['pipeline_name']}")
print(f"Topic: {pipeline_result['initial_input']}")
print(f"Stages completed: {completed}/{total}")

# One entry per recorded stage: a preview on success, the error otherwise
for result in pipeline_result['results']:
    if not result['success']:
        print(f"\nStage {result['stage']} FAILED: {result['error']}")
        continue
    print(f"\nStage {result['stage']} ({result['agent']}):")
    print(f"Output: {result['output'][:300]}...")

final_output = pipeline_result['final_output']
print(f"\nFinal Article Length: {len(final_output)} characters")

## Summary

In this advanced patterns notebook, we've explored:

1. **Agent Orchestration**: Sequential, parallel, and consensus workflows
2. **Context Management**: Memory-aware agents that maintain conversation history
3. **Error Handling**: Resilient agents with retry logic and error recovery
4. **Performance Monitoring**: Metrics collection and analysis for optimization
5. **Agent Composition**: Complex pipelines and multi-stage workflows

These patterns enable you to build sophisticated, production-ready AI agent systems that can handle complex tasks, maintain context, recover from errors, and provide insights into their performance.

## Next Steps

- Implement custom orchestration patterns for your specific use cases
- Add persistent storage for conversation context and metrics
- Integrate with monitoring systems for production deployments
- Explore distributed agent architectures
- Implement agent learning and adaptation mechanisms