From 68b4933ec1b54877bb2311ae9a555ab3557dc56c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 24 Apr 2026 16:47:10 +0900 Subject: [PATCH 1/6] feat(memwatch): graceful shutdown on memory-pressure threshold MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add an opt-in memory watchdog that polls runtime.MemStats.HeapInuse and fires the existing root-ctx cancel once a configurable threshold is crossed, causing the process to exit cleanly (exit code 2) rather than waiting for the kernel OOM-killer. Motivation: 2026-04-24 incident — all 4 live nodes were SIGKILLed 22-169 times in 24h under a traffic spike. Each kill risked WAL tail truncation and reset lease/raft state, producing election storms and p99 spikes to 6-8s. A user-space watchdog that triggers orderly shutdown before kernel OOM avoids the WAL corruption path entirely. Design: - HeapInuse chosen over RSS (no syscall per poll), and over Sys/Alloc/NumGC (Sys includes memory already released to the OS, NumGC is a monotonic counter, and Alloc counts only live objects, missing span-level overhead). Tradeoff documented in the package comment. - Single-shot (CAS-guarded) so OnExceed fires exactly once even if allocation stays high across polls. - Wired to the existing root-ctx cancel at main.go — no duplicate shutdown path, no direct raft/adapter manipulation. - Default OFF: ELASTICKV_MEMORY_SHUTDOWN_THRESHOLD_MB unset or "0" disables the watcher, no goroutine spawned. - Exit code 2 distinguishes memory-pressure exit from crash (1) or normal shutdown (0). Env vars: - ELASTICKV_MEMORY_SHUTDOWN_THRESHOLD_MB (default off) - ELASTICKV_MEMORY_SHUTDOWN_POLL_INTERVAL (default 1s) Tests (internal/memwatch/memwatch_test.go): fires-once, does-not-fire, context-cancel-stops, zero-threshold-disabled. All pass. 
--- internal/memwatch/memwatch.go | 195 +++++++++++++++++++++++++++++ internal/memwatch/memwatch_test.go | 158 +++++++++++++++++++++++ main.go | 95 ++++++++++++++ 3 files changed, 448 insertions(+) create mode 100644 internal/memwatch/memwatch.go create mode 100644 internal/memwatch/memwatch_test.go diff --git a/internal/memwatch/memwatch.go b/internal/memwatch/memwatch.go new file mode 100644 index 00000000..b91f345e --- /dev/null +++ b/internal/memwatch/memwatch.go @@ -0,0 +1,195 @@ +// Package memwatch provides a user-space memory watchdog that triggers +// an orderly shutdown before the kernel OOM-killer sends SIGKILL. +// +// Motivation +// +// elastickv runs in memory-constrained containers (e.g. 3GB RAM VMs). Go's +// runtime is unaware of the container/host memory limit, and even with +// GOMEMLIMIT set the process can still lose the race against the kernel +// OOM-killer under sustained memtable/goroutine growth. A SIGKILL leaves +// the Raft WAL potentially truncated mid-operation; a cooperative SIGTERM +// path lets the node sync the WAL and stop raft cleanly, avoiding the +// election storms and lease loss that follow crash-restarts. +// +// The watcher polls runtime.ReadMemStats at a fixed cadence. When +// HeapInuse crosses the configured threshold it invokes OnExceed once +// and exits. The watcher never calls os.Exit or sends signals itself; +// callers wire OnExceed to the existing shutdown path (typically a +// root context.CancelFunc). +// +// Wiring in elastickv (see main.go): +// +// ctx, cancel := context.WithCancel(context.Background()) +// // ... build runtimes, servers, errgroup ... +// w := memwatch.New(memwatch.Config{ +// ThresholdBytes: threshold, +// PollInterval: pollInterval, +// OnExceed: func() { +// memoryPressureExit.Store(true) // flips exit code to 2 +// cancel() // fires the same shutdown path +// }, // SIGTERM would use. 
+// }) +// eg.Go(func() error { w.Start(runCtx); return nil }) +// +// Metric choice +// +// We read runtime.MemStats.HeapInuse. It is the closest Go-runtime-visible +// proxy for "how close are we to OOM" without a syscall per poll. RSS from +// /proc/self/status is more accurate but requires a read syscall on every +// poll; at the 1s cadence this watchdog runs that accuracy isn't worth the +// cost. We deliberately do NOT use MemStats.Sys, NumGC or Alloc: Sys and +// NumGC include memory the runtime has already released back to the OS (or +// are monotonic counters) and Alloc counts only currently-live heap objects, +// missing the span-level overhead that the OOM-killer actually sees. +package memwatch + +import ( + "context" + "log/slog" + "runtime" + "sync" + "sync/atomic" + "time" +) + +// DefaultPollInterval is the polling cadence used when Config.PollInterval +// is zero. One second is frequent enough to catch fast-growing memtables +// before the kernel kills the process, but infrequent enough that +// runtime.ReadMemStats (which stops the world briefly) doesn't become a +// meaningful source of latency on its own. +const DefaultPollInterval = time.Second + +// Config configures a Watcher. +type Config struct { + // ThresholdBytes is the HeapInuse threshold in bytes. When + // runtime.MemStats.HeapInuse exceeds this value the watcher invokes + // OnExceed exactly once and returns. A zero value disables the + // watcher entirely (Start returns immediately). + ThresholdBytes uint64 + + // PollInterval is how often ReadMemStats is called. Defaults to + // DefaultPollInterval when zero. + PollInterval time.Duration + + // OnExceed is called at most once, from the watcher's own goroutine, + // when the threshold is crossed. It must be non-blocking or at least + // must not block the caller indefinitely (the watcher returns + // immediately after invocation regardless). 
Typical implementations + // cancel a root context and flag a process-wide exit-code sentinel. + OnExceed func() + + // Logger, if non-nil, receives a single structured log line when the + // threshold is crossed. When nil, slog.Default() is used. + Logger *slog.Logger +} + +// Watcher polls process memory and fires OnExceed once, when HeapInuse +// crosses the configured threshold. Callers get a single-shot notification +// and are expected to initiate graceful shutdown; Watcher does not call +// os.Exit or send signals itself. +type Watcher struct { + cfg Config + fired atomic.Bool + doneCh chan struct{} +} + +// New constructs a Watcher from the given Config. The Watcher does not +// start polling until Start is called. +func New(cfg Config) *Watcher { + if cfg.PollInterval <= 0 { + cfg.PollInterval = DefaultPollInterval + } + if cfg.Logger == nil { + cfg.Logger = slog.Default() + } + return &Watcher{ + cfg: cfg, + doneCh: make(chan struct{}), + } +} + +// Start runs the watchdog loop. It returns when ctx is cancelled, when +// OnExceed has fired, or immediately when ThresholdBytes is zero (the +// watcher is disabled). It is safe to call Start at most once per Watcher; +// subsequent calls return immediately because the done channel has already +// been closed. +func (w *Watcher) Start(ctx context.Context) { + defer w.closeDoneOnce() + + if w.cfg.ThresholdBytes == 0 { + // Disabled: do not even start a ticker, so an OFF-by-default + // deployment pays zero cost. + return + } + + ticker := time.NewTicker(w.cfg.PollInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if w.checkAndMaybeFire() { + return + } + } + } +} + +// Done returns a channel that is closed when Start returns. Tests can use +// it to assert the watcher goroutine actually exits (no leak) after +// ctx cancel or OnExceed. 
+func (w *Watcher) Done() <-chan struct{} { + return w.doneCh +} + +// closeDoneOnce guards against the rare case where Start might be invoked +// twice on the same Watcher (programmer error): the second call would +// otherwise panic on closing an already-closed channel. We prefer a silent +// no-op because Start's contract is "runs once"; diagnosing misuse is the +// caller's job. +var closeDoneMu sync.Mutex + +func (w *Watcher) closeDoneOnce() { + closeDoneMu.Lock() + defer closeDoneMu.Unlock() + select { + case <-w.doneCh: + return + default: + close(w.doneCh) + } +} + +// checkAndMaybeFire reads MemStats once, and if HeapInuse is at or above +// the threshold and OnExceed has not already fired, invokes OnExceed and +// returns true to signal the loop to exit. +func (w *Watcher) checkAndMaybeFire() bool { + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + + if ms.HeapInuse < w.cfg.ThresholdBytes { + return false + } + + // CompareAndSwap so a (hypothetical) concurrent caller cannot cause + // OnExceed to run twice. The watcher currently runs from one goroutine + // but keeping the guard explicit documents the "single-shot" contract. + if !w.fired.CompareAndSwap(false, true) { + return true + } + + w.cfg.Logger.Warn("memory pressure shutdown", + "heap_inuse_bytes", ms.HeapInuse, + "threshold_bytes", w.cfg.ThresholdBytes, + "heap_alloc_bytes", ms.HeapAlloc, + "heap_sys_bytes", ms.HeapSys, + "next_gc_bytes", ms.NextGC, + ) + + if w.cfg.OnExceed != nil { + w.cfg.OnExceed() + } + return true +} diff --git a/internal/memwatch/memwatch_test.go b/internal/memwatch/memwatch_test.go new file mode 100644 index 00000000..cb266e13 --- /dev/null +++ b/internal/memwatch/memwatch_test.go @@ -0,0 +1,158 @@ +package memwatch + +import ( + "context" + "runtime" + "sync/atomic" + "testing" + "time" +) + +// waitForDone asserts the watcher's goroutine exits within d, returning +// immediately on success and failing the test on timeout. 
Using the +// Watcher.Done() channel avoids the goleak dep and keeps the check local. +func waitForDone(t *testing.T, w *Watcher, d time.Duration) { + t.Helper() + select { + case <-w.Done(): + case <-time.After(d): + t.Fatalf("watcher goroutine did not exit within %v", d) + } +} + +// TestWatcher_FiresOnceAboveThreshold creates a watcher with a threshold +// so low the current heap is guaranteed to exceed it, verifies OnExceed +// fires, and verifies it fires only once even though the polling loop +// would otherwise observe "over threshold" on every subsequent tick. +func TestWatcher_FiresOnceAboveThreshold(t *testing.T) { + t.Parallel() + + fired := make(chan struct{}, 8) + var count atomic.Int32 + w := New(Config{ + // 1 byte threshold: HeapInuse is always > 1B in a live program. + ThresholdBytes: 1, + PollInterval: 5 * time.Millisecond, + OnExceed: func() { + count.Add(1) + // non-blocking send: buffered channel so a pathological + // double-fire is observable via the count, not a deadlock. + select { + case fired <- struct{}{}: + default: + } + }, + }) + + // Hold a live allocation so HeapInuse cannot collapse mid-test. We + // touch it after the wait below so the compiler/escape analysis keeps + // it on the heap for the duration of the test. + ballast := make([]byte, 1<<20) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Start(ctx) + + select { + case <-fired: + case <-time.After(2 * time.Second): + t.Fatalf("OnExceed was not invoked") + } + + // Give the loop several more poll intervals to prove it does not fire + // again. The watcher should have already returned on first fire, so + // this also indirectly tests the loop-exit path. + time.Sleep(50 * time.Millisecond) + if got := count.Load(); got != 1 { + t.Fatalf("OnExceed fired %d times, want exactly 1", got) + } + + // Ensure Start actually returned. + waitForDone(t, w, time.Second) + + // Keep ballast live past the assertions. 
+ runtime.KeepAlive(ballast) +} + +// TestWatcher_DoesNotFireBelowThreshold runs the watcher for multiple poll +// intervals with a threshold far above any reasonable process HeapInuse +// and verifies OnExceed is never called. +func TestWatcher_DoesNotFireBelowThreshold(t *testing.T) { + t.Parallel() + + var count atomic.Int32 + w := New(Config{ + // 1 TiB: a Go test binary will not reach this. + ThresholdBytes: 1 << 40, + PollInterval: 5 * time.Millisecond, + OnExceed: func() { + count.Add(1) + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go w.Start(ctx) + + // Three polls' worth plus a safety margin. + time.Sleep(30 * time.Millisecond) + cancel() + waitForDone(t, w, time.Second) + + if got := count.Load(); got != 0 { + t.Fatalf("OnExceed fired %d times below threshold, want 0", got) + } +} + +// TestWatcher_StopsOnContextCancel verifies the watcher goroutine exits +// promptly when the supplied context is cancelled, with no leak. +func TestWatcher_StopsOnContextCancel(t *testing.T) { + t.Parallel() + + w := New(Config{ + ThresholdBytes: 1 << 40, // never fires + PollInterval: 10 * time.Millisecond, + OnExceed: func() { t.Fatalf("OnExceed unexpectedly fired") }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + go w.Start(ctx) + + // Let the goroutine park in its select. + time.Sleep(20 * time.Millisecond) + cancel() + waitForDone(t, w, time.Second) +} + +// TestWatcher_DisabledWhenThresholdZero verifies a threshold of zero +// disables the watcher: Start returns immediately and OnExceed is never +// invoked, even after generous wait time. 
+func TestWatcher_DisabledWhenThresholdZero(t *testing.T) { + t.Parallel() + + var count atomic.Int32 + w := New(Config{ + ThresholdBytes: 0, + PollInterval: 1 * time.Millisecond, + OnExceed: func() { + count.Add(1) + }, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go w.Start(ctx) + // When disabled, Start must return essentially immediately. + waitForDone(t, w, 200*time.Millisecond) + + // Allocate something to grow the heap and confirm the (stopped) + // watcher does not observe it. + ballast := make([]byte, 4<<20) + time.Sleep(20 * time.Millisecond) + runtime.KeepAlive(ballast) + + if got := count.Load(); got != 0 { + t.Fatalf("OnExceed fired %d times while disabled, want 0", got) + } +} diff --git a/main.go b/main.go index 4306388a..c5f53f06 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "context" "flag" "log" + "log/slog" "net" "net/http" "os" @@ -11,11 +12,13 @@ import ( "strconv" "strings" "sync" + "sync/atomic" "time" "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/distribution" internalutil "github.com/bootjp/elastickv/internal" + "github.com/bootjp/elastickv/internal/memwatch" internalraftadmin "github.com/bootjp/elastickv/internal/raftadmin" "github.com/bootjp/elastickv/internal/raftengine" etcdraftengine "github.com/bootjp/elastickv/internal/raftengine/etcd" @@ -95,12 +98,76 @@ var ( raftDynamoMap = flag.String("raftDynamoMap", "", "Map of Raft address to DynamoDB address (raftAddr=dynamoAddr,...)") ) +// memoryPressureExit is set to true by the memwatch OnExceed callback to +// signal that the subsequent graceful shutdown was triggered by user-space +// OOM avoidance rather than an ordinary SIGTERM. The process exits with a +// distinct non-zero code (exitCodeMemoryPressure) so operators reading +// logs can distinguish this case from a crash or an ordinary stop. 
+var memoryPressureExit atomic.Bool + +// exitCodeMemoryPressure is reported by main when memwatch triggered the +// shutdown. It is non-zero so supervisors see a non-success exit, but +// distinct from log.Fatalf's 1 and from os.Exit(1) in the other binaries +// so log scraping can tell them apart. +const exitCodeMemoryPressure = 2 + +// memoryShutdownThresholdEnvVar configures the heap-inuse ceiling at +// which memwatch triggers a graceful shutdown. Empty or "0" disables the +// watchdog (the default; existing operators see no behaviour change). +const memoryShutdownThresholdEnvVar = "ELASTICKV_MEMORY_SHUTDOWN_THRESHOLD_MB" + +// memoryShutdownPollIntervalEnvVar overrides memwatch's default poll +// cadence. Accepts any time.ParseDuration string. Invalid values log a +// warning and fall through to the default. +const memoryShutdownPollIntervalEnvVar = "ELASTICKV_MEMORY_SHUTDOWN_POLL_INTERVAL" + func main() { flag.Parse() if err := run(); err != nil { log.Fatalf("%v", err) } + + if memoryPressureExit.Load() { + // run() returned cleanly because cancel() propagated through the + // errgroup; mark the exit code so the reason isn't lost. + os.Exit(exitCodeMemoryPressure) + } +} + +// memwatchConfigFromEnv resolves the memwatch Config from environment +// variables. It returns (cfg, true) when the watcher should run, or +// (_, false) when the operator has not opted in (the default). Errors in +// the optional poll-interval override are logged and ignored so a typo +// cannot take the process down. 
+func memwatchConfigFromEnv() (memwatch.Config, bool) { + raw := strings.TrimSpace(os.Getenv(memoryShutdownThresholdEnvVar)) + if raw == "" { + return memwatch.Config{}, false + } + mb, err := strconv.ParseUint(raw, 10, 64) + if err != nil { + slog.Warn("invalid "+memoryShutdownThresholdEnvVar+"; watcher disabled", + "value", raw, "error", err) + return memwatch.Config{}, false + } + if mb == 0 { + return memwatch.Config{}, false + } + + cfg := memwatch.Config{ + ThresholdBytes: mb * 1024 * 1024, + } + if rawInterval := strings.TrimSpace(os.Getenv(memoryShutdownPollIntervalEnvVar)); rawInterval != "" { + d, err := time.ParseDuration(rawInterval) + if err != nil || d <= 0 { + slog.Warn("invalid "+memoryShutdownPollIntervalEnvVar+"; using default", + "value", rawInterval, "error", err) + } else { + cfg.PollInterval = d + } + } + return cfg, true } func run() error { @@ -173,6 +240,7 @@ func run() error { eg.Go(func() error { return runDistributionCatalogWatcher(runCtx, distCatalog, cfg.engine) }) + startMemoryWatchdog(runCtx, eg, cancel) distServer := adapter.NewDistributionServer( cfg.engine, distCatalog, @@ -516,6 +584,33 @@ func dispatchMonitorSources(runtimes []*raftGroupRuntime) []monitoring.DispatchS return out } +// startMemoryWatchdog optionally starts the memwatch goroutine. The +// watcher is off by default; it is enabled only when the operator sets +// ELASTICKV_MEMORY_SHUTDOWN_THRESHOLD_MB. On threshold crossing the +// callback flips the memoryPressureExit sentinel and cancels the root +// context, routing through the exact same shutdown path SIGTERM would +// use (errgroup unwinds, CleanupStack runs, WAL is synced). We do NOT +// send a signal, call os.Exit, or touch the raft engine directly here. 
+func startMemoryWatchdog(ctx context.Context, eg *errgroup.Group, cancel context.CancelFunc) { + cfg, enabled := memwatchConfigFromEnv() + if !enabled { + return + } + cfg.OnExceed = func() { + memoryPressureExit.Store(true) + cancel() + } + w := memwatch.New(cfg) + slog.Info("memory watchdog enabled", + "threshold_bytes", cfg.ThresholdBytes, + "poll_interval", cfg.PollInterval, + ) + eg.Go(func() error { + w.Start(ctx) + return nil + }) +} + // startMonitoringCollectors wires up the per-tick Prometheus // collectors (raft dispatch, Pebble LSM, store-layer OCC conflicts) // on top of the running raft runtimes. Kept separate from run() so From ab4bdab42fb428e3d971ec8a805fd78fba1a1b9b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 00:03:11 +0900 Subject: [PATCH 2/6] fix(memwatch): address review feedback - main.go: move memoryPressureExit check before log.Fatalf so exit code 2 surfaces even when run() returns a spurious error during cancel()- driven shutdown (both Gemini and Codex flagged). - main.go: extract 1024*1024 into bytesPerMiB constant to satisfy the mnd linter. - memwatch.go: replace package-level closeDoneMu with a per-Watcher sync.Once so multiple watchers don't contend on a shared mutex. - memwatch.go: enforce Start's single-shot contract with an atomic CompareAndSwap at entry, closing the window where two concurrent Start calls could both run the loop. runtime/metrics migration (Gemini MEDIUM) deferred: at 1s poll cadence the ReadMemStats STW pause is sub-millisecond on modern Go, and the warning log consumes multiple MemStats fields (HeapAlloc, HeapSys, NextGC) that would need per-field metric-sample plumbing. Worth revisiting only if the watcher starts polling faster. 
--- internal/memwatch/memwatch.go | 34 ++++++++++++++-------------------- main.go | 26 +++++++++++++++----------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/internal/memwatch/memwatch.go b/internal/memwatch/memwatch.go index b91f345e..0e8dec3e 100644 --- a/internal/memwatch/memwatch.go +++ b/internal/memwatch/memwatch.go @@ -1,7 +1,7 @@ // Package memwatch provides a user-space memory watchdog that triggers // an orderly shutdown before the kernel OOM-killer sends SIGKILL. // -// Motivation +// # Motivation // // elastickv runs in memory-constrained containers (e.g. 3GB RAM VMs). Go's // runtime is unaware of the container/host memory limit, and even with @@ -31,7 +31,7 @@ // }) // eg.Go(func() error { w.Start(runCtx); return nil }) // -// Metric choice +// # Metric choice // // We read runtime.MemStats.HeapInuse. It is the closest Go-runtime-visible // proxy for "how close are we to OOM" without a syscall per poll. RSS from @@ -88,9 +88,11 @@ type Config struct { // and are expected to initiate graceful shutdown; Watcher does not call // os.Exit or send signals itself. type Watcher struct { - cfg Config - fired atomic.Bool - doneCh chan struct{} + cfg Config + fired atomic.Bool + started atomic.Bool + doneCh chan struct{} + closeOnce sync.Once } // New constructs a Watcher from the given Config. The Watcher does not @@ -114,6 +116,9 @@ func New(cfg Config) *Watcher { // subsequent calls return immediately because the done channel has already // been closed. func (w *Watcher) Start(ctx context.Context) { + if !w.started.CompareAndSwap(false, true) { + return + } defer w.closeDoneOnce() if w.cfg.ThresholdBytes == 0 { @@ -144,22 +149,11 @@ func (w *Watcher) Done() <-chan struct{} { return w.doneCh } -// closeDoneOnce guards against the rare case where Start might be invoked -// twice on the same Watcher (programmer error): the second call would -// otherwise panic on closing an already-closed channel. 
We prefer a silent -// no-op because Start's contract is "runs once"; diagnosing misuse is the -// caller's job. -var closeDoneMu sync.Mutex - +// closeDoneOnce closes doneCh at most once across the Watcher's lifetime. +// Per-Watcher sync.Once avoids the contention a shared package-level mutex +// would introduce if multiple watchers coexisted. func (w *Watcher) closeDoneOnce() { - closeDoneMu.Lock() - defer closeDoneMu.Unlock() - select { - case <-w.doneCh: - return - default: - close(w.doneCh) - } + w.closeOnce.Do(func() { close(w.doneCh) }) } // checkAndMaybeFire reads MemStats once, and if HeapInuse is at or above diff --git a/main.go b/main.go index c5f53f06..34673d3f 100644 --- a/main.go +++ b/main.go @@ -15,6 +15,11 @@ import ( "sync/atomic" "time" + "github.com/cockroachdb/errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" + "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/distribution" internalutil "github.com/bootjp/elastickv/internal" @@ -26,10 +31,6 @@ import ( "github.com/bootjp/elastickv/monitoring" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" - "github.com/cockroachdb/errors" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" ) const ( @@ -121,18 +122,21 @@ const memoryShutdownThresholdEnvVar = "ELASTICKV_MEMORY_SHUTDOWN_THRESHOLD_MB" // warning and fall through to the default. const memoryShutdownPollIntervalEnvVar = "ELASTICKV_MEMORY_SHUTDOWN_POLL_INTERVAL" +const bytesPerMiB = 1024 * 1024 + func main() { flag.Parse() - if err := run(); err != nil { - log.Fatalf("%v", err) - } - + err := run() if memoryPressureExit.Load() { - // run() returned cleanly because cancel() propagated through the - // errgroup; mark the exit code so the reason isn't lost. 
+ // memwatch fired: surface exit code 2 regardless of whether run() + // returned a nil or an error (cancel() can cause in-flight + // listeners to return spurious errors during shutdown). os.Exit(exitCodeMemoryPressure) } + if err != nil { + log.Fatalf("%v", err) + } } // memwatchConfigFromEnv resolves the memwatch Config from environment @@ -156,7 +160,7 @@ func memwatchConfigFromEnv() (memwatch.Config, bool) { } cfg := memwatch.Config{ - ThresholdBytes: mb * 1024 * 1024, + ThresholdBytes: mb * bytesPerMiB, } if rawInterval := strings.TrimSpace(os.Getenv(memoryShutdownPollIntervalEnvVar)); rawInterval != "" { d, err := time.ParseDuration(rawInterval) From 9034431b14e73a3d5f5e1b18e8cfb1b7a7fc555c Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 00:31:37 +0900 Subject: [PATCH 3/6] fix(memwatch): address round-2 review feedback MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - main.go:18 (golangci gci): normalize import ordering so the internal github.com/bootjp/elastickv block precedes third-party cockroachdb/grpc/etcd imports per the project's gci config. - main.go:164 (CodeRabbit Major): guard against ParseUint returning a value whose mb*bytesPerMiB product would wrap past math.MaxUint64. The overflowed low value would otherwise fire the watcher immediately; reject and disable with a warning instead. - main.go:611 (CodeRabbit Minor): memwatchConfigFromEnv now sets cfg.PollInterval = memwatch.DefaultPollInterval before the ParseDuration override branch, so the "memory watchdog enabled" log line prints the effective interval (1s) rather than the uninitialised zero value. - main.go:136 (Gemini Medium): log any non-nil error from run() before os.Exit(exitCodeMemoryPressure). The previous path was intentionally swallowing errors to preserve exit code 2, but a secondary failure during graceful shutdown should still be visible in logs. 
Emit slog.Warn so the error is captured without mutating the exit code. Not addressed: CodeRabbit Minor on memwatch_test.go:115 (t.Fatalf from goroutine) — the test goroutine only exits after the watcher's Done channel closes, and t.FailNow's contract about test-goroutine ownership is a lint-only concern without a concrete failure mode here. Left for a follow-up cleanup pass. Not addressed: Gemini Medium re runtime/metrics migration — already explained in the previous commit's message. At 1s cadence the ReadMemStats STW pause is a non-issue; the warning log also uses multiple MemStats fields that would need per-field plumbing. --- main.go | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/main.go b/main.go index 34673d3f..89499b38 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "flag" "log" "log/slog" + "math" "net" "net/http" "os" @@ -15,11 +16,6 @@ import ( "sync/atomic" "time" - "github.com/cockroachdb/errors" - "golang.org/x/sync/errgroup" - "google.golang.org/grpc" - "google.golang.org/grpc/reflection" - "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/distribution" internalutil "github.com/bootjp/elastickv/internal" @@ -31,6 +27,10 @@ import ( "github.com/bootjp/elastickv/monitoring" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" + "github.com/cockroachdb/errors" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" + "google.golang.org/grpc/reflection" ) const ( @@ -131,7 +131,12 @@ func main() { if memoryPressureExit.Load() { // memwatch fired: surface exit code 2 regardless of whether run() // returned a nil or an error (cancel() can cause in-flight - // listeners to return spurious errors during shutdown). + // listeners to return spurious errors during shutdown). Still + // log any residual error so a secondary failure during the + // graceful shutdown is visible in logs rather than swallowed. 
+ if err != nil { + slog.Warn("shutdown error after memory pressure", "error", err) + } os.Exit(exitCodeMemoryPressure) } if err != nil { @@ -158,10 +163,20 @@ func memwatchConfigFromEnv() (memwatch.Config, bool) { if mb == 0 { return memwatch.Config{}, false } + // Guard against mb * bytesPerMiB wrapping past math.MaxUint64. The + // value has no real use above this ceiling (the host does not have + // exabytes of RAM), and a wrapped value would set an absurdly low + // threshold that fires immediately. + if mb > math.MaxUint64/bytesPerMiB { + slog.Warn("value for "+memoryShutdownThresholdEnvVar+" would overflow uint64; watcher disabled", + "value_mb", mb) + return memwatch.Config{}, false + } cfg := memwatch.Config{ ThresholdBytes: mb * bytesPerMiB, } + cfg.PollInterval = memwatch.DefaultPollInterval if rawInterval := strings.TrimSpace(os.Getenv(memoryShutdownPollIntervalEnvVar)); rawInterval != "" { d, err := time.ParseDuration(rawInterval) if err != nil || d <= 0 { From 807520dd62a9c56a54a5d0c7c91cf0eb64c7b41b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 00:51:10 +0900 Subject: [PATCH 4/6] fix(memwatch): address round-3 review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - memwatch.go: enforce a 10ms floor on PollInterval (MinPollInterval const). Protects against operators setting microsecond cadences that accumulate STW pauses (Gemini Medium). - main.go: filter context.Canceled out of the post-shutdown error log. Canceled is the expected signal that cancel() propagated; it's noise, not a secondary failure (Gemini Medium). Not addressed: runtime/metrics migration — third request for the same change. At 1s default cadence, the ReadMemStats STW pause is sub-millisecond; the warning log uses HeapAlloc/HeapSys/NextGC alongside HeapInuse, so migrating needs per-field metric-sample plumbing. Rationale stays unchanged across rounds. 
--- internal/memwatch/memwatch.go | 14 +++++++++++++- main.go | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/internal/memwatch/memwatch.go b/internal/memwatch/memwatch.go index 0e8dec3e..c60d21a4 100644 --- a/internal/memwatch/memwatch.go +++ b/internal/memwatch/memwatch.go @@ -59,6 +59,15 @@ import ( // meaningful source of latency on its own. const DefaultPollInterval = time.Second +// MinPollInterval is the floor enforced by New. ReadMemStats is a STW +// operation (~tens of microseconds on modern Go); polling faster than +// 10ms risks accumulating noticeable pause time on its own, and no +// memory-growth pattern we care about needs sub-10ms detection latency. +// Values below this floor (including zero from an unset Config) are +// clamped to DefaultPollInterval; an explicit operator override below +// MinPollInterval is clamped up to it. +const MinPollInterval = 10 * time.Millisecond + // Config configures a Watcher. type Config struct { // ThresholdBytes is the HeapInuse threshold in bytes. When @@ -98,8 +107,11 @@ type Watcher struct { // New constructs a Watcher from the given Config. The Watcher does not // start polling until Start is called. func New(cfg Config) *Watcher { - if cfg.PollInterval <= 0 { + switch { + case cfg.PollInterval <= 0: cfg.PollInterval = DefaultPollInterval + case cfg.PollInterval < MinPollInterval: + cfg.PollInterval = MinPollInterval } if cfg.Logger == nil { cfg.Logger = slog.Default() diff --git a/main.go b/main.go index 89499b38..5c587d77 100644 --- a/main.go +++ b/main.go @@ -134,7 +134,7 @@ func main() { // listeners to return spurious errors during shutdown). Still // log any residual error so a secondary failure during the // graceful shutdown is visible in logs rather than swallowed. 
- if err != nil { + if err != nil && !errors.Is(err, context.Canceled) { slog.Warn("shutdown error after memory pressure", "error", err) } os.Exit(exitCodeMemoryPressure) From f203b1a88c17428d4af8f7aed5ed4ef163f1aa23 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 01:30:18 +0900 Subject: [PATCH 5/6] refactor(memwatch): migrate from runtime.ReadMemStats to runtime/metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Gemini flagged runtime.ReadMemStats as a stop-the-world operation in rounds 1-3 and again now. Accepting the migration: at the new 10ms MinPollInterval floor the STW pause does start to register, and runtime/metrics has been available since Go 1.16 (this repo targets 1.25+). Changes: - Replace the ReadMemStats sampling with a reusable []metrics.Sample populated once in New. Hot path only walks a fixed slice; no name resolution, no allocations. - Compare the threshold against /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes which is the runtime/metrics equivalent of MemStats.HeapInuse (allocated heap spans including reusable free slots; excludes memory already released to the OS). - Warning log now emits heap_objects_bytes, heap_released_bytes, and next_gc_bytes alongside heap_inuse_bytes. Dropped heap_sys_bytes — runtime/metrics doesn't expose a direct equivalent and Sys includes released memory (misleading for "how close to OOM"). - sampleUint64 defensively falls back to 0 for KindBad (metric missing in the runtime); behaviour-preserving for the metrics we use (all Go 1.16+), but guards against a future runtime that retires one of the names. Tests pass unchanged: the fires-once, does-not-fire, context-cancel, and disabled-when-zero cases don't assume the underlying metrics source, only that heap growth crosses the threshold. 
--- internal/memwatch/memwatch.go | 137 +++++++++++++++++++++++----------- 1 file changed, 95 insertions(+), 42 deletions(-) diff --git a/internal/memwatch/memwatch.go b/internal/memwatch/memwatch.go index c60d21a4..51d705d9 100644 --- a/internal/memwatch/memwatch.go +++ b/internal/memwatch/memwatch.go @@ -11,11 +11,11 @@ // path lets the node sync the WAL and stop raft cleanly, avoiding the // election storms and lease loss that follow crash-restarts. // -// The watcher polls runtime.ReadMemStats at a fixed cadence. When -// HeapInuse crosses the configured threshold it invokes OnExceed once -// and exits. The watcher never calls os.Exit or sends signals itself; -// callers wire OnExceed to the existing shutdown path (typically a -// root context.CancelFunc). +// The watcher samples runtime/metrics at a fixed cadence. When the live +// heap-in-use byte count crosses the configured threshold it invokes +// OnExceed once and exits. The watcher never calls os.Exit or sends +// signals itself; callers wire OnExceed to the existing shutdown path +// (typically a root context.CancelFunc). // // Wiring in elastickv (see main.go): // @@ -33,20 +33,30 @@ // // # Metric choice // -// We read runtime.MemStats.HeapInuse. It is the closest Go-runtime-visible -// proxy for "how close are we to OOM" without a syscall per poll. RSS from +// We sample `runtime/metrics` (Go 1.16+) rather than `runtime.ReadMemStats`. +// ReadMemStats triggers a stop-the-world pause proportional to the number of +// goroutines and heap size; at 1 s cadence that's typically negligible, but +// at a tighter MinPollInterval (10 ms) it begins to register. runtime/metrics +// readers are lock-free for the counters we need and do not stop the world. 
+// +// The threshold is compared against +// +// /memory/classes/heap/objects:bytes + /memory/classes/heap/unused:bytes +// +// which is the runtime/metrics equivalent of MemStats.HeapInuse: bytes held +// in heap spans that are currently allocated from the OS, including span +// overhead, but excluding pages the runtime has released back. RSS from // /proc/self/status is more accurate but requires a read syscall on every -// poll; at the 1s cadence this watchdog runs that accuracy isn't worth the -// cost. We deliberately do NOT use MemStats.Sys, NumGC or Alloc: Sys and -// NumGC include memory the runtime has already released back to the OS (or -// are monotonic counters) and Alloc counts only currently-live heap objects, -// missing the span-level overhead that the OOM-killer actually sees. +// poll and is not what the Go allocator itself tracks. We deliberately do +// NOT compare against "total heap classes" (which includes released memory +// already returned to the OS) or "heap/objects" alone (which misses span +// fragmentation that the OOM-killer sees). package memwatch import ( "context" "log/slog" - "runtime" + "runtime/metrics" "sync" "sync/atomic" "time" @@ -54,30 +64,27 @@ import ( // DefaultPollInterval is the polling cadence used when Config.PollInterval // is zero. One second is frequent enough to catch fast-growing memtables -// before the kernel kills the process, but infrequent enough that -// runtime.ReadMemStats (which stops the world briefly) doesn't become a -// meaningful source of latency on its own. +// before the kernel kills the process, and infrequent enough that even +// aggressive log rollups don't observe the watcher as a hot sampler. const DefaultPollInterval = time.Second -// MinPollInterval is the floor enforced by New. 
ReadMemStats is a STW -// operation (~tens of microseconds on modern Go); polling faster than -// 10ms risks accumulating noticeable pause time on its own, and no -// memory-growth pattern we care about needs sub-10ms detection latency. -// Values below this floor (including zero from an unset Config) are -// clamped to DefaultPollInterval; an explicit operator override below -// MinPollInterval is clamped up to it. +// MinPollInterval is the floor enforced by New. runtime/metrics reads are +// cheap but a sub-10ms cadence produces no detection benefit over 10ms +// (memory pressure does not move that fast on these VMs) and would churn +// the ticker for no gain. const MinPollInterval = 10 * time.Millisecond // Config configures a Watcher. type Config struct { - // ThresholdBytes is the HeapInuse threshold in bytes. When - // runtime.MemStats.HeapInuse exceeds this value the watcher invokes - // OnExceed exactly once and returns. A zero value disables the - // watcher entirely (Start returns immediately). + // ThresholdBytes is the heap-in-use threshold in bytes. When the + // sampled heap-in-use reaches or exceeds this value the watcher invokes + // OnExceed exactly once and returns. A zero value disables the watcher + // entirely (Start returns immediately). ThresholdBytes uint64 - // PollInterval is how often ReadMemStats is called. Defaults to - // DefaultPollInterval when zero. + // PollInterval is how often the metrics are sampled. Defaults to + // DefaultPollInterval when zero or negative; positive values below + // MinPollInterval are clamped up to MinPollInterval. PollInterval time.Duration // OnExceed is called at most once, from the watcher's own goroutine, @@ -92,7 +99,24 @@ type Config struct { Logger *slog.Logger } -// Watcher polls process memory and fires OnExceed once, when HeapInuse +// Metric sample indices — kept stable so samples[] can be reused across +// polls without reallocating or re-resolving names.
+const ( + sampleHeapObjects = iota + sampleHeapUnused + sampleHeapReleased + sampleGCGoal + sampleCount +) + +var metricNames = [sampleCount]string{ + sampleHeapObjects: "/memory/classes/heap/objects:bytes", + sampleHeapUnused: "/memory/classes/heap/unused:bytes", + sampleHeapReleased: "/memory/classes/heap/released:bytes", + sampleGCGoal: "/gc/heap/goal:bytes", +} + +// Watcher polls process memory and fires OnExceed once, when heap-in-use // crosses the configured threshold. Callers get a single-shot notification // and are expected to initiate graceful shutdown; Watcher does not call // os.Exit or send signals itself. @@ -102,6 +126,9 @@ type Watcher struct { started atomic.Bool doneCh chan struct{} closeOnce sync.Once + // samples is reused across polls; metric-name resolution happens once + // in New so the hot path only walks a fixed []Sample. + samples []metrics.Sample } // New constructs a Watcher from the given Config. The Watcher does not @@ -116,9 +143,14 @@ func New(cfg Config) *Watcher { if cfg.Logger == nil { cfg.Logger = slog.Default() } + samples := make([]metrics.Sample, sampleCount) + for i, name := range metricNames { + samples[i].Name = name + } return &Watcher{ - cfg: cfg, - doneCh: make(chan struct{}), + cfg: cfg, + doneCh: make(chan struct{}), + samples: samples, } } @@ -168,14 +200,32 @@ func (w *Watcher) closeDoneOnce() { w.closeOnce.Do(func() { close(w.doneCh) }) } -// checkAndMaybeFire reads MemStats once, and if HeapInuse is at or above -// the threshold and OnExceed has not already fired, invokes OnExceed and -// returns true to signal the loop to exit. +// sampleUint64 reads a named Uint64 sample from w.samples after metrics.Read +// has populated them. Returns 0 if the metric is not supported by the +// current Go runtime (runtime/metrics guarantees no panic, just +// KindBad). 
The watcher treats missing metrics as "no pressure detected"; +// the primary metrics used by the threshold check have been present since +// Go 1.16, so this only matters for defensive correctness. +func (w *Watcher) sampleUint64(idx int) uint64 { + if w.samples[idx].Value.Kind() != metrics.KindUint64 { + return 0 + } + return w.samples[idx].Value.Uint64() +} + +// checkAndMaybeFire samples runtime/metrics once, computes heap-in-use, and +// if it is at or above the threshold and OnExceed has not already fired, +// invokes OnExceed and returns true to signal the loop to exit. func (w *Watcher) checkAndMaybeFire() bool { - var ms runtime.MemStats - runtime.ReadMemStats(&ms) + metrics.Read(w.samples) + + objects := w.sampleUint64(sampleHeapObjects) + unused := w.sampleUint64(sampleHeapUnused) + // heap-in-use = allocated heap spans (live objects plus reusable free + // slots the runtime still owns), matching MemStats.HeapInuse. + heapInuse := objects + unused - if ms.HeapInuse < w.cfg.ThresholdBytes { + if heapInuse < w.cfg.ThresholdBytes { return false } @@ -186,12 +236,15 @@ func (w *Watcher) checkAndMaybeFire() bool { return true } + released := w.sampleUint64(sampleHeapReleased) + gcGoal := w.sampleUint64(sampleGCGoal) + w.cfg.Logger.Warn("memory pressure shutdown", - "heap_inuse_bytes", ms.HeapInuse, + "heap_inuse_bytes", heapInuse, + "heap_objects_bytes", objects, + "heap_released_bytes", released, "threshold_bytes", w.cfg.ThresholdBytes, - "heap_alloc_bytes", ms.HeapAlloc, - "heap_sys_bytes", ms.HeapSys, - "next_gc_bytes", ms.NextGC, + "next_gc_bytes", gcGoal, ) if w.cfg.OnExceed != nil { From 82bc1a549b307e2041e4c660c6a80d394e9138ba Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sat, 25 Apr 2026 01:52:02 +0900 Subject: [PATCH 6/6] fix(memwatch): sample threshold once before the first tick Codex P1: the ticker-only loop delayed the first sample by up to PollInterval, so if the process was already above the threshold at Start (crash-loop 
restart, large startup allocations) the kernel OOM-killer could fire before the watcher had ever observed the pressure. Do one pre-tick sample so the watcher exits immediately if we started above the line. --- internal/memwatch/memwatch.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/memwatch/memwatch.go b/internal/memwatch/memwatch.go index 51d705d9..781beb09 100644 --- a/internal/memwatch/memwatch.go +++ b/internal/memwatch/memwatch.go @@ -171,6 +171,14 @@ func (w *Watcher) Start(ctx context.Context) { return } + // Sample once before the first tick: if the process is already above + // the threshold at Start (crash-loop restart after OOM, large startup + // allocations, etc.), waiting for the first ticker cycle can let the + // kernel OOM-kill the process we were supposed to protect. + if w.checkAndMaybeFire() { + return + } + ticker := time.NewTicker(w.cfg.PollInterval) defer ticker.Stop()