feat(eventbus): B.3a — system-event-bus (13/13 ACs, 100%)#423

Merged: remyluslosius merged 3 commits into main from feat/slice-b-b3a-event-bus on May 29, 2026

@remyluslosius
Contributor

Summary

Slice B.3a — system-event-bus implementation. Opens Slice B.3 (orchestration plumbing). Typed in-process pub/sub for OpenWatch-internal events; foundation for B.3b's alert router.

Coverage: 13/13 ACs = 100%
Tests: 13 sub-tests under -race
LOC: 1,051 across 6 files

What landed

| Component | Purpose |
| --- | --- |
| app/specs/system/event-bus.spec.yaml | New spec (13 ACs, 9 constraints), status: approved |
| internal/eventbus/ | Bus + Subscription + Metrics + typed events |

Architectural choices locked

  • In-process via Go channels only: no Kafka, NATS, RabbitMQ, Redis, or GCP Pub/Sub. AC-12 enforces this by source inspection. If cross-process delivery is ever needed, a separate bridge sits alongside the bus, not inside it
  • Typed Event interface (Kind() + Timestamp()) and a closed EventKind enum (currently HeartbeatPulse and DriftDetected). Subscribers filter by Kind at registration; there is no wildcard
  • Non-blocking Publish using the select-with-default pattern: when a subscriber's buffer is full, the event is dropped for that subscriber and DroppedCount increments. Other subscribers still receive the event
  • Per-subscriber goroutine: each subscriber drains its own channel at its own pace, so a slow subscriber doesn't block fast ones (verified by AC-11)
  • Shutdown drains and closes subscriber channels; any subsequent Publish is a no-op and does not panic
  • Per-subscriber counters live on Subscription so consumers can self-monitor without the bus knowing who they are
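
For reviewers who want the load-shedding behaviour in concrete form, here is a minimal sketch of non-blocking publish with a per-subscriber kind filter. It is not the code in internal/eventbus: the lower-case `bus`, `subscriber`, and `pulse` types, the `KindHeartbeatPulse`/`KindDriftDetected` constant names, and `demoPublish` are all assumptions made for illustration.

```go
package main

import (
	"sync"
	"sync/atomic"
	"time"
)

// EventKind and Event follow the names used in this PR; the constant
// names and string values below are assumed for the sketch.
type EventKind string

const (
	KindHeartbeatPulse EventKind = "heartbeat_pulse"
	KindDriftDetected  EventKind = "drift_detected"
)

type Event interface {
	Kind() EventKind
	Timestamp() time.Time
}

// pulse is a hypothetical minimal event used only in this sketch.
type pulse struct{ at time.Time }

func (p pulse) Kind() EventKind      { return KindHeartbeatPulse }
func (p pulse) Timestamp() time.Time { return p.at }

// subscriber owns its channel and its own delivered/dropped counters.
type subscriber struct {
	ch        chan Event
	kinds     map[EventKind]bool
	delivered atomic.Int64
	dropped   atomic.Int64
}

type bus struct {
	mu   sync.RWMutex
	subs []*subscriber
}

// subscribe registers a subscriber for an explicit list of kinds (no wildcard).
func (b *bus) subscribe(buffer int, kinds ...EventKind) *subscriber {
	s := &subscriber{ch: make(chan Event, buffer), kinds: map[EventKind]bool{}}
	for _, k := range kinds {
		s.kinds[k] = true
	}
	b.mu.Lock()
	b.subs = append(b.subs, s)
	b.mu.Unlock()
	return s
}

// publish never blocks: if a subscriber's buffer is full, the event is
// dropped for that subscriber only and its drop counter is incremented.
func (b *bus) publish(e Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()
	for _, s := range b.subs {
		if !s.kinds[e.Kind()] {
			continue
		}
		select {
		case s.ch <- e:
			s.delivered.Add(1)
		default:
			s.dropped.Add(1)
		}
	}
}

// demoPublish sends 5 pulses to a small-buffer subscriber, a large-buffer
// subscriber, and a drift-only subscriber, with nobody reading.
func demoPublish() (slowDelivered, slowDropped, fastDelivered, driftDelivered int64) {
	b := &bus{}
	slow := b.subscribe(2, KindHeartbeatPulse)
	fast := b.subscribe(16, KindHeartbeatPulse)
	drift := b.subscribe(16, KindDriftDetected)
	for i := 0; i < 5; i++ {
		b.publish(pulse{at: time.Now()})
	}
	return slow.delivered.Load(), slow.dropped.Load(), fast.delivered.Load(), drift.delivered.Load()
}
```

In this sketch the small-buffer subscriber loses events once its two slots are full, while the large-buffer subscriber still receives all five and the drift-only subscriber receives none.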

ACs satisfied

| AC | Mechanism |
| --- | --- |
| AC-01 | One subscriber receives event within 100ms |
| AC-02 | Zero subscribers → NoSubscribersCount++ |
| AC-03 | Three subscribers all receive |
| AC-04 | 1000 concurrent publishes × 10 subscribers race-clean |
| AC-05 | Shutdown closes channels; Publish no-ops after |
| AC-06 | BufferSize=1 + 5 publishes → 5 dropped |
| AC-07 | EventKind enum has exactly 2 values |
| AC-08 | Heartbeat-only subscriber doesn't receive DriftDetected |
| AC-09 | All counters round-trip through scenarios |
| AC-10 | 1000 publishes × 10 subscribers under 100ms wall-clock |
| AC-11 | Slow subscriber doesn't starve fast subscriber |
| AC-12 | No kafka/nats/rabbitmq/redis-pubsub/GCP-pubsub/SNS/SQS imports |
| AC-13 | Unsubscribe closes channel; Publish doesn't deliver after |
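
One way an AC-12 style source-inspection check could work is sketched below, using only the standard go/parser package to read a file's import block. The deny-list fragments, the `bannedImports` helper, and the substring matching are assumptions based on the AC wording; the actual test in this PR may be implemented differently.

```go
package main

import (
	"go/parser"
	"go/token"
	"strconv"
	"strings"
)

// bannedFragments is an assumed deny-list derived from the AC-12 wording.
var bannedFragments = []string{"kafka", "nats", "rabbitmq", "redis", "pubsub", "sns", "sqs"}

// bannedImports parses only the import declarations of src and returns
// every import path that contains a banned fragment (case-insensitive).
func bannedImports(filename, src string) ([]string, error) {
	fset := token.NewFileSet()
	f, err := parser.ParseFile(fset, filename, src, parser.ImportsOnly)
	if err != nil {
		return nil, err
	}
	var hits []string
	for _, imp := range f.Imports {
		// imp.Path.Value is the quoted string literal, e.g. "\"sync\"".
		path, err := strconv.Unquote(imp.Path.Value)
		if err != nil {
			return nil, err
		}
		lower := strings.ToLower(path)
		for _, frag := range bannedFragments {
			if strings.Contains(lower, frag) {
				hits = append(hits, path)
				break
			}
		}
	}
	return hits, nil
}
```

A real test would presumably walk every .go file under internal/eventbus/ and fail if any file yields a non-empty result. Substring matching is deliberately crude and can flag unrelated paths, so a stricter check might compare against exact module prefixes instead.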

Local validation

  • go build ./internal/eventbus/ — clean
  • go vet ./internal/eventbus/ — clean
  • go test -race ./internal/eventbus/ — 13 sub-tests pass
  • specter coverage — system-event-bus 13/13 = 100%

Slice B.3 status

B.3a event bus: this PR, 13/13 ACs
B.3b alert router: pending (subscribes to this bus)

Relationship to other PRs

Slice B running total

| Wave | Status | ACs |
| --- | --- | --- |
| B.1 trunk (scheduler + executor + writer) | DONE | 46 |
| B.2 awareness (liveness + drift) | DONE | 29 |
| B.3a event bus | DONE | 13 |
| B.3b alert router | Pending | |
| B.4 fleet rollup queries | Pending | |

Slice B totals so far: 88 ACs across 6 specs, all at 100%.

Opens Slice B.3 (orchestration plumbing). Typed in-process pub/sub for
OpenWatch-internal events, carrying Bucket B events per the
Kensa/OpenWatch boundary doc § 4. Foundation for B.3b's alert router.

Spec
  New: app/specs/system/event-bus.spec.yaml (status: approved).
  13 ACs across 9 constraints.

internal/eventbus package
  doc.go      Architectural choices: in-process via Go channels only,
              typed Event interface + closed EventKind enum, per-
              subscriber goroutine (slow subscriber doesn't block
              others), Shutdown drains + closes.

  types.go    EventKind closed enum (HeartbeatPulse, DriftDetected).
              Event interface (Kind, Timestamp).
              HeartbeatPulse struct (HostID, Reachable, OccurredAt,
                PriorReachable, ResponseTimeMS).
              DriftDetected struct (HostID, ScanID, DriftType,
                ScoreDelta + per-severity transition counts).
              DefaultBufferSize = 1024 (spec C-04).
              SubscribeOptions with BufferSize + Kinds filter.

  bus.go      Bus struct holding mu (RWMutex), subscribers map,
              closed atomic.Bool, metrics.
              Publish: non-blocking via select-with-default; per-
                subscriber drop+count on full channel.
              Subscribe: returns a Subscription registered for the
                given kinds.
              Shutdown: closes every subscriber channel; subsequent
                Publish is a no-op.
              Subscription with Events() <-chan, Delivered/Dropped
                counts, Unsubscribe (close-once via sync.Once).
              Metrics: PublishedCount, DeliveredCount, DroppedCount,
                NoSubscribersCount; Snapshot returns typed struct.
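
As a rough picture of what the types.go description implies, the sketch below shows a closed enum, the two-method Event interface, and a consumer that type-switches on the payload. Field types, the `Kind...` constant names and values, the `OccurredAt` field on DriftDetected, and the `describe` helper are assumptions; the per-severity transition counts are omitted.

```go
package main

import (
	"fmt"
	"time"
)

type EventKind string

// Constant names and values are assumed; the PR only names the two kinds.
const (
	KindHeartbeatPulse EventKind = "heartbeat_pulse"
	KindDriftDetected  EventKind = "drift_detected"
)

// AllEventKinds lists the closed set, so a test can assert its length.
var AllEventKinds = []EventKind{KindHeartbeatPulse, KindDriftDetected}

type Event interface {
	Kind() EventKind
	Timestamp() time.Time
}

// HeartbeatPulse uses the field names from this PR; the types are assumed.
type HeartbeatPulse struct {
	HostID         string
	Reachable      bool
	PriorReachable bool
	ResponseTimeMS int64
	OccurredAt     time.Time
}

func (e HeartbeatPulse) Kind() EventKind      { return KindHeartbeatPulse }
func (e HeartbeatPulse) Timestamp() time.Time { return e.OccurredAt }

// DriftDetected is abbreviated; OccurredAt is an assumed field.
type DriftDetected struct {
	HostID     string
	ScanID     string
	DriftType  string
	ScoreDelta float64
	OccurredAt time.Time
}

func (e DriftDetected) Kind() EventKind      { return KindDriftDetected }
func (e DriftDetected) Timestamp() time.Time { return e.OccurredAt }

// describe shows how a subscriber recovers the concrete payload type
// without going through []byte or map[string]any.
func describe(e Event) string {
	switch ev := e.(type) {
	case HeartbeatPulse:
		return fmt.Sprintf("heartbeat %s reachable=%t", ev.HostID, ev.Reachable)
	case DriftDetected:
		return fmt.Sprintf("drift %s scan=%s delta=%.1f", ev.HostID, ev.ScanID, ev.ScoreDelta)
	default:
		return "unknown event kind " + string(e.Kind())
	}
}
```

Because the enum constants and the event structs cannot share an identifier in Go, the sketch prefixes the constants with `Kind`; the real package may resolve that naming differently.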

ACs covered (13 of 13)
  AC-01  One subscriber receives the event within 100ms
  AC-02  Zero subscribers → drop silently + NoSubscribersCount++
  AC-03  Three subscribers all receive
  AC-04  1000 concurrent publishes × 10 subscribers race-clean
  AC-05  Shutdown closes channels; post-Shutdown Publish is a no-op
  AC-06  BufferSize=1 + 5 publishes without read → 5 dropped
  AC-07  EventKind enum has exactly 2 values (AllEventKinds)
  AC-08  Heartbeat-only subscriber does NOT receive DriftDetected
  AC-09  All counters round-trip through publish/drop scenarios
  AC-10  1000 publishes × 10 subscribers under 100ms wall-clock
  AC-11  Slow subscriber doesn't block fast subscriber (100 events
         fully received by fast despite slow being starved)
  AC-12  Source-inspection: no kafka/nats/rabbitmq/redis-pubsub/
         GCP-pubsub/SNS/SQS imports
  AC-13  Unsubscribe closes channel; subsequent Publish doesn't
         deliver and doesn't panic

Local validation
  go build ./internal/eventbus/      clean
  go vet ./internal/eventbus/        clean
  go test -race ./internal/eventbus/ 13 sub-tests pass
  specter coverage system-event-bus  13/13 = 100%

Architectural choices worth flagging
  - Non-blocking Publish via select with default. A subscriber whose
    buffer is full has the event dropped + DroppedCount increments;
    other subscribers still receive. This is the load-shedding policy.
  - Per-subscriber state in Subscription (delivered/dropped atomic
    counters) so consumers can self-monitor without the bus knowing
    who they are.
  - Closing a subscriber channel exactly once via sync.Once on the
    Subscription, with a closed-flag guard on the Bus to avoid the
    double-close when Shutdown and Unsubscribe race.
  - The Event interface is the wire format; payloads are typed Go
    structs that subscribers type-assert. This preserves type safety
    on both sides without erasure to []byte or map[string]any.
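
The close-exactly-once point can be illustrated with a small sketch, assuming a simplified bus that carries int payloads. The `Bus`, `Subscription`, `NewBus`, and `demoShutdownRace` definitions below are written for this example and are not the ones in bus.go.

```go
package main

import (
	"sync"
	"sync/atomic"
)

// Subscription closes its channel at most once, whoever asks first.
type Subscription struct {
	ch        chan int
	closeOnce sync.Once
}

func (s *Subscription) close() { s.closeOnce.Do(func() { close(s.ch) }) }

type Bus struct {
	mu     sync.RWMutex
	subs   map[*Subscription]struct{}
	closed atomic.Bool
}

func NewBus() *Bus { return &Bus{subs: map[*Subscription]struct{}{}} }

func (b *Bus) Subscribe(buffer int) *Subscription {
	s := &Subscription{ch: make(chan int, buffer)}
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed.Load() {
		s.close() // bus already shut down: hand back a closed subscription
		return s
	}
	b.subs[s] = struct{}{}
	return s
}

// Unsubscribe removes the subscription under the write lock, so no
// publisher can still see it, and only then closes the channel.
func (b *Bus) Unsubscribe(s *Subscription) {
	b.mu.Lock()
	delete(b.subs, s)
	b.mu.Unlock()
	s.close()
}

// Shutdown flips the closed flag once, then closes whatever is still registered.
func (b *Bus) Shutdown() {
	if !b.closed.CompareAndSwap(false, true) {
		return
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	for s := range b.subs {
		s.close()
		delete(b.subs, s)
	}
}

// Publish reports false after Shutdown instead of sending on closed channels.
func (b *Bus) Publish(v int) bool {
	if b.closed.Load() {
		return false
	}
	b.mu.RLock()
	defer b.mu.RUnlock()
	for s := range b.subs {
		select {
		case s.ch <- v:
		default: // buffer full: drop
		}
	}
	return true
}

// demoShutdownRace runs 50 Unsubscribe calls concurrently with Shutdown.
func demoShutdownRace() (allClosed, publishedAfter bool) {
	b := NewBus()
	subs := make([]*Subscription, 50)
	for i := range subs {
		subs[i] = b.Subscribe(1)
	}
	var wg sync.WaitGroup
	for _, s := range subs {
		wg.Add(1)
		go func(s *Subscription) { defer wg.Done(); b.Unsubscribe(s) }(s)
	}
	wg.Add(1)
	go func() { defer wg.Done(); b.Shutdown() }()
	wg.Wait()
	allClosed = true
	for _, s := range subs {
		if _, ok := <-s.ch; ok {
			allClosed = false
		}
	}
	return allClosed, b.Publish(1)
}
```

The sync.Once makes a second close harmless, and removing a subscription from the map under the write lock before closing it keeps Publish from ever sending on a closed channel in this sketch.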

Slice B.3 status
  B.3a event bus       this PR — 13/13 ACs
  B.3b alert router    pending (subscribes to this bus)
@remyluslosius force-pushed the feat/slice-b-b3a-event-bus branch from c528325 to 2495b15 on May 29, 2026, 12:39
@remyluslosius merged commit 784ad01 into main on May 29, 2026
14 checks passed