In [1]:
from dataclasses import dataclass, field
from typing import List, Dict, Callable


In [2]:
@dataclass
class Task:
    name: str
    action: Callable
    dependencies: List[str] = field(default_factory=list)


In [3]:
@dataclass
class Milestone:
    name: str
    tasks: List[Task]


In [4]:
@dataclass
class Plan:
    goal: str
    milestones: List[Milestone]
    constraints: Dict[str, str]


In [5]:
# Planner Agent (Plan Generation)

class PlannerAgent:
    def create_plan(self, goal: str) -> Plan:
        print(f"[Planner] Creating plan for goal: {goal}")

        tasks_ingestion = [
            Task("load_pdfs", self.load_pdfs),
            Task("extract_metadata", self.extract_metadata)
        ]

        tasks_indexing = [
            Task("generate_embeddings", self.generate_embeddings, ["load_pdfs"]),
            Task("build_index", self.build_index, ["generate_embeddings"])
        ]

        tasks_retrieval = [
            Task("test_retrieval", self.test_retrieval, ["build_index"])
        ]

        milestones = [
            Milestone("Data Ingestion Ready", tasks_ingestion),
            Milestone("Vector Index Built", tasks_indexing),
            Milestone("Retrieval Working", tasks_retrieval)
        ]

        constraints = {
            "data": "internal PDFs only",
            "latency": "< 2 seconds",
            "vector_db": "open-source"
        }

        return Plan(goal, milestones, constraints)

    # --- Task Actions ---
    def load_pdfs(self):
        print("Loading internal PDFs")

    def extract_metadata(self):
        print("Extracting metadata")

    def generate_embeddings(self):
        print("Generating embeddings")

    def build_index(self):
        print("Building vector index")

    def test_retrieval(self):
        print("Testing retrieval")


In [6]:
# Executor Agent (Task Execution)

class ExecutorAgent:
    def __init__(self):
        self.completed_tasks = set()

    def execute_task(self, task: Task):
        for dep in task.dependencies:
            if dep not in self.completed_tasks:
                raise RuntimeError(f"Dependency not met: {dep}")

        print(f"[Executor] Executing task: {task.name}")
        task.action()
        self.completed_tasks.add(task.name)


In [7]:
# Monitoring Agent

class MonitoringAgent:
    def validate(self, task_name: str) -> bool:
        print(f"[Monitor] Validating output of {task_name}")
        return True  # Stub for real validation


In [8]:
#  Replanning Agent

class ReplannerAgent:
    def replan(self, failed_task: str):
        print(f"[Replanner] Replanning due to failure in {failed_task}")


In [9]:
# Orchestrator (System Controller)

class AgenticSystem:
    def __init__(self):
        self.planner = PlannerAgent()
        self.executor = ExecutorAgent()
        self.monitor = MonitoringAgent()
        self.replanner = ReplannerAgent()

    def run(self, goal: str):
        plan = self.planner.create_plan(goal)

        print("\n[System] Constraints Applied:")
        for k, v in plan.constraints.items():
            print(f" - {k}: {v}")

        print("\n[System] Executing Plan\n")

        for milestone in plan.milestones:
            print(f"\n=== Milestone: {milestone.name} ===")
            for task in milestone.tasks:
                try:
                    self.executor.execute_task(task)
                    if not self.monitor.validate(task.name):
                        raise RuntimeError("Validation failed")
                except Exception as e:
                    print(f"[Error] {e}")
                    self.replanner.replan(task.name)
                    return

        print("\n[System] Final Aggregation Complete")
        print("Goal achieved successfully!")


In [10]:
# Run the System

if __name__ == "__main__":
    system = AgenticSystem()
    system.run("Build a RAG-based chatbot for internal documents")


[Planner] Creating plan for goal: Build a RAG-based chatbot for internal documents

[System] Constraints Applied:
 - data: internal PDFs only
 - latency: < 2 seconds
 - vector_db: open-source

[System] Executing Plan


=== Milestone: Data Ingestion Ready ===
[Executor] Executing task: load_pdfs
Loading internal PDFs
[Monitor] Validating output of load_pdfs
[Executor] Executing task: extract_metadata
Extracting metadata
[Monitor] Validating output of extract_metadata

=== Milestone: Vector Index Built ===
[Executor] Executing task: generate_embeddings
Generating embeddings
[Monitor] Validating output of generate_embeddings
[Executor] Executing task: build_index
Building vector index
[Monitor] Validating output of build_index

=== Milestone: Retrieval Working ===
[Executor] Executing task: test_retrieval
Testing retrieval
[Monitor] Validating output of test_retrieval

[System] Final Aggregation Complete
Goal achieved successfully!
