📝 Walkthrough

This PR introduces a complete workflow execution system. A new workflow package executes DAG-based workflow definitions on the local machine, managing node scheduling, model invocations, and relay-based event streaming. The daemon routes workflow messages and coordinates execution. A dependency is bumped, and fake-claude gains prompt detection logic.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Client
    participant Daemon
    participant Executor
    participant Scheduler
    participant ModelBinary as Model Binary
    participant Relay
    Client->>Daemon: WorkflowRun (definition, binary)
    activate Daemon
    Daemon->>Executor: NewExecutor(def, cwd, relay, binary)
    Daemon->>Executor: Run(ctx)
    deactivate Daemon
    activate Executor
    Executor->>Relay: WorkflowStarted
    rect rgba(135, 206, 250, 0.5)
        loop Until terminal state
            Executor->>Scheduler: ReadyNodes(def, completed, running)
            Scheduler-->>Executor: ready nodes
            par Execute ready nodes concurrently
                rect rgba(144, 238, 144, 0.5)
                    alt Prompt node execution
                        Executor->>ModelBinary: os/exec (with stdin/stdout)
                        activate ModelBinary
                        ModelBinary-->>Executor: NDJSON stream
                        deactivate ModelBinary
                        Executor->>Executor: Parse events, extract text/usage
                        Executor->>Relay: WorkflowNodeStream
                    else Script/Input/Directory node
                        Executor->>Executor: Execute bash/read input/set CWD
                    end
                    Executor->>Executor: Update completed/running/failed state
                    Executor->>Relay: WorkflowNodeCompleted/Error
                end
            end
        end
    end
    Executor->>Relay: WorkflowCompleted (or WorkflowError)
    deactivate Executor
```
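The ReadyNodes step in the loop above is a pure dependency check over the DAG. The following is an illustrative sketch, not the actual `internal/workflow` implementation; the `Node`, `Edge`, and `Definition` shapes are assumptions:

```go
package main

import "fmt"

// Node and Edge mirror the shape a DAG definition might take; the real
// types in internal/workflow may differ.
type Node struct{ ID string }
type Edge struct{ From, To string }

type Definition struct {
	Nodes []Node
	Edges []Edge
}

// readyNodes returns nodes whose upstream dependencies have all completed
// and that are not themselves already completed or running.
func readyNodes(def *Definition, completed, running map[string]bool) []Node {
	deps := make(map[string][]string)
	for _, e := range def.Edges {
		deps[e.To] = append(deps[e.To], e.From)
	}
	var ready []Node
	for _, n := range def.Nodes {
		if completed[n.ID] || running[n.ID] {
			continue
		}
		ok := true
		for _, d := range deps[n.ID] {
			if !completed[d] {
				ok = false
				break
			}
		}
		if ok {
			ready = append(ready, n)
		}
	}
	return ready
}

func main() {
	def := &Definition{
		Nodes: []Node{{"a"}, {"b"}, {"c"}},
		Edges: []Edge{{"a", "b"}, {"a", "c"}},
	}
	// Once a completes, both of its downstream nodes become ready.
	ready := readyNodes(def, map[string]bool{"a": true}, map[string]bool{})
	fmt.Println(ready[0].ID, ready[1].ID) // b c
}
```

The executor then launches each ready node concurrently and re-evaluates readiness as nodes complete.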
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 5
🧹 Nitpick comments (4)
internal/workflow/types.go (1)
49-56: Consider adding basic validation to ParseDefinition.

The function parses JSON successfully but doesn't validate semantic constraints (e.g., duplicate node IDs, edges referencing non-existent nodes). Invalid definitions could cause subtle runtime issues in the executor.
🔧 Optional: Add validation for node ID uniqueness
```diff
 func ParseDefinition(data json.RawMessage) (*Definition, error) {
 	var def Definition
 	if err := json.Unmarshal(data, &def); err != nil {
 		return nil, err
 	}
+	seen := make(map[string]bool, len(def.Nodes))
+	for _, n := range def.Nodes {
+		if seen[n.ID] {
+			return nil, fmt.Errorf("duplicate node ID: %s", n.ID)
+		}
+		seen[n.ID] = true
+	}
 	return &def, nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workflow/types.go` around lines 49-56: Enhance ParseDefinition to perform basic semantic validation after json.Unmarshal: inspect the returned Definition (type Definition) and validate that all Node IDs (e.g., fields on Node) are unique (detect duplicates), that every Edge/transition references existing node IDs, and any required root/start node exists; return a descriptive error if any check fails. Locate ParseDefinition and add a small validator that builds a map[string]bool of node IDs, iterates nodes to detect duplicates, then iterates edges/transitions to ensure their source/target IDs are present in the map, returning errors that mention the offending ID and using the existing error return from ParseDefinition.

internal/workflow/executor.go (2)
309-347: Stderr is not captured, making failure diagnosis difficult.

If the claude binary writes error details to stderr (e.g., authentication failures, invalid arguments), this information is lost. The caller only receives a generic "claude exited" error.
🔧 Capture stderr for error diagnostics
```diff
 stdout, err := cmd.StdoutPipe()
 if err != nil {
 	return nil, fmt.Errorf("stdout pipe: %w", err)
 }
+var stderrBuf strings.Builder
+cmd.Stderr = &stderrBuf
 if err := cmd.Start(); err != nil {
 	return nil, fmt.Errorf("start claude: %w", err)
 }
 // ... scanning loop ...
 if err := cmd.Wait(); err != nil {
-	return nil, fmt.Errorf("claude exited: %w", err)
+	return nil, fmt.Errorf("claude exited: %w\nstderr: %s", err, stderrBuf.String())
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workflow/executor.go` around lines 309 - 347, The child process stderr is not captured, so when cmd.Wait() returns an error you lose diagnostic details; call cmd.StderrPipe() alongside cmd.StdoutPipe() (or set cmd.Stderr to a bytes.Buffer), start a goroutine to read the stderr stream into a buffer while NewNDJSONScanner reads stdout, and when cmd.Wait() returns include the collected stderr content in the returned error (e.g. append the stderr buffer string to the fmt.Errorf for "claude exited") so failures from cmd, authentication, or CLI args are surfaced; update references around cmd, stdout, resultRaw, and the cmd.Wait() error path to use that buffer.
123-136: Batch waiting reduces DAG parallelism.

The `wg.Wait()` call blocks until all currently executing nodes complete before checking for newly ready nodes. If node A completes and enables downstream node C while node B is still running, C won't start until B finishes. For deeper DAGs with independent branches, this could significantly increase total execution time.
🔧 Optional: Use a channel-based approach for immediate scheduling
An alternative pattern schedules newly ready nodes as soon as any node completes:
```go
// Instead of batch-waiting, use a completion channel
doneCh := make(chan string) // receives completed node IDs
// In executeNode, send to doneCh when done
// In Run, select on doneCh to immediately schedule newly ready nodes
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workflow/executor.go` around lines 123-136: The current Run loop uses wg.Wait to block until every node in the batch finishes, which prevents newly ready nodes from starting immediately; change to a channel-driven completion model: create a doneCh (e.g., chan string) and remove the batch wg.Wait blocking; when launching each node in the loop (the goroutine calling executeNode), have executeNode (or its wrapper) send the node ID on doneCh when it completes and update e.running under e.mu; in Run, loop receiving from doneCh and on each completion evaluate and schedule any newly ready nodes (mark running and spawn goroutines) so downstream nodes like C can start as soon as a predecessor finishes; keep a small WaitGroup or counter to know when all work is finished and close doneCh appropriately to exit the scheduling loop.

internal/workflow/ndjson.go (1)
29-72: Consider exposing scanner errors for caller inspection.

The `Scan()` method logs errors at debug level but doesn't expose them to callers. While this keeps the API simple, debugging production issues could be difficult if callers can't distinguish EOF from parse errors. Consider adding an `Err()` method (following `bufio.Scanner` conventions):

🔧 Optional: Add Err() method for error inspection
```diff
+// Err returns any error encountered during scanning.
+func (n *NDJSONScanner) Err() error {
+	return n.scanner.Err()
+}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/workflow/ndjson.go` around lines 29 - 72, Add error exposure to NDJSONScanner by giving the struct a stored error field and an Err() method; in Scan(), when json.Unmarshal fails for the outer envelope or inner event envelope set that error into the new field (e.g., n.err = err) instead of only logging, and also set n.err = n.scanner.Err() if scanner.Err() is non-nil at the end of Scan(); implement func (n *NDJSONScanner) Err() error that returns n.err (or falls back to n.scanner.Err()) so callers can inspect parse or scanner errors while Scan() retains its current skip-on-parse-error behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: ff1d74b8-54c3-4fbc-a4c6-bdc979742a3b
⛔ Files ignored due to path filters (1)
`go.sum` is excluded by `!**/*.sum`
📒 Files selected for processing (11)
- cmd/fake-claude/main.go
- go.mod
- internal/loop/daemon.go
- internal/workflow/executor.go
- internal/workflow/extract.go
- internal/workflow/ndjson.go
- internal/workflow/resolve.go
- internal/workflow/resolve_test.go
- internal/workflow/scheduler.go
- internal/workflow/scheduler_test.go
- internal/workflow/types.go
```go
d.workflowMu.Lock()
d.workflows[msg.WorkflowRunID] = exec
d.workflowMu.Unlock()

go func() {
	exec.Run(context.Background())
	d.workflowMu.Lock()
	delete(d.workflows, msg.WorkflowRunID)
	d.workflowMu.Unlock()
```
Reject duplicate WorkflowRunIDs before replacing the registry entry.
Overwriting `d.workflows[msg.WorkflowRunID]` lets two executors run under the same ID, and the first goroutine to finish will delete the newer executor from the map. After that, `workflowStop` can no longer find the active run.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/loop/daemon.go` around lines 362 - 370, The current code
unconditionally sets d.workflows[msg.WorkflowRunID] = exec then starts exec.Run,
which allows duplicate WorkflowRunID to replace an existing entry; the first
goroutine to finish may delete the newer executor and break workflowStop. Fix by
checking for an existing entry under d.workflowMu before assigning: acquire
d.workflowMu, if an entry for msg.WorkflowRunID already exists return an error
or reject the start (do not overwrite), otherwise set
d.workflows[msg.WorkflowRunID] = exec and release the lock; ensure the goroutine
that calls exec.Run still deletes only if the map entry still matches exec
(compare pointer/ID) before deleting to avoid removing a newer runner.
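The check-before-register and compare-before-delete pattern described above can be sketched with hypothetical stand-ins for the daemon's fields (`registry`, `runner`, and the method names are illustrative):

```go
package main

import (
	"fmt"
	"sync"
)

// runner stands in for the executor type; the daemon's real type differs.
type runner struct{ id string }

type registry struct {
	mu   sync.Mutex
	runs map[string]*runner
}

// register rejects duplicate run IDs instead of overwriting them.
func (r *registry) register(id string, run *runner) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, ok := r.runs[id]; ok {
		return fmt.Errorf("workflow run %s already active", id)
	}
	r.runs[id] = run
	return nil
}

// unregister deletes the entry only if it still points at this runner, so a
// finishing goroutine can never remove a newer run that reused the same ID.
func (r *registry) unregister(id string, run *runner) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.runs[id] == run {
		delete(r.runs, id)
	}
}

func main() {
	reg := &registry{runs: map[string]*runner{}}
	a := &runner{id: "run-1"}
	fmt.Println(reg.register("run-1", a) == nil)                  // first start succeeds
	fmt.Println(reg.register("run-1", &runner{id: "run-1"}) != nil) // duplicate rejected
}
```

The pointer comparison in `unregister` is what makes the cleanup goroutine safe even if IDs are ever reused.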
```go
go func() {
	exec.Run(context.Background())
```
Bind workflow execution to the daemon lifecycle.
`exec.Run(context.Background())` means graceful shutdown never propagates cancellation to in-flight workflow runs. Since this feature shells out to the Claude binary, that can leave child work running after the daemon has begun shutting down.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/loop/daemon.go` around lines 366 - 367, The current goroutine calls
exec.Run(context.Background()) which prevents daemon shutdown cancellation from
reaching the spawned Claude process; change it to call exec.Run with the
daemon's lifecycle context (e.g., pass the existing daemon ctx variable or a
context derived from the daemon stop context) so cancellation propagates:
replace context.Background() with the daemon's ctx, ensure any exec.Stop/Cancel
method or context-aware cleanup is invoked when ctx.Done() triggers, and if
needed track the goroutine with the daemon's wait group so shutdown waits for
in-flight exec.Run to exit.
```go
stdout, err := cmd.StdoutPipe()
if err != nil {
	slog.Error("workflow design chat: stdout pipe", "err", err)
	return
}
if err := cmd.Start(); err != nil {
	slog.Error("workflow design chat: start", "err", err)
	return
```
Don't return silently on design-chat startup failures.
If StdoutPipe or Start fails here, the daemon only logs and exits the goroutine. The requester never gets a terminal message, so the browser will hang waiting for a completion/error that never comes.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/loop/daemon.go` around lines 412 - 419, When cmd.StdoutPipe() or
cmd.Start() fails in the design-chat goroutine, don't just slog.Error and
return; send a terminal/error message back to the requester (the same
stream/channel used for design-chat responses) including the actual err, ensure
any opened resources are cleaned up (close pipes, cancel context or close the
stream), and then terminate the goroutine. Update the error branches around
cmd.StdoutPipe() and cmd.Start() to push a clear failure event into the
requester's response channel/terminal stream (and include err in the payload),
call any existing cleanup/cancel function, and only then return.
```go
scanner := workflow.NewNDJSONScanner(stdout)
for scanner.Scan() {
	event := scanner.Event()
	sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	if err := d.client.Send(sendCtx, &protocol.WorkflowDesignChatStream{
		Type:       protocol.MsgTypeWorkflowDesignChatStream,
		WorkflowID: msg.WorkflowID,
		ChannelID:  msg.ChannelID,
		Event:      event.Raw,
	}); err != nil {
		slog.Warn("workflow design chat: stream send failed", "err", err)
	}
	cancel()
}

if err := cmd.Wait(); err != nil {
	slog.Warn("workflow design chat: claude exited", "err", err)
}

sendCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
if err := d.client.Send(sendCtx, &protocol.WorkflowDesignChatComplete{
	Type:       protocol.MsgTypeWorkflowDesignChatComplete,
	WorkflowID: msg.WorkflowID,
	ChannelID:  msg.ChannelID,
	Patches:    json.RawMessage(`[]`),
}); err != nil {
```
Parse and return the generated patch array instead of always completing with [].
This path streams raw events, but it never extracts Claude's final JSON patch payload into Patches; WorkflowDesignChatComplete always reports an empty array. It also treats scanner/command failures as a successful completion. As written, the new design-chat feature can never return actual patches to the caller.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/loop/daemon.go` around lines 422 - 448, Accumulate Claude's final
patch payloads while scanning instead of always sending an empty array: when
iterating with workflow.NewNDJSONScanner (scanner) collect/parse patch JSON from
scanner.Event().Raw into a patches variable (e.g., []json.RawMessage or a single
json.RawMessage) and use that variable for the Patches field of
WorkflowDesignChatComplete; also change the error handling around scanner.Scan()
and cmd.Wait() so scanner/command failures do not fall through to sending a
successful WorkflowDesignChatComplete — on error, send an appropriate failure
notification (or return) instead of reporting completion. Ensure you update the
d.client.Send call that constructs &protocol.WorkflowDesignChatComplete to use
the collected patches and reference scanner, cmd.Wait, and
WorkflowDesignChatComplete in the fix.
```go
ready = ReadyNodes(def, completed, running)
ids := nodeIDs(ready)
if len(ids) != 2 {
	t.Errorf("expected [b, c], got %v", ids)
}
```
Assert the actual ready node IDs here, not just the count.
A regression that returned the wrong two nodes would still pass this step because only len(ids) is checked. Please verify the expected IDs too.
Suggested tightening
```diff
 ready = ReadyNodes(def, completed, running)
 ids := nodeIDs(ready)
-if len(ids) != 2 {
+if len(ids) != 2 || ids[0] != "b" || ids[1] != "c" {
 	t.Errorf("expected [b, c], got %v", ids)
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@internal/workflow/scheduler_test.go` around lines 29 - 33, The test currently
only checks the length of ids returned from nodeIDs(ready); update the assertion
to verify the actual IDs match the expected slice (e.g., ["b","c"]) rather than
just len(ids). Replace the len(ids) check with a strict equality check using
reflect.DeepEqual or slices.Equal (or t.Fatalf/t.Errorf with a formatted
comparison) comparing ids to the expected slice, so ReadyNodes(def, completed,
running) is validated for the correct node IDs.
What changed

- New `internal/workflow` package
- Handle `workflowRun`, `workflowStop`, and `workflowDesignChat` protocol messages in the main daemon loop
- Bump `github.com/gsd-build/protocol-go` to v0.11.1

Why
The cloud flowstate UI and relay need a daemon consumer that can execute workflow graphs, stream node events, stop in-flight runs, and support workflow design-chat patch generation.
How it works now

- The daemon receives the `workflowRun` payload, parses the workflow definition, resolves the working directory, and starts a workflow executor tracked by run id
- `workflowStop` looks up the active executor by run id and stops it if present
- `workflowDesignChat` shells out to the configured Claude binary with a strict JSON-patch response contract and streams NDJSON events back to the browser channel

Verification
- `GOWORK=off GOPATH=/tmp/gsd-go GOCACHE=/tmp/gsd-daemon-clean-build-cache go test ./...`
- `GOWORK=off GOPATH=/tmp/gsd-go GOCACHE=/tmp/gsd-daemon-clean-build-cache go build ./...`

Dependency order
- v0.11.1

Post-merge

- v0.11.1

Summary by CodeRabbit
Release Notes
New Features
Improvements