Skip to content

refactor: move coordinator transport behind runtime ports#2235

Merged
yohamta0 merged 4 commits into
mainfrom
refactor/runtime-reporter-port
Jun 1, 2026
Merged

refactor: move coordinator transport behind runtime ports#2235
yohamta0 merged 4 commits into
mainfrom
refactor/runtime-reporter-port

Conversation

@yohamta0
Copy link
Copy Markdown
Collaborator

@yohamta0 yohamta0 commented May 31, 2026

Summary

  • move coordinator dispatch/reporting transport details out of runtime-facing packages and behind domain ports
  • add proto-free dispatch task/status contracts in core/exec and convert to coordinator proto at Dagu adapter boundaries
  • move worker remote reporting adapters under service/worker/coordreport while preserving distributed sub-DAG behavior

Testing

  • go test -count=1 ./internal/core/exec ./internal/proto/convert
  • go test -count=1 ./internal/runtime/executor ./internal/service/coordinator ./internal/service/worker ./internal/service/scheduler
  • go test -count=1 ./internal/intg/distr
  • make lint
  • make test
  • go list -deps ./internal/runtime/... | rg "proto/coordinator/v1|internal/service/coordinator"
  • rg -n "coordinator/v1|coordinatorv1|internal/service/coordinator" internal/runtime internal/core/exec internal/launcher -S
  • git diff --check

Summary by cubic

Moves coordinator transport behind runtime ports and switches the runtime to domain dispatch/status/worker types. Adds a TLS‑validated, coordinator‑backed runtime dispatcher injected via a DispatcherFactory; keeps the runtime proto‑free and preserves distributed sub‑DAG behavior.

  • New Features

    • Added domain dispatch types exec.DispatchTask and exec.DispatchOperation, with converters in internal/proto/convert/dispatch; used by coordinator client/handler with validation.
    • Introduced runtime ports: runtime.StatusPusher, runtime.SchedulerLogStreamer, runtime.ArtifactFinalizer (with AttemptRejected); implemented in service/worker/coordreport.
    • Provided coordinator‑backed runtime dispatcher via coordinator.NewRuntimeDispatcher (validates TLS/retry); wired through a DispatcherFactory in CLI, engine, and tests.
    • Added docs: “Distributed Runtime Ports,” linked from README.
  • Refactors

    • Removed coordinator proto usage across runtime/engine/CLI/scheduler/launcher/worker; progress UI now consumes exec.DAGRunStatus.
    • DispatchTaskStore now persists exec.DispatchTask; releases claims on encode failure; adaptive cleanup interval reduces contention; can read legacy dispatch records.
    • Consolidated worker remote reporting under service/worker/coordreport; updated scheduler/queue to new operation enums.
    • Updated launcher/worker command specs to accept exec.DispatchTask; telemetry now uses exec.WorkerStats/exec.RunningTask.

Written for commit 9001832. Summary will update on new commits.

Review in cubic

Summary by CodeRabbit

  • Refactor
    • Decoupled distributed execution dispatching from coordinator protobuf types by introducing internal dispatch abstractions and task representations across the executor, coordinator client, and runtime agent layers.
    • Consolidated distributed task and worker status reporting logic into a unified "coordreport" package with cleaner interface boundaries.
    • Improved dependency injection patterns for runtime dispatcher configuration and status/artifact handling.

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 31, 2026

📝 Walkthrough

Walkthrough

This PR introduces an internal dispatch type abstraction layer to decouple internal task/operation handling from coordinator protobuf types. It defines DispatchTask, DispatchOperation enums, and a Dispatcher interface in the exec package, implements bidirectional proto converters, reorganizes worker reporting from remote to coordreport package with runtime interface implementations, and systematically updates services, executors, and persistence to use the new types.

Changes

Dispatch Type Abstraction & Conversion

Layer / File(s) Summary
Core dispatch types and enums
internal/core/exec/dispatch.go, internal/core/exec/dispatch_test.go
Introduce DispatchOperation enum with String() method, DispatchTask struct with metadata/workflow fields, DAGRunStatusResult wrapper, and Dispatcher interface for task dispatch/cleanup/status/cancellation operations.
Proto-domain conversion
internal/proto/convert/dispatch.go, internal/proto/convert/dispatch_test.go
Bidirectional converters (DispatchTaskToProto, ProtoToDispatchTask, WorkerStatsToProto, ProtoToWorkerStats) with field cloning, port validation, and error wrapping for type transitions between domain and protobuf representations.
Distributed execution types
internal/core/exec/distributed.go
Update ClaimedDispatchTask.Task and DispatchTaskStore.Enqueue to use internal DispatchTask; introduce WorkerStats and RunningTask domain types replacing protobuf equivalents.

Runtime Abstractions & Coordinator Integration

Layer / File(s) Summary
Runtime reporter interfaces
internal/runtime/reporter.go
Define abstract contracts for StatusPusher, AttemptRejected (with AttemptRejectedReason()), SchedulerLogStreamer, and ArtifactFinalizer to decouple runtime from implementations.
Coordinator client dispatch
internal/service/coordinator/client.go, internal/service/coordinator/client_test.go
Embed exec.Dispatcher in Client, convert DispatchTask to proto before dispatching, convert returned status to DAGRunStatusResult with proper error handling; update logging to use internal run ID.
Coordinator handler conversions
internal/service/coordinator/handler.go, internal/service/coordinator/handler_test.go
Convert between stored internal DispatchTask and proto payloads in Poll/Dispatch/AckTaskClaim; convert worker stats in Heartbeat/GetWorkers paths.
Runtime dispatcher factory
internal/service/coordinator/runtime_dispatcher.go
Introduce NewRuntimeDispatcher factory creating coordinator-backed dispatcher with TLS/peer configuration for runtime DAG execution.

Worker Reporting Reorganization

Layer / File(s) Summary
Coordreport package & interfaces
internal/service/worker/coordreport/{artifact_uploader,log_streamer,status_pusher}.go
Rename package from remote to coordreport, implement runtime.ArtifactFinalizer, runtime.SchedulerLogStreamer, runtime.StatusPusher with compile-time assertions; add AttemptRejectedReason() method.
Testing infrastructure
internal/service/worker/coordreport/export_test.go, internal/service/worker/coordreport/log_streamer_test.go
Expose internal log/status/artifact state via exported snapshots and helpers (SnapshotStatusPusher, SnapshotLogStreamer, FlushStepLogWriterWithBuffer) for black-box testing without direct field access.

Executor & Task Building

Layer / File(s) Summary
Task building with dispatch types
internal/runtime/executor/task.go, internal/runtime/executor/task_test.go
Rewrite CreateTask and TaskOption helpers to work with DispatchTask; remove proto conversion from WithPreviousStatus (direct assignment); update all test assertions for new field names.
Distributed task creation
internal/runtime/executor/dag_runner.go, internal/runtime/executor/dag_runner_test.go
Update BuildCoordinatorTask to return *DispatchTask using DispatchOperation enums; change coordinator status retrieval to direct internal type usage without proto conversion; update mock dispatcher to return internal types.
Command spec building
internal/launcher/launcher.go, internal/launcher/launcher_test.go
Update TaskStart, TaskRetry, QueueDispatchTaskRetry methods to accept DispatchTask using new field names (RootDAGRunID/RootDAGRunName, DAGRunID, AttemptID, WorkerID) for hierarchy/identity/routing.

Agent & Service Wiring

Layer / File(s) Summary
Agent dispatcher factory
internal/runtime/agent/agent.go
Add DispatcherFactory option to agent, convert StatusPusher/ArtifactFinalizer to type aliases of runtime interfaces, implement createDispatcher, wire dispatcher through runtime context and cleanup paths.
Scheduler dispatch operations
internal/service/scheduler/dag_executor.go, internal/service/scheduler/queue_dispatcher.go
Update ExecuteDAG/HandleJob to accept DispatchOperation, route based on internal enums, dispatch internal DispatchTask to coordinator using internal operation constants.
Frontend API operations
internal/service/frontend/api/v1/{dagruns,dags,dagruns_edit_retry}.go
Pass DispatchOperationStart/DispatchOperationRetry to CreateTask instead of coordinator protobuf enums.
Engine & command wiring
internal/engine/{engine,run}.go, internal/cmd/{context,dry,restart,retry,start}.go
Add runtimeDispatcherFactory methods, wire DispatcherFactory to agents in execution paths, update progress display to accept internal DAGRunStatus directly.

Worker Execution Integration

Layer / File(s) Summary
Worker task handling
internal/service/worker/handler.go, internal/service/worker/handler_test.go
Convert coordinator Task to DispatchTask via proto converter before routing to launcher methods; refactor tests with local workerHandlerTask helper for status conversion.
Remote execution setup
internal/service/worker/remote_handler.go, internal/service/worker/remote_handler_test.go
Use coordreport components for status/log/artifact reporting, accept runtime interfaces in executeDAGRun, set DispatcherFactory to NewRuntimeDispatcher, wire all runtime abstractions.

Persistence & Storage

Layer / File(s) Summary
Dispatch task store
internal/persis/store/distributed_dispatch.go, internal/persis/store/distributed_test.go
Persist/enqueue DispatchTask instead of proto; rewrite cloneDispatchTask with manual field copying (no proto.Clone); update claim/clear helpers for new task fields; update test store construction and assertions.

Test Infrastructure

Layer / File(s) Summary
Test mocks & helpers
internal/service/coordinator/client_test.go, internal/service/scheduler/dag_executor_test.go, internal/service/worker/{poller,handler}_test.go, internal/cmn/telemetry/collector_test.go, internal/test/helper.go
Update test doubles (mockCoordinatorClient, mockDispatcher, failingDispatchTaskStore) to use DispatchTask/DAGRunStatusResult; update coordinator operation constants to exec.DispatchOperation*; add DispatcherFactory default in test helper.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • dagucloud/dagu#2143: Updates to distributed stale run attempt matching in distributed_stale_run.go that work with the new internal dispatch task field names introduced in this PR.
✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch refactor/runtime-reporter-port

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/service/scheduler/dag_executor.go (1)

151-180: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject unsupported distributed operations before dispatch.

The distributed branch dispatches whatever operation it receives, so DispatchOperationUnspecified or any future unknown value gets sent to the coordinator instead of failing fast. The local branch already rejects those cases, so this creates inconsistent behavior and can persist invalid dispatch requests downstream.

Proposed fix
 func (e *DAGExecutor) ExecuteDAG(
 	ctx context.Context,
 	dag *core.DAG,
 	operation exec.DispatchOperation,
 	runID string,
 	previousStatus *exec.DAGRunStatus,
 	triggerType core.TriggerType,
 	scheduleTime string,
 ) error {
 	if e.shouldUseDistributedExecution(dag) {
+		switch operation {
+		case exec.DispatchOperationStart, exec.DispatchOperationRetry:
+		case exec.DispatchOperationUnspecified:
+			return fmt.Errorf("operation not specified")
+		default:
+			return fmt.Errorf("unsupported operation: %v", operation)
+		}
+
 		// Distributed execution: dispatch to coordinator
 		taskOpts := []executor.TaskOption{
 			executor.WithWorkerSelector(dag.WorkerSelector),
 			executor.WithPreviousStatus(previousStatus),
 			executor.WithBaseConfig(executor.ResolveBaseConfig(dag.BaseConfigData, e.baseConfigPath)),
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/scheduler/dag_executor.go` around lines 151 - 180, The
distributed-execution branch in shouldUseDistributedExecution(...) currently
forwards any operation to executor.CreateTask and dispatchToCoordinator(...)
including DispatchOperationUnspecified (and future unknown values); add the same
validation used by the local branch to reject DispatchOperationUnspecified (and
treat unknown/invalid enum values as errors) before building the task options,
returning an error if the operation is invalid so invalid operations are
rejected consistently and not dispatched to dispatchToCoordinator.
🧹 Nitpick comments (5)
internal/runtime/agent/agent.go (2)

969-973: ⚡ Quick win

Update log message to match new variable name.

The cleanup logic now operates on a dispatcher instead of a coordinator client, but the log message still refers to "coordinator client". Update the message for consistency:

 if dispatcher != nil {
 	if err := dispatcher.Cleanup(ctx); err != nil {
-		logger.Warn(ctx, "Failed to cleanup coordinator client", tag.Error(err))
+		logger.Warn(ctx, "Failed to cleanup dispatcher", tag.Error(err))
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/runtime/agent/agent.go` around lines 969 - 973, The log message
refers to "coordinator client" but the code now calls dispatcher.Cleanup; update
the logger.Warn invocation to reference the correct variable (dispatcher) so it
reads something like "Failed to cleanup dispatcher" (keep the same context and
include tag.Error(err)); modify the logger.Warn call that wraps
dispatcher.Cleanup to use the new message.

1561-1570: 💤 Low value

Consider clarifying debug log condition in createDispatcher.

The debug message at line 1564 is only logged when a.registry != nil, which appears intentional (log only when distributed execution might be expected). However, the message "Dispatcher factory is not configured" could be clearer that this is expected behavior in local-only execution scenarios.

Consider either:

  1. Adjusting the log message: "Dispatcher factory not configured; running in local-only mode"
  2. Adding a comment explaining why logging is conditional on registry presence

This would help developers understand when a nil dispatcher is expected vs. a configuration issue.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/runtime/agent/agent.go` around lines 1561 - 1570, In
Agent.createDispatcher, clarify that a nil dispatcher can be expected in
local-only execution by either updating the debug log in the conditional that
currently checks a.registry to read something like "Dispatcher factory not
configured; running in local-only mode" or by adding a one-line comment above
the logger.Debug call explaining that logging is conditional on a.registry to
avoid noisy logs in non-distributed setups; reference the function name
Agent.createDispatcher and the logger.Debug call so you update the message or
add the explanatory comment in that exact location.
internal/cmd/context.go (1)

589-593: 💤 Low value

Unused context parameter in factory closure.

The context.Context parameter at line 590 is declared but never used when creating the dispatcher. If this parameter is required for interface compatibility with DispatcherFactory, consider adding a comment explaining why it's unused. Otherwise, you can use _ to make the intent explicit:

 func (c *Context) RuntimeDispatcherFactory() func(context.Context) runtime.Dispatcher {
-	return func(context.Context) runtime.Dispatcher {
+	return func(_ context.Context) runtime.Dispatcher {
 		return coordinator.NewRuntimeDispatcher(c.ServiceRegistry, c.Config.Core.Peer)
 	}
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/cmd/context.go` around lines 589 - 593, The closure returned by
Context.RuntimeDispatcherFactory declares a context.Context parameter that is
unused; update the closure to either use the blank identifier (func(_
context.Context) runtime.Dispatcher) or add a brief comment above or inside the
closure explaining the parameter is intentionally unused for interface
compatibility with DispatcherFactory, keeping the body that returns
coordinator.NewRuntimeDispatcher(c.ServiceRegistry, c.Config.Core.Peer)
unchanged and referencing the RuntimeDispatcherFactory method and
coordinator.NewRuntimeDispatcher to locate the code.
internal/engine/engine.go (1)

205-209: ⚡ Quick win

Document that the factory may return nil.

The factory function calls coordinator.NewRuntimeDispatcher, which returns nil when serviceRegistry is nil. Callers of this factory should be aware that they may receive a nil dispatcher. Consider adding documentation or an explicit nil check with an error return if a nil dispatcher is not acceptable in this context.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/engine/engine.go` around lines 205 - 209, The
runtimeDispatcherFactory currently returns a func(context.Context)
runtime.Dispatcher that can yield nil because coordinator.NewRuntimeDispatcher
may return nil when e.serviceRegistry is nil; update the factory to perform an
explicit nil check and surface that case instead of silently returning nil:
change runtimeDispatcherFactory to return func(context.Context)
(runtime.Dispatcher, error) (or otherwise return an error/panic) and inside the
returned closure call coordinator.NewRuntimeDispatcher(e.serviceRegistry,
e.cfg.Core.Peer), check the result for nil and return a clear error if nil (and
update all callers of runtimeDispatcherFactory to handle the error), or
alternatively add a short doc comment on runtimeDispatcherFactory that it may
return nil if e.serviceRegistry is nil if you prefer to keep the current
signature.
internal/service/coordinator/client.go (1)

960-1000: 💤 Low value

Clean conversion at adapter boundary.

The implementation correctly converts coordinator proto responses to internal types. The nil check at line 987 is defensive—if attemptCall succeeds, resp should be set. Returning nil, nil when resp == nil despite err == nil handles a protocol error gracefully, though an explicit error might be clearer.

Optional: Return explicit error for nil response
 	if err != nil || resp == nil {
-		return nil, err
+		if err != nil {
+			return nil, err
+		}
+		return nil, fmt.Errorf("coordinator returned nil response")
 	}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/client.go` around lines 960 - 1000,
GetDAGRunStatus currently returns (nil, nil) if attemptCall returns no error but
resp is still nil; update the function (GetDAGRunStatus) to treat a nil resp as
an explicit protocol/error case by returning a descriptive error instead of
(nil, nil). Modify the post-attemptCall check that currently does "if err != nil
|| resp == nil { return nil, err }" to return a clear fmt.Errorf (e.g., "nil
response from coordinator" or similar) when resp == nil while preserving
existing error propagation from attemptCall.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@internal/proto/convert/dispatch.go`:
- Around line 20-22: The owner coordinator port validation is incomplete: ensure
you reject impossible port values (e.g. -1, >65535) in both conversion
directions by adding explicit range checks around task.Owner.Port and any
reciprocal conversion helpers in internal/proto/convert/dispatch.go; update the
outbound check (currently guarding int32 cast) to validate 0 <= port <= 65535
and add the same validation on the inbound conversion path (the function that
accepts proto Owner/Port and produces the internal struct) so malformed owner
metadata is rejected early (reference task.Owner.Port and the conversion helper
functions in dispatch.go).

In `@internal/service/coordinator/handler.go`:
- Around line 310-315: When DispatchTaskToProto(claimed.Task) fails inside the
ClaimNext handling branch, ensure you release the claim before returning so the
task becomes available to other pollers; call the existing claim-release path
(the function that releases/unclaims claims in this component—e.g.,
ReleaseClaim/Unclaim/ReturnClaim on the coordinator or handler) with the
claimed.Task (or claimed) and handle any errors, then return the status.Error as
before; locate the ClaimNext handling code surrounding claimed.Task,
req.WorkerId and convert.DispatchTaskToProto to add this release call just prior
to returning on error.

In `@internal/service/coordinator/runtime_dispatcher.go`:
- Around line 17-24: Before returning the dispatcher, validate the TLS/config by
calling cfg.Validate() and handle any validation error the same way
coordinatorClient in engine.go does: call cfg.Validate() after setting fields
(before calling New(registry, cfg)) and propagate or return the error instead of
constructing a dispatcher with an invalid config; reference cfg.Validate() and
the New(registry, cfg) call when making this change.

---

Outside diff comments:
In `@internal/service/scheduler/dag_executor.go`:
- Around line 151-180: The distributed-execution branch in
shouldUseDistributedExecution(...) currently forwards any operation to
executor.CreateTask and dispatchToCoordinator(...) including
DispatchOperationUnspecified (and future unknown values); add the same
validation used by the local branch to reject DispatchOperationUnspecified (and
treat unknown/invalid enum values as errors) before building the task options,
returning an error if the operation is invalid so invalid operations are
rejected consistently and not dispatched to dispatchToCoordinator.

---

Nitpick comments:
In `@internal/cmd/context.go`:
- Around line 589-593: The closure returned by Context.RuntimeDispatcherFactory
declares a context.Context parameter that is unused; update the closure to
either use the blank identifier (func(_ context.Context) runtime.Dispatcher) or
add a brief comment above or inside the closure explaining the parameter is
intentionally unused for interface compatibility with DispatcherFactory, keeping
the body that returns coordinator.NewRuntimeDispatcher(c.ServiceRegistry,
c.Config.Core.Peer) unchanged and referencing the RuntimeDispatcherFactory
method and coordinator.NewRuntimeDispatcher to locate the code.

In `@internal/engine/engine.go`:
- Around line 205-209: The runtimeDispatcherFactory currently returns a
func(context.Context) runtime.Dispatcher that can yield nil because
coordinator.NewRuntimeDispatcher may return nil when e.serviceRegistry is nil;
update the factory to perform an explicit nil check and surface that case
instead of silently returning nil: change runtimeDispatcherFactory to return
func(context.Context) (runtime.Dispatcher, error) (or otherwise return an
error/panic) and inside the returned closure call
coordinator.NewRuntimeDispatcher(e.serviceRegistry, e.cfg.Core.Peer), check the
result for nil and return a clear error if nil (and update all callers of
runtimeDispatcherFactory to handle the error), or alternatively add a short doc
comment on runtimeDispatcherFactory that it may return nil if e.serviceRegistry
is nil if you prefer to keep the current signature.

In `@internal/runtime/agent/agent.go`:
- Around line 969-973: The log message refers to "coordinator client" but the
code now calls dispatcher.Cleanup; update the logger.Warn invocation to
reference the correct variable (dispatcher) so it reads something like "Failed
to cleanup dispatcher" (keep the same context and include tag.Error(err));
modify the logger.Warn call that wraps dispatcher.Cleanup to use the new
message.
- Around line 1561-1570: In Agent.createDispatcher, clarify that a nil
dispatcher can be expected in local-only execution by either updating the debug
log in the conditional that currently checks a.registry to read something like
"Dispatcher factory not configured; running in local-only mode" or by adding a
one-line comment above the logger.Debug call explaining that logging is
conditional on a.registry to avoid noisy logs in non-distributed setups;
reference the function name Agent.createDispatcher and the logger.Debug call so
you update the message or add the explanatory comment in that exact location.

In `@internal/service/coordinator/client.go`:
- Around line 960-1000: GetDAGRunStatus currently returns (nil, nil) if
attemptCall returns no error but resp is still nil; update the function
(GetDAGRunStatus) to treat a nil resp as an explicit protocol/error case by
returning a descriptive error instead of (nil, nil). Modify the post-attemptCall
check that currently does "if err != nil || resp == nil { return nil, err }" to
return a clear fmt.Errorf (e.g., "nil response from coordinator" or similar)
when resp == nil while preserving existing error propagation from attemptCall.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: e4a1ed45-49db-41ce-bf6c-3683f1e7d4f5

📥 Commits

Reviewing files that changed from the base of the PR and between 683ee5b and b36ee6a.

📒 Files selected for processing (59)
  • internal/cmd/context.go
  • internal/cmd/dry.go
  • internal/cmd/progress_remote.go
  • internal/cmd/restart.go
  • internal/cmd/retry.go
  • internal/cmd/start.go
  • internal/cmn/telemetry/collector_test.go
  • internal/core/exec/context.go
  • internal/core/exec/dispatch.go
  • internal/core/exec/dispatch_test.go
  • internal/core/exec/distributed.go
  • internal/engine/engine.go
  • internal/engine/run.go
  • internal/launcher/launcher.go
  • internal/launcher/launcher_test.go
  • internal/persis/store/distributed_dispatch.go
  • internal/persis/store/distributed_test.go
  • internal/proto/convert/dispatch.go
  • internal/proto/convert/dispatch_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/distributed_stale_run.go
  • internal/runtime/executor/dag_runner.go
  • internal/runtime/executor/dag_runner_test.go
  • internal/runtime/executor/task.go
  • internal/runtime/executor/task_test.go
  • internal/runtime/reporter.go
  • internal/service/coordinator/client.go
  • internal/service/coordinator/client_test.go
  • internal/service/coordinator/handler.go
  • internal/service/coordinator/handler_test.go
  • internal/service/coordinator/runtime_dispatcher.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_edit_retry.go
  • internal/service/frontend/api/v1/dagruns_edit_retry_internal_test.go
  • internal/service/frontend/api/v1/dagruns_retry_internal_test.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/metrics_test.go
  • internal/service/frontend/api/v1/proc_liveness_test.go
  • internal/service/frontend/api/v1/workers_internal_test.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/dag_executor_test.go
  • internal/service/scheduler/queue_dispatcher.go
  • internal/service/scheduler/queue_dispatcher_test.go
  • internal/service/scheduler/queue_processor_startup_test.go
  • internal/service/scheduler/queue_processor_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/worker/coordreport/artifact_uploader.go
  • internal/service/worker/coordreport/artifact_uploader_test.go
  • internal/service/worker/coordreport/export_test.go
  • internal/service/worker/coordreport/log_streamer.go
  • internal/service/worker/coordreport/log_streamer_test.go
  • internal/service/worker/coordreport/status_pusher.go
  • internal/service/worker/coordreport/status_pusher_test.go
  • internal/service/worker/handler.go
  • internal/service/worker/handler_test.go
  • internal/service/worker/poller_test.go
  • internal/service/worker/remote_handler.go
  • internal/service/worker/remote_handler_test.go
  • internal/test/helper.go
💤 Files with no reviewable changes (1)
  • internal/core/exec/context.go

Comment thread internal/proto/convert/dispatch.go Outdated
Comment on lines +20 to +22
if task.Owner.Port < math.MinInt32 || task.Owner.Port > math.MaxInt32 {
return nil, fmt.Errorf("owner coordinator port out of range: %d", task.Owner.Port)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Reject impossible coordinator ports in both directions.

These helpers currently accept values like -1 or 70000: the outbound path only guards the int32 cast, and the inbound path does no validation at all. That lets malformed owner metadata survive conversion and fail later when the worker tries to ack/report back to the owner coordinator.

Proposed fix
+const maxCoordinatorPort = 65535
+
 func DispatchTaskToProto(task *exec.DispatchTask) (*coordinatorv1.Task, error) {
 	if task == nil {
 		return nil, nil
 	}
-	if task.Owner.Port < math.MinInt32 || task.Owner.Port > math.MaxInt32 {
+	if task.Owner.Port <= 0 || task.Owner.Port > maxCoordinatorPort {
 		return nil, fmt.Errorf("owner coordinator port out of range: %d", task.Owner.Port)
 	}
@@
 func ProtoToDispatchTask(task *coordinatorv1.Task) (*exec.DispatchTask, error) {
 	if task == nil {
 		return nil, nil
 	}
+	if task.OwnerCoordinatorPort <= 0 || task.OwnerCoordinatorPort > maxCoordinatorPort {
+		return nil, fmt.Errorf("owner coordinator port out of range: %d", task.OwnerCoordinatorPort)
+	}
 
 	dispatchTask := &exec.DispatchTask{

Also applies to: 99-103

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/proto/convert/dispatch.go` around lines 20 - 22, The owner
coordinator port validation is incomplete: ensure you reject impossible port
values (e.g. -1, >65535) in both conversion directions by adding explicit range
checks around task.Owner.Port and any reciprocal conversion helpers in
internal/proto/convert/dispatch.go; update the outbound check (currently
guarding int32 cast) to validate 0 <= port <= 65535 and add the same validation
on the inbound conversion path (the function that accepts proto Owner/Port and
produces the internal struct) so malformed owner metadata is rejected early
(reference task.Owner.Port and the conversion helper functions in dispatch.go).

Comment on lines 310 to +315
if claimed != nil && claimed.Task != nil {
claimed.Task.WorkerId = req.WorkerId
return &coordinatorv1.PollResponse{Task: claimed.Task}, nil
claimed.Task.WorkerID = req.WorkerId
task, err := convert.DispatchTaskToProto(claimed.Task)
if err != nil {
return nil, status.Error(codes.Internal, "failed to encode claimed task: "+err.Error())
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Release the claim if encoding the claimed task fails.

ClaimNext has already made this task unavailable to other pollers. Returning here on DispatchTaskToProto error leaves the claim stranded until its TTL expires, so one malformed record can keep surfacing as an internal error and stall queue progress.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/handler.go` around lines 310 - 315, When
DispatchTaskToProto(claimed.Task) fails inside the ClaimNext handling branch,
ensure you release the claim before returning so the task becomes available to
other pollers; call the existing claim-release path (the function that
releases/unclaims claims in this component—e.g.,
ReleaseClaim/Unclaim/ReturnClaim on the coordinator or handler) with the
claimed.Task (or claimed) and handle any errors, then return the status.Error as
before; locate the ClaimNext handling code surrounding claimed.Task,
req.WorkerId and convert.DispatchTaskToProto to add this release call just prior
to returning on error.

Comment on lines +17 to +24
cfg := DefaultConfig()
cfg.MaxRetries = 50
cfg.CAFile = peerConfig.ClientCaFile
cfg.CertFile = peerConfig.CertFile
cfg.KeyFile = peerConfig.KeyFile
cfg.SkipTLSVerify = peerConfig.SkipTLSVerify
cfg.Insecure = peerConfig.Insecure
return New(registry, cfg)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Add config validation before returning the dispatcher.

The coordinatorClient method in engine.go (line 195) validates the coordinator config by calling cfg.Validate(). This NewRuntimeDispatcher function should do the same to catch invalid TLS configurations early.

🔒 Proposed fix to add validation
 	cfg.CertFile = peerConfig.CertFile
 	cfg.KeyFile = peerConfig.KeyFile
 	cfg.SkipTLSVerify = peerConfig.SkipTLSVerify
 	cfg.Insecure = peerConfig.Insecure
+	if err := cfg.Validate(); err != nil {
+		return nil
+	}
 	return New(registry, cfg)
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
cfg := DefaultConfig()
cfg.MaxRetries = 50
cfg.CAFile = peerConfig.ClientCaFile
cfg.CertFile = peerConfig.CertFile
cfg.KeyFile = peerConfig.KeyFile
cfg.SkipTLSVerify = peerConfig.SkipTLSVerify
cfg.Insecure = peerConfig.Insecure
return New(registry, cfg)
cfg.CertFile = peerConfig.CertFile
cfg.KeyFile = peerConfig.KeyFile
cfg.SkipTLSVerify = peerConfig.SkipTLSVerify
cfg.Insecure = peerConfig.Insecure
if err := cfg.Validate(); err != nil {
return nil, err
}
return New(registry, cfg)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/coordinator/runtime_dispatcher.go` around lines 17 - 24,
Before returning the dispatcher, validate the TLS/config by calling
cfg.Validate() and handle any validation error the same way coordinatorClient in
engine.go does: call cfg.Validate() after setting fields (before calling
New(registry, cfg)) and propagate or return the error instead of constructing a
dispatcher with an invalid config; reference cfg.Validate() and the
New(registry, cfg) call when making this change.

Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 59 files

Partial review: This PR has more than 50 files, so cubic reviewed the highest-priority files first. During the trial, paid plans get a higher file limit.
You can try an ultrareview to bypass the file limit, comment @cubic-dev-ai ultrareview. Learn more.

Fix all with cubic | Re-trigger cubic

Comment thread internal/service/coordinator/runtime_dispatcher.go
@yohamta0 yohamta0 force-pushed the refactor/runtime-reporter-port branch from b36ee6a to f64be5c Compare May 31, 2026 14:06
@yohamta0 yohamta0 force-pushed the refactor/runtime-reporter-port branch from f64be5c to b7308bd Compare May 31, 2026 14:09
@yohamta0 yohamta0 force-pushed the refactor/runtime-reporter-port branch from 88fdb07 to 9001832 Compare June 1, 2026 04:58
@yohamta0 yohamta0 merged commit 90e344e into main Jun 1, 2026
11 checks passed
@yohamta0 yohamta0 deleted the refactor/runtime-reporter-port branch June 1, 2026 05:35
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant