Conversation
Important: Review skipped — auto incremental reviews are disabled on this repository. Check the settings in the CodeRabbit UI or the ⚙️ Run configuration. (Configuration used: Organization UI · Review profile: CHILL · Plan: Pro)
Note: Reviews paused — this branch is under active development, so CodeRabbit has automatically paused this review to avoid overwhelming you with review comments during an influx of new commits. This behavior is configurable in the settings.
📝 Walkthrough

Adds an SFTP executor JSON Schema and tests; accepts numeric SSH/SFTP ports; makes enqueueDAGRun explicitly close attempt handles and join close/enqueue errors; replaces timing-based test waits with filesystem signaling; and includes related unit/integration/e2e test updates and helpers.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Caller as Caller
    participant Enqueue as enqueueDAGRun
    participant Attempt as AttemptHandle
    participant Queue as QueueStore
    Caller->>Enqueue: enqueueDAGRun(ctx, run)
    Enqueue->>Attempt: Write(status=Queued)
    Attempt-->>Enqueue: write OK / writeErr
    Enqueue->>Attempt: Close()
    Attempt-->>Enqueue: closeErr (ok | error)
    Enqueue->>Queue: Enqueue(dag-run)
    alt Enqueue fails
        Queue-->>Enqueue: enqueueErr
        Enqueue-->>Caller: return errors.Join(closeErr?, enqueueErr)
    else Enqueue succeeds
        Queue-->>Enqueue: ok
        Enqueue-->>Caller: return nil (log closeErr if present)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 3 passed | ❌ 2 failed (2 warnings)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/schema/dag_schema_test.go`:
- Around line 672-695: The two negative tests "RejectInvalidDirection" and
"RejectUnknownConfigField" are missing the otherwise-valid SFTP connection
fields so failures might be due to missing required fields instead of the
intended rule; update the YAML specs used by those cases to include the same
valid SFTP baseline fields used in the passing SFTP test (e.g.,
host/port/auth/any required keys) and then only change the specific value under
test (set direction: sync for RejectInvalidDirection and add unknown_field for
RejectUnknownConfigField) so validation fails solely on the enum/unknown-field
rule.
In `@internal/cmn/schema/dag.schema.json`:
- Around line 2551-2554: Schema allows port as integer and permits empty
source/destination, but runtime SFTP config (NewSFTPExecutor and the SFTP config
struct fields) expects port as string and non-empty source/destination; to fix,
make schema constraints match runtime: change port and bastion.port to only
accept strings (or update NewSFTPExecutor/config types to accept integers if you
prefer), and add "minLength":1 (or equivalent non-empty string constraint) for
SFTP "source" and "destination" to disallow empty paths; update all occurrences
noted (port blocks and the SFTP source/destination blocks around the referenced
schema locations) so schema and runtime validation are consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: bfcbb0e2-ef1a-4c09-9cdd-31013419b876
📒 Files selected for processing (3)
- internal/cmn/schema/dag.schema.json
- internal/cmn/schema/dag_schema_test.go
- internal/core/spec/step_test.go
@coderabbitai review

✅ Actions performed: review triggered.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmn/schema/dag.schema.json`:
- Around line 2492-2495: The schema for sshExecutorConfig.port currently
restricts the port to a string, causing a mismatch with the runtime PortValue
parser in port.go (which accepts integers and whole-number floats) and the tests
in port_test.go; update the "port" schema definition in dag.schema.json to
accept both string and numeric values (e.g., change "type": "string" to allow
"number" as well) and keep the default/description consistent so values like 22
(unquoted) validate against the JSON schema while still matching PortValue
parsing at runtime.
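The runtime behavior the comment describes — accepting strings, integers, and whole-number floats (the types YAML/JSON decoders commonly produce) — can be illustrated with a stand-in parser. This is a sketch of the described semantics, not the actual PortValue implementation in port.go:

```go
package main

import (
	"fmt"
	"math"
	"strconv"
)

// portString normalizes a decoded port value to its string form,
// rejecting fractional numbers and unsupported types.
func portString(v any) (string, error) {
	switch p := v.(type) {
	case string:
		return p, nil
	case int:
		return strconv.Itoa(p), nil
	case float64:
		// JSON decoding yields float64 for all numbers; only whole values are ports.
		if p != math.Trunc(p) {
			return "", fmt.Errorf("port %v is not a whole number", p)
		}
		return strconv.Itoa(int(p)), nil
	default:
		return "", fmt.Errorf("unsupported port type %T", v)
	}
}

func main() {
	for _, v := range []any{"22", 2222, 22.0} {
		s, err := portString(v)
		fmt.Println(s, err)
	}
}
```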
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5da63c5d-902a-45c5-992e-de89fb00f838
📒 Files selected for processing (2)
- internal/cmn/schema/dag.schema.json
- internal/cmn/schema/dag_schema_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/cmn/schema/dag_schema_test.go
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
Actionable comments posted: 4
🧹 Nitpick comments (1)
internal/service/frontend/api/v1/queues_internal_test.go (1)
242-290: Optional: also cover `core.NotStarted` in the new helper's test matrix.
`runningSummaryFromLease` now branches on both `core.NotStarted` and `core.Queued`, but only the `core.Queued` path is exercised. Consider a sibling test (or table-driven variant) asserting the same `StatusRunning` outcome when the persisted status is `core.NotStarted`, since that's an equally realistic race window between lease claim and first status write.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/frontend/api/v1/queues_internal_test.go` around lines 242 - 290, Add a test case that exercises the core.NotStarted branch of runningSummaryFromLease: reuse the helper createDistributedQueueRunWithStatus to create a lease whose persisted run status is core.NotStarted (instead of core.Queued) and assert the resulting running summary yields StatusRunning (same expectation as the core.Queued case). Locate the existing test(s) that call createDistributedQueueRunWithStatus and either add a sibling test or expand the table-driven matrix to include an entry with status core.NotStarted and verify the same StatusRunning outcome; ensure you reference runningSummaryFromLease, createDistributedQueueRunWithStatus, core.NotStarted, and StatusRunning in the new assertion.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/cmd/enqueue.go`:
- Around line 139-150: The code can leave a persisted queued attempt on disk if
att.Close(ctx.Context) fails before ctx.QueueStore.Enqueue runs; to fix, change
the Close error handling in the block around
att.Write/att.Close/ctx.QueueStore.Enqueue so that a Close error does not abort
enqueue: if att.Close returns an error, log or record the Close error (e.g. with
process logger or ctx.Logger) but continue to call
ctx.QueueStore.Enqueue(ctx.Context, dag.ProcGroup(), exec.QueuePriorityLow,
dagRun); because Attempt.Close is idempotent (see Attempt.Close implementation),
this avoids orphaned on-disk attempts; alternatively, if you prefer explicit
cleanup, call att.Close/att.Cleanup (or the appropriate cleanup/remove helper)
on failure before returning — update the error handling around att.Close and the
subsequent ctx.QueueStore.Enqueue to implement one of these strategies and
surface both errors appropriately (e.g., enqueue error should still be returned,
but Close errors should not prevent enqueue).
In `@internal/cmn/schema/dag.schema.json`:
- Around line 2594-2620: The JSON schema for sftpExecutorConfig is inconsistent:
update the "bastion.port" property to accept both string and integer (use oneOf:
[ { "type": "string" }, { "type": "integer" } ]) to match the top-level "port"
and sshExecutorConfig.port; also reconcile the runtime sftpConfigSchema in
internal/runtime/builtin/ssh/config.go (currently treats bastion.port as string)
by relaxing its type to accept numeric ports or convert the top-level port to
string-only—ensure both the JSON schema property "bastion.port" and the runtime
sftpConfigSchema agree on the same type handling.
In `@ui/e2e/distributed-stack.spec.ts`:
- Around line 46-55: The async helper releaseRuns is missing an explicit return
type; change its declaration to include Promise<void> (e.g., const releaseRuns =
async (): Promise<void> => { ... }) so it matches the file's TypeScript style
and other helpers like waitForWorkerState, and ensure the function body still
returns no value (only performs side effects on releaseFile/releaseGate).
In `@ui/e2e/fixtures/dags/e2e-distributed-queue.yaml`:
- Around line 8-13: The script can continue if reading the release FIFO fails;
update the snippet around release_file and release_gate to fail fast by enabling
shell errexit (add set -e at the top of the script) and explicitly validate the
FIFO before reading it (check that release_gate is a named pipe using [ -p
"$release_gate" ] and exit with an error if not present), then perform the
blocking read (cat "$release_gate") so the step cannot silently succeed when the
FIFO is missing or unreadable.
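A hardened version of the fixture step might look like the fragment below. This is a sketch — the step name and surrounding YAML layout are assumptions; only the `set -e` and `[ -p ]` guard come from the comment:

```yaml
steps:
  - name: wait-for-release
    command: |
      set -e
      # Fail fast if the gate was never created or is not a named pipe.
      if [ ! -p "$release_gate" ]; then
        echo "release gate $release_gate is not a FIFO" >&2
        exit 1
      fi
      cat "$release_gate"   # block until the test writes to the pipe
```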
---
Nitpick comments:
In `@internal/service/frontend/api/v1/queues_internal_test.go`:
- Around line 242-290: Add a test case that exercises the core.NotStarted branch
of runningSummaryFromLease: reuse the helper createDistributedQueueRunWithStatus
to create a lease whose persisted run status is core.NotStarted (instead of
core.Queued) and assert the resulting running summary yields StatusRunning (same
expectation as the core.Queued case). Locate the existing test(s) that call
createDistributedQueueRunWithStatus and either add a sibling test or expand the
table-driven matrix to include an entry with status core.NotStarted and verify
the same StatusRunning outcome; ensure you reference runningSummaryFromLease,
createDistributedQueueRunWithStatus, core.NotStarted, and StatusRunning in the
new assertion.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 76865353-f792-4503-84fd-105ecaf10b35
📒 Files selected for processing (8)
- internal/cmd/enqueue.go
- internal/cmd/enqueue_internal_test.go
- internal/cmn/schema/dag.schema.json
- internal/cmn/schema/dag_schema_test.go
- internal/service/frontend/api/v1/queues.go
- internal/service/frontend/api/v1/queues_internal_test.go
- ui/e2e/distributed-stack.spec.ts
- ui/e2e/fixtures/dags/e2e-distributed-queue.yaml
🚧 Files skipped from review as they are similar to previous changes (1)
- internal/cmn/schema/dag_schema_test.go
@coderabbitai review

✅ Actions performed: review triggered.

@coderabbitai review

✅ Actions performed: review triggered.
🧹 Nitpick comments (1)
internal/cmd/enqueue_internal_test.go (1)
210-215: Consider asserting the queued status exists before publish. The fake currently checks only that `Close` happened before `Enqueue`. Adding a status check here would also guard against a future reorder that publishes before the queued status is written.
Proposed stricter queue observer:

```diff
 func (s *enqueueObservingQueueStore) Enqueue(context.Context, string, exec.QueuePriority, exec.DAGRunRef) error {
 	if !s.attempt.closed {
 		return errors.New("status attempt was not closed before queue enqueue")
 	}
+	if s.attempt.status == nil || s.attempt.status.Status != core.Queued {
+		return errors.New("queued status was not written before queue enqueue")
+	}
 	s.enqueued = true
 	return nil
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmd/enqueue_internal_test.go` around lines 210 - 215, The Enqueue implementation in enqueueObservingQueueStore only verifies attempt.closed; update it to also assert the attempt has its "queued" status set before publishing: in Enqueue (enqueueObservingQueueStore.Enqueue) check the attempt's status field (e.g., s.attempt.status == exec.RunStatusQueued or a boolean like s.attempt.queued) and return an error if the queued status is missing, then set s.enqueued = true and return nil; this ensures the queued status exists prior to marking enqueued.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/cmd/enqueue_internal_test.go`:
- Around line 210-215: The Enqueue implementation in enqueueObservingQueueStore
only verifies attempt.closed; update it to also assert the attempt has its
"queued" status set before publishing: in Enqueue
(enqueueObservingQueueStore.Enqueue) check the attempt's status field (e.g.,
s.attempt.status == exec.RunStatusQueued or a boolean like s.attempt.queued) and
return an error if the queued status is missing, then set s.enqueued = true and
return nil; this ensures the queued status exists prior to marking enqueued.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 32b92003-7f7c-4b4d-81b3-9da8f0e80d52
📒 Files selected for processing (9)
- internal/cmd/enqueue.go
- internal/cmd/enqueue_internal_test.go
- internal/cmn/schema/dag.schema.json
- internal/cmn/schema/dag_schema_test.go
- internal/runtime/builtin/ssh/config.go
- internal/runtime/builtin/ssh/ssh_test.go
- internal/service/frontend/api/v1/queues_internal_test.go
- ui/e2e/distributed-stack.spec.ts
- ui/e2e/fixtures/dags/e2e-distributed-queue.yaml
🚧 Files skipped from review as they are similar to previous changes (5)
- ui/e2e/fixtures/dags/e2e-distributed-queue.yaml
- internal/cmd/enqueue.go
- internal/cmn/schema/dag_schema_test.go
- ui/e2e/distributed-stack.spec.ts
- internal/cmn/schema/dag.schema.json
@coderabbitai review

✅ Actions performed: review triggered.

@coderabbitai review

✅ Actions performed: review triggered.
Actionable comments posted: 1
🧹 Nitpick comments (4)
internal/cmn/schema/dag_schema_test.go (1)
636-769: LGTM! Good coverage for the SFTP executor schema: the canonical `with`, the legacy `config` alias, numeric/string `port`, optional `bastion`, and the negative cases (invalid direction, empty source, unknown field) now retain a valid SFTP baseline so failures are attributable to the rule under test (addressing prior feedback).
One optional nit: the `wantErr: "steps"` substring is quite permissive — any validation failure rooted at `steps` passes the assertion. If you want tighter guarantees, consider asserting on a more specific substring (e.g., `"direction"`, `"source"`, or `"unknown_field"`) for each negative case. Not blocking.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmn/schema/dag_schema_test.go` around lines 636 - 769, The tests in TestDAGSchemaSFTPExecutor use a permissive error assertion (tt.wantErr = "steps") which can hide whether the validation failed for the intended field; update the negative-case test table entries (e.g., "RejectInvalidDirection", "RejectEmptySource", "RejectUnknownConfigField") to set wantErr to a more specific substring matching the expected validation message (for example "direction", "source", "unknown_field") and ensure the test loop still asserts require.Contains(err.Error(), tt.wantErr) so failures verify the precise validation error in TestDAGSchemaSFTPExecutor.
internal/runtime/agent/agent_test.go (1)
630-657: Optional: extract the duplicated cancellation-wait block. The `done := make(chan struct{})` + goroutine + `select { case <-done: case <-time.After(30*time.Second): t.Fatal(...) }` pattern is now repeated verbatim across `HTTPValid`, `HTTPInvalidRequest`, and `HTTPHandleCancel` (Lines 630-634/653-657, 677-681/696-700, 720-724/738-742). Consider a small helper, e.g. `waitForCancel(t, done, 30*time.Second)`, to reduce noise. Not blocking.
♻️ Sketch

```go
func waitForCancel(t *testing.T, done <-chan struct{}, timeout time.Duration) {
	t.Helper()
	select {
	case <-done:
	case <-time.After(timeout):
		t.Fatal("timed out waiting for DAG cancellation")
	}
}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent_test.go` around lines 630 - 657, Extract the repeated cancellation-wait pattern into a helper (e.g., waitForCancel) and replace the duplicated blocks in HTTPValid, HTTPInvalidRequest, and HTTPHandleCancel: locate the done := make(chan struct{}) + goroutine that calls dagAgent.RunCancel(t) and the select that waits for done or times out after 30s, then implement waitForCancel(t, done, 30*time.Second) (make it t.Helper()) and call it after dagAgent.Abort() in each test to remove the duplicated code and keep behavior identical.
internal/cmd/enqueue_internal_test.go (2)
44-70: Consider asserting `attempt.closed` to strengthen the close-failure test.
`TestEnqueueDAGRunPublishesQueueWhenCloseFails` relies on `enqueueObservingQueueStore.Enqueue` to indirectly verify closure ordering, but doesn't explicitly assert that `Close` was invoked and its error was tolerated. Adding `assert.True(t, attempt.closed)` would make the intent explicit and guard against a future regression where `Close` is skipped entirely (in which case `closed` stays `false` but `Enqueue` would also fail — still caught, but the failure mode would be less clear).
♻️ Suggested addition

```diff
 require.NoError(t, enqueueDAGRun(ctx, dag, "run-1", core.TriggerTypeManual, ""))
 assert.True(t, queueStore.enqueued)
+assert.True(t, attempt.closed, "attempt should be closed even when Close returns an error")
 require.NotNil(t, attempt.status)
 assert.Equal(t, core.Queued, attempt.status.Status)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmd/enqueue_internal_test.go` around lines 44 - 70, Add an explicit assertion that the tracking attempt's Close was called by checking attempt.closed in TestEnqueueDAGRunPublishesQueueWhenCloseFails: after invoking enqueueDAGRun (and before/or alongside the existing asserts that check queueStore.enqueued and attempt.status), add assert.True(t, attempt.closed) to ensure enqueueTrackingAttempt.Close on the enqueueTrackingAttempt struct was invoked and its error was tolerated; this makes the test's intent explicit and ties to enqueueObservingQueueStore.Enqueue and enqueueDAGRun behavior.
19-70: Duplicated test setup — extract a small helper.
The two tests are nearly identical except for `closeErr` and one extra assertion. A small table-driven form or a helper that builds `(attempt, runStore, queueStore, ctx, dag)` would reduce duplication and make future additions (e.g., Enqueue-failure cases) cheaper.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/cmd/enqueue_internal_test.go` around lines 19 - 70, Both tests TestEnqueueDAGRunClosesStatusBeforeQueuePublish and TestEnqueueDAGRunPublishesQueueWhenCloseFails share nearly identical setup; extract a small helper (or table-driven setup) that builds and returns the common fixtures — an *enqueueTrackingAttempt*, *enqueueTrackingDAGRunStore*, *enqueueObservingQueueStore*, the *Context* (with DAGRunStore/QueueStore) and the *dag* from th.DAG — and then call that helper from each test passing the differing parameter (closeErr or expected assertions). Update tests to call the helper (or iterate table cases) and keep only the unique assertions (e.g., closeErr presence and queue-enqueued checks) while reusing the helper-created attempt, runStore, queueStore, ctx, and dag before invoking enqueueDAGRun.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@ui/e2e/distributed-stack.spec.ts`:
- Around line 188-196: The finally block can overwrite the original test error
if releaseRuns() throws, so wrap the fallback call to releaseRuns() in a
try/catch (when !released && releaseGateReady) to absorb any errors from the
fallback release and optionally log them, ensuring the original exception
remains thrown and that the subsequent cleanup calls (fs.rm for releaseFile and
releaseGate) still execute; update the finally to call releaseRuns() inside try
{ await releaseRuns() } catch (err) { /* log or ignore */ } while leaving the
rest of the finally (the checks on completed and the fs.rm calls) unchanged.
---
Nitpick comments:
In `@internal/cmd/enqueue_internal_test.go`:
- Around line 44-70: Add an explicit assertion that the tracking attempt's Close
was called by checking attempt.closed in
TestEnqueueDAGRunPublishesQueueWhenCloseFails: after invoking enqueueDAGRun (and
before/or alongside the existing asserts that check queueStore.enqueued and
attempt.status), add assert.True(t, attempt.closed) to ensure
enqueueTrackingAttempt.Close on the enqueueTrackingAttempt struct was invoked
and its error was tolerated; this makes the test's intent explicit and ties to
enqueueObservingQueueStore.Enqueue and enqueueDAGRun behavior.
- Around line 19-70: Both tests TestEnqueueDAGRunClosesStatusBeforeQueuePublish
and TestEnqueueDAGRunPublishesQueueWhenCloseFails share nearly identical setup;
extract a small helper (or table-driven setup) that builds and returns the
common fixtures — an *enqueueTrackingAttempt*, *enqueueTrackingDAGRunStore*,
*enqueueObservingQueueStore*, the *Context* (with DAGRunStore/QueueStore) and
the *dag* from th.DAG — and then call that helper from each test passing the
differing parameter (closeErr or expected assertions). Update tests to call the
helper (or iterate table cases) and keep only the unique assertions (e.g.,
closeErr presence and queue-enqueued checks) while reusing the helper-created
attempt, runStore, queueStore, ctx, and dag before invoking enqueueDAGRun.
In `@internal/cmn/schema/dag_schema_test.go`:
- Around line 636-769: The tests in TestDAGSchemaSFTPExecutor use a permissive
error assertion (tt.wantErr = "steps") which can hide whether the validation
failed for the intended field; update the negative-case test table entries
(e.g., "RejectInvalidDirection", "RejectEmptySource",
"RejectUnknownConfigField") to set wantErr to a more specific substring matching
the expected validation message (for example "direction", "source",
"unknown_field") and ensure the test loop still asserts
require.Contains(err.Error(), tt.wantErr) so failures verify the precise
validation error in TestDAGSchemaSFTPExecutor.
In `@internal/runtime/agent/agent_test.go`:
- Around line 630-657: Extract the repeated cancellation-wait pattern into a
helper (e.g., waitForCancel) and replace the duplicated blocks in HTTPValid,
HTTPInvalidRequest, and HTTPHandleCancel: locate the done := make(chan struct{})
+ goroutine that calls dagAgent.RunCancel(t) and the select that waits for done
or times out after 30s, then implement waitForCancel(t, done, 30*time.Second)
(make it t.Helper()) and call it after dagAgent.Abort() in each test to remove
the duplicated code and keep behavior identical.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d310e64f-c784-436f-919d-5e238c17dc73
📒 Files selected for processing (10)
- internal/cmd/enqueue.go
- internal/cmd/enqueue_internal_test.go
- internal/cmn/schema/dag.schema.json
- internal/cmn/schema/dag_schema_test.go
- internal/runtime/agent/agent_test.go
- internal/runtime/builtin/ssh/config.go
- internal/runtime/builtin/ssh/ssh_test.go
- internal/service/frontend/api/v1/queues_internal_test.go
- ui/e2e/distributed-stack.spec.ts
- ui/e2e/fixtures/dags/e2e-distributed-queue.yaml
✅ Files skipped from review due to trivial changes (3)
- internal/runtime/builtin/ssh/ssh_test.go
- internal/runtime/builtin/ssh/config.go
- internal/cmn/schema/dag.schema.json
🚧 Files skipped from review as they are similar to previous changes (2)
- internal/cmd/enqueue.go
- ui/e2e/fixtures/dags/e2e-distributed-queue.yaml
@coderabbitai review

✅ Actions performed: review triggered.
🧹 Nitpick comments (2)
internal/runtime/agent/agent_test.go (2)
71-86: LGTM — filesystem signaling helpers are solid.
`waitForTestFile` and `waitForCancel` are small, correct, and `t.Helper()` is used appropriately so failures point to call sites. The 50ms poll interval and explicit 30s cancel timeout are sensible for CI.
One optional nit: the failure message in `waitForCancel` ("timed out waiting for DAG cancellation") is identical to the inline block in `TestAgent_Run/ReceiveSignal` (Line 321). If you want, you could swap that inline `select` over to `waitForCancel` too for consistency — not required.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent_test.go` around lines 71 - 86, The inline select in the TestAgent_Run/ReceiveSignal subtest duplicates the timeout logic in waitForCancel; replace that inline select with a call to waitForCancel(t, done, 30*time.Second) so the test reuses the helper. Locate the subtest (TestAgent_Run/ReceiveSignal) where it currently does select { case <-done: case <-time.After(...): t.Fatal(...) } and remove that block, calling waitForCancel(t, done, 30*time.Second) instead to keep behavior consistent and centralized in the waitForCancel helper.
622-741: HTTP subtests: filesystem-signaling migration looks correct. All three subtests (`HTTPValid`, `HTTPInvalidRequest`, `HTTPHandleCancel`) now:
- Use distinct per-subtest `startedFile`/`releaseFile` under a shared `t.TempDir()` (good — prevents cross-subtest collisions when running in parallel).
- Wait for the on-disk `startedFile` marker before interacting with the agent, which is a stronger signal than status polling since the step body itself writes the marker.
- Bound the post-abort wait with `waitForCancel(..., 30*time.Second)` instead of an implicit/unbounded wait.

A couple of small observations, none blocking:
- In `HTTPValid` (Lines 647-657), after `waitForTestFile` returns you immediately query `/status` and assert `core.Running`. There is a tiny window between the step writing the marker and the agent's status transitioning to `Running` as observed by `HandleHTTP`. In practice the step is still blocked in `waitForFileScript` when the status is read, so `Running` is essentially guaranteed — but if this ever flakes on slow CI, consider wrapping the status check in `require.Eventually` the way `TestAgent_Run/ReceiveSignal` does.
- `waitForTestFile` here is a near-duplicate of the one in `internal/intg/platform_helpers_test.go` (per the provided context snippet). Since these live in different test packages, sharing would require promoting it to a non-`_test` helper package (e.g., `internal/test`). Fine to leave as-is for this PR given the scope is SFTP schema validation.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent_test.go` around lines 622 - 741, HTTPValid's immediate status check can race; wrap the status assertion in a retry (use require.Eventually) after waitForTestFile to ensure HandleHTTP returns core.Running reliably: in the HTTPValid subtest, replace the direct call/require sequence that invokes dagAgent.HandleHTTP(th.Context)/exec.StatusFromJSON and require.Equal(core.Running) with a short retry loop using require.Eventually that re-calls dagAgent.HandleHTTP(...)/exec.StatusFromJSON until Status==core.Running or timeout; optionally consider promoting waitForTestFile to a shared test helper if you want to deduplicate across packages (but the immediate fix is to make the status assertion in HTTPValid resilient as described).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/runtime/agent/agent_test.go`:
- Around line 71-86: The inline select in the TestAgent_Run/ReceiveSignal
subtest duplicates the timeout logic in waitForCancel; replace that inline
select with a call to waitForCancel(t, done, 30*time.Second) so the test reuses
the helper. Locate the subtest (TestAgent_Run/ReceiveSignal) where it currently
does select { case <-done: case <-time.After(...): t.Fatal(...) } and remove
that block, calling waitForCancel(t, done, 30*time.Second) instead to keep
behavior consistent and centralized in the waitForCancel helper.
- Around line 622-741: HTTPValid's immediate status check can race; wrap the
status assertion in a retry (use require.Eventually) after waitForTestFile to
ensure HandleHTTP returns core.Running reliably: in the HTTPValid subtest,
replace the direct call/require sequence that invokes
dagAgent.HandleHTTP(th.Context)/exec.StatusFromJSON and
require.Equal(core.Running) with a short retry loop using require.Eventually
that re-calls dagAgent.HandleHTTP(...)/exec.StatusFromJSON until
Status==core.Running or timeout; optionally consider promoting waitForTestFile
to a shared test helper if you want to deduplicate across packages (but the
immediate fix is to make the status assertion in HTTPValid resilient as
described).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 90093ce8-af6f-4db6-af09-c596a69f5dc3
📒 Files selected for processing (4)
- internal/cmd/enqueue_internal_test.go
- internal/cmn/schema/dag_schema_test.go
- internal/runtime/agent/agent_test.go
- ui/e2e/distributed-stack.spec.ts
✅ Files skipped from review due to trivial changes (1)
- internal/cmd/enqueue_internal_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- ui/e2e/distributed-stack.spec.ts
- internal/cmn/schema/dag_schema_test.go
@coderabbitai review

✅ Actions performed: review triggered.
Force-pushed a9de1f4 to bb5e75d.
@coderabbitai review

✅ Actions performed: review triggered.
🧹 Nitpick comments (2)
internal/runtime/agent/agent_test.go (2)
635-639: Nit: inconsistent `close(done)` placement across subtests.
`HTTPValid` uses `defer close(done)` while `HTTPInvalidRequest` and `HTTPHandleCancel` call `close(done)` after `RunCancel` returns. Both are correct, but aligning on one style (ideally `defer`, which also closes if `RunCancel` panics via `t.Fatal` in another goroutine) would make the three subtests easier to scan.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent_test.go` around lines 635 - 639, The three subtests (HTTPValid, HTTPInvalidRequest, HTTPHandleCancel) create a done channel and start goroutines calling dagAgent.RunCancel(t); standardize them to use defer close(done) inside the goroutine so the channel is closed even if RunCancel panics or calls t.Fatal—replace any post-call close(done) with defer close(done) in the goroutine that invokes RunCancel to make behavior consistent and safer.
630-655: Filesystem-signaling refactor looks correct. Concatenating `writeFileCommand(...) + "\n" + waitForFileScript(...)` works on both POSIX (two shell statements) and PowerShell (newline-separated statements), and `%q` preserves the newline when embedded into the YAML double-quoted `command:` value. The retry on `HandleHTTP` is a sound replacement for `AssertLatestStatus(Running)` since `HandleHTTP` always stamps `Status = core.Running` on GET /status success (see `internal/runtime/agent/agent.go` lines 1322-1331), so the loop effectively gates on `StatusOK`.
One minor nit: a fresh `mockResponseWriter{}` is allocated every iteration (good, avoids stale state), but shadowing via `var mockResponseWriter = mockResponseWriter{}` reuses the type name as the variable name. Harmless here since the type isn't referenced again in the closure, but renaming to e.g. `rw := mockResponseWriter{}` would be clearer.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/runtime/agent/agent_test.go` around lines 630 - 655, The test currently declares a variable using the same identifier as its type ("var mockResponseWriter = mockResponseWriter{}"), which shadows the type name and is confusing; change the loop local to a clearly named variable like "rw := mockResponseWriter{}" and then use "rw" in the subsequent calls (rw.status, rw.body) when invoking dagAgent.HandleHTTP and parsing the response to avoid type/variable shadowing while preserving the existing behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/runtime/agent/agent_test.go`:
- Around line 635-639: The three subtests (HTTPValid, HTTPInvalidRequest,
HTTPHandleCancel) create a done channel and start goroutines calling
dagAgent.RunCancel(t); standardize them to use defer close(done) inside the
goroutine so the channel is closed even if RunCancel panics or calls
t.Fatal—replace any post-call close(done) with defer close(done) in the
goroutine that invokes RunCancel to make behavior consistent and safer.
- Around line 630-655: The test currently declares a variable using the same
identifier as its type ("var mockResponseWriter = mockResponseWriter{}"), which
shadows the type name and is confusing; change the loop local to a clearly named
variable like "rw := mockResponseWriter{}" and then use "rw" in the subsequent
calls (rw.status, rw.body) when invoking dagAgent.HandleHTTP and parsing the
response to avoid type/variable shadowing while preserving the existing
behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: afabc9f5-4cca-4e5e-a35d-e75097d8966c
📒 Files selected for processing (1)
internal/runtime/agent/agent_test.go
Force-pushed from a1e99a4 to ed2c3f2
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@internal/runtime/agent/agent_test.go`:
- Around line 79-86: In waitForCancel replace the direct t.Fatal call with the
testify require equivalent: import "github.com/stretchr/testify/require" and
call require.FailNow(t, "timed out waiting for DAG cancellation") (or
require.FailNow(t, "timed out waiting for DAG cancellation", nil) as needed), so
the helper uses stretchr/testify assertions instead of t.Fatal; update imports
accordingly.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6edafbe4-5e86-437c-9987-9daf3bf7859f
📒 Files selected for processing (3)
internal/persis/fileproc/handle.go
internal/persis/fileproc/handle_test.go
internal/runtime/agent/agent_test.go
🧹 Nitpick comments (1)
internal/intg/distr/zombie_recovery_test.go (1)
187-191: Minor: post-loop `Greater` check is now redundant. Line 191's
`assert.Greater(lease.LastHeartbeatAt, initialLease)` is already guaranteed by the `Eventually` exit condition at line 167 and re-verified implicitly by `waitForLease` at line 190 (which may, however, return a newer lease — so this assertion still holds). Not worth changing, but you could drop it to reduce noise, or keep it as a belt-and-suspenders guard against regressions in `waitForLease`.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/intg/distr/zombie_recovery_test.go` around lines 187 - 191, The final assert.Greater(lease.LastHeartbeatAt, initialLease) is redundant because the Eventually loop and waitForLease(...) already guarantee the lease advanced; remove the assertion line to reduce test noise (i.e., delete the assert.Greater call that references lease.LastHeartbeatAt and initialLease after waitForLease), leaving the latestStatus() check and the lease = waitForLease(...) call intact.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@internal/intg/distr/zombie_recovery_test.go`:
- Around line 187-191: The final assert.Greater(lease.LastHeartbeatAt,
initialLease) is redundant because the Eventually loop and waitForLease(...)
already guarantee the lease advanced; remove the assertion line to reduce test
noise (i.e., delete the assert.Greater call that references
lease.LastHeartbeatAt and initialLease after waitForLease), leaving the
latestStatus() check and the lease = waitForLease(...) call intact.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 4f496bb2-3c24-4a0e-8319-302179f6b3ec
📒 Files selected for processing (1)
internal/intg/distr/zombie_recovery_test.go
|
# Conflicts:
#	internal/runtime/agent/agent_test.go
# Conflicts:
#	internal/cmn/fileutil/replace_windows.go
#	internal/persis/fileproc/handle.go
#	internal/persis/fileproc/handle_test.go
Summary
- Add a JSON Schema for the `type: sftp` executor, covering SFTP-specific `with` and legacy `config` validation for `type: sftp` steps.
- Closes #1976
Testing
- `go test ./internal/cmn/schema ./internal/core/spec`
- `make test TEST_TARGET='./internal/cmn/schema ./internal/core/spec'`
Summary by CodeRabbit
New Features
Changes
Bug Fixes
Tests