# Workflows

**Orchestrating Complex Multi-Agent Systems**

---

Welcome to this tutorial on **workflows** in the Strands framework! This notebook demonstrates how to orchestrate multi-agent systems with sequential, parallel, and conditional pipelines, and how to add state management, error handling, and retries. By the end of this 10-minute tutorial, you'll have a working example of each pattern.

### 🎯 What You'll Learn

In this tutorial, you will:
- Build multi-agent workflows
- Implement sequential and parallel pipelines
- Create conditional workflows
- Handle state management between agents
- Build complex orchestration patterns
- Implement error handling and retries

### 🔄 Why Workflows Matter

Workflows enable you to:
- **Divide complex tasks** into manageable steps
- **Leverage specialized agents** for specific roles
- **Build scalable systems** with modular components
- **Implement business logic** through orchestration
- **Create maintainable** AI applications

## 📦 Step 1: Installing Required Packages

### Overview
Let's install the necessary packages for building workflows.

### 📚 Packages We'll Install
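- **strands-agents**: Core framework with workflow support
- **strands-agents-tools**: Additional workflow tools
- **strands-agents-builder**: Agent builder package, installed by the command below

**asyncio** is used for async workflow execution, but it ships with the Python standard library and needs no installation.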

In [None]:
# Install required packages
%pip install strands-agents strands-agents-tools strands-agents-builder -q

print("✅ All packages installed successfully!")
print("   Ready to build workflows! 🔄")

## 🔐 Step 2: Setting Up AWS Authentication

### Overview
We'll configure AWS Bedrock for our workflow agents.

### 🔑 Authentication Options
1. **AWS Profile** (Recommended for development)
2. **Environment Variables**
3. **Direct Credentials** (Less secure)
4. **IAM Roles** (Recommended for production)

In [None]:
import boto3
from strands import Agent, tool
from strands.models import BedrockModel
import asyncio
import time
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
import json

# Configure AWS session
session = boto3.Session(
    # aws_access_key_id='your_access_key',
    # aws_secret_access_key='your_secret_key',
    # aws_session_token='your_session_token',  # If using temporary credentials
    # region_name='us-west-2',
    profile_name='default'  # Optional: Use a specific AWS profile
)

# Create a Bedrock model instance
bedrock_model = BedrockModel(
    model_id="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    boto_session=session
)

print("✅ AWS Bedrock configured successfully!")
print("   Model: Claude 3.7 Sonnet")
print(f"   Profile: {session.profile_name}")

## 🏗️ Step 3: Building Basic Sequential Workflows

### Sequential Processing
Let's start with simple sequential workflows where agents process tasks one after another.
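Stripped of model calls, the sequential pattern is a loop that feeds each step's output into the next. The sketch below is a minimal illustration, not part of Strands: `PipelineState`, `run_pipeline`, and the string functions standing in for agents are all hypothetical names.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List


@dataclass
class PipelineState:
    """Hypothetical state object passed from step to step."""
    text: str
    history: List[str] = field(default_factory=list)


# Each step takes the current text and returns the new text.
Step = Callable[[str], str]


def run_pipeline(text: str, steps: Dict[str, Step]) -> PipelineState:
    """Run the steps in insertion order, feeding each output into the next."""
    state = PipelineState(text=text)
    for name, step in steps.items():
        state.text = step(state.text)
        state.history.append(name)
    return state


# Plain string functions stand in for agents here.
demo_steps: Dict[str, Step] = {
    "clean": lambda t: " ".join(t.split()),
    "shorten": lambda t: t[:20],
    "shout": str.upper,
}

demo_state = run_pipeline("  AI   is transforming   industries worldwide  ", demo_steps)
print(demo_state.text)
print(demo_state.history)
```

Swapping a lambda for an agent call keeps the loop unchanged, which is the main appeal of the pattern.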

In [None]:
# Define workflow state
@dataclass
class WorkflowState:
    """Shared state between workflow steps"""
    input_text: str
    processed_text: Optional[str] = None
    summary: Optional[str] = None
    analysis: Optional[Dict[str, Any]] = None
    metadata: Optional[Dict[str, Any]] = None  # replaced with {} in __post_init__
    
    def __post_init__(self):
        if self.metadata is None:
            self.metadata = {}

# Create specialized agents
text_processor = Agent(
    model=bedrock_model,
    system_prompt="""You are a text processing specialist.
    Clean and format text for clarity and readability.
    Fix grammar and spelling errors while preserving meaning."""
)

summarizer = Agent(
    model=bedrock_model,
    system_prompt="""You are a summarization expert.
    Create concise, accurate summaries that capture key points.
    Use bullet points for clarity."""
)

analyzer = Agent(
    model=bedrock_model,
    system_prompt="""You are a text analysis expert.
    Analyze sentiment, tone, and key themes.
    Provide structured insights."""
)

# Sequential workflow
class SequentialWorkflow:
    """Sequential text processing workflow"""
    
    def __init__(self, processor: Agent, summarizer: Agent, analyzer: Agent):
        self.processor = processor
        self.summarizer = summarizer
        self.analyzer = analyzer
    
    def run(self, text: str) -> WorkflowState:
        """Execute sequential workflow"""
        print("🔄 Starting Sequential Workflow...")
        state = WorkflowState(input_text=text)
        
        # Step 1: Process text
        print("\n📝 Step 1: Processing text...")
        state.processed_text = str(self.processor(f"Process this text: {text}"))
        state.metadata['processing_complete'] = True
        
        # Step 2: Summarize
        print("\n📋 Step 2: Summarizing...")
        state.summary = str(self.summarizer(f"Summarize: {state.processed_text}"))
        state.metadata['summary_complete'] = True
        
        # Step 3: Analyze
        print("\n🔍 Step 3: Analyzing...")
        analysis_response = self.analyzer(
            f"Analyze this text for sentiment, tone, and themes: {state.processed_text}"
        )
        state.analysis = {"raw_analysis": str(analysis_response)}
        state.metadata['analysis_complete'] = True
        
        print("\n✅ Workflow complete!")
        return state

# Create and test workflow
workflow = SequentialWorkflow(text_processor, summarizer, analyzer)

# Test with sample text
sample_text = """Artificial intelligence is transforming industries across the globe. 
From healthcare to finance, AI applications are improving efficiency and creating new opportunities. 
However, we must also consider ethical implications and ensure responsible development."""

result = workflow.run(sample_text)

print("\n📊 Workflow Results:")
print(f"Summary: {result.summary}")
print(f"\nMetadata: {result.metadata}")

## ⚡ Step 4: Implementing Parallel Workflows

### Parallel Processing
Let's build workflows that execute multiple agents in parallel for improved performance.
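One caveat is worth illustrating first: declaring a function `async def` does not by itself make a blocking call concurrent. The sketch below assumes the agent call is blocking and uses a hypothetical `slow_call` helper with `time.sleep` in its place. It compares `asyncio.gather` over coroutines that block with `asyncio.gather` over `asyncio.to_thread` calls.

```python
import asyncio
import time


def slow_call(label: str, seconds: float = 0.2) -> str:
    """Stand-in for a blocking agent call (hypothetical helper)."""
    time.sleep(seconds)
    return f"{label}: done"


async def run_blocking_in_coroutines() -> float:
    """Blocking calls inside coroutines still run one after another."""
    async def call(label: str) -> str:
        return slow_call(label)  # blocks the event loop until it returns

    start = time.perf_counter()
    await asyncio.gather(call("a"), call("b"), call("c"))
    return time.perf_counter() - start


async def run_in_threads() -> float:
    """asyncio.to_thread moves each blocking call to a worker thread."""
    start = time.perf_counter()
    await asyncio.gather(
        asyncio.to_thread(slow_call, "a"),
        asyncio.to_thread(slow_call, "b"),
        asyncio.to_thread(slow_call, "c"),
    )
    return time.perf_counter() - start


# In a notebook: serial = await run_blocking_in_coroutines()
# In a script:   serial = asyncio.run(run_blocking_in_coroutines())
```

With three 0.2-second calls, the first variant should take roughly the sum of the three durations and the second roughly the duration of one call.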

In [None]:
class ParallelWorkflow:
    """Parallel execution workflow"""
    
    def __init__(self):
        # Create specialized agents
        self.sentiment_analyzer = Agent(
            model=bedrock_model,
            system_prompt="Analyze text sentiment. Return: positive, negative, or neutral."
        )
        
        self.keyword_extractor = Agent(
            model=bedrock_model,
            system_prompt="Extract key terms and concepts from text. Return as a list."
        )
        
        self.language_detector = Agent(
            model=bedrock_model,
            system_prompt="Detect the language of the text and provide confidence score."
        )
    
    async def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """Analyze sentiment asynchronously"""
        start_time = time.time()
        # Agent calls block, so run them in a worker thread; otherwise the
        # three analyses would run one after another inside gather()
        result = await asyncio.to_thread(
            self.sentiment_analyzer, f"Analyze sentiment: {text}"
        )
        return {
            "sentiment": str(result),
            "duration": time.time() - start_time
        }
    
    async def extract_keywords(self, text: str) -> Dict[str, Any]:
        """Extract keywords asynchronously"""
        start_time = time.time()
        result = await asyncio.to_thread(
            self.keyword_extractor, f"Extract keywords: {text}"
        )
        return {
            "keywords": str(result),
            "duration": time.time() - start_time
        }
    
    async def detect_language(self, text: str) -> Dict[str, Any]:
        """Detect language asynchronously"""
        start_time = time.time()
        result = await asyncio.to_thread(
            self.language_detector, f"Detect language: {text}"
        )
        return {
            "language": str(result),
            "duration": time.time() - start_time
        }
    
    async def run(self, text: str) -> Dict[str, Any]:
        """Execute parallel workflow"""
        print("⚡ Starting Parallel Workflow...")
        overall_start = time.time()
        
        # Execute all analyses in parallel
        tasks = [
            self.analyze_sentiment(text),
            self.extract_keywords(text),
            self.detect_language(text)
        ]
        
        print("🔄 Executing 3 agents in parallel...")
        results = await asyncio.gather(*tasks)
        
        # Combine results
        combined_results = {
            "sentiment": results[0],
            "keywords": results[1],
            "language": results[2],
            "total_duration": time.time() - overall_start
        }
        
        print("\n✅ Parallel execution complete!")
        return combined_results

# Create and test parallel workflow
parallel_workflow = ParallelWorkflow()

# Run parallel workflow
parallel_results = await parallel_workflow.run(sample_text)

print("\n📊 Parallel Workflow Results:")
print(f"Total Duration: {parallel_results['total_duration']:.2f}s")
print(f"\nSentiment Analysis: {parallel_results['sentiment']['sentiment']}")
print(f"  Duration: {parallel_results['sentiment']['duration']:.2f}s")
print(f"\nKeywords: {parallel_results['keywords']['keywords']}")
print(f"  Duration: {parallel_results['keywords']['duration']:.2f}s")
print(f"\nLanguage: {parallel_results['language']['language']}")
print(f"  Duration: {parallel_results['language']['duration']:.2f}s")

# Compare with sequential execution time
sequential_time = sum([
    parallel_results['sentiment']['duration'],
    parallel_results['keywords']['duration'],
    parallel_results['language']['duration']
])
print(f"\n⚡ Speed improvement: {sequential_time / parallel_results['total_duration']:.2f}x faster!")

## 🌳 Step 5: Building Conditional Workflows

### Dynamic Routing
Let's create workflows that make decisions and route to different agents based on conditions.
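The routing step comes down to two pieces: normalizing a free-form classifier reply into a known label, and looking up a handler for that label. The following is a minimal sketch with hypothetical names (`HANDLERS`, `normalize_category`, `dispatch`) and plain functions standing in for the specialized agents.

```python
from typing import Callable, Dict

Handler = Callable[[str], str]

# Hypothetical stand-ins for the specialized agents.
HANDLERS: Dict[str, Handler] = {
    "TECHNICAL": lambda text: f"[technical] {text}",
    "BUSINESS": lambda text: f"[business] {text}",
    "CREATIVE": lambda text: f"[creative] {text}",
}
DEFAULT_CATEGORY = "BUSINESS"


def normalize_category(raw: str) -> str:
    """Map a free-form classifier reply onto a known label, or the default."""
    reply = raw.strip().upper()
    for label in HANDLERS:
        if label in reply:
            return label
    return DEFAULT_CATEGORY


def dispatch(raw_category: str, content: str) -> str:
    """Look up the handler for the normalized category and call it."""
    return HANDLERS[normalize_category(raw_category)](content)


print(dispatch("Technical.", "binary search"))
print(dispatch("no idea", "quarterly revenue"))
```

A dictionary lookup keeps the branching in one place, so adding a category means adding one entry rather than another `elif`.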

In [None]:
class ConditionalWorkflow:
    """Workflow with conditional branching"""
    
    def __init__(self):
        # Router agent
        self.router = Agent(
            model=bedrock_model,
            system_prompt="""Classify content into categories:
            - TECHNICAL: Programming, technology, engineering
            - BUSINESS: Finance, marketing, management
            - CREATIVE: Art, writing, design
            Return only the category name."""
        )
        
        # Specialized handlers
        self.technical_handler = Agent(
            model=bedrock_model,
            system_prompt="""You are a technical expert.
            Provide detailed technical analysis and code examples when relevant."""
        )
        
        self.business_handler = Agent(
            model=bedrock_model,
            system_prompt="""You are a business analyst.
            Focus on ROI, market impact, and strategic implications."""
        )
        
        self.creative_handler = Agent(
            model=bedrock_model,
            system_prompt="""You are a creative consultant.
            Emphasize aesthetics, user experience, and innovation."""
        )
    
    def route_content(self, content: str) -> str:
        """Determine content category"""
        raw = str(self.router(f"Classify this content: {content}")).strip().upper()
        
        # The model may add punctuation or extra words, so look for a known label
        for candidate in ("TECHNICAL", "BUSINESS", "CREATIVE"):
            if candidate in raw:
                return candidate
        
        print(f"⚠️  Unknown category '{raw}', defaulting to BUSINESS")
        return "BUSINESS"
    
    def process_content(self, content: str, task: str) -> Dict[str, Any]:
        """Process content based on its category"""
        print("🌳 Starting Conditional Workflow...")
        
        # Step 1: Route content
        print("\n🔀 Routing content...")
        category = self.route_content(content)
        print(f"   Category: {category}")
        
        # Step 2: Process with appropriate handler
        print(f"\n📝 Processing with {category} handler...")
        
        if category == "TECHNICAL":
            response = self.technical_handler(f"{task}: {content}")
        elif category == "BUSINESS":
            response = self.business_handler(f"{task}: {content}")
        else:  # CREATIVE
            response = self.creative_handler(f"{task}: {content}")
        
        return {
            "category": category,
            "response": str(response),
            "handler_used": f"{category.lower()}_handler"
        }

# Create conditional workflow
conditional_workflow = ConditionalWorkflow()

# Test with different content types
test_contents = [
    "How to implement a binary search algorithm in Python",
    "Strategies for increasing quarterly revenue by 20%",
    "Design principles for creating engaging user interfaces"
]

print("🧪 Testing Conditional Workflow\n")

for content in test_contents:
    print("=" * 60)
    print(f"Content: {content[:50]}...")
    
    result = conditional_workflow.process_content(
        content, 
        "Provide expert analysis"
    )
    
    print(f"\n🏷️  Category: {result['category']}")
    print(f"📊 Response: {result['response'][:200]}...")
    print()

## 🔄 Step 6: Implementing Workflow Orchestration

### Complex Orchestration
Let's build a sophisticated workflow orchestrator that combines multiple patterns.
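An orchestrator has to run both synchronous and asynchronous workflows and record how each run went. One possible way to do that, sketched here with hypothetical names (`run_any` and the three demo steps), is to call the function and await the result only if it turns out to be awaitable. This checks the return value rather than the function itself, which is a different technique from inspecting the function up front.

```python
import asyncio
import inspect
import time
from typing import Any, Callable, Dict


async def run_any(func: Callable[..., Any], *args: Any) -> Dict[str, Any]:
    """Call func, await the result if needed, and record status and duration."""
    start = time.perf_counter()
    try:
        outcome = func(*args)
        if inspect.isawaitable(outcome):
            outcome = await outcome
        status = "success"
    except Exception as exc:  # broad on purpose: the record captures any failure
        outcome, status = str(exc), "failed"
    return {
        "status": status,
        "result": outcome,
        "duration": time.perf_counter() - start,
    }


def sync_step(x: int) -> int:
    return x + 1


async def async_step(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


def broken_step(x: int) -> int:
    raise ValueError("boom")


# In a notebook: record = await run_any(async_step, 3)
# In a script:   record = asyncio.run(run_any(async_step, 3))
```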

In [None]:
class WorkflowOrchestrator:
    """Advanced workflow orchestration system"""
    
    def __init__(self):
        self.workflows = {}
        self.execution_history = []
    
    def register_workflow(self, name: str, workflow: Any):
        """Register a workflow"""
        self.workflows[name] = workflow
        print(f"✅ Registered workflow: {name}")
    
    async def execute_workflow(self, name: str, input_data: Any) -> Dict[str, Any]:
        """Execute a named workflow"""
        if name not in self.workflows:
            raise ValueError(f"Workflow '{name}' not found")
        
        start_time = time.time()
        
        try:
            # Execute workflow
            workflow = self.workflows[name]
            
            # Handle both sync and async workflows
            if asyncio.iscoroutinefunction(workflow.run):
                result = await workflow.run(input_data)
            else:
                result = workflow.run(input_data)
            
            execution_record = {
                "workflow": name,
                "status": "success",
                "duration": time.time() - start_time,
                "timestamp": time.time()
            }
            
        except Exception as e:
            result = {"error": str(e)}
            execution_record = {
                "workflow": name,
                "status": "failed",
                "error": str(e),
                "duration": time.time() - start_time,
                "timestamp": time.time()
            }
        
        self.execution_history.append(execution_record)
        return result
    
    def get_execution_stats(self) -> Dict[str, Any]:
        """Get workflow execution statistics"""
        if not self.execution_history:
            return {"message": "No executions recorded"}
        
        total = len(self.execution_history)
        successful = sum(1 for e in self.execution_history if e["status"] == "success")
        failed = total - successful
        
        avg_duration = sum(e["duration"] for e in self.execution_history) / total
        
        return {
            "total_executions": total,
            "successful": successful,
            "failed": failed,
            "success_rate": (successful / total) * 100,
            "avg_duration": avg_duration
        }

# Complex multi-stage workflow
class ResearchWorkflow:
    """Multi-stage research workflow"""
    
    def __init__(self):
        self.researcher = Agent(
            model=bedrock_model,
            system_prompt="Research the topic and provide comprehensive information."
        )
        
        self.fact_checker = Agent(
            model=bedrock_model,
            system_prompt="Verify facts and identify any inaccuracies or concerns."
        )
        
        self.report_writer = Agent(
            model=bedrock_model,
            system_prompt="Write a professional report with clear sections and conclusions."
        )
    
    async def run(self, topic: str) -> Dict[str, Any]:
        """Execute research workflow"""
        print(f"🔬 Starting Research Workflow for: {topic}")
        
        # Stage 1: Research
        print("\n📚 Stage 1: Researching...")
        research_data = str(self.researcher(f"Research this topic: {topic}"))
        
        # Stage 2: Fact check (parallel with report drafting)
        print("\n🔍 Stage 2: Fact checking and drafting report...")
        
        fact_check_task = asyncio.create_task(
            asyncio.to_thread(
                self.fact_checker,
                f"Fact check this research: {research_data[:500]}..."
            )
        )
        
        report_task = asyncio.create_task(
            asyncio.to_thread(
                self.report_writer,
                f"Write a report on {topic} based on: {research_data}"
            )
        )
        
        # Wait for both tasks
        fact_check_results, report = await asyncio.gather(
            fact_check_task, report_task
        )
        
        # Stage 3: Finalize
        print("\n✍️ Stage 3: Finalizing report...")
        
        return {
            "topic": topic,
            "research": research_data[:500] + "...",
            "fact_check": str(fact_check_results),
            "report": str(report),
            "status": "completed"
        }

# Create orchestrator and register workflows
orchestrator = WorkflowOrchestrator()
orchestrator.register_workflow("sequential", workflow)
orchestrator.register_workflow("parallel", parallel_workflow)
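# ConditionalWorkflow exposes process_content(content, task) rather than run(),
# so wrap it in a small adapter that gives the orchestrator a run() method
from types import SimpleNamespace
orchestrator.register_workflow(
    "conditional",
    SimpleNamespace(
        run=lambda content: conditional_workflow.process_content(
            content, "Provide expert analysis"
        )
    ),
)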
orchestrator.register_workflow("research", ResearchWorkflow())

# Execute research workflow
research_result = await orchestrator.execute_workflow(
    "research", 
    "The impact of quantum computing on cybersecurity"
)

print("\n📊 Research Workflow Results:")
print(f"Status: {research_result.get('status', 'unknown')}")
print(f"\nFact Check Results: {research_result.get('fact_check', 'N/A')[:200]}...")

# Show execution stats
print("\n📈 Workflow Execution Statistics:")
stats = orchestrator.get_execution_stats()
for key, value in stats.items():
    print(f"   {key}: {value}")

## 🛡️ Step 7: Error Handling and Retry Logic

### Resilient Workflows
Let's implement robust error handling and retry mechanisms for our workflows.
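The core of a retry mechanism fits in a few lines. The sketch below is a minimal version with hypothetical names (`backoff_delays`, `retry_async`, `flaky`). It computes the delay schedule fresh on every call, so one call's backoff never changes the starting delay of the next, and it caps the delay so it cannot grow without bound.

```python
import asyncio
from typing import Awaitable, Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(
    attempts: int, initial: float, base: float = 2.0, cap: float = 60.0
) -> List[float]:
    """Delays between attempts: initial, initial*base, ... each capped at `cap`."""
    return [min(cap, initial * base ** i) for i in range(attempts - 1)]


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    initial: float = 0.01,
) -> T:
    """Retry `operation`, sleeping per backoff_delays between failed attempts."""
    delays = backoff_delays(attempts, initial)
    for attempt in range(attempts):
        try:
            return await operation()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(delays[attempt])
    raise RuntimeError("attempts must be at least 1")


calls = {"count": 0}


async def flaky() -> str:
    """Fails on the first call, succeeds afterwards (simulated failure)."""
    calls["count"] += 1
    if calls["count"] == 1:
        raise ConnectionError("simulated network error")
    return "ok"


print(backoff_delays(4, 1.0))
# In a notebook: await retry_async(flaky)
# In a script:   asyncio.run(retry_async(flaky))
```

In production code you would usually catch only the exception types you expect to be transient instead of every `Exception`.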

In [None]:
class ResilientWorkflow:
    """Workflow with error handling and retry logic"""
    
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        
        # Create agents with potential for failures
        self.data_fetcher = Agent(
            model=bedrock_model,
            system_prompt="Fetch and validate data. Sometimes simulate failures for testing."
        )
        
        self.data_processor = Agent(
            model=bedrock_model,
            system_prompt="Process data and handle edge cases gracefully."
        )
    
    async def fetch_with_retry(self, query: str) -> Dict[str, Any]:
        """Fetch data with retry logic"""
        last_error = None
        # Use a local delay so the backoff does not carry over into later calls
        delay = self.retry_delay
        
        for attempt in range(self.max_retries):
            try:
                print(f"\n🔄 Attempt {attempt + 1}/{self.max_retries}: Fetching data...")
                
                # Simulate potential failure
                if attempt == 0 and "fail" in query.lower():
                    raise Exception("Simulated network error")
                
                result = self.data_fetcher(f"Fetch data for: {query}")
                
                return {
                    "success": True,
                    "data": str(result),
                    "attempts": attempt + 1
                }
                
            except Exception as e:
                last_error = e
                print(f"   ❌ Error: {str(e)}")
                
                if attempt < self.max_retries - 1:
                    print(f"   ⏳ Retrying in {delay}s...")
                    await asyncio.sleep(delay)
                    # Exponential backoff
                    delay *= 2
        
        # All retries failed
        return {
            "success": False,
            "error": str(last_error),
            "attempts": self.max_retries
        }
    
    async def run(self, query: str) -> Dict[str, Any]:
        """Execute resilient workflow"""
        print("🛡️ Starting Resilient Workflow...")
        workflow_result = {
            "query": query,
            "stages": {}
        }
        
        # Stage 1: Fetch data with retry
        fetch_result = await self.fetch_with_retry(query)
        workflow_result["stages"]["fetch"] = fetch_result
        
        if not fetch_result["success"]:
            workflow_result["status"] = "failed"
            workflow_result["error"] = "Failed to fetch data after all retries"
            return workflow_result
        
        # Stage 2: Process data
        try:
            print("\n📊 Processing fetched data...")
            processed = self.data_processor(
                f"Process this data: {fetch_result['data']}"
            )
            
            workflow_result["stages"]["process"] = {
                "success": True,
                "result": str(processed)
            }
            workflow_result["status"] = "completed"
            
        except Exception as e:
            workflow_result["stages"]["process"] = {
                "success": False,
                "error": str(e)
            }
            workflow_result["status"] = "partial_failure"
        
        return workflow_result

# Test resilient workflow
resilient_workflow = ResilientWorkflow(max_retries=3, retry_delay=0.5)

# Test with simulated failure
print("🧪 Testing with simulated failure...")
failure_result = await resilient_workflow.run("Fail on first attempt then succeed")

print("\n📊 Resilient Workflow Results:")
print(f"Status: {failure_result.get('status', 'unknown')}")
print(f"Fetch attempts: {failure_result['stages']['fetch']['attempts']}")
print(f"Fetch success: {failure_result['stages']['fetch']['success']}")

# Test with successful case
print("\n\n🧪 Testing with successful case...")
success_result = await resilient_workflow.run("Process customer data")
print(f"\nStatus: {success_result.get('status', 'unknown')}")

## 📊 Step 8: State Management in Workflows

### Managing Shared State
Let's implement sophisticated state management for complex workflows.
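State that should survive a restart has to be serializable, and `Enum` members are not accepted by `json.dumps` as they are. The sketch below is one possible approach, with hypothetical names (`Status`, `Snapshot`): it stores the enum's value on the way out and rebuilds the member on the way in.

```python
import json
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any, Dict, List


class Status(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"


@dataclass
class Snapshot:
    """Hypothetical, minimal workflow state that can be saved and restored."""
    workflow_id: str
    status: Status = Status.PENDING
    steps_completed: List[str] = field(default_factory=list)
    data: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        payload = asdict(self)
        payload["status"] = self.status.value  # store the plain string value
        return json.dumps(payload)

    @classmethod
    def from_json(cls, raw: str) -> "Snapshot":
        payload = json.loads(raw)
        payload["status"] = Status(payload["status"])  # rebuild the Enum member
        return cls(**payload)


snapshot = Snapshot("WF-001", Status.RUNNING, ["validation"], {"validation": "looks fine"})
restored = Snapshot.from_json(snapshot.to_json())
print(restored == snapshot)
```

The JSON string can be written to a file or a key-value store; which backend to use is left open here.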

In [None]:
from enum import Enum
from datetime import datetime

class WorkflowStatus(Enum):
    """Workflow execution status"""
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"

class StatefulWorkflow:
    """Workflow with comprehensive state management"""
    
    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.state = {
            "id": workflow_id,
            "status": WorkflowStatus.PENDING,
            "created_at": datetime.now().isoformat(),
            "steps_completed": [],
            "current_step": None,
            "data": {},
            "errors": [],
            "metadata": {}
        }
        
        # Create agents
        self.validator = Agent(
            model=bedrock_model,
            system_prompt="Validate input data and identify any issues."
        )
        
        self.transformer = Agent(
            model=bedrock_model,
            system_prompt="Transform data into the required format."
        )
        
        self.analyzer = Agent(
            model=bedrock_model,
            system_prompt="Analyze transformed data and provide insights."
        )
    
    def update_state(self, updates: Dict[str, Any]):
        """Update workflow state"""
        self.state.update(updates)
        self.state["last_updated"] = datetime.now().isoformat()
    
    def add_step_result(self, step_name: str, result: Any):
        """Add step result to state"""
        self.state["steps_completed"].append({
            "step": step_name,
            "timestamp": datetime.now().isoformat(),
            "result": result
        })
        self.state["data"][step_name] = result
    
    async def execute(self, input_data: str) -> Dict[str, Any]:
        """Execute stateful workflow"""
        print(f"📊 Starting Stateful Workflow: {self.workflow_id}")
        self.update_state({
            "status": WorkflowStatus.RUNNING,
            "started_at": datetime.now().isoformat(),
            "input": input_data
        })
        
        try:
            # Step 1: Validation
            self.state["current_step"] = "validation"
            print("\n🔍 Step 1: Validating input...")
            
            validation_result = self.validator(f"Validate: {input_data}")
            self.add_step_result("validation", str(validation_result))
            
            # Step 2: Transformation
            self.state["current_step"] = "transformation"
            print("\n🔄 Step 2: Transforming data...")
            
            transform_result = self.transformer(
                f"Transform based on validation: {validation_result}"
            )
            self.add_step_result("transformation", str(transform_result))
            
            # Step 3: Analysis
            self.state["current_step"] = "analysis"
            print("\n📈 Step 3: Analyzing data...")
            
            analysis_result = self.analyzer(
                f"Analyze: {transform_result}"
            )
            self.add_step_result("analysis", str(analysis_result))
            
            # Workflow completed
            self.update_state({
                "status": WorkflowStatus.COMPLETED,
                "completed_at": datetime.now().isoformat(),
                "current_step": None
            })
            
        except Exception as e:
            self.state["errors"].append({
                "step": self.state["current_step"],
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            })
            
            self.update_state({
                "status": WorkflowStatus.FAILED,
                "failed_at": datetime.now().isoformat()
            })
        
        return self.get_state_summary()
    
    def get_state_summary(self) -> Dict[str, Any]:
        """Get workflow state summary"""
        return {
            "id": self.state["id"],
            "status": self.state["status"].value,
            "steps_completed": len(self.state["steps_completed"]),
            "current_step": self.state["current_step"],
            "has_errors": len(self.state["errors"]) > 0,
            "data_keys": list(self.state["data"].keys()),
            "execution_time": self._calculate_execution_time()
        }
    
    def _calculate_execution_time(self) -> Optional[float]:
        """Calculate total execution time"""
        if "started_at" not in self.state:
            return None
        
        start = datetime.fromisoformat(self.state["started_at"])
        
        if "completed_at" in self.state:
            end = datetime.fromisoformat(self.state["completed_at"])
        elif "failed_at" in self.state:
            end = datetime.fromisoformat(self.state["failed_at"])
        else:
            end = datetime.now()
        
        return (end - start).total_seconds()

# Test stateful workflow
stateful_workflow = StatefulWorkflow("WF-001")

# Execute workflow
result = await stateful_workflow.execute(
    "Analyze customer feedback data for Q1 2024"
)

print("\n📊 Workflow State Summary:")
for key, value in result.items():
    print(f"   {key}: {value}")

# Show detailed state
print("\n📋 Completed Steps:")
for step in stateful_workflow.state["steps_completed"]:
    print(f"   - {step['step']} at {step['timestamp']}")

## 🚀 Step 9: Production-Ready Workflow Patterns

### Best Practices
Let's explore production-ready workflow patterns and best practices.
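One of the practices covered in this step, setting timeouts, can be sketched with `asyncio.wait_for`. The helper names `run_with_timeout` and `slow_step` are hypothetical, and the `timeout_seconds` argument plays the role of a setting such as `default_timeout_seconds`.

```python
import asyncio
from typing import Any, Awaitable, Dict


async def run_with_timeout(work: Awaitable[Any], timeout_seconds: float) -> Dict[str, Any]:
    """Await `work`, but give up and report a timeout after timeout_seconds."""
    try:
        result = await asyncio.wait_for(work, timeout=timeout_seconds)
        return {"status": "completed", "result": result}
    except asyncio.TimeoutError:
        return {"status": "timed_out", "result": None}


async def slow_step(seconds: float) -> str:
    """Stand-in for a workflow step that takes `seconds` to finish."""
    await asyncio.sleep(seconds)
    return "finished"


# In a notebook: await run_with_timeout(slow_step(1.0), 0.05)
# In a script:   asyncio.run(run_with_timeout(slow_step(1.0), 0.05))
```

Note that `asyncio.wait_for` cancels the awaited coroutine on timeout, but work already handed to a thread with `asyncio.to_thread` keeps running until the blocking call returns.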

In [None]:
print("🚀 PRODUCTION WORKFLOW BEST PRACTICES")
print("=" * 60)

best_practices = {
    "🏗️ Architecture": [
        "Use single-responsibility agents",
        "Implement clear interfaces between steps",
        "Design for modularity and reusability",
        "Keep workflows testable",
        "Version your workflow definitions"
    ],
    "⚡ Performance": [
        "Parallelize independent steps",
        "Implement caching for expensive operations",
        "Use streaming for large data",
        "Monitor and optimize bottlenecks",
        "Set appropriate timeouts"
    ],
    "🛡️ Reliability": [
        "Implement comprehensive error handling",
        "Add retry logic with exponential backoff",
        "Use circuit breakers for external services",
        "Implement graceful degradation",
        "Add health checks"
    ],
    "📊 Observability": [
        "Log all state transitions",
        "Track execution metrics",
        "Implement distributed tracing",
        "Monitor resource usage",
        "Create meaningful dashboards"
    ],
    "🔄 State Management": [
        "Use immutable state where possible",
        "Implement state persistence",
        "Handle partial failures gracefully",
        "Support workflow resumption",
        "Maintain audit trails"
    ]
}

for category, practices in best_practices.items():
    print(f"\n{category}")
    for practice in practices:
        print(f"   • {practice}")

# Example production configuration
print("\n\n📋 Example Production Configuration")
print("=" * 60)

production_config = {
    "workflow": {
        "max_concurrent_workflows": 10,
        "default_timeout_seconds": 300,
        "retry_policy": {
            "max_attempts": 3,
            "initial_delay": 1,
            "max_delay": 60,
            "exponential_base": 2
        },
        "state_storage": {
            "type": "redis",
            "ttl_seconds": 86400
        }
    },
    "monitoring": {
        "metrics_enabled": True,
        "tracing_enabled": True,
        "log_level": "INFO"
    },
    "scaling": {
        "auto_scaling_enabled": True,
        "min_workers": 2,
        "max_workers": 20,
        "scale_up_threshold": 0.8,
        "scale_down_threshold": 0.2
    }
}

print(json.dumps(production_config, indent=2))

## 🎉 Congratulations!

### 🏆 What You've Accomplished
In this tutorial, you've mastered:
- ✅ Building sequential workflows
- ✅ Implementing parallel execution
- ✅ Creating conditional workflows
- ✅ Orchestrating complex multi-agent systems
- ✅ Implementing error handling and retries
- ✅ Managing workflow state
- ✅ Production-ready patterns

### 🔄 The Power of Workflows

You now have the skills to:
- **Build complex AI systems** with multiple specialized agents
- **Orchestrate sophisticated processes** with conditional logic
- **Scale efficiently** with parallel execution
- **Handle failures gracefully** with retry mechanisms
- **Maintain state** across complex operations

### 💡 Key Takeaways

1. **Modular Design**: Break complex tasks into specialized agents
2. **Parallel When Possible**: Maximize efficiency with concurrent execution
3. **Handle Failures**: Always implement error handling and retries
4. **Track State**: Maintain comprehensive state for debugging and recovery
5. **Monitor Everything**: Observability is crucial for production workflows

### 🔮 Advanced Patterns

Consider exploring:
- **Saga Pattern**: For distributed transactions
- **Event-Driven Workflows**: Using message queues
- **Human-in-the-Loop**: Workflows with manual approval steps
- **Dynamic Workflows**: Self-modifying based on results
- **Workflow Composition**: Nesting workflows within workflows

### 📚 Resources

- [Strands Documentation](https://strandsagents.com/0.1.x/)
- [AWS Step Functions](https://aws.amazon.com/step-functions/)
- [Apache Airflow](https://airflow.apache.org/)
- [Temporal.io](https://temporal.io/)

### 🌟 Next Steps

You're ready to:
1. Design complex multi-agent systems
2. Build production-grade workflow orchestration
3. Implement sophisticated business processes
4. Create scalable AI applications
5. Lead workflow architecture initiatives

### 🚀 Final Thoughts

Workflows transform individual AI agents into powerful, orchestrated systems. With the patterns and practices you've learned, you can build applications that handle complex, real-world processes reliably and efficiently.

Remember: Great AI applications are built one workflow at a time!

Happy orchestrating! 🔄🤖✨