
feat(notification-worker): cache + mobile push overhaul #237

Merged
mliu33 merged 12 commits into main from claude/inspiring-feynman-iCsHu
Jun 3, 2026

Conversation

Joey0538 (Collaborator) commented May 28, 2026

Summary

Overhauls notification-worker into a cached, mention-aware, presence-filtered mobile push pipeline, and bundles the cross-service plumbing the pipeline depends on: canonical room member events (so the worker can move off the per-user subscription.update wildcard), a replyAccounts widening in message-worker so thread followers stay correct, and a request-id bridge in message-gatekeeper.

Implements the 2026-05-22 notification-worker cache + mobile push design. Replaces the previous blanket fan-out (one NotificationEvent per non-sender member on core NATS) with a four-stage cached pipeline that emits batched push events on a server-only JetStream subject.

Pipeline (per canonical message)

  1. Stage 1 — exclusion filters: sender skip, mute, restricted (history-sharing window), thread-non-follower
  2. Stage 2 — in-process hook veto (suppress-only, fail-open on error)
  3. Stage 3 — pure routing predicate (DM/botDM/mention/large-room/bot)
  4. Stage 4 — bulk presence RPC (chunked, fail-open) → per-account DND check
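
To make the first two stages concrete, here is a minimal sketch of an exclusion filter followed by a suppress-only veto that fails open. It is an illustration under stated assumptions, not the handler in this PR: `Member`, `filterRecipients`, `flakyVetoer`, and `demoRecipients` are hypothetical names, and the `Allow` signature is simplified (the real hook also receives the message).

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Member is a hypothetical, trimmed-down stand-in for the cached member projection.
type Member struct {
	Account string
	Muted   bool
}

// Vetoer is a simplified suppress-only hook: Allow returns false to suppress.
type Vetoer interface {
	Allow(ctx context.Context, m Member) (bool, error)
}

// filterRecipients applies a Stage 1 style exclusion (sender skip, mute) and
// then a Stage 2 veto. A hook error never suppresses a recipient (fail open).
func filterRecipients(ctx context.Context, sender string, members []Member, v Vetoer) []string {
	var out []string
	for _, m := range members {
		if m.Account == sender || m.Muted {
			continue // Stage 1: excluded before any hook is consulted
		}
		allow, err := v.Allow(ctx, m)
		if err != nil {
			allow = true // Stage 2: fail open on hook error
		}
		if allow {
			out = append(out, m.Account)
		}
	}
	return out
}

// flakyVetoer suppresses bob and errors for carol, to exercise both branches.
type flakyVetoer struct{}

func (flakyVetoer) Allow(_ context.Context, m Member) (bool, error) {
	switch m.Account {
	case "bob":
		return false, nil
	case "carol":
		return false, errors.New("hook backend unavailable")
	}
	return true, nil
}

// demoRecipients runs the filter over a small fixed room.
func demoRecipients() []string {
	members := []Member{
		{Account: "alice"},
		{Account: "bob"},
		{Account: "carol"},
		{Account: "dave", Muted: true},
		{Account: "erin"},
	}
	return filterRecipients(context.Background(), "alice", members, flakyVetoer{})
}

func main() {
	fmt.Println(demoRecipients())
}
```

In this sketch alice is skipped as the sender, dave is skipped as muted, bob is vetoed, and carol survives because the hook error is treated as "allow".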

Survivors are sorted lexicographically and chunked into PUSH_RECIPIENT_BATCH_SIZE (default 100) groups. One gzipped PushNotificationEvent is published async per batch to chat.server.notification.push.{siteID}.send with Nats-Msg-Id = {messageId}-b{batchIndex} for stream-side dedup. Desktop banners now live entirely client-side off the broadcast-worker room-event stream — no server desktop publish leg.
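
The sort-then-chunk step is what keeps the dedup keys stable across redeliveries. A rough sketch of that idea follows; `Batch` and `buildBatches` are hypothetical names, and only the `{messageId}-b{batchIndex}` key format and the default of 100 come from the description above.

```go
package main

import (
	"fmt"
	"sort"
)

// Batch is a hypothetical container pairing a dedup key with its recipients.
type Batch struct {
	MsgID    string // value intended for the Nats-Msg-Id header
	Accounts []string
}

// buildBatches sorts survivors first so that chunk boundaries, and therefore
// the {messageId}-b{batchIndex} keys, are the same on every redelivery.
func buildBatches(messageID string, survivors []string, batchSize int) []Batch {
	if batchSize <= 0 {
		batchSize = 100 // mirrors the PUSH_RECIPIENT_BATCH_SIZE default
	}
	sorted := append([]string(nil), survivors...) // copy: do not reorder the caller's slice
	sort.Strings(sorted)

	var batches []Batch
	for i := 0; i < len(sorted); i += batchSize {
		end := i + batchSize
		if end > len(sorted) {
			end = len(sorted)
		}
		batches = append(batches, Batch{
			MsgID:    fmt.Sprintf("%s-b%d", messageID, len(batches)),
			Accounts: sorted[i:end],
		})
	}
	return batches
}

func main() {
	for _, b := range buildBatches("msg42", []string{"dave", "alice", "carol", "bob", "erin"}, 2) {
		fmt.Println(b.MsgID, b.Accounts)
	}
}
```

With a batch size of 2 the example yields `msg42-b0 [alice bob]`, `msg42-b1 [carol dave]`, and `msg42-b2 [erin]`, regardless of the order in which survivors arrive.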

Commits

  1. feat(pkg/natsutil) — gzip helpers + configurable decoded payload cap
  2. feat(pkg) — canonical room member events, push/presence types, @here, caches
  3. fix(message-gatekeeper) — bridge payload requestId into ctx for canonical publish
  4. feat(message-worker) — widen thread_rooms.replyAccounts (parent author + mentions)
  5. feat(room-worker, room-service) — dual-publish canonical room member events
  6. feat(notification-worker) — cache + mobile push pipeline
  7. docs — contracts, subject naming, client API, rollout plan
  8. feat(notification-worker) — configurable NATS max payload cap on emitter

Key design choices

  • Batched push events (PUSH_RECIPIENT_BATCH_SIZE=100) instead of per-recipient — push-service uses provider native multicast (FCM send_each_for_multicast up to 500 tokens) so one batch = one outbound HTTP. Stable {messageId}-b{batchIndex} dedup keys across redeliveries (survivors sorted before chunking).
  • pkg/roomsubcache (Valkey) + in-process LRU + single-flight loader collapses TTL-expiry stampedes on hot rooms.
  • Cache invalidation moved off per-user subscription.update onto a new canonical chat.room.canonical.{siteID}.event.member.{added,removed,muted} stream — one event per room mutation instead of N. room-worker and room-service dual-publish (FE keeps consuming subscription.update); the new worker consumer subscribes via durable JetStream with DeliverNewPolicy. Mute is treated the same as add/remove (full invalidate, no race-prone surgical patches — HSET/per-field migration deferred).
  • Raw jetstream.JetStream (oteljetstream is sync-only) with WithPublishAsyncMaxPending cap; a drain goroutine surfaces ack-future errors instead of swallowing them.
  • Outbound size guard: NATS_MAX_PAYLOAD_BYTES (default 256 KiB, matches broker max_payload) — emitter rejects any gzipped batch over the cap before publish with a clear error instead of letting the broker NACK. Default decoded-payload cap in pkg/natsutil lowered from 1 MiB → 256 KiB to match; DecodePayloadWithLimit accepts a per-call override.
  • bootstrapStreams (gated by BOOTSTRAP_STREAMS) provisions both MESSAGES_CANONICAL and PUSH_NOTIFICATIONS streams in dev; production expects ops to own the push stream.
  • Thread-follower lookups fail-open to an empty set so a transient thread_subscriptions Mongo blip doesn't burn the consumer's MaxDeliver=5 retries on every thread reply.
  • thread_rooms.replyAccounts widened in message-worker: now includes the parent message author and any @-mentioned users, not just repliers. Folded into UpdateThreadRoomLastMessage (takes []string now) so it stays one Mongo round-trip per reply. Restores the legacy notification semantics where mentioned users follow the thread automatically.
  • message-gatekeeper requestId bridge: reads requestId from the payload and propagates it via context.Context to the canonical-publish path so downstream consumers see a consistent correlation id end-to-end.
  • Presence source ships as no-op until the presence-service bulk RPC handler lands; PRESENCE_RPC_ENABLED flips it on.
  • RoomType comes from the cached member projection (not a member-count inference) so 2-member channels route correctly.
  • Sender display names are pre-composed once at message-gatekeeper (via pkg/displayfmt.CombineWithFallback) and propagated on the canonical message — no users-collection lookup on the push hot path.
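
For readers unfamiliar with the decoded-payload cap, the following sketch shows one common way to bound gzip decompression with a per-call limit. It is an assumption about the general technique, not the code in pkg/natsutil: `decodeWithLimit`, `compress`, and `errTooLarge` are hypothetical names.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

var errTooLarge = errors.New("decoded payload exceeds limit")

// decodeWithLimit gunzips compressed but refuses to produce more than limit
// bytes, so a small malicious payload cannot expand without bound.
func decodeWithLimit(compressed []byte, limit int64) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, fmt.Errorf("open gzip stream: %w", err)
	}
	defer zr.Close()

	// Read one byte past the limit so "exactly at the cap" and "over the cap"
	// can be told apart.
	out, err := io.ReadAll(io.LimitReader(zr, limit+1))
	if err != nil {
		return nil, fmt.Errorf("read gzip stream: %w", err)
	}
	if int64(len(out)) > limit {
		return nil, errTooLarge
	}
	return out, nil
}

// compress is a small helper for the demo; writes to a bytes.Buffer do not fail.
func compress(data []byte) []byte {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write(data)
	_ = zw.Close()
	return buf.Bytes()
}

func main() {
	// 1 MiB of repeated bytes compresses far below 256 KiB but must still be rejected.
	blob := compress(bytes.Repeat([]byte("a"), 1<<20))
	_, err := decodeWithLimit(blob, 256<<10)
	fmt.Println(len(blob), errors.Is(err, errTooLarge))
}
```

The point of the sketch is that the limit applies to the decoded size, not the wire size, which is why the outbound guard and the decode cap are configured separately.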

Bugs fixed in this PR

  • Previous fan-out ignored Subscription.Muted and HistorySharedSince (spec called out both).
  • @here was silently treated as "no mention" by broadcast-worker's Resolve early-return — now propagated via ResolveResult.MentionHere.
  • thread_rooms.replyAccounts was only being widened by actual repliers, so parent authors and mention-only users didn't follow the thread — caused asymmetric unread-thread state between sidebars.
  • message-gatekeeper dropped the inbound requestId when publishing to the canonical stream, breaking end-to-end log correlation for notification flows.
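
The requestId fix amounts to lifting a field from the inbound payload into `context.Context` so the canonical-publish path can read it. A minimal sketch of that pattern is below; the key type, helper names, and the assumption that the payload is JSON with a top-level `requestId` field are illustrative rather than taken from message-gatekeeper.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// requestIDKey is an unexported key type so other packages cannot collide with it.
type requestIDKey struct{}

func withRequestID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, requestIDKey{}, id)
}

// requestIDFrom returns "" when no request id was attached.
func requestIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(requestIDKey{}).(string)
	return id
}

// bridgeRequestID reads requestId from the payload and, when present,
// attaches it to ctx. Malformed payloads leave ctx unchanged.
func bridgeRequestID(ctx context.Context, payload []byte) context.Context {
	var envelope struct {
		RequestID string `json:"requestId"`
	}
	if err := json.Unmarshal(payload, &envelope); err != nil || envelope.RequestID == "" {
		return ctx
	}
	return withRequestID(ctx, envelope.RequestID)
}

// correlationID shows what a downstream publish path would observe.
func correlationID(payload string) string {
	return requestIDFrom(bridgeRequestID(context.Background(), []byte(payload)))
}

func main() {
	fmt.Println(correlationID(`{"requestId":"req-123","content":"hi"}`))
}
```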

Review and observability follow-ups (not in this PR)

  • HSET (per-field) cache encoding + goccy/go-json codec swap for the read hot path (deferred after cost analysis; mute toggle migrates first).
  • Prometheus metrics + /healthz (handler-stage counters, cache hit-rate, async-publish pending, presence RPC outcomes).
  • threadsubcache TTL'd L1 for thread-follower lookups (spec Future Work).
  • OTel trace context propagation through async push publishes (raw jetstream.New strips the otelnats wrapper).
  • Encrypted-room push content scrubbing (spec Future Work).
  • Highlight keywords, per-user rate limiting, PII audit on push payload (spec Future Work).

Test plan

  • make test — full unit suite, race detector on
  • make lint — 0 issues
  • make test-integration SERVICE=notification-worker — real Mongo + Valkey cluster + NATS via testcontainers
  • go build ./...
  • CI green
  • Staging rollout against representative thread_subscriptions collection (validates INDEX_ENSURE_TIMEOUT default of 2 min)
  • Push service consumer wired to the new schema (PushNotificationEvent {id, accounts[], title, body, data, roomId, timestamp} — gzip-encoded, see docs/notification-worker-downstream-contracts.md)
  • Confirm production broker max_payload matches NATS_MAX_PAYLOAD_BYTES (default 256 KiB)
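
As a reference point for the last two checklist items, here is a sketch of the event shape and the outbound size guard. The field names follow the schema listed above; the Go types (for example `map[string]string` for `data` and `int64` for `timestamp`) and the `encodeWithCap` helper are assumptions, so treat docs/notification-worker-downstream-contracts.md as the source of truth.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/json"
	"fmt"
)

// PushNotificationEvent uses the field names from the test plan; the Go types
// here are assumptions made for the sketch.
type PushNotificationEvent struct {
	ID        string            `json:"id"`
	Accounts  []string          `json:"accounts"`
	Title     string            `json:"title"`
	Body      string            `json:"body"`
	Data      map[string]string `json:"data,omitempty"`
	RoomID    string            `json:"roomId"`
	Timestamp int64             `json:"timestamp"`
}

// encodeWithCap marshals to JSON, gzips, and rejects the result before publish
// when it exceeds maxBytes (standing in for NATS_MAX_PAYLOAD_BYTES).
func encodeWithCap(ev PushNotificationEvent, maxBytes int) ([]byte, error) {
	raw, err := json.Marshal(ev)
	if err != nil {
		return nil, fmt.Errorf("marshal push event: %w", err)
	}
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(raw); err != nil {
		return nil, fmt.Errorf("gzip push event: %w", err)
	}
	if err := zw.Close(); err != nil {
		return nil, fmt.Errorf("flush gzip stream: %w", err)
	}
	if buf.Len() > maxBytes {
		return nil, fmt.Errorf("push event %s: gzipped size %d exceeds cap %d", ev.ID, buf.Len(), maxBytes)
	}
	return buf.Bytes(), nil
}

func main() {
	ev := PushNotificationEvent{
		ID:       "evt-1",
		Accounts: []string{"alice", "bob"},
		Title:    "general",
		Body:     "hello",
		RoomID:   "r1",
	}
	payload, err := encodeWithCap(ev, 256<<10) // 256 KiB, the documented default
	fmt.Println(len(payload) > 0, err)

	_, err = encodeWithCap(ev, 8) // deliberately tiny cap to show the rejection path
	fmt.Println(err)
}
```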

https://claude.ai/code/session_01BYAKxp36datqtnxvgmxLXj


Generated by Claude Code


coderabbitai Bot commented May 28, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Replaces per-subscription NATS broadcasts with a cached, mention-aware, presence-filtered mobile push pipeline: Valkey-backed member caching with optional L1, mention parsing including @here, staged recipient filters + hook veto, bulk presence RPC, async JetStream emitter, handler rewrite, startup wiring, tests, and docs.

Changes

Mobile Push Pipeline Implementation

| Layer | File(s) | Summary |
| --- | --- | --- |
| Models, subjects, and mention changes | pkg/model/push.go, pkg/model/presence.go, pkg/mention/mention.go, pkg/roomsubcache/roomsubcache.go, pkg/subject/subject.go, pkg/stream/stream.go | Adds push/presence models and fields, introduces @here parsing/resolution, expands cached member projection, and adds per-site push/presence subject builders and stream helper. |
| NATS gzip helpers and emitter | pkg/natsutil/gzip.go, notification-worker/emit.go, pkg/natsutil/gzip_test.go, notification-worker/emit_test.go | Adds gzip encoding/decoding helpers and an async JetStream emitter (Emitter + jsAsyncPublisher) with tests validating gzip headers, payloads, and async enqueue/error propagation. |
| Member cache & user meta | notification-worker/members.go, notification-worker/members_test.go, notification-worker/usermeta.go, notification-worker/usermeta_test.go, notification-worker/main.go | Implements Valkey-backed cached member lookup with optional L1, singleflight collapsing, loader fallback, invalidation, and a UserMetaLookup cache; wires loader in main and adds tests. |
| Presence snapshot & routing | notification-worker/presence.go, notification-worker/presence_test.go, notification-worker/routing.go, notification-worker/routing_test.go | Adds bulk presence snapshotter (chunking, NATS request/reply, fail-open), shouldPush decisions, and EligibleForPush routing predicate with tests. |
| Threads, veto hook, and handler rewrite | notification-worker/threads.go, notification-worker/hook.go, notification-worker/handler.go, notification-worker/handler_test.go | Adds thread follower lookup + index ensure, a suppress-only Vetoer interface with noop, and rewrites Handler to fetch members once, apply staged filters (mute, history, thread/follower, hook, routing, presence), batch recipients, and emit PushNotificationEvent; extensive handler tests added. |
| Startup, bootstrap, streams, and deploy | notification-worker/bootstrap.go, notification-worker/bootstrap_test.go, pkg/stream/stream.go, notification-worker/deploy/docker-compose.yml, notification-worker/main.go | Bootstraps PUSH_NOTIFICATIONS_{siteID} stream when enabled, adds stream config helper, composes new env vars, initializes Valkey/L1, raw and OTEL-wrapped JetStream, durable consumer, and eager invalidation subscription; tests updated. |
| Integration, docs, and subject naming | notification-worker/integration_test.go, docs/*, docs/notification-worker-downstream-contracts.md, docs/client-api.md, docs/nats-subject-naming.md, docs/superpowers/* | Reworks integration test to use real Mongo/Valkey/NATS, updates client/API and subject naming docs to document server-side push stream, downstream contracts, and rollout plan. |

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • hmchangw/chat#196: Overlaps at broadcast-worker created/channel fan-out and mention flag usage.
  • hmchangw/chat#82: Overlapping edits to mention resolution and broadcast-worker flows.
  • hmchangw/chat#184: Related work touching room subscription caching and projections.

"🐰 I hop with joy, a piped push so neat,
Valkey caches hum, per-recipient beats,
@here joins @all in the mention parade,
Presence and veto keep noise well-played,
Async JetStream sends each cozy treat."


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
pkg/mention/mention_test.go (1)

187-194: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Assert MentionHere in TestResolveFromParsed cases.

Line 264 asserts MentionAll and participants, but the new MentionHere flag itself is never asserted, so a regression in flag propagation can slip through.

💡 Proposed test fix
 	tests := []struct {
 		name           string
 		parsed         ParseResult
 		users          map[string]model.User
 		wantParts      []model.Participant
 		wantAccounts   []string
 		wantMentionAll bool
+		wantMentionHere bool
 	}{
@@
 		{
 			name:           "MentionHere only appends synthetic participant",
 			parsed:         ParseResult{MentionHere: true},
 			wantMentionAll: false,
+			wantMentionHere: true,
 			wantParts:      []model.Participant{{Account: "here", EngName: "here"}},
 		},
 		{
 			name:           "MentionAll and MentionHere both append synthetic participants",
 			parsed:         ParseResult{MentionAll: true, MentionHere: true},
 			wantMentionAll: true,
+			wantMentionHere: true,
 			wantParts: []model.Participant{
 				{Account: "all", EngName: "all"},
 				{Account: "here", EngName: "here"},
 			},
 		},
 	}
@@
 			assert.Equal(t, tt.wantAccounts, got.Accounts)
 			assert.Equal(t, tt.wantMentionAll, got.MentionAll)
+			assert.Equal(t, tt.wantMentionHere, got.MentionHere)
 			assert.Equal(t, tt.wantParts, got.Participants)
 		})
 	}
 }

Also applies to: 241-255, 258-264

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/mention/mention_test.go` around lines 187 - 194, The test cases in
TestResolveFromParsed (the tests slice using parsed ParseResult and expected
fields like wantMentionAll and wantParts) never assert the MentionHere flag, so
add a wantMentionHere bool to each test case and assert that the resolved
result's MentionHere matches it; update the test table (the tests slice) to
include wantMentionHere for each scenario and in the test loop compare the
actual result.MentionHere (from ResolveFromParsed or the function under test
that consumes ParseResult) to tt.wantMentionHere alongside the existing
assertions for wantMentionAll and wantParts to ensure flag propagation is
verified.
🧹 Nitpick comments (3)
pkg/roomsubcache/roomsubcache_test.go (1)

263-276: ⚡ Quick win

Use named subtests for RoomType variants.

Lines 264–276 iterate several input variants without t.Run, which makes failures less diagnosable and diverges from the test style requirement.

♻️ Suggested refactor
 func TestMember_RoomType_RoundTrip(t *testing.T) {
-	for _, rt := range []model.RoomType{
-		model.RoomTypeChannel,
-		model.RoomTypeDM,
-		model.RoomTypeBotDM,
-		model.RoomTypeDiscussion,
-	} {
-		m := roomsubcache.Member{ID: "u1", Account: "alice", RoomType: rt}
-		data, err := json.Marshal(m)
-		require.NoError(t, err)
-		var out roomsubcache.Member
-		require.NoError(t, json.Unmarshal(data, &out))
-		assert.Equal(t, rt, out.RoomType, "RoomType %q should round-trip", rt)
-	}
+	tests := []struct {
+		name string
+		rt   model.RoomType
+	}{
+		{name: "channel", rt: model.RoomTypeChannel},
+		{name: "dm", rt: model.RoomTypeDM},
+		{name: "bot_dm", rt: model.RoomTypeBotDM},
+		{name: "discussion", rt: model.RoomTypeDiscussion},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			m := roomsubcache.Member{ID: "u1", Account: "alice", RoomType: tt.rt}
+			data, err := json.Marshal(m)
+			require.NoError(t, err)
+			var out roomsubcache.Member
+			require.NoError(t, json.Unmarshal(data, &out))
+			assert.Equal(t, tt.rt, out.RoomType)
+		})
+	}
 }
As per coding guidelines: “Prefer table-driven tests when testing multiple input/output variations… Each test case… descriptive name… Use `t.Run(name, func(t *testing.T) { ... })` for subtests.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/roomsubcache/roomsubcache_test.go` around lines 263 - 276, Replace the
loop in TestMember_RoomType_RoundTrip with table-driven named subtests: build a
slice of cases each containing a name and the model.RoomType (use values
model.RoomTypeChannel, model.RoomTypeDM, model.RoomTypeBotDM,
model.RoomTypeDiscussion), then for each case call t.Run(case.name, func(t
*testing.T) { create the roomsubcache.Member with RoomType case.rt,
marshal/unmarshal and assert equality of out.RoomType }); keep the same
assertions and use roomsubcache.Member and the existing JSON round-trip logic
inside each subtest so failures are reported per named variant.
notification-worker/members_test.go (1)

58-63: ⚡ Quick win

Avoid time.Sleep-based synchronization in the singleflight test.

Using time.Sleep here introduces timing sensitivity. Prefer a channel/WaitGroup gate so goroutines start deterministically and the test remains stable.

As per coding guidelines, "Never use time.Sleep for goroutine synchronization — use proper sync primitives (channels, sync.WaitGroup, sync.Mutex)".

Also applies to: 103-121
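
One way to replace the sleep with a deterministic gate is sketched below. This is not the `fakeLoader` from members_test.go: `gatedLoader`, `runGated`, and the `entered` channel are hypothetical, and the sketch only shows the gating technique, not singleflight collapsing itself.

```go
package main

import (
	"fmt"
	"sync"
)

// gatedLoader is a hypothetical test double: Load blocks until the test closes
// gate, so no sleep is needed to hold calls "in flight".
type gatedLoader struct {
	gate    chan struct{} // closed by the test to release every Load call
	entered chan struct{} // one send per Load call, received by the test
	mu      sync.Mutex
	calls   int
	out     []string
}

func newGatedLoader(out []string) *gatedLoader {
	return &gatedLoader{gate: make(chan struct{}), entered: make(chan struct{}), out: out}
}

func (g *gatedLoader) Load(roomID string) []string {
	g.mu.Lock()
	g.calls++
	g.mu.Unlock()
	g.entered <- struct{}{} // tell the test this call is inside Load
	<-g.gate                // wait for the test to open the gate
	return g.out
}

// runGated starts n concurrent loads, waits until all of them are provably
// inside Load, then releases them together.
func runGated(n int) (calls int, results int) {
	loader := newGatedLoader([]string{"alice"})
	res := make(chan []string, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			res <- loader.Load("r1")
		}()
	}
	for i := 0; i < n; i++ {
		<-loader.entered
	}
	close(loader.gate)
	wg.Wait()
	close(res)
	for range res {
		results++
	}
	loader.mu.Lock()
	defer loader.mu.Unlock()
	return loader.calls, results
}

func main() {
	fmt.Println(runGated(4))
}
```

Because the test receives from `entered` once per goroutine before closing `gate`, the overlap between calls no longer depends on scheduler timing.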

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@notification-worker/members_test.go` around lines 58 - 63, The test's
fakeLoader.Load uses time.Sleep for synchronization which makes the test flaky;
modify fakeLoader to use a deterministic sync primitive (e.g., a channel startCh
or a sync.WaitGroup field) instead of f.delay: add a start gate field to the
fakeLoader struct (e.g., startCh chan struct{} or startWG *sync.WaitGroup), have
Load block on that gate (e.g., <-f.startCh or f.startWG.Wait()) before returning
f.out, f.err and incrementing f.calls, and update the tests that instantiate
fakeLoader (and other copies referenced around the 103-121 area) to signal the
gate (close the channel or call Done on the WaitGroup) at the precise moment the
goroutines should proceed; keep existing behaviour of returning f.out and f.err
unchanged.
notification-worker/hook_test.go (1)

16-17: ⚡ Quick win

Fail fast on error in this test.

Use require.NoError on Line 16, then keep assert.True on Line 17. That avoids continuing assertions when the call already failed.

As per coding guidelines "Use standard testing package with github.com/stretchr/testify/assert and testify/require for assertions".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@notification-worker/hook_test.go` around lines 16 - 17, Replace the non-fatal
assertion assert.NoError(t, err) with a fatal assertion require.NoError(t, err)
so the test fails fast if the call returns an error, but keep assert.True(t,
allow) as-is; also add the require import
("github.com/stretchr/testify/require") to the test file imports to avoid
compilation errors and update any existing assert/require usage accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/nats-subject-naming.md`:
- Line 228: The table row for "Notification(account) |
chat.user.{account}.notification" currently has a third cell with a deprecation
note, breaking the 2-column table (MD056); edit the row so it has only two
pipe-separated cells—move the deprecation text "(deprecated; use
PushNotification(siteID) for mobile push)" into the Subject cell alongside or
immediately following `chat.user.{account}.notification` (or place that note
below the table) so the row contains exactly `Notification(account)` and
`chat.user.{account}.notification` as the two columns and linting passes.

In `@docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md`:
- Around line 2006-2009: The current code in the Followers lookup
(h.deps.Followers.Followers(ctx, msg.ThreadParentMessageID)) returns the error
(ferr) which contradicts the fail-open design; change the ferr != nil branch to
log the error with context (include msg.ThreadParentMessageID and ferr) using
the component's logger (e.g., h.logger or existing logger) and then set f to an
empty slice/zero-value for followers and continue, instead of returning
fmt.Errorf; keep the normal successful-path behavior when ferr is nil.

In `@notification-worker/bootstrap.go`:
- Around line 58-63: Replace the hardcoded pushStreamName and pushFilter used
with js.CreateOrUpdateStream by calling the shared builders in pkg/stream and
pkg/subject instead (don’t use raw literals). Specifically, remove the direct
construction of pushStreamName and pushFilter and use the stream builder (e.g.,
pkg/stream.PushNotificationsStream(siteID) or the equivalent stream.* function)
to obtain the StreamConfig/Name and use the subject builder (e.g.,
pkg/subject.PushNotifications... or pkg/subject.PushNotificationFilter(siteID))
to produce the subject pattern passed into Subjects; then pass those values into
js.CreateOrUpdateStream so the bootstrap uses the centralized stream and subject
builders rather than raw strings.

In `@notification-worker/emit.go`:
- Around line 116-122: The drain method currently closes j.done unguarded
causing a panic if called twice; make it idempotent by adding a sync.Once field
(e.g., doneOnce) to the jsAsyncPublisher struct, initialize/import sync, and
replace close(j.done) with doneOnce.Do(func(){ close(j.done) }) inside
jsAsyncPublisher.drain so the channel is closed exactly once.

In `@notification-worker/hook.go`:
- Around line 17-19: Rename the single-method interface Hook to a properly
suffixed name (e.g., Vetoer) and update all usages: change the interface
declaration "type Hook interface { Allow(ctx context.Context, msg
*model.Message, member roomsubcache.Member) (bool, error) }" to "type Vetoer
interface { Allow(... ) ... }" and replace every variable, parameter, field,
function signature, and return type that references Hook to Vetoer (including
mocks/tests and any implementations whose receiver types satisfy Hook) so
signatures (e.g., Allow) and behavior remain unchanged while complying with the
-er naming convention.

In `@notification-worker/main.go`:
- Around line 233-234: The current fire-and-forget call using go
memberLookup.Invalidate(context.Background(), env.Subscription.RoomID) spawns an
unbounded goroutine with context.Background(); replace this with a bounded
worker or synchronous path: create a buffered invalidation channel (e.g.
invalidationCh) and a dedicated invalidation worker goroutine that consumes
RoomID messages and calls memberLookup.Invalidate(ctx, roomID) using the service
shutdown/cancellable context; on shutdown close the channel (or send cancel) and
wait for the worker to finish so invalidations are bounded and
termination-aware, or if Invalidate is cheap call it synchronously instead of
spawning a goroutine.

In `@notification-worker/members_test.go`:
- Around line 69-70: Several test calls silently discard errors (e.g., the
cache.Set call passing context.Background(), "r1", loader.out, time.Minute)
which can hide failures; update each ignored-return call (lines around the shown
cache.Set and the other occurrences at 88-89, 116-117, 142-146) to assert the
error using require.NoError(t, err) (or assert.NoError) after capturing the
returned err variable, and add the testify/require import if missing so tests
fail loudly on error instead of discarding it.

In `@notification-worker/presence_test.go`:
- Around line 78-79: The test stub currently ignores json.Marshal errors when
marshalling reply and always returns &nats.Msg{Data: out}, nil; change this to
capture the marshal error (out, err := json.Marshal(reply)), and if err != nil
return nil, err; otherwise return the constructed &nats.Msg{Data: out}, nil so
the test surfaces marshal failures instead of silently passing.

In `@notification-worker/presence.go`:
- Around line 22-24: Rename the single-method interface PresenceSource to a -er
style name (e.g., PresenceFetcher or PresenceSnapshotter) to match repo
conventions; update the interface declaration (PresenceSource ->
PresenceFetcher) and ensure all implementations and call sites that reference
PresenceSource, the Snapshot(ctx context.Context, accounts []string)
(map[string]model.Presence, error) method, and any type assertions are updated
to the new name so the method signature remains identical and compilation
succeeds.

In `@notification-worker/threads_test.go`:
- Around line 12-41: Current tests only exercise stubThreadLookup; add unit
tests exercising the real mongoThreadFollowers implementation and index helper:
write tests that call mongoThreadFollowers.Followers for the happy path (returns
expected follower set), simulate and assert decode/find iteration errors (e.g.,
invalid BSON or mocked cursor error) and empty parent ID/empty collection edge
cases, and write tests for EnsureThreadSubscriptionIndex that verify successful
index creation and that index-creation errors are propagated. Locate and use the
mongoThreadFollowers type and its Followers method plus
EnsureThreadSubscriptionIndex in your test file and mock the Mongo
collection/cursor behaviors to force the error paths.

In `@notification-worker/threads.go`:
- Around line 16-18: Rename the single-method interface ThreadFollowers to a -er
form (e.g., ThreadFollower or ThreadFollowerFetcher) and update all usages and
implementations accordingly: change the type declaration (previously type
ThreadFollowers interface { Followers(ctx context.Context, parentMessageID
string) (map[string]struct{}, error) }) to the new name, update the method
receiver/implementing types and any variable/type annotations that reference
ThreadFollowers, and adjust mocks/tests to use the new interface name so the
Followers(ctx, parentMessageID string) signature remains unchanged.
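
For the `drain` double-close comment above, the `sync.Once` guard can be sketched as follows. The `publisher` type is a hypothetical stand-in for `jsAsyncPublisher`; only the `done` channel and the `doneOnce` field name come from the review text.

```go
package main

import (
	"fmt"
	"sync"
)

// publisher is a hypothetical stand-in for jsAsyncPublisher.
type publisher struct {
	done     chan struct{}
	doneOnce sync.Once
}

func newPublisher() *publisher {
	return &publisher{done: make(chan struct{})}
}

// drain signals shutdown. It is safe to call more than once because the
// channel close runs inside sync.Once.
func (p *publisher) drain() {
	p.doneOnce.Do(func() { close(p.done) })
}

// closed reports whether drain has been called, without blocking.
func (p *publisher) closed() bool {
	select {
	case <-p.done:
		return true
	default:
		return false
	}
}

func main() {
	p := newPublisher()
	fmt.Println(p.closed()) // false
	p.drain()
	p.drain() // a bare close(p.done) would panic here
	fmt.Println(p.closed()) // true
}
```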


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 42cb01f6-1123-4b31-8652-d10dac3e2a04

📥 Commits

Reviewing files that changed from the base of the PR and between d77b22c and e5b5d43.

📒 Files selected for processing (33)
  • broadcast-worker/handler.go
  • docs/client-api.md
  • docs/nats-subject-naming.md
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
  • notification-worker/bootstrap.go
  • notification-worker/bootstrap_test.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/emit.go
  • notification-worker/emit_test.go
  • notification-worker/handler.go
  • notification-worker/handler_test.go
  • notification-worker/hook.go
  • notification-worker/hook_test.go
  • notification-worker/integration_test.go
  • notification-worker/main.go
  • notification-worker/members.go
  • notification-worker/members_test.go
  • notification-worker/presence.go
  • notification-worker/presence_test.go
  • notification-worker/routing.go
  • notification-worker/routing_test.go
  • notification-worker/threads.go
  • notification-worker/threads_test.go
  • pkg/mention/mention.go
  • pkg/mention/mention_test.go
  • pkg/model/event.go
  • pkg/model/model_test.go
  • pkg/model/presence.go
  • pkg/model/push.go
  • pkg/roomsubcache/roomsubcache.go
  • pkg/roomsubcache/roomsubcache_test.go
  • pkg/subject/subject.go
  • pkg/subject/subject_test.go
💤 Files with no reviewable changes (1)
  • pkg/model/event.go

Collaborator Author

Addressed CodeRabbit feedback in 6a5109b:

  • Interface naming (Hook, PresenceSource, ThreadFollowers) — renamed to Vetoer, PresenceSnapshotter, ThreadFollowerLister per the -er single-method convention (concrete impls bulkPresenceSource/mongoThreadFollowers unchanged).
  • PUSH_NOTIFICATIONS literals — now derived from new pkg/stream.PushNotifications + pkg/subject.PushNotificationFilter builders; bootstrap no longer hardcodes the name/subject.
  • drain double-close — guarded with sync.Once (was already in place; verified).
  • Unbounded invalidation goroutine — replaced with a single bounded worker draining a buffered channel, with a shutdown step that closes the channel and waits (was already in place; verified).
  • Discarded errors in members_test.go / presence_test.go — now asserted.
  • Markdown table (MD056) + plan-doc fail-open snippet — fixed/synced.
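
For context on the bounded invalidation worker mentioned in the list above, a minimal sketch of the pattern is shown here. The `invalidator` type and its methods are hypothetical, and the blocking send in `enqueue` is an assumed backpressure policy rather than something stated in this PR.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// invalidator is a hypothetical wrapper around one worker goroutine that
// drains a buffered channel of room IDs.
type invalidator struct {
	ch chan string
	wg sync.WaitGroup
}

// startInvalidator launches exactly one worker; ctx is the service's
// cancellable context rather than context.Background() per call.
func startInvalidator(ctx context.Context, buffer int, invalidate func(context.Context, string)) *invalidator {
	inv := &invalidator{ch: make(chan string, buffer)}
	inv.wg.Add(1)
	go func() {
		defer inv.wg.Done()
		for roomID := range inv.ch {
			invalidate(ctx, roomID)
		}
	}()
	return inv
}

// enqueue blocks when the buffer is full (assumed policy: backpressure
// instead of spawning more goroutines or dropping invalidations).
func (i *invalidator) enqueue(roomID string) {
	i.ch <- roomID
}

// shutdown closes the channel and waits for the worker to drain it.
func (i *invalidator) shutdown() {
	close(i.ch)
	i.wg.Wait()
}

// invalidateAll runs the worker over rooms and returns the processed order.
func invalidateAll(rooms []string) []string {
	var seen []string // written only by the single worker, read after shutdown
	inv := startInvalidator(context.Background(), 2, func(_ context.Context, roomID string) {
		seen = append(seen, roomID)
	})
	for _, r := range rooms {
		inv.enqueue(r)
	}
	inv.shutdown()
	return seen
}

func main() {
	fmt.Println(invalidateAll([]string{"r1", "r2", "r3"}))
}
```

With a single worker and a FIFO channel, invalidations are processed in arrival order, and `shutdown` guarantees none are still in flight when the process exits.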

Deferred (follow-up): the request for unit tests of the real mongoThreadFollowers.Followers / EnsureThreadSubscriptionIndex — these exercise a live *mongo.Collection, so they belong in the integration suite rather than unit tests. Tracking as a follow-up; the cache-backed end-to-end path is already covered by TestNotificationWorker_CacheBackedFanOut (real Mongo + Valkey + NATS).

Verified locally: go build ./..., make lint (0 issues), make test (race), and make test-integration SERVICE=notification-worker all pass.


Generated by Claude Code

@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 2 times, most recently from 06f553f to 45b219a Compare May 28, 2026 07:16
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/notification-worker-downstream-contracts.md`:
- Around line 92-94: The fenced code block containing the line "dedup_window  ≥ 
AckWait × MaxDeliver  =  30s × 5  =  150s   (defaults)" should include a
language tag to satisfy markdown lint (MD040); change the opening fence from ```
to ```text (i.e., replace the code fence around the `dedup_window  ≥  AckWait ×
MaxDeliver  =  30s × 5  =  150s   (defaults)` line with ```text) so the block is
explicitly marked as plain text.
- Line 220: Step 2 currently references the global stream name
"PUSH_NOTIFICATIONS" which is inconsistent with the rest of the contract; change
the rollout step to use the site-scoped stream name format
"PUSH_NOTIFICATIONS_{siteID}" (consistent with §3.1 and other usages) so
provisioning guidance matches the contract; update the text in the rollout step
to explicitly show the templated stream name and, if present, add a short note
or example demonstrating substitution of {siteID}.

In `@notification-worker/members.go`:
- Around line 3-7: GetMembers currently returns bare errors on both the data
load and singleflight failure paths; update the function to wrap those errors
with contextual messages (e.g., fmt.Errorf("GetMembers: load failed: %w", err)
and fmt.Errorf("GetMembers: singleflight failed: %w", err)) and add fmt to the
imports; ensure every return of err in GetMembers uses fmt.Errorf with a short
description referencing the operation (load/singleflight/GetMembers) so no bare
err is returned.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d93dfcd-df25-4d88-87b8-abc046263c21

📥 Commits

Reviewing files that changed from the base of the PR and between 6a5109b and 45b219a.

📒 Files selected for processing (36)
  • broadcast-worker/handler.go
  • docs/client-api.md
  • docs/nats-subject-naming.md
  • docs/notification-worker-downstream-contracts.md
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
  • notification-worker/bootstrap.go
  • notification-worker/bootstrap_test.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/emit.go
  • notification-worker/emit_test.go
  • notification-worker/handler.go
  • notification-worker/handler_test.go
  • notification-worker/hook.go
  • notification-worker/hook_test.go
  • notification-worker/integration_test.go
  • notification-worker/main.go
  • notification-worker/members.go
  • notification-worker/members_test.go
  • notification-worker/presence.go
  • notification-worker/presence_test.go
  • notification-worker/routing.go
  • notification-worker/routing_test.go
  • notification-worker/threads.go
  • notification-worker/threads_test.go
  • pkg/mention/mention.go
  • pkg/mention/mention_test.go
  • pkg/model/event.go
  • pkg/model/model_test.go
  • pkg/model/presence.go
  • pkg/model/push.go
  • pkg/roomsubcache/roomsubcache.go
  • pkg/roomsubcache/roomsubcache_test.go
  • pkg/stream/stream.go
  • pkg/stream/stream_test.go
  • pkg/subject/subject.go
  • pkg/subject/subject_test.go
💤 Files with no reviewable changes (1)
  • pkg/model/event.go
✅ Files skipped from review due to trivial changes (1)
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
🚧 Files skipped from review as they are similar to previous changes (32)
  • notification-worker/hook.go
  • broadcast-worker/handler.go
  • notification-worker/routing_test.go
  • pkg/roomsubcache/roomsubcache_test.go
  • pkg/stream/stream_test.go
  • pkg/subject/subject_test.go
  • pkg/stream/stream.go
  • notification-worker/bootstrap.go
  • notification-worker/routing.go
  • pkg/roomsubcache/roomsubcache.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/bootstrap_test.go
  • docs/nats-subject-naming.md
  • pkg/subject/subject.go
  • pkg/model/presence.go
  • notification-worker/main.go
  • notification-worker/threads_test.go
  • notification-worker/hook_test.go
  • notification-worker/members_test.go
  • pkg/model/push.go
  • notification-worker/threads.go
  • pkg/mention/mention.go
  • notification-worker/presence_test.go
  • docs/client-api.md
  • notification-worker/presence.go
  • notification-worker/emit.go
  • notification-worker/integration_test.go
  • notification-worker/emit_test.go
  • notification-worker/handler.go
  • pkg/mention/mention_test.go
  • pkg/model/model_test.go
  • notification-worker/handler_test.go

Comment thread docs/notification-worker-downstream-contracts.md Outdated
Comment thread docs/notification-worker-downstream-contracts.md Outdated
Comment thread notification-worker/members.go
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 3 times, most recently from 04482a0 to 81716c2 Compare May 28, 2026 08:03
@Joey0538 Joey0538 added the ready label May 28, 2026
Comment thread docs/notification-worker-downstream-contracts.md Outdated
Comment thread docs/notification-worker-downstream-contracts.md Outdated
Comment thread docs/notification-worker-downstream-contracts.md Outdated
Comment thread notification-worker/handler.go Outdated
Comment thread notification-worker/handler.go Outdated
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@pkg/natsutil/gzip.go`:
- Around line 79-89: DecodePayload's gzip branch (case ContentEncodingGzip)
currently uses io.ReadAll(r) which can decompress arbitrarily large payloads;
introduce a package-level constant like maxDecodedSize (e.g., 16<<20) and wrap
the gzip.Reader r in an io.LimitReader(r, maxDecodedSize) before reading, then
read from that limited reader (io.ReadAll or io.ReadFull) and return a clear
error if the limit is hit so consumers fail loudly instead of triggering OOM;
update the ContentEncodingGzip handling in pkg/natsutil/gzip.go accordingly.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 3f4be060-fd62-44bb-ab05-623085113bfc

📥 Commits

Reviewing files that changed from the base of the PR and between 45b219a and 7f1944a.

📒 Files selected for processing (38)
  • broadcast-worker/handler.go
  • docs/client-api.md
  • docs/nats-subject-naming.md
  • docs/notification-worker-downstream-contracts.md
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
  • notification-worker/bootstrap.go
  • notification-worker/bootstrap_test.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/emit.go
  • notification-worker/emit_test.go
  • notification-worker/handler.go
  • notification-worker/handler_test.go
  • notification-worker/hook.go
  • notification-worker/hook_test.go
  • notification-worker/integration_test.go
  • notification-worker/main.go
  • notification-worker/members.go
  • notification-worker/members_test.go
  • notification-worker/presence.go
  • notification-worker/presence_test.go
  • notification-worker/routing.go
  • notification-worker/routing_test.go
  • notification-worker/threads.go
  • notification-worker/threads_test.go
  • pkg/mention/mention.go
  • pkg/mention/mention_test.go
  • pkg/model/event.go
  • pkg/model/model_test.go
  • pkg/model/presence.go
  • pkg/model/push.go
  • pkg/natsutil/gzip.go
  • pkg/natsutil/gzip_test.go
  • pkg/roomsubcache/roomsubcache.go
  • pkg/roomsubcache/roomsubcache_test.go
  • pkg/stream/stream.go
  • pkg/stream/stream_test.go
  • pkg/subject/subject.go
  • pkg/subject/subject_test.go
💤 Files with no reviewable changes (1)
  • pkg/model/event.go
✅ Files skipped from review due to trivial changes (2)
  • notification-worker/routing_test.go
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
🚧 Files skipped from review as they are similar to previous changes (25)
  • pkg/stream/stream.go
  • broadcast-worker/handler.go
  • notification-worker/hook_test.go
  • pkg/subject/subject_test.go
  • pkg/model/push.go
  • notification-worker/threads_test.go
  • notification-worker/deploy/docker-compose.yml
  • pkg/stream/stream_test.go
  • notification-worker/bootstrap.go
  • pkg/subject/subject.go
  • pkg/model/presence.go
  • pkg/roomsubcache/roomsubcache_test.go
  • notification-worker/hook.go
  • pkg/model/model_test.go
  • docs/nats-subject-naming.md
  • pkg/mention/mention_test.go
  • pkg/mention/mention.go
  • notification-worker/members.go
  • notification-worker/routing.go
  • docs/client-api.md
  • notification-worker/members_test.go
  • notification-worker/presence.go
  • notification-worker/presence_test.go
  • notification-worker/emit.go
  • notification-worker/main.go

Comment thread pkg/natsutil/gzip.go
@Joey0538 Joey0538 removed the ready label Jun 1, 2026
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 2 times, most recently from 8d74fa6 to 6057c0f Compare June 1, 2026 07:16
@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (3)
notification-worker/handler.go (1)

23-32: ⚡ Quick win

Rename these single-method interfaces to the repo’s -er form.

MemberLookup and RoomMetaLookup are part of the new dependency surface, but both violate the interface naming convention the rest of the services are moving toward.

As per coding guidelines, "Interfaces must use the -er suffix for single-method interfaces and <Domain>Store pattern for store interfaces".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@notification-worker/handler.go` around lines 23 - 32, Rename the two
single-method interfaces to use the `-er` suffix: change MemberLookup to
MemberGetter (keeping its method GetMembers(ctx context.Context, roomID string)
([]roomsubcache.Member, error)) and change RoomMetaLookup to RoomMetaGetter
(keeping Get(ctx context.Context, roomID string) (roommetacache.Meta, error));
update all references/usages, implementations and tests to use MemberGetter and
RoomMetaGetter so the existing methods continue to satisfy the interfaces.
notification-worker/usermeta.go (1)

16-19: ⚡ Quick win

Rename this single-method interface to the repo’s -er form.

UserMetaLookup breaks the service-wide interface naming rule. Renaming it now will keep the handler dependency surface consistent before more call sites accrete.

As per coding guidelines, "Interfaces must use the -er suffix for single-method interfaces and <Domain>Store pattern for store interfaces".

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@notification-worker/usermeta.go` around lines 16 - 19, Rename the
single-method interface UserMetaLookup to follow the repo `-er` convention
(e.g., UserMetaGetter), keeping the method signature Get(ctx context.Context,
userID string) (*model.User, error) unchanged; update all references (function
parameters, struct fields, constructors, implementations and tests/mocks) to use
UserMetaGetter so implementations with the same Get method continue to satisfy
the interface without code changes to their method bodies, and run tests to
ensure no remaining references to UserMetaLookup remain.
pkg/natsutil/gzip.go (1)

43-53: ⚡ Quick win

Drop the previous buffer reference before putting the writer back.

gzip.Writer keeps its last destination writer. Pooling it immediately after Close() means the last bytes.Buffer can stay live until reuse, which is avoidable memory retention on large payloads. Reset to io.Discard before each Put.

Proposed fix
 	gz, _ := gzipWriterPool.Get().(*gzip.Writer)
 	gz.Reset(&buf)
 	if _, err := gz.Write(payload); err != nil {
+		gz.Reset(io.Discard)
 		gzipWriterPool.Put(gz)
 		return nil, fmt.Errorf("gzip write: %w", err)
 	}
 	if err := gz.Close(); err != nil {
+		gz.Reset(io.Discard)
 		gzipWriterPool.Put(gz)
 		return nil, fmt.Errorf("gzip close: %w", err)
 	}
+	gz.Reset(io.Discard)
 	gzipWriterPool.Put(gz)
 	return buf.Bytes(), nil
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/natsutil/gzip.go` around lines 43 - 53, The gzip.Writer retained the
previous bytes.Buffer when returned to gzipWriterPool; call gz.Reset(io.Discard)
to drop the buffer reference before every gzipWriterPool.Put(gz) so the large
buf isn't retained — update the error-return branches and the normal path in the
function using gz, gz.Close(), gzipWriterPool.Put(gz) to call
gz.Reset(io.Discard) immediately before each Put; ensure io is imported so
Reset(io.Discard) compiles.
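One way to apply the same idea without repeating the reset on every return path is a single deferred cleanup. This is a sketch under assumptions: the pool and function names are illustrative, and the PR's real function may prefer the explicit per-branch form shown in the proposed fix.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// gzipWriterPool reuses writers across calls; name mirrors the review comment.
var gzipWriterPool = sync.Pool{
	New: func() any { return gzip.NewWriter(io.Discard) },
}

// compress gzips payload with a pooled writer. The deferred Reset(io.Discard)
// drops the writer's reference to buf before the writer goes back to the pool.
func compress(payload []byte) ([]byte, error) {
	var buf bytes.Buffer
	gz := gzipWriterPool.Get().(*gzip.Writer)
	defer func() {
		gz.Reset(io.Discard)
		gzipWriterPool.Put(gz)
	}()
	gz.Reset(&buf)
	if _, err := gz.Write(payload); err != nil {
		return nil, fmt.Errorf("gzip write: %w", err)
	}
	if err := gz.Close(); err != nil {
		return nil, fmt.Errorf("gzip close: %w", err)
	}
	return buf.Bytes(), nil
}

// decompress is a small helper used only to check the round trip.
func decompress(data []byte) ([]byte, error) {
	r, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return io.ReadAll(r)
}

func main() {
	for _, s := range []string{"first", "second"} {
		c, err := compress([]byte(s))
		if err != nil {
			panic(err)
		}
		d, err := decompress(c)
		fmt.Printf("%s %v\n", d, err)
	}
}
```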
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/notification-worker-downstream-contracts.md`:
- Around line 97-103: Update the migration table row that describes the sender
enrichment to reflect that the new worker now populates display names: change
the note for the nested `sender` object (`Participant`) to indicate
`sender.engName` and `sender.chineseName` are filled best-effort (resolved from
`account`) rather than saying display names are not populated, and keep the
guidance about resolving from `account` if consumers need authoritative values.

In `@notification-worker/handler.go`:
- Around line 70-75: The handler does not propagate a correlation/request ID;
update HandleMessage to extract an existing request ID from the incoming event
(e.g., evt.RequestID or similar) or generate a new unique ID when missing, then
store it in the context (ctx) and use a context-aware logger for all subsequent
logs (the slog calls in this function and downstream handlers should pull the ID
from ctx). Locate HandleMessage, the unmarshalling of evt (json.Unmarshal into
evt), and where msg is used, inject the request ID into context immediately
after unmarshalling and ensure any slog.Warn/Error calls include the
context-based logger so the correlation ID is present for all log lines.
- Around line 233-235: The Emit failure is only logged and the handler
continues, causing silent loss; change the block that calls
h.deps.Emitter.Emit(ctx, pushEvt) so that on error it returns a wrapped error
instead of continuing. Specifically, replace the slog.Error-only branch with an
immediate return of an error (e.g. using fmt.Errorf or errors.Wrap) that
includes context like batchIdx, msg.ID and the original err so the upstream
consumer can redeliver; update the function to propagate that error out if
necessary.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 99d99568-8499-4599-9c66-dd1f2bf10e3f

📥 Commits

Reviewing files that changed from the base of the PR and between 7f1944a and ef28d81.

📒 Files selected for processing (40)
  • broadcast-worker/handler.go
  • docs/client-api.md
  • docs/nats-subject-naming.md
  • docs/notification-worker-downstream-contracts.md
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
  • notification-worker/bootstrap.go
  • notification-worker/bootstrap_test.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/emit.go
  • notification-worker/emit_test.go
  • notification-worker/handler.go
  • notification-worker/handler_test.go
  • notification-worker/hook.go
  • notification-worker/hook_test.go
  • notification-worker/integration_test.go
  • notification-worker/main.go
  • notification-worker/members.go
  • notification-worker/members_test.go
  • notification-worker/presence.go
  • notification-worker/presence_test.go
  • notification-worker/routing.go
  • notification-worker/routing_test.go
  • notification-worker/threads.go
  • notification-worker/threads_test.go
  • notification-worker/usermeta.go
  • notification-worker/usermeta_test.go
  • pkg/mention/mention.go
  • pkg/mention/mention_test.go
  • pkg/model/event.go
  • pkg/model/model_test.go
  • pkg/model/presence.go
  • pkg/model/push.go
  • pkg/natsutil/gzip.go
  • pkg/natsutil/gzip_test.go
  • pkg/roomsubcache/roomsubcache.go
  • pkg/roomsubcache/roomsubcache_test.go
  • pkg/stream/stream.go
  • pkg/stream/stream_test.go
  • pkg/subject/subject.go
  • pkg/subject/subject_test.go
💤 Files with no reviewable changes (1)
  • pkg/model/event.go
✅ Files skipped from review due to trivial changes (2)
  • docs/client-api.md
  • docs/superpowers/plans/2026-05-27-notification-worker-cache-and-mobile.md
🚧 Files skipped from review as they are similar to previous changes (30)
  • pkg/stream/stream.go
  • pkg/stream/stream_test.go
  • notification-worker/hook.go
  • broadcast-worker/handler.go
  • notification-worker/threads_test.go
  • pkg/roomsubcache/roomsubcache_test.go
  • notification-worker/deploy/docker-compose.yml
  • notification-worker/bootstrap_test.go
  • notification-worker/emit_test.go
  • pkg/model/push.go
  • notification-worker/routing.go
  • pkg/model/presence.go
  • notification-worker/routing_test.go
  • notification-worker/emit.go
  • notification-worker/presence_test.go
  • pkg/subject/subject_test.go
  • pkg/roomsubcache/roomsubcache.go
  • notification-worker/hook_test.go
  • notification-worker/threads.go
  • pkg/mention/mention.go
  • pkg/subject/subject.go
  • docs/nats-subject-naming.md
  • notification-worker/members.go
  • notification-worker/presence.go
  • notification-worker/bootstrap.go
  • notification-worker/main.go
  • notification-worker/members_test.go
  • pkg/model/model_test.go
  • pkg/mention/mention_test.go
  • notification-worker/integration_test.go

Comment thread docs/notification-worker-downstream-contracts.md
Comment thread notification-worker/handler.go
Comment thread notification-worker/handler.go Outdated
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 2 times, most recently from 5ec598a to bdb6799 Compare June 1, 2026 08:41
Comment thread broadcast-worker/handler.go Outdated
resolved := mention.ResolveFromParsed(parsed, userByAccount)

- if err := h.store.UpdateRoomLastMessage(ctx, msg.RoomID, msg.ID, msg.CreatedAt, resolved.MentionAll); err != nil {
+ if err := h.store.UpdateRoomLastMessage(ctx, msg.RoomID, msg.ID, msg.CreatedAt, resolved.MentionAll || resolved.MentionHere); err != nil {
Collaborator

I thought we were supposed to remove the @here mention for now?

Comment thread notification-worker/handler.go Outdated
// this hot path — push event production is purely CPU-bound. The legacy-
// in-flight fallback to UserAccount is owned by Message.SenderDisplayName.
var sender *model.Participant
for i := range members {
Collaborator

We shouldn't need to loop through members just to find the sender info. I think all of it is already included in the message event: User ID, UserAccount, UserDisplayName.

Collaborator Author

Good catch — fixed in 3112143. Sender now built directly from msg.UserID / msg.UserAccount / msg.SenderDisplayName(); the member-scan was redundant since gatekeeper already populates those fields on the canonical message.


Generated by Claude Code

Comment thread notification-worker/handler.go Outdated
return nil
}

accounts := make([]string, len(candidates))
Collaborator

This can be built in the same loop that builds the candidates slice, which saves an extra loop.

Collaborator Author

Fixed in 3112143: accounts is now appended in the same pass as candidates, dropping the second walk.


Generated by Claude Code

Comment thread notification-worker/handler.go Outdated
msg := evt.Message
// requestID is set by the NATS dispatch loop in main.go via natsutil.ContextWithRequestIDFromHeaders;
// pulled here once so every slog line carries the same correlation key without per-call lookups.
requestID := natsutil.RequestIDFromContext(ctx)
Collaborator

I don't think we put the request ID in the canonical message JetStream event?

Collaborator Author

It is carried — not in the JSON payload, in the NATS message header.

message-gatekeeper publishes the canonical event via natsutil.NewMsg(ctx, canonicalSubj, evtData) (see message-gatekeeper/handler.go:287), and natsutil.NewMsg copies X-Request-ID from ctx into nats.Header (see pkg/natsutil/request_id.go:53). In this worker, main.go:287 extracts it back via natsutil.ContextWithRequestIDFromHeaders(msgCtx, msg.Headers()) before calling HandleMessage, so RequestIDFromContext(ctx) here returns the same ID that was minted at the user's original publish entry point. There's an integration assertion of this round-trip in message-gatekeeper/handler_test.go:792.


Generated by Claude Code

Collaborator

Correct me if I'm wrong, but I didn't see message-gatekeeper put the requestId from the JSON payload into the context. If so, natsutil.NewMsg(...) will not be able to extract the request ID from the context.

Collaborator Author

You're right — I traced the chain again and there is a real gap. processMessage validated req.RequestID (JSON) but never bridged it into ctx, so unless the inbound MESSAGES message also had X-Request-ID in its NATS header, the canonical publish went out header-less. Our only Go publisher to that subject (tools/loadgen/generator.go:198) does not set the header, so this was broken under load testing today.

Fixed in ac8d333: message-gatekeeper/handler.go:processMessage now does ctx = natsutil.WithRequestID(ctx, req.RequestID) right after validation, so the JSON-payload requestId is the canonical source for the X-Request-ID header on every downstream publish (canonical event, plus anything else reached via this ctx). Payload value wins over whatever was in ctx since the client's requestId is also what addresses the reply subject. Existing propagation test updated to assert payload-wins; added a second test covering the bare-ctx (no inbound header) case.

Thanks for pushing on this — the original chain only worked because the test happened to seed ctx with WithRequestID directly, masking the missing bridge.


Generated by Claude Code

return meta.Name
case err != nil:
slog.Warn("room meta lookup failed, falling back to sender",
"error", err, "roomId", roomID, "request_id", natsutil.RequestIDFromContext(ctx))
Collaborator

Do we have the request ID in the canonical message event header?

Collaborator Author

Yes — see the reply on the line 77 thread for the full chain. Short version: message-gatekeeper publishes the canonical event with natsutil.NewMsg(ctx, ...) which writes X-Request-ID into nats.Header; main.go extracts it into ctx via ContextWithRequestIDFromHeaders before HandleMessage runs, so RequestIDFromContext(ctx) inside resolveTitle here returns the same correlation ID end-to-end.


Generated by Claude Code

Joey0538 pushed a commit that referenced this pull request Jun 1, 2026
…nts loop

Build sender directly from the canonical MessageEvent (UserID, UserAccount,
SenderDisplayName) instead of scanning members to find the same fields.
Collect candidate accounts in the same pass as candidate members so we don't
re-walk the survivors to project Account.

Addresses review feedback on PR #237.
Comment thread notification-worker/emit.go Outdated
return p
}

func (j *jsAsyncPublisher) PublishMsgAsync(msg *nats.Msg) error {
Collaborator

What is the behavior of JetStream async publish? Do we ack or nak if an error occurs and the async publish fails?

Since we are sending recipients in batches, I don't think there is a need for async. One reason is that async publishing might produce more events than push-service can consume, given third-party API rate limits.

Another benefit is reduced code complexity, with clear ack and nak behavior. What's your opinion?

Collaborator Author

Agreed and done in 7ead710. Async publish was actually losing failures — we acked the canonical message before the PubAckFuture resolved, so push-publish errors were log-only. Sync gives clean nak-on-failure semantics: handler aggregates per-batch errors, dispatch loop naks, JetStream redelivers, and the {messageID}-bN Nats-Msg-Id dedups batches that already landed. No throughput cost since each canonical message already runs in its own semaphore-bounded goroutine. Dropped ~80 lines of machinery (jsAsyncPublisher, error pump, drain, doneOnce, PUSH_ASYNC_MAX_PENDING env, the second jetstream.New call). Rate limit on push-service is now handled cleanly by JetStream's pull-consumer backpressure on the next stage.


Generated by Claude Code

Comment thread notification-worker/main.go Outdated
}
}()

invalSub, err := nc.NatsConn().Subscribe(subject.SubscriptionUpdateWildcard(), func(msg *nats.Msg) {
Collaborator

I'm a bit concerned about subscribing to the wildcard subscription update events, since there will be a lot of them. Perhaps it's better to subscribe only to the room canonical subject for the add-member and remove-member event types?

Collaborator Author

Good catch on the volume. Addressed in 7ead710 by keeping the existing wildcard but tightening both axes:

  1. Subscribe changed to QueueSubscribe("notification-worker", ...) so N service instances share the fan-out instead of each receiving every event. This was a real hole: with a plain core-NATS subscription, every replica was redundantly invalidating its own cache.
  2. Inline filter on Action ∈ {added, removed, mute_toggled}. role_updated is dropped (Role isn't in the cached projection so it's a wasted DEL). Kept mute_toggled because the cached member projection (main.go:71) includes muted and the handler filters if m.Muted { continue } against it — dropping mute invalidation would let muted users keep receiving pushes until the 5-min TTL reconciles.

Went this route rather than switching to canonical-room add/remove events because that'd require a new producer-side subject hierarchy out of scope for this PR, and it still wouldn't cover mute_toggled which currently rides this same fan-out.
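The inline action filter from point 2 might look roughly like this. The event struct and its JSON field names are assumptions (the thread does not show the payload shape), and `invalidate` stands in for the cache delete.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscriptionUpdate is a hypothetical shape for the update event.
type subscriptionUpdate struct {
	Action string `json:"action"`
	RoomID string `json:"roomId"`
}

// invalidatingActions lists the actions that change the cached member projection.
// role_updated is absent because Role is not part of that projection.
var invalidatingActions = map[string]struct{}{
	"added":        {},
	"removed":      {},
	"mute_toggled": {},
}

// handleUpdate decodes one event and invalidates the room only for relevant actions.
func handleUpdate(data []byte, invalidate func(roomID string)) error {
	var evt subscriptionUpdate
	if err := json.Unmarshal(data, &evt); err != nil {
		return fmt.Errorf("decode subscription update: %w", err)
	}
	if _, ok := invalidatingActions[evt.Action]; !ok {
		return nil // not relevant to the cache; skip the delete
	}
	invalidate(evt.RoomID)
	return nil
}

func main() {
	var dropped []string
	invalidate := func(roomID string) { dropped = append(dropped, roomID) }

	_ = handleUpdate([]byte(`{"action":"added","roomId":"r1"}`), invalidate)
	_ = handleUpdate([]byte(`{"action":"role_updated","roomId":"r2"}`), invalidate)
	_ = handleUpdate([]byte(`{"action":"mute_toggled","roomId":"r3"}`), invalidate)

	fmt.Println(dropped) // prints [r1 r3]
}
```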


Generated by Claude Code

Comment thread notification-worker/main.go Outdated

publisher := &natsPublisher{nc: nc}
handler := NewHandler(memberLookup, publisher)
asyncPub := newJSAsyncPublisher(rawJS)
Collaborator

Because we already launch a separate goroutine to handle each NATS message, I think it's better to just use sync publish, which is simpler and easier to debug.

Collaborator Author

Done in 7ead710 — same point as the emit.go thread; sync publish in each per-message goroutine.


Generated by Claude Code

Comment thread notification-worker/threads.go Outdated
return map[string]struct{}{}, nil
}
opts := options.Find().SetProjection(bson.M{"userAccount": 1, "_id": 0})
cur, err := m.col.Find(ctx, bson.M{"parentMessageId": parentMessageID}, opts)
Collaborator

If I remember correctly, there is a "reply" field on the thread room. Can you check whether we can use it here?

Collaborator Author

You're right that ThreadRoom.ReplyAccounts exists and is maintained — message-worker/store_mongo.go:139 does $addToSet: {replyAccounts: <account>} on every reply, parent author is seeded at room creation (message-worker/handler.go:132). So it's a viable, cheaper source — one document instead of a Find over thread_subscriptions.

One semantic difference I want to flag before switching: thread_subscriptions also contains rows for users who got @-mentioned on a prior reply but never replied themselves (auto-subscribed via message-worker/handler.go:340 markThreadMentions → MarkThreadSubscriptionMention). ReplyAccounts does not include them, so they'd silently stop receiving pushes on subsequent thread replies they aren't re-mentioned in (the worker's own mention.Parse handles same-message mentions, so direct re-mentions still work).

Example: Alice posts "hey @bob" starting a thread. Bob is now in thread_subscriptions (hasMention=true) but not in ReplyAccounts (he hasn't replied). Alice replies again without @bob — current code pushes to Bob (he's in thread_subscriptions); a ReplyAccounts-only check would skip him.

Holding pending a product call on whether mention-only auto-followers should keep receiving pushes on later replies. If the answer is "no, only repliers get the running notification stream," I'll swap to ReplyAccounts in a follow-up commit.
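
The Alice/Bob example can be expressed as a set difference. The sketch below is illustrative only: it models both sources as in-memory sets, assumes Alice appears in both (as thread starter and seeded parent author), and uses hypothetical helper names.

```go
package main

import (
	"fmt"
	"sort"
)

// accountSet is a hypothetical stand-in for either follower source.
type accountSet map[string]struct{}

func newAccountSet(accounts ...string) accountSet {
	s := make(accountSet, len(accounts))
	for _, a := range accounts {
		s[a] = struct{}{}
	}
	return s
}

// droppedFollowers returns the accounts that a thread_subscriptions lookup
// would notify but a ReplyAccounts-only lookup would skip, sorted for
// stable output.
func droppedFollowers(subscriptions, replyAccounts accountSet) []string {
	var dropped []string
	for a := range subscriptions {
		if _, ok := replyAccounts[a]; !ok {
			dropped = append(dropped, a)
		}
	}
	sort.Strings(dropped)
	return dropped
}

func main() {
	subs := newAccountSet("alice", "bob") // bob was auto-subscribed by a mention
	replies := newAccountSet("alice")     // bob has not replied yet
	fmt.Println(droppedFollowers(subs, replies)) // [bob]
}
```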


Generated by Claude Code

@mliu33
Collaborator

mliu33 commented Jun 2, 2026

Excellent work! Just some suggestions, thanks a lot!

@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 2 times, most recently from c8bc593 to a454d96 Compare June 2, 2026 02:25
Comment thread pkg/natsutil/gzip.go Outdated
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 4 times, most recently from 7735a04 to 2b495d1 Compare June 3, 2026 01:53
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch 2 times, most recently from 1b388a0 to dd66e50 Compare June 3, 2026 05:28
ngangwar962 pushed a commit that referenced this pull request Jun 3, 2026
…thread_rooms.replyAccounts

Switch channelThreadFanOut from ListThreadSubscriptions (thread_subscriptions
collection cursor scan) to GetThreadFollowers (single FindOne projection on
thread_rooms.replyAccounts), matching the pattern PR #237 introduced in
notification-worker.

- store.go: ListThreadSubscriptions → GetThreadFollowers returning map[string]struct{}
- store_mongo.go: threadSubCol → threadRoomCol; thread_subscriptions index →
  thread_rooms(parentMessageId) index; ListThreadSubscriptions → GetThreadFollowers
  (FindOne + projection, ErrNoDocuments returns empty set)
- handler.go: threadFanOutAccounts takes map[string]struct{} instead of
  []model.ThreadSubscription; channelThreadFanOut drops siteID param
- main.go: wire thread_rooms collection instead of thread_subscriptions
- handler_test.go + mock_store_test.go: updated accordingly

https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
claude added 10 commits June 3, 2026 11:12
Adds Encode/Decode helpers for gzip-compressed NATS payloads with a
configurable max decoded size (default 256 KiB) to defend against
gzip bombs while allowing call-site overrides via DecodePayloadWithLimit.
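
A size-capped gzip decode of this kind can be sketched with the standard library alone. The function names below are hypothetical and are not the actual pkg/natsutil API; only the 256 KiB default is taken from the commit message.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"errors"
	"fmt"
	"io"
)

// errTooLarge is a hypothetical sentinel for this sketch.
var errTooLarge = errors.New("decoded payload exceeds limit")

// encode gzip-compresses data.
func encode(data []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return nil, err
	}
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// decodeWithLimit inflates compressed data but refuses to produce more
// than limit bytes, which bounds memory use for hostile inputs.
func decodeWithLimit(compressed []byte, limit int64) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(compressed))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	// Read at most limit+1 bytes so an oversized payload is detected
	// without inflating the whole stream.
	out, err := io.ReadAll(io.LimitReader(zr, limit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(out)) > limit {
		return nil, errTooLarge
	}
	return out, nil
}

func main() {
	const limit = 256 << 10 // 256 KiB, the default named in the commit message
	bomb, _ := encode(make([]byte, 1<<20))
	_, err := decodeWithLimit(bomb, limit)
	fmt.Println("1 MiB of zeros:", err)
}
```

Reading through io.LimitReader means the cap applies to decoded bytes, not to the compressed size on the wire.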
…caches

- pkg/model: PushNotificationEvent, presence types, CanonicalMemberEvent
  struct + Added/Removed/Muted constants
- pkg/subject: RoomCanonicalMemberEvent(Filter) builders + per-site
  push/presence subject builders
- pkg/stream: PUSH_NOTIFICATIONS_{siteID} stream helper
- pkg/mention: @here parsing/resolution alongside @ALL
- pkg/roomsubcache: expanded cached member projection
- pkg/userstore: UserMetaLookup cache with singleflight collapsing
…cal publish

Reads the requestId from the inbound message payload and propagates it
through context to the canonical publish path so downstream consumers
see a consistent correlation ID.
… + mentions)

Treats the parent message author and any @-mentioned users as thread
followers by adding them to thread_rooms.replyAccounts on every reply.
Folds AddReplyAccounts into UpdateThreadRoomLastMessage (now takes
[]string) to halve Mongo round-trips per reply; uses $addToSet $each
for idempotent merge.
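
As a rough illustration of the folded update, the sketch below builds a single MongoDB update document that sets the last-message fields and merges follower accounts with $addToSet and $each. Only replyAccounts is named in the commit message; the other field names, the function name, and the millisecond timestamp parameter are assumptions.

```go
package main

import "fmt"

// buildThreadRoomUpdate returns one update document covering both the
// last-message bookkeeping and the follower merge. Field names other than
// replyAccounts are hypothetical.
func buildThreadRoomUpdate(lastMsgID string, replyAccounts []string, nowMillis int64) map[string]any {
	update := map[string]any{
		"$set": map[string]any{
			"lastMessageId": lastMsgID,
			"updatedAt":     nowMillis,
		},
	}
	// Omit the operator when there is nothing to merge, so an empty or
	// nil slice is never passed to $each.
	if len(replyAccounts) > 0 {
		update["$addToSet"] = map[string]any{
			"replyAccounts": map[string]any{"$each": replyAccounts},
		}
	}
	return update
}

func main() {
	u := buildThreadRoomUpdate("msg-1", []string{"alice", "bob"}, 1700000000000)
	fmt.Println(u)
}
```

Because $addToSet only adds values that are not already present, replaying the same reply after a redelivery leaves replyAccounts unchanged.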
…vents

Adds canonical event.member.{added,removed,muted} publishes on the
room-canonical subject hierarchy alongside the existing per-user
subscription.update events. FE keeps consuming subscription.update;
notification-worker (and future server-side consumers) move to the
canonical stream — one event per room mutation instead of N per-user
events.
Replaces per-subscription NATS broadcasts with a Valkey-backed, mention-
aware, presence-filtered mobile push pipeline:

- Member cache (L1 LRU + L2 Valkey) with singleflight collapsing and
  loader fallback; eager invalidation on canonical room member events
- Bulk presence snapshot via NATS request/reply with chunking + fail-open
- Staged recipient filters: mute -> history -> thread/follower -> hook
  veto -> routing -> presence; batches recipients into PushNotificationEvent
- Async JetStream emitter with gzip-encoded payloads
- Suppress-only Vetoer hook interface (noop by default)
- Durable consumer migrated from subscription.update wildcard to the
  RoomCanonicalMemberEventFilter (one event per room mutation, full
  cache invalidate for add/remove/mute)
- PUSH_NOTIFICATIONS_{siteID} stream bootstrap gated by BOOTSTRAP_STREAMS
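
The batching step at the end of the filter chain can be sketched as follows. The PR description gives the dedup ID format {messageId}-b{batchIndex} and a default batch size of 100; the type and function names, and the zero-based batch index, are assumptions.

```go
package main

import (
	"fmt"
	"sort"
)

// batch is a hypothetical stand-in for one PushNotificationEvent.
type batch struct {
	MsgID      string // value intended for the Nats-Msg-Id header
	Recipients []string
}

// buildBatches sorts recipients and splits them into fixed-size groups.
// Sorting keeps batch contents stable across redeliveries, so the same
// dedup ID always refers to the same recipients.
func buildBatches(messageID string, recipients []string, size int) []batch {
	if size <= 0 {
		size = 100 // default PUSH_RECIPIENT_BATCH_SIZE per the PR description
	}
	sorted := append([]string(nil), recipients...) // do not mutate the input
	sort.Strings(sorted)

	var out []batch
	for i := 0; i < len(sorted); i += size {
		end := i + size
		if end > len(sorted) {
			end = len(sorted)
		}
		out = append(out, batch{
			MsgID:      fmt.Sprintf("%s-b%d", messageID, len(out)),
			Recipients: sorted[i:end],
		})
	}
	return out
}

func main() {
	for _, b := range buildBatches("m1", []string{"carol", "alice", "bob"}, 2) {
		fmt.Println(b.MsgID, b.Recipients)
	}
}
```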
…out plan

- docs/client-api.md: server-side mobile push stream + handler changes
- docs/nats-subject-naming.md: canonical member event + push/presence
  subjects
- docs/notification-worker-downstream-contracts.md: new; downstream
  consumer contract for PushNotificationEvent
- docs/superpowers/plans/...: rollout plan working notes
Adds NATS_MAX_PAYLOAD_BYTES (default 256 KiB) so the emitter rejects any
gzipped batch larger than the broker's configured max_payload before
publishing — surfaces a clear error instead of a NATS NACK. Set to 0 to
disable the check (tests).

Updates the downstream contract doc to document the cap and the matching
DecodePayloadWithLimit recommendation for push-service consumers.
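
A pre-publish guard of this shape is small enough to sketch directly. The names are hypothetical; only the "0 disables the check" behaviour and the 256 KiB default come from the commit message.

```go
package main

import (
	"errors"
	"fmt"
)

// errPayloadTooLarge is a hypothetical sentinel for this sketch.
var errPayloadTooLarge = errors.New("gzipped batch exceeds max payload")

// checkPayloadSize rejects a payload larger than maxBytes before it is
// handed to the broker. A maxBytes of 0 disables the check.
func checkPayloadSize(payload []byte, maxBytes int) error {
	if maxBytes == 0 {
		return nil
	}
	if len(payload) > maxBytes {
		return fmt.Errorf("%w: %d > %d bytes", errPayloadTooLarge, len(payload), maxBytes)
	}
	return nil
}

func main() {
	const maxPayload = 256 << 10 // assumed to mirror NATS_MAX_PAYLOAD_BYTES
	fmt.Println(checkPayloadSize(make([]byte, 300<<10), maxPayload))
	fmt.Println(checkPayloadSize(make([]byte, 300<<10), 0))
}
```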
…member cache

pkg/userstore.Cache is now an in-process LRU+TTL (with singleflight) that
caches both FindUserByID and FindUsersByAccounts, cross-populating the by-id
and by-account prefixes so a hit on either path satisfies the other. Shared
by message-gatekeeper, broadcast-worker, and message-worker; broadcast-worker's
local CachedUserStore is deleted.

In-memory is the right shape here: entries are tiny (~500 B/user), per-pod
working set caps at a few MB for 10K warm users, and writes are rare
(display-name changes are admin events). Valkey overhead buys nothing at
this size. Each service gets USER_CACHE_SIZE + USER_CACHE_TTL knobs (or
USER_META_CACHE_* on gatekeeper for naming continuity).

notification-worker drops the L1 LRU sitting in front of the Valkey
roomsubcache: that L1 could hold up to 1000 large member lists per pod
(~100 MB+ for big channels) of state already living in Valkey, which is
the OOM concern that actually matters for the member-cache path.
Singleflight at the Valkey layer still collapses concurrent in-pod misses;
the only loss is microsecond serving of repeat reads (now ~1-2 ms Valkey
RTT), noise next to the rest of the handler's Mongo and presence RPCs.
Bonus: removes the 5s cross-pod staleness window where remote pods' L1
entries served already-removed members until TTL expiry.
After dual-publish, the mute-toggle canonical member event is emitted
before GetUserSiteID is consulted, so the strict no-call assertion no
longer holds in the GetUserSiteID-failure path.
@Joey0538 Joey0538 force-pushed the claude/inspiring-feynman-iCsHu branch from dd66e50 to 2fd5b93 Compare June 3, 2026 11:22
Comment thread message-worker/handler.go
if parentFound {
replyAccounts = append(replyAccounts, parentSender.Account)
}
if err := h.threadStore.UpdateThreadRoomLastMessage(ctx, existingRoom.ID, msg.ID, replyAccounts, now); err != nil {
Collaborator

Thanks!

Comment thread message-worker/handler.go
mentionedAccounts = append(mentionedAccounts, p.Account)
}
if len(mentionedAccounts) > 0 {
if err := h.threadStore.AddReplyAccounts(ctx, threadRoomID, mentionedAccounts); err != nil {
Collaborator

Thanks!

// GetMembers returns the member list, populating Valkey on a Mongo round-trip.
// Callers must not mutate the slice.
func (c *cachedMemberLookup) GetMembers(ctx context.Context, roomID string) ([]roomsubcache.Member, error) {
members, err, _ := c.sf.Do(roomID, func() (any, error) {
Collaborator

If I remember correctly, we should return from the cache first, if the entry exists, before calling c.sf.Do(...), because only one request executes inside the singleflight while other concurrent requests wait. Is my understanding correct?

Comment thread room-worker/handler.go Outdated
slog.Error("marshal canonical member event failed", "error", err, "type", eventType, "roomID", roomID, "account", account)
return
}
if err := h.publish(ctx, subject.RoomCanonicalMemberEvent(h.siteID, eventType), data, ""); err != nil {
Collaborator

Don't publish the room canonical event again in room-worker. It is already published, and should only be published, by room-service.

Collaborator Author

Good catch on the smell — there is a real problem here, but the mechanism isn't quite duplicate-publish. Let me lay out what's actually happening:

The two subjects look similar but mean different things:

| Subject | Publisher | Consumer | Role |
|---|---|---|---|
| chat.room.canonical.{site}.member.add | room-service | room-worker | Command: "please add this member" (a JetStream job) |
| chat.room.canonical.{site}.event.member.added | room-worker | notification-worker | Event: "member has been added" (post-mutation signal for cache invalidation) |
| chat.room.canonical.{site}.event.member.muted | room-service | notification-worker | Event published directly by room-service because mute is a sync RPC (no worker hop) |

For mute, room-service does the Mongo write itself, so it publishes both. For add/remove, the work runs async in room-worker, so room-worker is the only place that knows when the Mongo write has actually landed. If room-service published the event up front (alongside the command), notification-worker would invalidate the cache, immediately reload from Mongo, and repopulate with pre-mutation state — stale until TTL expires.

The real bug — room-worker eats its own publishes:

The ROOMS_{siteID} stream subject pattern is chat.room.canonical.{siteID}.> (wildcard tail — see pkg/stream/stream.go:32 + pkg/subject/subject.go:301). Both the command subjects and the event subjects flow into this stream.

room-worker's consumer is created with no FilterSubjects, so it pulls everything — including the .event.member.added/removed messages it published itself. The switch in HandleJetStreamMsg (handler.go:201-217) doesn't match those suffixes, falls through to default:, and logs WarnContext("unknown member operation", ...) before acking. So every member mutation produces a spurious warn-log line and wastes a JetStream pull cycle.

Note this isn't new with the dual-publish commit — the wildcard stream + no-filter consumer setup was already there. The new event publishes just made the symptom visible.

Proposed fix — filter room-worker's consumer (one line, matches what notification-worker already does):

notification-worker created its consumer on the same stream with FilterSubject: subject.RoomCanonicalMemberEventFilter (event.member.>) — see notification-worker/main.go:228-229. The symmetric fix for room-worker is a positive list of just the command subjects in buildConsumerConfig:

cc.FilterSubjects = []string{
    subject.RoomCanonical(siteID, "create"),
    subject.RoomCanonical(siteID, "member.add"),
    subject.RoomCanonical(siteID, "member.remove"),
    subject.RoomCanonical(siteID, "member.role-update"),
    subject.RoomCanonical(siteID, "room.rename"),
}

After this, room-worker only sees commands, notification-worker only sees events, both publishers keep their post-mutation timing correct, and the warn-log spam disappears.
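
To make the overlap concrete, here is a small sketch of NATS subject matching (`*` matches one token, `>` matches one or more trailing tokens) applied to the subjects from the table above. The matcher is a simplified re-implementation for illustration, not the NATS server's code, and `site1` is a placeholder site ID.

```go
package main

import (
	"fmt"
	"strings"
)

// matches reports whether subject satisfies a NATS-style pattern.
func matches(pattern, subject string) bool {
	p := strings.Split(pattern, ".")
	s := strings.Split(subject, ".")
	for i, tok := range p {
		if tok == ">" {
			// ">" must be the last token and needs at least one token to match.
			return i == len(p)-1 && len(s) > i
		}
		if i >= len(s) {
			return false
		}
		if tok != "*" && tok != s[i] {
			return false
		}
	}
	return len(p) == len(s)
}

// matchesAny reports whether any filter in the list accepts the subject.
func matchesAny(filters []string, subject string) bool {
	for _, f := range filters {
		if matches(f, subject) {
			return true
		}
	}
	return false
}

func main() {
	const base = "chat.room.canonical.site1."
	stream := base + ">"
	commands := []string{
		base + "create", base + "member.add", base + "member.remove",
		base + "member.role-update", base + "room.rename",
	}
	events := []string{base + "event.member.>"}

	for _, subj := range []string{base + "member.add", base + "event.member.added"} {
		fmt.Printf("%s stream=%v commands=%v events=%v\n",
			subj, matches(stream, subj), matchesAny(commands, subj), matchesAny(events, subj))
	}
}
```

Both subjects land in the stream, but each falls under exactly one of the two filter sets, which is the separation the proposed FilterSubjects list relies on.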

The two alternatives both have downsides:

  • Move add/remove to room-service (sync) — loses the intentional async architecture: processAddMembers does Mongo writes + key fanout + sys-message publishes + backfill; blocking client RPC for that would be a regression.
  • Separate stream for events — splits "all canonical room messages" across two streams, needs ops/IaC bootstrap, no win over filtering.

Does this match your concern? If you're good with the filter approach I'll implement it.


Generated by Claude Code

Collaborator Author

Following up — there's a third option I missed initially that's architecturally cleaner. Sharing so you can pick.

Option C — reuse the existing member-change sys-messages on MESSAGES_CANONICAL:

room-worker already publishes member-change sys-messages to MESSAGES_CANONICAL_{site} after the Mongo write lands (see room-worker/handler.go:537-556):

sysMsg := model.Message{
    Type:        model.MessageTypeMemberRemoved,  // or MembersAdded / MemberLeft
    ...
}
h.publish(ctx, subject.MsgCanonicalCreated(h.siteID), msgEvtData, ...)

And notification-worker already consumes MESSAGES_CANONICAL as its primary input (notification-worker/main.go:176) — every push notification flows through HandleMessage. So notification-worker could invalidate roomsubcache when it sees msg.Type ∈ {MembersAdded, MemberLeft, MemberRemoved} at the top of HandleMessage, before the GetMembers call. Timing is correct (sys-message is published post-Mongo, same as the canonical event would be).

What this would change:

  • Delete the event.member.added/removed publishes from room-worker/handler.go's publishMemberMutation helper.
  • Add an invalidation branch in notification-worker/handler.go HandleMessage keyed on msg.Type.
  • Keep room-service's event.member.muted publish — mute is a sync RPC with no worker hop and no sys-message (it's a per-user preference, not a room event), so it still needs the canonical event path.
  • Keep notification-worker's existing ROOMS-stream consumer with its filter — it now only carries mute events.
  • No filter needed on room-worker's consumer — the self-consume problem goes away entirely because room-worker no longer publishes to .event.member.*.
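
A minimal sketch of the Option C invalidation branch, using in-memory maps in place of Valkey and Mongo. The message type names follow the comment; their string values, the cache type, and the handler signature are hypothetical.

```go
package main

import "fmt"

type MessageType string

// Type names follow the review thread; the string values are made up.
const (
	MessageTypeText          MessageType = "text"
	MessageTypeMembersAdded  MessageType = "members_added"
	MessageTypeMemberLeft    MessageType = "member_left"
	MessageTypeMemberRemoved MessageType = "member_removed"
)

type Message struct {
	RoomID string
	Type   MessageType
}

// memberCache stands in for roomsubcache; source stands in for Mongo.
type memberCache struct {
	entries map[string][]string
	source  map[string][]string
}

func (c *memberCache) Invalidate(roomID string) { delete(c.entries, roomID) }

func (c *memberCache) GetMembers(roomID string) []string {
	if m, ok := c.entries[roomID]; ok {
		return m
	}
	m := c.source[roomID]
	c.entries[roomID] = m
	return m
}

// isMembershipChange reports whether a sys-message implies the member
// list changed.
func isMembershipChange(t MessageType) bool {
	switch t {
	case MessageTypeMembersAdded, MessageTypeMemberLeft, MessageTypeMemberRemoved:
		return true
	}
	return false
}

// handleMessage invalidates before the lookup so the recipients for the
// sys-message itself are computed from post-mutation state.
func handleMessage(c *memberCache, msg Message) []string {
	if isMembershipChange(msg.Type) {
		c.Invalidate(msg.RoomID)
	}
	return c.GetMembers(msg.RoomID)
}

func main() {
	c := &memberCache{
		entries: map[string][]string{},
		source:  map[string][]string{"r1": {"alice", "bob"}},
	}
	fmt.Println(handleMessage(c, Message{RoomID: "r1", Type: MessageTypeText}))
	c.source["r1"] = []string{"alice"} // bob removed in the backing store
	fmt.Println(handleMessage(c, Message{RoomID: "r1", Type: MessageTypeMemberRemoved}))
}
```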

Comparison:

| Aspect | Option A: filter room-worker | Option C: invalidate on sys-message |
|---|---|---|
| Change footprint | 1 line in buildConsumerConfig | room-worker delete + notification-worker handler edit + tests |
| Subjects in play | Same (4 commands + 3 events) | Drops .event.member.added/removed, keeps .event.member.muted only |
| Streams in play | Same | Same |
| Self-consume warn-log | Fixed by filter | Gone (no publish to self-consume) |
| Conceptual model | "Filter out my own publishes" | "Cache invalidation rides the sys-message that already exists" |
| Risk | Very low | Low, but couples cache invalidation to sys-message presence (if MessageTypeMemberLeft is ever published without a real mutation, e.g. a backfill replay, we'd invalidate unnecessarily; harmless but worth noting) |

A is a 5-minute fix that ships now. C is the right architecture longer-term and removes a whole subject family. They're not mutually exclusive — we could ship A on this PR to unblock and file C as a follow-up, or do C directly here.

Which would you prefer?

  • A — filter now, keep the event subjects.
  • C — refactor to sys-message invalidation in this PR.
  • A now, C as follow-up — unblock first, clean up next.

Generated by Claude Code

Collaborator Author

Going with Option C — sys-message-driven invalidation for add/remove, dedicated event subject retained only for mute.

The federation refactor (OUTBOX retiring, INBOX-only via supercluster gateway) tipped this:

  • Three dedicated .event.member.* subjects would each need gateway-routing decisions designed and maintained. One subject (mute) is materially less surface area than three.
  • Sys-messages already need cross-site delivery for chat content reasons. Cache invalidation for add/remove rides that route — no new federation contract to design.
  • Mute legitimately keeps its dedicated subject: no sys-message exists for it (private preference, not a chat-visible event), and its cross-site routing semantics (user's home site → room's host site) differ from room-state events.

Why this works correctness-wise: member add/remove are channel-only by hard guard in room-worker (handler.go:350-351 for remove, 808-811 for add). Sys-messages are always published for these operations. No DM gap.

Coupling note we'll document: if anyone later relaxes the channel-only invariant for member ops (group DMs with dynamic membership, bot-managed rooms, etc.), they need to remember that sys-message publication is also a cache-invalidation signal. Will add comments at the publish and invalidate sites so the coupling isn't invisible.

Concrete changes coming:

  1. room-worker/handler.go — delete event.member.added/removed publishes from publishMemberMutation helper (keep per-user subscription.update for FE)
  2. notification-worker/handler.go — invalidate roomsubcache at top of HandleMessage when msg.Type ∈ {MembersAdded, MemberLeft, MemberRemoved}
  3. notification-worker/main.go — narrow the ROOMS-stream consumer's FilterSubject from .event.member.> to .event.member.muted; rename durable to notification-worker-mute-invalidate
  4. Tests + docs/client-api.md + downstream contracts doc updated to match

Starting implementation now.


Generated by Claude Code

claude added 2 commits June 3, 2026 12:30
Singleflight collapses concurrent loads but also serializes cache hits
behind the in-flight caller. Probe the cache first and only enter
singleflight on miss (with a re-check inside the flight).
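
The probe-first pattern can be sketched with the standard library only. This is a hand-rolled stand-in for golang.org/x/sync/singleflight rather than the PR's code: the cache check and the in-flight registry share one mutex, so the re-check inside the flight is implicit. All names are hypothetical.

```go
package main

import (
	"fmt"
	"sync"
)

// flight tracks one in-progress load so concurrent misses can wait on it.
type flight struct {
	done    chan struct{}
	members []string
}

type memberLookup struct {
	mu       sync.Mutex
	cache    map[string][]string
	inflight map[string]*flight
	loads    int // number of times the slow loader ran
	load     func(roomID string) []string
}

func newMemberLookup(load func(string) []string) *memberLookup {
	return &memberLookup{
		cache:    map[string][]string{},
		inflight: map[string]*flight{},
		load:     load,
	}
}

func (l *memberLookup) GetMembers(roomID string) []string {
	l.mu.Lock()
	// Fast path: a cache hit never waits behind an in-flight load.
	if m, ok := l.cache[roomID]; ok {
		l.mu.Unlock()
		return m
	}
	// Miss with a load already running: wait for its result.
	if f, ok := l.inflight[roomID]; ok {
		l.mu.Unlock()
		<-f.done
		return f.members
	}
	// Miss with no load running: become the loader.
	f := &flight{done: make(chan struct{})}
	l.inflight[roomID] = f
	l.loads++
	l.mu.Unlock()

	f.members = l.load(roomID) // slow call runs outside the lock

	l.mu.Lock()
	l.cache[roomID] = f.members
	delete(l.inflight, roomID)
	l.mu.Unlock()
	close(f.done) // publish the result to waiters
	return f.members
}

func main() {
	l := newMemberLookup(func(string) []string { return []string{"alice", "bob"} })
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			l.GetMembers("r1")
		}()
	}
	wg.Wait()
	fmt.Println("loader calls:", l.loads)
}
```

With the real singleflight package the equivalent shape is a cache read before `Do`, plus a second cache read inside the `Do` callback to cover the window between the first probe and joining the flight.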
…member-change sys-messages

room-worker stopped publishing dedicated canonical event.member.added/removed
events; notification-worker now invalidates roomsubcache at the top of
HandleMessage when it sees a MembersAdded/MemberLeft/MemberRemoved
sys-message on MESSAGES_CANONICAL. Mute keeps its dedicated canonical
event subject (no sys-message, different cross-site routing).

Closes a self-consume warn-log smell where room-worker's ROOMS-stream
consumer pulled its own .event.member.added/removed publishes (no
FilterSubjects) and dispatched to default. Drops two publishes per
member mutation and one entry in the subject-routing surface that the
upcoming federation refactor would have had to thread through.

Safe because add/remove are channel-only by hard guard in room-worker
(handler.go:350,808); the sys-message is always published when the
mutation happens. Documented at the publish/consume sites so the
coupling is visible if anyone later relaxes that invariant.

https://claude.ai/code/session_01BYAKxp36datqtnxvgmxLXj
ngangwar962 pushed a commit that referenced this pull request Jun 3, 2026
Adds end-to-end thread reply support across the pipeline. The headline
change is broadcast-worker fanning out thread reply create/edit/delete
events and the parent message's reply-count badge (tcount) to the right
audience in real time.

## Services changed

**broadcast-worker** (the feature)
- handleThreadCreated/Updated/Deleted + handleThreadTCountUpdated
- Thread fan-out via GetThreadFollowers (thread_rooms.replyAccounts) —
  single FindOne projection, same pattern as notification-worker PR #237
- Concurrent publishToThreadAccounts via errgroup (was serial)
- buildRoomEvent forwards evt.Timestamp instead of time.Now() for stable
  JetStream redelivery semantics
- GetThreadFollowers adds siteID filter + compound index
  {parentMessageId, siteId} to prevent cross-site data leakage
- All slog calls use context variants (WarnContext/ErrorContext) with
  request_id
- Channel replies fan out to replyAccounts ∪ @-mentions minus sender/bot;
  DM/BotDM replies go to all human members
- BulkUpdateRoomLastMessage for batching room metadata writes

**message-worker**
- SaveThreadMessage returns *int (post-CAS tcount) for badge publishing
- publishThreadReplyEvent emits EventThreadReplyAdded on .created subject
- saveThreadMessageEncrypted uses standalone IF NOT EXISTS LWT INSERT on
  messages_by_id (not UnloggedBatch) to detect JetStream redeliveries;
  thread_messages_by_room written separately (non-LWT) after the guard
- incrementParentTcount uses slog.ErrorContext with request_id

**history-service**
- Edit/delete events carry ThreadParentMessageID + TShow
- SoftDeleteMessage CAS-decrements tcount; messages_by_room mirror is
  best-effort (non-fatal) to avoid suppressing the canonical delete event
- publishThreadMetadataEvent on delete carries post-CAS NewTCount
- TestHistoryService_DeleteMessage_ThreadReply_PublishFailsButDeleteSucceeds
  covers the best-effort publish contract

**search-sync-worker**
- Skip EventThreadReplyAdded in BuildAction (badge events carry no
  searchable document; skipping prevents spurious index errors)

**room-service**
- UpdateSubscriptionThreadRead made atomic (returns resulting array +
  alert flag)

**pkg/model**
- EventThreadReplyAdded, NewTCount *int on MessageEvent (bson tag omits
  omitempty — zero is a valid tcount when last reply deleted),
  RoomEventThreadMetadataUpdated, ThreadAction, ThreadMetadataUpdatedEvent

**pkg/subject**
- Badge events use existing .created subject + EventThreadReplyAdded
  discriminator (no new subject namespace); MsgCanonicalThreadReply and
  EventThreadReplyDeleted removed as dead code

## Implementation notes

- Thread reply identity: non-empty ThreadParentMessageID + TShow=false
- Canonical subject convention: badge events travel over
  chat.msg.canonical.{siteID}.created with event field as discriminator
- Badge authority: NewTCount is the post-CAS value from messages_by_id
- Idempotency: IF NOT EXISTS LWT on messages_by_id; stable JetStream
  dedup IDs (e.g. thread-reply-added:{site}:{msgID})
- Error semantics: badge-path publish failures propagate so JetStream
  nacks and redelivers

## Known limitations (accepted)

- Parent-message mentionees not in replyAccounts won't receive thread events
- Edit/delete fan-out uses current followers only
- SaveThreadMessage write-then-publish is not atomic
- messages_by_room tcount mirror can drift (logged at ERROR, no self-heal)

https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi
Collaborator

@mliu33 mliu33 left a comment

Amazing work, thanks! Just one more minor comment

if err := cursor.All(ctx, &subs); err != nil {
return nil, fmt.Errorf("decode subscriptions for room %s: %w", roomID, err)
var out []roomsubcache.Member
for cur.Next(ctx) {
Collaborator

Would cur.All be better here than decoding documents one by one?

@mliu33 mliu33 merged commit f48a698 into main Jun 3, 2026
14 checks passed
ngangwar962 pushed a commit that referenced this pull request Jun 4, 2026
…name, notification-worker PRs)

After rebasing onto origin/main (which merged PRs #250, #155, #224, #237 since branch diverged),
restore all files that were incorrectly resolved to their origin/main versions:
- pkg/model: message.go, subscription.go — restore RoomRenamedSysData, IsRoomMember, Restricted
- pkg/model/event.go — add ReplyMessageID to ThreadMetadataUpdatedEvent, format with goimports
- pkg/subject/subject.go — restore PushNotificationFilter needed by pkg/stream
- message-worker/store_cassandra.go — fix SaveThreadMessage/incrementParentTcount return types to (*int, error)
- message-worker/handler_test.go — use gomock.Any() for replyAccounts (parent author now included)
- message-gatekeeper: restore handler.go/handler_test.go (NewHandler signature with UserGetter)
- notification-worker: restore handler.go/main.go from origin/main (PR #237 overhaul)
- room-service: restore all files from origin/main (OrgMembers, UpdateSubscriptionThreadRead signatures)

https://claude.ai/code/session_013Vs7CusvrZFrRKJaSoFtCi