From 253ad3d3d2889313ef14d6c7984422011607a02f Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 28 May 2026 20:35:40 -0400 Subject: [PATCH 1/2] docs(eventbus): design for #112 timer-drain + observability fix Design+plan for issue #112. Passed adversarial-design-review (rev 2): 2 Critical + 4 Important found in rev 1, all resolved in rev 2. Co-Authored-By: Claude Opus 4.8 --- docs/plans/2026-05-28-eventbus-112-fix.md | 204 ++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 docs/plans/2026-05-28-eventbus-112-fix.md diff --git a/docs/plans/2026-05-28-eventbus-112-fix.md b/docs/plans/2026-05-28-eventbus-112-fix.md new file mode 100644 index 00000000..471aa19f --- /dev/null +++ b/docs/plans/2026-05-28-eventbus-112-fix.md @@ -0,0 +1,204 @@ +# Design: eventbus #112 — timer-drain hang + silent-drop observability + +Issue: GoCodeAlone/modular#112. One bug + two observability gaps in `modules/eventbus`. +Single PR. Go 1.26. + +> Rev 2 — after adversarial-design-review FAIL (2 Critical, 4 Important). Changes: +> async workerPool drain (C1), exact count-partition + drain placement (C2), +> CI-safe hang test (I1), CustomMemoryEventBus wg/finished infra (I3), +> module.go doc fix (I4). I2 verified benign (no existing test asserts PerEngineStats keys/len). + +## Problem statement + +### P1 — `MemoryEventBus.Publish` hangs in `deliveryMode: "timeout"` (real bug) + +`memory.go:283-293`: + +```go +deadline := time.NewTimer(blockTimeout) +select { +case sub.eventCh <- event: + sent = true +case <-deadline.C: // timer fires; value drained by select +case <-ctx.Done(): +} +if !deadline.Stop() { + <-deadline.C // already-drained channel; never fires again → blocks forever +} +``` + +`<-deadline.C` branch taken → `Stop()` returns `false` → unconditional follow-up receive blocks +the publishing goroutine forever (one stuck goroutine per timed-out publish). On Go 1.23+ timer +channels are unbuffered, so the legacy `if !Stop() { <-C }` drain is now a hang, not a no-op. + +### P2 — `CustomMemoryEventBus` drops silently (no counters, no `Stats()`) + +`custom_memory.go:288-296` drops on full subscriber channel with only `slog.Warn`. No +`deliveredCount`/`droppedCount`, no `Stats()`. `MemoryEventBus` already exposes +`Stats() (delivered, dropped uint64)`. Module-wiring gap: `engine_registry.go` +`CollectStats`/`CollectPerEngineStats` type-assert the concrete `*MemoryEventBus`, so even after +adding `Stats()` to the custom engine, `EventBusModule.Stats()`/`PerEngineStats()` ignore it. + +### P3 — `Unsubscribe`/`Stop` abandon buffered events without counting them (both engines) + +`handleEvents` exits on `ctx.Done()`/`sub.done` and abandons everything still buffered in +`sub.eventCh` (plus, in `MemoryEventBus`, an event already dequeued when the post-dequeue +cancellation re-check fires at memory.go:469-471). For async `MemoryEventBus` subscriptions an +additional leak exists: events dequeued from `sub.eventCh` and handed to `m.workerPool` but not +yet executed when `Stop` cancels are abandoned uncounted. None increment `droppedCount`. +`eventbus.go:94` AND `module.go` Stop docs falsely claim Stop "processes all in-flight events". + +## Decisions + +### D1 — timer drain fix (P1) + +Replace the unconditional drain with the version-safe non-blocking drain. Inline (NOT +`defer deadline.Stop()` — the block is inside a `for _, sub := range` loop; a deferred Stop would +accumulate one live timer per subscriber until `Publish` returns). The non-blocking form is +correct for BOTH the `<-deadline.C` and `<-ctx.Done()` exit branches (handles the slim TOCTOU +where the timer fires concurrently with ctx cancellation): + +```go +if !deadline.Stop() { + select { + case <-deadline.C: + default: + } +} +``` + +### D2 — at-most-once teardown contract (P3) + exact count partition (closes C2) + +**Contract: at-most-once delivery. On `Unsubscribe`/`Stop`, events not yet handed to a handler +are counted as `dropped`, never delivered.** Correct the false Stop doc comments in +`eventbus.go` AND `module.go`. + +**Exact count partition — every event accepted into a subscriber path is counted in exactly one +bucket, once.** Each event lives in exactly one location at a time; we count at exactly one +transition out of the pipeline: + +| terminal transition | bucket | site | +|---|---|---| +| publish-time full channel (event never entered `eventCh`) | dropped | `Publish` (existing) | +| sync handler ran (success or err) | delivered | `handleEvents` (existing) | +| async worker ran task | delivered | worker task (existing) | +| async `workerPool` full at queue time | dropped | `queueEventHandler` (existing) | +| dequeued but sub cancelled post-dequeue (memory.go:469) | **dropped** | `handleEvents` — NEW, was uncounted | +| still buffered in `eventCh` at handler exit | **dropped** | `drainSubscription` — NEW | +| in `workerPool`, never executed, at Stop | **dropped** | `drainWorkerPool` — NEW (async, see D4) | + +No event can be both delivered and dropped: once a handler runs it is gone from `eventCh`; the +drain only sees what the handler never touched. No double-drop: drain runs only from the owning +handler goroutine (sole reader of `eventCh`), and `drainWorkerPool` runs only after `wg.Wait()` +(no concurrent worker/handler). + +**Drain placement (single owner-side point):** drain lives in `handleEvents`, run immediately +before every exit return (ctx.Done / sub.done / top-of-loop cancelled fast-path), and the +just-dequeued-cancelled event is counted dropped before draining. Because `Unsubscribe` waits on +`sub.finished` (closed by `handleEvents` on exit), by the time `Unsubscribe` returns the channel +is already drained — no separate drain in `Unsubscribe`, no double-drain. + +```go +func (m *MemoryEventBus) drainSubscription(sub *memorySubscription) { + for { + select { + case <-sub.eventCh: + atomic.AddUint64(&m.droppedCount, 1) + default: + return + } + } +} +``` + +**Residual race (documented, accepted):** a publisher that passed the `cancelled`/`isStarted` +check but has not yet sent can enqueue one event after the drain; it is then lost and uncounted. +Closing it fully needs `sub.mutex` held across the channel send, serializing all publishes — +rejected for an observability fix. Conservation holds exactly once publishers are quiesced (the +realistic teardown path; the conservation tests quiesce publishers before teardown). Recorded in ADR. + +### D3 — `statsProvider` interface (P2 module wiring) + +```go +type statsProvider interface { Stats() (delivered, dropped uint64) } +``` +`CollectStats`/`CollectPerEngineStats` assert `statsProvider` instead of `*MemoryEventBus`, so +both memory and custom engines participate in `module.Stats()`/`PerEngineStats()`. Verified safe: +only `memory_race_test.go:89` consumes `PerEngineStats`, and it sums over all entries (no `len`/key +assertion). `DurableMemoryEventBus.Stats()` returns `(delivered uint64)` — wrong arity, does not +satisfy the interface, stays skipped (unchanged). Harmonizing its arity = out of scope follow-up. + +### D4 — drain abandoned async tasks at Stop (closes C1, MemoryEventBus only) + +`MemoryEventBus.Stop` already cancels ctx then waits on `m.wg` (covers BOTH worker and +`handleEvents` goroutines — each does `wg.Add(1)`/`defer wg.Done()`). AFTER `wg.Wait()` completes, +no goroutine sends to or receives from `m.workerPool`, so draining it is race-free and +deterministic. Add to the `case <-done:` branch: + +```go +func (m *MemoryEventBus) drainWorkerPool() { + for { + select { + case <-m.workerPool: + atomic.AddUint64(&m.droppedCount, 1) + default: + return + } + } +} +``` + +This makes V1 hold for async subscriptions too. The `ctx.Done()` (shutdown-timeout) error branch +does not drain — timeout is already a degraded path returning `ErrEventBusShutdownTimeout`. +`CustomMemoryEventBus` has no worker pool (its `handleEvents` runs the handler inline regardless of +`isAsync`), so its only buffer is `sub.eventCh` — fully covered by D2 drain. + +### D5 — `CustomMemoryEventBus` shutdown synchronization (closes I3) + +Custom engine currently has no `WaitGroup`/`finished` channel; `Stop` cancels ctx and returns +without waiting, so a drain added to its `handleEvents` would complete AFTER `Stop` returns → +`Stats()` read post-stop would race the drain. Mirror `MemoryEventBus`: add `wg sync.WaitGroup` +to `CustomMemoryEventBus`, `finished chan struct{}` to `customMemorySubscription`, +`wg.Add(1)`/`defer wg.Done()`+`defer close(sub.finished)` around `handleEvents`, `Stop` waits +`wg.Wait()` under the passed `ctx` budget (returning `ErrEventBusShutdownTimeout` on timeout to +match memory engine), and `Unsubscribe` waits briefly on `sub.finished` (like memory engine) so +the drain is observed deterministically. + +## Invariants (backprop targets) + +``` +V1: ∀ subscriber, once publishers quiesced: enqueued == delivered + dropped + where enqueued = events accepted into sub.eventCh (+ for async, handed to workerPool). + Publish-time full-channel drops are a separate already-counted dropped bucket. +V2: timeout-mode Publish to a full buffer returns within ~blockTimeout (never hangs). +``` + +## Scope + +In: `memory.go`, `custom_memory.go`, `engine_registry.go`, `eventbus.go` (doc), `module.go` (doc) ++ tests + ADR. +Out (documented follow-ups, not #112): `DurableMemoryEventBus.Stats()` arity harmonization; +multi-engine config path does not thread `deliveryMode`/`publishBlockTimeout` into per-engine +`Config` maps (pre-existing; timer fix still reached in single-engine timeout mode + tests). + +## Tasks + +| id | task | file | +|----|------|------| +| T1 | CI-SAFE failing test: timeout-mode publish to full buffer — run Publish in goroutine, `select` with 3s bound, assert it returned (NOT a naked Publish that deadlocks CI) | memory_*_test.go | +| T2 | fix timer drain (D1) | memory.go | +| T3 | failing conservation tests: sync + async, across Unsubscribe AND Stop, both engines; quiesce publishers first | *_test.go | +| T4 | MemoryEventBus: count dequeued-cancelled event; `drainSubscription` on every handleEvents exit (D2) | memory.go | +| T5 | MemoryEventBus: `drainWorkerPool` after wg.Wait in Stop (D4) | memory.go | +| T6 | CustomMemoryEventBus: deliveredCount/droppedCount + Stats(); wg/finished infra (D5); drain-on-exit; Stop waits; Unsubscribe waits | custom_memory.go | +| T7 | statsProvider interface; router asserts it (D3) | engine_registry.go | +| T8 | fix Stop doc comments (D2) | eventbus.go, module.go | +| T9 | ADR: at-most-once teardown contract + rejected best-effort-deliver + documented residual race | docs/plans/ | + +## Verification + +- T1 fails-fast (3s timeout assertion) on unpatched code, passes after T2. Backprop V2. +- conservation tests assert `delivered+dropped == enqueued` for sync AND async, post-teardown. +- `cd modules/eventbus && go test -race ./...` (CLAUDE.md mandates -race). +- `go vet ./...`; `golangci-lint run`; `go fmt ./...`. +- multi-engine: existing `memory_race_test.go` PerEngineStats path still green after D3. From cbaffc29c005bfb6c7b4bca3da1c6c88cb0abe9b Mon Sep 17 00:00:00 2001 From: Jon Langevin Date: Thu, 28 May 2026 20:58:21 -0400 Subject: [PATCH 2/2] fix(eventbus): timer-drain hang + silent-drop observability (#112) Fixes three issues in modules/eventbus surfaced in #112: P1 (bug): MemoryEventBus.Publish hung forever in deliveryMode "timeout". The legacy `if !deadline.Stop() { <-deadline.C }` drain blocks on an already-drained timer channel (on Go 1.23+ timer channels are unbuffered). Replaced with a non-blocking, version-safe drain covering both the timer-fired and ctx-cancelled exit branches. P2 (observability): CustomMemoryEventBus dropped events silently. Added deliveredCount/droppedCount + Stats() mirroring MemoryEventBus, and changed EngineRouter.CollectStats/CollectPerEngineStats to dispatch over a new statsProvider interface so the custom engine participates in module-level stats (previously they type-asserted the concrete *MemoryEventBus and ignored every other engine). P3 (observability): Unsubscribe/Stop abandoned buffered events without counting them. Both engines now count events still buffered in a subscriber channel as dropped on handler-goroutine exit (drainSubscription, run via defer so no exit path skips it), plus the dequeued-but-cancelled event. MemoryEventBus also drains the worker pool after wg.Wait in Stop so abandoned async tasks are counted (closes the async conservation gap). CustomMemoryEventBus gained the WaitGroup + finished-channel shutdown synchronization the standard engine already had, so Stop waits for handler goroutines to drain and Stats() is final on return. Contract: at-most-once delivery across teardown (delivered + dropped == enqueued once publishers quiesce). Corrected the false "processes all in-flight events" Stop doc comments in eventbus.go and module.go. Decision recorded in decisions/0001-eventbus-at-most-once-teardown.md. Design + adversarial review: docs/plans/2026-05-28-eventbus-112-fix.md. Co-Authored-By: Claude Opus 4.8 --- .../0001-eventbus-at-most-once-teardown.md | 66 +++++++ modules/eventbus/custom_memory.go | 119 ++++++++++-- modules/eventbus/engine_registry.go | 26 ++- modules/eventbus/eventbus.go | 7 +- modules/eventbus/issue112_custom_test.go | 129 +++++++++++++ modules/eventbus/issue112_memory_test.go | 170 ++++++++++++++++++ modules/eventbus/memory.go | 53 +++++- modules/eventbus/module.go | 16 +- 8 files changed, 555 insertions(+), 31 deletions(-) create mode 100644 decisions/0001-eventbus-at-most-once-teardown.md create mode 100644 modules/eventbus/issue112_custom_test.go create mode 100644 modules/eventbus/issue112_memory_test.go diff --git a/decisions/0001-eventbus-at-most-once-teardown.md b/decisions/0001-eventbus-at-most-once-teardown.md new file mode 100644 index 00000000..c19c7eb6 --- /dev/null +++ b/decisions/0001-eventbus-at-most-once-teardown.md @@ -0,0 +1,66 @@ +# 1. Eventbus in-memory engines: at-most-once delivery across teardown + +Date: 2026-05-28 +Status: Accepted +Context: issue #112 + +## Context + +The in-memory event-bus engines (`MemoryEventBus`, `CustomMemoryEventBus`) +buffer events in per-subscriber channels and, for async memory subscriptions, in +a shared worker pool. On `Unsubscribe` and `Stop` the handler goroutines exited +immediately and **abandoned** any events still buffered: they were neither +delivered nor counted as dropped. `Stats()` reported zero drops while events +silently evaporated — invisible to tests and to anyone wiring `droppedCount` to +alerting (issue #112, part 3). The `EventBus.Stop` doc comment additionally +claimed Stop "ensure[s] all in-flight events are processed before returning," +which the implementation never honored. + +A contract had to be chosen and documented. Two options: + +1. **Best-effort deliver at teardown** — drain buffered events through the + handler before exiting. +2. **At-most-once; count abandoned events as dropped** — do not deliver buffered + events at teardown; increment `droppedCount` for each so the loss is + observable and `Stats()` conservation (`delivered + dropped == enqueued`) + holds. + +## Decision + +Adopt **at-most-once delivery across teardown** (option 2). + +- On `Unsubscribe`/`Stop`, events still buffered in a subscriber channel are + counted as dropped, not delivered (`drainSubscription`). +- For async `MemoryEventBus` subscriptions, events dequeued into the worker pool + but not executed before `Stop` are also counted as dropped, drained after + `wg.Wait()` so the operation is race-free (`drainWorkerPool`). +- `CustomMemoryEventBus` gains the `sync.WaitGroup` + `finished`-channel + shutdown synchronization the standard engine already had, so `Stop` waits for + handler goroutines to finish draining and `Stats()` is final on return. +- The misleading `Stop` doc comments in `eventbus.go` and `module.go` are + corrected to state the at-most-once contract. + +## Rejected: best-effort deliver at teardown + +`Stop` cancels the bus context first, so any handler invoked during drain would +run with an already-cancelled context. A slow or blocking handler would stall +`Stop` (which has its own shutdown-deadline budget) and `Unsubscribe`. Delivering +on a dying bus is a footgun and contradicts the existing cancel-first ordering. +Counting-as-dropped is safe, observable, order-preserving, and makes a +conservation invariant testable. The goal of #112 is to make drops *visible*, +not to add best-effort delivery semantics the engines never promised. + +## Consequences + +- New invariant, gated by tests: once publishers are quiesced, + `delivered + dropped == enqueued` for both sync and async subscriptions. +- **Residual race (accepted):** a publisher that passed the `cancelled`/ + `isStarted` check but has not yet sent can enqueue one event after the drain; + it is then lost and uncounted. Fully closing this would require holding the + subscription mutex across the channel send, serializing all publishes — + rejected as too costly for an observability fix. Conservation holds exactly + once publishers are quiesced (the realistic teardown path). +- **Out of scope (follow-ups):** `DurableMemoryEventBus.Stats()` has a single + `(delivered)` return and so does not satisfy the new `statsProvider` interface + (it never drops, by design); the multi-engine config path does not thread + `deliveryMode`/`publishBlockTimeout` into per-engine config maps. diff --git a/modules/eventbus/custom_memory.go b/modules/eventbus/custom_memory.go index 6040df27..f6cc1310 100644 --- a/modules/eventbus/custom_memory.go +++ b/modules/eventbus/custom_memory.go @@ -15,14 +15,17 @@ import ( // memory engine, this one includes additional features like event metrics collection, // custom event filtering, and enhanced subscription management. type CustomMemoryEventBus struct { - config *CustomMemoryConfig - subscriptions map[string]map[string]*customMemorySubscription - topicMutex sync.RWMutex - ctx context.Context - cancel context.CancelFunc - isStarted atomic.Bool - eventMetrics *EventMetrics - eventFilters []EventFilter + config *CustomMemoryConfig + subscriptions map[string]map[string]*customMemorySubscription + topicMutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc + isStarted atomic.Bool + eventMetrics *EventMetrics + eventFilters []EventFilter + wg sync.WaitGroup // tracks handler goroutines for deterministic shutdown + deliveredCount uint64 // stats + droppedCount uint64 // stats } // CustomMemoryConfig holds configuration for the custom memory engine @@ -80,12 +83,21 @@ type customMemorySubscription struct { isAsync bool eventCh chan Event done chan struct{} + finished chan struct{} // closed when the handler goroutine exits cancelled bool mutex sync.RWMutex subscriptionTime time.Time processedEvents int64 } +// isCancelled reports whether the subscription has been cancelled, without +// exposing the lock. +func (s *customMemorySubscription) isCancelled() bool { + s.mutex.RLock() + defer s.mutex.RUnlock() + return s.cancelled +} + // Topic returns the topic of the subscription func (s *customMemorySubscription) Topic() string { return s.topic @@ -228,6 +240,31 @@ func (c *CustomMemoryEventBus) Stop(ctx context.Context) error { } c.topicMutex.Unlock() + // Wait for handler goroutines to exit (and run their teardown drains) so + // Stats() is final and accurate by the time Stop returns. Bounded by the + // caller's context, mirroring MemoryEventBus.Stop. + done := make(chan struct{}) + go func() { + defer func() { + if r := recover(); r != nil { + slog.Error("panic recovered in custom memory eventbus shutdown waiter", "error", r) + } + }() + c.wg.Wait() + close(done) + }() + + select { + case <-done: + // All handler goroutines exited gracefully. + case <-ctx.Done(): + // Shutdown deadline elapsed with handlers still running. isStarted is + // intentionally left true (matching MemoryEventBus) so callers do not + // treat a timed-out bus as cleanly stopped; the waiter goroutine exits + // once wg drains after cancel. + return ErrEventBusShutdownTimeout + } + c.isStarted.Store(false) slog.Info("Custom memory event bus stopped", "totalEvents", c.eventMetrics.TotalEvents, @@ -289,7 +326,9 @@ func (c *CustomMemoryEventBus) Publish(ctx context.Context, event Event) error { case sub.eventCh <- event: // Event sent to subscriber default: - // Channel is full, log warning + // Channel is full — drop and count it so the drop is observable via + // Stats() instead of vanishing silently (#112). + atomic.AddUint64(&c.droppedCount, 1) slog.Warn("Subscription channel full, dropping event", "topic", event.Type(), "subscriptionID", sub.id) } @@ -326,6 +365,7 @@ func (c *CustomMemoryEventBus) subscribe(ctx context.Context, topic string, hand isAsync: isAsync, eventCh: make(chan Event, c.config.DefaultEventBufferSize), done: make(chan struct{}), + finished: make(chan struct{}), cancelled: false, subscriptionTime: time.Now(), processedEvents: 0, @@ -339,7 +379,8 @@ func (c *CustomMemoryEventBus) subscribe(ctx context.Context, topic string, hand c.subscriptions[topic][sub.id] = sub c.topicMutex.Unlock() - // Start event handler goroutine + // Start event handler goroutine (tracked so Stop can wait for it to drain). + c.wg.Add(1) go c.handleEvents(sub) slog.Debug("Created custom subscription", "topic", topic, "id", sub.id, "async", isAsync) @@ -370,16 +411,25 @@ func (c *CustomMemoryEventBus) Unsubscribe(ctx context.Context, subscription Sub return err } - // Remove from subscriptions map + // Remove from subscriptions map. Release the lock BEFORE waiting on the + // handler goroutine so a draining handler cannot stall other topic ops. c.topicMutex.Lock() - defer c.topicMutex.Unlock() - if subs, ok := c.subscriptions[sub.topic]; ok { delete(subs, sub.id) if len(subs) == 0 { delete(c.subscriptions, sub.topic) } } + c.topicMutex.Unlock() + + // Wait (briefly) for the handler goroutine to terminate so its teardown + // drain is reflected in Stats() and no post-unsubscribe delivery occurs. + t := time.NewTimer(100 * time.Millisecond) + defer t.Stop() + select { + case <-sub.finished: + case <-t.C: + } return nil } @@ -427,18 +477,38 @@ func (c *CustomMemoryEventBus) matchesTopic(eventTopic, subscriptionTopic string // handleEvents processes events for a custom subscription func (c *CustomMemoryEventBus) handleEvents(sub *customMemorySubscription) { + defer c.wg.Done() + defer close(sub.finished) + // LIFO defer order: recover (registered last) runs first to absorb a handler + // panic, THEN drainSubscription runs, THEN finished is closed, THEN wg.Done. + // So a goroutine waiting on finished (Unsubscribe) or wg (Stop) always sees + // the final dropped count. drainSubscription counts any events still buffered + // in the subscriber channel as dropped on every exit path (at-most-once + // teardown contract, #112). + defer c.drainSubscription(sub) defer func() { if r := recover(); r != nil { slog.Error("panic recovered in custom memory event handler", "error", r, "topic", sub.topic) } }() for { + // Fast path: if cancelled between events, exit before selecting so the + // deferred drain runs deterministically (no race with a ready eventCh). + if sub.isCancelled() { + return + } select { case <-c.ctx.Done(): return case <-sub.done: return case event := <-sub.eventCh: + // Re-check cancellation after dequeue; a dequeued-but-unhandled + // event is counted as dropped (the deferred drain handles the rest). + if sub.isCancelled() { + atomic.AddUint64(&c.droppedCount, 1) + return + } startTime := time.Now() // Process the event @@ -468,10 +538,33 @@ func (c *CustomMemoryEventBus) handleEvents(sub *customMemorySubscription) { "subscriptionID", sub.id, "processingDuration", processingDuration) } + // Count as delivered after processing (success or failure), mirroring + // the standard memory engine. + atomic.AddUint64(&c.deliveredCount, 1) } } } +// drainSubscription counts events still buffered in the subscriber channel as +// dropped (at-most-once teardown contract, #112). Safe only when called from +// the owning handler goroutine on exit — the sole reader of sub.eventCh. +func (c *CustomMemoryEventBus) drainSubscription(sub *customMemorySubscription) { + for { + select { + case <-sub.eventCh: + atomic.AddUint64(&c.droppedCount, 1) + default: + return + } + } +} + +// Stats returns delivery statistics for monitoring/testing, mirroring +// MemoryEventBus.Stats so the engine participates in router-level aggregation. +func (c *CustomMemoryEventBus) Stats() (delivered uint64, dropped uint64) { + return atomic.LoadUint64(&c.deliveredCount), atomic.LoadUint64(&c.droppedCount) +} + // metricsCollector periodically logs metrics func (c *CustomMemoryEventBus) metricsCollector() { defer func() { diff --git a/modules/eventbus/engine_registry.go b/modules/eventbus/engine_registry.go index 36d1762c..ac6d2760 100644 --- a/modules/eventbus/engine_registry.go +++ b/modules/eventbus/engine_registry.go @@ -265,14 +265,24 @@ func (r *EngineRouter) GetEngineForTopic(topic string) string { return r.getEngineForTopic(topic) } -// CollectStats aggregates delivery statistics from engines that expose them. -// At present only the in-memory engine exposes Stats(). Engines that don't -// implement Stats() are simply skipped. This keeps the method safe to call in -// multi-engine configurations mixing different backend types. +// statsProvider is implemented by any engine that exposes delivery statistics +// as a (delivered, dropped) pair. Routing over this interface (rather than a +// concrete type) lets every stats-bearing engine — memory, custom, and any +// future engine — participate in module-level aggregation. Engines that do not +// implement it (e.g. DurableMemoryEventBus, whose Stats() has a different arity) +// are simply skipped. +type statsProvider interface { + Stats() (delivered uint64, dropped uint64) +} + +// CollectStats aggregates delivery statistics from engines that expose them +// via the statsProvider interface. Engines that don't implement it are skipped, +// keeping the method safe to call in multi-engine configurations mixing +// different backend types. func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64) { for _, engine := range r.engines { - if mem, ok := engine.(*MemoryEventBus); ok { - d, dr := mem.Stats() + if sp, ok := engine.(statsProvider); ok { + d, dr := sp.Stats() delivered += d dropped += dr } @@ -288,8 +298,8 @@ func (r *EngineRouter) CollectStats() (delivered uint64, dropped uint64) { func (r *EngineRouter) CollectPerEngineStats() map[string]DeliveryStats { stats := make(map[string]DeliveryStats) for name, engine := range r.engines { - if mem, ok := engine.(*MemoryEventBus); ok { - d, dr := mem.Stats() + if sp, ok := engine.(statsProvider); ok { + d, dr := sp.Stats() stats[name] = DeliveryStats{Delivered: d, Dropped: dr} } } diff --git a/modules/eventbus/eventbus.go b/modules/eventbus/eventbus.go index bb305808..be298486 100644 --- a/modules/eventbus/eventbus.go +++ b/modules/eventbus/eventbus.go @@ -91,7 +91,12 @@ type EventBus interface { // Stop shuts down the event bus. // This method is called during module shutdown and should cleanup // all resources, close connections, and stop background processes. - // It should ensure all in-flight events are processed before returning. + // + // Delivery is at-most-once across teardown: an event already being handled + // runs to completion, but events still buffered in a subscriber's queue at + // Stop are NOT delivered — the in-memory engines count them as dropped so + // the loss is visible via Stats() rather than silent. Stop returns once + // background goroutines have exited (or the context deadline elapses). Stop(ctx context.Context) error // Publish sends an event to the specified topic. diff --git a/modules/eventbus/issue112_custom_test.go b/modules/eventbus/issue112_custom_test.go new file mode 100644 index 00000000..6e7fa507 --- /dev/null +++ b/modules/eventbus/issue112_custom_test.go @@ -0,0 +1,129 @@ +package eventbus + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newCustomBusWithDrop starts a CustomMemoryEventBus, holds its single handler +// open, fills the (size-1) subscriber buffer, then publishes more so the engine +// must drop on a full channel. Returns the engine and a cleanup func that +// releases the handler and stops the bus. +func newCustomBusWithDrop(t *testing.T) (*CustomMemoryEventBus, func()) { + t.Helper() + ebus, err := NewCustomMemoryEventBus(map[string]interface{}{ + "defaultEventBufferSize": 1, + "maxEventQueueSize": 100, + "enableMetrics": false, + }) + require.NoError(t, err) + cbus := ebus.(*CustomMemoryEventBus) + require.NoError(t, cbus.Start(context.Background())) + + started := make(chan struct{}) + release := make(chan struct{}) + var once sync.Once + _, err = cbus.Subscribe(context.Background(), "full.topic", func(_ context.Context, _ Event) error { + once.Do(func() { close(started) }) + <-release + return nil + }) + require.NoError(t, err) + + require.NoError(t, cbus.Publish(context.Background(), evt112("full.topic"))) + <-started + // buffer (cap 1) takes one; the rest hit the full-channel drop path. + for range 10 { + _ = cbus.Publish(context.Background(), evt112("full.topic")) + } + + cleanup := func() { + close(release) + _ = cbus.Stop(context.Background()) + } + return cbus, cleanup +} + +// TestIssue112_CustomBus_StatsCountsDroppedOnFullChannel gates issue #112 part 2: +// CustomMemoryEventBus must expose Stats() and count silent full-channel drops. +func TestIssue112_CustomBus_StatsCountsDroppedOnFullChannel(t *testing.T) { + cbus, cleanup := newCustomBusWithDrop(t) + defer cleanup() + + _, dropped := cbus.Stats() + assert.Greater(t, dropped, uint64(0), + "custom engine must count publish-time full-channel drops in Stats()") +} + +// TestIssue112_CustomBus_UnsubscribeCountsBufferedAsDropped gates issue #112 +// part 3 on the custom engine: buffered events abandoned at Unsubscribe must be +// counted as dropped, and Stats() conservation must hold. +func TestIssue112_CustomBus_UnsubscribeCountsBufferedAsDropped(t *testing.T) { + ebus, err := NewCustomMemoryEventBus(map[string]interface{}{ + "defaultEventBufferSize": 16, + "maxEventQueueSize": 100, + "enableMetrics": false, + }) + require.NoError(t, err) + cbus := ebus.(*CustomMemoryEventBus) + ctx := context.Background() + require.NoError(t, cbus.Start(ctx)) + defer cbus.Stop(context.Background()) //nolint:errcheck + + started := make(chan struct{}) + release := make(chan struct{}) + var once sync.Once + sub, err := cbus.Subscribe(ctx, "drain.topic", func(_ context.Context, _ Event) error { + once.Do(func() { close(started) }) + <-release + return nil + }) + require.NoError(t, err) + cs := sub.(*customMemorySubscription) + + require.NoError(t, cbus.Publish(ctx, evt112("drain.topic"))) + <-started + + const buffered = 5 + for range buffered { + require.NoError(t, cbus.Publish(ctx, evt112("drain.topic"))) + } + + require.NoError(t, cbus.Unsubscribe(ctx, sub)) + close(release) + + select { + case <-cs.finished: + case <-time.After(2 * time.Second): + t.Fatal("custom handler goroutine did not exit after release") + } + + delivered, dropped := cbus.Stats() + const total = uint64(1 + buffered) + assert.Equal(t, uint64(1), delivered, "e1 should be delivered") + assert.Equal(t, uint64(buffered), dropped, "buffered events must be counted as dropped at Unsubscribe") + assert.Equal(t, total, delivered+dropped, "conservation: delivered+dropped == enqueued") +} + +// TestIssue112_Router_CollectStatsIncludesCustomEngine gates the module-wiring +// half of issue #112 part 2 (design D3): the router must aggregate Stats() from +// ANY engine implementing the statsProvider interface, not only *MemoryEventBus. +func TestIssue112_Router_CollectStatsIncludesCustomEngine(t *testing.T) { + cbus, cleanup := newCustomBusWithDrop(t) + defer cleanup() + + router := &EngineRouter{engines: map[string]EventBus{"custom": cbus}} + + _, dropped := router.CollectStats() + assert.Greater(t, dropped, uint64(0), + "CollectStats must include the custom engine's drops via the statsProvider interface") + + per := router.CollectPerEngineStats() + require.Contains(t, per, "custom", "per-engine stats must include the custom engine") + assert.Greater(t, per["custom"].Dropped, uint64(0)) +} diff --git a/modules/eventbus/issue112_memory_test.go b/modules/eventbus/issue112_memory_test.go new file mode 100644 index 00000000..fb9c1011 --- /dev/null +++ b/modules/eventbus/issue112_memory_test.go @@ -0,0 +1,170 @@ +package eventbus + +import ( + "context" + "sync" + "testing" + "time" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// evt112 builds a minimal CloudEvent for the given topic. +func evt112(topic string) Event { + e := cloudevents.NewEvent() + e.SetID(uuid.NewString()) + e.SetType(topic) + e.SetSource("issue112-test") + return e +} + +// TestIssue112_TimeoutModePublishDoesNotHang is the regression gate for issue +// #112 part 1: in deliveryMode "timeout", once the timer fires the legacy +// `if !deadline.Stop() { <-deadline.C }` drain blocks the publisher forever. +// +// CI-safe: the publish loop runs in a goroutine and the test asserts it returns +// within a bound. On buggy code the goroutine never returns and the bound trips +// (test fails) instead of deadlocking the whole CI run. +func TestIssue112_TimeoutModePublishDoesNotHang(t *testing.T) { + bus := NewMemoryEventBus(&EventBusConfig{ + MaxEventQueueSize: 100, + DefaultEventBufferSize: 1, + WorkerCount: 1, + DeliveryMode: "timeout", + PublishBlockTimeout: 50 * time.Millisecond, + RetentionDays: 1, + }) + ctx := context.Background() + require.NoError(t, bus.Start(ctx)) + + release := make(chan struct{}) + var once sync.Once + relFn := func() { once.Do(func() { close(release) }) } + // LIFO: release the handler before Stop waits on the worker. + defer bus.Stop(context.Background()) //nolint:errcheck + defer relFn() + + _, err := bus.Subscribe(ctx, "hang.topic", func(_ context.Context, _ Event) error { + <-release // hold the single sync handler so the buffer fills + return nil + }) + require.NoError(t, err) + + done := make(chan struct{}) + go func() { + defer close(done) + // First publish is consumed by the handler (which then blocks); the + // next fills the buffer; subsequent ones hit the full-buffer timeout + // path that contains the deadlock. + for range 5 { + _ = bus.Publish(ctx, evt112("hang.topic")) + } + }() + + select { + case <-done: + // All publishes returned — timer drain is race-free. + case <-time.After(3 * time.Second): + t.Fatal("Publish hung in timeout delivery mode (issue #112 P1): timer-drain deadlock") + } +} + +// TestIssue112_MemoryBus_UnsubscribeCountsBufferedAsDropped is the regression +// gate for issue #112 part 3 on the memory engine: events still buffered in a +// subscriber channel at Unsubscribe must be counted as dropped (at-most-once +// teardown contract), so Stats() conservation holds. +func TestIssue112_MemoryBus_UnsubscribeCountsBufferedAsDropped(t *testing.T) { + bus := NewMemoryEventBus(&EventBusConfig{ + MaxEventQueueSize: 100, + DefaultEventBufferSize: 16, + WorkerCount: 1, + DeliveryMode: "drop", + RetentionDays: 1, + }) + ctx := context.Background() + require.NoError(t, bus.Start(ctx)) + defer bus.Stop(context.Background()) //nolint:errcheck + + started := make(chan struct{}) + release := make(chan struct{}) + var once sync.Once + sub, err := bus.Subscribe(ctx, "drain.topic", func(_ context.Context, _ Event) error { + once.Do(func() { close(started) }) + <-release + return nil + }) + require.NoError(t, err) + ms := sub.(*memorySubscription) + + // e1 is dequeued; the handler signals started then blocks. + require.NoError(t, bus.Publish(ctx, evt112("drain.topic"))) + <-started + + // e2..e(1+buffered) sit in the buffer (buffer=16, none dropped at publish). + const buffered = 5 + for range buffered { + require.NoError(t, bus.Publish(ctx, evt112("drain.topic"))) + } + + // Unsubscribe sets cancelled + closes done, then waits (<=100ms) on finished. + // The handler is still blocked, so the wait times out and Unsubscribe returns. + require.NoError(t, bus.Unsubscribe(ctx, sub)) + + // Release the handler: it returns (e1 delivered), the loop's top-of-iteration + // cancelled fast-path fires, and the deferred drain counts the buffered + // events as dropped. + close(release) + + select { + case <-ms.finished: + case <-time.After(2 * time.Second): + t.Fatal("handler goroutine did not exit after release") + } + + delivered, dropped := bus.Stats() + const total = uint64(1 + buffered) + assert.Equal(t, uint64(1), delivered, "e1 should be delivered") + assert.Equal(t, uint64(buffered), dropped, "buffered events must be counted as dropped at Unsubscribe") + assert.Equal(t, total, delivered+dropped, "conservation: delivered+dropped == enqueued") +} + +// TestIssue112_MemoryBus_StopCountsWorkerPoolAsDropped is the regression gate +// for the async leak (adversarial-review C1): events dequeued into the worker +// pool but not yet executed when Stop cancels must be counted as dropped, so +// conservation holds for async subscriptions too. +func TestIssue112_MemoryBus_StopCountsWorkerPoolAsDropped(t *testing.T) { + bus := NewMemoryEventBus(&EventBusConfig{ + MaxEventQueueSize: 100, + DefaultEventBufferSize: 64, + WorkerCount: 1, // single slow worker so the pool backs up + DeliveryMode: "drop", + RetentionDays: 1, + }) + ctx := context.Background() + require.NoError(t, bus.Start(ctx)) + + const n = 30 + _, err := bus.SubscribeAsync(ctx, "wp.topic", func(_ context.Context, _ Event) error { + time.Sleep(15 * time.Millisecond) + return nil + }) + require.NoError(t, err) + + for range n { + require.NoError(t, bus.Publish(ctx, evt112("wp.topic"))) + } + // Let handleEvents move events from the channel into the worker pool and let + // the single worker start chewing through them. + time.Sleep(20 * time.Millisecond) + + require.NoError(t, bus.Stop(context.Background())) + + delivered, dropped := bus.Stats() + assert.Equal(t, uint64(n), delivered+dropped, + "conservation: every async-queued event must be delivered or dropped (delivered=%d dropped=%d)", delivered, dropped) + assert.Greater(t, dropped, uint64(0), + "a slow worker + immediate Stop must leave some queued events counted as dropped") +} diff --git a/modules/eventbus/memory.go b/modules/eventbus/memory.go index 03fe3d84..5ed5ee08 100644 --- a/modules/eventbus/memory.go +++ b/modules/eventbus/memory.go @@ -166,7 +166,11 @@ func (m *MemoryEventBus) Stop(ctx context.Context) error { select { case <-done: - // All workers exited gracefully + // All workers and handler goroutines have exited. Nothing can now send + // to or receive from the worker pool, so any tasks still queued were + // dequeued from subscriber channels but never executed — count them as + // dropped so Stats() conservation holds for async subscriptions (#112). + m.drainWorkerPool() case <-ctx.Done(): return ErrEventBusShutdownTimeout } @@ -175,6 +179,21 @@ func (m *MemoryEventBus) Stop(ctx context.Context) error { return nil } +// drainWorkerPool counts any async handler tasks still queued in the worker +// pool as dropped. It MUST be called only after m.wg.Wait() has completed (no +// concurrent senders from handleEvents, no concurrent receivers in worker), so +// the non-blocking drain is race-free. +func (m *MemoryEventBus) drainWorkerPool() { + for { + select { + case <-m.workerPool: + atomic.AddUint64(&m.droppedCount, 1) + default: + return + } + } +} + // matchesTopic checks if an event topic matches a subscription topic pattern // Supports wildcard patterns like "user.*" matching "user.created", "user.updated", etc. func matchesTopic(eventTopic, subscriptionTopic string) bool { @@ -288,8 +307,16 @@ func (m *MemoryEventBus) Publish(ctx context.Context, event Event) error { // timeout drop case <-ctx.Done(): } + // Race-free, version-safe drain. On Go 1.23+ timer channels are + // unbuffered, so the legacy unconditional `<-deadline.C` blocks + // forever once the timer has fired and the select consumed the + // tick (issue #112). The non-blocking select covers both the + // timer-fired and ctx-cancelled exit branches. if !deadline.Stop() { - <-deadline.C + select { + case <-deadline.C: + default: + } } } default: // "drop" @@ -453,6 +480,10 @@ func (m *MemoryEventBus) SubscriberCount(topic string) int { func (m *MemoryEventBus) handleEvents(sub *memorySubscription) { defer m.wg.Done() defer close(sub.finished) + // Runs first (LIFO), before finished is closed: on every exit path, count + // events still buffered in the subscriber channel as dropped (at-most-once + // teardown contract, #112). Placed as a defer so no exit return can skip it. + defer m.drainSubscription(sub) for { // Fast path: if subscription cancelled, exit before selecting (avoids processing backlog after unsubscribe) @@ -467,6 +498,9 @@ func (m *MemoryEventBus) handleEvents(sub *memorySubscription) { case event := <-sub.eventCh: // Re-check cancellation after dequeue to avoid processing additional events post-unsubscribe. if sub.isCancelled() { + // This event was dequeued but will not be handled — count it as + // dropped (the deferred drain handles the rest of the buffer). + atomic.AddUint64(&m.droppedCount, 1) return } if sub.isAsync { @@ -546,6 +580,21 @@ func (m *MemoryEventBus) worker() { } } +// drainSubscription counts any events still buffered in the subscriber channel +// as dropped (at-most-once teardown contract, #112). It is safe only when +// called from the owning handler goroutine on exit — that goroutine is the sole +// reader of sub.eventCh, so there is no concurrent receiver to double-drain. +func (m *MemoryEventBus) drainSubscription(sub *memorySubscription) { + for { + select { + case <-sub.eventCh: + atomic.AddUint64(&m.droppedCount, 1) + default: + return + } + } +} + // Stats returns basic delivery stats for monitoring/testing. func (m *MemoryEventBus) Stats() (delivered uint64, dropped uint64) { return atomic.LoadUint64(&m.deliveredCount), atomic.LoadUint64(&m.droppedCount) diff --git a/modules/eventbus/module.go b/modules/eventbus/module.go index 6ec519d9..437b6456 100644 --- a/modules/eventbus/module.go +++ b/modules/eventbus/module.go @@ -342,16 +342,18 @@ func (m *EventBusModule) Start(ctx context.Context) error { } // Stop performs shutdown logic for the module. -// This method gracefully shuts down all event bus engines, ensuring all in-flight -// events are processed and all subscriptions are properly cleaned up. +// This method gracefully shuts down all event bus engines and cleans up all +// subscriptions. Delivery is at-most-once across teardown: an event already in +// a handler runs to completion, but events still buffered in subscriber queues +// are counted as dropped (visible via Stats()) rather than delivered or lost +// silently. // // The shutdown process: // 1. Checks if already stopped (idempotent) -// 2. Stops accepting new events -// 3. Waits for in-flight events to complete -// 4. Cancels all active subscriptions -// 5. Shuts down worker pools -// 6. Closes all underlying event bus engines +// 2. Cancels all active subscriptions and signals engines to stop +// 3. Waits for handler/worker goroutines to exit (bounded by ctx) +// 4. Counts any still-buffered or still-queued events as dropped +// 5. Closes all underlying event bus engines // // This method is thread-safe and can be called multiple times safely. func (m *EventBusModule) Stop(ctx context.Context) error {