# Magentic Orchestration Deep Dive

Explore how AgenticFleet builds on the Magentic workflow from Microsoft Agent Framework to provide planning, delegation, streaming telemetry, plan review, and checkpoint resume. Work through the sections below to wire up a notebook-friendly fleet, run a task, and then inspect every artifact the orchestrator produces.


In [None]:
import json
from dataclasses import dataclass, field
from typing import Any

try:
    import agent_framework  # noqa: F401
except ModuleNotFoundError as exc:  # pragma: no cover - environment guard
    raise RuntimeError(
        "This notebook requires the `agent-framework` package. "
        "Install it with `uv add agent-framework`."
    ) from exc

from agenticfleet.cli.ui import AgentMessage, FinalRenderData
from agenticfleet.config import settings
from agenticfleet.core.approval import (
    ApprovalDecision,
    ApprovalHandler,
    ApprovalRequest,
    ApprovalResponse,
)
from agenticfleet.core.approved_tools import set_approval_handler
from agenticfleet.fleet.callbacks import ConsoleCallbacks
from agenticfleet.fleet.magentic_fleet import MagenticFleet

if not settings.openai_api_key:
    print(
        "WARNING: OPENAI_API_KEY is not configured. Hosted model calls will fail until it is set."
    )

print("✓ Core imports loaded")
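
The guard above stops at the first missing import. To report every missing dependency at once before running anything, `importlib.util.find_spec` can probe top-level modules without importing them. This is a minimal sketch: the module list is illustrative (`agenticfleet` is assumed to be the top-level package name, matching the imports above), and the install name `agent-framework` differs from the module name `agent_framework`.

```python
import importlib.util


def missing_modules(names: list[str]) -> list[str]:
    """Return the top-level modules from `names` that cannot be found.

    `find_spec` locates a top-level module without executing it, so this
    check has no import side effects.
    """
    return [name for name in names if importlib.util.find_spec(name) is None]


# Module names (not pip/uv package names) this notebook relies on.
REQUIRED = ["agent_framework", "agenticfleet"]

missing = missing_modules(REQUIRED)
if missing:
    print(f"Missing modules: {', '.join(missing)}")
else:
    print("✓ All required modules are importable")
```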



## Notebook observer utilities

Collect Magentic callbacks in-memory so we can review task plans, progress ledgers, agent responses, and final answers after each run.


In [None]:
@dataclass
class NotebookUI:
    plan_history: list[dict[str, list[str]]] = field(default_factory=list)
    progress_history: list[dict[str, Any]] = field(default_factory=list)
    agent_messages: list[AgentMessage] = field(default_factory=list)
    notices: list[dict[str, str]] = field(default_factory=list)
    final_payloads: list[dict[str, Any]] = field(default_factory=list)

    @staticmethod
    def _to_lines(value: Any) -> list[str]:
        if value is None:
            return []
        if isinstance(value, str):
            return [line.strip() for line in value.splitlines() if line.strip()]
        if isinstance(value, dict):
            return [f"{key}: {val}" for key, val in value.items()]
        if isinstance(value, (list, tuple, set)):
            lines: list[str] = []
            for item in value:
                lines.extend(NotebookUI._to_lines(item))
            return lines
        text = str(value).strip()
        return [text] if text else []

    def reset(self) -> None:
        self.plan_history.clear()
        self.progress_history.clear()
        self.agent_messages.clear()
        self.notices.clear()
        self.final_payloads.clear()

    def log_plan(
        self, facts: list[str] | tuple[str, ...] | None, plan: list[str] | tuple[str, ...] | None
    ) -> None:
        self.plan_history.append(
            {
                "facts": self._to_lines(facts),
                "plan": self._to_lines(plan),
            }
        )

    def log_progress(self, status: str, next_speaker: str, instruction: str | None = None) -> None:
        self.progress_history.append(
            {
                "status": status,
                "next_speaker": next_speaker,
                "instruction": self._to_lines(instruction),
            }
        )

    def log_agent_message(self, message: AgentMessage) -> None:
        self.agent_messages.append(
            AgentMessage(agent_name=message.agent_name, content=message.content, mode=message.mode)
        )

    def log_notice(self, text: str, *, style: str = "blue") -> None:
        self.notices.append({"text": text, "style": style})

    def log_final(self, result: FinalRenderData | str | Any | None) -> None:
        if isinstance(result, FinalRenderData):
            payload = {"sections": result.sections, "raw_text": result.raw_text}
        elif isinstance(result, str) or result is None:
            payload = {"raw_text": result}
        else:
            payload = {"raw_object": str(result)}
        self.final_payloads.append(payload)

    def latest_summary(self) -> dict[str, Any]:
        return {
            "plans": self.plan_history,
            "progress": self.progress_history,
            "agents": [
                {"agent": message.agent_name, "content": message.content}
                for message in self.agent_messages
            ],
            "notices": self.notices,
            "final": self.final_payloads,
        }
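
`NotebookUI._to_lines` normalizes whatever the callbacks pass (strings, dicts, nested lists, or `None`) into a flat list of non-empty lines, which keeps the recorded plans and ledgers JSON-friendly. The standalone copy below reproduces that rule so you can check its behavior without importing AgenticFleet; `to_lines` is a hypothetical name for the copy.

```python
from typing import Any


def to_lines(value: Any) -> list[str]:
    """Flatten a callback payload into non-empty, stripped lines.

    Mirrors NotebookUI._to_lines: strings split on newlines, dicts become
    "key: value" lines, sequences recurse, and anything else is str()-ed.
    """
    if value is None:
        return []
    if isinstance(value, str):
        return [line.strip() for line in value.splitlines() if line.strip()]
    if isinstance(value, dict):
        return [f"{key}: {val}" for key, val in value.items()]
    if isinstance(value, (list, tuple, set)):
        lines: list[str] = []
        for item in value:
            lines.extend(to_lines(item))
        return lines
    text = str(value).strip()
    return [text] if text else []


print(to_lines("Step 1\n\n  Step 2  "))    # ['Step 1', 'Step 2']
print(to_lines({"status": "in_progress"}))  # ['status: in_progress']
print(to_lines(["a\nb", None, 3]))           # ['a', 'b', '3']
```

Because sets have no defined iteration order, set inputs produce lines in an arbitrary order.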



## Human-in-the-loop policy for notebooks

Magentic fleets expect an approval handler when human-in-the-loop (HITL) approval is enabled. The handler below auto-approves tool calls and later plan reviews, but revises the first plan by appending a validation step, which shows how plan review feedback alters execution.


In [None]:
class NotebookApprovalHandler(ApprovalHandler):
    def __init__(self, *, add_plan_revision: bool = True) -> None:
        self.add_plan_revision = add_plan_revision
        self.history: list[ApprovalResponse] = []
        self._plan_review_round = 0

    async def request_approval(self, request: ApprovalRequest) -> ApprovalResponse:
        if request.operation_type == "plan_review" and self.add_plan_revision:
            self._plan_review_round += 1
            if self._plan_review_round == 1:
                # Append a validation step to the first plan so the revision is visible.
                validation_step = "Validate energy baselines before final answer."
                plan_text = ""
                if request.details:
                    plan_text = str(request.details.get("plan_text", "")).strip()
                plan_lines = NotebookUI._to_lines(plan_text)
                if validation_step not in plan_lines:
                    plan_lines.append(validation_step)
                revised_plan = "\n".join(plan_lines)
                response = ApprovalResponse(
                    request_id=request.request_id,
                    decision=ApprovalDecision.MODIFIED,
                    modified_code=revised_plan,
                    reason="Notebook approval inserted a validation step into the plan.",
                )
            else:
                response = ApprovalResponse(
                    request_id=request.request_id,
                    decision=ApprovalDecision.APPROVED,
                    modified_code=None,
                    reason="Subsequent plans approved without changes.",
                )
        else:
            response = ApprovalResponse(
                request_id=request.request_id,
                decision=ApprovalDecision.APPROVED,
                modified_code=None,
                reason="Auto-approved for notebook demo.",
            )
        self.history.append(response)
        return response

    def reset(self) -> None:
        self.history.clear()
        self._plan_review_round = 0



## Build a notebook-friendly Magentic fleet

Create the fleet with default agents, wire our observer UI into the callback pipeline, and register the auto-approval policy. Checkpointing follows the repository configuration, so you can resume from prior runs when storage is enabled.


In [None]:
notebook_ui = NotebookUI()
approval_handler = NotebookApprovalHandler()

workflow_config = settings.workflow_config.get("workflow", {}) or {}
hitl_config = workflow_config.get("human_in_the_loop", {}) or {}
set_approval_handler(
    approval_handler,
    require_operations=hitl_config.get("require_approval_for", []),
    trusted_operations=hitl_config.get("trusted_operations", []),
)

checkpoint_storage = settings.create_checkpoint_storage()

fleet = MagenticFleet(
    checkpoint_storage=checkpoint_storage,
    approval_handler=approval_handler,
    approval_policy=hitl_config,
    console_callbacks=ConsoleCallbacks(notebook_ui),
)

storage_label = type(checkpoint_storage).__name__ if checkpoint_storage else "disabled"
print(f"✓ Fleet ready (checkpoint storage: {storage_label})")



## Inspect workflow configuration

Summarize the active Magentic settings so we know which features are available during the run (manager model, plan review, HITL, and checkpoint persistence).


In [None]:
fleet_config = settings.workflow_config.get("fleet", {}) or {}
# `or {}` also covers keys that are present but empty (None) in the YAML.
checkpoint_config = workflow_config.get("checkpointing", {}) or {}
manager_config = fleet_config.get("manager", {}) or {}
plan_review_config = fleet_config.get("plan_review", {}) or {}
orchestrator_config = fleet_config.get("orchestrator", {}) or {}

config_summary = {
    "manager_model": manager_config.get("model"),
    "plan_review_enabled": plan_review_config.get("enabled"),
    "orchestrator_limits": {
        "max_rounds": orchestrator_config.get("max_round_count"),
        "max_stalls": orchestrator_config.get("max_stall_count"),
        "max_resets": orchestrator_config.get("max_reset_count"),
    },
    "hitl": {
        "enabled": hitl_config.get("enabled"),
        "requires": hitl_config.get("require_approval_for", []),
        "trusted": hitl_config.get("trusted_operations", []),
    },
    "checkpointing": {
        "enabled": checkpoint_config.get("enabled"),
        "storage_type": checkpoint_config.get("storage_type"),
        "storage_path": checkpoint_config.get("storage_path"),
        "active_storage": type(checkpoint_storage).__name__ if checkpoint_storage else "disabled",
    },
}

print(json.dumps(config_summary, indent=2))
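
A chained lookup like `config.get("manager", {}).get("model")` only falls back to `{}` when the key is missing. A YAML key that is present but left empty (for example `manager:` with nothing under it) loads as `None`, and the next `.get` raises `AttributeError`. A small helper, here the hypothetical `dig`, can walk nested mappings and stop at the first missing or non-mapping level. The sample config values are illustrative.

```python
from collections.abc import Mapping
from typing import Any


def dig(mapping: Any, *keys: str, default: Any = None) -> Any:
    """Follow `keys` through nested mappings, returning `default` on any gap.

    Unlike chained `.get(key, {})` calls, this tolerates keys whose value is
    None (an empty YAML block) or any other non-mapping value.
    """
    current = mapping
    for key in keys:
        if not isinstance(current, Mapping) or key not in current:
            return default
        current = current[key]
    return default if current is None else current


# An empty `manager:` block in YAML loads as {"manager": None}.
sample_fleet_config = {"manager": None, "orchestrator": {"max_round_count": 30}}
print(dig(sample_fleet_config, "manager", "model"))                     # None
print(dig(sample_fleet_config, "orchestrator", "max_round_count"))      # 30
print(dig(sample_fleet_config, "plan_review", "enabled", default=False))  # False
```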





## Define a sample orchestration task

Use an energy-efficiency comparison prompt that forces the researcher, coder, and analyst to collaborate. The task is identical to the CLI demo so checkpoints can be reused between environments.


In [None]:
TASK_REQUEST = (
    "I am preparing a report on the energy efficiency of different "
    "machine learning model architectures. "
    "Compare the estimated training and inference energy "
    "consumption of ResNet-50, BERT-base, and GPT-2 on "
    "standard datasets (e.g., ImageNet for ResNet, GLUE for BERT, "
    "WebText for GPT-2). Estimate the CO2 emissions for each "
    "model assuming training on an Azure Standard_NC6s_v3 VM "
    "for 24 hours, and surface the assumptions you make. "
    "Provide tables for clarity, recommend the most "
    "energy-efficient model per task type (image classification, "
    "text classification, text generation), and call out any data "
    "gaps that should be verified."
)

print(f"✓ Task prepared ({len(TASK_REQUEST.split())} words)")
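
The task asks the agents to estimate CO2 emissions for 24 hours on an Azure Standard_NC6s_v3 VM and to surface their assumptions. A back-of-envelope check, energy (kWh) = power (kW) × hours × PUE and emissions (kg CO2) = energy × grid carbon intensity, makes the final answer easier to sanity-check. Every input below is an assumption to replace with verified figures: the 250 W draw approximates one NVIDIA V100 PCIe at full board power (the GPU used by the NCv3 series), and the PUE and carbon-intensity values are placeholders, not Azure data.

```python
def training_footprint(
    gpu_watts: float,
    hours: float,
    pue: float = 1.0,
    kg_co2_per_kwh: float = 0.4,
) -> tuple[float, float]:
    """Return (energy_kwh, kg_co2) for a constant-power run.

    GPU power only: CPU, memory, and host overhead are ignored unless folded
    into `pue`. Every default here is a placeholder assumption.
    """
    energy_kwh = gpu_watts / 1000 * hours * pue
    return energy_kwh, energy_kwh * kg_co2_per_kwh


# Assumed: one V100 at ~250 W for the 24-hour window in TASK_REQUEST.
energy, co2 = training_footprint(gpu_watts=250, hours=24, pue=1.1)
print(f"{energy:.2f} kWh, {co2:.2f} kg CO2")
```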



## Async helpers for running and summarizing the fleet

The helpers below reset state before each run, invoke the Magentic workflow with optional checkpoint resume, and materialize a summary of every artifact captured by the notebook UI.


In [None]:
def reset_observers() -> None:
    notebook_ui.reset()
    approval_handler.reset()


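async def run_magentic_demo(task: str, *, resume_from: str | None = None) -> str:
    """Run `task` on the fleet, optionally resuming from a checkpoint id."""
    reset_observers()  # Start each run with empty observer and approval state.
    # A fixed workflow id keeps checkpoints listed under `notebook-demo`.
    fleet.set_workflow_id("notebook-demo")
    return await fleet.run(task, resume_from_checkpoint=resume_from)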


def build_summary() -> dict[str, Any]:
    summary = notebook_ui.latest_summary()
    summary["approvals"] = [
        {
            "request_id": response.request_id,
            "decision": response.decision.value,
            "reason": response.reason,
        }
        for response in approval_handler.history
    ]
    return summary


def display_summary() -> None:
    if not notebook_ui.plan_history:
        print("No orchestration run has been recorded yet.")
        return
    print(json.dumps(build_summary(), indent=2))
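
`build_summary` converts each approval decision with `.value`, presumably because `json.dumps` raises `TypeError` on plain `Enum` members and other custom objects. If you extend the summary with richer objects, a `default=` hook keeps `display_summary` from failing. A minimal sketch; `Decision` and `Message` are hypothetical stand-ins, not AgenticFleet types.

```python
import dataclasses
import json
from enum import Enum
from typing import Any


def json_fallback(obj: Any) -> Any:
    """`default=` hook for json.dumps: called only for unsupported objects."""
    if isinstance(obj, Enum):
        return obj.value
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    return str(obj)  # Last resort: readable text instead of a TypeError.


class Decision(Enum):  # Hypothetical stand-in for ApprovalDecision.
    APPROVED = "approved"


@dataclasses.dataclass
class Message:  # Hypothetical stand-in for a recorded agent message.
    agent_name: str
    content: str


summary = {"decision": Decision.APPROVED, "agents": [Message("researcher", "done")]}
print(json.dumps(summary, indent=2, default=json_fallback))
```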



## Execute the workflow (optional)

Run the coroutine in a notebook cell: IPython supports top-level `await`, so no explicit event loop is needed. The run streams Magentic events, applies the notebook HITL policy, and persists checkpoints when checkpointing is enabled.


In [None]:
# Uncomment the lines below after configuring credentials to execute the workflow.
# result_text = await run_magentic_demo(TASK_REQUEST)
# print(result_text)

### Inspect orchestration artifacts

Use the helper below after a run to show every plan, progress ledger evaluation, agent response, and approval captured during orchestration.


In [None]:
display_summary()
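
The full JSON dump gets long on multi-round runs. To skim the progress ledger, you can reduce each entry to its round, `status`, and `next_speaker`, the fields `NotebookUI.log_progress` records. A sketch with a hypothetical `progress_table` helper and made-up sample entries in place of a real run:

```python
from typing import Any


def progress_table(progress: list[dict[str, Any]]) -> str:
    """Render progress-ledger entries as fixed-width text rows."""
    rows = [("round", "status", "next_speaker")]
    rows += [
        (str(i), str(entry.get("status", "")), str(entry.get("next_speaker", "")))
        for i, entry in enumerate(progress, start=1)
    ]
    widths = [max(len(row[col]) for row in rows) for col in range(3)]
    return "\n".join(
        "  ".join(cell.ljust(width) for cell, width in zip(row, widths)).rstrip()
        for row in rows
    )


# Hypothetical sample entries shaped like NotebookUI.progress_history.
sample_progress = [
    {"status": "in_progress", "next_speaker": "researcher", "instruction": []},
    {"status": "in_progress", "next_speaker": "coder", "instruction": []},
]
print(progress_table(sample_progress))
# In the notebook: print(progress_table(notebook_ui.progress_history))
```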

### Plan review details

The approval handler stores every plan review interaction. The snippet below highlights how the first plan is revised while subsequent requests are auto-approved.


In [None]:
[
    {
        "request_id": response.request_id,
        "decision": response.decision.value,
        "reason": response.reason,
        "modified_plan_preview": (response.modified_code or "")[:200],
    }
    for response in approval_handler.history
]
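
The handler keeps only the revised text in `modified_code`. To see exactly what plan review changed, compare the original plan (the request's `plan_text` detail, which the handler does not store) against the revision with `difflib.unified_diff`. A sketch with a hypothetical `plan_diff` helper and sample plan lines:

```python
import difflib


def plan_diff(original: str, revised: str) -> str:
    """Return a unified diff of two plans, line by line."""
    return "\n".join(
        difflib.unified_diff(
            original.splitlines(),
            revised.splitlines(),
            fromfile="proposed_plan",
            tofile="reviewed_plan",
            lineterm="",
        )
    )


# Sample plans; in practice take `original` from request.details["plan_text"].
original = "Research energy figures\nWrite comparison tables"
revised = original + "\nValidate energy baselines before final answer."
print(plan_diff(original, revised))
```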

### Checkpoint catalog

When checkpointing is enabled, this call returns the available resume points for the current workflow identifier (`notebook-demo`).


In [None]:
# await fleet.list_checkpoints()

### Resume from a checkpoint (optional)

Pick a checkpoint identifier from the catalog and pass it to `run_magentic_demo`. This lets you inspect the behavior of the orchestrator when recovering mid-plan.


In [None]:
# checkpoint_id = "fleet_xxxxxxxx"  # Replace with a value from fleet.list_checkpoints()
# result_from_resume = await run_magentic_demo(TASK_REQUEST, resume_from=checkpoint_id)
# print(result_from_resume)

### Next steps

- Swap in different tasks to stress-test delegation and replanning.
- Toggle HITL requirements in `workflow.yaml` to compare automated versus interactive plan reviews.
- Use the checkpoint catalog to resume stalled runs or branch off alternative investigation paths.
