feat(eventbus): add EventBus interface and LocalEventBus implementation (#4229 phase A-B)#4342
Conversation
β¦on (#4229 phase A-B) Issue #4229: EventBus abstraction for session event fanout. Adds: - src/event-bus.ts: EventBus port interface (publish, subscribe, replaySince, destroy) with typed BusEvent envelope - src/local-event-bus.ts: In-process implementation using EventEmitter with configurable ring buffer for replay Design: - Minimal interface β no delivery guarantees, implementations decide - Typed channels: 'session:abc', 'global', etc. - Monotonically increasing IDs for cursor-based replay - Async delivery via setImmediate (consistent with SessionEventBus) Tests: 11 new tests covering publish/subscribe, replay, buffer limits, destroy, unsubscribe cleanup. Cleanup: remove stale existsSync/readFileSync imports from server.ts (step 7 of #4243). Also bumps CI bundle threshold 2444 β 2452 KB.
There was a problem hiding this comment.
β Approved. Clean EventBus interface + LocalEventBus implementation.
event-bus.ts (62 lines): Minimal port interface β publish, subscribe, replaySince, destroy. Well-typed BusEvent with channel/id/type/timestamp/data. No over-engineering. β
local-event-bus.ts (104 lines): In-process implementation, zero external deps. Per-channel EventEmitters with ring buffers. Good: ID counter reset guard at MAX_SAFE_INTEGER, configurable buffer size, async delivery via setImmediate. β
Tests (11): Solid coverage β pub/sub, unsubscribe, multi-subscriber, channel isolation, replay, buffer eviction, monotonic IDs, destroy, cleanup on last unsub, timestamps. β
server.ts: Stale import cleanup (existsSync/readFileSync removed β the ones I flagged in #4336). β
Minor observations (not blocking):
void immin publish() β either track the immediate handle for destroy() cleanup or just callsetImmediate(...)without assignment.- Buffer eviction via
splice(0, excess)is O(n) β fine at buffer=50, worth a circular buffer if it grows. - No guard for publish-after-destroy β calling publish() on a destroyed bus silently creates new emitters.
Approved-minor-bump label added, branch updated to re-trigger bump gate. CI: Node 20 failure is pre-existing E2E flake (Effort: high).
Summary
Part of #4229 β Redis Streams EventBus + SSE bridge.
This PR delivers the MVP: the
EventBusport interface and an in-processLocalEventBusimplementation. Zero external deps.New files
src/event-bus.tsEventBusinterface +BusEventtypesrc/local-event-bus.tssrc/__tests__/local-event-bus.test.tsEventBus interface
Design decisions
LocalEventBusis the default. Redis adapter is a future PR (phase C-D).session:abc123,global, etc. Implementations decide routing.setImmediate()for consistency with existingSessionEventBus.lastEventId.Cleanup included
existsSync/readFileSyncimports fromserver.ts(arch(server.ts): God object β 1632 lines, 60 imports, 30+ module-level mutable globalsΒ #4243 step 7)Stacking
SessionEventBusβEventBus), Phase D (Redis adapter, feature-flagged)Verification
Part of #4229. Closes phase A-B.