feat: add coordinator and worker health endpoints#1802
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
📝 WalkthroughWalkthroughThis PR introduces HTTP health check servers for Coordinator and Worker services with configurable ports (default 8091 and 8092 respectively), where setting port to 0 disables the health server. New HTTP Changes
Sequence Diagram(s)sequenceDiagram
participant Client as External Client<br/>(Probe/Monitor)
participant Svc as Coordinator/<br/>Worker Service
participant HTTP as HTTP Health<br/>Server
participant gRPC as gRPC Health<br/>Server
rect rgba(100, 150, 200, 0.5)
Note over Client,gRPC: Health Check Flow
Client->>HTTP: GET /health
HTTP->>Svc: Check service status
Svc->>gRPC: Query gRPC health state
gRPC-->>Svc: SERVING status
Svc-->>HTTP: Healthy status
HTTP-->>Client: 200 OK {"status":"healthy"}
end
rect rgba(150, 100, 150, 0.5)
Note over Svc,HTTP: Lifecycle Management
Svc->>HTTP: Start (on service startup)
HTTP->>HTTP: Listen on port
HTTP-->>Svc: Ready
Svc->>gRPC: Set SERVING
Note over Svc,HTTP: ... service running ...
Svc->>HTTP: Stop (on service shutdown)
HTTP->>HTTP: Graceful shutdown
HTTP-->>Svc: Stopped
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
📝 Coding Plan
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (4)
internal/service/healthcheck/server.go (1)
72-125: Consider usingerrors.Is()for error comparison.Line 116 compares errors directly with
!=. While this works forhttp.ErrServerClosed, usingerrors.Is()is the idiomatic Go approach that handles wrapped errors correctly.♻️ Suggested improvement
- if err := server.Serve(listener); err != nil && err != http.ErrServerClosed { + if err := server.Serve(listener); err != nil && !errors.Is(err, http.ErrServerClosed) {You'll also need to add
"errors"to the imports.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/healthcheck/server.go` around lines 72 - 125, The health server goroutine in Start uses a direct error comparison (err != http.ErrServerClosed) which fails for wrapped errors; update the check inside the anonymous goroutine in Start (referencing server.Serve) to use errors.Is(err, http.ErrServerClosed) instead of !=, and add "errors" to the imports so the code compiles and correctly handles wrapped errors when logging in the Serve error branch.internal/service/worker/worker.go (1)
214-219: Consider stopping the health server earlier in the shutdown sequence.Currently, the health server is stopped last, after the PostgreSQL pool manager and coordinator client cleanup. For Kubernetes deployments, stopping the health server first would cause probes to fail immediately, signaling that the pod is no longer ready to receive traffic while other cleanup proceeds.
This is a minor operational consideration and not a blocking issue.
🔧 Suggested reordering (optional)
func (w *Worker) Stop(ctx context.Context) error { var err error w.stopOnce.Do(func() { logger.Info(ctx, "Worker stopping", tag.WorkerID(w.id)) + // Stop health server first to signal unavailability to probes + if w.healthServer != nil { + if stopErr := w.healthServer.Stop(ctx); stopErr != nil && err == nil { + err = fmt.Errorf("failed to stop worker health check server: %w", stopErr) + } + } + // Cancel the internal context to signal all goroutines to stop if w.stopCancel != nil { w.stopCancel() } // ... rest of cleanup ... - - if w.healthServer != nil { - if stopErr := w.healthServer.Stop(ctx); stopErr != nil && err == nil { - err = fmt.Errorf("failed to stop worker health check server: %w", stopErr) - } - } })🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/worker/worker.go` around lines 214 - 219, Move the health-server stop earlier in the worker shutdown sequence so readiness probes fail immediately: in the worker shutdown function (the method that calls w.healthServer.Stop currently after Postgres pool and coordinator cleanup), invoke w.healthServer.Stop(ctx) near the start of the teardown (before cleaning up the Postgres pool manager and coordinator client) and keep the existing error handling (assigning to err when stopErr != nil && err == nil) to preserve semantics.internal/service/coordinator/service.go (1)
69-72: Thread-safety consideration forDisableHealthServer().This method sets a flag without synchronization. The current usage in
startall.go(called immediately after construction, beforeStart()) is safe. However, if this method were ever called concurrently withStart(), it would create a data race ondisableHealthServer.Consider documenting that this must be called before
Start(), or adding mutex protection for defensive coding:🛡️ Optional: Add documentation or mutex protection
Option 1: Document the constraint
// DisableHealthServer disables the dedicated HTTP health check server. +// Must be called before Start(). func (srv *Service) DisableHealthServer() { srv.disableHealthServer = true }Option 2: Add mutex protection
func (srv *Service) DisableHealthServer() { + srv.mu.Lock() + defer srv.mu.Unlock() srv.disableHealthServer = true }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/coordinator/service.go` around lines 69 - 72, The DisableHealthServer method mutates srv.disableHealthServer without synchronization which can race with Start; either document the usage constraint or make the setter thread-safe. Locate the Service type and the DisableHealthServer method and either add a comment on DisableHealthServer stating it must be called before Start() (and reference Start in the comment), or protect access to disableHealthServer with the Service's mutex (e.g., lock/unlock around reads/writes where Start reads it and in DisableHealthServer) so concurrent calls are safe; ensure both setter and any readers (Start) use the same synchronization primitive.internal/service/worker/health_test.go (1)
84-134: Well-designed test helpers.The polling-based helpers using
require.Eventuallyare appropriate for testing async server startup/shutdown. The timeouts (5s) and poll intervals (10ms) are reasonable.Note: These helpers are duplicated in
internal/service/coordinator/health_test.go. If the test suite grows, consider extracting them to a shared test utility package, but this is fine for now.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@internal/service/worker/health_test.go` around lines 84 - 134, The three duplicated test helper functions (requireWorkerHealthServerURL, requireHealthyWorkerHealth, requireWorkerHealthStopped) should be extracted from internal/service/worker/health_test.go into a shared test utility package (e.g., internal/testutil or internal/test/helpers); move the functions there preserving their signatures and behavior, update internal/service/worker/health_test.go and internal/service/coordinator/health_test.go to import that package and call the helpers, and ensure any test imports (testing, healthcheck, http, time, json, require) are available or re-exported as needed so the tests compile and run unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@charts/dagu/templates/coordinator-deployment.yaml`:
- Around line 43-45: The template is rendering a containerPort and HTTP probes
even when coordinator.healthPort is 0 (which disables the health server) causing
invalid manifests; update coordinator-deployment.yaml to conditionally render
the health containerPort block and the livenessProbe/readinessProbe httpGet
entries only when .Values.coordinator.healthPort is non‑zero (e.g. guard the "-
name: health / containerPort: {{ .Values.coordinator.healthPort }}" block and
the livenessProbe/readinessProbe httpGet path/port with an if check on
.Values.coordinator.healthPort), and apply the same conditional guarding in
worker-deployment.yaml for the worker health port and its probes so ports/probes
are omitted when healthPort == 0.
In `@charts/dagu/templates/worker-deployment.yaml`:
- Around line 51-54: The template currently always renders the health
containerPort and the /health probes even when worker.healthPort is set to 0;
update the Helm template in worker-deployment.yaml so the health port entry (the
ports: - name: health containerPort block) and the liveness/readiness probe
blocks are conditionally rendered only when $.Values.worker.healthPort != 0
(i.e., skip emitting the health port and the probe definitions when
worker.healthPort is 0), ensuring the container spec and Probe fields are
omitted when the health server is disabled.
---
Nitpick comments:
In `@internal/service/coordinator/service.go`:
- Around line 69-72: The DisableHealthServer method mutates
srv.disableHealthServer without synchronization which can race with Start;
either document the usage constraint or make the setter thread-safe. Locate the
Service type and the DisableHealthServer method and either add a comment on
DisableHealthServer stating it must be called before Start() (and reference
Start in the comment), or protect access to disableHealthServer with the
Service's mutex (e.g., lock/unlock around reads/writes where Start reads it and
in DisableHealthServer) so concurrent calls are safe; ensure both setter and any
readers (Start) use the same synchronization primitive.
In `@internal/service/healthcheck/server.go`:
- Around line 72-125: The health server goroutine in Start uses a direct error
comparison (err != http.ErrServerClosed) which fails for wrapped errors; update
the check inside the anonymous goroutine in Start (referencing server.Serve) to
use errors.Is(err, http.ErrServerClosed) instead of !=, and add "errors" to the
imports so the code compiles and correctly handles wrapped errors when logging
in the Serve error branch.
In `@internal/service/worker/health_test.go`:
- Around line 84-134: The three duplicated test helper functions
(requireWorkerHealthServerURL, requireHealthyWorkerHealth,
requireWorkerHealthStopped) should be extracted from
internal/service/worker/health_test.go into a shared test utility package (e.g.,
internal/testutil or internal/test/helpers); move the functions there preserving
their signatures and behavior, update internal/service/worker/health_test.go and
internal/service/coordinator/health_test.go to import that package and call the
helpers, and ensure any test imports (testing, healthcheck, http, time, json,
require) are available or re-exported as needed so the tests compile and run
unchanged.
In `@internal/service/worker/worker.go`:
- Around line 214-219: Move the health-server stop earlier in the worker
shutdown sequence so readiness probes fail immediately: in the worker shutdown
function (the method that calls w.healthServer.Stop currently after Postgres
pool and coordinator cleanup), invoke w.healthServer.Stop(ctx) near the start of
the teardown (before cleaning up the Postgres pool manager and coordinator
client) and keep the existing error handling (assigning to err when stopErr !=
nil && err == nil) to preserve semantics.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 2bc6e523-b342-4fa2-9474-e5d451953cd7
📒 Files selected for processing (27)
README.mdcharts/dagu/README.mdcharts/dagu/templates/configmap.yamlcharts/dagu/templates/coordinator-deployment.yamlcharts/dagu/templates/worker-deployment.yamlcharts/dagu/values.yamlinternal/cmd/coord.gointernal/cmd/coord_test.gointernal/cmd/flags.gointernal/cmd/startall.gointernal/cmd/worker.gointernal/cmd/worker_test.gointernal/cmn/config/config.gointernal/cmn/config/config_test.gointernal/cmn/config/definition.gointernal/cmn/config/loader.gointernal/cmn/config/loader_test.gointernal/cmn/schema/config.schema.jsoninternal/service/coordinator/health_test.gointernal/service/coordinator/service.gointernal/service/healthcheck/server.gointernal/service/healthcheck/server_test.gointernal/service/scheduler/health.gointernal/service/worker/health_test.gointernal/service/worker/worker.gointernal/test/coordinator.gointernal/test/helper.go
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #1802 +/- ##
==========================================
+ Coverage 69.02% 69.21% +0.19%
==========================================
Files 424 425 +1
Lines 51230 51394 +164
==========================================
+ Hits 35361 35574 +213
+ Misses 12853 12802 -51
- Partials 3016 3018 +2
... and 13 files with indirect coverage changes Continue to review full report in Codecov by Sentry.
🚀 New features to boost your workflow:
|
Summary
Testing
Closes #1788
Summary by CodeRabbit
Release Notes
New Features
DAGU_COORDINATOR_HEALTH_PORT,DAGU_WORKER_HEALTH_PORT) and CLI flags. Set port to0to disable health checks.Documentation