eventbus: timer-drain hang in MemoryEventBus timeout-mode + silent drop observability gaps #112

@intel352

Surfaced these while writing edge-case coverage for the eventbus module. One is a real publisher-hang bug. The others are observability gaps worth surfacing before someone trips over them in production.

1. MemoryEventBus.Publish hangs forever in deliveryMode: "timeout" when the timer fires

modules/eventbus/memory.go (current main) uses the unsafe timer drain idiom:

deadline := time.NewTimer(blockTimeout)
select {
case sub.eventCh <- event:
    sent = true
case <-deadline.C:        // <-- timer fires; channel drained here
    // timeout drop
case <-ctx.Done():
}
if !deadline.Stop() {
    <-deadline.C          // <-- unconditional re-receive on already-drained channel: blocks forever
}

When the timer fires and the select consumed deadline.C, deadline.Stop() returns false, and the unconditional follow-up receive on deadline.C blocks the publisher indefinitely (one stuck goroutine per timed-out publish — accumulates fast under load).

Reproduction

Subscribe with deliveryMode: "timeout" and a PublishBlockTimeout shorter than the handler latency, fill the subscriber buffer, then publish once more. The publishing goroutine never returns. Easy to reproduce in a unit test with a small defaultEventBufferSize, a time.Sleep handler, and a 50 ms timeout — the test hits the Go test binary alarm (panic: test timed out) with a stack parked at the <-deadline.C line.

Suggested fix

Standard race-free drain pattern:

if !deadline.Stop() {
    select {
    case <-deadline.C:
    default:
    }
}

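(Or just drop the manual drain entirely: this timer is never Reset, so a bare deadline.Stop() is enough. For modules that declare go 1.23 or later, unreferenced timers are GC'd cleanly even without Stop, and Stop no longer leaves a stale value in the channel. The explicit-and-correct form is fine too.)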
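To make the fix concrete, here is a minimal, self-contained sketch of the corrected pattern outside the module. sendWithTimeout and timedOutSendReturns are hypothetical names used for illustration only (the real Publish works on sub.eventCh and the module's event type); the int channel stands in for a full subscriber buffer.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// sendWithTimeout is a hypothetical helper, not the module's actual code.
// It tries to hand event to ch, giving up after timeout or when ctx ends,
// and reports whether the event was sent.
func sendWithTimeout(ctx context.Context, ch chan<- int, event int, timeout time.Duration) bool {
	deadline := time.NewTimer(timeout)
	sent := false
	select {
	case ch <- event:
		sent = true
	case <-deadline.C: // timer fired; its value is consumed right here
	case <-ctx.Done():
	}
	// Non-blocking drain: safe whether or not the select above already
	// received from deadline.C.
	if !deadline.Stop() {
		select {
		case <-deadline.C:
		default:
		}
	}
	return sent
}

// timedOutSendReturns fills a one-slot channel that nobody reads, publishes
// once more with a short timeout, and reports whether that call came back
// (as a timeout drop) within one second instead of hanging.
func timedOutSendReturns() bool {
	ch := make(chan int, 1)
	ch <- 0 // buffer is now full
	done := make(chan bool, 1)
	go func() {
		done <- sendWithTimeout(context.Background(), ch, 1, 20*time.Millisecond)
	}()
	select {
	case sent := <-done:
		return !sent // returned, and the event was dropped on timeout
	case <-time.After(time.Second):
		return false // publisher is stuck
	}
}

func main() {
	fmt.Println("timed-out publish returned:", timedOutSendReturns())
}
```

Swapping the inner select back to an unconditional <-deadline.C should make timedOutSendReturns report false, which is the hang described above.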

2. CustomMemoryEventBus drops events silently (no counter, no Stats())

modules/eventbus/custom_memory.go's Publish drops on a full subscriber channel and only logs it:

select {
case sub.eventCh <- event:
default:
    slog.Warn("Subscription channel full, dropping event", ...)
}

There's no counterpart to MemoryEventBus's Stats() / droppedCount. A regression that quietly increased drop rate would be invisible to tests and to anyone wiring metrics from these stats to alerting. It also makes it impossible to write a "conservation" test (delivered + dropped == total) for CustomMemoryEventBus.

Suggested fix

Mirror MemoryEventBus: add deliveredCount + droppedCount atomics, increment on the drop branch + on handler completion, and expose Stats() (delivered, dropped uint64).
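Roughly what I have in mind, as a sketch only. countingBus is a hypothetical stand-in reduced to a single subscriber channel, not the real CustomMemoryEventBus, and handleOne and demoStats exist purely for the demo. It uses atomic.Uint64 (Go 1.19+); plain uint64 fields with atomic.AddUint64 would work as well.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// countingBus is a hypothetical, single-subscriber stand-in for a
// drop-on-full bus. It is not the module's real type.
type countingBus struct {
	eventCh        chan int
	deliveredCount atomic.Uint64
	droppedCount   atomic.Uint64
}

// Publish still drops when the channel is full, but counts the drop.
func (b *countingBus) Publish(event int) {
	select {
	case b.eventCh <- event:
	default:
		b.droppedCount.Add(1)
	}
}

// handleOne receives one event, runs the handler, and counts the event as
// delivered only after the handler has returned.
func (b *countingBus) handleOne(handler func(int)) {
	event := <-b.eventCh
	handler(event)
	b.deliveredCount.Add(1)
}

// Stats exposes the counters so tests and metrics can read them.
func (b *countingBus) Stats() (delivered, dropped uint64) {
	return b.deliveredCount.Load(), b.droppedCount.Load()
}

// demoStats publishes 5 events into a buffer of 2, then handles what was
// buffered: 2 events are delivered and 3 are dropped.
func demoStats() (delivered, dropped uint64) {
	bus := &countingBus{eventCh: make(chan int, 2)}
	for i := 0; i < 5; i++ {
		bus.Publish(i)
	}
	for i := 0; i < 2; i++ {
		bus.handleOne(func(int) {})
	}
	return bus.Stats()
}

func main() {
	delivered, dropped := demoStats()
	fmt.Printf("delivered=%d dropped=%d\n", delivered, dropped)
}
```

With both counters in place, delivered + dropped == total becomes checkable for this engine too.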

3. Unsubscribe and Stop abandon buffered events without counting them as drops

Both engines: when Unsubscribe (or Stop) is called while a subscriber's channel still has buffered events, the handler goroutine exits on the cancel fast-path and those buffered events are silently lost. They never reach the handler and they don't increment droppedCount either.

That's a real observability hole — Stats() reports zero drops even though events evaporated. Consumers wiring droppedCount to a Datadog/Prometheus alert will see green while losing data.

Suggested fix

Pick a documented contract and stick to it. Two reasonable options:

  • Drain at Unsubscribe/Stop: before exiting the handler goroutine, range over the remaining buffered events and either deliver them (best-effort) or increment droppedCount for each.
  • Count-on-cancel: read n := len(sub.eventCh) at cancel time and atomic.AddUint64(&droppedCount, uint64(n)). This is only accurate once publishers can no longer send to that channel.

Either is fine; the current "silent abandonment" is the surprise.
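A sketch of the first option in its "count as dropped" flavor. The subscription type, its fields, and drainCount are hypothetical and only illustrate the shape. It assumes publishers have already stopped sending to the channel by the time cancel is observed (for example because the subscription was removed from the bus first); otherwise late sends could still slip through uncounted.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// subscription is a hypothetical, simplified subscriber; the real engines
// have more state than this.
type subscription struct {
	eventCh   chan int
	cancel    chan struct{}
	delivered atomic.Uint64
	dropped   atomic.Uint64
}

// drainCount empties whatever is currently buffered in ch without blocking
// and returns how many events it discarded.
func drainCount(ch <-chan int) uint64 {
	var n uint64
	for {
		select {
		case <-ch:
			n++
		default:
			return n
		}
	}
}

// run is the handler loop. On cancel it counts every still-buffered event
// as dropped before exiting, so nothing disappears unaccounted for.
func (s *subscription) run(handler func(int)) {
	for {
		select {
		case <-s.cancel:
			s.dropped.Add(drainCount(s.eventCh))
			return
		case ev := <-s.eventCh:
			handler(ev)
			s.delivered.Add(1)
		}
	}
}

// demoUnsubscribe buffers 4 events, cancels, then runs the loop. The split
// between delivered and dropped depends on which select case wins, but the
// two counters always add up to 4.
func demoUnsubscribe() (delivered, dropped uint64) {
	s := &subscription{eventCh: make(chan int, 4), cancel: make(chan struct{})}
	for i := 0; i < 4; i++ {
		s.eventCh <- i
	}
	close(s.cancel)
	s.run(func(int) {})
	return s.delivered.Load(), s.dropped.Load()
}

func main() {
	delivered, dropped := demoUnsubscribe()
	fmt.Printf("delivered=%d dropped=%d total=%d\n", delivered, dropped, delivered+dropped)
}
```

The exact split is intentionally left unspecified here; the point is only that every buffered event ends up in one of the two counters.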

4. Test design for concurrency tests with drop-on-full buses

Not a bug, more a guard rail: any test that asserts "no events are lost" against a drop-on-full bus is structurally fragile. If the channel buffer is sized smaller than the burst the test sends, the test hangs on wg.Wait() (or whatever sync primitive expects every event) until the binary alarm trips, instead of failing fast with a useful message. Sizing the buffer to the workload also doesn't prove anything durable — the next person to bump the workload re-flakes it.

The robust property to assert is conservation: every published event resolves to exactly one of delivered or dropped. Pseudocode:

const totalEvents = publishers * eventsPerPublisher
// ... small buffer relative to burst, so drops actually exercise that path ...

require.Eventually(t, func() bool {
    delivered, dropped := bus.Stats()
    return delivered+dropped == totalEvents
}, 5*time.Second, 10*time.Millisecond,
   "every published event must be either delivered or dropped")

Plus a bounded select/time.After around any wg.Wait(), so a future regression fails in seconds with a message like "processed N/M" instead of hitting the test binary alarm. Without that bound, (1) above turns into a 10-minute hang in CI (the default go test timeout) under any concurrency test that publishes into timeout-mode.
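The bounded wait could look something like this. waitTimeout and the two demo functions are hypothetical helpers of my own naming, not existing code under modules/eventbus/.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitTimeout waits for wg, but gives up after d. It reports whether the
// WaitGroup finished in time. On timeout the helper goroutine stays parked
// on wg.Wait(), which is acceptable in a test that is about to fail anyway.
func waitTimeout(wg *sync.WaitGroup, d time.Duration) bool {
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return true
	case <-time.After(d):
		return false
	}
}

// demoCompletes models the healthy case: every expected event is processed.
func demoCompletes() bool {
	var wg sync.WaitGroup
	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func() { defer wg.Done() }()
	}
	return waitTimeout(&wg, time.Second)
}

// demoStuck models a dropped event: one Done never arrives, so the wait
// gives up after 50 ms instead of blocking until the test binary alarm.
func demoStuck() bool {
	var wg sync.WaitGroup
	wg.Add(1)
	return waitTimeout(&wg, 50*time.Millisecond)
}

func main() {
	fmt.Println("healthy run finished in time:", demoCompletes())
	fmt.Println("stuck run finished in time:", demoStuck())
}
```

In a real test the false branch would call t.Fatalf with the processed and expected counts, so the failure message says how many events went missing.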


Happy to send PRs for (1) and (2)/(3) if useful — wanted to surface the shape first so you can decide what contract you'd want for the abandonment case. Issue (4) is more of a recommendation for the existing tests under modules/eventbus/ than a code change.

Labels: bug (Something isn't working)
