spike(workflow): concurrent dynamic dispatch harness (DynamicNodeSupervisor + ctx.pipeline)#3
Draft
caohy1988 wants to merge 25 commits into
Draft
spike(workflow): concurrent dynamic dispatch harness (DynamicNodeSupervisor + ctx.pipeline)#3caohy1988 wants to merge 25 commits into
caohy1988 wants to merge 25 commits into
Conversation
3e9ce6e to
7d366b5
Compare
…rvisor + ctx.pipeline) RFC google#92 reference harness. DynamicNodeSupervisor (gate-on-leaf, TaskGroup fan-out) + ctx.pipeline/ctx.parallel on the real ADK Workflow engine. 11 deterministic CI-safe tests (no LLM) + an env-gated live Gemini E2E. Proves barrier-free execution, failed-item isolation, control-exception cancellation (requires TaskGroup, not gather), nested no-deadlock with leaf gating (+ driver-gating deadlock contrast), and resume exactly-once for completed children. pyink/isort/mdformat clean.
A model emits a declarative, validated WorkflowSpec (typed data, not code) that the framework validates and executes on the real ADK engine via the google#92 supervisor. authoring.py (WorkflowSpec plain kind-tagged recursive union; CapabilityRegistry; WorkflowSpecValidator; SpecInterpreter for step/fan_out/ branch/loop_until), 10 deterministic tests, env-gated live planner sweep (multi-stage/branch/loop on gemini-3.5-flash, shape-specific assertions). Findings folded into the RFC: open dict[str,X] maps are a structured-output hazard (Branch.routes -> list[Route]); Gemini response_schema rejects Field(discriminator=...), so the vocabulary is a plain kind-tagged union; planning vs capability quality are separable. pyink/isort/mdformat clean.
4a70aa3 to
8e8905f
Compare
A discoverable `root_agent` (a Workflow) that exposes the google#93 flow in ADK Web: the model authors a typed WorkflowSpec, ADK validates it against the capability registry, freezes spec+hash to session state, and executes it on the real engine via the google#92 supervisor — each step surfaced as a chat message so the authored plan, validation, capabilities, frozen hash, and final output are all visible in the ADK Web chat / State / Events surfaces. adk web contributing/samples/workflows/authored_workflow_demo Load-or-author: if a frozen spec already exists in the session it is REUSED (planner not re-invoked) and replayed — so the resume/reproducibility claim is real, verified by a CI-safe no-LLM test (test_demo_agent.py: import + name + registry + spec validation + reuse-path-with-stub-registry). Reuses the authored_workflow_spike/ stack; model from env (default gemini-2.5-flash; gemini-3.5-flash needs location=global); no hardcoded project. README is the ~7-min recording script. pyink/isort/mdformat clean; 4 demo tests pass.
8e8905f to
fd3bd52
Compare
…rage tiers) Standalone design for the authored-workflow spike: data model, validator, interpreter, frozen-spec contract, security model, testing, empirical findings, and the plan export/storage tiers (v1 per-run persist; v1.1 portable JSON export envelope; v2 reusable templates with import-time registry revalidation; compiled Workflow is a derived artifact, never the stored source of truth).
…a, import-input rule DESIGN.md §10: spec_hash/task_input_digest defined as sha256 over canonical JSON; envelope carries an optional task_input_schema; import contract — digest is advisory provenance for replaying the original run, template reuse validates a new task input against task_input_schema, else replay-only on matching digest or explicit template promotion. Never silently bind a stored plan to an incompatible task shape.
One FrozenWorkflowRecord backs session state, audit event, and export envelope
(§5/§10) — v1 persists the full record under authored_workflow:frozen_record,
not a weaker {spec,hash} subset. import_plan recomputes spec_hash (reject on
mismatch) and re-runs validation against the current registry rather than
trusting envelope.validation; replay vs template execution-input rule made
explicit. 'discriminated-by-kind' -> 'plain kind-tagged union' wording fix.
…-first, 3-5-task build gate)
Demo persists only {spec, hash}; production v1 stores the full
FrozenWorkflowRecord (DESIGN.md §5). State it explicitly in DESIGN.md, the demo
agent.py freeze step, and DEMO_NARRATIVE.md so the demo isn't misread as the
canonical persistence contract.
…aude Code comparison Pipeline/PipelineStage make barrier-free multi-stage per-item flow first-class so the authoring vocabulary is not less expressive than its google#92 executor. Add a candid 'Comparison to Claude Code Dynamic Workflows' (wins: audit/safety; gaps: expressiveness/maturity, plan-size ceiling, quality-pattern templates, scale). Hierarchical/sub-plan authoring noted as post-gate future, not MVP.
authoring.py now covers step/fan_out/pipeline/branch/loop_until. Pipeline + PipelineStage compile to google#92 ctx.pipeline: each item threads ALL stages barrier-free (item A in stage k while item B in stage 1) — NOT two barriered fan_outs. Validator: over is a list, every stage capability exists and takes an item, stage input scope. Interpreter: stage[0] input defaults to the per-item element, stage[n] to stage[n-1] output; collect=list. 3 new deterministic tests (13 total): validator accept/reject pipeline, and an ordered + BARRIER-FREE proof (a verifier starts before the slow reviewer finishes — impossible with two barriered fan_outs).
P2: each Pipeline stage dispatches once per item, so every stage capability is subject to the same data-dependent fan_out cap as FanOut. The interpreter now rejects (pre-dispatch) when len(items) > stage cap.max_fan_out, closing a gap where a pipeline over N items bypassed a capability's cap — the RFC security model relies on runtime enforcement of these caps. New deterministic test (14 total) asserts rejection before any stage runs. P3: sync stale shape lists (add pipeline) and test counts after the prior Pipeline addition — authoring 10/13 -> 14, totals 11+14+4 = 29, across authoring.py, test_authoring.py, both READMEs, DESIGN.md, DEMO_NARRATIVE.md.
…y-audit demo The demo now exercises Pipeline — the construct that closes the Claude Code gap — without adding visual complexity. The planner authors pipeline -> step -> step: a reviewer->verifier pipeline over the files (each file reviewed then its finding verified, barrier-free per item), then triager, then formatter. - agent.py: add a 'verifier' capability (Finding -> confirmed Finding); planner instruction authors the pipeline; capability collection walks pipeline stages so the displayed/audited capability set includes stage caps. - test_demo_agent.py: demo spec + stub registry use Pipeline/verifier; reuse path still proves no-LLM frozen replay. - Narrative + README: Beat 1 plan is pipeline -> step -> step; Beat 4 calls out reviewer/verifier interleaving per file; fresh captured hash 1f4c0883beb6. Validated live on gemini-3.5-flash: planner authored the pipeline 3/3 trials (no flakiness); reuse replays the same hash without re-invoking the model.
…e registry The first chat message hardcoded (reviewer, triager, formatter) and so contradicted the validation list after verifier was added. Derive it from CapabilityRegistry.names() (new accessor) so the recording can't drift from the registered set again.
…plan + demo beat Makes the frozen plan a first-class, portable artifact (DESIGN.md §10), so the RFC's enterprise claim (reviewable / diffable / replayable model-authored plans) is real, not paper. authoring.py: - FrozenWorkflowRecord (the single §5 shape) + ValidationResult; FrozenWorkflowRecord.freeze() captures spec_hash, planner_model, registry + per-capability versions, validation, task_input_digest. - canonical_json / sha256_hex — the one fixed hash definition (sort_keys + tight separators) so two exporters agree. - export_plan(record) -> dict; import_plan(envelope, registry, task_input=None) that NEVER trusts the envelope: recompute sha256 (reject tamper), re-validate vs CURRENT registry (reject dropped capability), reject per-capability version drift; execution-input contract (replay needs matching input digest; template needs task_input_schema). - Capability.version + CapabilityRegistry.capability_versions() for drift detection; referenced_capabilities() walker. test_authoring.py (+5 -> 19): round-trip replays same hash; tamper rejected; dropped capability rejected; version drift rejected; new input without template schema rejected (and accepted once a schema is attached). demo: an 'Export plan' beat writes the full envelope to security_audit_plan.json and re-imports it (proving defensive import). Unifies the displayed hash on the canonical definition. Narrative/README gain Beat 3b; counts 11+19+4 = 34. Generated envelope + demo session dbs gitignored. Validated live on gemini-3.5-flash: export beat writes a complete envelope, re-import passes, reuse replays the same hash.
…sort P1: isort ordering in test_authoring.py imports (PlanImportError after PipelineStage) — pre-commit clean. P2: import_plan now hard-errors on registry_version drift (envelope vs current registry.version), matching DESIGN.md §10 'registry-version match … drift = hard error'. Previously only dropped capabilities + per-capability versions were checked. P3: import_plan rejects an unsupported schema_version (only 'v1' supported) — a defensive importer refuses formats it can't read. +2 deterministic tests (-> 21): registry-version drift and unsupported schema_version both rejected. Counts 11+21+4 = 36.
…bservability Adds DESIGN §11 'Convergence with ADK AgentConfig' (renumbers Future -> §12) in response to reviewer questions on issue google#93: - Lower the static subset (sequence/parallel/loop) to ADK's Sequential/Parallel/ LoopAgentConfig instead of reinventing serialization; keep branch/fan_out/ pipeline as new types only because config can't express them (static sub_agents resolved once at load; no ConditionalAgent; needs google#92 ctx.pipeline). - Why the planner does NOT emit raw AgentConfig: static graph; Discriminator union rejected by response_schema; FQN tool/agent/callback refs (importlib, no allow-list) re-open the code-exec surface the declarative+allow-list model closes. - Q1 storage: FrozenWorkflowRecord in session State + audit event + export envelope. - Q2 custom tools: registered capability by registry name (allow-list), not FQN. - Q3 version/observability: spec_hash + registry/capability versions -> drift rejected on import; compiled Workflow runs on the real engine so ADK tracing applies. All claims source-verified against agents/agent_config.py and config_agent_utils.py.
Address review on the convergence section: - 'design converges / should lower', not 'now lowers' — the spike does not yet implement an AgentConfig-lowering compiler (explicit caveat added). - precise table: static parallel block -> ParallelAgentConfig; runtime fan_out/pipeline/branch have no direct config equivalent (ParallelAgentConfig is static parallel sub-agents, not data-mapping over a runtime list). - soften FQN wording to a trust-boundary mismatch (FQN imports are fine for developer-authored config; the concern is a MODEL authoring raw FQNs), not 'config is unsafe'.
Tie the demo to RFC google#93 §11 without overclaiming: this plan's top-level sequence is the kind of static shape that should lower to SequentialAgentConfig, while the reviewer->verifier pipeline (per-item over a runtime list) is exactly what AgentConfig can't express. Explicitly notes this is a design direction — the demo runs via SpecInterpreter and does NOT lower to AgentConfig (no such compiler in the spike). README section + a presenter aside in the narrative.
… (§11) Make the convergence concrete instead of paper. lower_to_agent_config() projects a WorkflowSpec's static skeleton onto ADK AgentConfig shapes: - sequence -> SequentialAgent; loop -> LoopAgent (max_iterations); leaf step -> LlmAgent, referenced by ALLOW-LISTED capability name (never an importable FQN); - dynamic blocks (fan_out over a runtime list, pipeline, branch) are emitted as explicit <no-AgentConfig-equivalent> markers, never fabricated as config. Illustrative structural projection — NOT a loadable root_agent.yaml. A full loadable-config compiler stays future (DESIGN §12). - authoring.py: lower_to_agent_config / agent_config_coverage / _lower_block. - test_authoring.py (+4 -> 25): pure-sequence lowers to SequentialAgent; loop -> LoopAgent; dynamic blocks flagged unsupported; projection never emits an FQN. - demo: a '🧬 AgentConfig lowering' beat prints the projection (2/3 of the demo plan lowers; pipeline flagged) — validated live on gemini-3.5-flash. - demo test (+1 -> 5): demo plan lowers to SequentialAgent + 2 LlmAgent leaves, pipeline no-equivalent, no FQN. - docs: README/DESIGN §11/narrative updated to 'demonstrated' (not 'not shown'); counts 11+25+5 = 41.
…loop lowering; fix count - §11: AgentConfig + Sequential/Parallel/LoopAgentConfig + BaseAgentConfig are @deprecated + @experimental in this checkout (agent_config.py:72-73, sequential_agent_config.py:28, loop_agent_config.py:30) — so this is convergence with the existing config SHAPE for compatibility, not a long-term dependency on deprecated YAML config. - qualify LoopUntil lowering: only the max_iterations skeleton; the until-predicate has no AgentConfig field (enforced by the interpreter). - §9: demo count 4 -> 5 CI-safe tests.
…o docs Match RFC §11 in the demo-facing materials so an ADK TL watching the demo gets the same one-sentence caveat: AgentConfig + the concrete config classes are @deprecated + @experimental in ADK source, so the lowering is convergence with the config SHAPE for compatibility, not a long-term dependency on YAML config. Added to the README talking point and the narrative aside.
…e trailer Declutter the 🧬 ADK config lowering beat: - remove the 'reason' field from the fan_out/pipeline/branch unsupported markers (workflowspec_kind already names the construct); no test referenced it. - drop the trailing italic 'illustrative structural projection…' line from the demo chat message (the README/narrative/DESIGN already carry that framing).
caohy1988
added a commit
that referenced
this pull request
Jun 8, 2026
Caught in review of #6: the C7 pair keys (pause_kind, function_call_id) were being passed via EventData.extra_attributes, which _enrich_attributes() copies at the top of attrs *before* attrs["adk"] = _build_adk_envelope(...). That landed them at attributes.pause_kind / attributes.function_call_id, not attributes.adk.pause_kind / attributes.adk.function_call_id. The customer SQL pinned in google#293 v5 acceptance #3 is: JSON_VALUE(attributes, '$.adk.function_call_id') = JSON_VALUE(...) so the pair join would have returned null on every row. This commit makes the contract match the SQL. Changes: * EventData gains adk_extras: dict[str, Any], a sibling of extra_attributes that lives INSIDE attributes.adk. * _enrich_attributes merges adk_extras into the envelope after _build_adk_envelope (envelope wins on conflict — producer-derived identity fields like source_event_id are the source of truth). * The two emit sites (TOOL_PAUSED in on_event_callback, TOOL_COMPLETED in on_user_message_callback) pass the pair keys via adk_extras= instead of extra_attributes=. * The three C7 tests are updated to assert json.loads(row["attributes"])["adk"]["pause_kind"] etc., locking in the right shape this time. Full plugin suite: 252 passed.
4 tasks
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
1.
dynamic_supervisor_spike/— RFC google#92 (concurrent dynamic dispatch)Prototype
DynamicNodeSupervisor(gate-on-leaf,TaskGroupfan-out) +ctx.pipeline/ctx.parallelon the real ADK engine.supervisor.py,test_dynamic_supervisor_spike.py(11 deterministic tests),test_live_gemini_e2e.py(env-gated),README.md.TaskGroup, notgather), nested no-deadlock with leaf gating (+ driver-gating deadlock contrast), resume exactly-once for completed children.2.
authored_workflow_spike/— RFC google#93 (agent-authored typed Workflows)A model emits a declarative, validated
WorkflowSpec(typed data, not code) executed on the real engine via the google#92 supervisor.authoring.py(WorkflowSpecplainkind-tagged-union tree,CapabilityRegistry,WorkflowSpecValidator,SpecInterpreterfor step/fan_out/pipeline/branch/loop_until, plusFrozenWorkflowRecord/export_plan/import_plan),test_authoring.py(25 deterministic tests, incl. a barrier-free pipeline proof, per-stagemax_fan_outenforcement, and plan export/import round-trip + tamper / capability+registry version drift / schema_version checks, plus ADK-config lowering of the static subset),test_live_planner_sweep.py(env-gated; multi-stage/branch/loop ongemini-3.5-flash),DESIGN.md(canonical technical design incl. plan export/storage tiers),README.md.call_agent_asyncfunction not provided google/adk-python#93: open-dictmaps are a structured-output hazard (Branch.routes→list[Route]); Geminiresponse_schemarejectsField(discriminator=...)so the vocabulary is a plainkind-tagged union; planning vs capability quality are separable.root_agent.yaml(DESIGN §11, per reviewer feedback on Tutorial: Updatedcall_agent_asyncfunction not provided google/adk-python#93): static graph skeletons should lower/export toward thecontributing/samples/workflows/loop_config/root_agent.yamlstyle (agent_class: Workflow, staticedges, child YAML), whileWorkflowSpecremains the model-facing source of truth. Raw YAML is not the planner output becauseloop_configintentionally resolves Python function refs (.agent.route_headline),_coderefs, child config paths, tools/callbacks, and possibly FQNs.branch/runtimefan_out/pipelinestay new typed blocks because config does not directly express runtime per-item dispatch / barrier-free multi-stage flow; YAML would need a wrapper node. Caveat:Workflowitself is not deprecated, but the current config loader path and agent-config sugar classes are@deprecated+@experimental; this is convergence with the Workflow config shape for compatibility, not a long-term dependency on today's loader or deprecated sugar.3.
authored_workflow_demo/— ADK Web demo wrapperA discoverable
root_agent(aWorkflow) that exposes the flow in ADK Web — authors → validates → freezes-to-state → exports → lowers static config shape → executes, surfacing each step as a chat message (authored plan, validation, capabilities, frozen hash, exported plan, config projection, output).security_audit_planner/agent.py,test_demo_agent.py(5 CI-safe tests, incl. a no-LLM reuse-path test + the config-lowering assertion),README.md(the ~7-min recording script),DEMO_NARRATIVE.md(beat-by-beat narration from a real run).adk web contributing/samples/workflows/authored_workflow_demo.Hygiene
pyink/isort/mdformat/pre-commitclean. Deterministic suites: 11 + 25 + 5 = 41 green. Live tests env-gated (skip withoutSPIKE_LIVE+ project). The fork-onlyagent-triage-pull-requestcheck fails on an emptyGITHUB_TOKEN(non-actionable; won't occur upstream).