# Hierarchical Planning: The Strategic Brain of Agentic Systems

Planning is the cornerstone of truly agentic AI systems. Without planning capabilities, an agent can only react to immediate stimuli or follow pre-defined scripts. With planning, an agent can:

1. **Set and pursue long-term goals** - Moving beyond simple reactivity to deliberate action sequences
2. **Allocate limited resources optimally** - Managing time, computational resources, and domain-specific constraints
3. **Handle contingencies and failures** - Adapting when initial approaches don't succeed
4. **Balance exploration vs. exploitation** - Deciding when to follow known strategies vs. try new approaches
5. **Coordinate complex multi-step activities** - Breaking down overwhelming tasks into manageable chunks

The Hierarchical Task Network (HTN) approach used in this notebook is a well-established planning paradigm for real-world applications. It combines structured decomposition of goals with enough flexibility to handle novel situations.

## Core Planning Concepts Explained

1. **Hierarchical Decomposition**: Breaking complex goals into manageable steps, like how a store manager breaks down "prepare for holiday season" into department-specific tasks.

2. **Task Dependencies**: Understanding what must happen before other things can start, like how inventory must be ordered before displays can be set up.

3. **Resource Constraints**: Managing limited resources (staff, budget, space) across competing priorities, just like a real store manager must do.

4. **Adaptive Execution**: Monitoring and adjusting plans when reality doesn't match expectations, similar to how retail managers react to unexpected events.

5. **Learning from Execution**: Storing experiences to improve future planning, mimicking how experienced retail managers get better over time.

# Module 4: Hierarchical Planning and Goal Achievement 🎯
*Building AI Agents That Plan and Execute Complex Retail Strategies*

**Duration**: 60 minutes  
**Level**: Advanced  
**Domain**: Walmart Retail Operations

## 🎯 Learning Objectives

By the end of this module, you'll understand:

1. **Hierarchical Task Networks (HTN)**: Breaking down complex retail goals
2. **Resource Planning**: Managing staff, inventory, and budget constraints
3. **Temporal Planning**: Scheduling retail operations and events
4. **Adaptive Replanning**: Handling disruptions and changing conditions
5. **Goal Monitoring**: Tracking progress and success metrics
6. **Integration**: Combining planning with memory and tools
7. **Retail Applications**: Black Friday prep, inventory optimization, seasonal transitions

## 🏪 Retail Planning Context

Walmart faces complex planning challenges:
- **Black Friday**: Coordinating inventory, staff, and logistics
- **Seasonal Transitions**: Moving from summer to back-to-school
- **Supply Chain**: Managing just-in-time inventory
- **Staff Scheduling**: Optimizing coverage for peak hours
- **Promotion Planning**: Coordinating sales across departments

## 🔧 Environment Setup

In [None]:
# Configuration
MODEL_NAME = "qwen2.5:7b-instruct-q4_K_M"  # Same as previous modules
OLLAMA_BASE_URL = "http://localhost:11434"

# Core imports
import json
import requests
import numpy as np
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional, Tuple, Set
from dataclasses import dataclass, field
from enum import Enum
from collections import defaultdict, deque
import uuid
import time
import math
import heapq

# Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
from matplotlib.patches import Rectangle
import matplotlib.patches as mpatches

# Set style
plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette("husl")

# Test Ollama connection
try:
    response = requests.get(f"{OLLAMA_BASE_URL}/api/tags", timeout=5)
    response.raise_for_status()  # Treat non-200 replies as a failed connection
    print("✅ Ollama server is running")
    models = response.json().get('models', [])
    model_names = [model['name'] for model in models]
    if MODEL_NAME in model_names:
        print(f"✅ {MODEL_NAME} is available")
    else:
        print(f"❌ {MODEL_NAME} not found. Run: ollama pull {MODEL_NAME}")
except requests.exceptions.RequestException as e:
    # Catch only network/HTTP errors instead of using a bare except
    print(f"❌ Cannot connect to Ollama ({e}). Make sure it's running: ollama serve")

print("\n🏪 Walmart Planning System Ready!")
print("📍 Store: Bentonville Supercenter #100")

## 🏗️ Part 1: Planning Architecture

### Understanding Hierarchical Task Networks (HTN)

HTN planning decomposes high-level goals into concrete actions:
- **Tasks**: Abstract (decomposable) or Primitive (executable)
- **Methods**: Ways to decompose abstract tasks
- **Operators**: Primitive actions with preconditions and effects
- **Constraints**: Resources, time, dependencies
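
Before the full implementation, the core idea can be sketched in a few lines. This is a minimal illustration, not the notebook's planner: the `METHODS` table and the `decompose` function are hypothetical names, methods are reduced to plain lists of subtask names, and any task without a method is assumed to be primitive.

```python
from typing import Dict, List

# Hypothetical method table: each abstract task maps to an ordered list of
# subtasks. Any task name that is not a key is treated as primitive.
METHODS: Dict[str, List[str]] = {
    "Prepare Black Friday": ["Plan Inventory", "Schedule Extra Staff"],
    "Plan Inventory": ["Forecast Demand", "Place Orders"],
}

def decompose(task: str, methods: Dict[str, List[str]]) -> List[str]:
    """Expand a task depth-first until only primitive tasks remain."""
    if task not in methods:           # primitive: nothing left to decompose
        return [task]
    primitives: List[str] = []
    for subtask in methods[task]:     # abstract: expand each subtask in order
        primitives.extend(decompose(subtask, methods))
    return primitives

plan = decompose("Prepare Black Friday", METHODS)
print(plan)  # ['Forecast Demand', 'Place Orders', 'Schedule Extra Staff']
```

The sketch leaves out preconditions, resources, and dependencies, which the classes in the next cell add.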

In [None]:
# Planning enums and types
class TaskType(Enum):
    """Types of tasks in our planning system"""
    ABSTRACT = "abstract"      # Needs decomposition
    PRIMITIVE = "primitive"    # Can be executed directly
    
class TaskStatus(Enum):
    """Status of task execution"""
    PENDING = "pending"
    READY = "ready"           # All dependencies met
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    BLOCKED = "blocked"       # Waiting on resources

class ResourceType(Enum):
    """Types of resources in retail operations"""
    STAFF = "staff"
    INVENTORY = "inventory"
    BUDGET = "budget"
    SPACE = "space"
    TIME = "time"
    EQUIPMENT = "equipment"

@dataclass
class Resource:
    """Resource with capacity constraints"""
    name: str
    type: ResourceType
    total_capacity: float
    available_capacity: float
    unit: str = "units"
    
    def allocate(self, amount: float) -> bool:
        """Try to allocate resource"""
        if amount <= self.available_capacity:
            self.available_capacity -= amount
            return True
        return False
    
    def release(self, amount: float):
        """Release allocated resource"""
        self.available_capacity = min(
            self.total_capacity, 
            self.available_capacity + amount
        )

@dataclass
class Task:
    """Base class for all tasks"""
    id: str
    name: str
    type: TaskType
    description: str
    department: str = "General"
    
    # Execution properties
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 1  # 1-10, higher is more important
    estimated_duration: float = 1.0  # hours
    actual_duration: float = 0.0
    
    # Dependencies and constraints
    dependencies: List[str] = field(default_factory=list)  # Task IDs
    required_resources: Dict[str, float] = field(default_factory=dict)
    
    # Scheduling
    earliest_start: Optional[datetime] = None
    deadline: Optional[datetime] = None
    scheduled_start: Optional[datetime] = None
    actual_start: Optional[datetime] = None
    actual_end: Optional[datetime] = None
    
    # Results
    success_probability: float = 0.9
    outcome: Dict[str, Any] = field(default_factory=dict)
    
    def can_start(self, completed_tasks: Set[str], resources: Dict[str, Resource]) -> Tuple[bool, str]:
        """Check if task can start"""
        # Check dependencies
        if not all(dep in completed_tasks for dep in self.dependencies):
            return False, "Dependencies not met"
        
        # Check resources
        for resource_name, amount in self.required_resources.items():
            if resource_name in resources:
                if resources[resource_name].available_capacity < amount:
                    return False, f"Insufficient {resource_name}"
            else:
                return False, f"Resource {resource_name} not found"
        
        # Check time constraints
        if self.earliest_start and datetime.now() < self.earliest_start:
            return False, "Too early to start"
            
        return True, "Ready"

@dataclass
class Method:
    """Method for decomposing abstract tasks"""
    name: str
    applicable_to: str  # Task name pattern
    preconditions: Dict[str, Any] = field(default_factory=dict)
    subtasks: List[Dict[str, Any]] = field(default_factory=list)  # Task templates
    
    def is_applicable(self, task: Task, context: Dict[str, Any]) -> bool:
        """Check if method can be applied"""
        # Check task name pattern
        if self.applicable_to not in task.name:
            return False
            
        # Check preconditions
        for key, required_value in self.preconditions.items():
            if context.get(key) != required_value:
                return False
                
        return True

@dataclass
class Plan:
    """Complete plan with tasks and schedule"""
    id: str
    goal: str
    created_at: datetime
    tasks: Dict[str, Task] = field(default_factory=dict)
    task_order: List[str] = field(default_factory=list)  # Topological order
    resources: Dict[str, Resource] = field(default_factory=dict)
    
    # Metrics
    estimated_duration: float = 0.0
    estimated_cost: float = 0.0
    success_probability: float = 1.0
    
    def add_task(self, task: Task):
        """Add task to plan"""
        self.tasks[task.id] = task
        self._update_metrics()
    
    def _update_metrics(self):
        """Update plan metrics"""
        if self.tasks:
            # Duration is length of critical path
            self.estimated_duration = self._calculate_critical_path()
            
            # Cost is sum of resource usage
            self.estimated_cost = sum(
                task.required_resources.get('budget', 0) 
                for task in self.tasks.values()
            )
            
            # Success probability is product of task probabilities
            self.success_probability = np.prod([
                task.success_probability for task in self.tasks.values()
            ])
    
    def _calculate_critical_path(self) -> float:
        """Estimate plan duration (upper bound on the critical path)"""
        # Simplification: sums every task's duration, i.e. assumes fully
        # sequential execution. A real implementation would use the CPM algorithm.
        return sum(task.estimated_duration for task in self.tasks.values())
    
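    def get_ready_tasks(self, completed_tasks: Set[str]) -> List[Task]:
        """Get tasks ready to execute"""
        # Decomposed abstract parents are never added to the plan, so
        # dependencies on IDs outside the plan are treated as already satisfied
        satisfied = set(completed_tasks)
        for task in self.tasks.values():
            satisfied.update(dep for dep in task.dependencies if dep not in self.tasks)
        
        ready = []
        for task in self.tasks.values():
            if (task.status == TaskStatus.PENDING and 
                task.can_start(satisfied, self.resources)[0]):
                ready.append(task)
        
        # Sort by priority
        return sorted(ready, key=lambda t: t.priority, reverse=True)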

print("✅ Planning architecture defined!")
print("📋 Components: Tasks, Methods, Resources, Plans")
print("🎯 Ready for hierarchical decomposition")
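
The `_calculate_critical_path` placeholder above sums all task durations, which overestimates the plan length whenever tasks can run in parallel. The following is a minimal sketch of an earliest-start calculation in the spirit of the critical path method (CPM), assuming tasks are given as plain dictionaries rather than `Task` objects. The function name `earliest_schedule` and the dependency edges in the example are hypothetical; the notebook's method templates do not declare them.

```python
from typing import Dict, List, Tuple

def earliest_schedule(
    durations: Dict[str, float],
    dependencies: Dict[str, List[str]],
) -> Tuple[Dict[str, float], float]:
    """Return each task's earliest start and the critical path length."""
    earliest_start: Dict[str, float] = {}
    earliest_finish: Dict[str, float] = {}

    def finish(task: str, visiting: frozenset = frozenset()) -> float:
        if task in earliest_finish:           # already computed
            return earliest_finish[task]
        if task in visiting:                  # we came back to an open task
            raise ValueError(f"Dependency cycle involving {task!r}")
        # Ignore dependencies that are not tasks in this schedule
        deps = [d for d in dependencies.get(task, []) if d in durations]
        # A task starts once its slowest dependency has finished
        start = max((finish(d, visiting | {task}) for d in deps), default=0.0)
        earliest_start[task] = start
        earliest_finish[task] = start + durations[task]
        return earliest_finish[task]

    for task in durations:
        finish(task)
    return earliest_start, max(earliest_finish.values(), default=0.0)

# Durations (hours) from the inventory method; the edges are assumed
durations = {"Forecast Demand": 2.0, "Check Current Stock": 1.0, "Place Orders": 3.0}
dependencies = {"Place Orders": ["Forecast Demand", "Check Current Stock"]}

starts, length = earliest_schedule(durations, dependencies)
print(starts)  # {'Forecast Demand': 0.0, 'Check Current Stock': 0.0, 'Place Orders': 2.0}
print(length)  # 5.0
```

With these assumed edges the critical path is 5 hours, compared with the 6 hours obtained by summing all durations.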

## 🧠 Part 2: Intelligent Planner with LLM

This part combines predefined decomposition methods with an LLM served by Ollama to turn retail goals into executable plans.
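
When an LLM is asked for JSON, the reply may wrap the object in extra prose or fail to be valid JSON at all, so parsing usually needs a fallback. The helper below is a hedged sketch of one way to do that; `parse_json_reply` is a hypothetical name and is not part of the planner defined in the next cell.

```python
import json
from typing import Any, Dict

def parse_json_reply(reply: str, fallback: Dict[str, Any]) -> Dict[str, Any]:
    """Parse the outermost {...} span of a reply, falling back to defaults."""
    start, end = reply.find("{"), reply.rfind("}")
    if start == -1 or end <= start:
        return dict(fallback)                 # no JSON object in the reply
    try:
        parsed = json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return dict(fallback)                 # malformed JSON
    if not isinstance(parsed, dict):
        return dict(fallback)
    return {**fallback, **parsed}             # parsed fields override defaults

defaults = {"event_type": "routine", "urgency": "medium"}
good = parse_json_reply('Sure! {"event_type": "major_sale"} Hope this helps.', defaults)
bad = parse_json_reply("Error: 500", defaults)
print(good)  # {'event_type': 'major_sale', 'urgency': 'medium'}
print(bad)   # {'event_type': 'routine', 'urgency': 'medium'}
```

Merging over a dictionary of defaults keeps every expected key present even when the model omits some of them.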

In [None]:
class OllamaLLM:
    """LLM interface for intelligent planning"""
    
    def __init__(self, model: str = MODEL_NAME, temperature: float = 0.7):
        self.model = model
        self.temperature = temperature
        self.base_url = OLLAMA_BASE_URL
        
    def generate(self, prompt: str, system: str = "") -> str:
        """Generate response from LLM"""
        full_prompt = f"{system}\n\nUser: {prompt}\n\nAssistant:" if system else prompt
        
        try:
            response = requests.post(
                f"{self.base_url}/api/generate",
                json={
                    "model": self.model,
                    "prompt": full_prompt,
                    # Ollama reads sampling parameters from "options"
                    "options": {"temperature": self.temperature},
                    "stream": False
                },
                timeout=30
            )
            
            if response.status_code == 200:
                return response.json().get('response', '')
            else:
                return f"Error: {response.status_code}"
                
        except Exception as e:
            return f"Error: {str(e)}"

class WalmartHTNPlanner:
    """Hierarchical Task Network planner for Walmart operations"""
    
    def __init__(self):
        self.llm = OllamaLLM()
        self.methods = {}
        self.primitive_operators = {}
        self._initialize_retail_methods()
        
    def _initialize_retail_methods(self):
        """Load retail-specific decomposition methods"""
        
        # Black Friday preparation method
        self.methods["prepare_black_friday"] = Method(
            name="Black Friday Full Preparation",
            applicable_to="Black Friday",
            preconditions={"event_type": "major_sale"},
            subtasks=[
                {
                    "name": "Analyze Last Year's Data",
                    "type": "primitive",
                    "department": "Analytics",
                    "duration": 4.0,
                    "resources": {"staff": 2, "budget": 0}
                },
                {
                    "name": "Plan Inventory Levels",
                    "type": "abstract",
                    "department": "Inventory",
                    "duration": 8.0,
                    "resources": {"staff": 3}
                },
                {
                    "name": "Schedule Extra Staff",
                    "type": "primitive",
                    "department": "HR",
                    "duration": 6.0,
                    "resources": {"staff": 2, "budget": 15000}
                },
                {
                    "name": "Setup Store Layout",
                    "type": "abstract",
                    "department": "Operations",
                    "duration": 12.0,
                    "resources": {"staff": 8, "space": 1000}
                },
                {
                    "name": "Test Systems",
                    "type": "primitive",
                    "department": "IT",
                    "duration": 3.0,
                    "resources": {"staff": 2, "equipment": 5}
                }
            ]
        )
        
        # Inventory planning method
        self.methods["plan_inventory"] = Method(
            name="Inventory Planning",
            applicable_to="Plan Inventory",
            preconditions={},
            subtasks=[
                {
                    "name": "Forecast Demand",
                    "type": "primitive",
                    "department": "Analytics",
                    "duration": 2.0,
                    "resources": {"staff": 1}
                },
                {
                    "name": "Check Current Stock",
                    "type": "primitive",
                    "department": "Inventory",
                    "duration": 1.0,
                    "resources": {"staff": 2}
                },
                {
                    "name": "Place Orders",
                    "type": "primitive",
                    "department": "Purchasing",
                    "duration": 3.0,
                    "resources": {"staff": 1, "budget": 50000}
                }
            ]
        )
        
        # Store layout setup method
        self.methods["setup_layout"] = Method(
            name="Store Layout Setup",
            applicable_to="Setup Store Layout",
            preconditions={},
            subtasks=[
                {
                    "name": "Create Crowd Flow Plan",
                    "type": "primitive",
                    "department": "Operations",
                    "duration": 2.0,
                    "resources": {"staff": 2}
                },
                {
                    "name": "Setup Queue Barriers",
                    "type": "primitive",
                    "department": "Operations",
                    "duration": 4.0,
                    "resources": {"staff": 4, "equipment": 20}
                },
                {
                    "name": "Position Doorbuster Displays",
                    "type": "primitive",
                    "department": "Merchandising",
                    "duration": 6.0,
                    "resources": {"staff": 6, "space": 500}
                }
            ]
        )
        
        print("📚 Loaded retail planning methods:")
        for name in self.methods:
            print(f"  - {name}")
    
    def decompose_goal(self, goal: str, context: Optional[Dict[str, Any]] = None) -> Plan:
        """Decompose high-level goal into executable plan"""
        print(f"\n🎯 Planning for goal: {goal}")
        
        if context is None:
            context = self._analyze_goal_context(goal)
            
        plan = Plan(
            id=str(uuid.uuid4())[:8],
            goal=goal,
            created_at=datetime.now()
        )
        
        # Initialize resources
        plan.resources = self._initialize_resources()
        
        # Create initial abstract task
        root_task = Task(
            id=str(uuid.uuid4())[:8],
            name=goal,
            type=TaskType.ABSTRACT,
            description=f"Root task for {goal}",
            priority=10
        )
        
        # Recursively decompose
        self._decompose_task(root_task, plan, context)
        
        # Calculate task order (topological sort)
        plan.task_order = self._topological_sort(plan.tasks)
        
        print(f"\n✅ Created plan with {len(plan.tasks)} tasks")
        return plan
    
    def _analyze_goal_context(self, goal: str) -> Dict[str, Any]:
        """Use LLM to analyze goal and extract context"""
        system_prompt = """You are a Walmart operations planner. Analyze the goal and extract key context.
        
        Respond with JSON containing:
        - event_type: major_sale, seasonal, routine, emergency
        - urgency: high, medium, low
        - departments: list of departments involved
        - estimated_scope: small, medium, large
        """
        
        prompt = f"Analyze this retail goal: {goal}"
        
        response = self.llm.generate(prompt, system_prompt)
        
        # Simplification: the LLM reply may not be valid JSON, so it is not
        # parsed here; the context is derived from keywords in the goal instead
        goal_lower = goal.lower()
        is_black_friday = "black friday" in goal_lower
        context = {
            "event_type": "major_sale" if is_black_friday else "routine",
            "urgency": "high" if "urgent" in goal_lower or is_black_friday else "medium",
            "departments": ["Operations", "HR", "Inventory"],
            "estimated_scope": "large" if is_black_friday else "medium"
        }
            
        return context
    
    def _initialize_resources(self) -> Dict[str, Resource]:
        """Initialize available resources"""
        return {
            "staff": Resource("Staff", ResourceType.STAFF, 100, 100, "people"),
            "budget": Resource("Budget", ResourceType.BUDGET, 100000, 100000, "dollars"),
            "space": Resource("Floor Space", ResourceType.SPACE, 50000, 50000, "sq ft"),
            "equipment": Resource("Equipment", ResourceType.EQUIPMENT, 50, 50, "units"),
            "time": Resource("Time", ResourceType.TIME, 168, 168, "hours")  # 1 week
        }
    
    def _decompose_task(self, task: Task, plan: Plan, context: Dict[str, Any], depth: int = 0):
        """Recursively decompose abstract tasks"""
        if depth > 5:  # Prevent infinite recursion
            task.type = TaskType.PRIMITIVE
            plan.add_task(task)
            return
            
        if task.type == TaskType.PRIMITIVE:
            plan.add_task(task)
            return
            
        # Find applicable method
        method = self._find_applicable_method(task, context)
        
        if method:
            print(f"{'  ' * depth}📋 Decomposing '{task.name}' using {method.name}")
            
            parent_id = task.id
            for subtask_template in method.subtasks:
                subtask = self._create_task_from_template(subtask_template, parent_id)
                self._decompose_task(subtask, plan, context, depth + 1)
        else:
            # No method found, use LLM to suggest decomposition
            print(f"{'  ' * depth}🤔 Using LLM to decompose '{task.name}'")
            subtasks = self._llm_decompose(task, context)
            
            for subtask in subtasks:
                self._decompose_task(subtask, plan, context, depth + 1)
    
    def _find_applicable_method(self, task: Task, context: Dict[str, Any]) -> Optional[Method]:
        """Find method that can decompose this task"""
        for method in self.methods.values():
            if method.is_applicable(task, context):
                return method
        return None
    
    def _create_task_from_template(self, template: Dict[str, Any], parent_id: str) -> Task:
        """Create task from template"""
        task = Task(
            id=str(uuid.uuid4())[:8],
            name=template["name"],
            type=TaskType[template["type"].upper()],
            description=template.get("description", template["name"]),
            department=template.get("department", "General"),
            estimated_duration=template.get("duration", 1.0),
            required_resources=template.get("resources", {}),
            dependencies=[parent_id] if parent_id else [],
            priority=template.get("priority", 5)
        )
        return task
    
    def _llm_decompose(self, task: Task, context: Dict[str, Any]) -> List[Task]:
        """Use LLM to suggest task decomposition"""
        system_prompt = """You are a Walmart operations expert. Decompose the given task into 2-4 concrete subtasks.
        Consider resources, dependencies, and retail best practices.
        Keep subtasks specific and actionable."""
        
        prompt = f"""Decompose this retail task: {task.name}
        Department: {task.department}
        Context: {context}
        
        Suggest 2-4 subtasks that are more concrete."""
        
        response = self.llm.generate(prompt, system_prompt)
        
        # Simplification: the LLM response is not parsed yet; the generic fallback below is always used
        subtasks = []
        
        # Fallback decomposition
        subtasks.append(Task(
            id=str(uuid.uuid4())[:8],
            name=f"Plan {task.name}",
            type=TaskType.PRIMITIVE,
            description=f"Create detailed plan for {task.name}",
            department=task.department,
            estimated_duration=task.estimated_duration * 0.2,
            dependencies=[task.id],
            priority=task.priority
        ))
        
        subtasks.append(Task(
            id=str(uuid.uuid4())[:8],
            name=f"Execute {task.name}",
            type=TaskType.PRIMITIVE,
            description=f"Carry out {task.name}",
            department=task.department,
            estimated_duration=task.estimated_duration * 0.6,
            dependencies=[subtasks[0].id],
            priority=task.priority - 1
        ))
        
        subtasks.append(Task(
            id=str(uuid.uuid4())[:8],
            name=f"Verify {task.name}",
            type=TaskType.PRIMITIVE,
            description=f"Verify completion of {task.name}",
            department=task.department,
            estimated_duration=task.estimated_duration * 0.2,
            dependencies=[subtasks[1].id],
            priority=task.priority - 2
        ))
        
        return subtasks
    
    def _topological_sort(self, tasks: Dict[str, Task]) -> List[str]:
        """Sort tasks in execution order respecting dependencies"""
        # Build adjacency list
        graph = defaultdict(list)
        in_degree = defaultdict(int)
        
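        for task_id, task in tasks.items():
            # Count only dependencies within this plan; decomposed abstract
            # parents are never added, so counting them would block every task
            in_plan_deps = [dep for dep in task.dependencies if dep in tasks]
            in_degree[task_id] = len(in_plan_deps)
            for dep in in_plan_deps:
                graph[dep].append(task_id)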
        
        # Find nodes with no dependencies
        queue = deque([task_id for task_id in tasks if in_degree[task_id] == 0])
        sorted_order = []
        
        while queue:
            current = queue.popleft()
            sorted_order.append(current)
            
            for neighbor in graph[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)
        
        return sorted_order

# Create planner
planner = WalmartHTNPlanner()
print("\n✅ Walmart HTN Planner ready!")
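
Kahn's algorithm, which `_topological_sort` is based on, also gives a cheap cycle check: if the sorted order is shorter than the task list, some tasks are stuck in a dependency cycle. The standalone sketch below shows this on plain dictionaries. The function name `topological_order` is hypothetical, and dependencies on IDs outside the plan are assumed to be already satisfied.

```python
from collections import deque
from typing import Dict, List

def topological_order(dependencies: Dict[str, List[str]]) -> List[str]:
    """Kahn's algorithm over in-plan dependencies; raises on cycles."""
    # Count only dependencies that are themselves tasks in the plan
    in_degree = {
        task: sum(1 for dep in deps if dep in dependencies)
        for task, deps in dependencies.items()
    }
    dependents: Dict[str, List[str]] = {task: [] for task in dependencies}
    for task, deps in dependencies.items():
        for dep in deps:
            if dep in dependencies:
                dependents[dep].append(task)

    # Start with tasks that have no unmet in-plan dependencies
    queue = deque(task for task, degree in in_degree.items() if degree == 0)
    order: List[str] = []
    while queue:
        current = queue.popleft()
        order.append(current)
        for nxt in dependents[current]:
            in_degree[nxt] -= 1
            if in_degree[nxt] == 0:
                queue.append(nxt)

    if len(order) != len(dependencies):       # leftover tasks form a cycle
        raise ValueError("Dependency cycle detected")
    return order

# "root" is not a task in the plan, so it does not block "plan"
deps = {"verify": ["execute"], "execute": ["plan"], "plan": ["root"]}
order = topological_order(deps)
print(order)  # ['plan', 'execute', 'verify']
```

Raising on a cycle, rather than returning a partial order, makes an invalid plan visible before execution starts.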

## 📊 Part 3: Plan Visualization

Visualizing plans helps understand task dependencies and resource allocation.

In [None]:
def visualize_plan(plan: Plan):
    """Create comprehensive visualization of the plan"""
    fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))
    fig.suptitle(f'Plan: {plan.goal}', fontsize=16, fontweight='bold')
    
    # 1. Task dependency graph
    G = nx.DiGraph()
    
    # Add nodes
    for task_id, task in plan.tasks.items():
        G.add_node(task_id, 
                  label=task.name[:20] + "..." if len(task.name) > 20 else task.name,
                  department=task.department,
                  type=task.type.value)
    
    # Add edges
    for task_id, task in plan.tasks.items():
        for dep in task.dependencies:
            if dep in plan.tasks:
                G.add_edge(dep, task_id)
    
    # Layout and draw
    pos = nx.spring_layout(G, k=2, iterations=50)
    
    # Color by department
    dept_colors = {
        "Operations": "lightblue",
        "HR": "lightgreen", 
        "Inventory": "lightyellow",
        "Analytics": "lightcoral",
        "IT": "lightgray",
        "Merchandising": "plum",
        "Purchasing": "wheat",
        "General": "white"
    }
    
    node_colors = [dept_colors.get(plan.tasks[node].department, 'white') for node in G.nodes()]
    
    nx.draw(G, pos, ax=ax1, with_labels=True,
            labels=nx.get_node_attributes(G, 'label'),
            node_color=node_colors,
            node_size=2000,
            font_size=8,
            font_weight='bold',
            arrows=True,
            edge_color='gray',
            node_shape='o')
    
    ax1.set_title("Task Dependencies", fontsize=12, fontweight='bold')
    ax1.axis('off')
    
    # 2. Gantt chart (simplified)
    tasks_by_order = [plan.tasks[tid] for tid in plan.task_order if tid in plan.tasks]
    
    y_pos = range(len(tasks_by_order))
    start_times = []
    durations = []
    colors = []
    
    current_time = 0
    for i, task in enumerate(tasks_by_order):
        start_times.append(current_time)
        durations.append(task.estimated_duration)
        colors.append(dept_colors.get(task.department, 'gray'))
        current_time += task.estimated_duration * 0.5  # Overlap allowed
    
    ax2.barh(y_pos, durations, left=start_times, color=colors, alpha=0.8)
    ax2.set_yticks(y_pos)
    ax2.set_yticklabels([t.name[:25] for t in tasks_by_order], fontsize=8)
    ax2.set_xlabel('Time (hours)')
    ax2.set_title('Task Timeline (Gantt Chart)', fontsize=12, fontweight='bold')
    ax2.grid(axis='x', alpha=0.3)
    
    # 3. Resource allocation
    resource_usage = defaultdict(float)
    for task in plan.tasks.values():
        for resource, amount in task.required_resources.items():
            resource_usage[resource] += amount
    
    if resource_usage:
        resources = list(resource_usage.keys())
        usage = list(resource_usage.values())
        capacity = [plan.resources[r].total_capacity if r in plan.resources else 0 for r in resources]
        
        x = np.arange(len(resources))
        width = 0.35
        
        bars1 = ax3.bar(x - width/2, usage, width, label='Required', color='coral')
        bars2 = ax3.bar(x + width/2, capacity, width, label='Total capacity', color='lightgreen')
        
        ax3.set_xlabel('Resource Type')
        ax3.set_ylabel('Amount')
        ax3.set_title('Resource Requirements', fontsize=12, fontweight='bold')
        ax3.set_xticks(x)
        ax3.set_xticklabels(resources)
        ax3.legend()
        
        # Add value labels
        for bar in bars1:
            height = bar.get_height()
            ax3.annotate(f'{height:.0f}',
                        xy=(bar.get_x() + bar.get_width() / 2, height),
                        xytext=(0, 3),
                        textcoords="offset points",
                        ha='center', va='bottom',
                        fontsize=8)
    
    # 4. Plan metrics
    metrics_text = f"""Plan Metrics:
    
Total Tasks: {len(plan.tasks)}
Estimated Duration: {plan.estimated_duration:.1f} hours
Estimated Cost: ${plan.estimated_cost:,.2f}
Success Probability: {plan.success_probability:.1%}

Departments Involved:
"""
    
    dept_counts = defaultdict(int)
    for task in plan.tasks.values():
        dept_counts[task.department] += 1
    
    for dept, count in sorted(dept_counts.items(), key=lambda x: x[1], reverse=True):
        metrics_text += f"  • {dept}: {count} tasks\n"
    
    ax4.text(0.1, 0.9, metrics_text, transform=ax4.transAxes,
            verticalalignment='top', fontsize=11,
            bbox=dict(boxstyle='round', facecolor='wheat', alpha=0.5))
    
    # Add legend for departments
    legend_elements = [mpatches.Patch(facecolor=color, label=dept) 
                      for dept, color in dept_colors.items() 
                      if dept in dept_counts]
    ax4.legend(handles=legend_elements, loc='lower left', title='Departments')
    
    ax4.set_title('Plan Summary', fontsize=12, fontweight='bold')
    ax4.axis('off')
    
    plt.tight_layout()
    plt.show()

print("✅ Plan visualization function ready!")

## 🚀 Part 4: Adaptive Plan Execution

Execute plans with monitoring, resource management, and adaptive replanning.
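
A task often needs several resources at once, and reserving only some of them would leave the pool in an inconsistent state. The sketch below shows one possible all-or-nothing allocation scheme on plain dictionaries. The names `allocate_all` and `release_all` are hypothetical and are not the executor's own helper methods.

```python
from typing import Dict

def allocate_all(available: Dict[str, float], required: Dict[str, float]) -> bool:
    """Reserve every required resource, or none of them."""
    # Pass 1: check that every request fits before changing any state
    for name, amount in required.items():
        if available.get(name, 0.0) < amount:
            return False
    # Pass 2: commit the whole reservation
    for name, amount in required.items():
        available[name] -= amount
    return True

def release_all(available: Dict[str, float], required: Dict[str, float]) -> None:
    """Return previously reserved amounts to the pool."""
    for name, amount in required.items():
        available[name] += amount

pool = {"staff": 10.0, "equipment": 5.0}

# Needs more equipment than exists: nothing is reserved, not even staff
too_big = allocate_all(pool, {"staff": 4.0, "equipment": 20.0})
unchanged = dict(pool)

# Fits: both resources are reserved together
request = {"staff": 4.0, "equipment": 5.0}
fits = allocate_all(pool, request)
after_allocation = dict(pool)

release_all(pool, request)
print(too_big, fits)      # False True
print(after_allocation)   # {'staff': 6.0, 'equipment': 0.0}
```

Checking everything before committing means a failed request never has to be rolled back.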

In [None]:
class AdaptivePlanExecutor:
    """Executes plans with monitoring and adaptation"""
    
    def __init__(self, memory_manager=None):
        self.execution_history = []
        self.memory = memory_manager  # Integration with Module 2
        self.current_plan = None
        self.completed_tasks = set()
        self.failed_tasks = set()
        
    def execute_plan(self, plan: Plan, real_time: bool = False) -> Dict[str, Any]:
        """Execute plan with adaptive replanning"""
        print(f"\n🚀 Executing plan: {plan.goal}")
        print(f"📊 Total tasks: {len(plan.tasks)}")
        print(f"⏱️ Estimated duration: {plan.estimated_duration:.1f} hours\n")
        
        self.current_plan = plan
        execution_log = {
            "plan_id": plan.id,
            "goal": plan.goal,
            "start_time": datetime.now(),
            "tasks_completed": 0,
            "tasks_failed": 0,
            "replanning_count": 0,
            "resource_conflicts": 0,
            "total_cost": 0.0,
            "events": []
        }
        
        # Execute tasks in order
        while self._has_pending_tasks(plan):
            # Get ready tasks
            ready_tasks = plan.get_ready_tasks(self.completed_tasks)
            
            if not ready_tasks:
                # Check for deadlock
                if self._detect_deadlock(plan):
                    print("\n⚠️ Deadlock detected! Initiating replanning...")
                    self._replan(plan, "deadlock", execution_log)
                    continue
                else:
                    print("\n❌ No executable tasks available")
                    break
            
            # Execute highest priority ready task
            task = ready_tasks[0]
            print(f"\n▶️ Executing: {task.name} [{task.department}]")
            
            # Try to allocate resources
            if self._allocate_resources(task, plan):
                # Execute task
                success, duration, outcome = self._execute_task(task, real_time)
                
                # Update task status
                if success:
                    task.status = TaskStatus.COMPLETED
                    task.actual_duration = duration
                    task.outcome = outcome
                    self.completed_tasks.add(task.id)
                    execution_log["tasks_completed"] += 1
                    
                    # Store in memory if available
                    if self.memory:
                        self._store_execution_memory(task, success, outcome)
                    
                    print(f"  ✅ Completed in {duration:.1f} hours")
                    if outcome.get("cost"):
                        execution_log["total_cost"] += outcome["cost"]
                        print(f"  💰 Cost: ${outcome['cost']:,.2f}")
                else:
                    task.status = TaskStatus.FAILED
                    self.failed_tasks.add(task.id)
                    execution_log["tasks_failed"] += 1
                    
                    print(f"  ❌ Failed: {outcome.get('error', 'Unknown error')}")
                    
                    # Decide whether to replan
                    if self._should_replan(task, plan):
                        print("  🔄 Initiating adaptive replanning...")
                        self._replan(plan, f"task_failure:{task.name}", execution_log)
                
                # Release resources
                self._release_resources(task, plan)
                
                # Log event
                execution_log["events"].append({
                    "time": datetime.now(),
                    "task": task.name,
                    "status": "completed" if success else "failed",
                    "duration": duration
                })
            else:
                print("  ⏸️ Resource conflict - postponing task")
                execution_log["resource_conflicts"] += 1
                
                # Tasks run one at a time and release their resources afterwards,
                # so waiting cannot free capacity for this task. After repeated
                # conflicts, replan to scale down pending resource requirements.
                if execution_log["resource_conflicts"] > 5:
                    self._replan(plan, "resource_conflict", execution_log)
        
        # Calculate final metrics
        execution_log["end_time"] = datetime.now()
        execution_log["total_duration"] = (
            execution_log["end_time"] - execution_log["start_time"]
        ).total_seconds() / 3600
        
        execution_log["success_rate"] = (
            execution_log["tasks_completed"] / len(plan.tasks) 
            if plan.tasks else 0
        )
        
        self.execution_history.append(execution_log)
        
        # Print summary
        self._print_execution_summary(execution_log)
        
        return execution_log
    
    def _has_pending_tasks(self, plan: Plan) -> bool:
        """Check if there are tasks still to execute"""
        for task in plan.tasks.values():
            if task.status in [TaskStatus.PENDING, TaskStatus.READY]:
                return True
        return False
    
    def _detect_deadlock(self, plan: Plan) -> bool:
        """Detect if plan is in deadlock state"""
        pending_tasks = [t for t in plan.tasks.values() 
                        if t.status == TaskStatus.PENDING]
        
        if not pending_tasks:
            return False
            
        # A pending task with any failed dependency can never become ready
        for task in pending_tasks:
            has_failed_dep = any(dep in self.failed_tasks 
                               for dep in task.dependencies)
            if has_failed_dep:
                return True
                
        return False
    
    def _allocate_resources(self, task: Task, plan: Plan) -> bool:
        """Try to allocate required resources"""
        # Check availability
        for resource_name, amount in task.required_resources.items():
            if resource_name in plan.resources:
                if plan.resources[resource_name].available_capacity < amount:
                    return False
        
        # Allocate
        for resource_name, amount in task.required_resources.items():
            if resource_name in plan.resources:
                plan.resources[resource_name].allocate(amount)
                
        return True
    
    def _release_resources(self, task: Task, plan: Plan):
        """Release allocated resources"""
        for resource_name, amount in task.required_resources.items():
            if resource_name in plan.resources:
                plan.resources[resource_name].release(amount)
    
    def _execute_task(self, task: Task, real_time: bool) -> Tuple[bool, float, Dict]:
        """Execute individual task"""
        task.actual_start = datetime.now()
        task.status = TaskStatus.RUNNING
        
        # Simulate execution
        if real_time:
            time.sleep(min(task.estimated_duration, 0.5))  # Max 0.5 seconds
        
        # Determine success based on probability and context
        success = np.random.random() < task.success_probability
        
        # Add some variety to outcomes
        if "Black Friday" in task.name and "Test" in task.name:
            # Critical tasks have lower success rate
            success = np.random.random() < 0.7
            
        duration = task.estimated_duration * np.random.uniform(0.8, 1.2)
        
        outcome = {
            "success": success,
            "duration": duration
        }
        
        # Add task-specific outcomes
        if "budget" in task.required_resources:
            outcome["cost"] = task.required_resources["budget"] * np.random.uniform(0.9, 1.1)
            
        if not success:
            errors = [
                "Resource shortage",
                "System failure",
                "Staff unavailable",
                "Supplier delay",
                "Quality issue"
            ]
            outcome["error"] = np.random.choice(errors)
            
        task.actual_end = datetime.now()
        return success, duration, outcome
    
    def _should_replan(self, failed_task: Task, plan: Plan) -> bool:
        """Decide if replanning is needed"""
        # High priority tasks always trigger replanning
        if failed_task.priority >= 8:
            return True
            
        # Tasks with many dependents trigger replanning
        dependent_count = sum(1 for t in plan.tasks.values() 
                            if failed_task.id in t.dependencies)
        if dependent_count > 2:
            return True
            
        # Critical path tasks
        if "critical" in failed_task.name.lower():
            return True
            
        return False
    
    def _replan(self, plan: Plan, reason: str, execution_log: Dict):
        """Perform adaptive replanning"""
        print(f"\n🔄 Replanning due to: {reason}")
        execution_log["replanning_count"] += 1
        
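        # Simple replanning strategies. Match "deadlock" exactly: a reason that
        # merely contains the substring (for example a resource-related one)
        # must fall through to the branches below.
        if reason == "deadlock":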
            # Remove dependencies from failed tasks
            for task in plan.tasks.values():
                if task.status == TaskStatus.PENDING:
                    task.dependencies = [
                        dep for dep in task.dependencies 
                        if dep not in self.failed_tasks
                    ]
                    
        elif "resource" in reason:
            # Reduce resource requirements of every task that has not run yet
            for task in plan.tasks.values():
                if task.status in (TaskStatus.PENDING, TaskStatus.READY):
                    for resource in task.required_resources:
                        task.required_resources[resource] *= 0.8
                        
        elif "task_failure" in reason:
            # Add retry task or alternative
            # Split once so task names that contain ":" are kept whole
            failed_task_name = reason.split(":", 1)[1]
            print(f"  Adding alternative for: {failed_task_name}")
            # In real implementation, would create alternative task
            
        print("  ✅ Replanning complete")
    
    def _store_execution_memory(self, task: Task, success: bool, outcome: Dict):
        """Store execution experience in memory"""
        if not self.memory:
            return
            
        # Store as episodic memory
        context = f"{task.department} - {self.current_plan.goal}"
        action = f"Execute: {task.name}"
        result = "Success" if success else f"Failed: {outcome.get('error', 'Unknown')}"
        
        # This would integrate with Module 2's memory system
        # self.memory.store_experience(context, action, result, success)
    
    def _print_execution_summary(self, log: Dict):
        """Print execution summary"""
        print("\n" + "="*60)
        print("📊 EXECUTION SUMMARY")
        print("="*60)
        print(f"Goal: {log['goal']}")
        print(f"Duration: {log['total_duration']:.1f} hours")
        print(f"Tasks Completed: {log['tasks_completed']}/{log['tasks_completed'] + log['tasks_failed']}")
        print(f"Success Rate: {log['success_rate']:.1%}")
        print(f"Total Cost: ${log['total_cost']:,.2f}")
        print(f"Replanning Events: {log['replanning_count']}")
        print(f"Resource Conflicts: {log['resource_conflicts']}")
        print("="*60)

# Create executor
executor = AdaptivePlanExecutor()
print("✅ Adaptive Plan Executor ready!")
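
The executor above picks work from the list of ready tasks and skips tasks whose resources cannot be allocated. The sketch below isolates that selection step so it can be run on its own. `SketchTask`, `ready_tasks`, and `first_fit` are hypothetical names introduced for illustration, not part of the notebook's `Task` or `Plan` classes, and resources missing from `capacity` are ignored in the same way `_allocate_resources` ignores untracked resources.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class SketchTask:
    """Hypothetical stand-in for the notebook's Task class."""
    id: str
    priority: int
    dependencies: List[str] = field(default_factory=list)
    required_resources: Dict[str, float] = field(default_factory=dict)


def ready_tasks(tasks: List[SketchTask], completed: Set[str]) -> List[SketchTask]:
    """Unfinished tasks whose dependencies are all completed, highest priority first."""
    ready = [
        t for t in tasks
        if t.id not in completed and all(d in completed for d in t.dependencies)
    ]
    return sorted(ready, key=lambda t: t.priority, reverse=True)


def first_fit(ready: List[SketchTask], capacity: Dict[str, float]) -> Optional[SketchTask]:
    """Return the first ready task whose tracked resource needs fit, or None."""
    for task in ready:
        fits = all(
            capacity[name] >= amount
            for name, amount in task.required_resources.items()
            if name in capacity  # untracked resources are ignored
        )
        if fits:
            return task
    return None


tasks = [
    SketchTask("order_stock", priority=9, required_resources={"staff": 12}),
    SketchTask("update_signage", priority=5, required_resources={"staff": 2}),
    SketchTask("build_displays", priority=7, dependencies=["order_stock"]),
]
capacity = {"staff": 10}

candidates = ready_tasks(tasks, completed=set())
chosen = first_fit(candidates, capacity)
print([t.id for t in candidates])  # ['order_stock', 'update_signage']
print(chosen.id)                   # update_signage
```

Here `order_stock` has the highest priority but needs more staff than exist, so the lower-priority `update_signage` is chosen instead. `build_displays` is not a candidate at all because its dependency has not completed.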

## 🎯 Part 5: Retail Planning Scenarios

Let's see how our planning system handles realistic, simulated Walmart scenarios.

In [None]:
# Scenario 1: Black Friday Preparation
print("🛍️ SCENARIO 1: Black Friday Preparation\n")
print("Context: It's November 1st. We need to prepare for Black Friday.")
print("Challenge: Coordinate inventory, staffing, and store layout.\n")

# Create plan
black_friday_plan = planner.decompose_goal(
    "Prepare store for Black Friday 2024",
    context={
        "event_type": "major_sale",
        "urgency": "high",
        "departments": ["Operations", "HR", "Inventory", "IT"],
        "estimated_scope": "large"
    }
)

# Visualize the plan
visualize_plan(black_friday_plan)

In [None]:
# Execute the Black Friday plan
print("\n🚀 Executing Black Friday Preparation Plan...\n")
bf_execution_log = executor.execute_plan(black_friday_plan, real_time=False)

# Analyze execution
if bf_execution_log["success_rate"] < 0.8:
    print("\n⚠️ Plan execution had issues. Let's analyze what went wrong...")
    failed_events = [e for e in bf_execution_log["events"] if e["status"] == "failed"]
    for event in failed_events[:3]:
        print(f"  - {event['task']} failed at {event['time'].strftime('%H:%M')}")

In [None]:
# Scenario 2: Emergency Inventory Shortage
print("\n📦 SCENARIO 2: Emergency Inventory Management\n")
print("Context: Popular toy sold out 3 weeks before Christmas!")
print("Challenge: Quickly restock without disrupting operations.\n")

# Create emergency plan
emergency_plan = planner.decompose_goal(
    "Emergency restock of sold-out Christmas toys",
    context={
        "event_type": "emergency",
        "urgency": "high",
        "departments": ["Inventory", "Purchasing", "Logistics"],
        "estimated_scope": "medium",
        "time_constraint": "48 hours"
    }
)

print(f"\n📋 Emergency plan created with {len(emergency_plan.tasks)} tasks")
print("\nKey tasks:")
for task in list(emergency_plan.tasks.values())[:5]:
    print(f"  - {task.name} ({task.estimated_duration:.1f}h) [{task.department}]")

In [None]:
# Scenario 3: Seasonal Transition Planning
print("\n🍂 SCENARIO 3: Summer to Back-to-School Transition\n")
print("Context: July 1st - need to transition from summer to school supplies.")
print("Challenge: Gradual transition without losing summer sales.\n")

# This demonstrates a more complex planning scenario
transition_plan = planner.decompose_goal(
    "Transition store layout from summer to back-to-school season",
    context={
        "event_type": "seasonal",
        "urgency": "medium",
        "departments": ["Merchandising", "Inventory", "Marketing"],
        "estimated_scope": "large",
        "constraints": ["maintain summer sales", "gradual transition"]
    }
)

# Show resource requirements
print("\n📊 Resource Analysis:")
total_resources = defaultdict(float)
for task in transition_plan.tasks.values():
    for resource, amount in task.required_resources.items():
        total_resources[resource] += amount

print("\nTotal resources needed:")
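for resource, amount in total_resources.items():
    if resource not in transition_plan.resources:
        # Mirror the executor, which skips resources the plan does not track
        print(f"  - {resource}: {amount:.0f} requested (not tracked by this plan)")
        continue
    available = transition_plan.resources[resource].total_capacity
    utilization = (amount / available) * 100 if available > 0 else 0
    print(f"  - {resource}: {amount:.0f} / {available:.0f} ({utilization:.1f}% utilization)")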
    if utilization > 80:
        print(f"    ⚠️ High utilization warning!")
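
One caveat when reading the utilization figures above: they sum requirements across every task, while the executor shown earlier runs one task at a time and releases its resources afterwards. Under that model, the summed figure is a measure of total workload, and the largest single-task demand is what decides whether a task can be allocated at all. The sketch below computes both. The requirements and capacities are made-up numbers for illustration, and `demand_summary` is a hypothetical helper.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


def demand_summary(
    requirements: List[Dict[str, float]]
) -> Tuple[Dict[str, float], Dict[str, float]]:
    """Return (summed demand, largest single-task demand) per resource."""
    total: Dict[str, float] = defaultdict(float)
    peak: Dict[str, float] = defaultdict(float)
    for req in requirements:
        for name, amount in req.items():
            total[name] += amount
            peak[name] = max(peak[name], amount)
    return dict(total), dict(peak)


# Hypothetical requirements and capacities, for illustration only.
task_requirements = [
    {"staff": 6, "budget": 2000},
    {"staff": 8, "budget": 500},
    {"staff": 3},
]
capacity = {"staff": 10, "budget": 5000}

total, peak = demand_summary(task_requirements)
for name, cap in capacity.items():
    print(f"{name}: summed {total[name] / cap:.0%}, peak task {peak[name] / cap:.0%}")
```

With these numbers, staff shows 170% summed demand but only 80% at its most demanding task, so every task can still be scheduled sequentially even though the summed figure would trigger the high-utilization warning.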

## 🎯 Hands-On Exercise: Custom Retail Planner

Create your own planning method for a retail scenario.

In [None]:
# Exercise: Create a Customer Service Recovery Plan
print("🎯 Exercise: Customer Service Crisis Planning\n")
print("Scenario: Social media complaints about long checkout lines are going viral!")
print("Your task: Create a planning method to handle this crisis.\n")

# TODO: Create your custom method
def create_customer_service_method() -> Method:
    """
    Create a method for handling customer service crisis.
    
    Consider:
    - Immediate response (< 1 hour)
    - Short-term fixes (< 24 hours)  
    - Long-term improvements
    - Social media management
    - Staff training
    """
    return Method(
        name="Customer Service Crisis Response",
        applicable_to="customer service crisis",
        preconditions={"event_type": "crisis"},
        subtasks=[
            # TODO: Add your subtasks here
            {
                "name": "Acknowledge Issue on Social Media",
                "type": "primitive",
                "department": "Marketing",
                "duration": 0.5,
                "resources": {"staff": 1},
                "priority": 10
            },
            {
                "name": "Open All Available Registers",
                "type": "primitive",
                "department": "Operations",
                "duration": 0.25,
                "resources": {"staff": 10, "equipment": 8},
                "priority": 9
            },
            {
                "name": "Deploy Express Checkout Lanes",
                "type": "primitive",
                "department": "Operations",
                "duration": 1.0,
                "resources": {"staff": 4, "space": 200},
                "priority": 8
            },
            {
                "name": "Call in Additional Cashiers",
                "type": "primitive",
                "department": "HR",
                "duration": 2.0,
                "resources": {"staff": 1, "budget": 500},
                "priority": 7
            },
            {
                "name": "Implement Queue Management System",
                "type": "abstract",
                "department": "Operations",
                "duration": 4.0,
                "resources": {"staff": 3, "equipment": 5},
                "priority": 6
            },
            {
                "name": "Train Staff on Fast Checkout",
                "type": "primitive",
                "department": "HR",
                "duration": 3.0,
                "resources": {"staff": 2, "budget": 1000},
                "priority": 5
            },
            {
                "name": "Post Resolution Update",
                "type": "primitive",
                "department": "Marketing",
                "duration": 0.5,
                "resources": {"staff": 1},
                "priority": 4
            }
        ]
    )

# Add the method to planner
crisis_method = create_customer_service_method()
planner.methods["customer_crisis"] = crisis_method
print("✅ Added customer service crisis method!")

# Test the method
crisis_plan = planner.decompose_goal(
    "Handle customer service crisis about long checkout lines",
    context={
        "event_type": "crisis",
        "urgency": "high",
        "departments": ["Operations", "Marketing", "HR"]
    }
)

print(f"\n📋 Crisis plan created with {len(crisis_plan.tasks)} tasks")
print(f"⏱️ Can be resolved in {crisis_plan.estimated_duration:.1f} hours")
print(f"💰 Estimated cost: ${crisis_plan.estimated_cost:,.2f}")
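
The method above declares `preconditions={"event_type": "crisis"}`, and the test call passes a context with the same `event_type`. The planner's actual matching logic is defined earlier in the notebook and is not reproduced here. As a rough mental model only, a precondition check could be as simple as the following sketch, where `preconditions_met` is a hypothetical helper and exact equality on every key is an assumption.

```python
from typing import Any, Dict


def preconditions_met(preconditions: Dict[str, Any], context: Dict[str, Any]) -> bool:
    """True when every precondition key is present in context with an equal value."""
    return all(
        key in context and context[key] == value
        for key, value in preconditions.items()
    )


crisis_context = {"event_type": "crisis", "urgency": "high"}
seasonal_context = {"event_type": "seasonal", "urgency": "medium"}

print(preconditions_met({"event_type": "crisis"}, crisis_context))    # True
print(preconditions_met({"event_type": "crisis"}, seasonal_context))  # False
print(preconditions_met({}, seasonal_context))                        # True
```

Under this model a method with empty preconditions applies to any context, which is why the more specific `event_type` key is worth setting when you want a method to fire only for crisis goals.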

## 🔗 Part 6: Integration with Previous Modules

Combining planning with memory and tools from earlier modules.

In [None]:
class IntegratedRetailAgent:
    """Combines planning, memory, and tools for complete retail AI"""
    
    def __init__(self, store_id: str = "#100"):
        self.store_id = store_id
        self.planner = WalmartHTNPlanner()
        self.executor = AdaptivePlanExecutor()
        
        # In real implementation, would import from Module 2
        # self.memory = WalmartMemoryManager(store_id)
        
        # In real implementation, would import from Module 1
        # self.tools = {"search": SearchTool(), "calculate": CalculatorTool()}
        
        self.active_plans = {}
        
    def handle_retail_goal(self, goal: str, context: Dict[str, Any] = None) -> Dict:
        """Complete pipeline: understand → plan → execute → learn"""
        print(f"\n🤖 Integrated Retail Agent Processing: {goal}\n")
        
        # Step 1: Retrieve relevant memories
        print("📚 Step 1: Checking memory for similar situations...")
        # memories = self.memory.retrieve_relevant_memories(goal)
        memories = []  # Placeholder
        if memories:
            print(f"  Found {len(memories)} relevant memories")
        else:
            print("  No relevant memories found - this is a new situation")
        
        # Step 2: Create plan
        print("\n📋 Step 2: Creating execution plan...")
        plan = self.planner.decompose_goal(goal, context)
        self.active_plans[plan.id] = plan
        
        # Step 3: Execute plan
        print("\n🚀 Step 3: Executing plan...")
        execution_log = self.executor.execute_plan(plan)
        
        # Step 4: Learn from execution
        print("\n🧠 Step 4: Learning from execution...")
        self._learn_from_execution(plan, execution_log)
        
        # Step 5: Generate report
        report = self._generate_report(goal, plan, execution_log)
        
        return report
    
    def _learn_from_execution(self, plan: Plan, execution_log: Dict):
        """Extract learnings and store in memory"""
        learnings = []
        
        # Learn from failures
        failed_tasks = [e for e in execution_log["events"] if e["status"] == "failed"]
        for failure in failed_tasks:
            learnings.append({
                "type": "failure_pattern",
                "task": failure["task"],
                "lesson": "Consider alternative approach or additional resources"
            })
        
        # Learn from successes
        if execution_log["success_rate"] > 0.9:
            learnings.append({
                "type": "successful_plan",
                "goal": plan.goal,
                "lesson": "This plan structure works well"
            })
        
        # Learn from resource usage
        if execution_log["resource_conflicts"] > 3:
            learnings.append({
                "type": "resource_optimization",
                "lesson": "Need better resource allocation for this goal type"
            })
        
        print(f"  Extracted {len(learnings)} learnings")
        for learning in learnings:
            print(f"    - {learning['type']}: {learning['lesson']}")
            
        # Store in memory (if implemented)
        # for learning in learnings:
        #     self.memory.store_procedure(learning['type'], ...)
            
        return learnings
    
    def _generate_report(self, goal: str, plan: Plan, execution_log: Dict) -> Dict:
        """Generate comprehensive report"""
        report = {
            "goal": goal,
            "plan_id": plan.id,
            "success": execution_log["success_rate"] > 0.8,
            "metrics": {
                "tasks_total": len(plan.tasks),
                "tasks_completed": execution_log["tasks_completed"],
                "success_rate": execution_log["success_rate"],
                "duration_hours": execution_log["total_duration"],
                "total_cost": execution_log["total_cost"],
                "replanning_events": execution_log["replanning_count"]
            },
            "departments_involved": list(set(t.department for t in plan.tasks.values())),
            "key_achievements": [
                task.name for task in plan.tasks.values() 
                if task.status == TaskStatus.COMPLETED and task.priority >= 8
            ],
            "issues_encountered": [
                {"task": e["task"], "time": e["time"].strftime("%H:%M")}
                for e in execution_log["events"] if e["status"] == "failed"
            ],
            "recommendations": self._generate_recommendations(plan, execution_log)
        }
        
        return report
    
    def _generate_recommendations(self, plan: Plan, execution_log: Dict) -> List[str]:
        """Generate actionable recommendations"""
        recommendations = []
        
        if execution_log["success_rate"] < 0.8:
            recommendations.append("Consider breaking down complex tasks further")
            
        if execution_log["resource_conflicts"] > 0:
            recommendations.append("Review resource allocation and consider staggered execution")
            
        if execution_log["replanning_count"] > 2:
            recommendations.append("Initial plan may be too optimistic - add buffer time")
            
        if execution_log["total_cost"] > plan.estimated_cost * 1.2:
            recommendations.append("Cost overrun detected - review budget estimates")
            
        return recommendations

# Create integrated agent
integrated_agent = IntegratedRetailAgent()
print("✅ Integrated Retail Agent ready!")
print("🏪 Combines planning, memory, and execution")
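
The agent above leaves its memory calls commented out. To experiment without Module 2, a small in-memory stand-in is enough. The sketch below is hypothetical: the method names `store_experience` and `retrieve_relevant_memories` are taken from the commented-out calls in this notebook, but the `SketchMemory` class, the `limit` parameter, and the word-overlap ranking are assumptions and not the Module 2 implementation.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Experience:
    context: str
    action: str
    result: str
    success: bool


class SketchMemory:
    """Hypothetical in-memory stand-in for the Module 2 memory manager."""

    def __init__(self) -> None:
        self._experiences: List[Experience] = []

    def store_experience(self, context: str, action: str, result: str, success: bool) -> None:
        self._experiences.append(Experience(context, action, result, success))

    def retrieve_relevant_memories(self, goal: str, limit: int = 3) -> List[Experience]:
        """Rank stored experiences by the number of lowercase words shared with the goal."""
        goal_words = set(goal.lower().split())
        scored = []
        for exp in self._experiences:
            words = set(f"{exp.context} {exp.action}".lower().split())
            overlap = len(goal_words & words)
            if overlap:
                scored.append((overlap, exp))
        # Sort on the overlap count only; ties keep insertion order.
        scored.sort(key=lambda pair: pair[0], reverse=True)
        return [exp for _, exp in scored[:limit]]


memory = SketchMemory()
memory.store_experience(
    "Inventory - Prepare store for Black Friday", "Execute: Order Extra Stock", "Success", True
)
memory.store_experience(
    "HR - Summer hiring", "Execute: Post Job Ads", "Failed: Staff unavailable", False
)

matches = memory.retrieve_relevant_memories("Prepare store for Black Friday 2024")
print([m.action for m in matches])  # ['Execute: Order Extra Stock']
```

Word overlap is a deliberately crude relevance measure. It keeps the example dependency-free, and a real memory system would more likely use embeddings or structured tags.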

In [None]:
# Test integrated agent with complex goal
print("🎯 Testing Integrated Agent with Complex Goal\n")

complex_goal = "Optimize store operations for holiday shopping season while maintaining regular customer satisfaction"

report = integrated_agent.handle_retail_goal(
    complex_goal,
    context={
        "event_type": "seasonal",
        "urgency": "high",
        "timeframe": "6 weeks",
        "constraints": ["maintain service levels", "stay within budget"]
    }
)

# Display report
print("\n" + "="*60)
print("📊 FINAL REPORT")
print("="*60)
print(f"Goal: {report['goal']}")
print(f"Success: {'✅ Yes' if report['success'] else '❌ No'}")
print("\nMetrics:")
for metric, value in report['metrics'].items():
    if isinstance(value, float):
        if 'rate' in metric:
            print(f"  - {metric}: {value:.1%}")
        elif 'cost' in metric:
            print(f"  - {metric}: ${value:,.2f}")
        else:
            print(f"  - {metric}: {value:.1f}")
    else:
        print(f"  - {metric}: {value}")

print(f"\nDepartments Involved: {', '.join(report['departments_involved'])}")

if report['key_achievements']:
    print("\n🏆 Key Achievements:")
    for achievement in report['key_achievements'][:5]:
        print(f"  ✓ {achievement}")

if report['recommendations']:
    print("\n💡 Recommendations:")
    for rec in report['recommendations']:
        print(f"  • {rec}")

print("\n" + "="*60)

## 📊 Summary and Key Takeaways

### What You've Learned:

1. **Hierarchical Task Networks (HTN)**:
   - Decomposing abstract goals into concrete tasks
   - Methods and operators for retail operations
   - Task dependencies and topological sorting

2. **Resource Management**:
   - Staff, budget, space, and equipment constraints
   - Resource allocation and conflict resolution
   - Capacity planning for retail operations

3. **Adaptive Planning**:
   - Monitoring plan execution in real-time
   - Detecting and resolving deadlocks
   - Replanning when tasks fail

4. **Retail Applications**:
   - Black Friday preparation workflows
   - Emergency inventory management
   - Seasonal transition planning
   - Customer service crisis response

5. **Integration**:
   - Combining planning with memory systems
   - Learning from execution experiences
   - Generating actionable insights
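
As a refresher on the dependency ordering mentioned in item 1, the sketch below orders a small task graph with Python's standard-library `graphlib` module (Python 3.9+). This is a swapped-in illustration, not the planner's own sorting code, and the task names are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter  # standard library, Python 3.9+

# Hypothetical tasks mapped to the set of tasks each one depends on.
dependencies = {
    "order_inventory": set(),
    "hire_seasonal_staff": set(),
    "set_up_displays": {"order_inventory"},
    "train_staff": {"hire_seasonal_staff"},
    "run_readiness_check": {"set_up_displays", "train_staff"},
}

order = list(TopologicalSorter(dependencies).static_order())
print(order)

# Every task appears after all of its prerequisites.
position = {task: index for index, task in enumerate(order)}
valid = all(
    position[dep] < position[task]
    for task, deps in dependencies.items()
    for dep in deps
)
print(valid)  # True

# A circular dependency cannot be ordered and is reported as an error.
try:
    list(TopologicalSorter({"a": {"b"}, "b": {"a"}}).static_order())
    cycle_found = False
except CycleError:
    cycle_found = True
print(cycle_found)  # True
```

The relative order of independent tasks (for example `order_inventory` and `hire_seasonal_staff`) is not meaningful. Only the guarantee that prerequisites come first matters, which is why the check above tests positions instead of comparing against a fixed list.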

### 🚀 Real-World Impact:

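A production-grade version of this planning approach could plausibly deliver gains like the following (the figures are illustrative targets, not measured results):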
- **Reduce preparation time** for major events by 30%
- **Improve resource utilization** by 25%
- **Decrease crisis response time** from hours to minutes
- **Increase operational efficiency** by 20%

### 💡 Advanced Challenges:

1. **Multi-Store Coordination**: Extend to plan across multiple Walmart locations
2. **Predictive Planning**: Use historical data to anticipate needs
3. **Real-time Optimization**: Continuously adjust plans based on live data
4. **Supply Chain Integration**: Connect with supplier systems

### 🎓 Course Completion:

Congratulations! You've completed the Agent Starter Kit curriculum:

✅ **Module 1**: Agent Foundations (ReAct pattern)  
✅ **Module 2**: Memory and Learning  
✅ **Module 3**: Tool Integration  
✅ **Module 4**: Planning and Goals  

You now have the skills to build sophisticated AI agents for real-world applications!

### 🔗 Next Steps:

1. **Combine all modules** into a complete retail AI system
2. **Deploy to production** with proper monitoring
3. **Measure impact** on business metrics
4. **Iterate and improve** based on real usage

Thank you for learning with the Agent Starter Kit! 🎉