66 changes: 66 additions & 0 deletions decisions/0001-eventbus-at-most-once-teardown.md
@@ -0,0 +1,66 @@
# 1. Eventbus in-memory engines: at-most-once delivery across teardown

Date: 2026-05-28
Status: Accepted
Context: issue #112

## Context

The in-memory event-bus engines (`MemoryEventBus`, `CustomMemoryEventBus`)
buffer events in per-subscriber channels and, for async memory subscriptions, in
a shared worker pool. On `Unsubscribe` and `Stop` the handler goroutines exited
immediately and **abandoned** any events still buffered: they were neither
delivered nor counted as dropped. `Stats()` reported zero drops while events
silently evaporated — invisible to tests and to anyone wiring `droppedCount` to
alerting (issue #112, part 3). The `EventBus.Stop` doc comment additionally
claimed Stop "ensure[s] all in-flight events are processed before returning,"
which the implementation never honored.

A contract had to be chosen and documented. Two options:

1. **Best-effort deliver at teardown**: drain buffered events through the
   handler before exiting.
2. **At-most-once; count abandoned events as dropped**: do not deliver buffered
   events at teardown; increment `droppedCount` for each so the loss is
   observable and `Stats()` conservation (`delivered + dropped == enqueued`)
   holds.

## Decision

Adopt **at-most-once delivery across teardown** (option 2).

- On `Unsubscribe`/`Stop`, events still buffered in a subscriber channel are
  counted as dropped, not delivered (`drainSubscription`).
- For async `MemoryEventBus` subscriptions, events dequeued into the worker pool
  but not executed before `Stop` are also counted as dropped, drained after
  `wg.Wait()` so the operation is race-free (`drainWorkerPool`).
- `CustomMemoryEventBus` gains the `sync.WaitGroup` + `finished`-channel
  shutdown synchronization the standard engine already had, so `Stop` waits for
  handler goroutines to finish draining and `Stats()` is final on return.
- The misleading `Stop` doc comments in `eventbus.go` and `module.go` are
  corrected to state the at-most-once contract.
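The contract can be illustrated with a small, self-contained sketch. It is not the engine code: `toySub`, `handle`, and `runTeardown` are hypothetical names, and the handler is a no-op. The sketch races delivery against an immediate teardown, so the split between delivered and dropped varies from run to run, but the two counters should always add up to the number of events enqueued.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toySub is a hypothetical stand-in for one subscription; it is not the real engine type.
type toySub struct {
	eventCh   chan int
	done      chan struct{} // closed on teardown (Unsubscribe/Stop analogue)
	finished  chan struct{} // closed by the handler goroutine once it has drained
	delivered atomic.Uint64
	dropped   atomic.Uint64
}

// handle delivers events until teardown, then counts whatever is still
// buffered as dropped instead of delivering it.
func (s *toySub) handle(handler func(int)) {
	defer close(s.finished)
	for {
		select {
		case <-s.done:
			for {
				select {
				case <-s.eventCh:
					s.dropped.Add(1)
				default:
					return // buffer is empty; counters are final
				}
			}
		case ev := <-s.eventCh:
			handler(ev)
			s.delivered.Add(1)
		}
	}
}

// runTeardown enqueues n events, then races delivery against an immediate teardown.
func runTeardown(n int) (enqueued, delivered, dropped uint64) {
	s := &toySub{
		eventCh:  make(chan int, n),
		done:     make(chan struct{}),
		finished: make(chan struct{}),
	}
	for i := 0; i < n; i++ {
		s.eventCh <- i // publishers are quiesced once this loop ends
	}
	go s.handle(func(int) {})
	close(s.done)
	<-s.finished // wait for the drain so the counters can be read safely
	return uint64(n), s.delivered.Load(), s.dropped.Load()
}

func main() {
	enqueued, delivered, dropped := runTeardown(100)
	fmt.Printf("enqueued=%d delivered=%d dropped=%d conserved=%v\n",
		enqueued, delivered, dropped, delivered+dropped == enqueued)
}
```

Waiting on `finished` before reading the counters plays the role that the `WaitGroup` and `finished` channel play in the decision above.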

## Rejected: best-effort deliver at teardown

`Stop` cancels the bus context first, so any handler invoked during drain would
run with an already-cancelled context. A slow or blocking handler would stall
`Stop` (which has its own shutdown-deadline budget) and `Unsubscribe`. Delivering
on a dying bus is a footgun and contradicts the existing cancel-first ordering.
Counting-as-dropped is safe, observable, order-preserving, and makes a
conservation invariant testable. The goal of #112 is to make drops *visible*,
not to add best-effort delivery semantics the engines never promised.

## Consequences

- New invariant, gated by tests: once publishers are quiesced,
  `delivered + dropped == enqueued` for both sync and async subscriptions.
- **Residual race (accepted):** a publisher that passed the `cancelled`/
  `isStarted` check but has not yet sent can enqueue one event after the drain;
  it is then lost and uncounted. Fully closing this would require holding the
  subscription mutex across the channel send, serializing all publishes; this
  was rejected as too costly for an observability fix. Conservation holds exactly
  once publishers are quiesced (the realistic teardown path).
- **Out of scope (follow-ups):** `DurableMemoryEventBus.Stats()` has a single
  `(delivered)` return and so does not satisfy the new `statsProvider` interface
  (it never drops, by design); the multi-engine config path does not thread
  `deliveryMode`/`publishBlockTimeout` into per-engine config maps.
204 changes: 204 additions & 0 deletions docs/plans/2026-05-28-eventbus-112-fix.md
@@ -0,0 +1,204 @@
# Design: eventbus #112 — timer-drain hang + silent-drop observability

Issue: GoCodeAlone/modular#112. One bug + two observability gaps in `modules/eventbus`.
Single PR. Go 1.26.

> Rev 2 — after adversarial-design-review FAIL (2 Critical, 4 Important). Changes:
> async workerPool drain (C1), exact count-partition + drain placement (C2),
> CI-safe hang test (I1), CustomMemoryEventBus wg/finished infra (I3),
> module.go doc fix (I4). I2 verified benign (no existing test asserts PerEngineStats keys/len).

## Problem statement

### P1 — `MemoryEventBus.Publish` hangs in `deliveryMode: "timeout"` (real bug)

`memory.go:283-293`:

```go
deadline := time.NewTimer(blockTimeout)
select {
case sub.eventCh <- event:
	sent = true
case <-deadline.C: // timer fires; value drained by select
case <-ctx.Done():
}
if !deadline.Stop() {
	<-deadline.C // already-drained channel; never fires again → blocks forever
}
```

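When the `<-deadline.C` branch is taken, `Stop()` returns `false` and the unconditional follow-up
receive blocks the publishing goroutine forever (one stuck goroutine per timed-out publish). The
select has already consumed the timer's only value, so the receive has nothing left to read. On
Go 1.23+ the legacy `if !Stop() { <-C }` drain is also unnecessary: timer channels are unbuffered
and no stale value can be received after `Stop` returns.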
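The hang can be reproduced outside the engine with a bounded wait. The following is a minimal sketch, not the code from `memory.go`: `legacyDrainBlocks` is a hypothetical helper, and the never-ready `full` channel stands in for a full subscriber buffer.

```go
package main

import (
	"fmt"
	"time"
)

// legacyDrainBlocks reports whether the legacy `if !Stop() { <-C }` drain is
// still blocked after waitMs, once the select has consumed the timer value.
func legacyDrainBlocks(timeoutMs, waitMs int) bool {
	full := make(chan int) // unbuffered, no receiver: the send below can never proceed
	deadline := time.NewTimer(time.Duration(timeoutMs) * time.Millisecond)
	select {
	case full <- 1:
	case <-deadline.C: // the timer's only value is consumed here
	}

	returned := make(chan struct{})
	go func() {
		if !deadline.Stop() { // false: the timer has already fired
			<-deadline.C // nothing left to receive
		}
		close(returned)
	}()

	select {
	case <-returned:
		return false
	case <-time.After(time.Duration(waitMs) * time.Millisecond):
		return true // the goroutine is still parked (and leaks in this demo)
	}
}

func main() {
	fmt.Println("legacy drain still blocked:", legacyDrainBlocks(10, 200))
}
```

The drain runs in its own goroutine only so that the demo itself can return; in `Publish` the same receive sits on the caller's goroutine.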

### P2 — `CustomMemoryEventBus` drops silently (no counters, no `Stats()`)

`custom_memory.go:288-296` drops on full subscriber channel with only `slog.Warn`. No
`deliveredCount`/`droppedCount`, no `Stats()`. `MemoryEventBus` already exposes
`Stats() (delivered, dropped uint64)`. Module-wiring gap: `engine_registry.go`
`CollectStats`/`CollectPerEngineStats` type-assert the concrete `*MemoryEventBus`, so even after
adding `Stats()` to the custom engine, `EventBusModule.Stats()`/`PerEngineStats()` ignore it.

### P3 — `Unsubscribe`/`Stop` abandon buffered events without counting them (both engines)

`handleEvents` exits on `ctx.Done()`/`sub.done` and abandons everything still buffered in
`sub.eventCh` (plus, in `MemoryEventBus`, an event already dequeued when the post-dequeue
cancellation re-check fires at memory.go:469-471). For async `MemoryEventBus` subscriptions an
additional leak exists: events dequeued from `sub.eventCh` and handed to `m.workerPool` but not
yet executed when `Stop` cancels are abandoned uncounted. None increment `droppedCount`.
`eventbus.go:94` AND `module.go` Stop docs falsely claim Stop "processes all in-flight events".

## Decisions

### D1 — timer drain fix (P1)

Replace the unconditional drain with the version-safe non-blocking drain. Inline (NOT
`defer deadline.Stop()` — the block is inside a `for _, sub := range` loop; a deferred Stop would
accumulate one live timer per subscriber until `Publish` returns). The non-blocking form is
correct for BOTH the `<-deadline.C` and `<-ctx.Done()` exit branches (handles the slim TOCTOU
where the timer fires concurrently with ctx cancellation):

```go
if !deadline.Stop() {
	select {
	case <-deadline.C:
	default:
	}
}
```
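As a hedged illustration of how the inline form behaves inside a per-subscriber loop, the sketch below wraps the timed send in a hypothetical helper, `sendWithTimeout`, and calls it once per channel from a hypothetical `publishAll`. Plain `chan int` values stand in for subscriber channels; none of these names come from the engine.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendWithTimeout tries to send v on ch for at most blockTimeout.
// The timer is stopped inline, with a non-blocking drain, before returning.
func sendWithTimeout(ctx context.Context, ch chan<- int, v int, blockTimeout time.Duration) bool {
	deadline := time.NewTimer(blockTimeout)
	sent := false
	select {
	case ch <- v:
		sent = true
	case <-deadline.C:
	case <-ctx.Done():
	}
	if !deadline.Stop() {
		select {
		case <-deadline.C: // consume a value only if one is actually pending
		default: // otherwise fall through instead of blocking
		}
	}
	return sent
}

// publishAll sends v to every channel in turn and reports how many sends
// succeeded and how many gave up after the timeout.
func publishAll(subs []chan int, v int, timeoutMs int) (sent, timedOut int) {
	ctx := context.Background()
	for _, ch := range subs {
		if sendWithTimeout(ctx, ch, v, time.Duration(timeoutMs)*time.Millisecond) {
			sent++
		} else {
			timedOut++
		}
	}
	return sent, timedOut
}

func main() {
	free := make(chan int, 1)
	full := make(chan int, 1)
	full <- 0 // occupy the only buffer slot
	sent, timedOut := publishAll([]chan int{free, full}, 7, 20)
	fmt.Printf("sent=%d timedOut=%d\n", sent, timedOut)
}
```

Because each timer is stopped before the loop moves on, no timer outlives its iteration, which is the reason given above for avoiding `defer`.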

### D2 — at-most-once teardown contract (P3) + exact count partition (closes C2)

**Contract: at-most-once delivery. On `Unsubscribe`/`Stop`, events not yet handed to a handler
are counted as `dropped`, never delivered.** Correct the false Stop doc comments in
`eventbus.go` AND `module.go`.

**Exact count partition — every event accepted into a subscriber path is counted in exactly one
bucket, once.** Each event lives in exactly one location at a time; we count at exactly one
transition out of the pipeline:

| terminal transition | bucket | site |
|---|---|---|
| publish-time full channel (event never entered `eventCh`) | dropped | `Publish` (existing) |
| sync handler ran (success or err) | delivered | `handleEvents` (existing) |
| async worker ran task | delivered | worker task (existing) |
| async `workerPool` full at queue time | dropped | `queueEventHandler` (existing) |
| dequeued but sub cancelled post-dequeue (memory.go:469) | **dropped** | `handleEvents` — NEW, was uncounted |
| still buffered in `eventCh` at handler exit | **dropped** | `drainSubscription` — NEW |
| in `workerPool`, never executed, at Stop | **dropped** | `drainWorkerPool` — NEW (async, see D4) |

No event can be both delivered and dropped: once a handler runs it is gone from `eventCh`; the
drain only sees what the handler never touched. No double-drop: drain runs only from the owning
handler goroutine (sole reader of `eventCh`), and `drainWorkerPool` runs only after `wg.Wait()`
(no concurrent worker/handler).

**Drain placement (single owner-side point):** drain lives in `handleEvents`, run immediately
before every exit return (ctx.Done / sub.done / top-of-loop cancelled fast-path), and the
just-dequeued-cancelled event is counted dropped before draining. Because `Unsubscribe` waits on
`sub.finished` (closed by `handleEvents` on exit), by the time `Unsubscribe` returns the channel
is already drained — no separate drain in `Unsubscribe`, no double-drain.

```go
func (m *MemoryEventBus) drainSubscription(sub *memorySubscription) {
	for {
		select {
		case <-sub.eventCh:
			atomic.AddUint64(&m.droppedCount, 1)
		default:
			return
		}
	}
}
```
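The drain placement can be sketched with a simplified handler loop that drains on each of its exit paths. This is an illustration under assumptions, not the engine's `handleEvents`: the `sub` and `bus` types are hypothetical, and the handler cancels the subscription itself purely to make the outcome deterministic.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// sub and bus are hypothetical, simplified stand-ins for the engine types.
type sub struct {
	eventCh   chan string
	done      chan struct{}
	finished  chan struct{}
	cancelled atomic.Bool
}

type bus struct {
	delivered atomic.Uint64
	dropped   atomic.Uint64
}

// drain counts every event still buffered as dropped; it never blocks.
func (b *bus) drain(s *sub) {
	for {
		select {
		case <-s.eventCh:
			b.dropped.Add(1)
		default:
			return
		}
	}
}

// handleEvents is the sole reader of eventCh and drains before every return.
func (b *bus) handleEvents(s *sub, handler func(string)) {
	defer close(s.finished)
	for {
		if s.cancelled.Load() { // top-of-loop cancelled fast path
			b.drain(s)
			return
		}
		select {
		case <-s.done:
			b.drain(s)
			return
		case ev := <-s.eventCh:
			if s.cancelled.Load() { // cancelled after dequeue: count this event too
				b.dropped.Add(1)
				b.drain(s)
				return
			}
			handler(ev)
			b.delivered.Add(1)
		}
	}
}

// cancelAfterFirst buffers n events (n must be at least 1) and cancels the
// subscription from inside the first handler call.
func cancelAfterFirst(n int) (delivered, dropped uint64) {
	b := &bus{}
	s := &sub{
		eventCh:  make(chan string, n),
		done:     make(chan struct{}),
		finished: make(chan struct{}),
	}
	for i := 0; i < n; i++ {
		s.eventCh <- fmt.Sprintf("event-%d", i)
	}
	go b.handleEvents(s, func(string) { s.cancelled.Store(true) })
	<-s.finished // after this, the channel is drained and the counters are final
	return b.delivered.Load(), b.dropped.Load()
}

func main() {
	delivered, dropped := cancelAfterFirst(5)
	fmt.Printf("delivered=%d dropped=%d\n", delivered, dropped)
}
```

In this setup exactly one event reaches the handler and the rest fall to the drain, so each event lands in exactly one bucket.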

**Residual race (documented, accepted):** a publisher that passed the `cancelled`/`isStarted`
check but has not yet sent can enqueue one event after the drain; it is then lost and uncounted.
Closing it fully needs `sub.mutex` held across the channel send, serializing all publishes —
rejected for an observability fix. Conservation holds exactly once publishers are quiesced (the
realistic teardown path; the conservation tests quiesce publishers before teardown). Recorded in ADR.

### D3 — `statsProvider` interface (P2 module wiring)

```go
type statsProvider interface { Stats() (delivered, dropped uint64) }
```
`CollectStats`/`CollectPerEngineStats` assert `statsProvider` instead of `*MemoryEventBus`, so
both memory and custom engines participate in `module.Stats()`/`PerEngineStats()`. Verified safe:
only `memory_race_test.go:89` consumes `PerEngineStats`, and it sums over all entries (no `len`/key
assertion). `DurableMemoryEventBus.Stats()` returns `(delivered uint64)` — wrong arity, does not
satisfy the interface, stays skipped (unchanged). Harmonizing its arity = out of scope follow-up.
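The effect of asserting on the interface rather than the concrete type can be shown with stand-in engines. In this sketch `memoryEngine`, `customEngine`, `durableEngine`, `collectStats`, and `collectPerEngineStats` are hypothetical simplifications; only the `statsProvider` shape is taken from the design.

```go
package main

import "fmt"

// statsProvider matches the interface proposed in D3.
type statsProvider interface {
	Stats() (delivered, dropped uint64)
}

// The three engine types below are hypothetical stand-ins, not the real engines.
type memoryEngine struct{ delivered, dropped uint64 }

func (e *memoryEngine) Stats() (uint64, uint64) { return e.delivered, e.dropped }

type customEngine struct{ delivered, dropped uint64 }

func (e *customEngine) Stats() (uint64, uint64) { return e.delivered, e.dropped }

// durableEngine has a single-value Stats, so it does not satisfy statsProvider.
type durableEngine struct{ delivered uint64 }

func (e *durableEngine) Stats() uint64 { return e.delivered }

// collectStats sums the counters of every engine that satisfies statsProvider.
func collectStats(engines map[string]any) (delivered, dropped uint64) {
	for _, e := range engines {
		sp, ok := e.(statsProvider)
		if !ok {
			continue // wrong arity or no Stats method: skipped, as before
		}
		d, x := sp.Stats()
		delivered += d
		dropped += x
	}
	return delivered, dropped
}

// collectPerEngineStats returns {delivered, dropped} per participating engine.
func collectPerEngineStats(engines map[string]any) map[string][2]uint64 {
	out := make(map[string][2]uint64)
	for name, e := range engines {
		if sp, ok := e.(statsProvider); ok {
			d, x := sp.Stats()
			out[name] = [2]uint64{d, x}
		}
	}
	return out
}

func main() {
	engines := map[string]any{
		"memory":  &memoryEngine{delivered: 5, dropped: 1},
		"custom":  &customEngine{delivered: 3, dropped: 2},
		"durable": &durableEngine{delivered: 9},
	}
	delivered, dropped := collectStats(engines)
	fmt.Printf("delivered=%d dropped=%d engines=%d\n",
		delivered, dropped, len(collectPerEngineStats(engines)))
}
```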

### D4 — drain abandoned async tasks at Stop (closes C1, MemoryEventBus only)

`MemoryEventBus.Stop` already cancels ctx then waits on `m.wg` (covers BOTH worker and
`handleEvents` goroutines — each does `wg.Add(1)`/`defer wg.Done()`). AFTER `wg.Wait()` completes,
no goroutine sends to or receives from `m.workerPool`, so draining it is race-free and
deterministic. Call `drainWorkerPool` from the `case <-done:` branch of `Stop`:

```go
func (m *MemoryEventBus) drainWorkerPool() {
	for {
		select {
		case <-m.workerPool:
			atomic.AddUint64(&m.droppedCount, 1)
		default:
			return
		}
	}
}
```

This makes V1 hold for async subscriptions too. The `ctx.Done()` (shutdown-timeout) error branch
does not drain — timeout is already a degraded path returning `ErrEventBusShutdownTimeout`.
`CustomMemoryEventBus` has no worker pool (its `handleEvents` runs the handler inline regardless of
`isAsync`), so its only buffer is `sub.eventCh` — fully covered by D2 drain.
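The ordering (cancel, wait for workers, then drain) can be tried in isolation. The sketch below assumes a simplified pool: the `pool` type, `runPool`, and the `executed` counter are hypothetical, with `executed` standing in for the delivered count of async tasks.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// pool is a hypothetical, simplified worker pool; it is not the engine's type.
type pool struct {
	tasks    chan func()
	wg       sync.WaitGroup
	executed atomic.Uint64
	dropped  atomic.Uint64
}

// worker runs tasks until the context is cancelled.
func (p *pool) worker(ctx context.Context) {
	defer p.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case task := <-p.tasks:
			task()
			p.executed.Add(1)
		}
	}
}

// stop cancels the workers, waits for all of them to exit, and only then
// drains the task queue, so nothing else touches the channel during the drain.
func (p *pool) stop(cancel context.CancelFunc) {
	cancel()
	p.wg.Wait()
	for {
		select {
		case <-p.tasks:
			p.dropped.Add(1)
		default:
			return
		}
	}
}

// runPool queues the given number of no-op tasks, starts the workers, and
// stops immediately; the executed/dropped split varies between runs.
func runPool(queued, workers int) (executed, dropped uint64) {
	ctx, cancel := context.WithCancel(context.Background())
	p := &pool{tasks: make(chan func(), queued)}
	for i := 0; i < queued; i++ {
		p.tasks <- func() {}
	}
	p.wg.Add(workers)
	for i := 0; i < workers; i++ {
		go p.worker(ctx)
	}
	p.stop(cancel)
	return p.executed.Load(), p.dropped.Load()
}

func main() {
	executed, dropped := runPool(50, 4)
	fmt.Printf("executed=%d dropped=%d total=%d\n", executed, dropped, executed+dropped)
}
```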

### D5 — `CustomMemoryEventBus` shutdown synchronization (closes I3)

Custom engine currently has no `WaitGroup`/`finished` channel; `Stop` cancels ctx and returns
without waiting, so a drain added to its `handleEvents` would complete AFTER `Stop` returns →
`Stats()` read post-stop would race the drain. Mirror `MemoryEventBus`: add `wg sync.WaitGroup`
to `CustomMemoryEventBus`, `finished chan struct{}` to `customMemorySubscription`,
`wg.Add(1)`/`defer wg.Done()`+`defer close(sub.finished)` around `handleEvents`, `Stop` waits
`wg.Wait()` under the passed `ctx` budget (returning `ErrEventBusShutdownTimeout` on timeout to
match memory engine), and `Unsubscribe` waits briefly on `sub.finished` (like memory engine) so
the drain is observed deterministically.
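Waiting on a `WaitGroup` under a context budget is usually done by bridging `wg.Wait()` to a channel. The following is a sketch of that pattern, not the engine's `Stop`: `waitWithBudget` and `stopWithin` are hypothetical, and `errShutdownTimeout` is a local stand-in for `ErrEventBusShutdownTimeout`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// errShutdownTimeout is a local stand-in for ErrEventBusShutdownTimeout.
var errShutdownTimeout = errors.New("event bus shutdown timed out")

// waitWithBudget waits for wg, but gives up when ctx is done first.
func waitWithBudget(ctx context.Context, wg *sync.WaitGroup) error {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil // every handler goroutine has exited; counters are final
	case <-ctx.Done():
		return errShutdownTimeout // degraded path: some goroutines are still running
	}
}

// stopWithin simulates one handler goroutine that needs workMs to finish
// and a Stop call that is allowed budgetMs.
func stopWithin(workMs, budgetMs int) error {
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(time.Duration(workMs) * time.Millisecond)
	}()
	ctx, cancel := context.WithTimeout(context.Background(),
		time.Duration(budgetMs)*time.Millisecond)
	defer cancel()
	return waitWithBudget(ctx, &wg)
}

func main() {
	fmt.Println("within budget:", stopWithin(10, 1000))
	fmt.Println("over budget:", stopWithin(500, 20))
}
```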

## Invariants (backprop targets)

```
V1: ∀ subscriber, once publishers quiesced: enqueued == delivered + dropped
where enqueued = events accepted into sub.eventCh (+ for async, handed to workerPool).
Publish-time full-channel drops are a separate already-counted dropped bucket.
V2: timeout-mode Publish to a full buffer returns within ~blockTimeout (never hangs).
```

## Scope

In: `memory.go`, `custom_memory.go`, `engine_registry.go`, `eventbus.go` (doc), `module.go` (doc)
+ tests + ADR.
Out (documented follow-ups, not #112): `DurableMemoryEventBus.Stats()` arity harmonization;
multi-engine config path does not thread `deliveryMode`/`publishBlockTimeout` into per-engine
`Config` maps (pre-existing; timer fix still reached in single-engine timeout mode + tests).

## Tasks

| id | task | file |
|----|------|------|
| T1 | CI-SAFE failing test: timeout-mode publish to full buffer — run Publish in goroutine, `select` with 3s bound, assert it returned (NOT a naked Publish that deadlocks CI) | memory_*_test.go |
| T2 | fix timer drain (D1) | memory.go |
| T3 | failing conservation tests: sync + async, across Unsubscribe AND Stop, both engines; quiesce publishers first | *_test.go |
| T4 | MemoryEventBus: count dequeued-cancelled event; `drainSubscription` on every handleEvents exit (D2) | memory.go |
| T5 | MemoryEventBus: `drainWorkerPool` after wg.Wait in Stop (D4) | memory.go |
| T6 | CustomMemoryEventBus: deliveredCount/droppedCount + Stats(); wg/finished infra (D5); drain-on-exit; Stop waits; Unsubscribe waits | custom_memory.go |
| T7 | statsProvider interface; router asserts it (D3) | engine_registry.go |
| T8 | fix Stop doc comments (D2) | eventbus.go, module.go |
| T9 | ADR: at-most-once teardown contract + rejected best-effort-deliver + documented residual race | decisions/ |
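The bounded wait that T1 calls for can be factored into a small helper. This is a minimal sketch under assumptions: `returnsWithinMs` is a hypothetical name, and in a real test a `false` result would be turned into a `t.Fatal` instead of being printed.

```go
package main

import (
	"fmt"
	"time"
)

// returnsWithinMs runs fn in its own goroutine and reports whether it
// returned before the bound elapsed. A stuck fn leaks its goroutine, but
// the caller (and therefore CI) is never blocked.
func returnsWithinMs(boundMs int, fn func()) bool {
	done := make(chan struct{})
	go func() {
		defer close(done)
		fn()
	}()
	select {
	case <-done:
		return true
	case <-time.After(time.Duration(boundMs) * time.Millisecond):
		return false
	}
}

func main() {
	never := make(chan struct{}) // never closed: a receive on it blocks forever
	fmt.Println("fast call returned:", returnsWithinMs(1000, func() {}))
	fmt.Println("stuck call returned:", returnsWithinMs(50, func() { <-never }))
}
```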

## Verification

- T1 fails-fast (3s timeout assertion) on unpatched code, passes after T2. Backprop V2.
- conservation tests assert `delivered+dropped == enqueued` for sync AND async, post-teardown.
- `cd modules/eventbus && go test -race ./...` (CLAUDE.md mandates -race).
- `go vet ./...`; `golangci-lint run`; `go fmt ./...`.
- multi-engine: existing `memory_race_test.go` PerEngineStats path still green after D3.