refactor: separate dispatch and launcher from engine core #2233

Merged · May 31, 2026
yohamta0 merged 7 commits into main from refactor/dispatch-out-of-core
yohamta0 (Collaborator) commented May 31, 2026

Summary

  • move coordinator dispatch policy out of core into internal/dispatch
  • move subprocess launcher construction/execution out of runtime into internal/launcher and repoint callers
  • keep dotenv load warnings in core as BuildWarnings while logging them from Dagu-side boundaries

Testing

  • go test -count=1 ./internal/dagwarning ./internal/core/... ./internal/cmd ./internal/runtime/agent ./internal/service/scheduler ./internal/service/worker ./internal/service/frontend/api/v1

Summary by cubic

Separated control-plane policy and subprocess launching from the engine core to reduce coupling, and tightened dotenv warning handling so core records warnings while callers log them. Env rebuilds now surface dotenv warnings via ResolveEnvWithWarnings without mutating DAG state.

  • Refactors

    • Moved dispatch policy to internal/dispatch and repointed all call sites; converted test to a black-box test.
    • Extracted the subprocess launcher to internal/launcher; repointed API, scheduler, worker, manager, and tests. Moved panic-recovery helper into internal/launcher and removed the duplicate in manager.
    • Routed dotenv loader errors to DAG.BuildWarnings in core; added internal/dagwarning to log warnings outside core. Introduced ResolveEnvWithWarnings and made ResolveEnv a thin wrapper. Updated API/scheduler/worker/agent/CLI restore paths to log warnings at boundaries. No migration.
  • Bug Fixes

    • Malformed .env files now produce build warnings that are captured and logged by callers.
    • Ensured env rebuilds do not mutate DAG.Env or DAG.BuildWarnings backing slices; added tests for slice isolation and warning propagation.

Written for commit feddad4. Summary will update on new commits.


Summary by CodeRabbit

Release Notes

  • Bug Fixes

    • Improved error handling for malformed .env files with warning messages during DAG builds.
    • Enhanced panic recovery in subprocess execution with better error reporting.
  • Refactor

    • Reorganized internal execution and dispatch logic for improved maintainability.

yohamta0 and others added 6 commits May 30, 2026 17:17
ShouldDispatchToCoordinator is control-plane policy: it combines DAG
fields with runtime configuration to choose between local and
coordinator-dispatched execution. Living in core (the workflow language
types), it forced core to import cmn/config, the only such dependency in
the language layer.

Relocate it to a dedicated leaf package, internal/dispatch, importing
only core and cmn/config. Repoint the six call sites across cmd, the
frontend API, and the scheduler, and move its test to a black-box
package. The core package no longer depends on cmn/config.

Co-Authored-By: Claude <noreply@anthropic.com>
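
As an illustration of the kind of policy described in this commit, here is a minimal sketch of a leaf-package function that combines DAG fields with runtime configuration. The `DAG` and `Config` types, their fields, and the `ShouldDispatch` rules are all hypothetical stand-ins; the actual `ShouldDispatchToCoordinator` signature and decision rules are not shown in this PR page.

```go
package main

import "fmt"

// DAG is a hypothetical stand-in for the workflow-language type in core.
type DAG struct {
	Name           string
	WorkerSelector map[string]string // assumed: non-empty means "run on a matching worker"
	ForceLocal     bool              // assumed: per-DAG opt-out of distributed execution
}

// Config is a hypothetical stand-in for runtime configuration (cmn/config in the PR).
type Config struct {
	CoordinatorEnabled bool
	DefaultDistributed bool // assumed: dispatch every DAG unless it opts out
}

// ShouldDispatch sketches control-plane policy: it reads language-level fields
// from the DAG and deployment-level settings from Config, so it belongs in a
// package that may import both, not in the language layer itself.
func ShouldDispatch(dag *DAG, cfg Config) bool {
	if dag == nil || !cfg.CoordinatorEnabled || dag.ForceLocal {
		return false
	}
	return cfg.DefaultDistributed || len(dag.WorkerSelector) > 0
}

func main() {
	cfg := Config{CoordinatorEnabled: true}
	local := &DAG{Name: "local"}
	remote := &DAG{Name: "remote", WorkerSelector: map[string]string{"gpu": "true"}}
	fmt.Println(ShouldDispatch(local, cfg), ShouldDispatch(remote, cfg)) // false true
}
```

Because the function only reads its inputs, a black-box test in a separate `_test` package can exercise it through the exported API alone, which matches the test move the commit describes.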
The dotenv loader logged eval/read failures through the platform logger
and emitted a success Info line. Append failures to the existing
DAG.BuildWarnings field instead and drop the success log, removing core's
dependency on cmn/logger.

Behavior preserved: load failures still surface to callers through the
BuildWarnings field the build result already carries. No change to
on-disk formats, CLI, API, or configuration; no migration required.

Co-Authored-By: Claude <noreply@anthropic.com>
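
The pattern in this commit (record failures on the DAG, let callers decide how to log them) can be sketched as follows. This is a simplified illustration under stated assumptions: the `DAG` struct, the line parser, and `logWarnings` are hypothetical, and the real loader in internal/core/dag.go is not reproduced here.

```go
package main

import (
	"fmt"
	"os"
	"strings"
)

// DAG is a hypothetical, trimmed-down stand-in for core.DAG.
type DAG struct {
	Dotenv        []string // dotenv file paths to load
	Env           []string // resolved KEY=VALUE entries
	BuildWarnings []string // non-fatal problems recorded during build
}

// LoadDotEnv reads each dotenv file. Failures are appended to BuildWarnings
// instead of being logged, so this code needs no logger dependency.
func (d *DAG) LoadDotEnv() {
	for _, path := range d.Dotenv {
		data, err := os.ReadFile(path)
		if err != nil {
			d.BuildWarnings = append(d.BuildWarnings,
				fmt.Sprintf("failed to read dotenv file %s: %v", path, err))
			continue
		}
		for i, line := range strings.Split(string(data), "\n") {
			line = strings.TrimSpace(line)
			if line == "" || strings.HasPrefix(line, "#") {
				continue
			}
			key, _, ok := strings.Cut(line, "=")
			if !ok || strings.TrimSpace(key) == "" {
				d.BuildWarnings = append(d.BuildWarnings,
					fmt.Sprintf("%s:%d: malformed dotenv line", path, i+1))
				continue
			}
			d.Env = append(d.Env, line)
		}
	}
}

// logWarnings plays the role of a boundary helper: the caller, not core,
// chooses where warnings go. Here they are written to stderr.
func logWarnings(warnings []string) {
	for _, w := range warnings {
		fmt.Fprintln(os.Stderr, "WARN:", w)
	}
}

func main() {
	f, err := os.CreateTemp("", "demo-*.env")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	f.WriteString("A=1\nnot a pair\n")
	f.Close()

	dag := &DAG{Dotenv: []string{f.Name(), f.Name() + ".missing"}}
	dag.LoadDotEnv()
	logWarnings(dag.BuildWarnings) // one malformed line, one unreadable file
	fmt.Println(dag.Env)
}
```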
Move the dagu-subprocess launcher (SubCmdBuilder, CmdSpec,
Run/Start/StartProcess and helpers) out of package runtime into a
dedicated internal/launcher package, so callers can depend on the
launcher without importing the execution engine. internal/launcher
carries its own panic-recovery helper and imports nothing from
internal/runtime, so there is no import cycle.

runtime/subcmd.go now re-exports the launcher symbols as type aliases
and variables, so every existing caller compiles unchanged; call sites
migrate to internal/launcher in follow-up commits before the shim is
removed. The now-unused execWithRecovery copy and its runtime/debug
import are dropped from manager.go.

Behavior preserved: the launcher code moved verbatim, dagu subprocess
argument construction is unchanged, and there is no change to on-disk
formats, CLI, API, or configuration. No migration required.

Co-Authored-By: Claude <noreply@anthropic.com>
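
The re-export shim described above relies on Go type aliases and package-level function variables. The single-file sketch below imitates that arrangement; the names `launcherCmdSpec` and `launcherRun` are hypothetical placeholders for symbols that would live in the new package, and the real shim in runtime/subcmd.go is not shown on this page.

```go
package main

import "fmt"

// Pretend this part lives in the new package (internal/launcher).

type launcherCmdSpec struct {
	Executable string
	Args       []string
}

func launcherRun(spec launcherCmdSpec) string {
	return fmt.Sprintf("%s %v", spec.Executable, spec.Args)
}

// Pretend this part is the temporary shim left behind in the old package.

// CmdSpec is a type alias (note the "="), not a new defined type, so values
// of CmdSpec and launcherCmdSpec are interchangeable without conversion.
type CmdSpec = launcherCmdSpec

// Run re-exports the moved function under its old name.
var Run = launcherRun

func main() {
	spec := CmdSpec{Executable: "dagu", Args: []string{"start", "example"}}
	var moved launcherCmdSpec = spec // compiles because both names denote one type
	fmt.Println(Run(moved))          // dagu [start example]
}
```

With a defined type (`type CmdSpec launcherCmdSpec`) the assignment in `main` would need an explicit conversion, which is why an alias is the form that lets existing callers compile unchanged during a staged migration.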
…shim

Migrate every caller off the runtime re-export shim to internal/launcher
directly (frontend, scheduler, worker, the runtime Manager, and test
helpers) and delete runtime/subcmd.go. The launcher's own tests move to
internal/launcher as launcher_test.go.

internal/runtime no longer exposes the subprocess launcher, so callers
depend on internal/launcher without pulling the execution engine. The
worker, scheduler, and frontend dags paths that used only the launcher
drop their internal/runtime import entirely.

Behavior preserved: pure import repointing with no logic or argument
changes; no on-disk, CLI, API, or configuration changes; no migration.
Build, full test compilation, and all touched packages' tests pass.

Co-Authored-By: Claude <noreply@anthropic.com>
coderabbitai Bot commented May 31, 2026

Walkthrough

This PR refactors subprocess execution and distributed-coordination infrastructure. The changes extract dispatch logic into a new package, introduce DAG build warning collection, support environment resolution with warnings, and migrate the execution layer from a runtime package to a new launcher package with panic-recovery support.

Changes

Refactor Dispatch and Launcher Infrastructure

| Layer / File(s) | Summary |
|---|---|
| Dispatch Decision Extraction: internal/dispatch/dispatch.go, internal/dispatch/dispatch_test.go, internal/cmd/start.go | ShouldDispatchToCoordinator logic is extracted from internal/core/dispatch.go into a new internal/dispatch package. The test is moved to dispatch_test and all call sites are updated to use the new dispatch package. |
| DAG Build Warning Collection: internal/core/dag.go, internal/dagwarning/dagwarning.go, internal/cmd/helper.go, internal/runtime/agent/agent.go, internal/core/spec/builder_test.go | Dotenv loading errors in dag.go now append to DAG.BuildWarnings instead of logging directly. A new dagwarning package provides LoadDotEnv (which wraps DAG dotenv loading and logs captured warnings) and Log helpers. All dotenv-loading call sites are updated to use dagwarning.LoadDotEnv. |
| Environment Resolution with Warnings: internal/core/spec/runtime_env.go, internal/core/spec/runtime_env_external_test.go | ResolveEnvResult struct carries both resolved environment and build warnings. New ResolveEnvWithWarnings helper captures warnings during environment rebuild via cloned.LoadDotEnv. ResolveEnv delegates to the new helper while preserving external behavior. Tests verify warning capture from malformed dotenv files. |
| Launcher Package Foundation with Panic Recovery: internal/launcher/launcher.go, internal/runtime/manager.go | internal/launcher/launcher.go package declaration is formalized; execWithRecovery helper is added for panic-safe execution with structured error logging and stack-trace capture. The panic-recovery logic is moved from internal/runtime/manager.go to launcher. |
| Launcher Test Suite Migration: internal/launcher/launcher_test.go | Test file migrates from runtime_test to launcher_test package. All command-spec builders and execution calls (NewSubCmdBuilder, Start, Enqueue, Restart, Retry, task operations) are updated from runtime.* to launcher.* equivalents across all test sections. |
| Frontend API Launcher Integration: internal/service/frontend/api/v1/api.go, internal/service/frontend/api/v1/dagruns.go, internal/service/frontend/api/v1/dagruns_edit_retry.go, internal/service/frontend/api/v1/dags.go, internal/service/frontend/api/v1/dags_start_internal_test.go | API handlers switch from runtime to launcher: subCmdBuilder field type and initialization use launcher.NewSubCmdBuilder. Dispatch decisions use dispatch.ShouldDispatchToCoordinator. DAG run startup and enqueue operations use launcher.Start, launcher.StartProcess, and launcher.Run. Environment resolution uses ResolveEnvWithWarnings with warning logging via dagwarning.Log. StartResult types are updated from runtime.StartResult to launcher.StartResult. |
| Scheduler and Executor Launcher Integration: internal/service/scheduler/dag_executor.go, internal/service/scheduler/queue_processor_test.go, internal/service/scheduler/scheduler.go | DAGExecutor switches subCmdBuilder to launcher.SubCmdBuilder. Distributed operation handling (enqueue, start, retry, restart) uses launcher builders and executors. shouldUseDistributedExecution delegates to dispatch.ShouldDispatchToCoordinator. Environment preparation uses ResolveEnvWithWarnings with dagwarning warning logging. |
| Worker Handler Launcher Integration: internal/service/worker/handler.go, internal/service/worker/handler_test.go | Task handler builds launcher.CmdSpec and executes via launcher.Run. Environment resolution uses ResolveEnvWithWarnings with dagwarning.Log for warning output. taskHandler.subCmdBuilder is created via launcher.NewSubCmdBuilder. Tests update DAG run startup to use launcher.Start. |
| Test Support and Integration Updates: internal/test/helper.go, internal/runtime/agent/agent_test.go, internal/runtime/manager_test.go, internal/intg/distr/fixtures_test.go | Test helper updates to expose launcher.SubCmdBuilder and initialize via launcher.NewSubCmdBuilder. Integration tests, agent tests, manager tests, and distribution fixtures all switch builder and executor calls from runtime to launcher equivalents. |

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 3 | ❌ 2

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 33.33% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |
| Description check | ❓ Inconclusive | The PR description provides a clear summary of changes and testing approach, but lacks structured sections matching the template. | Reorganize the description to match the template with explicit sections: Summary, Changes (bulleted list), Related Issues, and Checklist. The auto-generated content can remain but should be formatted per template. |

✅ Passed checks (3 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title accurately summarizes the main refactoring work: separating dispatch and launcher components from the core engine. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@cubic-dev-ai cubic-dev-ai Bot left a comment

No issues found across 28 files

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
internal/core/spec/runtime_env.go (1)

72-80: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Deep-copy mutable slices before loading dotenv.

dag.Clone() does not clone Env or BuildWarnings, so cloned.LoadDotEnv(ctx) can append into the original DAG's backing arrays when they still have spare capacity. That makes ResolveEnvWithWarnings unexpectedly mutate the caller's snapshot and leak warnings/env between retries.

Suggested fix
 	cloned := dag.Clone()
 	if hasRuntimeParams(params) {
 		// Recompute DAG/base-config env entries for the new runtime params instead
 		// of short-circuiting to whatever happened to be on the current snapshot.
 		cloned.Env = nil
+	} else {
+		cloned.Env = append([]string(nil), cloned.Env...)
 	}
+	cloned.BuildWarnings = append([]string(nil), cloned.BuildWarnings...)
 	warningStart := len(cloned.BuildWarnings)
 	cloned.LoadDotEnv(ctx)
 	buildWarnings := append([]string{}, cloned.BuildWarnings[warningStart:]...)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/core/spec/runtime_env.go` around lines 72 - 80, The cloned DAG
returned by dag.Clone() shares backing arrays for Env and BuildWarnings, so
before calling cloned.LoadDotEnv(ctx) (e.g., inside ResolveEnvWithWarnings where
hasRuntimeParams(params) is checked) deep-copy cloned.Env and
cloned.BuildWarnings into new slices (use make + copy) when they are non-nil so
LoadDotEnv cannot append into the original DAG's backing arrays; ensure you
replace cloned.Env and cloned.BuildWarnings with these newly allocated copies
prior to calling cloned.LoadDotEnv(ctx).
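
The aliasing hazard described in this comment can be reproduced in isolation. The sketch below uses a hypothetical two-field `DAG` and two hypothetical clone helpers; it is not the project's `Clone` implementation, only a demonstration of why a struct copy that shares slice backing arrays lets `append` on the copy write into memory the original can still reach.

```go
package main

import "fmt"

// DAG is a hypothetical stand-in with only the two slice fields at issue.
type DAG struct {
	Env           []string
	BuildWarnings []string
}

// shallowClone copies the struct but not the slices, so both DAGs share
// the same backing arrays.
func shallowClone(d *DAG) *DAG {
	c := *d
	return &c
}

// isolatedClone gives the copy freshly allocated slices, so appends on the
// copy can never touch the original's backing arrays.
func isolatedClone(d *DAG) *DAG {
	c := *d
	c.Env = append([]string(nil), d.Env...)
	c.BuildWarnings = append([]string(nil), d.BuildWarnings...)
	return &c
}

// newDAG returns a DAG whose Env has length 1 and spare capacity, which is
// the condition under which append reuses the existing backing array.
func newDAG() *DAG {
	d := &DAG{Env: make([]string, 1, 4)}
	d.Env[0] = "A=1"
	return d
}

func main() {
	orig := newDAG()
	shared := shallowClone(orig)
	shared.Env = append(shared.Env, "LEAK=1")
	// orig.Env still has length 1, but reslicing exposes the leaked element.
	fmt.Printf("%q\n", orig.Env[:2]) // ["A=1" "LEAK=1"]

	orig = newDAG()
	safe := isolatedClone(orig)
	safe.Env = append(safe.Env, "SAFE=1")
	fmt.Printf("%q\n", orig.Env[:2]) // ["A=1" ""]
}
```

The leak stays invisible through `len(orig.Env)`, which is why it tends to show up only later, for example when a second append on the original overwrites or exposes the shared slot.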
🧹 Nitpick comments (2)
internal/launcher/launcher.go (1)

602-606: ⚡ Quick win

Align panic log field keys with the existing recovery helper.

The equivalent helper in internal/runtime/agent/agent.go (logRecoveredPanic) emits this same "Recovered from panic" event with keys errType and stackTrace, whereas this one uses err-type and stack-trace. Logging the same event under different field names across packages fragments log queries/dashboards. Consider standardizing.

♻️ Suggested alignment
 			logger.Error(ctx, "Recovered from panic",
 				slog.String("err", err.Error()),
-				slog.String("err-type", fmt.Sprintf("%T", panicObj)),
-				slog.String("stack-trace", string(stack)),
+				slog.String("errType", fmt.Sprintf("%T", panicObj)),
+				slog.String("stackTrace", string(stack)),
 			)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/launcher/launcher.go` around lines 602 - 606, The panic recovery log
in the logger.Error call inside launcher.go uses kebab-case keys ("err-type",
"stack-trace") which diverges from the helper logRecoveredPanic in
internal/runtime/agent/agent.go that emits "errType" and "stackTrace"; update
the field keys in the logger.Error invocation (the call that logs "Recovered
from panic" in the recovery handler) to use errType and stackTrace (match exact
casing) so dashboards/queries remain consistent with logRecoveredPanic.
internal/service/scheduler/dag_executor.go (1)

281-287: ⚡ Quick win

Tag warning logs before this helper emits them.

dagwarning.Log now runs inside prepareDAGForSubprocess, but HandleJob adds tag.DAG and tag.RunID to the context only after calling this helper. In the distributed enqueue path, dotenv/build warnings will therefore be logged without the identifiers needed to correlate them to a specific DAG run.

Suggested change
 func (e *DAGExecutor) HandleJob(
 	ctx context.Context,
 	dag *core.DAG,
 	operation coordinatorv1.Operation,
 	runID string,
 	triggerType core.TriggerType,
 	scheduleTime time.Time,
 ) error {
 	// For distributed execution with START operation, enqueue for persistence
 	if e.shouldUseDistributedExecution(dag) && operation == coordinatorv1.Operation_OPERATION_START {
+		ctx = logger.WithValues(ctx,
+			tag.DAG(dag.Name),
+			tag.RunID(runID),
+		)
 		dag, err := e.prepareDAGForSubprocess(ctx, dag, "")
 		if err != nil {
 			return fmt.Errorf("failed to prepare DAG env for enqueue: %w", err)
 		}
-		ctx = logger.WithValues(ctx,
-			tag.DAG(dag.Name),
-			tag.RunID(runID),
-		)
 
 		logger.Info(ctx, "Enqueueing DAG for distributed execution",
 			slog.Any("worker-selector", dag.WorkerSelector),
 		)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@internal/service/scheduler/dag_executor.go` around lines 281 - 287, The
dotenv/build warnings are being logged by dagwarning.Log inside
prepareDAGForSubprocess before HandleJob attaches tag.DAG and tag.RunID to the
context, so add the DAG/run tags to the context before those warnings are
emitted: either (A) move the tagging in HandleJob (where tag.DAG and tag.RunID
are set) to occur prior to calling
prepareDAGForSubprocess/ResolveEnvWithWarnings, or (B) modify
prepareDAGForSubprocess (or the call site around
spec.ResolveEnvWithWarnings/result.BuildWarnings) to create a new ctxWithTags =
tag.DAG/ tag.RunID added to ctx and pass that ctxWithTags into
spec.ResolveEnvWithWarnings and into dagwarning.Log so all emitted warnings
include the DAG and RunID identifiers.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 80bd8205-6d94-4933-a546-1e0d22b8c8b8

📥 Commits

Reviewing files that changed from the base of the PR and between e8ba3ef and 84a72dd.

📒 Files selected for processing (28)
  • internal/cmd/helper.go
  • internal/cmd/start.go
  • internal/core/dag.go
  • internal/core/dispatch.go
  • internal/core/spec/builder_test.go
  • internal/core/spec/runtime_env.go
  • internal/core/spec/runtime_env_external_test.go
  • internal/dagwarning/dagwarning.go
  • internal/dispatch/dispatch.go
  • internal/dispatch/dispatch_test.go
  • internal/intg/distr/fixtures_test.go
  • internal/launcher/launcher.go
  • internal/launcher/launcher_test.go
  • internal/runtime/agent/agent.go
  • internal/runtime/agent/agent_test.go
  • internal/runtime/manager.go
  • internal/runtime/manager_test.go
  • internal/service/frontend/api/v1/api.go
  • internal/service/frontend/api/v1/dagruns.go
  • internal/service/frontend/api/v1/dagruns_edit_retry.go
  • internal/service/frontend/api/v1/dags.go
  • internal/service/frontend/api/v1/dags_start_internal_test.go
  • internal/service/scheduler/dag_executor.go
  • internal/service/scheduler/queue_processor_test.go
  • internal/service/scheduler/scheduler.go
  • internal/service/worker/handler.go
  • internal/service/worker/handler_test.go
  • internal/test/helper.go
💤 Files with no reviewable changes (1)
  • internal/core/dispatch.go

@yohamta0 yohamta0 merged commit 683ee5b into main May 31, 2026
11 checks passed
@yohamta0 yohamta0 deleted the refactor/dispatch-out-of-core branch May 31, 2026 07:26