# Another simple agent

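We implement a simple task-scheduling agent together with a simulated task environment, then run a short demo and compare the agent's scheduling strategies.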

In [2]:
import random
import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
import heapq

class TaskStatus(Enum):
    """Lifecycle states a task can be in; each value is a lowercase label."""

    # Not started yet.
    PENDING = "pending"
    # Work has begun but progress is below 100%.
    IN_PROGRESS = "in_progress"
    # Finished.
    COMPLETED = "completed"
    # Deadline passed before the task was finished.
    OVERDUE = "overdue"
    # Declared for completeness; nothing in this file assigns it.
    BLOCKED = "blocked"

class Priority(Enum):
    """Importance levels for a task; a larger value means more important."""

    # Values are consecutive integers so they can be used as sort keys and
    # scaled into a 0..1 weight (value / 4).
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4

@dataclass
class Task:
    """A unit of work tracked by the scheduling simulation.

    Attributes:
        id: Unique identifier; other tasks refer to it in ``dependencies``.
        name: Short human-readable title.
        description: Longer free-text description.
        priority: Importance level of the task.
        estimated_duration: Estimated effort in minutes.
        deadline: Point in time by which the task should be finished.
        dependencies: IDs of tasks that must be completed first. ``None`` is
            accepted at construction time and replaced by a fresh empty list.
        status: Current lifecycle state.
        progress: Completed fraction, from 0.0 to 1.0.
        created_at: Creation timestamp; defaults to the construction time.
    """

    id: str
    name: str
    description: str
    priority: Priority
    estimated_duration: int  # in minutes
    deadline: datetime
    # None is only a placeholder so that every instance gets its own list
    # (see __post_init__); after construction this is always a list.
    dependencies: Optional[List[str]] = None
    status: TaskStatus = TaskStatus.PENDING
    progress: float = 0.0  # 0.0 to 1.0
    created_at: Optional[datetime] = None

    def __post_init__(self):
        """Replace the ``None`` placeholders with real default values."""
        if self.dependencies is None:
            self.dependencies = []
        if self.created_at is None:
            self.created_at = datetime.now()

    def is_ready(self, completed_tasks: set) -> bool:
        """Return True when every dependency ID is in ``completed_tasks``.

        A task without dependencies is always ready.
        """
        return all(dep_id in completed_tasks for dep_id in self.dependencies)

    def urgency_score(self, now: Optional[datetime] = None) -> float:
        """Combine priority and deadline pressure into a single score.

        Args:
            now: Reference time used to measure the time left until the
                deadline. Defaults to ``datetime.now()``; pass an explicit
                value for reproducible results.

        Returns:
            ``priority.value / 4`` multiplied by a time weight of 2.0
            (deadline reached or passed), 1.5 (two hours or less left),
            1.0 (24 hours or less left) or 0.5 (otherwise). Higher means
            more urgent.
        """
        if now is None:
            now = datetime.now()
        hours_left = (self.deadline - now).total_seconds() / 3600

        # Base urgency from priority: 4 is the value of Priority.CRITICAL,
        # so the weight ranges from 0.25 to 1.0.
        priority_weight = self.priority.value / 4.0

        # Time pressure (higher weight = more urgent)
        if hours_left <= 0:
            time_weight = 2.0  # Overdue
        elif hours_left <= 2:
            time_weight = 1.5  # Very urgent
        elif hours_left <= 24:
            time_weight = 1.0  # Urgent
        else:
            time_weight = 0.5  # Not urgent

        return priority_weight * time_weight

class TaskEnvironment:
    """Simulated workplace that generates tasks and tracks their state.

    Attributes:
        tasks: All known tasks keyed by task ID.
        completed_tasks: IDs of tasks that have been completed.
        current_time: Timestamp taken at construction; all generated
            deadlines are offsets from it.
        complexity_level: "simple", "medium" or "complex"; controls how many
            initial tasks are created and whether dependencies are added.
        task_counter: Number of urgent tasks injected so far.
    """

    def __init__(self, complexity_level: str = "medium"):
        """Create the environment and populate it with random initial tasks."""
        self.tasks: Dict[str, Task] = {}
        self.completed_tasks: set = set()
        self.current_time = datetime.now()
        self.complexity_level = complexity_level
        self.task_counter = 0

        # Generate initial tasks
        self._generate_initial_tasks()

    def _generate_initial_tasks(self):
        """Create the initial task set from a random sample of templates.

        Each task gets a random deadline 1 to 72 hours after
        ``current_time``. For "medium" and "complex" environments the known
        dependency patterns are wired up afterwards.
        """
        # (name, description, priority, estimated minutes)
        task_templates = [
            ("Write proposal", "Draft project proposal document", Priority.HIGH, 120),
            ("Research competitors", "Analyze market competition", Priority.MEDIUM, 90),
            ("Design mockups", "Create UI/UX mockups", Priority.MEDIUM, 180),
            ("Code review", "Review teammate's code", Priority.HIGH, 45),
            ("Update documentation", "Update project documentation", Priority.LOW, 60),
            ("Client meeting prep", "Prepare for client presentation", Priority.CRITICAL, 30),
            ("Bug fixes", "Fix reported bugs", Priority.HIGH, 150),
            ("Database backup", "Backup production database", Priority.MEDIUM, 20),
            ("Team standup", "Daily team standup meeting", Priority.MEDIUM, 15),
            ("Performance testing", "Test application performance", Priority.MEDIUM, 120)
        ]

        # Determine number of tasks based on complexity (unknown levels
        # fall back to the "medium" count).
        complexity_map = {"simple": 4, "medium": 6, "complex": 8}
        num_tasks = complexity_map.get(self.complexity_level, 6)

        selected_templates = random.sample(task_templates, min(num_tasks, len(task_templates)))

        created_task_ids = []
        for i, (name, desc, priority, duration) in enumerate(selected_templates):
            task_id = f"task_{i+1:02d}"

            # Random deadline between 1 hour and 3 days from now
            deadline_hours = random.randint(1, 72)
            deadline = self.current_time + timedelta(hours=deadline_hours)

            self.tasks[task_id] = Task(
                id=task_id,
                name=name,
                description=desc,
                priority=priority,
                estimated_duration=duration,
                deadline=deadline
            )
            created_task_ids.append(task_id)

        # Add some dependencies for complex scenarios
        if self.complexity_level in ["medium", "complex"]:
            self._add_dependencies(created_task_ids)

    def _add_dependencies(self, task_ids: List[str]):
        """Link tasks according to a fixed list of (prerequisite, dependent) names.

        A pattern is applied only when both named tasks exist in the
        environment. Matching is a case-insensitive substring test on the
        task name.

        Args:
            task_ids: IDs of the freshly created tasks. Currently unused
                (every task in the environment is searched); kept so the
                call signature stays stable.
        """
        # Some common dependency patterns: (prerequisite, dependent)
        dependency_patterns = [
            ("Write proposal", "Client meeting prep"),
            ("Research competitors", "Write proposal"),
            ("Design mockups", "Code review"),
            ("Bug fixes", "Performance testing")
        ]

        for prereq_name, dependent_name in dependency_patterns:
            prereq_task = None
            dependent_task = None

            for task in self.tasks.values():
                if prereq_name.lower() in task.name.lower():
                    prereq_task = task
                elif dependent_name.lower() in task.name.lower():
                    dependent_task = task

            # Skip the link if it already exists so repeated calls do not
            # pile up duplicate dependency entries.
            if (prereq_task and dependent_task
                    and prereq_task.id not in dependent_task.dependencies):
                dependent_task.dependencies.append(prereq_task.id)

    def add_random_task(self) -> Optional[Task]:
        """Randomly add a new urgent task (simulates real work environment).

        Returns:
            The newly added task (30% of calls), otherwise ``None``.
        """
        if random.random() < 0.3:  # 30% chance to add urgent task
            # (name, description, priority, estimated minutes)
            urgent_tasks = [
                ("Emergency bug fix", "Critical production issue", Priority.CRITICAL, 60),
                ("Client callback", "Return urgent client call", Priority.HIGH, 15),
                ("Server maintenance", "Urgent server maintenance needed", Priority.HIGH, 90),
                ("Security patch", "Apply critical security update", Priority.CRITICAL, 30)
            ]

            name, desc, priority, duration = random.choice(urgent_tasks)
            self.task_counter += 1
            task_id = f"urgent_{self.task_counter}"

            # Urgent tasks have very short deadlines (30 to 180 minutes)
            deadline = self.current_time + timedelta(minutes=random.randint(30, 180))

            task = Task(
                id=task_id,
                name=f"{name} #{self.task_counter}",
                description=desc,
                priority=priority,
                estimated_duration=duration,
                deadline=deadline
            )

            self.tasks[task_id] = task
            print(f"🚨 URGENT TASK ADDED: {task.name} (Due: {task.deadline.strftime('%H:%M')})")
            return task

        return None

    def get_available_tasks(self) -> List[Task]:
        """Return every unfinished task whose dependencies are all completed.

        PENDING, IN_PROGRESS and OVERDUE tasks all count as available.
        Returning only PENDING tasks would make a task unschedulable forever
        as soon as it was started and set aside, or as soon as its deadline
        passed, even though it still needs to be done.
        """
        workable_statuses = (
            TaskStatus.PENDING,
            TaskStatus.IN_PROGRESS,
            TaskStatus.OVERDUE,
        )
        return [
            task for task in self.tasks.values()
            if task.status in workable_statuses
            and task.is_ready(self.completed_tasks)
        ]

    def complete_task(self, task_id: str) -> bool:
        """Mark a task as completed.

        Returns:
            True if the task exists and was marked completed, False if the
            ID is unknown.
        """
        task = self.tasks.get(task_id)
        if task is None:
            return False
        task.status = TaskStatus.COMPLETED
        task.progress = 1.0
        self.completed_tasks.add(task_id)
        return True

    def update_overdue_tasks(self, now: Optional[datetime] = None):
        """Flag unfinished tasks whose deadline has passed as OVERDUE.

        Args:
            now: Reference time to compare deadlines against. Defaults to
                ``datetime.now()``.
        """
        if now is None:
            now = datetime.now()
        for task in self.tasks.values():
            if (task.status in [TaskStatus.PENDING, TaskStatus.IN_PROGRESS] and
                    now > task.deadline):
                task.status = TaskStatus.OVERDUE

    def get_status_summary(self) -> Dict:
        """Summarise the current state of all tasks.

        Returns:
            A dict with ``status_counts`` (count per TaskStatus),
            ``total_tasks``, ``completed_tasks`` (size of the completed-ID
            set) and ``remaining_time_minutes`` (summed estimates of all
            tasks that are not COMPLETED).
        """
        status_counts = {status: 0 for status in TaskStatus}
        total_estimated_time = 0

        for task in self.tasks.values():
            status_counts[task.status] += 1
            if task.status != TaskStatus.COMPLETED:
                total_estimated_time += task.estimated_duration

        return {
            "status_counts": status_counts,
            "total_tasks": len(self.tasks),
            "completed_tasks": len(self.completed_tasks),
            "remaining_time_minutes": total_estimated_time
        }

class TaskSchedulingAgent:
    """Agent that picks which task to work on and simulates doing the work.

    Attributes:
        strategy: One of "urgent_first", "important_first", "balanced" or
            "dependencies_first"; any other value behaves like "balanced".
        work_capacity_minutes: Daily capacity in minutes.
        current_task: Task the agent last worked on and has not finished.
        work_session_start: Time the agent most recently started a task
            that was still PENDING.
        total_work_time: Minutes worked so far.
        completed_count: Number of tasks the agent has finished.
        decision_history: One dict per scheduling decision.
        focus_duration, context_switch_penalty, estimation_accuracy:
            Descriptive agent traits; the scheduling logic in this class
            does not read them.
    """

    def __init__(self, strategy: str = "balanced", work_capacity_hours: int = 8):
        """Set up the agent and print its configuration."""
        self.strategy = strategy  # "urgent_first", "important_first", "balanced", "dependencies_first"
        self.work_capacity_minutes = work_capacity_hours * 60
        self.current_task = None
        self.work_session_start = None
        self.total_work_time = 0
        self.completed_count = 0
        self.decision_history = []

        # Agent characteristics
        self.focus_duration = random.randint(45, 120)  # How long agent can focus
        self.context_switch_penalty = 10  # Minutes lost when switching tasks
        self.estimation_accuracy = 0.8  # How accurate time estimates are

        print(f"🤖 Task Scheduling Agent initialized")
        print(f"   Strategy: {strategy}")
        print(f"   Daily capacity: {work_capacity_hours} hours")
        print(f"   Focus duration: {self.focus_duration} minutes")

    def analyze_workload(self, environment: TaskEnvironment) -> Dict:
        """Analyze current workload and provide insights.

        Returns:
            A dict with ``total_available_tasks``, ``total_estimated_time``
            (minutes), ``urgent_tasks`` (deadline within 4 hours),
            ``blocking_tasks`` (available tasks that unfinished tasks depend
            on), ``workload_vs_capacity`` and ``recommendations``. When no
            task is available the same keys are present with zero values,
            plus a ``message`` entry, so callers can always read the
            numeric fields.
        """
        available_tasks = environment.get_available_tasks()

        if not available_tasks:
            return {
                "message": "No available tasks",
                "total_available_tasks": 0,
                "total_estimated_time": 0,
                "urgent_tasks": 0,
                "blocking_tasks": 0,
                "workload_vs_capacity": 0.0,
                "recommendations": []
            }

        # Calculate total work needed
        total_time = sum(task.estimated_duration for task in available_tasks)

        # Identify urgent tasks (deadline within 4 hours)
        now = datetime.now()
        urgent_tasks = [
            task for task in available_tasks
            if (task.deadline - now).total_seconds() / 3600 <= 4
        ]

        # Find potential bottlenecks (tasks that others depend on). Any
        # unfinished dependent counts, whatever its current status.
        blocking_tasks = []
        for task in available_tasks:
            dependents = [
                t for t in environment.tasks.values()
                if task.id in t.dependencies and t.status != TaskStatus.COMPLETED
            ]
            if dependents:
                blocking_tasks.append((task, len(dependents)))

        # A zero capacity would divide by zero; report the workload as
        # unbounded instead.
        if self.work_capacity_minutes > 0:
            workload_ratio = total_time / self.work_capacity_minutes
        else:
            workload_ratio = float("inf")

        analysis = {
            "total_available_tasks": len(available_tasks),
            "total_estimated_time": total_time,
            "urgent_tasks": len(urgent_tasks),
            "blocking_tasks": len(blocking_tasks),
            "workload_vs_capacity": workload_ratio,
            "recommendations": []
        }

        # Generate recommendations
        if analysis["workload_vs_capacity"] > 1.5:
            analysis["recommendations"].append("⚠️ Workload exceeds capacity - consider delegation")

        if urgent_tasks:
            analysis["recommendations"].append(f"🔥 {len(urgent_tasks)} urgent tasks need immediate attention")

        if blocking_tasks:
            analysis["recommendations"].append(f"🔒 {len(blocking_tasks)} tasks are blocking others")

        return analysis

    def prioritize_tasks(self, available_tasks: List[Task],
                         all_tasks: Optional[List[Task]] = None) -> List[Task]:
        """Order tasks according to the agent's strategy, best candidate first.

        Args:
            available_tasks: Tasks that can be worked on right now.
            all_tasks: Every task in the environment. Only used by the
                "dependencies_first" strategy to count how many unfinished
                tasks wait on each candidate. Defaults to
                ``available_tasks``.

        Returns:
            A new sorted list; the input list is not modified.
        """
        if not available_tasks:
            return []

        if self.strategy == "urgent_first":
            # Sort by deadline (earliest first)
            return sorted(available_tasks, key=lambda t: t.deadline)

        elif self.strategy == "important_first":
            # Sort by priority then deadline
            return sorted(available_tasks,
                          key=lambda t: (-t.priority.value, t.deadline))

        elif self.strategy == "dependencies_first":
            # Prioritize tasks that unblock others. Dependents must be
            # counted over the whole task pool: a task that waits on an
            # unfinished prerequisite has unmet dependencies, so it would
            # not normally appear among the available candidates.
            pool = available_tasks if all_tasks is None else all_tasks
            dependent_counts = {}
            for other in pool:
                if other.status == TaskStatus.COMPLETED:
                    continue
                for dep_id in other.dependencies:
                    dependent_counts[dep_id] = dependent_counts.get(dep_id, 0) + 1

            # Both components are negated so that the task with the most
            # dependents comes first, ties broken by higher urgency.
            return sorted(
                available_tasks,
                key=lambda t: (-dependent_counts.get(t.id, 0), -t.urgency_score()))

        else:  # balanced strategy
            # Use urgency score (combines priority and time pressure)
            return sorted(available_tasks, key=lambda t: -t.urgency_score())

    def make_scheduling_decision(self, environment: TaskEnvironment) -> Optional[Task]:
        """Decide which task to work on next and log the decision.

        Order of precedence:
          1. A CRITICAL task with status OVERDUE among the available tasks.
          2. The task currently in progress, unless it is at most half done
             and the top-ranked task is more than 1.5x as urgent.
          3. The top-ranked available task.

        Returns:
            The chosen task, or ``None`` when nothing is available and no
            task is in progress.
        """
        available_tasks = environment.get_available_tasks()

        current = self.current_task
        resuming = current is not None and current.status == TaskStatus.IN_PROGRESS

        # Only give up when there is truly nothing to do: an unfinished
        # current task must still be picked up even if nothing else is
        # available.
        if not available_tasks and not resuming:
            return None

        # Get prioritized task list
        prioritized_tasks = self.prioritize_tasks(
            available_tasks, list(environment.tasks.values()))

        # Emergency override: if there's a critical overdue task. This can
        # only fire when the environment reports OVERDUE tasks as available.
        critical_overdue = [
            task for task in available_tasks
            if task.priority == Priority.CRITICAL and task.status == TaskStatus.OVERDUE
        ]
        if critical_overdue:
            chosen_task = critical_overdue[0]
            decision_reason = "Critical overdue task - emergency override"

        # If currently working on something, consider context switching cost
        elif resuming:
            if current.progress > 0.5:
                # If we're more than 50% done, tend to stick with it
                chosen_task = current
                decision_reason = "Continuing current task to avoid context switching"
            elif (prioritized_tasks and
                  prioritized_tasks[0].urgency_score() > current.urgency_score() * 1.5):
                # Something is much more urgent than the current task
                chosen_task = prioritized_tasks[0]
                decision_reason = "Switching to much more urgent task"
            else:
                chosen_task = current
                decision_reason = "Continuing current task"

        else:
            # Start new task - choose top priority
            chosen_task = prioritized_tasks[0]
            decision_reason = f"Starting top priority task ({self.strategy} strategy)"

        # Log decision
        self.decision_history.append({
            "timestamp": datetime.now(),
            "chosen_task": chosen_task.name if chosen_task else None,
            "reason": decision_reason,
            "available_options": len(available_tasks)
        })

        print(f"🧠 Decision: {chosen_task.name if chosen_task else 'None'}")
        print(f"   Reason: {decision_reason}")

        return chosen_task

    def work_on_task(self, task: Task, work_minutes: int) -> Dict:
        """Simulate working on a task for the given number of minutes.

        Progress is ``work_minutes`` scaled by a random efficiency between
        0.8 and 1.2, relative to the task's estimated duration. A task that
        reaches 100% is marked COMPLETED and counted.

        Returns:
            A dict with ``task_name``, ``work_minutes``, ``progress_added``,
            ``total_progress`` and ``completed``; ``completion_time`` is
            added when the task was finished.
        """
        if task.status == TaskStatus.PENDING:
            task.status = TaskStatus.IN_PROGRESS
            self.work_session_start = datetime.now()

        self.current_task = task

        # Simulate work progress (with some variability)
        efficiency = random.uniform(0.8, 1.2)  # Some days we're more/less efficient
        if task.estimated_duration > 0:
            actual_progress = (work_minutes * efficiency) / task.estimated_duration
        else:
            # A task estimated at zero minutes is finished by any work;
            # dividing by the estimate would raise ZeroDivisionError.
            actual_progress = 1.0

        task.progress = min(1.0, task.progress + actual_progress)
        self.total_work_time += work_minutes

        result = {
            "task_name": task.name,
            "work_minutes": work_minutes,
            "progress_added": actual_progress,
            "total_progress": task.progress,
            "completed": task.progress >= 1.0
        }

        if task.progress >= 1.0:
            task.status = TaskStatus.COMPLETED
            self.completed_count += 1
            self.current_task = None
            result["completion_time"] = datetime.now()
            print(f"✅ Completed: {task.name}")
        else:
            remaining_time = task.estimated_duration * (1 - task.progress)
            print(f"⏳ Worked on: {task.name} ({task.progress:.1%} done, ~{remaining_time:.0f}min remaining)")

        return result

    def get_performance_metrics(self) -> Dict:
        """Get agent performance statistics.

        Returns:
            A dict with ``total_work_time`` (minutes), ``tasks_completed``,
            ``decisions_made``, ``average_decision_time``, ``current_task``
            (name or ``None``) and ``work_efficiency`` (tasks per hour,
            with the hour count floored at 1 to avoid inflated ratios and
            division by zero).
        """
        decisions_made = len(self.decision_history)
        return {
            "total_work_time": self.total_work_time,
            "tasks_completed": self.completed_count,
            "decisions_made": decisions_made,
            # NOTE: despite its name this key has always held the number of
            # decisions, not a duration. It is kept unchanged for existing
            # callers; prefer "decisions_made".
            "average_decision_time": decisions_made,
            "current_task": self.current_task.name if self.current_task else None,
            "work_efficiency": self.completed_count / max(1, self.total_work_time / 60)  # tasks per hour
        }

def demo_task_scheduling_agent():
    """Run one simulated work day with a randomly chosen strategy.

    Builds a "medium" environment, prints the initial tasks and workload
    analysis, then runs up to 15 work sessions of 20-60 minutes each and
    prints a final summary.
    """

    def describe(task):
        # One bullet line per task, with its dependency list when present.
        due = task.deadline.strftime("%m/%d %H:%M")
        suffix = ""
        if task.dependencies:
            suffix = f" (depends on: {', '.join(task.dependencies)})"
        return f"  • {task.name} - {task.priority.name} priority - Due: {due}{suffix}"

    print("🎯 Task Scheduling Agent Demo")
    print("=" * 50)

    # Environment first, then the agent: both draw from the shared random
    # generator, so this order is part of the behaviour.
    print("\n🌍 Creating task environment...")
    env = TaskEnvironment(complexity_level="medium")

    strategy_pool = ["balanced", "urgent_first", "important_first", "dependencies_first"]
    agent = TaskSchedulingAgent(strategy=random.choice(strategy_pool),
                                work_capacity_hours=6)

    # Initial state
    print("\n📋 Initial Tasks:")
    for item in env.tasks.values():
        print(describe(item))

    # Initial analysis
    report = agent.analyze_workload(env)
    print("\n🔍 Workload Analysis:")
    print(f"  • Total tasks: {report['total_available_tasks']}")
    print(f"  • Estimated time: {report['total_estimated_time']} minutes")
    print(f"  • Workload ratio: {report['workload_vs_capacity']:.1f}")
    for hint in report['recommendations']:
        print(f"  • {hint}")

    # Simulated work day
    print("\n🚀 Starting work simulation...")
    max_sessions = 15

    for session_no in range(1, max_sessions + 1):
        print(f"\n--- Work Session {session_no} ---")

        # Refresh deadlines, then maybe inject an urgent task
        env.update_overdue_tasks()
        env.add_random_task()

        # Let the agent pick; None means nothing is left to schedule
        picked = agent.make_scheduling_decision(env)
        if picked is None:
            print("🎉 All available tasks completed!")
            break

        # Work for 20-60 minutes (random work session length)
        minutes = random.randint(20, 60)
        outcome = agent.work_on_task(picked, minutes)

        # Tell the environment when the task got finished
        if outcome["completed"]:
            env.complete_task(picked.id)

        snapshot = env.get_status_summary()
        print(f"📊 Status: {snapshot['completed_tasks']}/{snapshot['total_tasks']} completed")

        # Short pause for readability
        time.sleep(0.5)

    # Final results
    print("\n🏁 Final Results:")
    final_status = env.get_status_summary()
    metrics = agent.get_performance_metrics()

    print(f"  📈 Tasks completed: {final_status['completed_tasks']}/{final_status['total_tasks']}")
    print(f"  ⏱️  Total work time: {metrics['total_work_time']} minutes")
    print(f"  🎯 Work efficiency: {metrics['work_efficiency']:.1f} tasks/hour")
    print(f"  🧠 Strategy used: {agent.strategy}")

    # List anything that ended the day overdue
    late = [t for t in env.tasks.values() if t.status == TaskStatus.OVERDUE]
    if late:
        print(f"  ⚠️  Overdue tasks: {len(late)}")
        for entry in late:
            print(f"     • {entry.name}")

def compare_agent_strategies():
    """Run every strategy on the same task scenario and print the results.

    One random environment is generated up front and each strategy works on
    its own deep copy of it, so all strategies face identical tasks,
    deadlines and dependencies. The per-session work efficiency is still
    random, so results vary between runs.
    """
    # Local import: only this function needs it.
    import copy

    print("\n🔬 Comparing Agent Strategies")
    print("=" * 40)

    strategies = ["urgent_first", "important_first", "balanced", "dependencies_first"]
    results = {}

    # Create consistent environment: a single shared scenario that is
    # copied per strategy rather than re-randomised each time.
    base_env = TaskEnvironment(complexity_level="medium")

    for strategy in strategies:
        print(f"\n🧪 Testing {strategy} strategy...")

        env = copy.deepcopy(base_env)
        agent = TaskSchedulingAgent(strategy=strategy, work_capacity_hours=4)

        # Run simulation (abbreviated for demo): at most ten 30-minute
        # sessions.
        completed = 0
        for _ in range(10):  # Limited simulation
            task = agent.make_scheduling_decision(env)
            if task is None:
                # Nothing left to schedule; further iterations would be no-ops.
                break
            result = agent.work_on_task(task, 30)
            if result["completed"]:
                env.complete_task(task.id)
                completed += 1

        results[strategy] = {
            "completed": completed,
            "efficiency": agent.get_performance_metrics()["work_efficiency"]
        }

    print(f"\n📊 Strategy Comparison Results:")
    for strategy, result in results.items():
        print(f"  {strategy:20} | Completed: {result['completed']:2d} | Efficiency: {result['efficiency']:.1f}")

# Script entry point: run the single-agent demo first, then the
# side-by-side strategy comparison.
if __name__ == "__main__":
    demo_task_scheduling_agent()
    compare_agent_strategies()

🎯 Task Scheduling Agent Demo

🌍 Creating task environment...
🤖 Task Scheduling Agent initialized
   Strategy: important_first
   Daily capacity: 6 hours
   Focus duration: 54 minutes

📋 Initial Tasks:
  • Database backup - MEDIUM priority - Due: 06/17 13:56
  • Bug fixes - HIGH priority - Due: 06/18 01:56
  • Update documentation - LOW priority - Due: 06/18 07:56
  • Write proposal - HIGH priority - Due: 06/19 15:56
  • Code review - HIGH priority - Due: 06/18 14:56
  • Performance testing - MEDIUM priority - Due: 06/19 14:56 (depends on: task_02)

🔍 Workload Analysis:
  • Total tasks: 5
  • Estimated time: 395 minutes
  • Workload ratio: 1.1
  • 🔒 1 tasks are blocking others

🚀 Starting work simulation...

--- Work Session 1 ---
🧠 Decision: Bug fixes
   Reason: Starting top priority task (important_first strategy)
⏳ Worked on: Bug fixes (38.7% done, ~92min remaining)
📊 Status: 0/6 completed

--- Work Session 2 ---
🧠 Decision: Bug fixes
   Reason: Continuing current task
⏳ Worked on: B