# 📋 CrewAI Tasks Deep Dive

## Overview

This notebook provides a comprehensive exploration of **CrewAI Tasks** - the fundamental units of work that define what agents need to accomplish. You'll learn how to create, configure, and optimize tasks for maximum effectiveness.

### What You'll Learn:
- Core task concepts and components
- Task creation and configuration
- Output specifications and formatting
- Task dependencies and sequencing
- Error handling and validation
- Advanced task patterns and best practices

---

## 📦 Installation and Setup

Let's start by setting up our environment with the necessary packages.

In [None]:
# # Install CrewAI and related packages
# !pip install crewai crewai-tools python-dotenv

# # Optional: Install additional packages for advanced examples
# !pip install pydantic validators

In [None]:
# Import necessary libraries
import os
from dotenv import load_dotenv
from crewai import Agent, Task, Crew
from crewai_tools import (
    SerperDevTool,
    FileReadTool,
    WebsiteSearchTool
)
from pydantic import BaseModel, Field
from typing import List, Dict, Any, Optional
import json
from datetime import datetime

# Load environment variables
load_dotenv()

os.environ["OPENAI_API_KEY"] = os.getenv("OPEN_ROUTER_KEY")

print("✅ Environment setup complete!")

## 🎯 Understanding Task Fundamentals

### Core Task Components

Every CrewAI task consists of several key components:

1. **Description** - Clear explanation of what needs to be done
2. **Agent** - The agent responsible for executing the task
3. **Tools** - Specific tools available for this task
4. **Expected Output** - Format and content specifications
5. **Context** - Dependencies on other tasks
6. **Output File** - Optional file to save results

Let's explore each component in detail:

### 📝 1. Task Description Best Practices

The task description is crucial for agent understanding and performance. Let's see examples of effective descriptions:

In [None]:
# Examples of effective task descriptions

task_descriptions = {
    "poor": [
        "Research AI",
        "Write a report",
        "Analyze data",
        "Do some research"
    ],
    
    "good": [
        """Conduct comprehensive research on the latest developments in large language models 
        for 2024, focusing on architectural innovations, performance benchmarks, and real-world 
        applications. Include analysis of at least 5 major LLM releases and their impact on 
        the AI industry.""",
        
        """Create a detailed technical report analyzing the customer churn patterns in our 
        Q3 2024 data. Include statistical analysis, trend identification, key contributing 
        factors, and actionable recommendations for retention strategies.""",
        
        """Develop a comprehensive content marketing strategy for our new product launch, 
        including target audience analysis, content calendar for 3 months, distribution 
        channels, and success metrics. Focus on B2B SaaS market."""
    ]
}

print("📝 Task Description Examples")
print("=" * 50)

print("\n❌ Poor Task Descriptions:")
for i, desc in enumerate(task_descriptions["poor"], 1):
    print(f"  {i}. {desc}")

print("\n✅ Good Task Descriptions:")
for i, desc in enumerate(task_descriptions["good"], 1):
    print(f"\n  {i}. {desc.strip()}")

print("\n" + "=" * 50)
print("🎯 Key Elements of Good Descriptions:")
elements = [
    "Specific objectives and scope",
    "Clear deliverables and expectations",
    "Quantifiable metrics when possible",
    "Context and background information",
    "Success criteria definition"
]

for i, element in enumerate(elements, 1):
    print(f"  {i}. {element}")

### 🎯 2. Expected Output Specifications

Defining clear output expectations helps agents deliver exactly what you need:

In [None]:
# Examples of clear output specifications

output_specifications = {
    "research_report": {
        "format": "Markdown document",
        "structure": [
            "Executive Summary (200 words)",
            "Methodology section",
            "Key Findings (bullet points)",
            "Detailed Analysis (1000-1500 words)",
            "Recommendations (numbered list)",
            "Sources and References"
        ],
        "requirements": [
            "Include at least 10 credible sources",
            "Use proper markdown formatting",
            "Include data visualizations descriptions",
            "Provide actionable insights"
        ]
    },
    
    "data_analysis": {
        "format": "Structured JSON report",
        "structure": [
            "Summary statistics",
            "Trend analysis",
            "Anomaly detection results",
            "Correlation findings",
            "Predictions and forecasts"
        ],
        "requirements": [
            "Include confidence intervals",
            "Validate statistical assumptions",
            "Provide visualization recommendations",
            "Include data quality assessment"
        ]
    },
    
    "content_strategy": {
        "format": "Comprehensive plan document",
        "structure": [
            "Strategy Overview",
            "Target Audience Personas",
            "Content Calendar (3 months)",
            "Distribution Strategy",
            "Success Metrics and KPIs",
            "Budget Considerations"
        ],
        "requirements": [
            "Include specific content ideas",
            "Define posting schedules",
            "Specify content formats",
            "Include competitive analysis"
        ]
    }
}

print("🎯 Output Specification Examples")
print("=" * 50)

for output_type, specs in output_specifications.items():
    print(f"\n📊 {output_type.replace('_', ' ').title()}:")
    print(f"  Format: {specs['format']}")
    
    print(f"  \n  Structure:")
    for item in specs['structure']:
        print(f"    - {item}")
    
    print(f"  \n  Requirements:")
    for req in specs['requirements']:
        print(f"    - {req}")
    print("-" * 40)

## 🏗️ Creating Your First Task

Let's create a comprehensive task with all the essential components:

In [None]:
# First, let's create an agent to execute our task

# Initialize tools
search_tool = SerperDevTool()
file_tool = FileReadTool()
website_tool = WebsiteSearchTool()

# Create a research agent
research_agent = Agent(
    role='Senior Market Research Analyst',
    goal='Conduct thorough market research and provide actionable business insights',
    backstory="""You are an experienced market researcher with 10+ years in technology 
    sector analysis. You excel at identifying market trends, competitive landscapes, 
    and growth opportunities. Your reports are known for their depth and actionable recommendations.""",
    tools=[search_tool, website_tool, file_tool],
    verbose=True,
    memory=True
)

print("✅ Research Agent Created!")
print(f"Role: {research_agent.role}")
print(f"Available Tools: {len(research_agent.tools)}")

In [None]:
# Now let's create a comprehensive task

market_research_task = Task(
    description="""Conduct a comprehensive market analysis of the AI-powered customer 
    service automation industry for 2024. Your analysis should include:
    
    1. Market size and growth projections
    2. Key players and competitive landscape
    3. Emerging trends and technologies
    4. Customer adoption patterns
    5. Challenges and opportunities
    6. Regional market variations
    
    Focus on identifying actionable opportunities for a mid-size SaaS company 
    looking to enter this market. Include specific recommendations for market 
    entry strategies and positioning.""",
    
    agent=research_agent,
    
    tools=[search_tool, website_tool],  # Task-specific tools
    
    expected_output="""A comprehensive market research report in markdown format 
    containing:
    
    ## Executive Summary (300 words)
    - Key findings and recommendations
    
    ## Market Overview
    - Market size, growth rate, and projections
    - Key market drivers and barriers
    
    ## Competitive Landscape
    - Major players and market share
    - Competitive positioning analysis
    
    ## Technology Trends
    - Emerging technologies and innovations
    - Adoption timelines and impact assessment
    
    ## Market Entry Recommendations
    - Specific strategies for market entry
    - Positioning recommendations
    - Risk mitigation strategies
    
    ## Sources and References
    - List of all sources used with URLs when available
    
    The report should be 2000-3000 words and include actionable insights 
    throughout.""",
    
    output_file="market_research_report.md"  # Save results to file
)

print("📋 Comprehensive Market Research Task Created!")
print(f"\nTask Overview:")
print(f"  Agent: {market_research_task.agent.role}")
print(f"  Tools: {len(market_research_task.tools) if market_research_task.tools else 0}")
print(f"  Output File: {market_research_task.output_file}")
print(f"  Description Length: {len(market_research_task.description)} characters")

## 🔗 Task Dependencies and Context

Tasks can depend on the output of other tasks. Let's create a sequence of related tasks:

In [None]:
# Create additional agents for our task sequence

data_analyst = Agent(
    role='Senior Data Analyst',
    goal='Transform market research into actionable data insights and recommendations',
    backstory="""You specialize in quantitative analysis and data interpretation. 
    You excel at finding patterns in market data and translating research findings 
    into concrete business metrics and projections.""",
    tools=[file_tool],
    verbose=True
)

strategy_consultant = Agent(
    role='Business Strategy Consultant',
    goal='Develop comprehensive business strategies based on market analysis and data insights',
    backstory="""You are a senior strategy consultant with expertise in technology 
    market entry strategies. You excel at synthesizing research and data into 
    actionable business plans and strategic recommendations.""",
    tools=[file_tool],
    verbose=True
)

print("✅ Additional Agents Created!")
print(f"  Data Analyst: {data_analyst.role}")
print(f"  Strategy Consultant: {strategy_consultant.role}")

In [None]:
# Create a sequence of dependent tasks

# Task 1: Market Research (already created above)
task_1_research = market_research_task

# Task 2: Data Analysis (depends on Task 1)
task_2_analysis = Task(
    description="""Analyze the market research findings and create a quantitative 
    assessment of the AI customer service automation market opportunity. 
    
    Based on the research report, develop:
    1. Market sizing models with assumptions
    2. Revenue projections for market entry scenarios
    3. Investment requirements analysis
    4. Risk assessment with probability weightings
    5. ROI calculations for different market entry strategies
    6. Competitive positioning metrics
    
    Create financial models that can guide investment decisions and strategic planning.""",
    
    agent=data_analyst,
    
    context=[task_1_research],  # This task depends on the research task
    
    expected_output="""A detailed quantitative analysis report including:
    
    ## Market Sizing Model
    - TAM, SAM, SOM calculations with methodology
    - Growth rate projections with confidence intervals
    
    ## Financial Projections
    - 5-year revenue projections for 3 market entry scenarios
    - Investment requirements breakdown
    - Break-even analysis
    
    ## Risk Assessment
    - Identified risks with probability and impact ratings
    - Sensitivity analysis for key variables
    
    ## Competitive Metrics
    - Market share potential analysis
    - Competitive positioning scores
    
    Include all assumptions, methodologies, and data sources used in calculations.""",
    
    output_file="market_analysis_data.json"
)

# Task 3: Strategy Development (depends on Tasks 1 & 2)
task_3_strategy = Task(
    description="""Synthesize the market research and quantitative analysis to develop 
    a comprehensive market entry strategy for the AI customer service automation space.
    
    Create a detailed strategic plan that includes:
    1. Recommended market entry approach and timeline
    2. Product positioning and differentiation strategy
    3. Go-to-market strategy with specific tactics
    4. Partnership and alliance recommendations
    5. Resource allocation and organizational requirements
    6. Success metrics and milestone definitions
    7. Risk mitigation strategies
    8. Alternative scenarios and contingency plans
    
    The strategy should be actionable and include specific next steps.""",
    
    agent=strategy_consultant,
    
    context=[task_1_research, task_2_analysis],  # Depends on both previous tasks
    
    expected_output="""A comprehensive business strategy document including:
    
    ## Executive Summary
    - Strategic recommendation and rationale
    
    ## Market Entry Strategy
    - Phased approach with timelines
    - Resource requirements and budget
    
    ## Product Strategy
    - Positioning and differentiation
    - Feature prioritization
    
    ## Go-to-Market Plan
    - Customer acquisition strategy
    - Sales and marketing tactics
    - Channel strategy
    
    ## Implementation Roadmap
    - 18-month implementation plan
    - Key milestones and success metrics
    - Risk mitigation strategies
    
    ## Next Steps
    - Immediate actions required
    - Decision points and timelines
    
    The document should be executive-ready and actionable.""",
    
    output_file="market_entry_strategy.md"
)

print("🔗 Task Dependency Chain Created!")
print("\nTask Sequence:")
print(f"  1. {task_1_research.agent.role}: Market Research")
print(f"  2. {task_2_analysis.agent.role}: Data Analysis (depends on #1)")
print(f"  3. {task_3_strategy.agent.role}: Strategy Development (depends on #1 & #2)")

print("\n📊 Context Dependencies:")
print(f"  Task 2 context: {len(task_2_analysis.context)} task(s)")
print(f"  Task 3 context: {len(task_3_strategy.context)} task(s)")

## 📊 Structured Output with Pydantic Models

For complex outputs, you can use Pydantic models to ensure structured, validated results:

In [None]:
# Define Pydantic models for structured outputs

from pydantic import BaseModel, Field, validator
from typing import List, Dict, Optional
from enum import Enum

class RiskLevel(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class MarketSegment(BaseModel):
    name: str = Field(..., description="Name of the market segment")
    size_usd_millions: float = Field(..., description="Market size in USD millions")
    growth_rate_percent: float = Field(..., description="Annual growth rate percentage")
    key_players: List[str] = Field(..., description="Major players in this segment")
    
    @validator('growth_rate_percent')
    def validate_growth_rate(cls, v):
        if v < -100 or v > 1000:
            raise ValueError('Growth rate must be between -100% and 1000%')
        return v

class CompetitorProfile(BaseModel):
    name: str = Field(..., description="Company name")
    market_share_percent: float = Field(..., description="Market share percentage")
    strengths: List[str] = Field(..., description="Key competitive strengths")
    weaknesses: List[str] = Field(..., description="Competitive weaknesses")
    pricing_model: str = Field(..., description="Pricing strategy description")

class RiskFactor(BaseModel):
    description: str = Field(..., description="Risk description")
    probability_percent: float = Field(..., description="Probability of occurrence")
    impact: RiskLevel = Field(..., description="Impact level if risk occurs")
    mitigation_strategies: List[str] = Field(..., description="Risk mitigation approaches")

class MarketAnalysisOutput(BaseModel):
    """Structured output model for market analysis tasks."""
    
    executive_summary: str = Field(..., description="Key findings and recommendations")
    total_market_size_usd_billions: float = Field(..., description="Total addressable market size")
    market_segments: List[MarketSegment] = Field(..., description="Market segment breakdown")
    competitors: List[CompetitorProfile] = Field(..., description="Competitor analysis")
    risk_factors: List[RiskFactor] = Field(..., description="Identified risks")
    opportunities: List[str] = Field(..., description="Market opportunities")
    recommendations: List[str] = Field(..., description="Strategic recommendations")
    confidence_level: float = Field(..., description="Analysis confidence level (0-1)")
    sources: List[str] = Field(..., description="Data sources and references")
    
    @validator('confidence_level')
    def validate_confidence(cls, v):
        if v < 0 or v > 1:
            raise ValueError('Confidence level must be between 0 and 1')
        return v

print("📊 Structured Output Models Defined!")
print("\nModel Components:")
print(f"  - MarketAnalysisOutput: {len(MarketAnalysisOutput.__fields__)} fields")
print(f"  - MarketSegment: {len(MarketSegment.__fields__)} fields")
print(f"  - CompetitorProfile: {len(CompetitorProfile.__fields__)} fields")
print(f"  - RiskFactor: {len(RiskFactor.__fields__)} fields")
print(f"  - RiskLevel enum: {len(RiskLevel)} levels")

# Example of creating a task with structured output
structured_analysis_task = Task(
    description="""Conduct a structured market analysis of the AI customer service 
    automation industry. Your output must conform to the MarketAnalysisOutput 
    schema and include all required fields with validated data.
    
    Ensure all numerical values are realistic and properly validated. 
    Include at least 3 market segments, 5 competitors, and 4 risk factors.""",
    
    agent=research_agent,
    
    expected_output="""A JSON object conforming to the MarketAnalysisOutput schema 
    with complete and validated data for all fields. The output must be valid 
    JSON that can be parsed and validated against the Pydantic model.""",
    
    output_pydantic=MarketAnalysisOutput,  # Specify the Pydantic model
    
    output_file="structured_market_analysis.json"
)

print("\n✅ Structured Analysis Task Created with Pydantic validation!")

## 🛠️ Advanced Task Configuration

Let's explore advanced task configuration options and patterns:

In [None]:
# Advanced task configuration examples

# 1. Task with custom validation function
def validate_research_output(output: str) -> bool:
    """Custom validation function for research outputs."""
    required_sections = [
        "executive summary",
        "methodology",
        "findings",
        "recommendations",
        "sources"
    ]
    
    output_lower = output.lower()
    missing_sections = []
    
    for section in required_sections:
        if section not in output_lower:
            missing_sections.append(section)
    
    if missing_sections:
        print(f"⚠️ Missing required sections: {', '.join(missing_sections)}")
        return False
    
    # Check minimum length
    if len(output) < 1000:
        print("⚠️ Output too short (minimum 1000 characters)")
        return False
    
    print("✅ Output validation passed")
    return True

# Task with validation
validated_task = Task(
    description="""Conduct a comprehensive analysis of emerging AI technologies 
    in healthcare for 2024. Focus on practical applications, regulatory 
    considerations, and market adoption potential.""",
    
    agent=research_agent,
    
    expected_output="""A structured report with Executive Summary, Methodology, 
    Key Findings, Strategic Recommendations, and Sources sections. 
    Minimum 1000 words with actionable insights.""",
    
    
    callbacks=[validate_research_output],
    
    # Additional configuration
    async_execution=False,  # Synchronous execution
    human_input=False,      # No human intervention required
    
    output_file="healthcare_ai_analysis.md"
)

print("🛠️ Advanced Task Configurations:")
print(f"  ⚡ Async execution: {validated_task.async_execution}")
print(f"  👤 Human input required: {validated_task.human_input}")



In [None]:
# 2. Task with human-in-the-loop interaction

human_review_task = Task(
    description="""Create a detailed competitive analysis of the top 3 companies 
    in the AI customer service space. For each company, analyze their:
    
    1. Product offerings and features
    2. Pricing strategies
    3. Market positioning
    4. Customer base and testimonials
    5. Strengths and weaknesses
    6. Recent developments and news
    
    After completing the initial analysis, present findings to human reviewer 
    for feedback and additional insights before finalizing the report.""",
    
    agent=research_agent,
    
    expected_output="""A comprehensive competitive analysis report with:
    - Company profiles and comparisons
    - SWOT analysis for each competitor
    - Market positioning map
    - Strategic recommendations
    - Competitive threats and opportunities assessment
    
    The report should incorporate human reviewer feedback and insights.""",
    
    human_input=True,  # Enable human-in-the-loop
    
    output_file="competitive_analysis.md"
)

print("\n👤 Human-in-the-Loop Task Created:")
print(f"  Human input enabled: {human_review_task.human_input}")
print("  This task will pause for human review and feedback during execution.")

In [None]:
# 3. Parallel execution task example

# Create multiple agents for parallel work
regional_analysts = {
    'north_america': Agent(
        role='North America Market Analyst',
        goal='Analyze AI customer service market trends in North America',
        backstory="Expert in North American technology markets with deep understanding of regulatory environment and customer preferences.",
        tools=[search_tool],
        verbose=True
    ),
    'europe': Agent(
        role='European Market Analyst', 
        goal='Analyze AI customer service market trends in Europe',
        backstory="Specialist in European markets with expertise in GDPR compliance and diverse cultural market dynamics.",
        tools=[search_tool],
        verbose=True
    ),
    'asia_pacific': Agent(
        role='Asia-Pacific Market Analyst',
        goal='Analyze AI customer service market trends in Asia-Pacific',
        backstory="Expert in APAC markets with understanding of rapid technology adoption and diverse regulatory landscapes.",
        tools=[search_tool],
        verbose=True
    )
}

# Create parallel tasks for regional analysis
parallel_tasks = []

for region, agent in regional_analysts.items():
    task = Task(
        description=f"""Conduct regional market analysis for {region.replace('_', ' ').title()} 
        focusing on AI customer service automation. Include:
        
        1. Regional market size and growth rates
        2. Key local and international players
        3. Regulatory environment and compliance requirements
        4. Cultural factors affecting adoption
        5. Technology infrastructure readiness
        6. Customer preferences and behavior patterns
        7. Regional opportunities and challenges
        
        Provide region-specific insights and recommendations.""",
        
        agent=agent,
        
        expected_output=f"""Regional analysis report for {region.replace('_', ' ').title()} including:
        - Market overview and sizing
        - Competitive landscape
        - Regulatory considerations
        - Cultural and adoption factors
        - Strategic recommendations for market entry
        
        Report should be 1000-1500 words with region-specific insights.""",
        
        async_execution=True,  # Enable parallel execution
        
        output_file=f"{region}_market_analysis.md"
    )
    
    parallel_tasks.append(task)

print("⚡ Parallel Execution Tasks Created:")
for i, task in enumerate(parallel_tasks, 1):
    print(f"  {i}. {task.agent.role}")
    print(f"     Async: {task.async_execution}")
    print(f"     Output: {task.output_file}")

print("\n🌍 These tasks can execute simultaneously for faster completion!")

## 🔄 Task Execution Patterns

Let's explore different patterns for executing tasks:

In [None]:
# Task execution pattern examples

# 1. Simple sequential execution
def create_sequential_workflow():
    
    # Create tasks first
    research_task = Task(
        description="Research the latest trends in AI-powered chatbots for 2024",
        agent=research_agent,
        expected_output="A comprehensive research report on chatbot trends",
    )
    
    analysis_task = Task(
        description="Analyze the research findings and identify key opportunities",
        agent=data_analyst,
        expected_output="Data analysis with opportunity assessment",
        context=[research_task]  # Reference the actual task object
    )
    
    strategy_task = Task(
        description="Create a strategic plan based on research and analysis",
        agent=strategy_consultant,
        expected_output="Strategic plan with actionable recommendations",
        context=[research_task, analysis_task]  # Reference both task objects
    )
    
    return [research_task, analysis_task, strategy_task]
    

# 2. Parallel with convergence pattern
def create_parallel_convergence_workflow():
    """Create a workflow with parallel tasks converging to a final task."""
    
    # Parallel research tasks
    market_research = Task(
        description="Research market size and growth trends",
        agent=regional_analysts['north_america'],
        expected_output="Market research findings",
        async_execution=True
    )
    
    competitor_research = Task(
        description="Research competitor landscape and positioning",
        agent=regional_analysts['europe'],
        expected_output="Competitive analysis",
        async_execution=True
    )
    
    technology_research = Task(
        description="Research emerging technologies and innovations",
        agent=regional_analysts['asia_pacific'],
        expected_output="Technology trend analysis",
        async_execution=True
    )
    
    # Convergence task that depends on all parallel tasks
    synthesis_task = Task(
        description="Synthesize all research findings into comprehensive strategy",
        agent=strategy_consultant,
        expected_output="Integrated strategic analysis and recommendations",
        context=[market_research, competitor_research, technology_research]
    )
    
    return [market_research, competitor_research, technology_research, synthesis_task]

# 3. Iterative refinement pattern
def create_iterative_workflow():
    """Create a workflow with iterative refinement."""
    
    # Create reviewer agent
    reviewer = Agent(
        role='Senior Quality Reviewer',
        goal='Review and provide feedback for continuous improvement',
        backstory="Expert reviewer with high standards for quality and accuracy.",
        tools=[file_tool],
        verbose=True
    )
    
    
    analysis_task=Task(
            description="Create initial market analysis draft",
            agent=research_agent,
            expected_output="Initial analysis draft",
        )
        
    review_task=Task(
            description="Review the analysis and provide detailed feedback",
            agent=reviewer,
            expected_output="Detailed review with improvement suggestions",
            context=[analysis_task]
        )
        
    refine_task=Task(
            description="Refine the analysis based on reviewer feedback",
            agent=research_agent,
            expected_output="Refined analysis incorporating feedback",
            context=[analysis_task, review_task]
        )
        
    quality_task=Task(
            description="Final quality check and approval",
            agent=reviewer,
            expected_output="Final approved analysis",
            context=[refine_task]
        )
    return [analysis_task, review_task, refine_task, quality_task]
    
    

print("🔄 Task Execution Patterns Defined:")
print("\n1. Sequential Workflow:")
sequential_tasks = create_sequential_workflow()
for i, task in enumerate(sequential_tasks, 1):
    dependencies = f" (depends on tasks: {task.context})" if hasattr(task, 'context') and task.context else ""
    print(f"   Step {i}: {task.agent.role}{dependencies}")

print("\n2. Parallel Convergence Workflow:")
parallel_tasks = create_parallel_convergence_workflow()
for i, task in enumerate(parallel_tasks, 1):
    async_note = " (parallel)" if hasattr(task, 'async_execution') and task.async_execution else ""
    print(f"   Step {i}: {task.agent.role}{async_note}{dependencies}")

print("\n3. Iterative Refinement Workflow:")
iterative_tasks = create_iterative_workflow()
for i, task in enumerate(iterative_tasks, 1):
    dependencies = f" (depends on tasks: {task.context})" if hasattr(task, 'context') and task.context else ""
    print(f"   Step {i}: {task.agent.role}{dependencies}")

## 🎯 Task Performance Monitoring

Let's implement comprehensive task performance monitoring:

In [None]:
# Task performance monitoring system

import time
from datetime import datetime, timedelta
from typing import Dict, List, Any

class TaskPerformanceMonitor:
    def __init__(self):
        self.task_metrics = {}
        self.execution_history = []
        self.performance_stats = {}
    
    def start_task_monitoring(self, task_id: str, task_description: str, agent_role: str):
        """Start monitoring a task execution."""
        self.task_metrics[task_id] = {
            'description': task_description[:100] + '...',
            'agent_role': agent_role,
            'start_time': datetime.now(),
            'status': 'running',
            'iterations': 0,
            'tool_calls': 0,
            'errors': []
        }
        
        print(f"📊 Started monitoring task: {task_id}")
    
    def update_task_progress(self, task_id: str, iteration: int = None, tool_call: bool = False, error: str = None):
        """Update task progress metrics."""
        if task_id in self.task_metrics:
            metrics = self.task_metrics[task_id]
            
            if iteration:
                metrics['iterations'] = iteration
            
            if tool_call:
                metrics['tool_calls'] += 1
            
            if error:
                metrics['errors'].append({
                    'error': error,
                    'timestamp': datetime.now()
                })
    
    def complete_task_monitoring(self, task_id: str, success: bool = True, output_length: int = 0):
        """Complete task monitoring and record final metrics."""
        if task_id in self.task_metrics:
            metrics = self.task_metrics[task_id]
            end_time = datetime.now()
            execution_time = (end_time - metrics['start_time']).total_seconds()
            
            metrics.update({
                'end_time': end_time,
                'execution_time': execution_time,
                'success': success,
                'output_length': output_length,
                'status': 'completed' if success else 'failed'
            })
            
            # Add to execution history
            self.execution_history.append(metrics.copy())
            
            # Update performance stats
            self._update_performance_stats(metrics)
            
            print(f"✅ Completed monitoring task: {task_id} ({'success' if success else 'failed'})")
    
    def _update_performance_stats(self, metrics: Dict):
        """Update aggregate performance statistics."""
        agent_role = metrics['agent_role']
        
        if agent_role not in self.performance_stats:
            self.performance_stats[agent_role] = {
                'total_tasks': 0,
                'successful_tasks': 0,
                'total_execution_time': 0,
                'total_iterations': 0,
                'total_tool_calls': 0,
                'total_errors': 0,
                'avg_execution_time': 0,
                'success_rate': 0
            }
        
        stats = self.performance_stats[agent_role]
        stats['total_tasks'] += 1
        stats['total_execution_time'] += metrics['execution_time']
        stats['total_iterations'] += metrics['iterations']
        stats['total_tool_calls'] += metrics['tool_calls']
        stats['total_errors'] += len(metrics['errors'])
        
        if metrics['success']:
            stats['successful_tasks'] += 1
        
        # Update averages
        stats['avg_execution_time'] = stats['total_execution_time'] / stats['total_tasks']
        stats['success_rate'] = stats['successful_tasks'] / stats['total_tasks']
    
    def get_performance_report(self) -> str:
        """Generate comprehensive performance report."""
        report = "\n📊 Task Performance Report\n"
        report += "=" * 50 + "\n"
        
        # Overall statistics
        total_tasks = len(self.execution_history)
        successful_tasks = sum(1 for task in self.execution_history if task['success'])
        overall_success_rate = successful_tasks / total_tasks if total_tasks > 0 else 0
        
        report += f"\n📈 Overall Statistics:\n"
        report += f"  Total Tasks Executed: {total_tasks}\n"
        report += f"  Successful Tasks: {successful_tasks}\n"
        report += f"  Overall Success Rate: {overall_success_rate:.2%}\n"
        
        # Agent-specific statistics
        if self.performance_stats:
            report += f"\n🤖 Agent Performance:\n"
            for agent_role, stats in self.performance_stats.items():
                report += f"\n  {agent_role}:\n"
                report += f"    Tasks: {stats['total_tasks']}\n"
                report += f"    Success Rate: {stats['success_rate']:.2%}\n"
                report += f"    Avg Execution Time: {stats['avg_execution_time']:.2f}s\n"
                report += f"    Total Tool Calls: {stats['total_tool_calls']}\n"
                report += f"    Total Errors: {stats['total_errors']}\n"
        
        # Recent task history
        if self.execution_history:
            report += f"\n📋 Recent Task History (Last 5):\n"
            recent_tasks = self.execution_history[-5:]
            for i, task in enumerate(recent_tasks, 1):
                status_icon = "✅" if task['success'] else "❌"
                report += f"  {i}. {status_icon} {task['agent_role']} - {task['execution_time']:.2f}s\n"
                report += f"     {task['description']}\n"
        
        return report
    
    def get_task_insights(self) -> Dict[str, Any]:
        """Generate insights and recommendations based on performance data."""
        insights = {
            'bottlenecks': [],
            'recommendations': [],
            'trends': {}
        }
        
        if not self.execution_history:
            return insights
        
        # Identify bottlenecks
        avg_execution_time = sum(task['execution_time'] for task in self.execution_history) / len(self.execution_history)
        slow_tasks = [task for task in self.execution_history if task['execution_time'] > avg_execution_time * 2]
        
        if slow_tasks:
            insights['bottlenecks'].append(f"Found {len(slow_tasks)} tasks taking >2x average execution time")
        
        # Error analysis
        error_prone_agents = {}
        for agent_role, stats in self.performance_stats.items():
            error_rate = stats['total_errors'] / stats['total_tasks'] if stats['total_tasks'] > 0 else 0
            if error_rate > 0.1:  # More than 10% error rate
                error_prone_agents[agent_role] = error_rate
        
        if error_prone_agents:
            insights['bottlenecks'].extend([
                f"{agent}: {rate:.1%} error rate" for agent, rate in error_prone_agents.items()
            ])
        
        # Generate recommendations
        if slow_tasks:
            insights['recommendations'].append("Consider optimizing task descriptions for slow-running tasks")
            insights['recommendations'].append("Review tool selection for time-intensive operations")
        
        if error_prone_agents:
            insights['recommendations'].append("Review agent configurations for high-error agents")
            insights['recommendations'].append("Consider additional validation or error handling")
        
        # Success rate trends
        if len(self.execution_history) >= 5:
            recent_success_rate = sum(1 for task in self.execution_history[-5:] if task['success']) / 5
            overall_success_rate = sum(1 for task in self.execution_history if task['success']) / len(self.execution_history)
            
            insights['trends']['recent_vs_overall'] = {
                'recent_success_rate': recent_success_rate,
                'overall_success_rate': overall_success_rate,
                'improving': recent_success_rate > overall_success_rate
            }
        
        return insights

# Create performance monitor instance
task_monitor = TaskPerformanceMonitor()

print("📊 Task Performance Monitor Initialized!")
print("\nMonitoring Capabilities:")
capabilities = [
    "Real-time task execution tracking",
    "Agent performance analytics",
    "Error and bottleneck identification",
    "Success rate monitoring",
    "Performance insights and recommendations"
]

for i, capability in enumerate(capabilities, 1):
    print(f"  {i}. {capability}")

In [None]:
# Simulate task executions for demonstration

import random
import time

def simulate_task_execution(task_id: str, agent_role: str, description: str):
    """Simulate a task execution with monitoring."""
    
    # Start monitoring
    task_monitor.start_task_monitoring(task_id, description, agent_role)
    
    # Simulate execution with random characteristics
    execution_time = random.uniform(0.5, 3.0)  # Random execution time
    iterations = random.randint(1, 5)
    tool_calls = random.randint(0, 8)
    success_probability = 0.85  # 85% success rate
    
    # Simulate progress updates
    for i in range(iterations):
        task_monitor.update_task_progress(task_id, iteration=i+1)
        time.sleep(execution_time / iterations)  # Simulate work
    
    # Simulate tool calls
    for _ in range(tool_calls):
        task_monitor.update_task_progress(task_id, tool_call=True)
    
    # Simulate occasional errors
    if random.random() < 0.2:  # 20% chance of error
        task_monitor.update_task_progress(task_id, error="Simulated API timeout")
    
    # Complete task
    success = random.random() < success_probability
    output_length = random.randint(500, 3000) if success else 0
    
    task_monitor.complete_task_monitoring(task_id, success, output_length)
    
    return success

# Simulate multiple task executions
print("🔄 Simulating Task Executions...\n")

simulation_tasks = [
    ("task_001", "Senior Research Analyst", "Conduct comprehensive market research on AI trends"),
    ("task_002", "Lead Data Analyst", "Analyze customer behavior patterns in Q3 data"),
    ("task_003", "Business Strategy Consultant", "Develop market entry strategy for new product"),
    ("task_004", "Senior Research Analyst", "Research competitive landscape in fintech sector"),
    ("task_005", "Lead Data Analyst", "Create predictive model for customer churn"),
    ("task_006", "Business Strategy Consultant", "Optimize pricing strategy based on market analysis"),
    ("task_007", "Senior Research Analyst", "Investigate emerging technologies in healthcare AI"),
    ("task_008", "Lead Data Analyst", "Perform statistical analysis on user engagement metrics")
]

for task_id, agent_role, description in simulation_tasks:
    success = simulate_task_execution(task_id, agent_role, description)
    print(f"  {'✅' if success else '❌'} {task_id}: {agent_role}")

print("\n" + "=" * 50)
print(task_monitor.get_performance_report())

# Generate insights
insights = task_monitor.get_task_insights()
print("\n🔍 Performance Insights:")

if insights['bottlenecks']:
    print("\n⚠️ Identified Bottlenecks:")
    for bottleneck in insights['bottlenecks']:
        print(f"  - {bottleneck}")

if insights['recommendations']:
    print("\n💡 Recommendations:")
    for recommendation in insights['recommendations']:
        print(f"  - {recommendation}")

if insights['trends']:
    print("\n📈 Trends:")
    for trend_name, trend_data in insights['trends'].items():
        print(f"  {trend_name}: {trend_data}")

## 🎯 Task Optimization Best Practices

Here are the key best practices for creating and optimizing CrewAI tasks:

In [None]:
# Task optimization best practices

print("🎯 CrewAI Task Optimization Best Practices")
print("=" * 50)

best_practices = {
    "Task Description": {
        "principles": [
            "Be specific and detailed about objectives",
            "Include context and background information",
            "Define clear success criteria",
            "Specify quantifiable metrics when possible",
            "Break complex tasks into smaller, manageable parts"
        ],
        "examples": {
            "poor": "Research AI trends",
            "good": "Conduct comprehensive research on AI customer service trends in 2024, focusing on market size, key players, adoption rates, and emerging technologies. Include analysis of at least 10 sources and provide actionable insights."
        }
    },
    
    "Expected Output": {
        "principles": [
            "Define exact format requirements (JSON, Markdown, etc.)",
            "Specify structure and required sections",
            "Include length guidelines",
            "Define quality standards",
            "Use Pydantic models for structured data"
        ],
        "formats": [
            "Markdown reports with specific sections",
            "JSON objects with defined schemas",
            "CSV files with specified columns",
            "Structured data using Pydantic models"
        ]
    },
    
    "Task Dependencies": {
        "principles": [
            "Use context parameter for task dependencies",
            "Design clear information flow between tasks",
            "Avoid circular dependencies",
            "Consider parallel execution opportunities",
            "Plan for error handling in dependent tasks"
        ],
        "patterns": [
            "Sequential: A → B → C",
            "Parallel: A, B, C → D",
            "Iterative: A → B → A (refined) → C",
            "Hierarchical: Manager → Workers → Manager"
        ]
    },
    
    "Tool Selection": {
        "principles": [
            "Match tools to task requirements",
            "Avoid tool overload (3-5 tools max per task)",
            "Test tool compatibility and performance",
            "Consider task-specific vs agent-level tools",
            "Implement custom tools for specialized needs"
        ],
        "categories": {
            "Research": ["SerperDevTool", "WebsiteSearchTool", "PDFSearchTool"],
            "Data": ["CSVSearchTool", "JSONSearchTool", "FileReadTool"],
            "Content": ["FileReadTool", "TXTSearchTool", "MDXSearchTool"]
        }
    },
    
    "Validation": {
        "principles": [
            "Implement custom validation functions",
            "Use Pydantic models for data validation",
            "Check output format and structure",
            "Validate content quality and completeness",
            "Implement retry mechanisms for failed validations"
        ],
        "techniques": [
            "Schema validation with Pydantic",
            "Content length and structure checks",
            "Required section validation",
            "Data type and range validation",
            "Business rule validation"
        ]
    },
    
    "Performance": {
        "principles": [
            "Monitor execution time and success rates",
            "Optimize task descriptions for clarity",
            "Use async execution for parallel tasks",
            "Implement proper error handling",
            "Regular performance reviews and optimization"
        ],
        "metrics": [
            "Task completion time",
            "Success/failure rates",
            "Tool usage efficiency",
            "Output quality scores",
            "Resource utilization"
        ]
    }
}

for category, details in best_practices.items():
    print(f"\n📋 {category}:")
    
    if "principles" in details:
        print("  Principles:")
        for i, principle in enumerate(details["principles"], 1):
            print(f"    {i}. {principle}")
    
    if "examples" in details:
        print("  Examples:")
        for example_type, example in details["examples"].items():
            status = "❌" if example_type == "poor" else "✅"
            print(f"    {status} {example_type.title()}: {example}")
    
    if "formats" in details:
        print("  Recommended Formats:")
        for format_item in details["formats"]:
            print(f"    - {format_item}")
    
    if "patterns" in details:
        print("  Common Patterns:")
        for pattern in details["patterns"]:
            print(f"    - {pattern}")
    
    if "categories" in details:
        print("  Tool Categories:")
        for cat_name, tools in details["categories"].items():
            print(f"    {cat_name}: {', '.join(tools)}")
    
    if "techniques" in details:
        print("  Validation Techniques:")
        for technique in details["techniques"]:
            print(f"    - {technique}")
    
    if "metrics" in details:
        print("  Key Metrics:")
        for metric in details["metrics"]:
            print(f"    - {metric}")
    
    print("-" * 40)

print("\n💡 Key Takeaways:")
key_takeaways = [
    "Detailed task descriptions lead to better agent performance",
    "Clear output specifications reduce ambiguity and errors", 
    "Proper task dependencies enable complex workflows",
    "Validation ensures output quality and consistency",
    "Performance monitoring enables continuous optimization",
    "Structured outputs with Pydantic improve reliability"
]

for i, takeaway in enumerate(key_takeaways, 1):
    print(f"  {i}. {takeaway}")

## 🎓 Hands-On Exercise

Now it's your turn! Create a comprehensive task for your specific use case:

In [None]:
# Exercise: Create your own comprehensive task

# TODO: Complete this exercise by creating a task for your specific use case
# Consider the following questions:
# 1. What specific objective should your task accomplish?
# 2. What agent would be best suited for this task?
# 3. What tools would the task need?
# 4. How should the output be structured?
# 5. Are there any dependencies on other tasks?
# 6. What validation would ensure quality?

# Example template - customize for your needs:
your_task = Task(
    description="""[Your detailed task description here]
    
    Include:
    1. [Specific requirement 1]
    2. [Specific requirement 2] 
    3. [Specific requirement 3]
    
    Focus on [your specific focus area] and provide [expected insights/outcomes].""",
    
    agent=research_agent,  # Replace with appropriate agent
    
    tools=[],  # Add relevant tools
    
    expected_output="""[Your expected output specification]
    
    ## Section 1
    - [Requirements for section 1]
    
    ## Section 2 
    - [Requirements for section 2]
    
    The output should be [format] and include [specific requirements].""",
    
    # Optional configurations
    context=[],  # Add task dependencies if needed
    output_file="your_output.md",  # Specify output file
    # output_pydantic=YourModel,  # Add Pydantic model if needed
    # human_input=False,  # Enable human-in-the-loop if needed
    # async_execution=False  # Enable parallel execution if needed
)

print("🎓 Exercise: Create Your Comprehensive Task")
print("Replace the placeholder values above with your specific requirements.")
print("\nConsider these task types for inspiration:")

task_examples = [
    "Market Analysis Task - comprehensive industry research and insights",
    "Content Strategy Task - create editorial calendar and content plan",
    "Data Analysis Task - statistical analysis and visualization recommendations",
    "Competitive Intelligence Task - competitor analysis and positioning",
    "Customer Research Task - user behavior analysis and persona development",
    "Technical Documentation Task - API documentation and developer guides",
    "Financial Analysis Task - budget analysis and forecasting",
    "Product Strategy Task - feature prioritization and roadmap planning"
]

for i, example in enumerate(task_examples, 1):
    print(f"  {i}. {example}")

print("\n📋 Task Design Checklist:")
checklist = [
    "☐ Clear and specific task description",
    "☐ Appropriate agent assignment",
    "☐ Relevant tool selection",
    "☐ Detailed output specification",
    "☐ Validation requirements defined",
    "☐ Dependencies identified (if any)",
    "☐ Success criteria established",
    "☐ Error handling considered"
]

for item in checklist:
    print(f"  {item}")

# Uncomment and complete when ready:
# print(f"\n✅ Your Task Created: {your_task.description[:50]}...")

## 🎉 Summary

Congratulations! You've completed the CrewAI Tasks Deep Dive. Here's what you've learned:

### Key Concepts Covered:
1. **Task Fundamentals** - Description, output, context, and configuration
2. **Structured Outputs** - Pydantic models and validation
3. **Task Dependencies** - Context management and workflow patterns
4. **Advanced Configuration** - Validation, human-in-the-loop, parallel execution
5. **Execution Patterns** - Sequential, parallel, and iterative workflows
6. **Performance Monitoring** - Metrics, insights, and optimization
7. **Best Practices** - Guidelines for effective task creation

### Task Design Principles:
- **Clarity** - Clear, specific descriptions lead to better results
- **Structure** - Well-defined outputs ensure consistency
- **Validation** - Quality checks prevent errors and improve reliability
- **Dependencies** - Proper context management enables complex workflows
- **Monitoring** - Performance tracking enables continuous improvement

### Next Steps:
- 🔄 Explore the **Workflows** notebook to learn about crew orchestration
- 🛠️ Experiment with different task configurations
- 📊 Implement performance monitoring in your projects
- 🎯 Practice creating structured outputs with Pydantic

### Resources:
- [CrewAI Documentation](https://docs.crewai.com/)
- [Task Configuration Reference](https://docs.crewai.com/core-concepts/Tasks/)
- [Pydantic Documentation](https://docs.pydantic.dev/)

Happy task building! 🚀