feat(workflow): add dynamic-node orchestration#896
Merged
Conversation
da7998d to
236da7e
Compare
0ffb269 to
1fe9bcc
Compare
eb20bd1 to
111962b
Compare
1fe9bcc to
a47f993
Compare
4330cdd to
10fa9c6
Compare
a47f993 to
426bae0
Compare
10fa9c6 to
ad9311d
Compare
e818086 to
ac10bf2
Compare
hanorik
reviewed
May 27, 2026
1a24cc0 to
7760b0e
Compare
hanorik
approved these changes
May 27, 2026
c455805 to
904a622
Compare
7760b0e to
e953f8d
Compare
Build the user-facing API for dynamic workflows on top of the sub-scheduler skeleton: a single constructor for the orchestrator, a generic helper for scheduling children, and the option to override the auto-generated run id. * `workflow/dynamic_node.go`: `NewDynamicNode[IN, OUT]` and `NewDynamicNodeWithSchema[IN, OUT]` constructors; `DynamicFn[IN, OUT]` generic type alias for the orchestrator body signature `func(ctx NodeContext, in IN, emit func(*session.Event) error) (OUT, error)`. `cfg.RerunOnResume` defaults to &true when unset — the constructor knows this is a dynamic node, so the user shouldn't have to remember the requirement (adk-python checks lazily at the first `ctx.run_node` call and raises ValueError, which is poor UX). An explicit &false is respected for advanced cases (testing with mocked RunNode). `dynamicNode.Run` wires the sub-scheduler into a NodeContext for the orchestrator body, drives the user function, forwards events via `emit` (= yield adapter), and emits a terminal Event.Output. ErrNodeInterrupted returned from a RunNode call is swallowed at this boundary: the sub-scheduler has already forwarded the RequestedInput event upstream, so the engine sees the pause via that event, not via a yielded error. Typed input coercion mirrors FunctionNode (type assertion + typeutil JSON roundtrip fallback) so an upstream tool node emitting map[string]any can feed a downstream dynamic node expecting a typed struct. * `workflow/run_node.go`: `RunNode[OUT any](ctx, child, input, opts...)` top-level generic helper. Top-level (not a method) because Go has no generic methods; the established workaround for typed per-call APIs. Reaches the per-activation sub-scheduler via type-assertion on `*nodeContext` — the concrete impl is unexported but always the one the engine constructs, including after `WithGoCtx`. Misuse (a static NodeContext, or a child-of-child re-parented incorrectly) surfaces as ErrInvalidRunNodeContext. Output is `rawOut.(OUT)`-asserted; a type mismatch yields a clear error naming the child and the actual vs. expected types. `WithRunID(id)` overrides the auto-counter with a stable user-supplied identifier. Validation lives in the sub-scheduler (single source of truth for the rule), so misuse surfaces as ErrInvalidRunID from RunNode. * `session/session.go`: add `Event.NodeInfo *NodeInfo` field with `NodeInfo.Path` carrying the composite path of the emitting node within its workflow activation; empty for top-level static nodes, "<parent_path>/<child_name>@<run_id>" for dynamic children. The substruct shape mirrors adk-python's `event.nodeInfo` envelope so future fields (output_for, message_as_output) land here rather than swelling Event itself. Required so the top-level scheduler can scope its per-activation Output/Routes invariants per emitting path rather than collapsing them onto the top-level node name. Without this, a dynamic node forwarding a child's Output-bearing event upstream while also emitting its own terminal Output would trip ErrMultipleOutputs. This matches adk-python's behavior — Python explicitly allows multiple output-bearing events per `ctx.run_node` (one from the child, one from the parent), distinguishing them by node_info.path in the shared invocation queue. * `workflow/scheduler.go::handleEvent`: pass through events whose NodeInfo.Path identifies a descendant of the top-level node without updating the top-level Output/Routes accumulator. Descendant RequestedInput is the exception — promoted to the parent's accumulator so Workflow.Resume can match by InterruptID against the parent's NodeState.PendingRequest. * `workflow/dynamic_scheduler.go`: stamp NodeInfo.Path on every forwarded child event so handleEvent can scope correctly. * `workflow/dynamic_node.go`: stamp NodeInfo.Path on the dynamic node's own terminal event. Tests: 19 new test cases covering constructor defaults, explicit RerunOnResume override, end-to-end sequential RunNode chains, typed input direct + JSON-fallback coercion, mid-body emit forwarding, HITL swallowing (no error yielded, RequestedInput present in stream), child failure propagation, terminal output event, schema constructor smoke test, plus all RunNode error paths (invalid context, type mismatch, interrupted, failed), WithRunID composite-path appearance, WithRunID invalid rejection, nil-output zero-value. Plus an end-to-end integration test driving a dynamic orchestrator through workflow.New + Run that verifies a child's terminal output and the parent's terminal output both reach the workflow stream without tripping ErrMultipleOutputs. A second integration test in agent/workflowagent verifies HITL round-trip: child RequestedInput inside a dynamic orchestrator transitions the orchestrator to NodeWaiting, and resume re-enters the orchestrator to produce the final output. `go build ./...`, `go test -race ./workflow/... ./agent/workflowagent/...`, `gofmt -l` all clean. Stacked on the sub-scheduler skeleton PR. After both merge, the core dynamic-workflows surface is in place; resume/replay-skip, parent re-entry across more scenarios, and parallel HITL detection follow in subsequent PRs.
e953f8d to
a69f764
Compare
wolo-lab
added a commit
that referenced
this pull request
Jun 2, 2026
Wires the user-facing API for dynamic workflows on top of the sub-scheduler skeleton from the previous PR. A dynamic node's execution order is expressed as Go code (loops, branches, goroutines) that calls other nodes inline via RunNode, branches on their typed output, and pauses for HITL input. The public surface: workflow.NewDynamicNode[IN, OUT](name, fn, cfg) — orchestrator constructor; cfg.RerunOnResume defaults to &true (an explicit &false is respected). workflow.RunNode[OUT](ctx, child, input, opts...) — generic helper for scheduling a child. Returns its typed output, or errors.Is-matchable ErrNodeInterrupted / ErrNodeFailed. workflow.WithRunID(id) — option overriding the auto-counter with a stable id (rejected if empty, purely numeric, or containing / or @). session.NodeInfo — substruct on Event carrying the emitting node's composite path; shape mirrors adk-python's event.nodeInfo. The scheduler's handleEvent scopes per-activation Output/Routes invariants by NodeInfo.Path, so a dynamic node forwarding a child's terminal output plus its own no longer trips ErrMultipleOutputs. Descendant RequestedInput events are promoted onto the parent's accumulator so Workflow.Resume matches the InterruptID against the parent's NodeState.PendingRequest — enabling HITL inside a dynamic orchestrator.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Wires the user-facing API for dynamic workflows on top of the
sub-scheduler skeleton from the previous PR. A dynamic node's
execution order is expressed as Go code (loops, branches, goroutines)
that calls other nodes inline via
RunNode, branches on their typedoutput, and pauses for HITL input.
The public surface:
workflow.NewDynamicNode[IN, OUT](name, fn, cfg)— orchestratorconstructor;
cfg.RerunOnResumedefaults to&true(an explicit&falseis respected).workflow.RunNode[OUT](ctx, child, input, opts...)— generichelper for scheduling a child. Returns its typed output, or
errors.Is-matchableErrNodeInterrupted/ErrNodeFailed.workflow.WithRunID(id)— option overriding the auto-counterwith a stable id (rejected if empty, purely numeric, or
containing
/or@).session.NodeInfo— substruct onEventcarrying the emittingnode's composite path; shape mirrors adk-python's
event.nodeInfo.The scheduler's
handleEventscopes per-activationOutput/Routesinvariants by
NodeInfo.Path, so a dynamic node forwarding a child'sterminal output plus its own no longer trips
ErrMultipleOutputs.Descendant
RequestedInputevents are promoted onto the parent'saccumulator so
Workflow.Resumematches the InterruptID against theparent's
NodeState.PendingRequest— enabling HITL inside a dynamicorchestrator.
Stacked on the sub-scheduler skeleton PR; follow-ups add
resume/replay-skip, broader parent re-entry scenarios, and parallel
HITL detection.
Tested:
go build ./...,go test -race ./workflow/... ./agent/workflowagent/...,gofmt -lclean. Coverage includesconstructor defaults and overrides, sequential
RunNodechains,typed-input coercion (direct + JSON fallback), mid-body emit, HITL
swallow, error propagation, terminal output, plus end-to-end tests
for child+parent output forwarding and HITL round-trip via
Workflow.Run+Workflow.Resume.