Conversation
📝 Walkthrough

Reworks thread-reply persistence to resolve/create thread rooms before saving replies, adds insert-based thread subscription semantics, persists threadRoomID and parent reply counts, expands the Cassandra message schema (UDTs, payload/state columns, pinned table), adds a MaxRedeliver config, and updates mocks/tests accordingly.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant Client
participant JetStream
participant Handler
participant ThreadStore
participant CassandraStore
Client->>JetStream: publish thread reply
JetStream->>Handler: deliver message
Handler->>Handler: decode + validate
alt invalid payload or exceeded redeliver
Handler->>JetStream: Nak / TermWithReason
else valid
Handler->>ThreadStore: handleThreadRoomAndSubscriptions(parentID)
ThreadStore->>ThreadStore: lookup or create thread room → threadRoomID
ThreadStore-->>Handler: threadRoomID
Handler->>ThreadStore: ThreadSubscriptionExists(threadRoomID, userID)
ThreadStore-->>Handler: exists? (bool)
alt not exists
Handler->>ThreadStore: InsertThreadSubscription(sub)
ThreadStore-->>Handler: insert result
end
Handler->>CassandraStore: SaveThreadMessage(msg, threadRoomID)
CassandraStore->>CassandraStore: insert rows (messages_by_id/room/thread_messages_by_room)
CassandraStore->>CassandraStore: incrementParentTcount(msg) [CAS loop]
CassandraStore-->>Handler: success
Handler->>JetStream: Ack
end
```

Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 3
🧹 Nitpick comments (3)
message-worker/integration_test.go (1)
70-82: Make the test table match the new production partition key.

The PR objective says `thread_messages_by_room` is partitioned by `(room_id, thread_room_id)`, but this fixture still declares `PRIMARY KEY ((room_id), thread_room_id, created_at, message_id)`. That means these integration tests are no longer validating the same data model or partitioning behavior as production.

♻️ Proposed fix
```diff
-	`CREATE TABLE IF NOT EXISTS chat_test.thread_messages_by_room (
-		room_id TEXT,
-		thread_room_id TEXT,
-		created_at TIMESTAMP,
-		message_id TEXT,
-		thread_parent_id TEXT,
-		sender FROZEN<"Participant">,
-		msg TEXT,
-		site_id TEXT,
-		updated_at TIMESTAMP,
-		mentions SET<FROZEN<"Participant">>,
-		PRIMARY KEY ((room_id), thread_room_id, created_at, message_id)
-	) WITH CLUSTERING ORDER BY (thread_room_id DESC, created_at DESC, message_id DESC)`,
+	`CREATE TABLE IF NOT EXISTS chat_test.thread_messages_by_room (
+		room_id TEXT,
+		thread_room_id TEXT,
+		created_at TIMESTAMP,
+		message_id TEXT,
+		thread_parent_id TEXT,
+		sender FROZEN<"Participant">,
+		msg TEXT,
+		site_id TEXT,
+		updated_at TIMESTAMP,
+		mentions SET<FROZEN<"Participant">>,
+		PRIMARY KEY ((room_id, thread_room_id), created_at, message_id)
+	) WITH CLUSTERING ORDER BY (created_at DESC, message_id DESC)`,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/integration_test.go` around lines 70 - 82, update the test fixture so the table schema matches production partitioning: change the PRIMARY KEY declaration for thread_messages_by_room from PRIMARY KEY ((room_id), thread_room_id, created_at, message_id) to use a composite partition key PRIMARY KEY ((room_id, thread_room_id), created_at, message_id) and keep the existing WITH CLUSTERING ORDER BY clause; locate the CREATE TABLE statement for thread_messages_by_room in integration_test.go and adjust only the PRIMARY KEY tuple to (room_id, thread_room_id).

message-worker/main.go (1)
32-32: Prefix the new env var with the service name.
`MAX_REDELIVER` is a message-worker-specific knob, so adding it unprefixed keeps extending the shared/global config surface. Please align it with the repo rule here and use a service-prefixed name such as `MESSAGE_WORKER_MAX_REDELIVER`.

As per coding guidelines, "Config env var naming: SCREAMING_SNAKE_CASE, prefix with service name for service-specific vars; provide envDefault for non-critical config, never default secrets".
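To make the suggestion concrete, a minimal sketch assuming caarlos0/env-style struct tags (the `env`/`envDefault` tag syntax quoted in the prompt below); the struct and function shape are placeholders, not the real main.go:

```go
package main

import (
	"log"

	"github.com/caarlos0/env/v11"
)

// config is a hypothetical stand-in for the message-worker config struct.
type config struct {
	// Service-prefixed per the repo convention; defaults to 5 redeliveries.
	MaxRedeliver int `env:"MESSAGE_WORKER_MAX_REDELIVER" envDefault:"5"`
}

func main() {
	var cfg config
	// env.Parse fills the struct from process env vars via the tags above.
	if err := env.Parse(&cfg); err != nil {
		log.Fatal(err)
	}
	log.Printf("max redeliver: %d", cfg.MaxRedeliver)
}
```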
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/main.go` at line 32, the MaxRedeliver config field is using a global env var name; change the struct tag for the MaxRedeliver field (symbol: MaxRedeliver in message-worker/main.go) from env:"MAX_REDELIVER" to env:"MESSAGE_WORKER_MAX_REDELIVER" (keep envDefault:"5"), and update any documentation or env examples that reference MAX_REDELIVER to use MESSAGE_WORKER_MAX_REDELIVER so the variable follows the service-prefixed SCREAMING_SNAKE_CASE convention.

message-worker/deploy/docker-compose.yml (1)
104-129: Consider a composite partition key for thread-specific query efficiency.

The current design uses `(room_id)` as the partition key with `thread_room_id` as the first clustering column. This means all thread messages in a room reside in one partition, which works well for "list all threads in room" queries but requires scanning within the room's partition for "get messages in thread X" queries.

If the dominant query pattern is fetching messages for a specific thread, a composite partition key `((room_id, thread_room_id), created_at, message_id)` would provide direct partition access. However, if listing threads across a room is common, the current design is appropriate.

This is a design consideration rather than a defect — verify against your actual query patterns.
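For a concrete picture of the read path this enables, here is a hedged gocql sketch; only the table and key columns come from the schema discussed above, while the function name, selected columns, and session wiring are illustrative:

```go
package sketch

import (
	"context"
	"time"

	"github.com/gocql/gocql"
)

// fetchThreadMessages reads one thread's messages. With the composite
// partition key ((room_id, thread_room_id), created_at, message_id) this
// predicate targets a single partition directly; with the current schema the
// same predicate reads within the room's larger partition instead.
func fetchThreadMessages(ctx context.Context, session *gocql.Session, roomID, threadRoomID string, limit int) ([]string, error) {
	iter := session.Query(
		`SELECT message_id, msg, created_at FROM chat.thread_messages_by_room
		 WHERE room_id = ? AND thread_room_id = ? LIMIT ?`,
		roomID, threadRoomID, limit,
	).WithContext(ctx).Iter()

	var ids []string
	var messageID, msg string
	var createdAt time.Time
	for iter.Scan(&messageID, &msg, &createdAt) {
		ids = append(ids, messageID)
	}
	return ids, iter.Close() // Close surfaces any query error
}
```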
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/deploy/docker-compose.yml` around lines 104 - 129, The primary key for chat.thread_messages_by_room currently uses ((room_id), thread_room_id, created_at, message_id) which places all threads for a room in one partition and makes fetching a specific thread inefficient; update the PRIMARY KEY to use a composite partition key ((room_id, thread_room_id), created_at, message_id) if your dominant query is "get messages for thread X" so queries can target the partition directly (adjust any code that reads/writes this table to use both room_id and thread_room_id as the partition key), otherwise keep the current schema if "list threads in room" is the primary pattern.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@message-worker/handler.go`:
- Around line 176-185: The code is returning a usable threadRoomID from
handleThreadRoomAndSubscriptions before the first-reply initialization (parent
resolution and subscriptions) is fully complete, allowing SaveThreadMessage to
persist replies against a partially-initialized thread; change
handleThreadRoomAndSubscriptions so it does not return a threadRoomID until
handleFirstThreadReply (parent resolution) and subscription setup complete and
idempotent, making the parent resolution fail fast (propagating
errMessageNotFound) and avoid writing any partial Mongo state on error; ensure
any partial subscription writes are rolled back or made idempotent and that
SaveThreadMessage is only called after successful completion, and reference
errThreadRoomExists handling to ensure redelivery will retry subscription setup
rather than skipping it.
In `@message-worker/store_cassandra.go`:
- Around line 123-156: In incrementParentTcount, the current
select/compute/update on plain INT is racy under concurrency; replace it with a
retryable CAS (LWT) loop or switch to a Cassandra COUNTER table. Implement a
bounded retry loop in incrementParentTcount that: read current tcount (as you
do), attempt an LWT update like UPDATE messages_by_id SET tcount = ? WHERE
message_id = ? AND created_at = ? IF tcount = ? using the query execution method
that returns whether the LWT was applied (i.e., ExecCAS/CompareAndSet), if not
applied re-read tcount and retry a few times, and apply the same pattern to the
messages_by_room update (or instead create/USE a dedicated counter table with
COUNTER column and use UPDATE ... = tcount + 1 atomically). Ensure errors
propagate if retries exhaust; refer to the incrementParentTcount function,
variables parentID, parentCreatedAt, and newCount when making changes.
In `@message-worker/store_mongo.go`:
- Around line 77-81: The InsertThreadSubscription function currently returns an
error on any InsertOne failure causing a race-induced duplicate-key to NAK
messages; update threadStoreMongo.InsertThreadSubscription to treat
mongo.IsDuplicateKeyError(err) as success by importing and using
mongo.IsDuplicateKeyError from the mongo driver and returning nil when that
check is true (i.e., only wrap/return non-duplicate errors from the
s.threadSubscriptions.InsertOne call), so concurrent inserts become idempotent.
---
Nitpick comments:
In `@message-worker/deploy/docker-compose.yml`:
- Around line 104-129: The primary key for chat.thread_messages_by_room
currently uses ((room_id), thread_room_id, created_at, message_id) which places
all threads for a room in one partition and makes fetching a specific thread
inefficient; update the PRIMARY KEY to use a composite partition key ((room_id,
thread_room_id), created_at, message_id) if your dominant query is "get messages
for thread X" so queries can target the partition directly (adjust any code that
reads/writes this table to use both room_id and thread_room_id as the partition
key), otherwise keep the current schema if "list threads in room" is the primary
pattern.
In `@message-worker/integration_test.go`:
- Around line 70-82: Update the test fixture so the table schema matches
production partitioning: change the PRIMARY KEY declaration for
thread_messages_by_room from PRIMARY KEY ((room_id), thread_room_id, created_at,
message_id) to use a composite partition key PRIMARY KEY ((room_id,
thread_room_id), created_at, message_id) and keep the existing WITH CLUSTERING
ORDER BY clause; locate the CREATE TABLE statement for thread_messages_by_room
in integration_test.go and adjust only the PRIMARY KEY tuple to (room_id,
thread_room_id).
In `@message-worker/main.go`:
- Line 32: The MaxRedeliver config field is using a global env var name; change
the struct tag for the MaxRedeliver field (symbol: MaxRedeliver in
message-worker/main.go) from env:"MAX_REDELIVER" to
env:"MESSAGE_WORKER_MAX_REDELIVER" (keep envDefault:"5"), and update any
documentation or env examples that reference MAX_REDELIVER to use
MESSAGE_WORKER_MAX_REDELIVER so the variable follows the service-prefixed
SCREAMING_SNAKE_CASE convention.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 08bcc4a0-60e4-4239-9ede-2c28a2b10b42
📒 Files selected for processing (9)
- message-worker/deploy/docker-compose.yml
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/main.go
- message-worker/mock_store_test.go
- message-worker/store.go
- message-worker/store_cassandra.go
- message-worker/store_mongo.go
gocql already provides a reflection-based UDT marshaler that reads the `cql:"..."` struct tag. The custom marshalUDTField/unmarshalUDTField helpers and the per-UDT MarshalUDT/UnmarshalUDT methods delegating to them were reimplementing behaviour the driver does natively — all that was actually needed was the existing `cql` tags. This mirrors the same simplification applied to message-worker's cassParticipant in PR #95.

- Delete pkg/model/cassandra/udt.go (helpers + verifyUDTTags)
- Delete pkg/model/cassandra/udt_test.go
- Remove MarshalUDT/UnmarshalUDT methods and init() from message.go
- Drop two obsolete tests in message_test.go

https://claude.ai/code/session_01XKyoiWJDNWWkNc29RixBqJ
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@message-worker/handler_test.go`:
- Around line 504-507: The Metadata method on fakeJSMsg is formatted across
multiple lines and violates goimports/gofmt style; edit the fakeJSMsg method
declarations (specifically the Metadata() method) to the single-line style used
by the other methods or simply run `goimports -w` (or `gofmt -w`) on the file to
reformat and normalize spacing so Metadata() matches the other fakeJSMsg
methods.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dcd01a19-87f0-475d-839f-be9ee70a1856
📒 Files selected for processing (4)
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/main.go
✅ Files skipped from review due to trivial changes (1)
- message-worker/main.go
Actionable comments posted: 2
🧹 Nitpick comments (2)
message-worker/store_cassandra.go (2)
110-112: Wrap error at call site.

Per the project error-handling guideline (always wrap with a short description of what the current function was doing — never return bare `err`), this site should add context identifying the outer operation. Even though `incrementParentTcount` wraps internally, the outer `SaveThreadMessage` stack should also contribute its context.

♻️ Proposed fix

```diff
-	if err := s.incrementParentTcount(ctx, msg); err != nil {
-		return err
-	}
+	if err := s.incrementParentTcount(ctx, msg); err != nil {
+		return fmt.Errorf("increment parent tcount for %s: %w", msg.ThreadParentMessageID, err)
+	}
```

As per coding guidelines: "Never return bare `err`… always provide meaningful context."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/store_cassandra.go` around lines 110 - 112, In SaveThreadMessage, wrap the error returned by s.incrementParentTcount(ctx, msg) with context before returning (e.g., use fmt.Errorf("SaveThreadMessage: incrementParentTcount failed: %w", err)) so the outer stack includes the operation that failed; update the return site inside SaveThreadMessage to wrap err rather than returning a bare err, keeping the existing internal wrapping in incrementParentTcount intact.
143-160: Unbounded CAS retry loop.

Both CAS loops spin without an upper bound or backoff. Under sustained contention on a hot parent message this could burn coordinator/CPU time and hold the write path open indefinitely. Consider bounding retries (e.g., 5-10 attempts with small backoff) and returning a descriptive error if the bound is exceeded, so the handler can NAK and let JetStream redelivery cool things off.
Also applies to: 172-189.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/store_cassandra.go` around lines 143 - 160, the CAS retry loops around s.cassSession.Query(...).ScanCAS(&current) that update tcount (the one shown and the similar loop at lines 172-189) are unbounded; change them to cap retries (e.g., 5–10 attempts) and add a small backoff between attempts (e.g., time.Sleep with exponential or fixed jitter) and if the cap is exceeded return a clear, descriptive error (e.g., "max CAS retries exceeded for parent <parentID>") so callers can NAK/trigger redelivery; keep the same update logic (newVal/tcount/current) and ensure the error wraps context (parentID/parentCreatedAt) and that both loop locations are updated consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@message-worker/handler.go`:
- Around line 192-196: Update the doc comments to reflect that LastSeenAt is a
pointer and now set to nil rather than a zero time: change the phrases in the
comments for handleFirstThreadReply and buildThreadSubscription that read
"lastSeenAt is always zero" to "lastSeenAt is always nil" (or "LastSeenAt is set
to nil") and mention LastSeenAt is a *time.Time pointer to avoid confusion
between a zero time.Time value and a nil pointer.
In `@message-worker/store_cassandra.go`:
- Around line 88-115: SaveThreadMessage currently calls incrementParentTcount
unconditionally so JetStream redeliveries can double-count tcount; modify
SaveThreadMessage to perform the INSERT into thread_messages_by_room using a
lightweight transaction (IF NOT EXISTS) and check the query result's applied
flag, and only call incrementParentTcount when that INSERT applied == true;
locate the thread_messages_by_room INSERT in SaveThreadMessage and change it to
an LWT insert (IF NOT EXISTS) and gate the incrementParentTcount call on the
applied result (alternatively, implement a dedupe table like
thread_reply_counted with IF NOT EXISTS and only increment when that CAS
applied).
---
Nitpick comments:
In `@message-worker/store_cassandra.go`:
- Around line 110-112: In SaveThreadMessage, wrap the error returned by
s.incrementParentTcount(ctx, msg) with context before returning (e.g., use
fmt.Errorf("SaveThreadMessage: incrementParentTcount failed: %w", err)) so the
outer stack includes the operation that failed; update the return site inside
SaveThreadMessage to wrap err rather than returning a bare err, keeping the
existing internal wrapping in incrementParentTcount intact.
- Around line 143-160: The CAS retry loops around
s.cassSession.Query(...).ScanCAS(&current) that update tcount (the one shown and
the similar loop at lines 172-189) are unbounded; change them to cap retries
(e.g., 5–10 attempts) and add a small backoff between attempts (e.g., time.Sleep
with exponential or fixed jitter) and if the cap is exceeded return a clear,
descriptive error (e.g., "max CAS retries exceeded for parent <parentID>") so
callers can NAK/trigger redelivery; keep the same update logic
(newVal/tcount/current) and ensure the error wraps context
(parentID/parentCreatedAt) and that both loop locations are updated
consistently.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 3541ca00-b061-4ea2-99d0-6ea5c2e6065e
📒 Files selected for processing (6)
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/store_cassandra.go
- pkg/model/model_test.go
- pkg/model/threadsubscription.go
🚧 Files skipped from review as they are similar to previous changes (1)
- message-worker/integration_test.go
…ge_model.md Add missing UDTs (Card, CardAction, File, QuotedParentMessage), full column sets for all tables, the new pinned_messages_by_room table, and replace the stale thread_message_id column with thread_parent_id in thread_messages_by_room. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
…n cassParticipant gocql's reflection-based UDT marshaler lowercases Go field names when no cql tag is present, so EngName became "engname" instead of "eng_name" and CompanyName became "companyname" instead of "company_name". Adding cql struct tags makes cassParticipant a correct gocql UDT struct, letting the driver map fields accurately via reflection for both FROZEN<"Participant"> and SET<FROZEN<"Participant">> columns without a manual switch. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
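A sketch of the tagged struct this commit describes; the exact field set is an assumption, but the EngName → eng_name and CompanyName → company_name mappings are the ones named above:

```go
// Without cql tags, gocql's reflection-based UDT marshaler lowercases the Go
// field name, looking for "engname"/"companyname" columns; the tags restore
// the snake_case mapping for both FROZEN<"Participant"> and
// SET<FROZEN<"Participant">> columns. Field list is illustrative only.
type cassParticipant struct {
	ID          string `cql:"id"`
	EngName     string `cql:"eng_name"`
	CompanyName string `cql:"company_name"`
}
```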
…nt_created_at to SaveThreadMessage

- SaveThreadMessage now accepts threadRoomID and writes it as thread_room_id to both messages_by_id and thread_messages_by_room
- Adds thread_parent_id (msg.ThreadParentMessageID) and thread_parent_created_at (msg.ThreadParentMessageCreatedAt) to both tables
- Fixes stale thread_message_id column reference → thread_parent_id in the thread_messages_by_room INSERT
- handleThreadRoomAndSubscriptions now returns (threadRoomID, error) so processMessage can resolve the room ID before calling SaveThreadMessage; handleSubsequentThreadReply updated accordingly
- Updates integration_test.go schema and call sites to match

https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
…redelivery

- MAX_REDELIVER env var (default 5) controls how many times a failing message may be redelivered before being dropped permanently
- Consumer is configured with MaxDeliver = MaxRedeliver + 1 so NATS enforces the cap at the broker level as a safety net
- nackOrTerminate helper reads msg.Metadata().NumDelivered on each failure:
  * within budget (NumDelivered <= maxRedeliver) → Nak, allow redelivery
  * over budget (NumDelivered > maxRedeliver) → TermWithReason, drop forever
  * metadata unavailable → Nak fallback (never silently drop)
- Unit tests cover all four paths: success (Ack), within-budget (Nak), at-limit (Nak), over-limit (Term), and metadata error (Nak fallback)

https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
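A hedged reconstruction of the helper from this description, assuming the nats.go `jetstream` package API the commit names (Nak, TermWithReason, Metadata); the real signature, logging, and error handling may differ, and a later commit in this PR removes this handler-side check in favour of the broker-level MaxDeliver cap:

```go
import "github.com/nats-io/nats.go/jetstream"

// nackOrTerminate NAKs a failed message while it is within its redelivery
// budget and terminates it permanently once the budget is exhausted.
// Sketch only — names and shape are inferred from the commit message above.
func nackOrTerminate(msg jetstream.Msg, maxRedeliver int, reason string) {
	meta, err := msg.Metadata()
	if err != nil {
		_ = msg.Nak() // metadata unavailable → never silently drop
		return
	}
	if meta.NumDelivered > uint64(maxRedeliver) {
		_ = msg.TermWithReason(reason) // over budget → drop forever
		return
	}
	_ = msg.Nak() // within budget → allow redelivery
}
```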
…stinction
All ThreadSubscriptions now initialize lastSeenAt to time.Time{} regardless of
whether the subscriber is the parent author or the replier. MongoDB $setOnInsert
preserves this zero value on subsequent upserts, so the handler never needs to
distinguish between roles. Also aligns LastMsgAt and UpdateThreadRoomLastMessage
to use the handler's local `now` variable instead of msg.CreatedAt.
https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
When SaveThreadMessage is called, increment tcount on the parent message row in both messages_by_id and messages_by_room. Uses read-modify-write (Cassandra INT not COUNTER) — accepted by design. Skips silently when ThreadParentMessageCreatedAt is nil (primary key unavailable) or when the parent row is absent. Also: add tcount INT to integration test schema for both tables, and fix NewHandler arity (missing maxRedeliver arg) in integration tests. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
handleFirstThreadReply: UpsertThreadSubscription → InsertThreadSubscription. Subscriptions are created exactly once; no update path needed. handleSubsequentThreadReply: remove GetMessageSender call entirely. Instead of re-ensuring the parent author's subscription on every reply, only check whether the current replier is subscribed (ThreadSubscriptionExists) and insert if not. This eliminates a Cassandra round-trip per subsequent reply. ThreadStore interface: replace UpsertThreadSubscription with InsertThreadSubscription + ThreadSubscriptionExists. store_mongo.go implements via insertOne + findOne. Integration tests updated accordingly. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
…eadMessage Both INSERT INTO messages_by_id statements now include room_id so the column is populated consistently, matching the docker-compose schema definition. Integration test schema updated to include the column; subtests added to verify the field is written for both regular and thread messages. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
Remove manual delivery-count tracking and TermWithReason logic from HandleJetStreamMsg. The JetStream consumer MaxDeliver config already enforces the redeliver limit at the broker level, so the handler no longer needs to inspect delivery counts or call TermWithReason. NewHandler no longer accepts a maxRedeliver parameter; MAX_REDELIVER env config and consumer MaxDeliver setting are kept unchanged. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
Nil pointer clearly expresses "not yet seen" vs zero-value time which is ambiguous. buildThreadSubscription sets LastSeenAt: nil; handler and integration tests assert Nil instead of IsZero. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
Replace blind read-modify-write with a CAS retry loop using UPDATE ... IF tcount = ?. Scanning tcount into *int detects NULL (unset on first insert) — binding nil as the IF condition evaluates to IF tcount = null, handling the initial increment correctly. On conflict ScanCAS returns the current value for the next attempt. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
Maintaining tcount on the write path was subject to a redelivery double-count gap: if SaveThreadMessage's inserts succeed but the tcount bump fails, JetStream redelivers and bumps tcount a second time while the inserts no-op on same PK. LWT solves concurrency but not redelivery idempotency. Drop tcount entirely and let readers compute the reply count via SELECT COUNT(*) FROM thread_messages_by_room WHERE room_id = ? AND thread_room_id = ?. Always correct, bounded partition scan, no counter maintenance. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
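A sketch of the read-side count this commit describes — the CQL is quoted verbatim from the commit; the function name and session wiring are assumptions:

```go
import (
	"context"

	"github.com/gocql/gocql"
)

// threadReplyCount computes the reply count at read time instead of
// maintaining a tcount column on the write path, so redeliveries can
// never skew the count.
func threadReplyCount(ctx context.Context, session *gocql.Session, roomID, threadRoomID string) (int, error) {
	var count int
	err := session.Query(
		`SELECT COUNT(*) FROM thread_messages_by_room WHERE room_id = ? AND thread_room_id = ?`,
		roomID, threadRoomID,
	).WithContext(ctx).Scan(&count)
	return count, err
}
```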
This reverts commit 449bf08.
When CreateThreadRoom succeeds on first delivery but the handler fails before all subscriptions are written, JetStream redelivers and routes via errThreadRoomExists to handleSubsequentThreadReply. The old code only checked/inserted the replier's subscription, so the parent author's subscription was permanently lost on such a partial failure. Replace ThreadSubscriptionExists + InsertThreadSubscription in the subsequent-reply path with UpsertThreadSubscription ($setOnInsert) for both the parent author and the replier. This makes the path fully idempotent: redeliveries after any partial failure will fill in whichever subscriptions are missing without overwriting existing ones. Remove ThreadSubscriptionExists from the ThreadStore interface as it is no longer needed. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
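A sketch of the `$setOnInsert` upsert this commit describes, using the official mongo-driver. The receiver and collection handle come from the review's own snippets; the bson filter keys (threadRoomId/userId) follow the unique index mentioned elsewhere in this review, and the model field names are assumptions:

```go
import (
	"context"
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// UpsertThreadSubscription creates the subscription if it is missing and
// leaves an existing document untouched, so redeliveries after a partial
// failure can re-run it safely for both the parent author and the replier.
func (s *threadStoreMongo) UpsertThreadSubscription(ctx context.Context, sub *model.ThreadSubscription) error {
	filter := bson.M{"threadRoomId": sub.ThreadRoomID, "userId": sub.UserID}
	update := bson.M{"$setOnInsert": sub}
	if _, err := s.threadSubscriptions.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)); err != nil {
		return fmt.Errorf("upsert thread subscription: %w", err)
	}
	return nil
}
```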
Actionable comments posted: 1
♻️ Duplicate comments (4)
message-worker/store_mongo.go (1)
77-82: ⚠️ Potential issue | 🟠 Major

Make subscription inserts duplicate-key idempotent.

`(threadRoomId, userId)` is unique, so a concurrent/replayed insert can return duplicate-key even though the target state already exists. Treat that as success to avoid unnecessary NAK/redelivery loops.

Proposed fix

```diff
 func (s *threadStoreMongo) InsertThreadSubscription(ctx context.Context, sub *model.ThreadSubscription) error {
 	if _, err := s.threadSubscriptions.InsertOne(ctx, sub); err != nil {
+		if mongo.IsDuplicateKeyError(err) {
+			return nil
+		}
 		return fmt.Errorf("insert thread subscription: %w", err)
 	}
 	return nil
 }
```

Verification:

```bash
#!/bin/bash
# Verify whether InsertThreadSubscription suppresses duplicate-key errors.
rg -n -C4 --type=go 'func \(s \*threadStoreMongo\) InsertThreadSubscription|mongo\.IsDuplicateKeyError'
```

Based on learnings, unique indexes make retry/idempotency safe when duplicate-key errors are suppressed.

🤖 Prompt for AI Agents
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/store_mongo.go` around lines 77 - 82, the InsertThreadSubscription function currently returns duplicate-key errors on concurrent/replayed inserts; modify it so that when s.threadSubscriptions.InsertOne returns a Mongo duplicate-key error (use mongo.IsDuplicateKeyError(err) or equivalent), the function treats it as success and returns nil; leave other errors unchanged. Update the InsertThreadSubscription implementation to check the error from InsertOne, call mongo.IsDuplicateKeyError(err) and return nil for that case, otherwise wrap and return the error as before.

message-worker/store_cassandra.go (1)
100-110: ⚠️ Potential issue | 🟠 Major

Gate `tcount` increments on a newly inserted reply row.

The reply inserts are idempotent on redelivery, but `incrementParentTcount` runs every time `SaveThreadMessage` is retried. A transient failure after the inserts can over-count the same reply; use `INSERT ... IF NOT EXISTS` or a dedupe table and increment only when the CAS insert applies.

Verification:

```bash
#!/bin/bash
# Check whether thread reply counting is gated by IF NOT EXISTS/dedupe before incrementing.
rg -n -C5 --type=go 'INSERT INTO thread_messages_by_room|IF NOT EXISTS|incrementParentTcount|ScanCAS'
```
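For illustration, a minimal gocql sketch of the suggested gating; the receiver name is hypothetical and the column list is abbreviated relative to the real INSERT:

```go
// saveReplyOnce performs an LWT insert and reports whether this delivery
// actually created the row; the caller bumps tcount only when it did, which
// makes redelivered messages no-ops for the counter.
func (s *messageStoreCassandra) saveReplyOnce(ctx context.Context, roomID, threadRoomID, messageID string, createdAt time.Time) (bool, error) {
	applied, err := s.cassSession.Query(
		`INSERT INTO thread_messages_by_room (room_id, thread_room_id, created_at, message_id)
		 VALUES (?, ?, ?, ?) IF NOT EXISTS`,
		roomID, threadRoomID, createdAt, messageID,
	).WithContext(ctx).MapScanCAS(map[string]interface{}{}) // MapScanCAS tolerates any returned column set
	if err != nil {
		return false, fmt.Errorf("insert thread message %s: %w", messageID, err)
	}
	return applied, nil
}
```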
🤖 Prompt for AI Agents

Verify each finding against the current code and only fix it if needed. In `@message-worker/store_cassandra.go` around lines 100 - 110, the current SaveThreadMessage flow inserts into thread_messages_by_room and then unconditionally calls incrementParentTcount, which causes over-counting on retries; change the insert to a lightweight CAS (INSERT ... IF NOT EXISTS) or write to a dedupe table and only call incrementParentTcount when the conditional insert applied/returned true. Locate the INSERT statement into thread_messages_by_room and modify the query to use IF NOT EXISTS (or perform a separate idempotency row insert), then gate the call to incrementParentTcount (or call it only when the CAS result indicates success); ensure incrementParentTcount remains unchanged but is invoked conditionally based on the CAS/dedupe result.

message-worker/handler.go (2)
68-76: ⚠️ Potential issue | 🟠 Major

Don’t persist a thread reply when the parent is missing.

On the first-reply path, `CreateThreadRoom` can succeed, `handleFirstThreadReply` can return `nil` for `errMessageNotFound`, and `processMessage` then saves the reply to Cassandra with that new `threadRoomID`. Use a skip signal before `SaveThreadMessage`, or resolve the parent before creating/returning a usable thread room ID.

Based on learnings, a missing parent is treated as an unresolvable thread context, and no partial Mongo state should be written.
Also applies to: 106-110, 121-128
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/handler.go` around lines 68 - 76, The code currently persists thread replies even when the thread parent is missing; update handleThreadRoomAndSubscriptions/CreateThreadRoom/handleFirstThreadReply so it returns a clear "skip" or specific errMessageNotFound sentinel (not nil) when the parent cannot be resolved and ensure processMessage checks that sentinel and does NOT call h.store.SaveThreadMessage (and similarly avoid partial Mongo writes in the other similar paths around the SaveThreadMessage calls referenced) — either resolve/validate the parent before returning a usable threadRoomID or return the skip signal so processMessage aborts saving the thread reply.
117-120: ⚠️ Potential issue | 🟡 Minor

Update the comments: `LastSeenAt` is nil, not zero.

The code now sets a `*time.Time` field to `nil`; “zero” reads like `time.Time{}` and is misleading.

Proposed doc update

```diff
-// lastSeenAt is always zero — the subscription is brand-new and the user has not
+// LastSeenAt is always nil — the subscription is brand-new and the user has not
 // yet "seen" the thread.
```

```diff
-// lastSeenAt is always zero — subscriptions are insert-only; the field is never
+// LastSeenAt is always nil — subscriptions are insert-only; the field is never
 // updated by the message worker.
```

Also applies to: 196-198
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@message-worker/handler.go` around lines 117 - 120, Update the function comments for handleFirstThreadReply (and the similar comment block around the other occurrence) to say that LastSeenAt is nil (a nil *time.Time) rather than "zero" — clarify that the subscription's LastSeenAt pointer is nil because the user has not seen the thread, avoiding confusion with a zero-valued time.Time; reference LastSeenAt and handleFirstThreadReply when making the wording changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@message-worker/store_cassandra.go`:
- Around line 143-160: The CAS loop updating tcount in messages_by_id retries
indefinitely; add a bounded retry (e.g., maxRetries constant) and on exceeding
it return a wrapped error so the worker can requeue/release the message instead
of spinning; implement by introducing a retry counter around the for loop that
breaks on applied, increments per iteration, and returns fmt.Errorf("cas tcount
in messages_by_id for parent %s: retry limit exceeded", parentID) (or wrap the
last err) when the limit is hit; apply the identical retry-limit pattern to the
messages_by_room CAS loop (the loop that uses roomID/roomCreatedAt and tcount
variables) so both LWT loops are bounded and return errors when exhausted.
---
Duplicate comments:
In `@message-worker/handler.go`:
- Around line 68-76: The code currently persists thread replies even when the
thread parent is missing; update
handleThreadRoomAndSubscriptions/CreateThreadRoom/handleFirstThreadReply so it
returns a clear "skip" or specific errMessageNotFound sentinel (not nil) when
the parent cannot be resolved and ensure processMessage checks that sentinel and
does NOT call h.store.SaveThreadMessage (and similarly avoid partial Mongo
writes in the other similar paths around the SaveThreadMessage calls referenced)
— either resolve/validate the parent before returning a usable threadRoomID or
return the skip signal so processMessage aborts saving the thread reply.
- Around line 117-120: Update the function comments for handleFirstThreadReply
(and the similar comment block around the other occurrence) to say that
LastSeenAt is nil (a nil *time.Time) rather than "zero" — clarify that the
subscription's LastSeenAt pointer is nil because the user has not seen the
thread, avoiding confusion with a zero-valued time.Time; reference LastSeenAt
and handleFirstThreadReply when making the wording changes.
In `@message-worker/store_cassandra.go`:
- Around line 100-110: The current SaveThreadMessage flow inserts into
thread_messages_by_room and then unconditionally calls incrementParentTcount,
which causes over-counting on retries; change the insert to a lightweight CAS
(INSERT ... IF NOT EXISTS) or write to a dedupe table and only call
incrementParentTcount when the conditional insert applied/returned true. Locate
the INSERT statement into thread_messages_by_room and modify the query to use IF
NOT EXISTS (or perform a separate idempotency row insert), then gate the call to
incrementParentTcount (or call it only when the CAS result indicates success);
ensure incrementParentTcount remains unchanged but is invoked conditionally
based on the CAS/dedupe result.
In `@message-worker/store_mongo.go`:
- Around line 77-82: The InsertThreadSubscription function currently returns
duplicate-key errors on concurrent/replayed inserts; modify it so that when
s.threadSubscriptions.InsertOne returns a Mongo duplicate-key error (use
mongo.IsDuplicateKeyError(err) or equivalent), the function treats it as success
and returns nil; leave other errors unchanged. Update the
InsertThreadSubscription implementation to check the error from InsertOne, call
mongo.IsDuplicateKeyError(err) and return nil for that case, otherwise wrap and
return the error as before.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 95cc7d70-b587-411c-adc2-85737365fabf
📒 Files selected for processing (11)
- message-worker/deploy/docker-compose.yml
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/main.go
- message-worker/mock_store_test.go
- message-worker/store.go
- message-worker/store_cassandra.go
- message-worker/store_mongo.go
- pkg/model/model_test.go
- pkg/model/threadsubscription.go
✅ Files skipped from review due to trivial changes (1)
- pkg/model/model_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- message-worker/main.go
- message-worker/store.go
- message-worker/integration_test.go
```go
	for {
		newVal := 1
		if tcount != nil {
			newVal = *tcount + 1
		}
		var current *int
		applied, err := s.cassSession.Query(
			`UPDATE messages_by_id SET tcount = ? WHERE message_id = ? AND created_at = ? IF tcount = ?`,
			newVal, parentID, parentCreatedAt, tcount,
		).WithContext(ctx).ScanCAS(&current)
		if err != nil {
			return fmt.Errorf("cas tcount in messages_by_id for parent %s: %w", parentID, err)
		}
		if applied {
			break
		}
		tcount = current
	}
```
🧩 Analysis chain
🏁 Script executed:

```bash
#!/bin/bash
# Inspect the CAS retry loops and confirm they have bounded attempts.
sed -n '125,192p' message-worker/store_cassandra.go
```

Repository: hmchangw/chat

Length of output: 2079
Bound the CAS retry loops.
Both LWT loops retry indefinitely on contention. Add a retry limit and return a wrapped error when exhausted, allowing the worker to release the message back to JetStream instead of spinning forever.
Proposed direction
```diff
+	const maxTcountCASRetries = 8
-	for {
+	for attempt := 0; ; attempt++ {
+		if attempt == maxTcountCASRetries {
+			return fmt.Errorf("failed to increment tcount after %d attempts for parent %s", maxTcountCASRetries, parentID)
+		}
 		newVal := 1
 		if tcount != nil {
 			newVal = *tcount + 1
 		}
 		var current *int
 		applied, err := s.cassSession.Query(
 			`UPDATE messages_by_id SET tcount = ? WHERE message_id = ? AND created_at = ? IF tcount = ?`,
 			newVal, parentID, parentCreatedAt, tcount,
 		).WithContext(ctx).ScanCAS(&current)
 		if err != nil {
 			return fmt.Errorf("cas tcount in messages_by_id for parent %s: %w", parentID, err)
 		}
 		if applied {
 			break
 		}
 		tcount = current
 	}
```

(The bound is checked inside the loop so that a successful `break` still falls through to the rest of the function.) Apply the same pattern to the messages_by_room CAS loop (lines 172–189).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@message-worker/store_cassandra.go` around lines 143 - 160, The CAS loop
updating tcount in messages_by_id retries indefinitely; add a bounded retry
(e.g., maxRetries constant) and on exceeding it return a wrapped error so the
worker can requeue/release the message instead of spinning; implement by
introducing a retry counter around the for loop that breaks on applied,
increments per iteration, and returns fmt.Errorf("cas tcount in messages_by_id
for parent %s: retry limit exceeded", parentID) (or wrap the last err) when the
limit is hit; apply the identical retry-limit pattern to the messages_by_room
CAS loop (the loop that uses roomID/roomCreatedAt and tcount variables) so both
LWT loops are bounded and return errors when exhausted.
✅ Addressed in commit d25039a
We should never have an infinite loop in the application code. Maybe add a retry limit, or just do it once and return an error.
@mliu33, agreed — unbounded retry loops are risky. This has already been addressed in commit d25039a. The current code introduces a casIncrement helper with a hard casMaxRetries = 16 cap and returns an error when exhausted:
```go
const casMaxRetries = 16

func casIncrement(maxRetries int, initial *int, update func(newVal int, expected *int) (applied bool, current *int, err error)) error {
	tcount := initial
	for range maxRetries {
		// ...
	}
	return fmt.Errorf("cas increment exceeded %d retries", maxRetries)
}
```

Both the messages_by_id and messages_by_room CAS increments in incrementParentTcount go through this helper, so after 16 failed attempts the error propagates back to the JetStream handler and the message is NAK'd for redelivery — no infinite spin.
🧠 Learnings used
Learnt from: yenta
Repo: hmchangw/chat PR: 80
File: message-worker/handler.go:192-199
Timestamp: 2026-04-14T16:58:26.689Z
Learning: In `message-worker/handler.go` (Go), `handleFirstThreadReply` and `handleSubsequentThreadReply` intentionally return `nil` (after a `slog.Warn`) when `GetMessageSender` returns `errMessageNotFound` for the parent message. This skips all remaining steps — including replier subscription creation and `UpdateThreadRoomLastMessage` — by design: a missing parent is treated as an unresolvable thread context, and no partial Mongo state should be written. Do not flag this early-return pattern as a bug.
Learnt from: CR
Repo: hmchangw/chat PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-17T08:39:31.962Z
Learning: Applies to **/*store_cassandra.go : Cassandra is ONLY for message history (time-series) — MongoDB handles everything else
Learnt from: Joey0538
Repo: hmchangw/chat PR: 64
File: search-sync-worker/main.go:142-154
Timestamp: 2026-04-09T09:24:01.680Z
Learning: In hmchangw/chat, all JetStream pull-consumer workers (message-worker, broadcast-worker, message-gatekeeper, search-sync-worker) intentionally treat all cons.Fetch() errors uniformly — no in-process classification of fatal vs. transient errors. Fatal errors like ErrConsumerDeleted are handled by container orchestration restart policies, not in-process error classification. Do NOT flag uniform Fetch error handling as an issue in this repo.
Learnt from: vjauhari-work
Repo: hmchangw/chat PR: 72
File: room-service/handler.go:0-0
Timestamp: 2026-04-14T01:20:03.653Z
Learning: In `room-service/handler.go` (Go), the `CountOwners` pre-publish check in `handleUpdateRole` is intentionally the authoritative last-owner guard. The concurrent-demotion race (two simultaneous demotions both observing count > 1 before the worker applies them) is a known edge case explicitly accepted by the PR author (PR `#72`). Do not flag this as a race condition issue.
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/roomkeysender/roomkeysender.go:28-30
Timestamp: 2026-04-08T08:25:32.108Z
Learning: In this repository’s Go code, follow the CLAUDE.md rule: don’t add defensive error handling, fallbacks, or nil/validation guards for conditions that are guaranteed not to occur. For internal functions/methods (e.g., when all callers are known to pass non-nil pointer parameters, such as pkg/roomkeysender/roomkeysender.go’s Send), the absence of nil-pointer guards is acceptable—do not flag it as a review issue. Reserve pointer validation and other safety checks for system boundaries (e.g., external inputs, API/HTTP requests, deserialization, or other untrusted entry points).
Learnt from: Joey0538
Repo: hmchangw/chat PR: 52
File: docs/superpowers/specs/2026-04-07-search-sync-worker-design.md:82-82
Timestamp: 2026-04-07T06:13:53.100Z
Learning: In the `hmchangw/chat` repository, the `UpdatedAt time.Time` field (with `json:"updatedAt"` tag) is planned to be added to the `Message` struct in `pkg/model/message.go` as part of implementation step 1 (model changes), as specified in the design doc `docs/superpowers/specs/2026-04-07-search-sync-worker-design.md`. Any constructors, serializers, or helpers that build or handle `Message` should also be updated to populate/handle `UpdatedAt`.
Learnt from: CR
Repo: hmchangw/chat PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-04-17T08:39:31.962Z
Learning: Applies to **/*store_cassandra.go : Design Cassandra tables around query patterns (partition key = room ID, clustering key = timestamp), no secondary indexes
Learnt from: vjauhari-work
Repo: hmchangw/chat PR: 44
File: room-service/handler.go:392-400
Timestamp: 2026-04-01T10:43:13.466Z
Learning: In room-service/handler.go (Go), the handleAddMembers flow intentionally runs BulkCreateSubscriptions before writeRoomMembers without a transaction. The partial-commit inconsistency risk is mitigated by unique indexes on both `subscriptions` and `room_members` collections (added in PR `#44`), making retries idempotent (duplicate-key errors suppressed). A full MongoDB transactional approach is deferred to a follow-up PR; MongoDB transactions require a replica set.
Learnt from: yenta
Repo: hmchangw/chat PR: 80
File: message-worker/handler.go:203-215
Timestamp: 2026-04-14T17:02:08.750Z
Learning: In `message-worker/handler.go` (Go), `handleFirstThreadReply` and `handleSubsequentThreadReply` always initialize `ThreadSubscription.LastSeenAt` to `time.Time{}` (zero value) for the parent author — including when `msg.UserID == parentSender.ID` (self-reply). This is intentional: subscriptions are created with a zero read cursor; `LastSeenAt` is updated exclusively by the read-behaviour flow. The self-reply case (where the author's own subscription should start as "seen") is a known gap explicitly deferred to a follow-up PR. Do not re-flag the zero `lastSeenAt` init as a bug.
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/model/model_test.go:306-317
Timestamp: 2026-04-08T08:25:48.132Z
Learning: In Go, `time.Time` is `comparable` and therefore may be used as a type argument to generic constraints requiring `comparable` (e.g., `func roundTrip[T comparable](...)`). This is because `time.Time`’s underlying fields are all comparable (`uint64`, `int64`, and a pointer `*Location`, where pointers are comparable by address). As a result, any struct that contains or embeds a `time.Time` field (e.g., `model.Message`, or structs embedding it like `model.NotificationEvent`) should also satisfy `comparable` and should not be flagged as a compilation-error when used with `T comparable`.
Extract the infinite CAS retry logic into a casIncrement helper that accepts an update closure and a maxRetries limit (casMaxRetries = 16). Replace both unbounded for loops in incrementParentTcount with a single call each, and add unit tests for casIncrement covering: first-attempt success, retry with refreshed value, exhaustion returning an error, and immediate propagation of update errors. 16 retries covers any realistic burst of concurrent thread replies while guaranteeing the function terminates under pathological contention. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
ThreadSubscription.LastSeenAt is *time.Time (nil on init) since task 10; the surrounding doc comments still described it as "always zero". Update them to "always nil" to match the actual field type. https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
```go
		}
		return fmt.Errorf("read tcount in messages_by_room for parent %s: %w", parentID, err)
	}
	for {
```
Same as above — we should never have a potential infinite loop in the code. Maybe add a retry limit, or just do it once and return an error.
```go
// A conflict means another thread-reply landed between our read and write;
// 16 attempts is sufficient for any realistic burst while preventing an
// infinite loop if something unexpected keeps the row locked.
const casMaxRetries = 16
```
16 is pretty big lol. I initially was thinking about 3 or 5 lol
gocql already provides a reflection-based UDT marshaler that reads the `cql:"..."` struct tag. The custom marshalUDTField/unmarshalUDTField helpers and the per-UDT MarshalUDT/UnmarshalUDT methods delegating to them were reimplementing behaviour the driver does natively — all that was actually needed was the existing `cql` tags. This mirrors the same simplification applied to message-worker's cassParticipant in PR #95.

- Delete pkg/model/cassandra/udt.go (helpers + verifyUDTTags)
- Delete pkg/model/cassandra/udt_test.go
- Remove MarshalUDT/UnmarshalUDT methods and init() from message.go
- Drop two obsolete tests in message_test.go

https://claude.ai/code/session_01XKyoiWJDNWWkNc29RixBqJ

Co-authored-by: Claude <noreply@anthropic.com>
Summary
End-to-end hardening of the `message-worker` service across 8 tasks.

Task 1 — Sync Cassandra schema (docker-compose): Added UDTs `Card`, `CardAction`, `File`, `QuotedParentMessage`; added `thread_messages_by_room` and `pinned_messages_by_room` tables; expanded `messages_by_room` and `messages_by_id` with all thread, rich-content, and audit columns.

Task 2 — Fix UDT marshaling: Replaced the manual `MarshalUDT`/`UnmarshalUDT` switch with `cql` struct tags on `cassParticipant` so gocql's reflection correctly maps snake_case UDT fields (e.g. `eng_name`) instead of lowercased Go field names.

Task 3 — `SaveThreadMessage` thread fields: Added `thread_room_id`, `thread_parent_id`, and `thread_parent_created_at` to the `messages_by_id` INSERT; added `thread_parent_id` to the `thread_messages_by_room` INSERT; `threadRoomID` is now resolved by `handleThreadRoomAndSubscriptions` before the Cassandra write and passed as a parameter.

Task 4 — MaxRedeliver mechanism: Added a `MAX_REDELIVER` env var (default 5) and a `maxRedeliver` field on `Handler`. `nackOrTerminate` checks `NumDelivered` from JetStream metadata — NAKs within budget, permanently terminates with `TermWithReason` once the limit is exceeded. Consumer `MaxDeliver` is set to `MaxRedeliver + 1`.

Task 5 — Simplify `lastSeenAt`: All `ThreadSubscription` records are initialised with `lastSeenAt = time.Time{}` regardless of role (parent author or replier). `buildThreadSubscription` no longer accepts a `lastSeenAt` argument. `LastMsgAt` and `UpdateThreadRoomLastMessage` now use the handler's local `now` instead of `msg.CreatedAt`.

Task 6 — Increment parent `tcount` on thread reply: `SaveThreadMessage` now increments `tcount` on the parent message row in both `messages_by_id` and `messages_by_room` via a read-modify-write (Cassandra `INT`, not `COUNTER`). Skipped when `ThreadParentMessageCreatedAt` is nil or the parent row is absent. Integration test schema updated with a `tcount INT` column.

Task 7 — Simplify thread subscription logic: Replaced `UpsertThreadSubscription` with two focused operations: `InsertThreadSubscription` (plain `insertOne`) for the first-reply path, and `ThreadSubscriptionExists` + `InsertThreadSubscription` for subsequent replies. The subsequent-reply path no longer calls `GetMessageSender` or re-ensures the parent author's subscription — it only checks/inserts the current replier's subscription.

Task 8 — Persist `room_id` in `messages_by_id`: Both `SaveMessage` and `SaveThreadMessage` now include `room_id` in the `INSERT INTO messages_by_id` column list, matching the full schema definition.

Test plan
- `make test SERVICE=message-worker`
- `make test-integration SERVICE=message-worker`
- Bring up `docker-compose` and publish thread replies; verify `tcount`, `thread_room_id`, `room_id`, and subscriptions are written correctly

https://claude.ai/code/session_012dXwAD1V1mJfekBJpzKX48
Summary by CodeRabbit
New Features
Bug Fixes
Configuration