# CrewAI Workflows Advanced

This notebook covers advanced patterns and production deployment for CrewAI Workflows.

## Topics Covered
- Advanced agent patterns
- Complex workflow orchestration
- Production monitoring
- External system integration
- Enterprise deployment strategies

## 1. Setup and Configuration

In [None]:
# Advanced setup for CrewAI Workflows
import os
import json
import logging
import asyncio
import time
from datetime import datetime
from typing import Dict, List, Optional
from dataclasses import dataclass

from crewai import Agent, Task, Crew, Process
from crewai_tools import SerperDevTool, FileReadTool, DirectoryReadTool
from dotenv import load_dotenv

load_dotenv()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

print("Advanced CrewAI setup complete!")

In [None]:
@dataclass
class WorkflowConfig:
    name: str
    max_iterations: int = 5
    timeout: int = 300
    retry_attempts: int = 3
    enable_monitoring: bool = True

config = WorkflowConfig(
    name="Advanced Pipeline",
    max_iterations=10,
    timeout=600
)

print(f"Configuration: {config.name}")
print(f"Max iterations: {config.max_iterations}")
print(f"Timeout: {config.timeout}s")
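
The dataclass above accepts any integer, including zero or negative values for `timeout` or `retry_attempts`. One way to catch this early is to validate in `__post_init__`. The following is a minimal sketch, assuming a hypothetical `ValidatedWorkflowConfig` variant that is not part of CrewAI or of this notebook's original code:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ValidatedWorkflowConfig:
    """Hypothetical variant of WorkflowConfig that rejects invalid values."""
    name: str
    max_iterations: int = 5
    timeout: int = 300          # seconds
    retry_attempts: int = 3
    enable_monitoring: bool = True

    def __post_init__(self):
        # Fail at construction time rather than midway through a workflow run
        if not self.name.strip():
            raise ValueError("name must not be empty")
        for field_name in ("max_iterations", "timeout", "retry_attempts"):
            value = getattr(self, field_name)
            if value < 1:
                raise ValueError(f"{field_name} must be >= 1, got {value}")


validated = ValidatedWorkflowConfig(name="Advanced Pipeline", max_iterations=10, timeout=600)
```

Marking the dataclass `frozen=True` is a design choice in this sketch: it keeps the configuration from being mutated after validation.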

## 2. Advanced Agent Factory

In [None]:
class AdvancedAgentFactory:
    @staticmethod
    def create_supervisor(config: WorkflowConfig) -> Agent:
        return Agent(
            role="Workflow Supervisor",
            goal="Orchestrate and monitor workflow execution",
            backstory="Experienced supervisor coordinating multiple agents",
            verbose=True,
            allow_delegation=True,
            max_iter=config.max_iterations,
            tools=[SerperDevTool()]
        )
    
    @staticmethod
    def create_specialist(specialty: str) -> Agent:
        return Agent(
            role=f"{specialty.title()} Specialist",
            goal=f"Provide expert {specialty} analysis",
            backstory=f"Domain expert in {specialty}",
            verbose=True,
            tools=[SerperDevTool()]
        )

# Create specialized agents
factory = AdvancedAgentFactory()
supervisor = factory.create_supervisor(config)
security_agent = factory.create_specialist("security")
compliance_agent = factory.create_specialist("compliance")

print("Advanced agents created:")
print(f"- {supervisor.role}")
print(f"- {security_agent.role}")
print(f"- {compliance_agent.role}")

## 3. Workflow Pipeline with Error Recovery

In [None]:
class RobustPipeline:
    def __init__(self, config: WorkflowConfig):
        self.config = config
        self.execution_history = []
        self.error_log = []
    
    def create_stages(self):
        # Stage 1: Data Collection
        collector = Agent(
            role="Data Collector",
            goal="Gather data from multiple sources",
            backstory="Expert in data collection",
            tools=[SerperDevTool(), FileReadTool()],
            verbose=True
        )
        
        collect_task = Task(
            description="Collect data on {topic}",
            expected_output="Structured dataset with sources",
            agent=collector
        )
        
        # Stage 2: Processing
        processor = Agent(
            role="Data Processor",
            goal="Process and structure information",
            backstory="Specialist in data processing",
            tools=[FileReadTool()],
            verbose=True
        )
        
        process_task = Task(
            description="Process collected data",
            expected_output="Processed information with insights",
            agent=processor
        )
        
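        # Crew is a Pydantic model, so its fields must be passed by keyword
        stage1 = Crew(agents=[collector], tasks=[collect_task], process=Process.sequential)
        stage2 = Crew(agents=[processor], tasks=[process_task], process=Process.sequential)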
        
        return [stage1, stage2]
    
    def execute_with_retry(self, crew, inputs, stage_name):
        for attempt in range(self.config.retry_attempts):
            try:
                logger.info(f"Executing {stage_name}, attempt {attempt + 1}")
                
                # Demo execution: no LLM call is made here.
                # A real run would use: result = crew.kickoff(inputs=inputs)
                result = f"Demo result for {stage_name}"
                
                self.execution_history.append({
                    'stage': stage_name,
                    'attempt': attempt + 1,
                    'success': True,
                    'timestamp': datetime.now().isoformat()
                })
                
                return result
                
            except Exception as e:
                self.error_log.append({
                    'stage': stage_name,
                    'attempt': attempt + 1,
                    'error': str(e)
                })
                
                if attempt < self.config.retry_attempts - 1:
                    time.sleep(2)
                else:
                    return None

pipeline = RobustPipeline(config)
stages = pipeline.create_stages()

print(f"Pipeline created with {len(stages)} stages")
print("Features: Error recovery, retry logic, execution tracking")
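
`execute_with_retry` waits a fixed two seconds between attempts. A common alternative is exponential backoff with jitter, which spaces retries further apart as failures accumulate. The sketch below is a standalone illustration, not part of CrewAI; `retry_with_backoff` and its parameters are hypothetical names introduced here.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def retry_with_backoff(
    func: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 30.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func until it succeeds or attempts are exhausted.

    The delay doubles after each failure (capped at max_delay) and gets
    up to 10% random jitter. The last exception is re-raised.
    """
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return func()
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the original error
            delay = min(base_delay * 2 ** (attempt - 1), max_delay)
            sleep(delay + random.uniform(0, delay * 0.1))
    raise RuntimeError("unreachable")  # keeps type checkers satisfied
```

With a real crew, a call might look like `retry_with_backoff(lambda: crew.kickoff(inputs=inputs), attempts=config.retry_attempts)`. The `sleep` parameter exists only so the delay can be replaced in tests.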

## 4. Monitoring System

In [None]:
class WorkflowMonitor:
    def __init__(self):
        self.metrics = {
            'executions': 0,
            'successes': 0,
            'failures': 0,
            'total_time': 0
        }
        self.alerts = []
        self.activity_log = []  # per-activity records, kept for later inspection
    
    def start_monitoring(self, workflow_id: str) -> str:
        execution_id = f"{workflow_id}_{int(time.time())}"
        self.metrics['executions'] += 1
        logger.info(f"Started monitoring: {execution_id}")
        return execution_id
    
    def log_activity(self, execution_id: str, agent_id: str, activity: str):
        log_entry = {
            'execution_id': execution_id,
            'agent_id': agent_id,
            'activity': activity,
            'timestamp': datetime.now().isoformat()
        }
        # Store the entry; previously it was built and then discarded
        self.activity_log.append(log_entry)
        logger.info(f"Activity logged: {agent_id} - {activity}")
    
    def complete_monitoring(self, execution_id: str, success: bool, duration: float):
        if success:
            self.metrics['successes'] += 1
        else:
            self.metrics['failures'] += 1
        
        self.metrics['total_time'] += duration
        
        # Guard against division by zero if start_monitoring was never called
        total = max(self.metrics['executions'], 1)
        success_rate = self.metrics['successes'] / total
        avg_time = self.metrics['total_time'] / total
        
        logger.info(f"Execution complete: {execution_id}")
        logger.info(f"Success rate: {success_rate:.2%}")
        logger.info(f"Average time: {avg_time:.2f}s")
    
    def get_report(self):
        success_rate = self.metrics['successes'] / max(self.metrics['executions'], 1)
        avg_time = self.metrics['total_time'] / max(self.metrics['executions'], 1)
        
        return {
            'total_executions': self.metrics['executions'],
            'success_rate': success_rate,
            'average_time': avg_time,
            'generated_at': datetime.now().isoformat()
        }

# Example usage
monitor = WorkflowMonitor()
execution_id = monitor.start_monitoring("demo_workflow")
monitor.log_activity(execution_id, "agent_01", "data_collection")
monitor.complete_monitoring(execution_id, True, 45.2)

report = monitor.get_report()
print(f"Monitoring report: {report}")
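
Calling `start_monitoring` and `complete_monitoring` by hand makes it easy to forget the failure path. A context manager can pair the two calls and measure the duration automatically. This is a minimal sketch; `monitored` is a hypothetical helper, and it assumes only that the monitor object exposes the two methods with the signatures used above.

```python
import time
from contextlib import contextmanager


@contextmanager
def monitored(monitor, workflow_id: str):
    """Time the enclosed block and report the outcome to `monitor`.

    `monitor` is any object exposing start_monitoring(workflow_id) and
    complete_monitoring(execution_id, success, duration).
    """
    execution_id = monitor.start_monitoring(workflow_id)
    start = time.perf_counter()
    try:
        yield execution_id
    except Exception:
        # Record the failure, then let the exception propagate
        monitor.complete_monitoring(execution_id, False, time.perf_counter() - start)
        raise
    else:
        monitor.complete_monitoring(execution_id, True, time.perf_counter() - start)
```

Usage would look like `with monitored(monitor, "demo_workflow") as execution_id: ...`, with the workflow body placed inside the `with` block.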

## 5. External System Integration

In [None]:
class SystemIntegrator:
    def __init__(self):
        self.integrations = {}
        self.circuit_breakers = {}
    
    def register_system(self, name: str, config: dict):
        self.integrations[name] = {
            'config': config,
            'status': 'active',
            'errors': 0,
            'successes': 0
        }
        
        self.circuit_breakers[name] = {
            'state': 'closed',
            'failures': 0,
            'threshold': config.get('failure_threshold', 5)
        }
        
        logger.info(f"Registered system: {name}")
    
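    async def call_system(self, system_name: str, operation: str, data: dict):
        if system_name not in self.integrations:
            raise ValueError(f"System {system_name} not registered")
        
        breaker = self.circuit_breakers[system_name]
        if breaker['state'] == 'open':
            # Fail fast instead of calling a system that keeps failing.
            # This demo has no reset path: an open breaker stays open.
            raise RuntimeError(f"Circuit open for {system_name}")
        
        try:
            # Simulate API call
            await asyncio.sleep(0.1)
            
            result = {
                'system': system_name,
                'operation': operation,
                'status': 'success',
                'timestamp': datetime.now().isoformat()
            }
            
            self.integrations[system_name]['successes'] += 1
            breaker['failures'] = 0  # a success resets the consecutive-failure count
            return result
            
        except Exception as e:
            self.integrations[system_name]['errors'] += 1
            breaker['failures'] += 1
            if breaker['failures'] >= breaker['threshold']:
                breaker['state'] = 'open'
            logger.error(f"System call failed: {system_name}.{operation}: {e}")
            raise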

# Setup integrations
integrator = SystemIntegrator()
integrator.register_system('salesforce', {'timeout': 30, 'failure_threshold': 3})
integrator.register_system('slack', {'timeout': 10, 'failure_threshold': 2})

print("External system integrations configured:")
for system in integrator.integrations:
    print(f"- {system}: active")

print("\nFeatures:")
print("✓ Circuit breaker pattern")
print("✓ Failure tracking")
print("✓ Configurable thresholds")
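
The breaker state registered above holds a `state`, a failure count, and a threshold. A full circuit breaker usually also needs a recovery path: after a cool-down the breaker moves to a half-open state and lets one trial call through. The class below is a standalone sketch of that state machine. `CircuitBreaker`, `reset_after`, and the injectable `clock` are hypothetical names introduced for illustration, and the class is not thread-safe.

```python
import time
from typing import Callable


class CircuitBreaker:
    """Minimal closed -> open -> half-open breaker (illustrative only)."""

    def __init__(self, threshold: int = 5, reset_after: float = 30.0,
                 clock: Callable[[], float] = time.monotonic):
        self.threshold = threshold      # consecutive failures before opening
        self.reset_after = reset_after  # seconds to wait before a trial call
        self.clock = clock
        self.failures = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self.state == "open" and self.clock() - self.opened_at >= self.reset_after:
            self.state = "half_open"    # cool-down elapsed: permit a trial call
        return self.state != "open"

    def record_success(self) -> None:
        self.failures = 0
        self.state = "closed"

    def record_failure(self) -> None:
        self.failures += 1
        # A failed trial call reopens immediately; otherwise wait for the threshold
        if self.state == "half_open" or self.failures >= self.threshold:
            self.state = "open"
            self.opened_at = self.clock()
```

A caller checks `allow()` before each external call, then reports the outcome with `record_success()` or `record_failure()`.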

## 6. Production Deployment

In [None]:
# Production deployment configuration
deployment_config = {
    'containerization': {
        'base_image': 'python:3.11-slim',
        'features': ['health_checks', 'non_root_user', 'security_scanning']
    },
    'services': {
        'api_server': 'FastAPI application',
        'redis': 'Caching and queuing',
        'postgres': 'Data persistence',
        'monitoring': 'Prometheus metrics'
    },
    'scaling': {
        'horizontal': 'Multiple API instances',
        'auto_scaling': 'CPU and memory based',
        'load_balancing': 'Traffic distribution'
    }
}

print("Production Deployment Configuration:")
print(f"Base image: {deployment_config['containerization']['base_image']}")
print(f"Services: {len(deployment_config['services'])}")
print(f"Scaling options: {len(deployment_config['scaling'])}")

print("\nProduction Features:")
print("✓ Docker containerization")
print("✓ Multi-service architecture")
print("✓ Health monitoring")
print("✓ Auto-scaling")
print("✓ Load balancing")
print("✓ Security hardening")

In [None]:
class ProductionAPI:
    def __init__(self):
        self.endpoints = {
            'execute': '/workflows/execute',
            'status': '/workflows/{id}/status',
            'list': '/workflows',
            'cancel': '/workflows/{id}',
            'health': '/health',
            'metrics': '/metrics'
        }
        
        self.features = [
            'async_execution',
            'real_time_status',
            'workflow_cancellation',
            'health_monitoring',
            'prometheus_metrics'
        ]
    
    def get_api_info(self):
        return {
            'title': 'CrewAI Workflows API',
            'version': '1.0.0',
            'endpoints': len(self.endpoints),
            'features': len(self.features)
        }

api = ProductionAPI()
info = api.get_api_info()

print(f"API: {info['title']} v{info['version']}")
print(f"Endpoints: {info['endpoints']}")
print(f"Features: {info['features']}")

print("\nAPI Endpoints:")
for name, endpoint in api.endpoints.items():
    print(f"- {name.upper()}: {endpoint}")
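
`ProductionAPI` only lists endpoint paths; it does not implement them. As a rough sketch of what could sit behind the `execute`, `status`, and `cancel` endpoints, the class below tracks workflows as `asyncio` tasks in memory. `WorkflowRegistry` is a hypothetical name, the web framework layer is deliberately left out, and a real deployment would likely need persistent storage instead of a dict.

```python
import asyncio
import uuid
from typing import Any, Callable, Coroutine, Dict


class WorkflowRegistry:
    """Hypothetical in-memory store backing execute/status/cancel."""

    def __init__(self):
        self._tasks: Dict[str, asyncio.Task] = {}

    def execute(self, job: Callable[[], Coroutine[Any, Any, Any]]) -> str:
        """Schedule job() on the running event loop and return a workflow id."""
        workflow_id = uuid.uuid4().hex
        self._tasks[workflow_id] = asyncio.create_task(job())
        return workflow_id

    def status(self, workflow_id: str) -> str:
        task = self._tasks[workflow_id]
        if not task.done():
            return "running"
        if task.cancelled():
            return "cancelled"
        return "failed" if task.exception() else "completed"

    def cancel(self, workflow_id: str) -> bool:
        """Request cancellation; returns False if the task already finished."""
        return self._tasks[workflow_id].cancel()
```

`execute` must be called from inside a running event loop, because `asyncio.create_task` requires one.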

## 7. Enterprise Workflow Example

In [None]:
class EnterpriseOrchestrator:
    def __init__(self, config: WorkflowConfig):
        self.config = config
        self.monitor = WorkflowMonitor()
        self.integrator = SystemIntegrator()
        
        # Setup enterprise systems
        self.integrator.register_system('salesforce', {'timeout': 30})
        self.integrator.register_system('slack', {'timeout': 10})
        self.integrator.register_system('jira', {'timeout': 20})
    
    def create_enterprise_agents(self):
        agents = {
            'data_analyst': Agent(
                role="Senior Data Analyst",
                goal="Analyze enterprise data and generate insights",
                backstory="Expert in enterprise data analysis",
                tools=[SerperDevTool(), FileReadTool()],
                verbose=True
            ),
            'compliance_officer': Agent(
                role="Compliance Officer",
                goal="Ensure regulatory compliance",
                backstory="Expert in regulatory requirements",
                tools=[SerperDevTool()],
                verbose=True
            ),
            'qa_manager': Agent(
                role="Quality Assurance Manager",
                goal="Validate quality standards",
                backstory="Expert in quality management",
                tools=[FileReadTool()],
                verbose=True
            )
        }
        return agents
    
    async def execute_enterprise_workflow(self, objective: str):
        execution_id = self.monitor.start_monitoring('enterprise_001')
        
        try:
            start_time = time.time()
            
            # Create agents
            agents = self.create_enterprise_agents()
            
            # Execute workflow stages
            stages = [
                ('data_extraction', 'Extract enterprise data'),
                ('compliance_review', 'Review compliance'),
                ('quality_validation', 'Validate quality')
            ]
            
            results = {}
            
            for stage_name, description in stages:
                self.monitor.log_activity(execution_id, stage_name, 'processing')
                
                # Simulate processing
                await asyncio.sleep(0.5)
                
                results[stage_name] = f"Completed {description} for {objective}"
            
            duration = time.time() - start_time
            self.monitor.complete_monitoring(execution_id, True, duration)
            
            return {
                'success': True,
                'execution_id': execution_id,
                'results': results,
                'duration': duration
            }
            
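        except Exception as e:
            # Record the failure so the monitor's success rate stays accurate
            self.monitor.complete_monitoring(execution_id, False, time.time() - start_time)
            logger.error(f"Enterprise workflow failed: {e}")
            raise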

# Initialize enterprise orchestrator
enterprise_config = WorkflowConfig(
    name="Enterprise Data Pipeline",
    max_iterations=8,
    timeout=1200
)

orchestrator = EnterpriseOrchestrator(enterprise_config)

print("Enterprise Workflow Orchestrator Initialized")
print(f"Configuration: {enterprise_config.name}")
print(f"Integrations: {len(orchestrator.integrator.integrations)}")
print("\nCapabilities:")
print("✓ Multi-agent coordination")
print("✓ External system integration")
print("✓ Real-time monitoring")
print("✓ Enterprise compliance")
print("✓ Quality assurance")

# Demo execution would be:
# result = await orchestrator.execute_enterprise_workflow("Q4 Revenue Analysis")
print("\nTo execute: await orchestrator.execute_enterprise_workflow('Business Objective')")
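
`WorkflowConfig.timeout` is set above but never enforced by the orchestrator. One way to apply it is to wrap the coroutine in `asyncio.wait_for`. The helper below is a minimal sketch; `run_with_timeout` is a hypothetical name, and returning a result dict instead of raising on timeout is an assumption made here to mirror the shape of the orchestrator's return value.

```python
import asyncio
from typing import Any, Coroutine


async def run_with_timeout(coro: Coroutine[Any, Any, Any], timeout_s: float) -> dict:
    """Await coro, returning a result dict instead of raising on timeout."""
    try:
        result = await asyncio.wait_for(coro, timeout=timeout_s)
        return {"success": True, "result": result}
    except asyncio.TimeoutError:
        # wait_for cancels the wrapped coroutine before raising
        return {"success": False, "error": f"timed out after {timeout_s}s"}
```

In a notebook cell this could be called as `await run_with_timeout(orchestrator.execute_enterprise_workflow("Q4 Revenue Analysis"), enterprise_config.timeout)`.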

## 8. Production Checklist and Best Practices

### Production Deployment Checklist

#### Infrastructure ✅
- **Containerization**: Docker images with security scanning
- **Orchestration**: Kubernetes or Docker Compose
- **Load Balancing**: Horizontal scaling capabilities
- **Service Discovery**: Dynamic service registration

#### Monitoring & Observability ✅
- **Metrics**: Prometheus integration
- **Logging**: Structured logging with correlation IDs
- **Alerting**: Real-time alerts for critical issues
- **Dashboards**: Grafana visualization
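
The structured-logging item in this list can be approximated with the standard library alone. The sketch below emits one JSON object per log record and attaches a correlation ID held in a `contextvars.ContextVar`, so concurrent async workflows each see their own value. `JsonFormatter`, `correlation_id`, and the logger name are hypothetical choices for illustration.

```python
import contextvars
import json
import logging

# Holds the correlation ID for the current execution context
correlation_id = contextvars.ContextVar("correlation_id", default="-")


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object including the correlation ID."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "correlation_id": correlation_id.get(),
        })


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
structured_logger = logging.getLogger("workflow.structured")
structured_logger.addHandler(handler)
structured_logger.setLevel(logging.INFO)
structured_logger.propagate = False  # avoid duplicate output via the root logger

correlation_id.set("demo_workflow_001")
structured_logger.info("stage started")
```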

#### Security ✅
- **Authentication**: API key or OAuth integration
- **Authorization**: Role-based access control
- **Secrets Management**: Secure credential storage
- **Network Security**: TLS encryption

#### Reliability ✅
- **Error Handling**: Circuit breakers and retry logic
- **Health Checks**: Comprehensive monitoring
- **Backup & Recovery**: Data persistence strategies
- **Disaster Recovery**: Multi-region deployment

#### Performance Optimization ✅
- **Caching**: Redis for fast data access
- **Async Processing**: Background task execution
- **Resource Optimization**: Memory and CPU efficiency
- **Auto-scaling**: Load-based scaling

### Next Steps
1. Set up CI/CD pipeline for automated deployments
2. Implement comprehensive testing suite
3. Configure monitoring stack (Prometheus, Grafana, ELK)
4. Establish incident response procedures
5. Create operational runbooks

### Resources
- [CrewAI Documentation](https://docs.crewai.com/)
- [12-Factor App Methodology](https://12factor.net/)
- [Kubernetes Best Practices](https://kubernetes.io/docs/concepts/)
- [Site Reliability Engineering](https://sre.google/sre-book/)

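**Congratulations!** You've worked through the advanced CrewAI Workflows patterns in this notebook. Most cells simulate execution, so replace the demo stubs with real crew runs and complete the checklist above before deploying to production.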