# Multi-Agent Workflows

## Coordinated Agent Systems for Complex Tasks

**Module Duration:** 15 minutes | **Focus:** Workflow orchestration and agent coordination

---

### Learning Objectives

Master multi-agent coordination patterns for complex business processes:

- **Sequential Workflows:** Step-by-step task progression with state management
- **Parallel Processing:** Concurrent agent execution with coordination
- **Agent Handoffs:** Clean state transfer between specialized agents
- **Workflow Orchestration:** Complex business process automation
- **Result Aggregation:** Combining outputs from multiple agents

**What You'll Build:**
- Sequential workflow system with state management
- Parallel agent coordination framework
- Agent handoff and state transfer mechanisms
- Complex business process automation
- Result validation and aggregation patterns

This module builds each pattern on one shared orchestration foundation: sequential pipelines first, then parallel agent groups, then handoffs between specialized agents.

In [1]:
# Multi-Agent Workflow Foundation
import asyncio
import uuid
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Union, Callable
from dataclasses import dataclass, field
from enum import Enum
import json
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("🤖 MULTI-AGENT WORKFLOWS")
print("=" * 35)
print(f"Session: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("Focus: Coordinated agent systems for complex tasks")
print()

class TaskStatus(Enum):
    """Task execution status"""
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    WAITING = "waiting"

class WorkflowStatus(Enum):
    """Workflow execution status"""
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

@dataclass
class TaskResult:
    """Individual task execution result"""
    task_id: str
    agent_id: str
    status: TaskStatus
    result: Any
    error: Optional[str] = None
    execution_time: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)

@dataclass
class WorkflowContext:
    """Shared context across workflow execution"""
    workflow_id: str
    created_at: str
    status: WorkflowStatus
    current_step: int
    total_steps: int
    shared_data: Dict[str, Any] = field(default_factory=dict)
    task_results: Dict[str, TaskResult] = field(default_factory=dict)
    execution_history: List[Dict[str, Any]] = field(default_factory=list)

class WorkflowOrchestrator:
    """Enterprise workflow orchestration engine"""
    
    def __init__(self):
        self.workflows = {}
        self.active_tasks = {}
        self.agent_registry = {}
        
    def register_agent(self, agent_id: str, agent_instance: Any):
        """Register agent for workflow participation"""
        self.agent_registry[agent_id] = agent_instance
        logger.info(f"Registered agent: {agent_id}")
    
    def create_workflow(self, workflow_definition: Dict[str, Any]) -> str:
        """Create new workflow instance"""
        workflow_id = str(uuid.uuid4())
        
        context = WorkflowContext(
            workflow_id=workflow_id,
            created_at=datetime.now().isoformat(),
            status=WorkflowStatus.CREATED,
            current_step=0,
            total_steps=len(workflow_definition.get('steps', []))
        )
        
        self.workflows[workflow_id] = {
            'context': context,
            'definition': workflow_definition
        }
        
        logger.info(f"Created workflow: {workflow_id}")
        return workflow_id
    
    def get_workflow_status(self, workflow_id: str) -> Dict[str, Any]:
        """Get current workflow status and progress"""
        if workflow_id not in self.workflows:
            return {"error": "Workflow not found"}
        
        workflow = self.workflows[workflow_id]
        context = workflow['context']
        
        return {
            "workflow_id": workflow_id,
            "status": context.status.value,
            "progress": f"{context.current_step}/{context.total_steps}",
            "completion_percentage": (context.current_step / max(context.total_steps, 1)) * 100,
            "task_results": len(context.task_results),
            "shared_data_keys": list(context.shared_data.keys())
        }

# Initialize orchestrator
orchestrator = WorkflowOrchestrator()

print("✅ Multi-agent workflow foundation ready:")
print("   Workflow orchestration engine")
print("   Task status and result tracking")
print("   Shared context management")

🤖 MULTI-AGENT WORKFLOWS
Session: 2025-06-16 14:57:33
Focus: Coordinated agent systems for complex tasks

✅ Multi-agent workflow foundation ready:
   Workflow orchestration engine
   Task status and result tracking
   Shared context management
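The status enums above name the lifecycle states but leave the legal transitions implicit, so nothing stops a completed workflow from being marked as running again. A minimal sketch of one way to make the lifecycle explicit, assuming a hypothetical transition table (`ALLOWED_TRANSITIONS` and `transition` are illustrative names, not part of the orchestrator; the enum is redefined so the block runs on its own):

```python
from enum import Enum
from typing import Dict, Set

class WorkflowStatus(Enum):
    """Redefined here so the sketch is self-contained; values match the notebook's enum."""
    CREATED = "created"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    PAUSED = "paused"

# Hypothetical policy: which statuses may follow each status
ALLOWED_TRANSITIONS: Dict[WorkflowStatus, Set[WorkflowStatus]] = {
    WorkflowStatus.CREATED: {WorkflowStatus.RUNNING},
    WorkflowStatus.RUNNING: {WorkflowStatus.COMPLETED, WorkflowStatus.FAILED, WorkflowStatus.PAUSED},
    WorkflowStatus.PAUSED: {WorkflowStatus.RUNNING, WorkflowStatus.FAILED},
    WorkflowStatus.COMPLETED: set(),  # terminal
    WorkflowStatus.FAILED: set(),     # terminal
}

def transition(current: WorkflowStatus, target: WorkflowStatus) -> WorkflowStatus:
    """Return target if the move is allowed by the table, otherwise raise ValueError."""
    if target not in ALLOWED_TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target

status = transition(WorkflowStatus.CREATED, WorkflowStatus.RUNNING)
status = transition(status, WorkflowStatus.COMPLETED)
print(status.value)  # completed
```

Routing every status change through one function like this keeps the policy in a single place, at the cost of having to update the table whenever a new state is added.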


### Sequential Workflow Patterns

Sequential workflows process tasks in order, with each step depending on previous results:

**Key Features:**
- **State Management:** Passing data between sequential steps
- **Dependency Resolution:** Ensuring prerequisites are met
- **Error Propagation:** Handling failures in the chain
- **Progress Tracking:** Monitoring workflow advancement
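The four features above can be seen in isolation with a much smaller engine than the one built below. A minimal, self-contained sketch, assuming each step is an async function that reads the shared state and returns a value stored under its output key (`run_sequential` and the toy steps are illustrative names, not the module's API):

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List, Tuple

# A step is (output_key, async function that reads shared state and returns a value)
Step = Tuple[str, Callable[[Dict[str, Any]], Awaitable[Any]]]

async def run_sequential(steps: List[Step]) -> Dict[str, Any]:
    """Run steps in order, passing shared state forward; stop at the first failure."""
    shared: Dict[str, Any] = {}
    history: List[Dict[str, Any]] = []  # progress tracking
    for index, (output_key, step) in enumerate(steps, start=1):
        try:
            # State management: each result becomes input for later steps
            shared[output_key] = await step(shared)
            history.append({"step": index, "output_key": output_key, "status": "completed"})
        except Exception as exc:
            # Error propagation: record the failure and skip the remaining steps
            history.append({"step": index, "output_key": output_key,
                            "status": "failed", "error": str(exc)})
            return {"status": "failed", "shared": shared, "history": history}
    return {"status": "completed", "shared": shared, "history": history}

async def extract(shared: Dict[str, Any]) -> Dict[str, int]:
    return {"records": 1250}

async def clean(shared: Dict[str, Any]) -> Dict[str, int]:
    # Dependency: reads the extract step's output from shared state
    return {"records": int(shared["extract"]["records"] * 0.92)}

async def report(shared: Dict[str, Any]) -> str:
    return f"{shared['clean']['records']} clean records"

result = asyncio.run(run_sequential([("extract", extract), ("clean", clean), ("report", report)]))
print(result["status"], "|", result["shared"]["report"])  # completed | 1150 clean records
```

Here dependency resolution is simply list order: a step can only read keys written by steps before it.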

In [3]:
# Sequential Workflow Implementation
import time

class SequentialWorkflow:
    """Sequential task execution with state management"""
    
    def __init__(self, orchestrator: WorkflowOrchestrator):
        self.orchestrator = orchestrator
    
    async def execute_step(self, workflow_id: str, step_definition: Dict[str, Any], context: WorkflowContext) -> TaskResult:
        """Execute individual workflow step"""
        task_id = f"{workflow_id}_step_{context.current_step}"
        agent_id = step_definition['agent_id']
        task_type = step_definition['task_type']
        # Copy so injecting context below does not mutate the workflow definition
        parameters = dict(step_definition.get('parameters', {}))
        
        # Add shared context to parameters
        parameters['shared_data'] = context.shared_data
        parameters['previous_results'] = list(context.task_results.values())
        
        start_time = time.time()
        
        try:
            # Simulate agent task execution
            if agent_id == "data_processor":
                result = await self._process_data_task(task_type, parameters)
            elif agent_id == "analyzer":
                result = await self._analyze_task(task_type, parameters)
            elif agent_id == "reporter":
                result = await self._report_task(task_type, parameters)
            else:
                result = await self._generic_task(agent_id, task_type, parameters)
            
            execution_time = time.time() - start_time
            
            task_result = TaskResult(
                task_id=task_id,
                agent_id=agent_id,
                status=TaskStatus.COMPLETED,
                result=result,
                execution_time=execution_time,
                metadata={"task_type": task_type}
            )
            
            # Update shared context with results
            if 'output_key' in step_definition:
                context.shared_data[step_definition['output_key']] = result
            
            logger.info(f"Completed step {context.current_step}: {task_type} by {agent_id}")
            return task_result
            
        except Exception as e:
            execution_time = time.time() - start_time
            error_result = TaskResult(
                task_id=task_id,
                agent_id=agent_id,
                status=TaskStatus.FAILED,
                result=None,
                error=str(e),
                execution_time=execution_time,
                metadata={"task_type": task_type}
            )
            
            logger.error(f"Failed step {context.current_step}: {str(e)}")
            return error_result
    
    async def _process_data_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate data processing agent task"""
        await asyncio.sleep(0.1)  # Simulate processing time
        
        if task_type == "extract_data":
            return {
                "extracted_records": 1250,
                "data_sources": ["database", "api", "files"],
                "extraction_time": "0.1s",
                "quality_score": 0.95
            }
        elif task_type == "clean_data":
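            # The extract step stores its whole result dict under output_key 'extracted_records'
            extracted = parameters.get('shared_data', {}).get('extracted_records', {})
            previous_count = extracted.get('extracted_records', 1000) if isinstance(extracted, dict) else int(extracted)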
            cleaned_count = int(previous_count * 0.92)  # 8% data cleaning loss
            return {
                "cleaned_records": cleaned_count,
                "removed_duplicates": previous_count - cleaned_count,
                "validation_passed": True,
                "data_quality": "high"
            }
        else:
            return {"status": "completed", "task_type": task_type}
    
    async def _analyze_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate analysis agent task"""
        await asyncio.sleep(0.15)  # Simulate analysis time
        
        if task_type == "statistical_analysis":
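            # The clean step stores its whole result dict under output_key 'cleaned_records'
            cleaned = parameters.get('shared_data', {}).get('cleaned_records', {})
            record_count = cleaned.get('cleaned_records', 1000) if isinstance(cleaned, dict) else int(cleaned)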
            return {
                "total_records": record_count,
                "mean_value": 145.7,
                "std_deviation": 23.4,
                "outliers_detected": 12,
                "correlation_score": 0.73,
                "insights": ["Strong positive correlation", "Seasonal trends detected"]
            }
        elif task_type == "trend_analysis":
            return {
                "trend_direction": "upward",
                "growth_rate": "12.5%",
                "confidence_level": 0.87,
                "forecast_period": "Q1 2025",
                "key_drivers": ["market expansion", "product innovation"]
            }
        else:
            return {"status": "analyzed", "task_type": task_type}
    
    async def _report_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate reporting agent task"""
        await asyncio.sleep(0.08)  # Simulate report generation
        
        if task_type == "generate_report":
            shared_data = parameters.get('shared_data', {})
            return {
                "report_id": f"RPT_{uuid.uuid4().hex[:8]}",
                "pages_generated": 15,
                "charts_created": 8,
                "executive_summary": "Data analysis reveals strong growth trends with high confidence levels",
                # data_sources sits inside the extract step's result dict
                "data_sources": shared_data.get('extracted_records', {}).get('data_sources', []),
                "report_quality": "high",
                "generated_at": datetime.now().isoformat()
            }
        else:
            return {"status": "reported", "task_type": task_type}
    
    async def _generic_task(self, agent_id: str, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Generic task execution for unknown agents"""
        await asyncio.sleep(0.05)
        return {
            "agent_id": agent_id,
            "task_type": task_type,
            "status": "completed",
            "parameters_received": len(parameters),
            "timestamp": datetime.now().isoformat()
        }
    
    async def execute_workflow(self, workflow_id: str) -> WorkflowContext:
        """Execute complete sequential workflow"""
        if workflow_id not in self.orchestrator.workflows:
            raise ValueError(f"Workflow {workflow_id} not found")
        
        workflow = self.orchestrator.workflows[workflow_id]
        context = workflow['context']
        steps = workflow['definition']['steps']
        
        context.status = WorkflowStatus.RUNNING
        
        try:
            for i, step in enumerate(steps):
                context.current_step = i + 1
                
                # Execute step
                result = await self.execute_step(workflow_id, step, context)
                context.task_results[result.task_id] = result
                
                # Record execution history
                context.execution_history.append({
                    "step": i + 1,
                    "agent_id": result.agent_id,
                    "status": result.status.value,
                    "execution_time": result.execution_time,
                    "timestamp": datetime.now().isoformat()
                })
                
                # Check for failure
                if result.status == TaskStatus.FAILED:
                    context.status = WorkflowStatus.FAILED
                    logger.error(f"Workflow {workflow_id} failed at step {i + 1}")
                    return context
            
            context.status = WorkflowStatus.COMPLETED
            logger.info(f"Workflow {workflow_id} completed successfully")
            return context
            
        except Exception as e:
            context.status = WorkflowStatus.FAILED
            logger.error(f"Workflow {workflow_id} execution failed: {str(e)}")
            return context

# Initialize sequential workflow engine
sequential_engine = SequentialWorkflow(orchestrator)

# Create sample sequential workflow
workflow_definition = {
    "name": "Data Analysis Pipeline",
    "description": "Extract, clean, analyze, and report on business data",
    "steps": [
        {
            "agent_id": "data_processor",
            "task_type": "extract_data",
            "output_key": "extracted_records",
            "parameters": {"source": "business_db"}
        },
        {
            "agent_id": "data_processor", 
            "task_type": "clean_data",
            "output_key": "cleaned_records",
            "parameters": {"validation_rules": ["remove_duplicates", "check_nulls"]}
        },
        {
            "agent_id": "analyzer",
            "task_type": "statistical_analysis",
            "output_key": "analysis_results",
            "parameters": {"analysis_type": "comprehensive"}
        },
        {
            "agent_id": "analyzer",
            "task_type": "trend_analysis", 
            "output_key": "trend_results",
            "parameters": {"period": "quarterly"}
        },
        {
            "agent_id": "reporter",
            "task_type": "generate_report",
            "output_key": "final_report",
            "parameters": {"format": "executive_summary"}
        }
    ]
}

# Create workflow instance
workflow_id = orchestrator.create_workflow(workflow_definition)

print("\n🔄 Sequential workflow created:")
print(f"   Workflow ID: {workflow_id[:8]}")
print(f"   Steps: {len(workflow_definition['steps'])}")
print("   Agents: data_processor, analyzer, reporter")

INFO:__main__:Created workflow: 956cdd50-8f8b-4da5-80bd-0bb37c6ac787



🔄 Sequential workflow created:
   Workflow ID: 956cdd50
   Steps: 5
   Agents: data_processor, analyzer, reporter
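The pipeline definition above runs its steps strictly in list order. If each step instead declared which outputs it needs, the execution order could be derived and checked before anything runs. A minimal sketch using the standard library's `graphlib` (Python 3.9+), assuming a hypothetical `requires` field that the original definition does not have; `order_steps` is an illustrative helper:

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List, Set

def order_steps(steps: List[Dict[str, Any]]) -> List[str]:
    """Return output keys ordered so every step runs after the steps it requires."""
    produced = {step["output_key"] for step in steps}
    graph: Dict[str, Set[str]] = {}
    for step in steps:
        requires = set(step.get("requires", []))  # hypothetical field, not in the original definition
        missing = requires - produced
        if missing:
            raise ValueError(f"{step['output_key']} requires unknown outputs: {sorted(missing)}")
        graph[step["output_key"]] = requires  # node -> its predecessors
    # static_order() raises CycleError (a ValueError subclass) if requirements are circular
    return list(TopologicalSorter(graph).static_order())

steps = [
    {"output_key": "final_report", "requires": ["analysis_results", "trend_results"]},
    {"output_key": "analysis_results", "requires": ["cleaned_records"]},
    {"output_key": "trend_results", "requires": ["cleaned_records"]},
    {"output_key": "cleaned_records", "requires": ["extracted_records"]},
    {"output_key": "extracted_records"},
]
order = order_steps(steps)
print(order)  # extracted_records comes first and final_report last
```

The two analysis steps have no ordering constraint between them, so a scheduler built on this could also run them in parallel.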


### Parallel Agent Coordination

Parallel workflows execute multiple agents simultaneously with coordination:

**Coordination Patterns:**
- **Concurrent Execution:** Multiple agents working simultaneously
- **Synchronization Points:** Waiting for all agents to complete
- **Resource Sharing:** Managing shared data access
- **Result Aggregation:** Combining parallel execution results
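These coordination patterns map directly onto `asyncio` primitives. A minimal, self-contained sketch, assuming jobs are zero-argument async callables keyed by agent name (`run_parallel` and the toy jobs are illustrative, not the module's API): a semaphore caps concurrency, `asyncio.gather` is the synchronization point, and the outcomes are aggregated into a summary.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Dict

async def run_parallel(jobs: Dict[str, Callable[[], Awaitable[Any]]],
                       max_concurrent: int = 2) -> Dict[str, Any]:
    """Run named jobs concurrently, at most max_concurrent at a time, and aggregate outcomes."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def limited(job: Callable[[], Awaitable[Any]]) -> Any:
        async with semaphore:  # jobs beyond the cap wait here for a free slot
            return await job()

    start = time.perf_counter()
    # Synchronization point: gather waits for every job; return_exceptions=True
    # returns failures as values instead of raising the first one
    outcomes = await asyncio.gather(*(limited(job) for job in jobs.values()),
                                    return_exceptions=True)
    elapsed = time.perf_counter() - start

    results: Dict[str, Any] = {}
    errors: Dict[str, str] = {}
    for name, outcome in zip(jobs, outcomes):  # gather preserves input order
        if isinstance(outcome, Exception):
            errors[name] = str(outcome)
        else:
            results[name] = outcome
    return {
        "results": results,
        "errors": errors,
        "success_rate": len(results) / len(jobs) if jobs else 0.0,
        "elapsed_s": elapsed,
    }

async def validate() -> Dict[str, int]:
    await asyncio.sleep(0.1)
    return {"passed_validation": 940}

async def enrich() -> Dict[str, int]:
    await asyncio.sleep(0.1)
    return {"successfully_enriched": 870}

async def classify() -> Dict[str, int]:
    raise RuntimeError("classification model unavailable")

summary = asyncio.run(run_parallel({"validator": validate, "enricher": enrich, "classifier": classify}))
print(summary["success_rate"], summary["errors"])
```

Because each job receives nothing mutable, resource sharing is sidestepped entirely here; jobs that need shared data should receive their own snapshot rather than a live reference.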

In [4]:
# Parallel Agent Coordination System
import asyncio
import time

class ParallelWorkflow:
    """Parallel agent execution with coordination"""
    
    def __init__(self, orchestrator: WorkflowOrchestrator):
        self.orchestrator = orchestrator
        self.max_concurrent = 5
    
    async def execute_parallel_group(self, workflow_id: str, group_definition: List[Dict[str, Any]], 
                                   context: WorkflowContext) -> List[TaskResult]:
        """Execute group of tasks in parallel"""
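        tasks = []
        # Cap the number of tasks running at once at self.max_concurrent
        semaphore = asyncio.Semaphore(self.max_concurrent)
        
        async def run_limited(task_id: str, task_def: Dict[str, Any]) -> TaskResult:
            async with semaphore:
                return await self._create_parallel_task(task_id, task_def, context)
        
        for i, task_def in enumerate(group_definition):
            task_id = f"{workflow_id}_parallel_{context.current_step}_{i}"
            tasks.append(run_limited(task_id, task_def))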
        
        # Execute all tasks concurrently
        start_time = time.time()
        results = await asyncio.gather(*tasks, return_exceptions=True)
        total_time = time.time() - start_time
        
        # Process results
        task_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                task_result = TaskResult(
                    task_id=f"{workflow_id}_parallel_{context.current_step}_{i}",
                    agent_id=group_definition[i]['agent_id'],
                    status=TaskStatus.FAILED,
                    result=None,
                    error=str(result),
                    execution_time=total_time
                )
            else:
                task_result = result
            
            task_results.append(task_result)
        
        logger.info(f"Completed parallel group: {len(task_results)} tasks in {total_time:.2f}s")
        return task_results
    
    async def _create_parallel_task(self, task_id: str, task_def: Dict[str, Any], 
                                  context: WorkflowContext) -> TaskResult:
        """Create individual parallel task"""
        agent_id = task_def['agent_id']
        task_type = task_def['task_type']
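        # Copy so injecting context below does not mutate the workflow definition
        parameters = dict(task_def.get('parameters', {}))
        
        # Give each task its own shallow snapshot of shared data. Coroutines run on a
        # single thread, so this isolates tasks from each other's writes; it is not a thread lock.
        parameters['shared_data'] = dict(context.shared_data)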
        
        start_time = time.time()
        
        try:
            # Route to appropriate agent simulation
            if agent_id == "validator":
                result = await self._validation_task(task_type, parameters)
            elif agent_id == "enricher":
                result = await self._enrichment_task(task_type, parameters)
            elif agent_id == "classifier":
                result = await self._classification_task(task_type, parameters)
            elif agent_id == "scorer":
                result = await self._scoring_task(task_type, parameters)
            else:
                result = await self._generic_parallel_task(agent_id, task_type, parameters)
            
            execution_time = time.time() - start_time
            
            return TaskResult(
                task_id=task_id,
                agent_id=agent_id,
                status=TaskStatus.COMPLETED,
                result=result,
                execution_time=execution_time,
                metadata={"task_type": task_type, "parallel": True}
            )
            
        except Exception as e:
            execution_time = time.time() - start_time
            return TaskResult(
                task_id=task_id,
                agent_id=agent_id,
                status=TaskStatus.FAILED,
                result=None,
                error=str(e),
                execution_time=execution_time,
                metadata={"task_type": task_type, "parallel": True}
            )
    
    async def _validation_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate data validation agent"""
        await asyncio.sleep(0.12)  # Simulate validation time
        
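        # After the sequential pipeline, 'cleaned_records' holds the clean step's result dict
        cleaned = parameters.get('shared_data', {}).get('cleaned_records', {})
        record_count = cleaned.get('cleaned_records', 1000) if isinstance(cleaned, dict) else int(cleaned)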
        validation_rate = 0.94  # 94% pass validation
        
        return {
            "validation_type": task_type,
            "records_validated": record_count,
            "passed_validation": int(record_count * validation_rate),
            "failed_validation": int(record_count * (1 - validation_rate)),
            "validation_rules_applied": 8,
            "confidence_score": 0.94,
            "issues_found": ["missing_dates", "invalid_formats"]
        }
    
    async def _enrichment_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate data enrichment agent"""
        await asyncio.sleep(0.18)  # Simulate enrichment time
        
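        # After the sequential pipeline, 'cleaned_records' holds the clean step's result dict
        cleaned = parameters.get('shared_data', {}).get('cleaned_records', {})
        record_count = cleaned.get('cleaned_records', 1000) if isinstance(cleaned, dict) else int(cleaned)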
        enrichment_rate = 0.87  # 87% successfully enriched
        
        return {
            "enrichment_type": task_type,
            "records_processed": record_count,
            "successfully_enriched": int(record_count * enrichment_rate),
            "enrichment_sources": ["external_api", "reference_db", "ml_model"],
            "new_fields_added": 5,
            "data_completeness": 0.92,
            "enrichment_quality": "high"
        }
    
    async def _classification_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate classification agent"""
        await asyncio.sleep(0.14)  # Simulate classification time
        
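        # After the sequential pipeline, 'cleaned_records' holds the clean step's result dict
        cleaned = parameters.get('shared_data', {}).get('cleaned_records', {})
        record_count = cleaned.get('cleaned_records', 1000) if isinstance(cleaned, dict) else int(cleaned)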
        
        return {
            "classification_type": task_type,
            "records_classified": record_count,
            "categories": {
                "high_value": int(record_count * 0.15),
                "medium_value": int(record_count * 0.35), 
                "low_value": int(record_count * 0.50)
            },
            "classification_confidence": 0.89,
            "model_version": "v2.1.3",
            "accuracy_score": 0.91
        }
    
    async def _scoring_task(self, task_type: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Simulate scoring agent"""
        await asyncio.sleep(0.10)  # Simulate scoring time
        
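        # After the sequential pipeline, 'cleaned_records' holds the clean step's result dict
        cleaned = parameters.get('shared_data', {}).get('cleaned_records', {})
        record_count = cleaned.get('cleaned_records', 1000) if isinstance(cleaned, dict) else int(cleaned)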
        
        return {
            "scoring_type": task_type,
            "records_scored": record_count,
            "average_score": 73.2,
            "score_distribution": {
                "0-25": int(record_count * 0.08),
                "26-50": int(record_count * 0.22),
                "51-75": int(record_count * 0.45),
                "76-100": int(record_count * 0.25)
            },
            "scoring_model": "ensemble_v1.2",
            "feature_importance": ["recency", "frequency", "monetary"]
        }
    
    async def _generic_parallel_task(self, agent_id: str, task_type: str, 
                                   parameters: Dict[str, Any]) -> Dict[str, Any]:
        """Generic parallel task execution"""
        await asyncio.sleep(0.05)
        return {
            "agent_id": agent_id,
            "task_type": task_type,
            "status": "completed",
            "parallel_execution": True,
            "parameters_processed": len(parameters),
            "timestamp": datetime.now().isoformat()
        }
    
    def aggregate_parallel_results(self, results: List[TaskResult]) -> Dict[str, Any]:
        """Aggregate results from parallel execution"""
        successful_tasks = [r for r in results if r.status == TaskStatus.COMPLETED]
        failed_tasks = [r for r in results if r.status == TaskStatus.FAILED]
        
        total_execution_time = max([r.execution_time for r in results]) if results else 0
        
        # Aggregate metrics from successful tasks
        aggregated_data = {
            "total_tasks": len(results),
            "successful_tasks": len(successful_tasks),
            "failed_tasks": len(failed_tasks),
            "success_rate": len(successful_tasks) / len(results) if results else 0,
            "total_execution_time": total_execution_time,
            "agent_performance": {}
        }
        
        # Aggregate by agent type
        for result in successful_tasks:
            agent_id = result.agent_id
            if agent_id not in aggregated_data["agent_performance"]:
                aggregated_data["agent_performance"][agent_id] = {
                    "tasks_completed": 0,
                    "average_time": 0,
                    "results": []
                }
            
            agent_perf = aggregated_data["agent_performance"][agent_id]
            agent_perf["tasks_completed"] += 1
            # Incremental mean: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n
            agent_perf["average_time"] += (result.execution_time - agent_perf["average_time"]) / agent_perf["tasks_completed"]
            agent_perf["results"].append(result.result)
        
        return aggregated_data

# Initialize parallel workflow engine
parallel_engine = ParallelWorkflow(orchestrator)

# Confirm the parallel engine is initialized (no tasks are executed in this cell)
print("\n⚡ Parallel workflow engine ready:")
print("   Concurrent task execution")
print("   Result aggregation and coordination")
print("   Per-task snapshots of shared context")


⚡ Parallel workflow engine ready:
   Concurrent task execution
   Result aggregation and coordination
   Per-task snapshots of shared context
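Per-agent average times can be maintained without storing every sample by updating the mean incrementally: avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n. A minimal sketch (`running_means` is an illustrative helper); note that the tempting shortcut `(avg + x) / n` only gives the true mean for the first two samples and drifts low after that.

```python
from typing import Iterable, List

def running_means(samples: Iterable[float]) -> List[float]:
    """Return the mean after each sample, using avg += (x - avg) / n."""
    means: List[float] = []
    avg = 0.0
    for n, x in enumerate(samples, start=1):
        avg += (x - avg) / n  # move the mean a 1/n fraction toward the new sample
        means.append(avg)
    return means

print(running_means([2.0, 4.0, 9.0]))  # [2.0, 3.0, 5.0]
```

With the shortcut, the third value would be (3.0 + 9.0) / 3 = 4.0 instead of the correct 5.0.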


### Agent Handoffs and State Transfer

Complex workflows require smooth transitions between specialized agents:

**Handoff Patterns:**
- **State Serialization:** Capturing complete agent state
- **Context Transfer:** Moving relevant data between agents
- **Validation Checkpoints:** Ensuring data integrity during handoffs
- **Rollback Mechanisms:** Handling handoff failures gracefully
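The four handoff patterns can be combined in one small function. A minimal sketch, assuming agents exchange plain JSON-serializable dicts and that the state carries a hypothetical `owner` field recording which agent currently holds it (`serialize_state` and `handoff` are illustrative names, not the module's API). In a single process the checksum always matches; it becomes meaningful once the payload crosses a process or network boundary.

```python
import copy
import hashlib
import json
from typing import Any, Callable, Dict, List

def serialize_state(state: Dict[str, Any]) -> Dict[str, str]:
    """State serialization: canonical JSON plus a SHA-256 checksum of the payload."""
    payload = json.dumps(state, sort_keys=True)
    return {"payload": payload,
            "checksum": hashlib.sha256(payload.encode("utf-8")).hexdigest()}

def handoff(state: Dict[str, Any], target: str,
            receiver: Callable[[Dict[str, Any]], Dict[str, Any]],
            required_keys: List[str]) -> Dict[str, Any]:
    """Hand the required context to `receiver`; roll state back if any stage fails."""
    snapshot = copy.deepcopy(state)  # rollback point taken before anything changes
    try:
        missing = [key for key in required_keys if key not in state]
        if missing:
            raise ValueError(f"missing handoff keys: {missing}")
        # Context transfer: send only the keys the receiving agent needs
        package = serialize_state({key: state[key] for key in required_keys})
        # Validation checkpoint: recompute the checksum on the receiving side
        digest = hashlib.sha256(package["payload"].encode("utf-8")).hexdigest()
        if digest != package["checksum"]:
            raise ValueError("checksum mismatch during handoff")
        state["owner"] = target  # hypothetical ownership field, set before the receiver runs
        state.update(receiver(json.loads(package["payload"])))
        return {"status": "completed", "state": state}
    except Exception as exc:
        # Rollback: restore the pre-handoff snapshot in place
        state.clear()
        state.update(snapshot)
        return {"status": "rolled_back", "error": str(exc), "state": state}

state = {"owner": "data_processor", "cleaned_records": 1150}

def analyzer(context: Dict[str, Any]) -> Dict[str, Any]:
    return {"mean_value": context["cleaned_records"] / 10}

def failing_reporter(context: Dict[str, Any]) -> Dict[str, Any]:
    raise RuntimeError("reporter crashed")

print(handoff(state, "analyzer", analyzer, ["cleaned_records"])["status"])  # completed
print(handoff(state, "reporter", failing_reporter, ["mean_value"])["status"])  # rolled_back
print(state["owner"])  # analyzer
```

Taking the snapshot before touching `owner` means a crashed receiver leaves the state exactly as the previous agent handed it over.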