feat(search-sync-worker): add spotlight + user-room sync collections#78
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough: This PR implements member event search synchronization and canonical event publishing.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant JS as JetStream Stream
    participant Handler as Handler (fetchMsg loop)
    participant Col as Collection (spotlight/user-room)
    participant ES as Elasticsearch Adapter
    participant Bulk as Bulk Request

    loop Fetch & Process
        Handler->>JS: Fetch up to bulkBatchSize capacity
        JS-->>Handler: MemberAddEvent/MemberRemoveEvent (raw JSON)
        Handler->>Col: BuildAction(data)
        Col->>Col: parseMemberEvent()
        Col-->>Handler: []searchengine.BulkAction
        Handler->>Handler: Append actions to h.actions<br/>Record (msg, actionStart, actionCount)
    end

    alt Flush triggered (size or interval)
        Handler->>Handler: Snapshot & clear pending/actions under lock
        Handler->>ES: Submit single bulk request with all actions
        ES-->>Handler: BulkResponse (per-item results)
        Handler->>Handler: Check each action's status<br/>(2xx, 409, context-aware 404)
        alt All actions in message succeeded
            Handler->>JS: Ack source message
        else Any action failed
            Handler->>JS: Nak source message (retry)
        end
    else Bulk error
        Handler->>JS: Nak all pending source messages
    end
```
```mermaid
sequenceDiagram
    participant Room as Room Worker
    participant Sub as Subscription Update
    participant Out as Outbox Publish
    participant Canon as Canonical Subject Publish
    participant IB as Inbox Worker

    Note over Room,IB: Member Add Flow
    Room->>Room: processAddMembers()
    Room->>Sub: Update member subscription
    Sub-->>Room: subscription created
    Room->>Out: Publish MemberAddEvent to outbox<br/>(with RoomName, RoomType)
    Room->>Canon: Publish MemberAddEvent to<br/>RoomCanonicalMemberAdded()
    Out-->>IB: Consume from outbox
    Canon-->>IB: Consume from canonical subject
    IB->>IB: handleMemberAdded()
    IB->>Canon: Re-publish to canonical<br/>RoomCanonicalMemberAdded(siteID)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 6
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
search-sync-worker/main.go (1)
226-255: ⚠️ Potential issue | 🟠 Major
`BATCH_SIZE` no longer bounds the bulk request size. This loop still fetches and buffers up to `batchSize` messages before it checks whether to flush, but `BuildAction` now returns multiple bulk actions per message. A small number of fan-out messages can therefore blow far past the configured limit before the first flush, which means oversized ES bulk requests and avoidable memory spikes. Please base the flush threshold on buffered actions, and check it inside the message loop rather than only after the whole fetched batch.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/main.go` around lines 226 - 255, The loop currently flushes based on buffered messages, but because BuildAction can produce multiple actions per message you must instead track and flush based on buffered actions: add or use a handler method that reports current action count (e.g., ActionCount() or make handler.Add return number of actions added from a message), change the fetch/threshold logic to compute fetchSize from remaining action capacity (batchSize - handler.ActionCount(), with floor 1), and move the flush check inside the for msg := range batch.Messages() loop so after each handler.Add(...) you check handler.ActionCount() (or use handler.BufferFull() redefined to mean action-capacity-full) and call handler.Flush(ctx) and update lastFlush when the action limit is reached; ensure you stop processing further messages from the fetched batch once the action threshold is hit so ES bulk size cannot exceed batchSize.
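The fix the prompt describes can be sketched in plain Go. The `handler`, `ActionCount`, `BufferFull`, and `fetchSize` names come from the suggestion above; everything else is a minimal stand-in, not the worker's real code.

```go
package main

import "fmt"

// bulkAction stands in for searchengine.BulkAction (illustrative only).
type bulkAction struct{ DocID string }

type handler struct {
	limit   int // configured bulk batch size
	actions []bulkAction
}

// Add buffers the actions produced for one message and reports the new total.
func (h *handler) Add(actions []bulkAction) int {
	h.actions = append(h.actions, actions...)
	return len(h.actions)
}

// ActionCount reports how many bulk actions are currently buffered.
func (h *handler) ActionCount() int { return len(h.actions) }

// BufferFull means action capacity (not message capacity) is exhausted.
func (h *handler) BufferFull() bool { return len(h.actions) >= h.limit }

// fetchSize computes the next fetch: remaining action capacity, floor 1.
func (h *handler) fetchSize() int {
	remaining := h.limit - len(h.actions)
	if remaining < 1 {
		return 1
	}
	return remaining
}

func main() {
	h := &handler{limit: 5}
	// One fan-out message can yield several actions, so the flush check
	// must run per message, not once per fetched batch.
	for _, msgActions := range [][]bulkAction{
		{{DocID: "a"}, {DocID: "b"}, {DocID: "c"}},
		{{DocID: "d"}, {DocID: "e"}, {DocID: "f"}},
	} {
		h.Add(msgActions)
		if h.BufferFull() {
			fmt.Println("flush at", h.ActionCount(), "actions")
			h.actions = h.actions[:0]
		}
	}
	fmt.Println("next fetch size:", h.fetchSize())
}
```

Because the flush check sits inside the per-message loop, the bulk request can overshoot the limit by at most one message's fan-out, rather than by a whole fetched batch.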
🧹 Nitpick comments (1)
search-sync-worker/inbox_integration_test.go (1)
378-382: Don't split a stateful scenario into ordered subtests.
These subtests mutate shared ES/NATS state and rely on the previous step's side effects. If one `require` aborts mid-sequence, the rest inherit half-mutated state and start failing noisily. Either keep this as one linear test or create fresh fixtures per subtest. As per coding guidelines, "Each test must be fully independent — no shared mutable state between tests; never rely on test execution order."
Also applies to: 430-432
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/inbox_integration_test.go` around lines 378 - 382, The TestUserRoomSync_LWWGuard test is split into ordered subtests that mutate shared ES/NATS state and use require, which can leave later subtests in a half-mutated state; either collapse the subtests into one single linear test body (remove t.Run subtests) inside TestUserRoomSync_LWWGuard so state is mutated deterministically, or make each t.Run create fresh fixtures (new ES index, new NATS subject/connection, freshly seeded data) so they are fully independent; update any helpers used by the subtests (setup/seed functions referenced by TestUserRoomSync_LWWGuard and its t.Run children) to return isolated resources and ensure every subtest defers teardown to clean ES/NATS state before returning.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/searchengine/searchengine.go`:
- Around line 25-35: The comment on BulkAction misleadingly states ES `_update`
doesn't accept `version`/`version_type`; change the comment on the BulkAction
type so it accurately says Elasticsearch does support versioning on update items
but this adapter intentionally ignores the Version field for ActionUpdate (i.e.,
for ActionUpdate, Version is ignored by design because search-sync collections
use collection-level idempotency/guards rather than external versioning). Update
the text to mention that this is a deliberate adapter choice and not an ES
limitation, referencing BulkAction, ActionUpdate and Version so readers
understand why Version is ignored for updates.
In `@search-sync-worker/handler.go`:
- Around line 111-126: The loop over pending batches treats any non-2xx/409
result as failure; change the check inside the loop (iterating i from
p.actionStart to p.actionStart+p.actionCount over results and actions) to also
treat Status==404 as success when the corresponding action is a delete or update
(i.e., check actions[i].Action == ActionDelete || actions[i].Action ==
ActionUpdate), while preserving the existing acceptance for 2xx and 409 for all
actions and keeping 404 as failure for index actions; update the condition that
sets allOK and the error logging accordingly so only true failures trigger
slog.Error with result.Status/result.Error/actions[i].DocID/actions[i].Index.
In `@search-sync-worker/inbox_stream.go`:
- Around line 37-43: The StreamSource construction is using both FilterSubject
and SubjectTransforms which are mutually exclusive; remove the FilterSubject
field from the StreamSource for the OUTBOX_<remote> source and rely on the
SubjectTransforms entry (its Source value, e.g. sourcePattern) to act as the
filter. Update the code that appends the jetstream.StreamSource (the block
creating sources = append(... &jetstream.StreamSource{ Name:
fmt.Sprintf("OUTBOX_%s", remote), FilterSubject: ..., SubjectTransforms:
[]jetstream.SubjectTransformConfig{ {Source: sourcePattern, Destination:
destPattern}, }, })) to omit FilterSubject so only SubjectTransforms.Source is
used as the selector.
In `@search-sync-worker/spotlight.go`:
- Around line 97-100: spotlightTemplateBody currently hard-codes "spotlight-*"
for the index_patterns which will miss custom/versioned spotlight indices;
change it to read the configured spotlight pattern instead (e.g., use the app
config value or helper like
getSpotlightIndexPattern()/cfg.SpotlightIndexPattern) and set "index_patterns":
[]string{configuredPattern} so the template targets the exact
configured/versioned spotlight index rather than the broad hard-coded wildcard.
In `@search-sync-worker/user_room.go`:
- Around line 182-185: userRoomTemplateBody currently hardcodes "user-room-*" so
template doesn't follow caller-supplied index name used by BuildAction; change
userRoomTemplateBody to accept the configured index/prefix (or otherwise access
the same config used by BuildAction, e.g., pass c.indexName or an indexPattern
string) and use that value for the "index_patterns" entry instead of
"user-room-*", ensuring the template will be applied to the actual indices (and
thus include the expected rooms.keyword / roomTimestamps mappings).
- Around line 107-117: The remove path currently emits a plain update (in the
OutboxMemberRemoved case using buildRemoveRoomUpdateBody and returning a
searchengine.BulkAction with ActionUpdate) which will 404 if the user doc is
missing; modify the OutboxMemberRemoved branch (and the similar branch around
the 161-176 range) to either include an upsert/tombstone payload or use
scripted_upsert so the update becomes an upsert (no-op when doc missing), or
alternatively change the bulk adapter to treat document_missing_exception/404
for remove updates as success; ensure the change references the same indexName
and account DocID and preserves the existing update script semantics while
preventing 404s on missing documents.
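One of the suggested remedies, `scripted_upsert`, can be illustrated as a bulk API fragment. The index name, document ID, script, and `rooms` field below are illustrative assumptions, not the PR's actual schema: with `"scripted_upsert": true` the script also runs when the document is missing, seeded from the `upsert` body, so the remove update no-ops instead of returning 404.

```json
{ "update": { "_index": "user-room-site-a", "_id": "alice" } }
{ "script": { "source": "if (ctx._source.rooms != null) { ctx._source.rooms.removeIf(r -> r == params.room) }", "params": { "room": "room-1" } }, "scripted_upsert": true, "upsert": { "rooms": [] } }
```

The design trade-off is between this (the writer guarantees no 404s) and the adapter-side option, where the bulk response checker treats `document_missing_exception` on remove updates as success.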
---
Outside diff comments:
In `@search-sync-worker/main.go`:
- Around line 226-255: The loop currently flushes based on buffered messages,
but because BuildAction can produce multiple actions per message you must
instead track and flush based on buffered actions: add or use a handler method
that reports current action count (e.g., ActionCount() or make handler.Add
return number of actions added from a message), change the fetch/threshold logic
to compute fetchSize from remaining action capacity (batchSize -
handler.ActionCount(), with floor 1), and move the flush check inside the for
msg := range batch.Messages() loop so after each handler.Add(...) you check
handler.ActionCount() (or use handler.BufferFull() redefined to mean
action-capacity-full) and call handler.Flush(ctx) and update lastFlush when the
action limit is reached; ensure you stop processing further messages from the
fetched batch once the action threshold is hit so ES bulk size cannot exceed
batchSize.
---
Nitpick comments:
In `@search-sync-worker/inbox_integration_test.go`:
- Around line 378-382: The TestUserRoomSync_LWWGuard test is split into ordered
subtests that mutate shared ES/NATS state and use require, which can leave later
subtests in a half-mutated state; either collapse the subtests into one single
linear test body (remove t.Run subtests) inside TestUserRoomSync_LWWGuard so
state is mutated deterministically, or make each t.Run create fresh fixtures
(new ES index, new NATS subject/connection, freshly seeded data) so they are
fully independent; update any helpers used by the subtests (setup/seed functions
referenced by TestUserRoomSync_LWWGuard and its t.Run children) to return
isolated resources and ensure every subtest defers teardown to clean ES/NATS
state before returning.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8ff946f6-d333-4ddf-aff1-028f37e0421f
📒 Files selected for processing (23)
pkg/model/event.gopkg/model/model_test.gopkg/searchengine/adapter.gopkg/searchengine/adapter_test.gopkg/searchengine/searchengine.gopkg/stream/stream.gopkg/stream/stream_test.gopkg/subject/subject.gopkg/subject/subject_test.gosearch-sync-worker/collection.gosearch-sync-worker/handler.gosearch-sync-worker/inbox_integration_test.gosearch-sync-worker/inbox_stream.gosearch-sync-worker/inbox_stream_test.gosearch-sync-worker/integration_test.gosearch-sync-worker/main.gosearch-sync-worker/messages.gosearch-sync-worker/messages_test.gosearch-sync-worker/spotlight.gosearch-sync-worker/spotlight_test.gosearch-sync-worker/template.gosearch-sync-worker/user_room.gosearch-sync-worker/user_room_test.go
456d971 to c906357 (force-push)
Response to CodeRabbit review (pushed)

✅ Fixed (5)
1. Done. Extracted a new … This is the "known sharp edge" I flagged in the PR description, and it's the right call to fix it now. Added 14 unit test cases in … Bonus: this also resolves finding #5 (…).
3. Done. Verified against both docs.nats.io/source_and_mirror and ADR-36: …
4. Done.
5. Done.
8. (nitpick) Done. Per CLAUDE.md's "each test must be fully independent — no shared mutable state, never rely on test execution order" rule, collapsed the 6 sequential … subtests.

❌ Rejected (1)
1. The comment as-written is correct. CodeRabbit's web-search summary is inaccurate. The ES … And from the Bulk API docs for the … The reason is architectural: … The cited links in the web-search result describe the bulk API in general; they don't actually verify support for …

⏸️ Deferred (1)
7. (outside diff) Valid architectural point but not actionable today. Every collection currently in this PR produces exactly one action per message (…). When/if such a collection lands, the fix is straightforward: track an … I've left a comment in …

🤖 Generated with Claude Code
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
search-sync-worker/main.go (1)
80-91: ⚠️ Potential issue | 🟠 Major
Reject non-positive batch settings at startup. `runConsumer` assumes these values are > 0. `FETCH_BATCH_SIZE <= 0` can collapse into `Fetch(0)`/busy looping, `BULK_BATCH_SIZE <= 0` keeps `remaining <= 0` forever, and `FLUSH_INTERVAL <= 0` forces constant flush checks. Please validate them immediately after parsing/defaulting config and exit with a clear error.

Suggested startup validation:

```diff
 if cfg.SpotlightIndex == "" {
 	cfg.SpotlightIndex = fmt.Sprintf("spotlight-%s-v1-chat", cfg.SiteID)
 }
 if cfg.UserRoomIndex == "" {
 	cfg.UserRoomIndex = fmt.Sprintf("user-room-%s", cfg.SiteID)
 }
+switch {
+case cfg.FetchBatchSize <= 0:
+	slog.Error("invalid config", "name", "FETCH_BATCH_SIZE", "value", cfg.FetchBatchSize)
+	os.Exit(1)
+case cfg.BulkBatchSize <= 0:
+	slog.Error("invalid config", "name", "BULK_BATCH_SIZE", "value", cfg.BulkBatchSize)
+	os.Exit(1)
+case cfg.FlushInterval <= 0:
+	slog.Error("invalid config", "name", "FLUSH_INTERVAL", "value", cfg.FlushInterval)
+	os.Exit(1)
+}

 ctx := context.Background()
```

As per coding guidelines, "Fail fast on missing required config — log error and exit with non-zero code".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/main.go` around lines 80 - 91, After parsing/defaulting the config from env.ParseAs[config], validate that numeric settings used by runConsumer (FETCH_BATCH_SIZE, BULK_BATCH_SIZE, FLUSH_INTERVAL) are positive; if any are <= 0 log a clear error via slog.Error (including the setting name and value) and exit with a non-zero code (os.Exit(1)). Update the startup path immediately after the current cfg defaults (after setting SpotlightIndex and UserRoomIndex) to check cfg.FetchBatchSize, cfg.BulkBatchSize, and cfg.FlushInterval (or the exact field names in config) and fail fast to prevent Fetch(0), infinite remaining loop, or constant flush checks in runConsumer.
🧹 Nitpick comments (2)
search-sync-worker/template.go (1)
25-33: Skip `es`-tagged fields that don't expose a concrete JSON name.
This shared helper will currently create a mapping entry under `""` or `"-"` if a future struct forgets a `json` tag or uses `json:"-"`. Failing closed here is safer than quietly generating a broken template.

Suggested hardening:

```diff
 jsonTag := field.Tag.Get("json")
 name, _, _ := strings.Cut(jsonTag, ",")
+if name == "" || name == "-" {
+	continue
+}
 esType, analyzer, _ := strings.Cut(esTag, ",")
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/template.go` around lines 25 - 33, the code derives the JSON field name into `name` and unconditionally adds a mapping entry; change it to skip fields without a concrete JSON name: after computing `name` from `jsonTag`, guard with `if name == "" || name == "-" { continue }` before computing `esType`/`analyzer` and assigning `props[name]`, so no mapping is created for anonymous/ignored JSON fields.

search-sync-worker/inbox_integration_test.go (1)
40-58: `historyShared` is accepted but its actual value is ignored.
On line 57, the helper only converts the pointer to a boolean flag. The concrete timestamp passed by callers (e.g., `restrictedFrom`) is not propagated into `Subscription.HistorySharedSince`, which makes those test inputs less faithful.

💡 Proposed refactor:

```diff
 type memberFixture struct {
 	SubID      string
 	Account    string
 	Restricted bool // if true, HistorySharedSince is set — user-room-sync filters, spotlight-sync indexes
+	HistorySharedSince *time.Time
 }

 func buildMemberEventPayload(
 	subID, account, roomID, roomName, siteID string,
 	joinedAt time.Time,
 	historyShared *time.Time,
 ) model.MemberAddedPayload {
 	return buildBulkMemberEventPayload(roomID, roomName, siteID, joinedAt, []memberFixture{{
 		SubID:      subID,
 		Account:    account,
 		Restricted: historyShared != nil,
+		HistorySharedSince: historyShared,
 	}})
 }

 func buildBulkMemberEventPayload(
 	roomID, roomName, siteID string,
 	joinedAt time.Time,
 	members []memberFixture,
 ) model.MemberAddedPayload {
-	historyFrom := joinedAt.Add(-1 * time.Hour)
 	subscriptions := make([]model.Subscription, 0, len(members))
 	for _, m := range members {
 		sub := model.Subscription{
 			ID:         m.SubID,
 			User:       model.SubscriptionUser{ID: "u-" + m.Account, Account: m.Account},
 			RoomID:     roomID,
 			SiteID:     siteID,
 			Role:       model.RoleMember,
 			JoinedAt:   joinedAt,
 			LastSeenAt: joinedAt,
 		}
-		if m.Restricted {
+		if m.HistorySharedSince != nil {
+			sub.HistorySharedSince = m.HistorySharedSince
+		} else if m.Restricted {
+			historyFrom := joinedAt.Add(-1 * time.Hour)
 			sub.HistorySharedSince = &historyFrom
 		}
 		subscriptions = append(subscriptions, sub)
 	}
```

Also applies to: 72-86
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/inbox_integration_test.go` around lines 40 - 58, buildMemberEventPayload currently ignores the actual historyShared timestamp and only sets Restricted based on its nil-ness; update it to pass the concrete timestamp into buildBulkMemberEventPayload so that Subscription.HistorySharedSince is populated. Specifically, when calling buildBulkMemberEventPayload from buildMemberEventPayload, propagate historyShared (the *time.Time) into the created memberFixture / Subscription data rather than converting it to a boolean; ensure buildBulkMemberEventPayload and any code that constructs Subscription.HistorySharedSince (or reads memberFixture.Restricted) use the timestamp value to set Subscription.HistorySharedSince when non-nil.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/stream/stream_test.go`:
- Around line 28-33: Replace uses of t.Errorf and t.Fatalf in stream_test.go
with testify assertions: add imports for "github.com/stretchr/testify/assert"
and "github.com/stretchr/testify/require", then convert non-fatal checks like
the Name and Subjects assertions to assert calls (e.g., assert.Equal(t,
tt.wantName, tt.cfg.Name) and assert.Len(t, tt.cfg.Subjects, 1); assert.Equal(t,
tt.wantSubj, tt.cfg.Subjects[0])) and convert checks that should stop the test
on failure to require calls (e.g., require.NoError/require.Equal where
appropriate) using the same tt.* symbols (tt.cfg.Name, tt.wantName,
tt.cfg.Subjects, tt.wantSubj) to locate each assertion to update.
In `@search-sync-worker/handler.go`:
- Around line 167-176: The isBulkItemSuccess function currently treats any 404
for delete/update as success; change it to inspect the bulk item error payload
(e.g., result.Error.Type or result.Error.Reason on searchengine.BulkResult) and
only treat 404 as idempotent success when the error type matches the benign
missing-document case (e.g., "document_missing_exception" or the equivalent used
in tests), otherwise return false so index/template-missing errors like
"index_not_found_exception" are not acked; ensure you handle nil/absent Error
safely and keep the existing 2xx and 409 logic in isBulkItemSuccess.
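The refinement described above can be sketched as a small predicate. The type, constant, and field names below (`bulkResult`, `actionDelete`, `ErrorType`) are illustrative stand-ins for the worker's `searchengine` types, not its real API:

```go
package main

import "fmt"

type action int

const (
	actionIndex action = iota
	actionUpdate
	actionDelete
)

// bulkResult mirrors one item of an ES bulk response (illustrative).
type bulkResult struct {
	Status    int
	ErrorType string // e.g. "document_missing_exception"
}

// isBulkItemSuccess accepts 2xx and 409 for every action, and treats 404 as
// idempotent success only for delete/update when the error payload is the
// benign missing-document case. A 404 caused by a missing index still fails,
// so the message is Nak'd and retried instead of being silently acked.
func isBulkItemSuccess(r bulkResult, a action) bool {
	switch {
	case r.Status >= 200 && r.Status < 300:
		return true
	case r.Status == 409: // version conflict: LWW guard already applied
		return true
	case r.Status == 404 && (a == actionDelete || a == actionUpdate):
		return r.ErrorType == "document_missing_exception"
	default:
		return false
	}
}

func main() {
	fmt.Println(isBulkItemSuccess(bulkResult{Status: 404, ErrorType: "document_missing_exception"}, actionDelete)) // true
	fmt.Println(isBulkItemSuccess(bulkResult{Status: 404, ErrorType: "index_not_found_exception"}, actionDelete))  // false
	fmt.Println(isBulkItemSuccess(bulkResult{Status: 404, ErrorType: "document_missing_exception"}, actionIndex))  // false
}
```

An absent error payload yields an empty `ErrorType` here, which correctly fails closed for the 404 branch.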
---
Outside diff comments:
In `@search-sync-worker/main.go`:
- Around line 80-91: After parsing/defaulting the config from
env.ParseAs[config], validate that numeric settings used by runConsumer
(FETCH_BATCH_SIZE, BULK_BATCH_SIZE, FLUSH_INTERVAL) are positive; if any are <=
0 log a clear error via slog.Error (including the setting name and value) and
exit with a non-zero code (os.Exit(1)). Update the startup path immediately
after the current cfg defaults (after setting SpotlightIndex and UserRoomIndex)
to check cfg.FetchBatchSize, cfg.BulkBatchSize, and cfg.FlushInterval (or the
exact field names in config) and fail fast to prevent Fetch(0), infinite
remaining loop, or constant flush checks in runConsumer.
---
Nitpick comments:
In `@search-sync-worker/inbox_integration_test.go`:
- Around line 40-58: buildMemberEventPayload currently ignores the actual
historyShared timestamp and only sets Restricted based on its nil-ness; update
it to pass the concrete timestamp into buildBulkMemberEventPayload so that
Subscription.HistorySharedSince is populated. Specifically, when calling
buildBulkMemberEventPayload from buildMemberEventPayload, propagate
historyShared (the *time.Time) into the created memberFixture / Subscription
data rather than converting it to a boolean; ensure buildBulkMemberEventPayload
and any code that constructs Subscription.HistorySharedSince (or reads
memberFixture.Restricted) use the timestamp value to set
Subscription.HistorySharedSince when non-nil.
In `@search-sync-worker/template.go`:
- Around line 25-33: The code currently derives JSON field name into variable
name and unconditionally adds a mapping entry; change it to skip fields that
don't expose a concrete JSON name by checking the parsed name from jsonTag and
returning without adding a prop when name == "" or name == "-". Update the block
around jsonTag/name (the variables jsonTag and name) so that after computing
name, you do a guard (if name == "" || name == "-" { continue } or return
depending on context) before computing esType/analyzer and assigning
props[name], ensuring no mapping is created for anonymous/ignored json fields.
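The template-helper guard above can be shown end to end with a small reflection sketch. The tag conventions and `buildProps` name mirror the review comment; the real file's structure may differ:

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// buildProps walks a struct's fields and emits an ES property mapping per
// field, failing closed on fields without a usable JSON name.
func buildProps(v any) map[string]map[string]string {
	props := map[string]map[string]string{}
	t := reflect.TypeOf(v)
	for i := 0; i < t.NumField(); i++ {
		field := t.Field(i)
		esTag := field.Tag.Get("es")
		if esTag == "" {
			continue
		}
		jsonTag := field.Tag.Get("json")
		name, _, _ := strings.Cut(jsonTag, ",")
		// Guard: no concrete JSON name means no mapping entry.
		if name == "" || name == "-" {
			continue
		}
		esType, analyzer, _ := strings.Cut(esTag, ",")
		prop := map[string]string{"type": esType}
		if analyzer != "" {
			prop["analyzer"] = analyzer
		}
		props[name] = prop
	}
	return props
}

type doc struct {
	Name    string `json:"name" es:"text,standard"`
	Skipped string `json:"-" es:"keyword"` // ignored by encoding/json
	NoTag   string `es:"keyword"`          // no json tag at all
}

func main() {
	fmt.Println(buildProps(doc{})) // only "name" is mapped
}
```

Without the guard, `Skipped` and `NoTag` would produce mapping entries under `"-"` and `""`, which is exactly the broken-template case the nitpick warns about.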
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: ff8d4979-df8b-43ba-a2d0-bb2d06547287
📒 Files selected for processing (24)
pkg/model/event.gopkg/model/model_test.gopkg/searchengine/adapter.gopkg/searchengine/adapter_test.gopkg/searchengine/searchengine.gopkg/stream/stream.gopkg/stream/stream_test.gopkg/subject/subject.gopkg/subject/subject_test.gosearch-sync-worker/collection.gosearch-sync-worker/handler.gosearch-sync-worker/handler_test.gosearch-sync-worker/inbox_integration_test.gosearch-sync-worker/inbox_stream.gosearch-sync-worker/inbox_stream_test.gosearch-sync-worker/integration_test.gosearch-sync-worker/main.gosearch-sync-worker/messages.gosearch-sync-worker/messages_test.gosearch-sync-worker/spotlight.gosearch-sync-worker/spotlight_test.gosearch-sync-worker/template.gosearch-sync-worker/user_room.gosearch-sync-worker/user_room_test.go
✅ Files skipped from review due to trivial changes (3)
- pkg/model/model_test.go
- pkg/searchengine/searchengine.go
- search-sync-worker/spotlight_test.go
🚧 Files skipped from review as they are similar to previous changes (11)
- search-sync-worker/messages_test.go
- pkg/searchengine/adapter.go
- pkg/stream/stream.go
- search-sync-worker/integration_test.go
- search-sync-worker/inbox_stream_test.go
- pkg/model/event.go
- pkg/subject/subject_test.go
- search-sync-worker/inbox_stream.go
- pkg/searchengine/adapter_test.go
- search-sync-worker/messages.go
- search-sync-worker/user_room.go
Response to CodeRabbit's review on …
Follow-up: …
3026f46 to 61a3cf9 (force-push)
Actionable comments posted: 2
🧹 Nitpick comments (2)
pkg/subject/subject_test.go (1)
74-90: Use `assert`/`require` for the new subject-slice checks.
This subtest is doing manual length and element comparisons even though the repo standardizes on Testify in `_test.go` files. `require.Equal(t, want, got)` would make the intent clearer and keep the test style consistent. As per coding guidelines, "`**/*_test.go`: Use standard testing package with `github.com/stretchr/testify/assert` and `testify/require` for assertions."
Verify each finding against the current code and only fix it if needed. In `@pkg/subject/subject_test.go` around lines 74 - 90, replace the manual length and element-by-element comparisons in the subtest with a single `require.Equal` assertion; call `subject.InboxMemberEventSubjects("site-a")` into `got`, build `want` as before, then use `require.Equal(t, want, got)`. Add the import for `github.com/stretchr/testify/require` to the test file and remove the manual len check and for-loop that compares elements.

search-sync-worker/handler_test.go (1)
304-407: Collapse the 404 permutations into a table-driven test.
These subtests all exercise the same handler flow with different `{action, status, errorType, wantAck}` inputs. A table would remove a lot of duplication and make it easier to add more ES error classifications without copying another full setup block. As per coding guidelines, "`**/handler_test.go`: For handler tests: test each NATS/HTTP handler method with table-driven tests covering all documented scenarios."
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/handler_test.go` around lines 304 - 407, Collapse the repeated subtests in TestHandler_Flush_404OnDeleteAndUpdate into a single table-driven loop: define a slice of test cases containing name, collection factory (e.g. newStubDeleteCollection, newStubUpdateCollection, newStubIndexCollection), the mocked BulkResult (Status, ErrorType, Error) and expected ack/nack booleans; then for each case call t.Run(case.name, func(t *testing.T){ create a gomock.Controller, NewMockStore, set the Bulk expectation (gomock.Any(), gomock.Len(1)) to return the case's BulkResult, build the handler via NewHandler(store, coll, 500), create stubMsg, h.Add(msg), h.Flush(ctx) and assert msg.acked/msg.nacked match expected }); keep existing expectations (Bulk call and return values), and reuse existing helpers (newStubDeleteCollection, newStubUpdateCollection, newStubIndexCollection, NewHandler, stubMsg) to ensure behavior is identical while removing duplication.
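The table shape the prompt describes can be sketched without the mocks. `decideAck` is a hypothetical stand-in for the handler's post-flush ack decision; in the real test each row would drive the mocked store and assert `msg.acked`/`msg.nacked`:

```go
package main

import "fmt"

// decideAck is an illustrative stand-in for the handler's ack decision
// after a bulk flush, not the PR's actual function.
func decideAck(action string, status int, errorType string) bool {
	switch {
	case status >= 200 && status < 300, status == 409:
		return true
	case status == 404 && (action == "delete" || action == "update"):
		return errorType == "document_missing_exception"
	default:
		return false
	}
}

func main() {
	// One table covers every 404 permutation; a new ES error classification
	// is one more row, not another copied setup block.
	cases := []struct {
		name      string
		action    string
		status    int
		errorType string
		wantAck   bool
	}{
		{"delete 404 doc missing", "delete", 404, "document_missing_exception", true},
		{"update 404 doc missing", "update", 404, "document_missing_exception", true},
		{"delete 404 index missing", "delete", 404, "index_not_found_exception", false},
		{"index 404", "index", 404, "document_missing_exception", false},
		{"index 201", "index", 201, "", true},
	}
	for _, tc := range cases {
		got := decideAck(tc.action, tc.status, tc.errorType)
		fmt.Printf("%s: ack=%v want=%v\n", tc.name, got, tc.wantAck)
	}
}
```

In the actual `_test.go` file the loop body would be wrapped in `t.Run(tc.name, ...)` so each row reports independently.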
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@search-sync-worker/inbox_stream.go`:
- Around line 37-45: The code is building NATS subject strings inline
(destPattern and sourcePattern) — instead, create and use subject
builder/pattern helpers in pkg/subject (e.g., functions like
InboxAggregatePattern(siteID string) and OutboxToPattern(srcSiteID, destSiteID
string)) and replace the fmt.Sprintf usages in inbox_stream.go: build
destPattern with pkg/subject.InboxAggregatePattern(siteID) and build
sourcePattern with pkg/subject.OutboxToPattern(remote, siteID); then pass those
returned patterns into the jetstream.StreamSource SubjectTransforms (leaving
Name as OUTBOX_{remote} and the SubjectTransformConfig usage unchanged) so all
canonical subject definitions live in pkg/subject.
- Around line 83-95: In parseMemberEvent, validate evt.Type after unmarshalling
the OutboxEvent and fail closed if it is not one of the supported values
("member_added" or "member_removed"); update parseMemberEvent (which returns
*model.OutboxEvent, *model.MemberAddedPayload) to check evt.Type and return a
descriptive error (e.g., "unsupported event type: %s") when the type is
unexpected so mispublished INBOX messages cannot be processed further.
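The fail-closed validation described above can be sketched with stdlib JSON parsing. The `outboxEvent`/`memberPayload` structs and their field names are minimal stand-ins for `model.OutboxEvent`/`model.MemberAddedPayload`, not the real model package:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// outboxEvent is an illustrative stand-in for model.OutboxEvent.
type outboxEvent struct {
	Type    string          `json:"type"`
	Payload json.RawMessage `json:"payload"`
}

// memberPayload is an illustrative stand-in for model.MemberAddedPayload.
type memberPayload struct {
	RoomID string `json:"roomId"`
}

// parseMemberEvent rejects unexpected event types up front, so a
// mispublished INBOX message errors out instead of being half-processed.
func parseMemberEvent(data []byte) (*outboxEvent, *memberPayload, error) {
	var evt outboxEvent
	if err := json.Unmarshal(data, &evt); err != nil {
		return nil, nil, fmt.Errorf("unmarshal outbox event: %w", err)
	}
	switch evt.Type {
	case "member_added", "member_removed":
		// supported
	default:
		return nil, nil, fmt.Errorf("unsupported event type: %q", evt.Type)
	}
	var p memberPayload
	if err := json.Unmarshal(evt.Payload, &p); err != nil {
		return nil, nil, fmt.Errorf("unmarshal member payload: %w", err)
	}
	return &evt, &p, nil
}

func main() {
	_, _, err := parseMemberEvent([]byte(`{"type":"room_renamed","payload":{}}`))
	fmt.Println(err)
}
```

Failing before the payload unmarshal also means an attacker-shaped or misrouted message never reaches collection code with a partially valid payload.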
---
Nitpick comments:
In `@pkg/subject/subject_test.go`:
- Around line 74-90: Replace the manual length and element-by-element
comparisons in the subtest named "InboxMemberEventSubjects" with a single
require.Equal assertion; call subject.InboxMemberEventSubjects("site-a") into
got, build want as before, then use require.Equal(t, want, got). Add the import
for "github.com/stretchr/testify/require" to the test file and remove the manual
len check and for-loop that compares elements.
In `@search-sync-worker/handler_test.go`:
- Around line 304-407: Collapse the repeated subtests in
TestHandler_Flush_404OnDeleteAndUpdate into a single table-driven loop: define a
slice of test cases containing name, collection factory (e.g.
newStubDeleteCollection, newStubUpdateCollection, newStubIndexCollection), the
mocked BulkResult (Status, ErrorType, Error) and expected ack/nack booleans;
then for each case call t.Run(case.name, func(t *testing.T){ create a
gomock.Controller, NewMockStore, set the Bulk expectation (gomock.Any(),
gomock.Len(1)) to return the case's BulkResult, build the handler via
NewHandler(store, coll, 500), create stubMsg, h.Add(msg), h.Flush(ctx) and
assert msg.acked/msg.nacked match expected }); keep existing expectations (Bulk
call and return values), and reuse existing helpers (newStubDeleteCollection,
newStubUpdateCollection, newStubIndexCollection, NewHandler, stubMsg) to ensure
behavior is identical while removing duplication.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2e212552-96a7-4b90-8bcb-75f0d783333b
📒 Files selected for processing (26)
pkg/model/event.gopkg/model/model_test.gopkg/natsutil/ack.gopkg/natsutil/ack_test.gopkg/searchengine/adapter.gopkg/searchengine/adapter_test.gopkg/searchengine/searchengine.gopkg/stream/stream.gopkg/stream/stream_test.gopkg/subject/subject.gopkg/subject/subject_test.gosearch-sync-worker/collection.gosearch-sync-worker/handler.gosearch-sync-worker/handler_test.gosearch-sync-worker/inbox_integration_test.gosearch-sync-worker/inbox_stream.gosearch-sync-worker/inbox_stream_test.gosearch-sync-worker/integration_test.gosearch-sync-worker/main.gosearch-sync-worker/messages.gosearch-sync-worker/messages_test.gosearch-sync-worker/spotlight.gosearch-sync-worker/spotlight_test.gosearch-sync-worker/template.gosearch-sync-worker/user_room.gosearch-sync-worker/user_room_test.go
✅ Files skipped from review due to trivial changes (6)
- pkg/model/model_test.go
- pkg/searchengine/adapter_test.go
- pkg/natsutil/ack_test.go
- pkg/natsutil/ack.go
- search-sync-worker/spotlight_test.go
- search-sync-worker/user_room.go
🚧 Files skipped from review as they are similar to previous changes (8)
- search-sync-worker/messages_test.go
- pkg/stream/stream.go
- search-sync-worker/integration_test.go
- pkg/searchengine/searchengine.go
- pkg/stream/stream_test.go
- search-sync-worker/collection.go
- search-sync-worker/template.go
- search-sync-worker/main.go
destPattern := fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
sources := make([]*jetstream.StreamSource, 0, len(remoteSiteIDs))
for _, remote := range remoteSiteIDs {
	sourcePattern := fmt.Sprintf("outbox.%s.to.%s.>", remote, siteID)
	sources = append(sources, &jetstream.StreamSource{
		Name: fmt.Sprintf("OUTBOX_%s", remote),
		SubjectTransforms: []jetstream.SubjectTransformConfig{
			{Source: sourcePattern, Destination: destPattern},
		},
🛠️ Refactor suggestion | 🟠 Major
Move these NATS subject patterns into pkg/subject.
This helper reintroduces raw subject formatting for both the sourced OUTBOX pattern and the rewritten INBOX aggregate pattern. Please add builders/pattern helpers in pkg/subject and reuse them here so the canonical subject definitions stay in one place.
As per coding guidelines, "Use dot-delimited hierarchical NATS subjects — use pkg/subject builders, never raw fmt.Sprintf" and "pkg/subject/*.go: Outbox subjects: outbox.{siteID}.to.{destSiteID}.{eventType}; define subject patterns in pkg/subject."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@search-sync-worker/inbox_stream.go` around lines 37 - 45, The code is
building NATS subject strings inline (destPattern and sourcePattern) — instead,
create and use subject builder/pattern helpers in pkg/subject (e.g., functions
like InboxAggregatePattern(siteID string) and OutboxToPattern(srcSiteID,
destSiteID string)) and replace the fmt.Sprintf usages in inbox_stream.go: build
destPattern with pkg/subject.InboxAggregatePattern(siteID) and build
sourcePattern with pkg/subject.OutboxToPattern(remote, siteID); then pass those
returned patterns into the jetstream.StreamSource SubjectTransforms (leaving
Name as OUTBOX_{remote} and the SubjectTransformConfig usage unchanged) so all
canonical subject definitions live in pkg/subject.
func parseMemberEvent(data []byte) (*model.OutboxEvent, *model.MemberAddedPayload, error) {
	var evt model.OutboxEvent
	if err := json.Unmarshal(data, &evt); err != nil {
		return nil, nil, fmt.Errorf("unmarshal outbox event: %w", err)
	}
	if evt.Timestamp <= 0 {
		return nil, nil, fmt.Errorf("parse member event: missing timestamp")
	}
	var payload model.MemberAddedPayload
	if err := json.Unmarshal(evt.Payload, &payload); err != nil {
		return nil, nil, fmt.Errorf("unmarshal member added payload: %w", err)
	}
	return &evt, &payload, nil
Reject unsupported OutboxEvent.Type values here.
parseMemberEvent is the shared decode boundary for inbox-member collections, but it currently accepts any evt.Type as long as the payload shape parses. Fail closed unless the type is member_added or member_removed, otherwise a mispublished INBOX message can reach the wrong indexing path.
Proposed fix
func parseMemberEvent(data []byte) (*model.OutboxEvent, *model.MemberAddedPayload, error) {
var evt model.OutboxEvent
if err := json.Unmarshal(data, &evt); err != nil {
return nil, nil, fmt.Errorf("unmarshal outbox event: %w", err)
}
if evt.Timestamp <= 0 {
return nil, nil, fmt.Errorf("parse member event: missing timestamp")
}
+ if evt.Type != model.OutboxMemberAdded && evt.Type != model.OutboxMemberRemoved {
+ return nil, nil, fmt.Errorf("parse member event: unsupported type %q", evt.Type)
+ }
var payload model.MemberAddedPayload
if err := json.Unmarshal(evt.Payload, &payload); err != nil {
- return nil, nil, fmt.Errorf("unmarshal member added payload: %w", err)
+ return nil, nil, fmt.Errorf("unmarshal member event payload: %w", err)
}
return &evt, &payload, nil
}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@search-sync-worker/inbox_stream.go` around lines 83 - 95, In
parseMemberEvent, validate evt.Type after unmarshalling the OutboxEvent and fail
closed if it is not one of the supported values ("member_added" or
"member_removed"); update parseMemberEvent (which returns *model.OutboxEvent,
*model.MemberAddedPayload) to check evt.Type and return a descriptive error
(e.g., "unsupported event type: %s") when the type is unexpected so mispublished
INBOX messages cannot be processed further.
Force-pushed 61a3cf9 to 7cd6729.
Actionable comments posted: 3
🧹 Nitpick comments (3)
inbox-worker/handler.go (1)
113-145: Re-publish failures are silently absorbed. Both canonical re-publishes log on failure but let handleMemberAdded/handleMemberRemoved return nil, so JetStream will Ack and the search-index update is lost permanently. This matches the pre-existing best-effort publish style in the service, so not a blocker — but for these specific re-publishes the store-side work is idempotent (BulkCreateSubscriptions swallows duplicate keys; DeleteSubscriptionsByAccounts is a no-op on empty set), so Nak'ing on publish failure would be safe and would give you at-least-once delivery to the canonical stream. Consider either returning the error or at minimum adding a metric so drift is visible.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@inbox-worker/handler.go` around lines 113 - 145, the re-publish failures in handleMemberRemoved (and the analogous handleMemberAdded) currently only log errors and return nil, which causes the message to be Acked and drops the canonical update; change the behavior so that if h.pub.Publish(ctx, subject.RoomCanonicalMemberRemoved(h.siteID), evt.Payload) (and the RoomCanonicalMemberAdded call) returns an error you propagate that error (return it) instead of swallowing it, leveraging the idempotence of DeleteSubscriptionsByAccounts and BulkCreateSubscriptions to safely Nak/retry; alternatively, if you prefer not to change delivery semantics, increment a visible metric on publish failure so drift is detectable.
inbox-worker/handler_test.go (1)
749-752: Optional: tighten canonical subject assertions.
assert.Contains(..., "chat.room.canonical") will also pass if the handler published the wrong canonical variant (e.g., member_added instead of member_removed). Consider comparing against the exact builder output so a bug in pkg/subject.RoomCanonicalMemberAdded/Removed or a swap in handler.go is caught:
Proposed tighter assertion
- require.Len(t, records, 1)
- assert.Contains(t, records[0].subject, "chat.room.canonical")
+ require.Len(t, records, 1)
+ assert.Equal(t, subject.RoomCanonicalMemberRemoved("site-test"), records[0].subject)
Also applies to: 803-806
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@inbox-worker/handler_test.go` around lines 749 - 752, the test currently uses assert.Contains on pub.getRecords()[0].subject which can miss incorrect canonical variants; update the assertions to compare the subject exactly to the expected constructed subject (use the subject builder functions like pkg/subject.RoomCanonicalMemberAdded and pkg/subject.RoomCanonicalMemberRemoved or the exact builder output used by the handler) instead of using Contains so the test fails if the wrong variant is published (apply the same change to the other occurrence around lines 803-806).
room-worker/handler.go (1)
310-312: Inconsistent siteID source for canonical member-event subjects. The add paths key the canonical subject on room.SiteID (line 127 invite, line 673 add), while both remove paths key it on h.siteID:
// add paths
subject.RoomCanonicalMemberAdded(room.SiteID)
// remove paths (here and processRemoveOrg)
subject.RoomCanonicalMemberRemoved(h.siteID)
In room-worker these are equivalent today because room-worker only handles rooms whose SiteID == h.siteID, but the asymmetry is surprising and will silently break if that invariant ever loosens (e.g., a multi-site worker). Prefer a single convention — room.SiteID is the more semantically correct key since the canonical subject is "where the room lives":
Proposed alignment (remove paths)
In processRemoveIndividual you'd need to load the room first (or thread it through), so the simpler fix is to keep h.siteID everywhere and switch the add paths to match. Either direction is fine — the important thing is consistency.
- if err := h.publish(ctx, subject.RoomCanonicalMemberAdded(room.SiteID), memberAddData); err != nil {
+ if err := h.publish(ctx, subject.RoomCanonicalMemberAdded(h.siteID), memberAddData); err != nil {
  	slog.Error("room canonical member_added publish failed", "error", err, "roomID", req.RoomID)
  }
Also applies to: 433-435
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@room-worker/handler.go` around lines 310 - 312, The canonical member-event subjects use inconsistent site IDs: some places call subject.RoomCanonicalMemberAdded(room.SiteID) while others call subject.RoomCanonicalMemberRemoved(h.siteID); normalize to use room.SiteID everywhere (i.e., replace uses of h.siteID for canonical room-member subjects with the room's SiteID) so the subject key is always "where the room lives"; update the remove paths (e.g., in processRemoveIndividual/processRemoveOrg and the publish call using subject.RoomCanonicalMemberRemoved) to obtain/load the Room and use room.SiteID when constructing the subject, ensuring all subject.RoomCanonicalMemberAdded/Removed calls are consistent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@search-sync-worker/handler.go`:
- Around line 119-120: The current handler treats any 409 result.Status as
success unconditionally, which hides ES optimistic-concurrency conflicts for
ActionUpdate; modify the logic to only ACK 409 for actions that rely on external
versioning (e.g., ActionIndex and ActionDelete when BulkAction.Version is set)
and do NOT treat 409 as success for ActionUpdate. Locate the code checking
result.Status (and the surrounding bulk-action handling that references
BulkAction.Version and the action type like
ActionUpdate/ActionIndex/ActionDelete) and change the branch so 409 returns true
only for index/delete versioned actions, while for ActionUpdate it returns false
(or schedules a retry/NAK) so conflicts are surfaced and retries occur.
In `@search-sync-worker/room_member.go`:
- Around line 57-75: The parser currently only checks len(evt.Accounts)>0 but
must also validate evt.RoomID and each account string is non-empty; update the
member_added (model.MemberAddEvent) and member_removed/member_left
(model.MemberRemoveEvent) branches where evt is used and memberEvent is returned
to: validate evt.RoomID != "" and iterate evt.Accounts to ensure no account ==
""; if any identifier is missing return a descriptive parse error (e.g., "parse
member_added event: missing room_id" or "parse member_removed event: empty
account id") and do not log or include account values in the error message; make
the same checks for both Add and Remove flows before returning
&memberEvent{...}.
In `@search-sync-worker/spotlight.go`:
- Around line 114-118: The Elasticsearch index template defines a
"custom_tokenizer" with type "whitespace" but incorrectly includes a
"token_chars" setting; remove the "token_chars" entry from the
"custom_tokenizer" map in spotlight.go (i.e., inside the tokenizer config where
"custom_tokenizer" is defined) so only supported options (like "type" and
optionally "max_token_length") remain, ensuring the template is valid for the
whitespace tokenizer.
---
Nitpick comments:
In `@inbox-worker/handler_test.go`:
- Around line 749-752: The test currently uses assert.Contains on
pub.getRecords()[0].subject which can miss incorrect canonical variants; update
the assertions to compare the subject exactly to the expected constructed
subject (use the subject builder functions like
pkg/subject.RoomCanonicalMemberAdded and pkg/subject.RoomCanonicalMemberRemoved
or the exact builder output used by the handler) instead of using Contains so
the test fails if the wrong variant is published (apply the same change to the
other occurrence around lines 803-806).
In `@inbox-worker/handler.go`:
- Around line 113-145: The re-publish failures in handleMemberRemoved (and the
analogous handleMemberAdded) currently only log errors and return nil, which
causes the message to be Acked and drops the canonical update; change the
behavior so that if h.pub.Publish(ctx,
subject.RoomCanonicalMemberRemoved(h.siteID), evt.Payload) (and the
RoomCanonicalMemberAdded call) returns an error you propagate that error (return
it) instead of swallowing it, leveraging the idempotence of
DeleteSubscriptionsByAccounts and BulkCreateSubscriptions to safely Nak/retry;
alternatively, if you prefer not to change delivery semantics, increment a
visible metric on publish failure so drift is detectable.
In `@room-worker/handler.go`:
- Around line 310-312: The canonical member-event subjects use inconsistent site
IDs: some places call subject.RoomCanonicalMemberAdded(room.SiteID) while others
call subject.RoomCanonicalMemberRemoved(h.siteID); normalize to use room.SiteID
everywhere (i.e., replace uses of h.siteID for canonical room-member subjects
with the room's SiteID) so the subject key is always "where the room lives";
update the remove paths (e.g., in processRemoveIndividual/processRemoveOrg and
the publish call using subject.RoomCanonicalMemberRemoved) to obtain/load the
Room and use room.SiteID when constructing the subject, ensuring all
subject.RoomCanonicalMemberAdded/Removed calls are consistent.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: a1cc1feb-84d4-434b-bc55-f57d97912080
📒 Files selected for processing (22)
- inbox-worker/handler.go
- inbox-worker/handler_test.go
- inbox-worker/integration_test.go
- inbox-worker/main.go
- pkg/model/event.go
- pkg/natsutil/ack.go
- pkg/natsutil/ack_test.go
- pkg/searchengine/adapter.go
- pkg/searchengine/searchengine.go
- pkg/subject/subject.go
- room-worker/handler.go
- room-worker/handler_test.go
- search-sync-worker/collection.go
- search-sync-worker/handler.go
- search-sync-worker/handler_test.go
- search-sync-worker/main.go
- search-sync-worker/messages.go
- search-sync-worker/messages_test.go
- search-sync-worker/room_member.go
- search-sync-worker/spotlight.go
- search-sync-worker/template.go
- search-sync-worker/user_room.go
✅ Files skipped from review due to trivial changes (4)
- inbox-worker/integration_test.go
- room-worker/handler_test.go
- pkg/natsutil/ack_test.go
- search-sync-worker/handler_test.go
🚧 Files skipped from review as they are similar to previous changes (9)
- pkg/searchengine/adapter.go
- search-sync-worker/messages_test.go
- pkg/subject/subject.go
- search-sync-worker/collection.go
- pkg/natsutil/ack.go
- pkg/model/event.go
- search-sync-worker/messages.go
- search-sync-worker/main.go
- search-sync-worker/user_room.go
if result.Status == 409 {
	return true
🧩 Analysis chain
🌐 Web query:
Elasticsearch bulk update retry_on_conflict version_conflict_engine_exception official docs
💡 Result:
In Elasticsearch bulk API, use retry_on_conflict in the update action metadata to handle version_conflict_engine_exception during concurrent updates. Example from official docs:
{ "update" : { "_index" : "index1", "_id" : "1", "retry_on_conflict" : 3 } }
{ "doc" : { "field" : "value" } }
This retries the update up to 3 times on version conflicts. The version_conflict_engine_exception occurs due to optimistic concurrency control when concurrent modifications change the document version/seq_no. Official bulk docs confirm retry_on_conflict is specific to update actions in bulk requests, unlike single update API where it's a query param. For upsert in bulk:
{ "update" : { "_id" : "1", "retry_on_conflict" : 3 } }
{ "doc" : { "field" : "value" }, "upsert" : { "counter" : 1 } }
Bulk failures are per-item; one conflict doesn't fail the entire request.
Citations:
- 1: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html
- 2: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
- 3: https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-bulk
🏁 Script executed:
# First, let's see the full context of lines 119-120 in handler.go
cat -n search-sync-worker/handler.go | sed -n '100,140p'
Repository: hmchangw/chat
Length of output: 1197
🏁 Script executed:
# Check the searchengine package to see how ActionUpdate is built
find . -name "searchengine*" -type f | head -20
Repository: hmchangw/chat
Length of output: 91
🏁 Script executed:
# Search for version-related fields in the searchengine/handler code
rg "version_type|retry_on_conflict|Version" search-sync-worker/ pkg/searchengine/ -A 2 -B 2
Repository: hmchangw/chat
Length of output: 4976
🏁 Script executed:
# Check user_room.go to see what actions it builds
cat -n search-sync-worker/user_room.go | head -100
Repository: hmchangw/chat
Length of output: 3612
🏁 Script executed:
# Search for where ActionUpdate is used and what body is sent
rg "ActionUpdate" search-sync-worker/ -B 3 -A 3
Repository: hmchangw/chat
Length of output: 1303
Don't ACK 409 conflicts on ActionUpdate as success.
The BulkAction.Version field is explicitly ignored for ActionUpdate operations. A 409 response indicates an internal Elasticsearch optimistic concurrency conflict (seq_no mismatch), not a stale external version event. Treating it as success silently drops room-array updates when concurrent modifications occur, breaking user-room sync consistency.
Keep 409-as-success for versioned index and delete actions, but either NAK/retry updates or filter them from the success path:
Proposed fix
if result.Status == 409 {
- return true
+ return action != searchengine.ActionUpdate
}🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@search-sync-worker/handler.go` around lines 119 - 120, The current handler
treats any 409 result.Status as success unconditionally, which hides ES
optimistic-concurrency conflicts for ActionUpdate; modify the logic to only ACK
409 for actions that rely on external versioning (e.g., ActionIndex and
ActionDelete when BulkAction.Version is set) and do NOT treat 409 as success for
ActionUpdate. Locate the code checking result.Status (and the surrounding
bulk-action handling that references BulkAction.Version and the action type like
ActionUpdate/ActionIndex/ActionDelete) and change the branch so 409 returns true
only for index/delete versioned actions, while for ActionUpdate it returns false
(or schedules a retry/NAK) so conflicts are surfaced and retries occur.
if evt.Timestamp <= 0 {
	return nil, fmt.Errorf("parse member_added event: missing timestamp")
}
if len(evt.Accounts) == 0 {
	return nil, fmt.Errorf("parse member_added event: empty accounts")
}
return &memberEvent{Add: &evt}, nil

case "member_removed", "member_left":
	var evt model.MemberRemoveEvent
	if err := json.Unmarshal(data, &evt); err != nil {
		return nil, fmt.Errorf("unmarshal member_removed event: %w", err)
	}
	if evt.Timestamp <= 0 {
		return nil, fmt.Errorf("parse member_removed event: missing timestamp")
	}
	if len(evt.Accounts) == 0 {
		return nil, fmt.Errorf("parse member_removed event: empty accounts")
	}
Validate RoomID and each account before building index keys.
len(evt.Accounts) > 0 still allows [""], and missing RoomID currently flows into document IDs like account_ or empty room entries in downstream indexes. Since this parser is the NATS payload boundary for spotlight and user-room sync, reject malformed identifiers here without logging account values.
🛡️ Proposed validation
if evt.Timestamp <= 0 {
return nil, fmt.Errorf("parse member_added event: missing timestamp")
}
+ if evt.RoomID == "" {
+ return nil, fmt.Errorf("parse member_added event: missing roomID")
+ }
if len(evt.Accounts) == 0 {
return nil, fmt.Errorf("parse member_added event: empty accounts")
}
+ for i, account := range evt.Accounts {
+ if account == "" {
+ return nil, fmt.Errorf("parse member_added event: empty account at index %d", i)
+ }
+ }
return &memberEvent{Add: &evt}, nil
case "member_removed", "member_left":
@@
if evt.Timestamp <= 0 {
return nil, fmt.Errorf("parse member_removed event: missing timestamp")
}
+ if evt.RoomID == "" {
+ return nil, fmt.Errorf("parse member_removed event: missing roomID")
+ }
if len(evt.Accounts) == 0 {
return nil, fmt.Errorf("parse member_removed event: empty accounts")
}
+ for i, account := range evt.Accounts {
+ if account == "" {
+ return nil, fmt.Errorf("parse member_removed event: empty account at index %d", i)
+ }
+ }
return &memberEvent{Remove: &evt}, nil
Based on learnings, validation should be reserved for system boundaries such as deserialization or other untrusted entry points.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@search-sync-worker/room_member.go` around lines 57 - 75, The parser currently
only checks len(evt.Accounts)>0 but must also validate evt.RoomID and each
account string is non-empty; update the member_added (model.MemberAddEvent) and
member_removed/member_left (model.MemberRemoveEvent) branches where evt is used
and memberEvent is returned to: validate evt.RoomID != "" and iterate
evt.Accounts to ensure no account == ""; if any identifier is missing return a
descriptive parse error (e.g., "parse member_added event: missing room_id" or
"parse member_removed event: empty account id") and do not log or include
account values in the error message; make the same checks for both Add and
Remove flows before returning &memberEvent{...}.
| "tokenizer": map[string]any{ | ||
| "custom_tokenizer": map[string]any{ | ||
| "type": "whitespace", | ||
| "token_chars": []string{"letter", "digit", "punctuation", "symbol"}, | ||
| }, |
🧩 Analysis chain
🌐 Web query:
Elasticsearch whitespace tokenizer token_chars configuration official docs
💡 Result:
The Elasticsearch whitespace tokenizer does not have a token_chars configuration parameter. Official documentation consistently states that it only accepts one parameter: max_token_length (default 255), which splits tokens exceeding that length. The token_chars parameter does not exist for the whitespace tokenizer. It appears to be confused with the edge_ngram tokenizer's token_chars (which specifies character classes like letter, digit to include in n-grams) or the char_group tokenizer's tokenize_on_chars (which specifies characters or groups like whitespace, punctuation to split on). Official sources: Whitespace tokenizer docs confirm only max_token_length [1,2,3,6,20,21,22]. Char_group uses tokenize_on_chars [11,12,13,14]. No mention of token_chars for whitespace across versions up to current (8.x+).
Citations:
- 1: https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-whitespace-tokenizer.html
- 2: https://www.elastic.co/guide/en/elasticsearch/reference/master/analysis-whitespace-tokenizer.html
- 3: https://www.elastic.co/docs/reference/text-analysis/analysis-whitespace-tokenizer
- 4: https://elastic.co/guide/en/elasticsearch/reference/current/analysis-whitespace-tokenizer.html
- 5: https://www.elastic.co/guide/en/elasticsearch/reference/8.18/analysis-chargroup-tokenizer.html
- 6: https://www.elastic.co/guide/en/elasticsearch/reference/8.17/analysis-chargroup-tokenizer.html
- 7: https://elastic.co/docs/reference/text-analysis/analysis-chargroup-tokenizer
- 8: https://www.elastic.co/guide/en/elasticsearch/reference/current/analysis-chargroup-tokenizer.html
- 9: https://www.elastic.co/guide/en/elasticsearch/reference/8.19/analysis-whitespace-tokenizer.html
- 10: https://www.elastic.co/guide/en/elasticsearch/reference/8.15/analysis-whitespace-tokenizer.html
- 11: https://elastic.co/docs/reference/text-analysis/analysis-whitespace-tokenizer
Remove token_chars from the whitespace tokenizer config.
token_chars is not a valid parameter for Elasticsearch's whitespace tokenizer. The whitespace tokenizer only supports max_token_length (default 255). The token_chars parameter belongs to tokenizers such as edge_ngram and is not applicable here. This invalid configuration will cause template installation to fail at startup.
🔧 Proposed fix
"tokenizer": map[string]any{
"custom_tokenizer": map[string]any{
- "type": "whitespace",
- "token_chars": []string{"letter", "digit", "punctuation", "symbol"},
+ "type": "whitespace",
},
},🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@search-sync-worker/spotlight.go` around lines 114 - 118, The Elasticsearch
index template defines a "custom_tokenizer" with type "whitespace" but
incorrectly includes a "token_chars" setting; remove the "token_chars" entry
from the "custom_tokenizer" map in spotlight.go (i.e., inside the tokenizer
config where "custom_tokenizer" is defined) so only supported options (like
"type" and optionally "max_token_length") remain, ensuring the template is valid
for the whitespace tokenizer.
Implement end-to-end room-member event pipeline for search indexing:
room-worker enriches + publishes to ROOMS stream, inbox-worker
re-publishes cross-site events to local ROOMS, and search-sync-worker
consumes from ROOMS to maintain spotlight (room typeahead) and
user-room (access control) Elasticsearch indexes.
Event enrichment (pkg/model, room-worker):
- MemberAddEvent gains RoomName + RoomType fields (already loaded in
scope at processAddMembers/processInvite time — zero extra queries)
- room-worker publishes enriched events to RoomCanonical subjects
(chat.room.canonical.{site}.member_added/removed) which land in the
existing ROOMS_{siteID} stream. Published alongside existing
chat.room.{roomID}.event.member for backward compat with other
consumers.
- processAddMembers, processRemoveIndividual, processRemoveOrg, and
processInvite all publish to ROOMS stream.
Cross-site relay (inbox-worker):
- After handling member_added (BulkCreateSubscriptions) and
member_removed (DeleteSubscriptionsByAccounts), inbox-worker
re-publishes the event to the local ROOMS stream so search-sync-worker
on the remote site picks it up. Handler gains a siteID field.
search-sync-worker:
- Collection interface: BuildAction returns []BulkAction (fan-out),
StreamConfig returns jetstream.StreamConfig, new FilterSubjects.
- Handler: pendingMsg tracks per-message action ranges, ActionCount()
drives flush decisions, isBulkItemSuccess with ErrorType-aware 404
handling, natsutil.Ack/Nak helpers.
- roomMemberCollection base: StreamConfig from stream.Rooms, filter
subjects from subject.RoomCanonicalMemberEventSubjects.
- parseMemberEvent: tagged-union parser for MemberAddEvent /
MemberRemoveEvent (supports member_added, member_removed, member_left).
- spotlightCollection: doc key = account_roomID (composite), indexes
userAccount/roomId/roomName/roomType/siteId/joinedAt. External
versioning via evt.Timestamp. Restricted rooms
(HistorySharedSince > 0) skip entire event.
- userRoomCollection: per-user rooms array with LWW timestamp guard
in painless scripts. roomTimestamps flattened map prevents stale
out-of-order events from corrupting state. Multi-pod safe via ES
primary-shard atomicity + the guard. Timestamp source is
evt.Timestamp (not JoinedAt).
- main.go: multi-collection loop with per-collection stream/consumer
wiring. FetchBatchSize/BulkBatchSize/BulkFlushInterval config split.
bootstrapConfig for dev-only stream creation. Fan-out-safe
runConsumer with mid-batch flush.
- esPropertiesFromStruct[T] generic for template mapping reflection.
pkg/searchengine:
- ActionUpdate type + bulk adapter (no external versioning on _update).
- BulkResult.ErrorType for distinguishing document_missing_exception
from index_not_found_exception on 404.
pkg/natsutil:
- Ack/Nak helpers with Acker/Naker minimal interfaces.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
Force-pushed 7cd6729 to 0fc366a.
Closing in favor of #109. This PR's most recent commits drifted to a ROOMS-stream architecture that duplicates the existing OUTBOX/INBOX federation pipeline (same
Keeping this PR's discussion as historical context. See
Generated by Claude Code
Summary
Adds two Collection implementations to search-sync-worker that consume member_added / member_removed events from the INBOX stream and maintain the spotlight (room typeahead) and user-room (message-search access control) Elasticsearch indexes. Replaces the old Monstache-based CDC sync for these two indexes with the existing OUTBOX/INBOX federation pipeline.
Index naming (overridable via env):
- spotlight-{site}-v1-chat — one doc per subscription, search via roomName typeahead
- user-room-{site} — one doc per user, holding a rooms[] array used as a terms filter on message search
What's in this PR
New collections
- spotlightCollection (search-sync-worker/spotlight.go)
  - Doc ID: Subscription.ID
  - member_added → ActionIndex with Version = evt.Timestamp (external versioning makes out-of-order delivery safe)
  - member_removed → ActionDelete with Version = evt.Timestamp
  - Template: spotlight-* with search_as_you_type on roomName via a whitespace/lowercase custom analyzer
- userRoomCollection (search-sync-worker/user_room.go) — multi-pod safe
  - member_added → ActionUpdate with painless script + upsert
  - member_removed → ActionUpdate with painless script (no upsert)
  - Restricted rooms (Subscription.HistorySharedSince != nil) → skipped; the search service handles those via DB+cache at query time
  - LWW guard: per-room timestamps in a flattened roomTimestamps map. Both scripts read the stored timestamp, compare to params.ts, and short-circuit via ctx.op = 'none' on stale events. ES primary-shard atomicity + this guard make user-room-sync safe to run with multiple pods sharing the durable consumer.
  - Timestamp source: OutboxEvent.Timestamp (publish time), NOT Subscription.JoinedAt — JoinedAt is immutable on the subscription row so add/remove for the same sub would carry the same value and confuse the guard.
  - Template: user-room-* maps rooms as text + keyword (existing query behavior preserved) and roomTimestamps as flattened to avoid mapping explosion as roomIds accumulate.
BuildActionnow returns[]searchengine.BulkActionso a single JetStream message can fan out to zero, one, or multiple ES actions. Handler tracks per-message action ranges and acks/nakks each source message as a unit.FilterSubjects(siteID)method so inbox-based collections can subscribe to both local (chat.inbox.{site}.member_*) and federated (chat.inbox.{site}.aggregate.member_*) variants via NATS 2.10+ consumer FilterSubjects.StreamConfigreturnsjetstream.StreamConfigdirectly, with the canonical name + subjects sourced frompkg/stream.*so collections never redefine stream names locally.Shared bits
inboxMemberCollectionbase struct centralizesStreamConfig+FilterSubjectsfor spotlight and user-room (zero per-instance state).parseMemberEventhelper decodesOutboxEvent+MemberAddedPayloadand validates preconditions shared by both inbox-member collections.esPropertiesFromStruct[T any]generic consolidates template-mapping reflection — used by both messages and spotlight.pkg/searchengineActionUpdatetype. Bulk adapter emits a plainupdatemeta withoutversion/version_typebecause_updateis read-modify-write and ES rejects external versioning on it (true for both doc-merge and scripted updates — not specific to painless).ActionIndex/ActionDeletestill use external versioning for spotlight + messages idempotency.pkg/streamInbox(siteID)now returns the full canonicalConfig—Name = INBOX_{siteID}and two non-overlapping subject patterns:chat.inbox.{site}.*(local direct publishes) andchat.inbox.{site}.aggregate.>(federated events sourced from remote OUTBOX streams via SubjectTransform). Centralizes every stream name + subject pattern so any consumer can read off what they're binding to and what the schema is..Namefrom the returned Config, so adding.Subjectsdoesn't affect its current behavior.pkg/modelMemberAddedPayload{Subscription, Room}— the payload shape carried byOutboxEvent{Type: "member_added"}so inbox-member consumers can index without a DB lookup.OutboxMemberAdded/OutboxMemberRemovedconstants replace stringly-typed literals throughout the new code.pkg/subjectInboxMemberAdded/InboxMemberRemovedbuilders for local-publish subjects.InboxMemberAddedAggregate/InboxMemberRemovedAggregatefor federated (transformed) subjects.InboxMemberEventSubjects(siteID)returns the four-subject list used by spotlight and user-room consumer filters.Bootstrap config (test-only, clearly grouped)
### Bootstrap config (test-only, clearly grouped)

A nested `bootstrapConfig` struct groups the fields that are meaningful only in dev / integration tests. Env vars are all prefixed `BOOTSTRAP_` so they're easy to spot in deployment manifests:

- `BOOTSTRAP_STREAMS` — runs `CreateOrUpdateStream` at startup. Leave `false` in production.
- `BOOTSTRAP_REMOTE_SITE_IDS` — takes effect only when `BOOTSTRAP_STREAMS=true`.

In production, streams are owned by their publisher services (
`message-gatekeeper` for `MESSAGES_CANONICAL`, `inbox-worker` for `INBOX`) and search-sync-worker only manages its own durable consumers. Collections hold no remote-site state — the bootstrap loop in `main.go` detects the INBOX stream by comparing against `stream.Inbox(cfg.SiteID).Name` and swaps in `inboxBootstrapStreamConfig` (which layers on cross-site Sources + SubjectTransforms) before calling `CreateOrUpdateStream`. Stream creation is deduped by name so spotlight + user-room don't double-create the shared `INBOX` stream.
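A minimal sketch of the dedupe-and-swap bootstrap loop, with stub types standing in for `jetstream.StreamConfig` and the real `CreateOrUpdateStream` call; the field shapes and `OUTBOX_{site}` naming are assumptions for illustration:

```go
package main

import "fmt"

// StreamConfig is a stub for jetstream.StreamConfig.
type StreamConfig struct {
	Name     string
	Subjects []string
	Sources  []string // the real config carries Sources + SubjectTransforms
}

func inboxName(siteID string) string { return "INBOX_" + siteID }

// inboxBootstrapStreamConfig layers cross-site sources onto the canonical
// inbox config (dev/test only).
func inboxBootstrapStreamConfig(sc StreamConfig, remoteSites []string) StreamConfig {
	for _, r := range remoteSites {
		sc.Sources = append(sc.Sources, "OUTBOX_"+r)
	}
	return sc
}

// bootstrapStreams dedupes creation by stream name so spotlight and
// user-room don't double-create the shared INBOX stream. The returned
// slice stands in for the CreateOrUpdateStream calls actually issued.
func bootstrapStreams(siteID string, remoteSites []string, configs []StreamConfig) []StreamConfig {
	created := map[string]bool{}
	var out []StreamConfig
	for _, sc := range configs {
		if created[sc.Name] {
			continue // already created for another collection
		}
		if sc.Name == inboxName(siteID) {
			sc = inboxBootstrapStreamConfig(sc, remoteSites)
		}
		out = append(out, sc)
		created[sc.Name] = true
	}
	return out
}

func main() {
	inbox := StreamConfig{Name: inboxName("siteA")}
	// Spotlight and user-room both report the INBOX stream; only one create happens.
	got := bootstrapStreams("siteA", []string{"siteB"}, []StreamConfig{inbox, inbox})
	fmt.Println(len(got), got[0].Sources)
}
```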
### Consumer durable names

Per-purpose, no more generic `search-sync-worker`:

- `message-sync` (was `search-sync-worker`)
- `spotlight-sync`
- `user-room-sync`

Graceful shutdown waits on all three
`runConsumer` goroutines via a `doneChs` slice.
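The shutdown pattern might look roughly like this (a sketch, not the worker's actual code):

```go
package main

import "fmt"

// runConsumer mimics the worker's consumer goroutines: each returns a
// channel that is closed once the goroutine has drained and exited.
func runConsumer(name string, stop <-chan struct{}) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)
		<-stop // block until shutdown is signaled
	}()
	return done
}

// shutdown waits on every consumer's done channel before returning.
func shutdown(doneChs []<-chan struct{}) {
	for _, d := range doneChs {
		<-d
	}
}

func main() {
	stop := make(chan struct{})
	var doneChs []<-chan struct{}
	for _, n := range []string{"message-sync", "spotlight-sync", "user-room-sync"} {
		doneChs = append(doneChs, runConsumer(n, stop))
	}
	close(stop) // signal shutdown to all three consumers
	shutdown(doneChs)
	fmt.Println("all consumers stopped")
}
```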
### Tests

- Unit: `spotlight_test.go`, `user_room_test.go`, `inbox_stream_test.go`, plus model + subject + searchengine round-trip and adapter coverage. ~410 lines of new unit tests.
- Integration (`inbox_integration_test.go`, ~540 lines, gated by `//go:build integration`):
  - `TestSpotlightSyncIntegration` — local + federated `member_added`, federated `member_removed`, doc shape verification
  - `TestUserRoomSyncIntegration` — multi-room joins, federated upsert for a new user, remove keeps the `roomTimestamps` entry, restricted-room skip path, `createdAt`/`updatedAt` stamping
  - `TestUserRoomSync_LWWGuard` — sequential subtests proving the per-room timestamp guard handles in-order and out-of-order deliveries (initial add → stale add no-op → stale remove no-op → newer remove evicts → re-add restores → another stale add no-op)
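The guard semantics exercised by `TestUserRoomSync_LWWGuard` can be modeled as a small pure function; the field names here are assumptions, not the actual doc mapping:

```go
package main

import "fmt"

// userRoomDoc sketches the described doc shape: the current room set plus
// a per-room last-write timestamp.
type userRoomDoc struct {
	Rooms          map[string]bool
	RoomTimestamps map[string]int64
}

// apply enforces the LWW guard: events at or before the recorded timestamp
// for that room are no-ops, so out-of-order deliveries cannot clobber
// newer state. The timestamp entry survives removal, which is what lets a
// stale add arriving after an eviction be rejected.
func (d *userRoomDoc) apply(room string, ts int64, add bool) {
	if prev, ok := d.RoomTimestamps[room]; ok && ts <= prev {
		return // stale: no-op
	}
	d.RoomTimestamps[room] = ts
	if add {
		d.Rooms[room] = true
	} else {
		delete(d.Rooms, room)
	}
}

func main() {
	d := &userRoomDoc{Rooms: map[string]bool{}, RoomTimestamps: map[string]int64{}}
	d.apply("r1", 1, true)  // initial add
	d.apply("r1", 1, true)  // stale add: no-op
	d.apply("r1", 0, false) // stale remove: no-op
	d.apply("r1", 2, false) // newer remove: evicts
	d.apply("r1", 3, true)  // re-add restores
	d.apply("r1", 2, true)  // another stale add: no-op
	fmt.Println(d.Rooms["r1"], d.RoomTimestamps["r1"])
}
```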
### Scope notes

- `inbox-worker` is intentionally NOT modified here. The enhanced INBOX behavior (publishing + consuming `aggregate.*` events, migrating the handler to the new `MemberAddedPayload` shape, owning stream creation in production) ships in a separate PR. The `pkg/stream.Inbox` change in this PR is additive — `inbox-worker` reads only `.Name` and is unaffected.
- `room-worker` is intentionally NOT modified here. The publish-side migration (building `MemberAddedPayload`, routing by the invitee's home site to local INBOX vs OUTBOX) is a separate PR coordinated with `inbox-worker`.
### Test plan

- `make lint` — 0 issues ✅
- `make test` — all services green ✅
- `go vet -tags=integration ./search-sync-worker/... ./pkg/...` — clean ✅
- `make test-integration SERVICE=search-sync-worker` (requires Docker for testcontainers-go) — needs CI run
- With `BOOTSTRAP_STREAMS=true`:
  - `member_added`
  - `rooms` array after a sequence of adds + removes
### Known sharp edges (out of scope, follow-ups)

- `ActionDelete` on a non-existent doc returns 404, which the handler currently treats as failure → infinite nak/retry. Only triggerable by a multi-publisher race that doesn't exist in our topology (JetStream preserves per-subject order from a single publisher), but worth a two-line handler fix in a follow-up to treat 404 on `ActionDelete` as success.
- `user-room-sync` with multiple pods: safe via the LWW guard for member-event volume. Documented in the `user_room.go` doc comment. If volume ever exceeds the single-pod ceiling, the sharding strategy is also documented.

🤖 Generated with Claude Code
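The follow-up fix for the first sharp edge might look like this sketch; `actionSucceeded` is a hypothetical helper name, and the status handling (2xx, 409, context-aware 404) follows the behavior described in this PR:

```go
package main

import "fmt"

// actionSucceeded decides per-item bulk status handling: 404 counts as
// success only for deletes, since the doc being already gone means the
// delete is effectively applied and retrying forever is pointless.
func actionSucceeded(op string, status int) bool {
	switch {
	case status >= 200 && status < 300:
		return true
	case status == 409: // external-version conflict: already indexed, idempotent
		return true
	case status == 404 && op == "delete":
		return true // nothing to delete; don't nak/retry
	default:
		return false
	}
}

func main() {
	fmt.Println(actionSucceeded("delete", 404), actionSucceeded("index", 404))
}
```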