feat(history-service): implement pin/unpin message handlers (Phase 1) #205
📝 Walkthrough
Adds pin/unpin/list message support with new models and subjects, config knobs and startup validation, Mongo room user-count, Cassandra persistence and schema migration (pinned_at), service interfaces and handlers (authorization, idempotency, redaction, pagination), broadcast fanout, regenerated mocks, tests, and client API docs.
Changes: Message Pinning Feature Implementation
Sequence Diagram
sequenceDiagram
participant Client
participant HistoryService
participant Mongo as Settings/RoomRepo
participant Cassandra
participant BroadcastWorker
Client->>HistoryService: PinMessage { messageId }
HistoryService->>Mongo: GetSubscription, GetRoomUserCount, Settings (kill-switch)
Mongo-->>HistoryService: subscription, userCount, pinEnabled
HistoryService->>Cassandra: PinMessage batch (messages_by_id update + pinned_messages_by_room insert)
Cassandra-->>HistoryService: write OK
HistoryService->>BroadcastWorker: publish canonical EventPinned (best-effort)
BroadcastWorker->>Client: fanout PinRoomEvent (room or per-user)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 4
🧹 Nitpick comments (2)
history-service/internal/service/service.go (1)
60-64: ⚡ Quick win
Rename `SettingsRepository` to match the store-interface naming convention.
Line 62 introduces a store-like interface but names it `SettingsRepository`. Per repository naming rules, this should follow `<Domain>Store` (for example, `SettingsStore`) for consistency with the contract style.
Proposed rename (local file)
-//go:generate mockgen -destination=mocks/mock_repository.go -package=mocks . MessageReader,MessageWriter,MessageRepository,SubscriptionRepository,RoomRepository,SettingsRepository,EventPublisher,ThreadRoomRepository
+//go:generate mockgen -destination=mocks/mock_repository.go -package=mocks . MessageReader,MessageWriter,MessageRepository,SubscriptionRepository,RoomRepository,SettingsStore,EventPublisher,ThreadRoomRepository
@@
-type SettingsRepository interface {
+type SettingsStore interface {
 	PinEnabled(ctx context.Context) (bool, error)
 }
@@
-	settings SettingsRepository
+	settings SettingsStore
@@
-	settings SettingsRepository,
+	settings SettingsStore,
@@
-var _ SettingsRepository = (*mongorepo.SettingsRepo)(nil)
+var _ SettingsStore = (*mongorepo.SettingsRepo)(nil)
As per coding guidelines: "Interface naming: use `-er` suffix for single-method interfaces ... Use `<Domain>Store` pattern for store interfaces."
Also applies to: 84-84, 95-95, 141-141, 16-16
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/service.go` around lines 60 - 64, Rename the SettingsRepository interface to SettingsStore and update all references and implementations to match the store-interface convention: change the type name SettingsRepository to SettingsStore and keep the method signature PinEnabled(ctx context.Context) (bool, error) unchanged; update any concrete types, constructor functions, variables, method receivers, imports and documentation that reference SettingsRepository (e.g., implementations like mongorepo.SettingsRepo, variables, function parameters, and tests) to use SettingsStore so the contract name and usages remain consistent across the package.
history-service/internal/cassrepo/pin_integration_test.go (1)
75-102: ⚡ Quick win
Add an explicit cursor pagination assertion for `GetPinnedMessages`.
This test validates ordering but not the multi-page contract (`HasNext`/`NextCursor` + follow-up page fetch). Adding a small two-page assertion would catch pagination regressions in this hot path.
As per coding guidelines, tests must cover edge cases and boundary conditions, not just happy paths.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/cassrepo/pin_integration_test.go` around lines 75 - 102, Update the TestRepository_GetPinnedMessages_OrderAndEmpty test to exercise cursor pagination: after seeding three pinned messages and the initial single-page assertion, call repo.GetPinnedMessages(ctx, "r2", PageRequest{PageSize: 2}) and assert that the returned Page has Data length 2, HasNext is true and NextCursor is non-empty; then call repo.GetPinnedMessages(ctx, "r2", PageRequest{PageSize: 2, Cursor: <that NextCursor>}) and assert the second page contains the remaining message (MessageID "a"), HasNext is false and NextCursor is empty. Reference GetPinnedMessages, PageRequest, HasNext, NextCursor and PinMessage when adding the two calls and assertions.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/client-api.md`:
- Around line 1403-1406: Update the "Triggered events — success path" section to
clarify that the subjects `chat.msg.canonical.{siteID}.pinned` and
`chat.msg.canonical.{siteID}.unpinned` are backend/internal JetStream canonical
subjects (streams `MESSAGES_CANONICAL_{siteID}`) and not client-consumable
events: change the wording around the heading "Triggered events — success path"
to label these entries as backend/internal side effects (or move them to a
separate "Backend/Internal side effects" section), and repeat the same change
for the other occurrence that mentions these subjects (the block around lines
1480-1483).
- Around line 1391-1401: The error-code tokens in the pinning result tables use
uppercase constants (e.g., FORBIDDEN, NOT_FOUND, INTERNAL, BAD_REQUEST) but the
API wire envelope and parser expect lowercase machine codes (forbidden,
not_found, internal, bad_request); update those tokens in the table rows to
lowercase to match the documented error-envelope machine codes and make the same
casing fixes for the other similar tables elsewhere in the document (the later
pinning/result tables and the additional tables noted in the review).
In `@history-service/internal/service/pin_test.go`:
- Line 1: The test file uses the external test package declaration `package
service_test`, violating the repo rule that tests must be same-package; change
the package declaration at the top of pin_test.go from `package service_test` to
`package service` so the tests run in the same package (allowing access to
unexported symbols) and update any test imports or references if they relied on
external-package visibility.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 09515b01-04fa-4428-808a-d29bade2a439
📒 Files selected for processing (24)
- docs/client-api.md
- docs/superpowers/plans/2026-05-19-pin-unpin-message-history.md
- docs/superpowers/specs/2026-05-19-pin-unpin-message-history-design.md
- history-service/cmd/main.go
- history-service/internal/cassrepo/integration_test.go
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/config/config.go
- history-service/internal/models/message.go
- history-service/internal/mongorepo/room.go
- history-service/internal/mongorepo/room_test.go
- history-service/internal/mongorepo/settings.go
- history-service/internal/mongorepo/settings_test.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
- pkg/model/event.go
- pkg/model/message.go
- pkg/model/model_test.go
- pkg/subject/subject.go
- pkg/subject/subject_test.go
	if err := r.session.Query(pinMsgByID, pinnedAt, pinnedBy, msg.MessageID, msg.CreatedAt).WithContext(ctx).Exec(); err != nil {
		return fmt.Errorf("pin message %s in messages_by_id: %w", msg.MessageID, err)
	}
	if err := r.session.Query(insertPinnedMsg,
		msg.RoomID, pinnedAt, msg.MessageID, msg.Sender, msg.TargetUser, msg.Msg, msg.Mentions,
		msg.Attachments, msg.File, msg.Card, msg.CardAction, msg.QuotedParentMessage, msg.VisibleTo,
		msg.Reactions, msg.Deleted, msg.Type, msg.SysMsgData, msg.SiteID, msg.EditedAt, msg.UpdatedAt, pinnedBy,
	).WithContext(ctx).Exec(); err != nil {
		return fmt.Errorf("insert pinned_messages_by_room for message %s in room %s: %w", msg.MessageID, msg.RoomID, err)
🧩 Analysis chain
🌐 Web query:
In Apache Cassandra, what is the recommended pattern to keep two denormalized tables consistent for pin/unpin operations when one statement can succeed and the next fail (timeout/retry), and how should LWT (IF conditions) and logged batches be combined for this?
💡 Result:
Recommended pattern:
1) Use a single logged batch for the two denormalized writes (pin/unpin across both tables) so the batch is durable/replayable across coordinator failures (i.e., avoid losing one side when the request times out). Logged batches write the serialized batch to the batchlog before executing mutations, and replay is used to ensure completion if the coordinator fails [1][2].
2) Use LWT (IF conditions) to guard the state transition you care about (e.g., “only pin if currently unpinned”, “only unpin if currently pinned”). Put the IF condition(s) inside that same batch. DataStax notes: if any statement in a batch uses IF, ALL statements in that batch use Paxos; one failed condition aborts the whole batch (no partial execution) [1].
How to combine LWT and logged batches for the pin/unpin failure scenario:
A) Make the entire logical pin/unpin operation a single BEGIN BATCH (logged) that includes both table mutations.
- Include the IF condition(s) (CAS logic) on the mutation(s) that represent the transition (pin or unpin). Avoid attempting to “mix” conditional and unconditional semantics across the two-table update; keep the whole pair of denormalized writes in the same conditional logged batch so it behaves as an all-or-nothing LWT [1].
B) Use the logged batch (not unlogged) to handle “first statement succeeded, next fail (timeout/retry)”.
- With logged batches, the batchlog durability/replay mechanism is what prevents one-side-only outcomes caused by coordinator timeout/failure after the batch was submitted [1][2].
C) Retry behavior:
- If you receive a timeout, retrying the same logged batch is aligned with the batchlog/replay design, whereas retries of multiple standalone statements can still leave the two denormalized tables inconsistent.
Practical takeaway / recipe:
- Pin: submit one logged batch that updates both denormalized tables, with IF guards for the pin transition inside the batch.
- Unpin: submit one logged batch that updates both denormalized tables, with IF guards for the unpin transition inside the batch.
- Because any IF in the batch turns it into a conditional batch executed via Paxos, ensure your IF-guarded logic covers (and only covers) the same intended state transition for the whole operation, and do not mix conditional and non-conditional statements within the same logical “pin” or “unpin” [1].
Citations:
- 1: https://docs.datastax.com/en/cql-oss/3.x/cql/cql_reference/cqlBatch.html
- 2: https://docs.datastax.com/en/cql/hcd/develop/batch/batch.html
🏁 Script executed:
# Check the pin.go file to see the actual implementation
fd -e go pin.go | head -1 | xargs -I {} sh -c 'wc -l {} && echo "---" && cat -n {}'
Repository: hmchangw/chat
Length of output: 6007
Use a single logged batch with LWT guards for pin/unpin mutations.
PinMessage and UnpinMessage execute separate Query().Exec() calls to mutate messages_by_id and pinned_messages_by_room. A timeout or retry between these statements can leave the tables divergent, violating the load-bearing invariant documented in lines 36–40 and causing silent corruption of pinned state.
Per Apache Cassandra best practices, combine both writes into a single BEGIN BATCH ... APPLY BATCH (logged) with IF condition guards on the state transition. A logged batch ensures the batchlog durability/replay mechanism prevents one-table-only outcomes on coordinator failure. Conditional batches (with IF conditions) execute atomically via Paxos—if any condition fails, the entire batch aborts without partial execution.
Pattern:
- Pin: one logged batch updating both tables with `IF` guard (e.g., "pin only if currently unpinned")
- Unpin: one logged batch updating both tables with `IF` guard (e.g., "unpin only if currently pinned")
This ensures pin/unpin cannot partially apply, even on timeout/retry.
Applies to lines 42–52 (PinMessage) and 58–69 (UnpinMessage).
Declining this one — the suggestion as written isn't executable on Cassandra, and the proposed deviation would also diverge from the established pattern in this repo.
1. Conditional batches can't span the two partitions.
Per docs/cassandra_message_model.md:
- `messages_by_id`: `PRIMARY KEY(message_id, created_at)` → partition key = `message_id`
- `pinned_messages_by_room`: `PRIMARY KEY((room_id), created_at, message_id)` → partition key = `room_id`
Cassandra rejects any BATCH containing an IF clause whose statements touch more than one partition (InvalidRequest: Batch with conditions cannot span multiple partitions). LWT is per-partition by design — Paxos rounds run against a single partition. So "single logged batch with IF guards on both tables" is not a configuration Cassandra will accept.
2. The existing edit/delete mirroring uses the same sequential pattern.
The same class of cross-table denormalized-mirror problem already exists for edit and delete in history-service/internal/cassrepo/write.go, and the repo's chosen pattern is:
- `UpdateMessageContent` (write.go:100-126): sequential `Exec()` across `messages_by_id`, `messages_by_room`/`thread_messages_by_room`, and `pinned_messages_by_room`. No batch, no LWT.
- `SoftDeleteMessage` (write.go:131-200): a single LWT on `messages_by_id` as a one-shot gate (to serialize concurrent deletes), followed by sequential `Exec()` on the mirror tables.
- `decrementParentTcount`: per-table CAS loops, each within a single partition.
pin.go matches that pattern. Changing it to a plain logged batch (no IF) for batchlog-replay durability is possible in principle, but applying it only to pin.go while leaving the much larger edit/delete mirror paths unchanged would be inconsistent — that's a separate refactor across write.go and out of scope for this PR.
3. The invariant is still defended.
The load-bearing invariant (messages_by_id.pinned_at == pinned_messages_by_room.created_at) is documented in the function-level comment on PinMessage and treated as best-effort consistency, the same as the existing edit/delete mirrors. messages_by_id.pinned_at is the source of truth on the read path; pinned_messages_by_room is the denormalized index.
Leaving the code as-is.
Generated by Claude Code
@general-lex, you're right on all three counts — thank you for the detailed correction. I missed that the two tables sit on different partition keys, which makes a conditional batch physically impossible in Cassandra regardless of other considerations. The sequential Exec() pattern is both consistent with the existing edit/delete mirror paths in write.go and the only viable option here. I'll stand down on this one.
(ʕ •́؈•̀) 🐇
✏️ Learnings added
Learnt from: general-lex
Repo: hmchangw/chat PR: 205
File: history-service/internal/cassrepo/pin.go:42-50
Timestamp: 2026-05-20T05:32:30.458Z
Learning: In hmchangw/chat, Cassandra conditional batches (`BATCH ... IF`) cannot span multiple partition keys. `messages_by_id` has partition key `message_id` and `pinned_messages_by_room` has partition key `room_id`; any batch with an `IF` clause touching both tables will be rejected by Cassandra with `InvalidRequest: Batch with conditions cannot span multiple partitions`. LWT is per-partition by design. Do not suggest combined logged-batch + LWT across these two tables.
Learnt from: general-lex
Repo: hmchangw/chat PR: 205
File: history-service/internal/cassrepo/pin.go:42-50
Timestamp: 2026-05-20T05:32:30.458Z
Learning: In hmchangw/chat, the established pattern for cross-table denormalized mirror writes in `history-service/internal/cassrepo/` is sequential `Query().Exec()` calls without batching or LWT (see `write.go` `UpdateMessageContent` and `SoftDeleteMessage`). Consistency across `messages_by_id` and mirror tables (e.g., `pinned_messages_by_room`, `messages_by_room`) is treated as best-effort, with `messages_by_id` as the source of truth. Do not flag this pattern as a consistency issue — it is intentional and consistent across the codebase.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/roomkeysender/roomkeysender.go:28-30
Timestamp: 2026-04-08T08:25:32.108Z
Learning: In this repository’s Go code, follow the CLAUDE.md rule: don’t add defensive error handling, fallbacks, or nil/validation guards for conditions that are guaranteed not to occur. For internal functions/methods (e.g., when all callers are known to pass non-nil pointer parameters, such as pkg/roomkeysender/roomkeysender.go’s Send), the absence of nil-pointer guards is acceptable—do not flag it as a review issue. Reserve pointer validation and other safety checks for system boundaries (e.g., external inputs, API/HTTP requests, deserialization, or other untrusted entry points).
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/model/model_test.go:306-317
Timestamp: 2026-04-08T08:25:48.132Z
Learning: In Go, `time.Time` is `comparable` and therefore may be used as a type argument to generic constraints requiring `comparable` (e.g., `func roundTrip[T comparable](...)`). This is because `time.Time`’s underlying fields are all comparable (`uint64`, `int64`, and a pointer `*Location`, where pointers are comparable by address). As a result, any struct that contains or embeds a `time.Time` field (e.g., `model.Message`, or structs embedding it like `model.NotificationEvent`) should also satisfy `comparable` and should not be flagged as a compilation-error when used with `T comparable`.
Learnt from: general-lex
Repo: hmchangw/chat PR: 141
File: message-worker/main.go:95-99
Timestamp: 2026-05-04T03:30:38.678Z
Learning: In hmchangw/chat, `pkg/natsutil/connect.go`’s `natsutil.Connect` returns an OpenTelemetry-wrapped NATS connection (`*otelnats.Conn` from `github.com/Marz32onE/instrumentation-go/otel-nats/otelnats`), not a plain `*nats.Conn`. Because `otelnats.Conn.Publish` is defined as `Publish(ctx context.Context, subj string, data []byte) error` (it uses `ctx` to propagate traces via NATS headers), code review should NOT flag `nc.Publish(ctx, subj, data)` as an incorrect method call shape. This `Publish(ctx, ...)` pattern is the standard/correct usage across the codebase (e.g., message-worker, broadcast-worker, notification-worker, history-service).
@@ -0,0 +1,353 @@
package service_test
Use same-package tests per repository policy.
package service_test conflicts with the repository test rule requiring tests to live in the same package. Please switch this file to package service (or document an explicit exception).
As per coding guidelines: "Test files live in the same package (e.g., package main) to access unexported types."
Declining — pin_test.go matches its sibling files and the CLAUDE.md rationale doesn't apply here.
The CLAUDE.md rule reads: "Test files live in the same package (e.g., package main) to access unexported types." The justification is unexported-type access. pin_test.go only references exported symbols (service.HistoryService, service.New, mocks.*, models.*, model.*) — there's no unexported-access need.
The neighbors in the same directory also use the external test package:
$ head -1 history-service/internal/service/*_test.go
==> integration_test.go <== //go:build integration
==> messages_test.go <== package service_test
==> pin_test.go <== package service_test
==> room_times_test.go <== package service
==> threads_test.go <== package service_test
==> utils_test.go <== package service
messages_test.go and threads_test.go use package service_test for the same reason (handler tests against the exported HistoryService API). Switching only pin_test.go to package service would make it the odd one out without giving it any new access it actually uses.
Happy to revisit if a future change adds unexported-symbol access, but for now leaving as package service_test to stay consistent with neighbors.
Generated by Claude Code
@general-lex, that's a well-reasoned distinction — the CLAUDE.md rule is a justification-driven guideline (use same-package when you need unexported access), not a blanket mandate. The directory already has a clear split: package service for room_times_test.go / utils_test.go (unexported access), and package service_test for handler-level tests like messages_test.go, threads_test.go, and pin_test.go (exported API only). Makes sense to keep pin_test.go consistent with the handler test convention. Thanks for the clear explanation!
(◕‿◕✿) 🐇
✏️ Learnings added
Learnt from: general-lex
Repo: hmchangw/chat PR: 205
File: history-service/internal/service/pin_test.go:1-1
Timestamp: 2026-05-20T05:32:31.474Z
Learning: In hmchangw/chat's `history-service/internal/service/` package, there is an intentional split between test package declarations: `package service` is used in `room_times_test.go` and `utils_test.go` (which need unexported symbol access), while `package service_test` (external test package) is used for handler-level tests — `messages_test.go`, `threads_test.go`, `pin_test.go` — that only reference exported symbols (`service.HistoryService`, `service.New`, `mocks.*`, `models.*`, etc.). Do NOT flag `package service_test` handler tests in this directory as a policy violation; the CLAUDE.md rule applies only when unexported-type access is actually needed.
Shouldn't settings be passed through config and controlled in deployment? Mongo settings seem like an old dependency from Meteor.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@history-service/internal/service/pin.go`:
- Around line 66-83: The current enforcePinLimit (and the PinMessage flow) does
a non-atomic read-then-write and can exceed the hard cap under concurrent
requests; replace this by moving the limit enforcement into the storage layer as
an atomic conditional write (e.g., use LWT/compare-and-set in the
msgReader/cassrepo implementation for PinMessage) so the repository operation
will fail deterministically when the room already has maxPinnedPerRoom pins,
return a typed domain error (e.g., ErrPinLimitReached) from the repository, and
have the service/handler translate that error to natsrouter.ErrForbidden; you
may keep enforcePinLimit as a cheap pre-check but ensure the actual correctness
depends on the atomic repo operation in PinMessage/MsgReader/ cassrepo and
update callers to handle the new typed error instead of relying solely on the
in-memory length check.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8eacae00-90d3-4570-bc3b-4d76fd64dede
📒 Files selected for processing (9)
- docs/client-api.md
- history-service/cmd/main.go
- history-service/internal/config/config.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
💤 Files with no reviewable changes (1)
- history-service/internal/service/mocks/mock_repository.go
✅ Files skipped from review due to trivial changes (1)
- docs/client-api.md
// enforcePinLimit blocks a new pin when the room already has the configured
// maximum number of pinned messages. Hard cap — no role-based bypass.
// Reads at most maxPinnedPerRoom rows from pinned_messages_by_room; the
// partition is bounded by the limit so the cost stays small.
func (s *HistoryService) enforcePinLimit(c *natsrouter.Context, account, roomID string) error {
	page, err := s.msgReader.GetPinnedMessages(c, roomID, cassrepo.PageRequest{PageSize: s.maxPinnedPerRoom})
	if err != nil {
		slog.Error("count pinned messages", "error", err, "roomID", roomID)
		return natsrouter.ErrInternal("unable to verify pin count")
	}
	if len(page.Data) >= s.maxPinnedPerRoom {
		slog.Info("pin blocked: limit reached",
			"account", account, "roomID", roomID,
			"pinned", len(page.Data), "limit", s.maxPinnedPerRoom)
		return natsrouter.ErrForbidden("room pin limit reached")
	}
	return nil
}
Pin-limit enforcement is non-atomic and can exceed the configured hard cap under concurrency.
The current read-then-write flow is vulnerable to TOCTOU: two concurrent PinMessage requests can both observe len(page.Data) < maxPinnedPerRoom and both proceed to PinMessage, resulting in max+1 (or more) pinned messages. If this is intended to be a hard cap, the limit must be enforced atomically in storage (e.g., LWT/conditional write path in repository) and surfaced as a typed/domain error to the handler.
Also applies to: 136-138
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@history-service/internal/service/pin.go` around lines 66 - 83, The current
enforcePinLimit (and the PinMessage flow) does a non-atomic read-then-write and
can exceed the hard cap under concurrent requests; replace this by moving the
limit enforcement into the storage layer as an atomic conditional write (e.g.,
use LWT/compare-and-set in the msgReader/cassrepo implementation for PinMessage)
so the repository operation will fail deterministically when the room already
has maxPinnedPerRoom pins, return a typed domain error (e.g.,
ErrPinLimitReached) from the repository, and have the service/handler translate
that error to natsrouter.ErrForbidden; you may keep enforcePinLimit as a cheap
pre-check but ensure the actual correctness depends on the atomic repo operation
in PinMessage/MsgReader/ cassrepo and update callers to handle the new typed
error instead of relying solely on the in-memory length check.
Declining. The pin-limit TOCTOU is real but intentionally matches the established precedent in this codebase for aggregate-cap enforcement.
Identical pattern in MAX_ROOM_SIZE (room-service):
- Pre-check at the request handler: `room-service/handler.go:719-723` reads `room.UserCount`, compares to `maxRoomSize`, returns early if over.
- Publishes to JetStream; `room-worker/handler.go:680-880` consumes and blind-writes the new subscriptions, with no re-check at the writer.
- `ReconcileMemberCounts` after the write `$set`s the actual count in Mongo. It records reality, including overage. It does not roll back excess members.
Consequence: two concurrent member.add requests on a room at UserCount = 999 (each adding 1 user) both pass the pre-check and both write, leaving the room at UserCount = 1001 with MAX_ROOM_SIZE = 1000. The codebase accepts this — rooms can exceed the cap by a small N under concurrent traffic. The race window is in fact wider than ours because it spans a JetStream hop.
Practical likelihood for pin is vanishingly small. Triggering it requires two users in the same room pinning different messages within a single Cassandra-read window (~milliseconds) while the room sits at exactly cap - 1 pins. Pinning is a deliberate, infrequent moderation action — not a chat-rate event — so the probability of meaningful overage is negligible.
Why this is acceptable for both: the system is self-limiting (subsequent requests see the inflated count and are rejected at the same gate), and there's no place that breaks at exactly cap + 1. The caps protect operational budgets (memory, fan-out, indexed-list size), not invariants. Atomic enforcement would need either a per-room counter doc with findAndModify/$inc conditional, or a Cassandra LWT counter table — real schema work and out of scope here.
Why we shouldn't fix only pin: doing so would make pin the only atomic-cap gate in the codebase while MAX_ROOM_SIZE (a much stronger correctness case — 1000-member hard cap vs 10-pin display nicety) remains soft. If atomic aggregate caps are desired, that decision should land for MAX_ROOM_SIZE first, then propagate down.
Leaving the read-then-write pattern in place.
Generated by Claude Code
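To make the accepted trade-off concrete, here is a minimal sketch of the soft-cap behavior described above. It is an illustration only: `pinStore`, `underLimit`, `write`, and `simulateRace` are hypothetical names, and an in-memory map stands in for `pinned_messages_by_room`. The interleaving is replayed sequentially (both reads before either write) so the result is deterministic.

```go
package main

import "fmt"

// pinStore is a hypothetical in-memory stand-in for pinned_messages_by_room.
type pinStore struct {
	pins map[string][]string // roomID -> pinned message IDs
}

// underLimit is the read half of the gate: it reports whether the room still
// has capacity, based on the count visible at the time of the read.
func (s *pinStore) underLimit(roomID string, limit int) bool {
	return len(s.pins[roomID]) < limit
}

// write is the blind write half: it does not re-check the limit.
func (s *pinStore) write(roomID, messageID string) {
	s.pins[roomID] = append(s.pins[roomID], messageID)
}

// pin runs the full read-then-write flow for a single request.
func (s *pinStore) pin(roomID, messageID string, limit int) bool {
	if !s.underLimit(roomID, limit) {
		return false
	}
	s.write(roomID, messageID)
	return true
}

// simulateRace starts a room at limit-1 pins, lets two requests read before
// either writes, then sends a third request. It returns the final pin count
// and whether the third request was accepted.
func simulateRace(limit int) (int, bool) {
	s := &pinStore{pins: map[string][]string{}}
	for i := 0; i < limit-1; i++ {
		s.write("r1", fmt.Sprintf("m%d", i))
	}

	// Both requests observe limit-1 pins and pass the pre-check.
	okA := s.underLimit("r1", limit)
	okB := s.underLimit("r1", limit)
	if okA {
		s.write("r1", "a")
	}
	if okB {
		s.write("r1", "b")
	}

	// The next request sees the inflated count and is rejected at the same gate.
	third := s.pin("r1", "c", limit)
	return len(s.pins["r1"]), third
}

func main() {
	count, third := simulateRace(10)
	fmt.Println(count, third) // prints "11 false"
}
```

The overshoot is bounded by the number of requests inside the read window, and the gate closes again as soon as the writes land, which is the self-limiting property the comment relies on.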
@general-lex, understood — the rationale is thorough and well-grounded. The precedent from MAX_ROOM_SIZE, the self-correcting nature of the gate, and the operational (not invariant) semantics of the cap all support keeping the read-then-write pattern here. I'll defer to the established codebase convention.
I'll record this for future reviews:
✏️ Learnings added
Learnt from: general-lex
Repo: hmchangw/chat PR: 205
File: history-service/internal/service/pin.go:66-83
Timestamp: 2026-05-21T05:30:30.938Z
Learning: In hmchangw/chat, aggregate-cap enforcement (e.g., MAX_PINNED_PER_ROOM in history-service/internal/service/pin.go `enforcePinLimit`, MAX_ROOM_SIZE in room-service/handler.go) intentionally uses a non-atomic read-then-write (soft cap) pattern. A small TOCTOU overage is accepted by design: the gate is self-correcting (subsequent requests see the inflated count and are rejected), the caps protect operational budgets rather than hard invariants, and atomic enforcement (LWT counter table or findAndModify) is out of scope. Do NOT flag this pattern as a TOCTOU bug unless the codebase has explicitly adopted atomic cap enforcement.
Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.
🧠 Learnings used
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/roomkeysender/roomkeysender.go:28-30
Timestamp: 2026-04-08T08:25:32.108Z
Learning: In this repository’s Go code, follow the CLAUDE.md rule: don’t add defensive error handling, fallbacks, or nil/validation guards for conditions that are guaranteed not to occur. For internal functions/methods (e.g., when all callers are known to pass non-nil pointer parameters, such as pkg/roomkeysender/roomkeysender.go’s Send), the absence of nil-pointer guards is acceptable—do not flag it as a review issue. Reserve pointer validation and other safety checks for system boundaries (e.g., external inputs, API/HTTP requests, deserialization, or other untrusted entry points).
Learnt from: Joey0538
Repo: hmchangw/chat PR: 59
File: pkg/model/model_test.go:306-317
Timestamp: 2026-04-08T08:25:48.132Z
Learning: In Go, `time.Time` is `comparable` and therefore may be used as a type argument to generic constraints requiring `comparable` (e.g., `func roundTrip[T comparable](...)`). This is because `time.Time`’s underlying fields are all comparable (`uint64`, `int64`, and a pointer `*Location`, where pointers are comparable by address). As a result, any struct that contains or embeds a `time.Time` field (e.g., `model.Message`, or structs embedding it like `model.NotificationEvent`) should also satisfy `comparable` and should not be flagged as a compilation-error when used with `T comparable`.
Learnt from: general-lex
Repo: hmchangw/chat PR: 141
File: message-worker/main.go:95-99
Timestamp: 2026-05-04T03:30:38.678Z
Learning: In hmchangw/chat, `pkg/natsutil/connect.go`’s `natsutil.Connect` returns an OpenTelemetry-wrapped NATS connection (`*otelnats.Conn` from `github.com/Marz32onE/instrumentation-go/otel-nats/otelnats`), not a plain `*nats.Conn`. Because `otelnats.Conn.Publish` is defined as `Publish(ctx context.Context, subj string, data []byte) error` (it uses `ctx` to propagate traces via NATS headers), code review should NOT flag `nc.Publish(ctx, subj, data)` as an incorrect method call shape. This `Publish(ctx, ...)` pattern is the standard/correct usage across the codebase (e.g., message-worker, broadcast-worker, notification-worker, history-service).
| threadRooms: threadRooms, | ||
| historyFloor: historyFloor, | ||
| largeRoomThreshold: largeRoomThreshold, | ||
| maxPinnedPerRoom: maxPinnedPerRoom, |
| SiteID: siteID, | ||
| Timestamp: time.Now().UTC().UnixMilli(), | ||
| } | ||
| s.publishCanonicalBestEffort(c, subject.MsgCanonicalUnpinned(siteID), &evt, msg.MessageID, roomID) |
Currently this only publishes the event, and other workers will subscribe to it in a different PR, right?
Broadcast-worker handling is now included in this PR; the sync worker will be done in the next PR.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docs/client-api.md (1)
1543-1555: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Include `createdAt` in the example to match the Message schema and illustrate the documented caveat.
The example response is missing the `createdAt` field. According to the Message schema (line 935), `createdAt` is a required field (not marked Optional). Additionally, the caveat at line 1530 explicitly discusses how `createdAt` will be present but denormalized to the pin time; the example should reflect this.
📝 Proposed fix
 {
   "messages": [
     {
       "roomId": "01970a4f8c2d7c9aQ",
+      "createdAt": "2026-05-06T08:01:40Z",
       "messageId": "01970a4f8c2d7c9aQRST",
       "sender": { "id": "01970a4f8c2d7c9a01970a4f8c2d7c9a", "account": "alice" },
       "msg": "morning team",
       "pinnedAt": "2026-05-06T08:01:40Z",
       "pinnedBy": { "id": "01970a4f8c2d7c9a01970a4f8c2d7c9a", "account": "alice" }
     }
   ]
 }
Note: `createdAt` matches `pinnedAt` to illustrate the denormalization artifact described in the caveat.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/client-api.md` around lines 1543 - 1555, The example response in the messages array is missing the required createdAt property defined by the Message schema; update the sample message object to include "createdAt" (set to the same timestamp as "pinnedAt" to demonstrate the denormalization caveat) so the example matches the Message schema and the caveat described around createdAt/pinnedAt.
🧹 Nitpick comments (1)
history-service/internal/service/pin_test.go (1)
248-257: ⚡ Quick win
Add a regression test for `ListPinnedMessages` when pinning is globally disabled.
`ListPinnedMessages` is intentionally allowed even when `pinEnabled=false`, but current list tests only cover subscribed/not-subscribed. A dedicated test would lock this contract and prevent future gate regressions.
✅ Suggested test shape
+func TestListPinnedMessages_PinDisabledStillAllowed(t *testing.T) {
+	ctrl := gomock.NewController(t)
+	msgs := mocks.NewMockMessageRepository(ctrl)
+	subs := mocks.NewMockSubscriptionRepository(ctrl)
+	rooms := mocks.NewMockRoomRepository(ctrl)
+	pub := mocks.NewMockEventPublisher(ctrl)
+	threadRooms := mocks.NewMockThreadRoomRepository(ctrl)
+
+	rooms.EXPECT().
+		GetRoomTimes(gomock.Any(), gomock.Any()).
+		Return(defaultRoomLastMsgAt, defaultRoomCreatedAt, nil).
+		MinTimes(0)
+	rooms.EXPECT().GetMinUserLastSeenAt(gomock.Any(), gomock.Any()).Return(nil, nil).AnyTimes()
+
+	const historyFloor = 90 * 24 * time.Hour
+	svc := service.New(msgs, subs, rooms, pub, threadRooms, historyFloor, 500, testMaxPinnedPerRoom, false)
+
+	subs.EXPECT().GetHistorySharedSince(gomock.Any(), "u1", "r1").Return(nil, true, nil)
+	msgs.EXPECT().GetPinnedMessages(gomock.Any(), "r1").Return([]models.Message{}, nil)
+
+	resp, err := svc.ListPinnedMessages(testContext(), models.ListPinnedMessagesRequest{})
+	require.NoError(t, err)
+	require.NotNil(t, resp)
+}
As per coding guidelines: "Tests must cover: happy path, error paths, edge cases (empty collections, boundary conditions), and invalid input."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/pin_test.go` around lines 248 - 257, Add a regression test that verifies ListPinnedMessages still returns pinned messages when global pinning is disabled: create a service instance with pinEnabled set to false (use newServiceWithRoomMock or otherwise set svc.cfg.PinEnabled = false after construction), set up the same mocks as in TestListPinnedMessages_HappyPath (subs.EXPECT().GetHistorySharedSince(...) and msgs.EXPECT().GetPinnedMessages(...)), call svc.ListPinnedMessages with a normal request and assert no error and that resp.Messages contains the expected items; name the test e.g. TestListPinnedMessages_PinDisabled to lock the contract.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 792eca01-9b7a-468e-9f16-7688f0880d47
📒 Files selected for processing (8)
- docs/client-api.md
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/models/message.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
| created_at TIMESTAMP, | ||
| message_id TEXT, | ||
| sender FROZEN<"Participant">, | ||
| target_user FROZEN<"Participant">, |
The `target_user` field has been removed. This might need a rebase.
|
|
||
| deletePinnedRow = `DELETE FROM pinned_messages_by_room WHERE room_id = ? AND created_at = ? AND message_id = ?` | ||
|
|
||
| pinnedColumns = "room_id, created_at, message_id, sender, target_user, " + |
The `TargetUser` field has been removed. This needs a rebase.
| // two values diverge, subsequent edits and deletes will silently corrupt the | ||
| // pinned copy. | ||
| func (r *Repository) PinMessage(ctx context.Context, msg *models.Message, pinnedAt time.Time, pinnedBy models.Participant) error { //nolint:gocritic // hugeParam: Participant is passed by value to match the service.MessageWriter interface | ||
| if err := r.session.Query(pinMsgByID, pinnedAt, pinnedBy, msg.MessageID, msg.CreatedAt).WithContext(ctx).Exec(); err != nil { |
Please check out how message-worker saves messages using an unlogged batch, which could help performance here.
| if msg.PinnedAt == nil { | ||
| return fmt.Errorf("unpin message %s: PinnedAt is nil", msg.MessageID) | ||
| } | ||
| if err := r.session.Query(unpinMsgByID, msg.MessageID, msg.CreatedAt).WithContext(ctx).Exec(); err != nil { |
Same as above: this could use an unlogged batch to improve performance.
Done — fb7bfe7. Both methods now use a single gocql.UnloggedBatch, matching message-worker's SaveMessage pattern. Doc comments updated to spell out that UnloggedBatch is a transport grouping (no atomicity) so future readers don't misread it as a logged batch.
|
|
||
| // eventParticipantFrom builds the wire-shape (model.Participant) actor for a | ||
| // canonical pin/unpin event from the caller's subscription. | ||
| func eventParticipantFrom(sub *model.Subscription) *model.Participant { |
What is the difference between this method and `pinnedByParticipant`? Could one of them be removed?
Thanks for the catch. They did look like duplicates. They're not actually interchangeable, but the underlying observation was right: the helpers weren't carrying their weight.
They serve different consumers:
- `pinnedByParticipant` → `pinned_by` column
- `eventParticipantFrom` → `model.MessageEvent.PinnedBy` → JetStream JSON
Collapsing them or removing just one wouldn't help. Since each helper was a one-line wrapper used 1–2 times, the genuine cleanup was to delete both and inline the struct literals at the 3 call sites. Net -15 / +3 lines; the type split is now visible where it's actually relevant (the call site reads models.Participant{ID: …} vs &model.Participant{UserID: …} directly).
No perf trade — both were leaf functions returning a struct literal, so the Go inliner was already collapsing them; allocations and escape behavior are unchanged.
| SiteID: siteID, | ||
| Timestamp: pinnedAtMs, | ||
| } | ||
| s.publishCanonicalBestEffort(c, subject.MsgCanonicalPinned(siteID), &evt, msg.MessageID, roomID) |
I just checked, and we do need broadcast-worker to fan out the pinned message event to the user/room subjects. The second phase will include "handle pin event in broadcast-worker", right?
Broadcast-worker handling has been added.
| // new pinning is disabled, and is not a moderation action. | ||
| // Returns the full pin list in one shot: the per-room cap (MAX_PINNED_PER_ROOM) | ||
| // bounds the result, so no pagination. | ||
| func (s *HistoryService) ListPinnedMessages(c *natsrouter.Context, _ models.ListPinnedMessagesRequest) (*models.ListPinnedMessagesResponse, error) { |
ListPinnedMessages needs to take the sharedHistory feature into account. We need to remove the message content if the requester is not allowed to see that message.
Done.
This required adding a new field to the pinned_messages_by_room table in Cassandra, the message's creation time, which is compared against the caller's access-since value to decide whether the pin should be hidden.
Also, we are hiding the pin entirely, unlike a quoted message, which shows a stub.
Please let me know if this is OK.
Thanks!
Please check out the comments above and let me know if you have any questions, thanks!
Done in e5d3a6d. Read your earlier "remove the msg content" comment as "drop the entry"; your follow-up made clear it's "blank the body, keep the entry." Switched the implementation so ListPinnedMessages now returns redacted stubs for pre-access pins (row count is the same for every caller; msg becomes "This message is unavailable", rich-content fields are cleared, identifiers/sender/timestamps/pin-metadata stay). Frontend can render a "Not available" placeholder. Detailed wire-shape notes in docs/client-api.md under the new "Access window — redacted stubs" section.
Generated by Claude Code
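The stub behavior described above can be sketched roughly as follows. This is not the actual `redactUnavailablePins` implementation: the `pinnedMessage` struct is a hypothetical, trimmed-down shape (with `Attachments` standing in for the rich-content fields), and the accessibility decision is passed in as a callback so the sketch stays self-contained.

```go
package main

import "fmt"

// pinnedMessage is a hypothetical, trimmed-down wire shape for illustration.
type pinnedMessage struct {
	MessageID   string
	Sender      string
	Msg         string
	Attachments []string // stands in for the rich-content fields
	Type        string
	SysMsgData  string
}

// unavailableText is the placeholder body quoted in the comment above.
const unavailableText = "This message is unavailable"

// redactStubs returns a copy of pins in which every inaccessible entry is
// replaced by a stub: body and rich content are blanked, while identifiers
// and sender are kept. The row count never changes.
func redactStubs(pins []pinnedMessage, inaccessible func(pinnedMessage) bool) []pinnedMessage {
	out := make([]pinnedMessage, len(pins))
	for i, p := range pins { // p is a copy, so the input slice is not mutated
		if inaccessible(p) {
			p.Msg = unavailableText
			p.Attachments = nil
			p.Type = ""
			p.SysMsgData = ""
		}
		out[i] = p
	}
	return out
}

func main() {
	pins := []pinnedMessage{
		{MessageID: "m1", Sender: "alice", Msg: "old secret", Attachments: []string{"f.png"}},
		{MessageID: "m2", Sender: "bob", Msg: "morning team"},
	}
	// Assumption for the demo: m1 predates the caller's access window.
	out := redactStubs(pins, func(p pinnedMessage) bool { return p.MessageID == "m1" })
	fmt.Println(len(out), out[0].Msg, "|", out[1].Msg)
}
```

Because every caller gets the same number of rows, a plain "room pin limit reached" error remains actionable for restricted users.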
♻️ Duplicate comments (1)
history-service/internal/service/pin.go (1)
199-209: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Honor the shared-history cutoff when listing pinned messages.
`getAccessSince` is only used as a subscription check here; its returned cutoff is ignored. That means a user can receive pinned message bodies whose original creation time predates their access window, even though the normal history APIs hide those messages. Please filter or redact the pinned list against `accessSince` before replying. Because `GetPinnedMessages` surfaces pin time as `CreatedAt`, the fix likely needs to hydrate the original message timestamps (for example via `GetMessagesByIDs`) or persist the original `created_at` in the pinned row.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/pin.go` around lines 199 - 209, getAccessSince is called but its returned cutoff is ignored, allowing pinned messages older than the user's access window to be returned; after calling s.msgReader.GetPinnedMessages(c, roomID) you must use the cutoff from s.getAccessSince(c, account, roomID) to filter or redact results by original message CreatedAt before returning ListPinnedMessagesResponse. Concretely: capture the cutoff from getAccessSince, obtain the original messages' timestamps for the pinned message IDs (e.g. via s.msgReader.GetMessagesByIDs or hydrate created_at from persistence), drop or redact any pinned entries whose original message CreatedAt < cutoff, and then return the filtered slice in models.ListPinnedMessagesResponse; update any references to the pin CreatedAt if it currently represents pin time rather than original message created_at so filtering uses the original timestamp.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 05fe7d54-48c9-4edc-920f-f176d8e4ba77
📒 Files selected for processing (7)
- history-service/cmd/main.go
- history-service/internal/cassrepo/pin.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
7121df9 to
b66edcc
Compare
mliu33
left a comment
Great work, just some changes required, thanks!
|
|
||
| The result set is bounded by `MAX_PINNED_PER_ROOM` (default 10), so the response returns all pins in a single reply with no pagination. | ||
|
|
||
| **Access window:** If the caller's subscription has a `historySharedSince` lower bound (partial history access), pins whose underlying message was created before that timestamp are omitted entirely from the response — they are not returned as redacted stubs. Callers with full history access see every pin. |
Looking at the current behavior, I think we need to return redacted stubs, because the frontend displays a "not available" placeholder for pinned messages the caller is not allowed to see.
Agreed — switched to redacted stubs in e5d3a6d. redactUnavailablePins blanks the body and rich-content fields while keeping identifiers/sender/timestamps so the frontend can render a "Not available" placeholder, and the row count stays the same for every caller. Doc rewritten to describe the stub behavior (and a later commit cc5263f extended the scrubbed set to include type/sysMsgData so system-message metadata doesn't leak either).
Generated by Claude Code
|
|
||
| **Access window:** If the caller's subscription has a `historySharedSince` lower bound (partial history access), pins whose underlying message was created before that timestamp are omitted entirely from the response — they are not returned as redacted stubs. Callers with full history access see every pin. | ||
|
|
||
| > **`createdAt` caveat:** In the returned `Message` objects, `pinnedAt` is the authoritative pin timestamp. The embedded `createdAt` field equals the pin time (a denormalized-row artifact of the underlying storage model), **not** the original message creation time. Clients needing the original creation timestamp should fetch it via the [Get Message By ID](#get-message-by-id) RPC. |
I think it's better to keep these clear and separate. We should return both pinnedAt and the message creation time, because the frontend might need the message's actual creation time for other operations, and this saves an extra Get Message By ID round trip.
Done — solved structurally via the schema rename in ba094be. The clustering key is now pinned_at and the message's true creation time has its own created_at column, so every pinned-row response naturally carries BOTH createdAt (message creation) and pinnedAt (pin time). No extra GetMessageByID round-trip needed. Docs updated in the same section to show both timestamps in the response example, and the prior "createdAt artifact" caveat was deleted.
Generated by Claude Code
|
|
||
| ```json | ||
| { | ||
| "messages": [ |
In the current behavior, I think we redact and return a placeholder so that the frontend can render "not available" for the user, and the total number of pinned messages is the same for everyone.
Done in e5d3a6d. Pre-access pins are returned as redacted stubs (placeholder text + blanked rich content) so the count is the same for all callers. The cascade closed the asymmetric "outside your access window" pin-limit error too — since restricted users now see the same row count, the plain "room pin limit reached" message is fully actionable. Frontend renders the "Not available" placeholder for stubs.
Generated by Claude Code
| // Fail-open if message_created_at is NULL (legacy rows or rows written | ||
| // before this column existed): show the pin rather than silently hiding | ||
| // it. The filter only applies when both sides have a concrete timestamp. | ||
| if accessSince != nil && m.MessageCreatedAt != nil && m.MessageCreatedAt.Before(*accessSince) { |
Please check the current behavior. The logic differs between pinning a thread message and pinning a normal message: when a thread message is pinned, we need to compare the user's sharedHistorySince with its parent message's creation time.
Also, the current behavior does not seem to hide the message.
Done in 6e56f55 (initial impl) then broadened in 44e72a2. Added a pinInaccessible helper in service/pin.go that gates on m.CreatedAt AND, for thread replies (ThreadParentID != ""), on m.ThreadParentCreatedAt. A reply whose parent predates the caller's historySharedSince is stubbed even when the reply itself is post-access — the reply's context is broken without parent visibility. nil ThreadParentCreatedAt is treated conservatively (redact), mirroring quoteInaccessible's legacy fallback. The gate fires for both TShow=true and TShow=false replies (a TShow=false reply with an inaccessible parent can't be reached through normal navigation, so the pin must be stubbed too).
Required a small denormalization on pinned_messages_by_room — added tshow, thread_parent_id, thread_parent_created_at columns so the read path has the data without an extra GetMessageByID round-trip. Captured at PinMessage write time from the resolved Message.
Generated by Claude Code
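A rough sketch of the gate described above, under stated assumptions: `pinnedRow` is a hypothetical struct carrying only the fields the decision needs, `at` is a demo helper, and a nil `CreatedAt` is treated as fail-open (taken from the earlier diff snippet; the final code may differ). Only the nil-parent case is confirmed by the comment as conservative.

```go
package main

import (
	"fmt"
	"time"
)

// pinnedRow is a hypothetical subset of a pinned_messages_by_room row.
type pinnedRow struct {
	CreatedAt             *time.Time // original message creation time
	ThreadParentID        string     // empty for non-thread messages
	ThreadParentCreatedAt *time.Time // denormalized at pin time
}

// pinInaccessible reports whether a pin should be returned as a redacted stub
// for a caller whose history access starts at accessSince.
func pinInaccessible(m pinnedRow, accessSince *time.Time) bool {
	if accessSince == nil {
		return false // full history access: nothing is redacted
	}
	// Assumption: a missing creation time fails open, as in the earlier diff.
	if m.CreatedAt != nil && m.CreatedAt.Before(*accessSince) {
		return true
	}
	if m.ThreadParentID != "" {
		if m.ThreadParentCreatedAt == nil {
			return true // conservative: unknown parent time is redacted
		}
		// A reply is only readable if its parent is inside the window too.
		return m.ThreadParentCreatedAt.Before(*accessSince)
	}
	return false
}

// at is a demo helper that builds a pointer to a fixed UTC timestamp.
func at(sec int64) *time.Time {
	t := time.Unix(sec, 0).UTC()
	return &t
}

func main() {
	since := at(100)
	oldMsg := pinnedRow{CreatedAt: at(50)}
	newReplyOldParent := pinnedRow{CreatedAt: at(150), ThreadParentID: "p1", ThreadParentCreatedAt: at(50)}
	newMsg := pinnedRow{CreatedAt: at(150)}
	fmt.Println(pinInaccessible(oldMsg, since))            // true
	fmt.Println(pinInaccessible(newReplyOldParent, since)) // true
	fmt.Println(pinInaccessible(newMsg, since))            // false
}
```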
| // ListPinnedMessages than the cap counts, so a plain "limit reached" with no | ||
| // visible pin to unpin would be unactionable. | ||
| func (s *HistoryService) enforcePinLimit(c *natsrouter.Context, roomID, messageID string, accessSince *time.Time) (*time.Time, error) { | ||
| pinned, err := s.msgReader.GetPinnedMessages(c, roomID, nil) |
We should just query the count and do the max-count check. There's no need to loop and check whether the given message is already pinned, because we already check that and return earlier.
Same substance as the orphanPinnedAt thread — both comments are pointing at the same orphan-detection code (the for loop here and the pinnedAt = *orphanPinnedAt reuse there). Posted the full failure-mode walkthrough on that thread; happy to move discussion there or keep both. Net: pushing back on removing because the short-circuit can't catch the half-apply case (it reads only messages_by_id, but the orphan lives in pinned_messages_by_room), and client-side retries are the real trigger — not JetStream redelivery. Zero perf cost (in-memory scan over the slice the cap check already loaded). Open to dropping if you still want, after considering the scenario.
Generated by Claude Code
| // clustering key (idempotent UPSERT, no duplicate row) and the UPDATE | ||
| // completes the half-applied prior write. | ||
| pinnedAt := time.Now().UTC() | ||
| if orphanPinnedAt != nil { |
I think there is no need to query orphanPinnedAt and overwrite, right? If the message is already pinned, we already check that and return in the previous lines.
Secondly, this is a request/reply call, so there is no JetStream redelivery situation we need to handle.
Want to push back on this one before removing — wanted to flag the scenario concretely so the trade-off is clear.
1. The msg.PinnedAt != nil short-circuit is looking at the wrong table.
pinPreCheck → findMessage → GetMessageByID reads only messages_by_id. So msg.PinnedAt reflects what that table knows. In the half-apply scenario the INSERT into pinned_messages_by_room succeeded but the UPDATE on messages_by_id failed, so:
- `messages_by_id.pinned_at` is NULL (UPDATE didn't land)
- `pinned_messages_by_room` has a row at clustering key T1 (INSERT did land)
The short-circuit sees msg.PinnedAt == nil → "not pinned, proceed" → and the next call generates pinnedAt = time.Now() = T2, which lands as a SECOND row in pinned_messages_by_room (different clustering key). Permanent ghost until manual cleanup. The short-circuit can't catch this case because the two tables disagree and it only sees one of them.
2. "Request/reply, no JetStream redelivery" rules out NATS-side retries but not client-side ones.
You're right that NATS itself won't redeliver the RPC. But "retry" also covers:
- User clicks "Pin", sees the error toast, clicks again
- Client SDK auto-retries on transient 5xx/timeout (standard pattern)
- Frontend optimistic-retry loop before surfacing failure
Each of those produces a fresh PinMessage(m1) call from NATS's perspective — and each one hits the failure chain above when there's an orphan in play. We adopted the UnloggedBatch in fb7bfe7 per your earlier suggestion (good call, real perf win), but its non-atomic nature is exactly what makes this half-apply reachable.
Cost. Worth noting: the orphan scan is an in-memory loop over the slice enforcePinLimit already reads for the cap check (bounded by MAX_PINNED_PER_ROOM = 10). Zero extra Cassandra round-trips, zero extra Mongo calls, sub-microsecond CPU.
Happy to remove if you'd still rather — but wanted you to have the failure path before deciding. Let me know.
Generated by Claude Code
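The half-apply scenario above can be illustrated with a small simulation. Everything here is a hypothetical model rather than the service code: `partition` is a map keyed by the clustering key (pin time in milliseconds plus message ID), so writing an existing key behaves like an upsert, and `retryPin` stands in for the retried request.

```go
package main

import "fmt"

// pinKey models the clustering key of a pinned_messages_by_room row.
type pinKey struct {
	PinnedAtMs int64
	MessageID  string
}

// partition models one room's partition. Writing an existing key overwrites
// it, which mirrors the upsert behavior the comment relies on.
type partition map[pinKey]struct{}

// orphanPinnedAt scans the partition for a row left behind by a half-applied
// earlier pin of messageID and returns its pin time if found.
func orphanPinnedAt(p partition, messageID string) (int64, bool) {
	for k := range p {
		if k.MessageID == messageID {
			return k.PinnedAtMs, true
		}
	}
	return 0, false
}

// retryPin models a retried pin request arriving at nowMs. With reuseOrphan
// set, it reuses the orphan's pin time so the write lands on the same key.
func retryPin(p partition, messageID string, nowMs int64, reuseOrphan bool) {
	pinnedAt := nowMs
	if reuseOrphan {
		if ts, ok := orphanPinnedAt(p, messageID); ok {
			pinnedAt = ts
		}
	}
	p[pinKey{PinnedAtMs: pinnedAt, MessageID: messageID}] = struct{}{}
}

func main() {
	// Both partitions start with an orphan row for m1 at T1 = 1000.
	withScan := partition{pinKey{PinnedAtMs: 1000, MessageID: "m1"}: struct{}{}}
	withoutScan := partition{pinKey{PinnedAtMs: 1000, MessageID: "m1"}: struct{}{}}

	retryPin(withScan, "m1", 2000, true)     // reuses T1: same key, one row
	retryPin(withoutScan, "m1", 2000, false) // uses T2: second (ghost) row

	fmt.Println(len(withScan), len(withoutScan)) // prints "1 2"
}
```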
Good point. You are absolutely right. We can keep this logic to protect against the partial-update scenario, and it does not add any extra network I/O cost. Thanks!
| // new pinning is disabled, and is not a moderation action. | ||
| // Returns the full pin list in one shot: the per-room cap (MAX_PINNED_PER_ROOM) | ||
| // bounds the result, so no pagination. | ||
| func (s *HistoryService) ListPinnedMessages(c *natsrouter.Context, _ models.ListPinnedMessagesRequest) (*models.ListPinnedMessagesResponse, error) { |
Please check out the comments above and let me know if you have any questions, thanks!
| return nil, err | ||
| } | ||
|
|
||
| pinned, err := s.msgReader.GetPinnedMessages(c, roomID, accessSince) |
There was a problem hiding this comment.
I think we should still make this rpc to support pagination since current one does. Even though we have hard configured limit of 10 for now, but we might extend to higher size in the future
Done in 85d2be0. ListPinnedMessages is now cursor-paginated, matching the GetThreadMessages / LoadHistory shape:
- Request: `{ cursor?, limit }`
- Response: `{ messages, nextCursor, hasNext }`
Repo split into two methods to keep the cap-check honest: GetPinnedMessages(ctx, roomID, pageReq) Page[Message] for the external handler, and GetAllPinnedMessages(ctx, roomID) []Message for internal cap-count/orphan-scan callers that need the full partition view in one shot. With the current 10-pin cap most clients will see hasNext=false on the first page, but the surface is ready for higher caps. Doc + tests updated, including a new TestListPinnedMessages_PaginationPlumbedThrough that proves request Cursor/Limit reach the repo and NextCursor/HasNext surface on the response.
Generated by Claude Code
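As an illustration of the request/response shape described above, the following Go sketch pages over an already-ordered in-memory slice. `Message`, `PageRequest`, `Page`, and `paginate` are assumed shapes for illustration only; the actual repo types and the Cassandra-backed cursor encoding in the PR may differ.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for the pinned-message model.
type Message struct{ ID string }

// PageRequest is an assumed request shape: {cursor?, limit}.
type PageRequest struct {
	Cursor string // ID of the last item on the previous page; empty for the first page
	Limit  int
}

// Page is an assumed generic response shape: {messages, nextCursor, hasNext}.
type Page[T any] struct {
	Items      []T
	NextCursor string
	HasNext    bool
}

// paginate returns the page that starts right after req.Cursor.
// An unknown cursor restarts from the beginning; a non-positive limit
// returns everything that remains (both are choices made for this sketch).
func paginate(all []Message, req PageRequest) Page[Message] {
	if req.Limit <= 0 {
		req.Limit = len(all)
	}
	start := 0
	if req.Cursor != "" {
		for i, m := range all {
			if m.ID == req.Cursor {
				start = i + 1
				break
			}
		}
	}
	end := start + req.Limit
	if end > len(all) {
		end = len(all)
	}
	page := Page[Message]{Items: all[start:end], HasNext: end < len(all)}
	if page.HasNext {
		page.NextCursor = all[end-1].ID
	}
	return page
}

func main() {
	all := []Message{{ID: "m3"}, {ID: "m2"}, {ID: "m1"}}
	first := paginate(all, PageRequest{Limit: 2})
	second := paginate(all, PageRequest{Cursor: first.NextCursor, Limit: 2})
	fmt.Println(len(first.Items), first.HasNext, len(second.Items), second.HasNext)
}
```

With a cap of 10 and a typical page limit at or above it, a caller would see `HasNext == false` on the first page, as the comment notes.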
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docker-local/cassandra/init/12-table-pinned_messages_by_room.cql`:
- Around line 23-24: The local Cassandra init CQL changes the clustering order
for the table pinned_messages_by_room but uses CREATE TABLE IF NOT EXISTS so
existing volumes keep the old PK; add a DROP TABLE IF EXISTS
chat.pinned_messages_by_room immediately before the CREATE TABLE statement in
docker-local/cassandra/init/12-table-pinned_messages_by_room.cql (so the new
PRIMARY KEY ((room_id), pinned_at, message_id) and CLUSTERING ORDER BY
(pinned_at DESC, message_id DESC) are applied), or alternatively add a clear
note in the bootstrap docs instructing devs to remove the cassandra-data volume
or drop the chat keyspace before restarting.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 9b348c64-75f5-4c1e-b5d4-fe35113bf219
📒 Files selected for processing (29)
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- docker-local/cassandra/init/12-table-pinned_messages_by_room.cql
- docs/cassandra_message_model.md
- docs/client-api.md
- docs/superpowers/plans/2026-05-19-pin-unpin-message-history.md
- docs/superpowers/specs/2026-05-19-pin-unpin-message-history-design.md
- history-service/cmd/main.go
- history-service/internal/cassrepo/integration_test.go
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/cassrepo/write.go
- history-service/internal/config/config.go
- history-service/internal/models/message.go
- history-service/internal/mongorepo/room.go
- history-service/internal/mongorepo/room_test.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
- pkg/model/event.go
- pkg/model/message.go
- pkg/model/model_test.go
- pkg/natsutil/canonical_dedup.go
- pkg/natsutil/canonical_dedup_test.go
- pkg/subject/subject.go
- pkg/subject/subject_test.go
💤 Files with no reviewable changes (23)
- history-service/internal/mongorepo/room_test.go
- history-service/internal/models/message.go
- pkg/model/message.go
- history-service/internal/mongorepo/room.go
- history-service/internal/config/config.go
- history-service/internal/cassrepo/write.go
- history-service/internal/cassrepo/integration_test.go
- history-service/cmd/main.go
- history-service/internal/service/messages_test.go
- pkg/subject/subject_test.go
- pkg/subject/subject.go
- pkg/model/model_test.go
- history-service/internal/cassrepo/pin_integration_test.go
- pkg/natsutil/canonical_dedup_test.go
- history-service/internal/cassrepo/pin.go
- docs/superpowers/specs/2026-05-19-pin-unpin-message-history-design.md
- history-service/internal/service/integration_test.go
- pkg/natsutil/canonical_dedup.go
- history-service/internal/service/service.go
- pkg/model/event.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/pin.go
✅ Files skipped from review due to trivial changes (1)
- docs/client-api.md
🚧 Files skipped from review as they are similar to previous changes (3)
- docs/cassandra_message_model.md
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docs/client-api.md (1)
1453-1454: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Use the existing DM event subject order here.
These two rows document `chat.user.{account}.room.event`, but the rest of this file uses `chat.user.{recipient}.event.room` for DM fanout. If the implementation follows the existing subject, this section sends clients to the wrong subscription pattern.
Also applies to: 1560-1561
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/client-api.md` around lines 1453 - 1454, The documented DM/BotDM event subject is inconsistent with the rest of the file: replace `chat.user.{account}.room.event` with the existing subject order `chat.user.{recipient}.event.room` in the MessagePinnedEvent rows (and the repeated occurrence around lines ~1560-1561) so the docs match the implementation; update the descriptive text for DM / BotDM to reference `chat.user.{recipient}.event.room` and confirm recipients description remains "each non-bot DM participant."
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@history-service/internal/service/pin.go`:
- Around line 272-284: The loop that redacts pre-access pinned messages
currently nulls rich content fields but leaves message Type and SysMsgData
untouched, allowing system-message metadata to leak; inside the same for i :=
range pinned loop where you check CreatedAt vs accessSince and set Msg,
Mentions, Attachments, File, Card, CardAction, QuotedParentMessage, and
Reactions to nil/UnavailableQuoteMsg, also set pinned[i].Type to its zero value
and pinned[i].SysMsgData to nil (or its zero-equivalent) so system message
payloads are scrubbed along with other fields.
---
Outside diff comments:
In `@docs/client-api.md`:
- Around line 1453-1454: The documented DM/BotDM event subject is inconsistent
with the rest of the file: replace `chat.user.{account}.room.event` with the
existing subject order `chat.user.{recipient}.event.room` in the
MessagePinnedEvent rows (and the repeated occurrence around lines ~1560-1561) so
the docs match the implementation; update the descriptive text for DM / BotDM to
reference `chat.user.{recipient}.event.room` and confirm recipients description
remains "each non-bot DM participant."
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: aeea74a1-3372-45f1-a0fe-1a5547c8b5ce
📒 Files selected for processing (7)
- docs/client-api.md
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
🚧 Files skipped from review as they are similar to previous changes (1)
- history-service/internal/service/mocks/mock_repository.go
cc5263f to
71587be
Compare
|
Heads-up: I squashed the branch and rebased onto current While rebasing I also picked up a few mechanical adjustments from
Tests and lint are clean ( Generated by Claude Code |
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
history-service/internal/service/integration_test.go (1)
1-25: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Add the required package-level `TestMain` for integration cleanup.
This integration-test package still doesn't define `TestMain`, so it skips the required `testutil.RunTests(...)` / `testutil.RunTestsWithPrewarm(...)` harness for shared cleanup. Please add it in this file before merging. As per coding guidelines, "Every integration test package must have a `TestMain` that drives cleanup: wrap with `testutil.RunTests(m)` for basic cleanup, or `testutil.RunTestsWithPrewarm(...)` for concurrent pre-warming".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/service/integration_test.go` around lines 1 - 25, The integration test package is missing a package-level TestMain and thus doesn't run the required cleanup harness; add a TestMain(m *testing.M) function in this file that calls either testutil.RunTests(m) for basic cleanup or testutil.RunTestsWithPrewarm(m, <prewarmArgs>) if you need concurrent pre-warming, ensuring the package-level function name is TestMain and imports the testutil package; place this TestMain alongside the other package-level code so the integration tests are executed under the testutil harness.
🧹 Nitpick comments (2)
broadcast-worker/handler_test.go (2)
1304-1347: ⚡ Quick win
Add field assertions for consistency.
The DM unpin test only validates the `Type` field in the fan-out loop. Adding assertions for `RoomID`, `SiteID`, `MessageID`, `UnpinnedBy`, and `UnpinnedAt` (as done in the channel unpin test at lines 1297-1301) improves consistency and coverage.
Proposed addition

     		subjects[c.subject] = true
     		var roomEvt model.UnpinRoomEvent
     		require.NoError(t, json.Unmarshal(c.data, &roomEvt))
     		assert.Equal(t, model.RoomEventMessageUnpinned, roomEvt.Type)
    +		assert.Equal(t, roomID, roomEvt.RoomID)
    +		assert.Equal(t, "site-a", roomEvt.SiteID)
    +		assert.Equal(t, "msg-1", roomEvt.MessageID)
    +		require.NotNil(t, roomEvt.UnpinnedBy)
    +		assert.Equal(t, "alice", roomEvt.UnpinnedBy.Account)
    +		assert.Equal(t, unpinnedAtMs, roomEvt.UnpinnedAt.UnixMilli())
     	}
     	assert.True(t, subjects[subject.UserRoomEvent("alice")])
     	assert.True(t, subjects[subject.UserRoomEvent("bob")])

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@broadcast-worker/handler_test.go` around lines 1304 - 1347, The test TestHandleUnpinned_DMRoom_FansOutToBothMembers currently only asserts the event Type; update the fan-out loop over pub.records to also assert the UnpinRoomEvent fields RoomID == roomID, SiteID == evt.SiteID, MessageID == evt.Message.ID, UnpinnedBy == evt.Message.PinnedBy (or its Account/UserID as used in channel test), and UnpinnedAt == unpinnedAtMs (or matching evt.Timestamp) for each unmarshaled model.UnpinRoomEvent; use the existing evt, roomID and unpinnedAtMs variables and the same assertion pattern used in the channel unpin test to ensure consistency.
1263-1302: ⚡ Quick win
Add RoomID and SiteID assertions for consistency.
The unpin test validates `MessageID`, `UnpinnedBy`, and `UnpinnedAt` but skips `RoomID` and `SiteID`. The pin test (lines 1185-1186) and the delete test (lines 881-882) both assert these fields. Adding them here improves consistency and completeness.
Proposed addition

     	assert.Equal(t, model.RoomEventMessageUnpinned, roomEvt.Type)
    +	assert.Equal(t, roomID, roomEvt.RoomID)
    +	assert.Equal(t, "site-a", roomEvt.SiteID)
     	assert.Equal(t, "msg-1", roomEvt.MessageID)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@broadcast-worker/handler_test.go` around lines 1263 - 1302, Update the TestHandleUnpinned_ChannelRoomScopedPublish assertions to also verify the emitted room event includes the correct RoomID and SiteID: after unmarshalling into roomEvt (model.UnpinRoomEvent) assert roomEvt.RoomID equals the local roomID and roomEvt.SiteID equals "site-a" (or evt.SiteID) so this test matches the pin/delete tests' coverage; locate the assertions in TestHandleUnpinned_ChannelRoomScopedPublish around the json.Unmarshal of c.data and add the two comparisons for roomEvt.RoomID and roomEvt.SiteID.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@history-service/internal/readcache/readcache.go`:
- Around line 52-77: The singleflight closure in ttlCache.getOrLoad captures the
caller ctx so if the first caller cancels it aborts the shared load for all
waiters; change the sf.Do caller to run the shared backend load with a detached
context (e.g., use context.WithoutCancel(ctx) or context.Background() with a
bounded timeout) when calling load inside the closure, and if you also want
per-waiter cancellation switch to singleflight.Group.DoChan and have each waiter
select on its own ctx.Done() vs the DoChan result so individual cancellations
don’t cancel the shared fetch (refer to ttlCache.getOrLoad, c.sf.Do, and the
load(ctx) call; alternatively convert to c.sf.DoChan + per-waiter select).
---
Outside diff comments:
In `@history-service/internal/service/integration_test.go`:
- Around line 1-25: The integration test package is missing a package-level
TestMain and thus doesn't run the required cleanup harness; add a TestMain(m
*testing.M) function in this file that calls either testutil.RunTests(m) for
basic cleanup or testutil.RunTestsWithPrewarm(m, <prewarmArgs>) if you need
concurrent pre-warming, ensuring the package-level function name is TestMain and
imports the testutil package; place this TestMain alongside the other
package-level code so the integration tests are executed under the testutil
harness.
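The general shape of such a harness can be sketched as follows. The real `testutil.RunTests` signature is not shown in this thread, so `runTests` and the `runner` interface below are hypothetical stand-ins that only illustrate "run the suite, then always clean up, then propagate the exit code".

```go
package main

import "fmt"

// runner matches the one method of *testing.M that a harness needs.
type runner interface{ Run() int }

// runTests is a hypothetical stand-in for a cleanup harness: it runs the
// suite, always invokes cleanup afterwards, and returns the suite's exit code.
// In a _test.go file it would be used as:
//
//	func TestMain(m *testing.M) { os.Exit(runTests(m, cleanup)) }
func runTests(m runner, cleanup func()) int {
	defer cleanup()
	return m.Run()
}

// fakeM lets the sketch run outside of `go test`.
type fakeM struct{ code int }

func (f fakeM) Run() int { return f.code }

func main() {
	cleaned := false
	code := runTests(fakeM{code: 1}, func() { cleaned = true })
	fmt.Println(code, cleaned)
}
```

Calling `os.Exit` only after the harness returns matters, because deferred cleanup does not run once `os.Exit` has been called.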
---
Nitpick comments:
In `@broadcast-worker/handler_test.go`:
- Around line 1304-1347: The test TestHandleUnpinned_DMRoom_FansOutToBothMembers
currently only asserts the event Type; update the fan-out loop over pub.records
to also assert the UnpinRoomEvent fields RoomID == roomID, SiteID == evt.SiteID,
MessageID == evt.Message.ID, UnpinnedBy == evt.Message.PinnedBy (or its
Account/UserID as used in channel test), and UnpinnedAt == unpinnedAtMs (or
matching evt.Timestamp) for each unmarshaled model.UnpinRoomEvent; use the
existing evt, roomID and unpinnedAtMs variables and the same assertion pattern
used in the channel unpin test to ensure consistency.
- Around line 1263-1302: Update the TestHandleUnpinned_ChannelRoomScopedPublish
assertions to also verify the emitted room event includes the correct RoomID and
SiteID: after unmarshalling into roomEvt (model.UnpinRoomEvent) assert
roomEvt.RoomID equals the local roomID and roomEvt.SiteID equals "site-a" (or
evt.SiteID) so this test matches the pin/delete tests' coverage; locate the
assertions in TestHandleUnpinned_ChannelRoomScopedPublish around the
json.Unmarshal of c.data and add the two comparisons for roomEvt.RoomID and
roomEvt.SiteID.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7181338f-8563-425e-bccb-6849205c0e4a
📒 Files selected for processing (32)
- broadcast-worker/handler.go
- broadcast-worker/handler_test.go
- docker-local/cassandra/init/12-table-pinned_messages_by_room.cql
- docs/cassandra_message_model.md
- docs/client-api.md
- docs/superpowers/plans/2026-05-19-pin-unpin-message-history.md
- docs/superpowers/specs/2026-05-19-pin-unpin-message-history-design.md
- history-service/cmd/main.go
- history-service/internal/cassrepo/integration_test.go
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/cassrepo/write.go
- history-service/internal/cassrepo/write_integration_test.go
- history-service/internal/config/config.go
- history-service/internal/models/message.go
- history-service/internal/mongorepo/room.go
- history-service/internal/mongorepo/room_test.go
- history-service/internal/readcache/readcache.go
- history-service/internal/readcache/readcache_test.go
- history-service/internal/service/integration_test.go
- history-service/internal/service/messages_test.go
- history-service/internal/service/mocks/mock_repository.go
- history-service/internal/service/pin.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/service.go
- pkg/model/event.go
- pkg/model/message.go
- pkg/model/model_test.go
- pkg/natsutil/canonical_dedup.go
- pkg/natsutil/canonical_dedup_test.go
- pkg/subject/subject.go
- pkg/subject/subject_test.go
✅ Files skipped from review due to trivial changes (3)
- history-service/internal/readcache/readcache_test.go
- docs/cassandra_message_model.md
- docs/client-api.md
🚧 Files skipped from review as they are similar to previous changes (19)
- history-service/internal/config/config.go
- broadcast-worker/handler.go
- history-service/cmd/main.go
- pkg/natsutil/canonical_dedup_test.go
- pkg/subject/subject_test.go
- history-service/internal/mongorepo/room.go
- pkg/model/message.go
- pkg/model/model_test.go
- history-service/internal/cassrepo/write.go
- pkg/subject/subject.go
- history-service/internal/cassrepo/write_integration_test.go
- history-service/internal/mongorepo/room_test.go
- history-service/internal/cassrepo/integration_test.go
- history-service/internal/service/messages_test.go
- pkg/natsutil/canonical_dedup.go
- history-service/internal/cassrepo/pin.go
- history-service/internal/cassrepo/pin_integration_test.go
- history-service/internal/service/pin_test.go
- history-service/internal/service/pin.go
Caution
Inline review comments failed to post. This is likely due to GitHub's internal server error or limits when posting large numbers of comments. If you are seeing this consistently it is likely a permissions issue. Please check "Moderation" -> "Code review limits" under your organization settings.
🛑 Comments failed to post (1)
history-service/internal/readcache/readcache.go (1)
52-77: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
🧩 Analysis chain
🏁 Script executed:

    #!/bin/bash
    set -euo pipefail
    ls
    fd -a "readcache.go" -t f
    rg -n "type ttlCache|func \(c \*ttlCache|singleflight|getOrLoad\(" -S history-service/internal/readcache/readcache.go
    sed -n '1,220p' history-service/internal/readcache/readcache.go

Repository: hmchangw/chat
Length of output: 8521
🏁 Script executed:

    #!/bin/bash
    set -euo pipefail
    sed -n '1,220p' history-service/internal/readcache/readcache.go

Repository: hmchangw/chat
Length of output: 7265
Decouple singleflight loader from caller cancellation in `ttlCache.getOrLoad` (history-service/internal/readcache/readcache.go:52-77).
`getOrLoad` captures the incoming `ctx` in the `singleflight.Group.Do` closure (`val, store, err := load(ctx)`), so if the first caller's context is canceled or its deadline expires, the shared backend fetch errors for all coalesced waiters even when their own `ctx` is still valid. Use a detached context for the shared load (e.g., `context.WithoutCancel(ctx)` or `context.Background()` with a bounded timeout) and, if you want per-waiter cancellation, switch to `DoChan` + `select` on each waiter's `ctx.Done()`.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@history-service/internal/readcache/readcache.go` around lines 52 - 77, The singleflight closure in ttlCache.getOrLoad captures the caller ctx so if the first caller cancels it aborts the shared load for all waiters; change the sf.Do caller to run the shared backend load with a detached context (e.g., use context.WithoutCancel(ctx) or context.Background() with a bounded timeout) when calling load inside the closure, and if you also want per-waiter cancellation switch to singleflight.Group.DoChan and have each waiter select on its own ctx.Done() vs the DoChan result so individual cancellations don’t cancel the shared fetch (refer to ttlCache.getOrLoad, c.sf.Do, and the load(ctx) call; alternatively convert to c.sf.DoChan + per-waiter select).
Implements the Phase 1 vertical for pin/unpin in history-service, with
cross-service fan-out from broadcast-worker. Three new RPC handlers
(PinMessage, UnpinMessage, ListPinnedMessages) backed by Cassandra
storage, gated by subscription + large-room + global kill-switch, and
documented in docs/client-api.md.
- pinned_messages_by_room PRIMARY KEY((room_id), pinned_at, message_id),
clustering DESC(pinned_at, message_id). The created_at column holds the
message's true creation time (separate from the pin time on the
clustering key); tshow / thread_parent_id / thread_parent_created_at
denormalized for read-time access checks without a second round-trip.
- PinMessage and UnpinMessage each issue their two writes via a single
gocql.UnloggedBatch (matches message-worker's SaveMessage pattern).
UnloggedBatch is transport grouping only — not atomic across replicas
— so the half-apply window is real (INSERT lands, UPDATE on
messages_by_id can fail); see ghost-pin handling below.
- write.go mirrors keep the pinned row in sync on edit/delete (msg edit
→ updates pinned msg/edited_at/updated_at; soft-delete → flips
deleted=true, retaining the slot count).
- ListPinnedMessages applies a two-layer redaction:
1. redactUnavailablePins — for pins whose underlying message predates
accessSince, blank Msg → 'This message is unavailable', clear
mentions/attachments/file/card/cardAction/quotedParentMessage/
reactions/type/sysMsgData. Identifiers, sender, timestamps, and pin
metadata stay populated so the frontend can render a 'Not available'
placeholder.
2. redactUnavailableQuotes — for surviving pins, blank any embedded
QuotedParentMessage whose own CreatedAt predates accessSince.
- Row count is identical for every caller (no hiding). pinInaccessible
also gates thread replies on m.ThreadParentCreatedAt (broader than
quoteInaccessible's TShow-only gate — TShow=false thread-only pins
also redact when parent is inaccessible).
- enforcePinLimit reads the full pin partition for the cap check; while
the rows are in memory, scan for a row whose message_id matches the
pin being attempted. Such a row is an orphan from a half-applied
prior UnloggedBatch (INSERT landed, UPDATE on messages_by_id did
not). The caller reuses the orphan's pinned_at so the retry's INSERT
is an idempotent UPSERT on the same clustering key — no duplicate row.
- Zero extra round-trips (the partition read was already happening).
Cap is also skipped on orphan match (we're healing, not adding).
- pinPreCheck: kill-switch (PIN_ENABLED env var) → subscription →
findMessage. PinMessage rejects msg.Deleted; UnpinMessage accepts
it so moderators can free a stuck slot on a soft-deleted pin.
- enforceLargeRoomPin gates non-bypass members in rooms above
LARGE_ROOM_THRESHOLD. Owners, admins, and bot accounts bypass.
- Idempotent short-circuits on both pin (already pinned) and unpin
(not pinned).
- ListPinnedMessages is cursor-paginated (matches GetThreadMessages /
LoadHistory shape) — request {cursor?, limit}, response {messages,
nextCursor, hasNext}. Two repo methods: GetPinnedMessages(ctx,
roomID, pageReq) Page[Message] for external use, GetAllPinnedMessages
for internal cap-count + orphan scan callers.
- Canonical events on JetStream: chat.msg.canonical.{siteID}.pinned /
.unpinned (consumed by broadcast-worker; search-sync-worker
integration deferred to a follow-up PR).
- Broadcast-worker fan-out: PinRoomEvent / UnpinRoomEvent (flat
schema matching the EditRoomEvent / DeleteRoomEvent pattern) to
channel-room and DM subscribers.
- pkg/natsutil canonical dedup keys extended for pinned/unpinned.
- pkg/subject subject builders + patterns for the new RPC + canonical
subjects.
- docs/client-api.md: full sections for Pin / Unpin / ListPinned RPCs
(request/response/error tables, triggered RoomEvents, backend
canonical-event payloads).
- docs/cassandra_message_model.md: updated pinned_messages_by_room
schema.
- docker-local/cassandra/init/12-...cql: local bootstrap DDL with a
comment block flagging the upgrade path for devs whose persistent
volume predates the schema (DROP TABLE or down -v).
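The orphan scan folded into enforcePinLimit can be sketched roughly as follows. This is an illustration under assumptions, not the handler's actual code: pinRow, resolvePinnedAt, errPinLimitReached, and the maxPins parameter are hypothetical names, and the rows are assumed to be the full pin partition that the cap check has already loaded.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// pinRow is a hypothetical projection of one pinned_messages_by_room row.
type pinRow struct {
	MessageID string
	PinnedAt  time.Time
}

// errPinLimitReached is a hypothetical sentinel for the hard cap.
var errPinLimitReached = errors.New("pin limit reached")

// resolvePinnedAt scans the already-loaded partition. A row whose message ID
// matches the pin being attempted is treated as an orphan from a half-applied
// batch: its pinned_at is reused so the retry's INSERT lands on the same
// clustering key, and the cap is skipped. Otherwise the cap applies and the
// caller's fresh timestamp is used.
func resolvePinnedAt(rows []pinRow, msgID string, maxPins int, now time.Time) (time.Time, bool, error) {
	for _, r := range rows {
		if r.MessageID == msgID {
			return r.PinnedAt, true, nil // healing, not adding
		}
	}
	if len(rows) >= maxPins {
		return time.Time{}, false, errPinLimitReached
	}
	return now, false, nil
}

// demo exercises the orphan path and the cap path on a full partition.
func demo() (healed, reusedTS, capped bool) {
	t0 := time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC)
	rows := []pinRow{{"m1", t0}, {"m2", t0.Add(time.Minute)}}
	now := t0.Add(time.Hour)

	// Retry of a half-applied pin for m2: the cap of 2 is already reached,
	// but the orphan match bypasses it and returns the original pinned_at.
	ts, orphan, err := resolvePinnedAt(rows, "m2", 2, now)
	healed = orphan && err == nil
	reusedTS = ts.Equal(rows[1].PinnedAt)

	// A genuinely new pin for m3 hits the cap.
	_, _, err = resolvePinnedAt(rows, "m3", 2, now)
	capped = errors.Is(err, errPinLimitReached)
	return healed, reusedTS, capped
}

func main() {
	fmt.Println(demo())
}
```

Reusing the orphan's timestamp is what makes the retry an upsert: a fresh pinned_at would create a second row for the same message under a different clustering key.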
71587be to 6f9ccef
mliu33
left a comment
Outstanding work, thanks!
nats-subject-naming.md was missing several request/reply entries from
recently-merged PRs:
- msg.{get,thread,edit,delete,pin,unpin,pinned.list} (PR #205 pin/unpin,
plus pre-existing edit/delete/get/thread that were never documented)
- room.{roomID}.{siteID}.key.get (PR #259, client-callable room key
fetch RPC)
- search.{siteID}.{messages,rooms,apps,users} (PR #232, siteID scoping)
Added matching builder + wildcard entries so the Subject Builders
catalog matches pkg/subject.
CLAUDE.md had no mention of at-rest message encryption (PR #155). Added
a "Project-Specific Patterns" subsection covering the envelope-encrypt
contract (what's in EncryptedFields vs. what stays plaintext), which
services own which piece (message-worker writes / lazy DEK,
history-service decrypts, room-worker eager DEK on sync DM), and the
config surface (ATREST_ENABLED + Vault settings) so future service
authors know to take an atrest.Cipher rather than touching plaintext
columns directly.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
#250)
* docs(errcode): design spec, implementation plan, error-handling guide, and CI semgrep rules
Adds the centralized error-codes design spec, the migration plan, the repo-wide error-handling guide, refreshes docs/client-api.md with the new error envelope shape and reason catalog, and lands the .semgrep/errcode.yml rules that enforce reason placement, WithCause usage, single-%w wrapping, and named constructors. CLAUDE.md picks up the link to the error-handling guide; Makefile gains the sast targets.
* feat(errcode): core package, NATS/HTTP adapters, test helper, reason catalogs
Introduces pkg/errcode with 8 categories (bad_request, unauthenticated, forbidden, not_found, conflict, too_many_requests, unavailable, internal), a typed *Error with options pattern (WithReason, WithCause, WithMetadata, WithLogValues), Classify() emitting a single category-aware log line at the boundary, and Permanent/IsPermanent for JetStream Ack-vs-Nak dispatch. Includes the NATS transport adapter (errnats.Reply/Marshal), the Gin HTTP adapter (errhttp.Write), the errtest assertion helper, and per-domain reason catalogs (codes_room, codes_message, codes_search, codes_auth). Classify gates the err.Error() allocation when the cause is the errcode itself.
* refactor(model): retire ErrorResponse and align event/member shapes for errcode
Deletes pkg/model/error.go (the legacy untyped ErrorResponse); all client-facing errors now flow through the errcode envelope. Updates event/member structs (AsyncJobResult code/reason, CreateRoomStatusExists) to match the new envelope shape and refreshes model_test.go roundtrips.
* refactor(natsrouter,natsutil): emit errcode envelopes; RequestID middleware enriches slog ctx
natsrouter now replies with errcode envelopes natively; RouteError / ErrorResponse / Err* helpers and pkg/model/error.go are gone.
Adds RequestID() middleware that extracts the X-Request-ID NATS header (or mints a UUIDv7 via idgen), stores it via natsutil.WithRequestID, and calls c.WithLogValues("request_id", reqID) so every slog…Context call inside the handler picks it up automatically. Default() pre-installs Recovery, RequestID, Logging.
Fixes the chainState sync.Pool use-after-release: releaseContext nils c.chain, and Next/Abort/IsAborted nil-check with a clear panic message instead of silently corrupting another request's chain.
pkg/natsutil.Reply gains a marshal-failure path test and adopts the errcode envelope shape; reply_test.go covers happy + marshal-error paths against an in-memory NATS server.
* refactor(room-service,room-worker): adopt errcode envelopes; attach reasons; fix Nak-forever bugs
room-service:
- Replace sanitizeError with errcode constructors at every reply site.
- Attach domain reasons (RoomNotOwner, RoomAlreadyOwner, RoomNotMember, RoomCannotDemoteLastOwner, RoomTargetNotMember, RoomSelfDM, RoomLastMemberCannotRemove, RoomPromoteRequiresIndividual, RoomBotInChannel, RoomBotNotAvailable, RoomMaxSizeReached).
- Sweep parser-text leaks: BadRequest("invalid request") never interpolates the underlying error message.
- Collapse 3 room-ID-mismatch sites onto the errRoomIDMismatch sentinel.
- RoomMaxSizeReached carries WithMetadata for max/current/attempted.
- memberlist_client propagates metadata when rebuilding remote envelopes.
- Drop errUserNotFound / errInvalidOrg sentinels in favour of reason attachment on the canonical sentinels.
room-worker:
- Dispatch on errors.As(err, &*errcode.Error) instead of an infraError marker; bare wrapped errors (fmt.Errorf with %w) flow through to Nak.
- Permanent-mark cases that must Ack to avoid Nak-forever (unknown room type, room-ID collision, bot-in-channel, dual-membership invariants).
- runJobWithRecovery Acks on panic (with a loud Error log) instead of Nak-ing; panics are not transient.
- Sync-DM unmarshal returns fmt.Errorf("%w: %v", errInvalidSyncDMRequest, err): sentinel identity for errors.Is, parse text in server log only.
- 22+ slog calls switched to slog…Context so request_id rides along.
- 8 NotFound sites attach RoomUserNotFound; room-ID collision becomes Conflict (was Internal); unknown room type becomes BadRequest.
* refactor(message-gatekeeper,message-worker,inbox-worker): errcode envelopes + IsPermanent dispatch
message-gatekeeper:
- Replace infraError marker with errors.As(err, &*errcode.Error) dispatch.
- resolveQuoteSnapshot: snap==nil with err==nil is a contract violation, not NotFound; returns a bare wrapped error so the message Naks/retries instead of being silently dropped.
- Propagate typed remote *errcode.Error from fetcher_history (was being flattened to a string).
- Collapse triple-unmarshal in handler to a single decode (perf).
- content-too-large attaches WithMetadata for byteSize/limit.
inbox-worker:
- Consume loop branches on errcode.IsPermanent: Ack on Permanent, Nak otherwise. Permanent-mark schema/contract violations.
- role_updated handler returns errcode.Permanent(errcode.BadRequest(...)) on empty-roles instead of silent-drop.
message-worker:
- Migrate handler/cassandra error wrapping to bare fmt.Errorf for infra and errcode for client-facing surfaces.
* refactor(history-service,search-service): adopt errcode envelopes + perf tightenings
history-service:
- Replace ad-hoc error replies with errcode constructors at every site.
- Attach MessageNotSubscribed at the access-window guard; pagination-cursor parse errors carry WithCause(err).
- MessageOutsideAccessWindow surfaced at 3 sites in messages.go/threads.go.
- Drop 8 duplicated slog "request_id" key-value pairs, now injected automatically by the natsrouter RequestID middleware.
- Standardize on snake_case slog keys.
search-service:
- Adopt errcode envelopes across all 4 search handlers.
- Trim req.Query before validating (matches sibling handlers; rejects whitespace-only queries).
- Pin metrics status-label cardinality so unbounded error.Error() strings can't blow up Prometheus series; status_label test covers wrapped errcode passthrough.
* refactor(auth,mock-user,broadcast,notification): adopt errcode envelopes
auth-service:
- All 4 BadRequest sites attach distinct reasons (AuthInvalidRequest, AuthInvalidNKey, AuthMissingFields).
- Add empty-account guard before NKey lookup.
- HTTP error path goes through errhttp.Write for consistent envelopes.
mock-user-service, broadcast-worker, notification-worker:
- Replace ad-hoc error replies with errcode constructors.
- Drop the now-redundant async-failure ERROR log in broadcast-worker (Classify emits it at the boundary).
* feat(frontend): consume errcode envelopes; reason-keyed error UX; relogin failure fallback
Adds an AsyncJobError type with discriminated kinds and a REASON_COPY map so the frontend can render distinct user copy per reason without parsing free-text. formatAsyncJobError replaces the legacy string-error path across Login, OidcCallback, LeaveRoom, CreateRoom, MemberRoster, and MessageActionMenu.
OidcCallback wraps redirectToReloginOnTokenInvalid() in try/catch so a rejected redirect surfaces an error instead of leaving the page stuck on "Completing sign-in..." (CodeRabbit feedback).
NatsContext + transport layer (asyncJob.ts / api/index.ts / api/types.ts) parse the new envelope shape; DM-exists accepts both the legacy success shape and the new conflict-with-reason shape during the cutover.
* chore(testutil,roomcrypto): centralize cassandra image pin; add skipOnVFS helper
pkg/testutil/cassandra.go uses the centralized testimages.Cassandra pin instead of a hardcoded "cassandra:5" tag, matching the convention of other testutil files (mongo, minio, elasticsearch).
pkg/roomcrypto/integration_test.go gains a skipOnVFS helper matching the pattern already used in pkg/roomkeysender, so the suite skips cleanly when the Docker storage driver is VFS (CI environments that cannot pull the Node container).
* refactor(request_id): mint-everywhere policy with strict reject on dedup paths
Consolidates the request-ID rework: chokepoint minting in idgen, the StampRequestID boundary helper, migration of the five workers off reject-on-missing, and the strict RequireRequestID variant that rejects on dedup-critical room paths. Includes the renamed-helper comment cleanup.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* fix(room-service): propagate request_id to cross-site list-members
Stamp and forward X-Request-ID on the cross-site list-members path and cover it with an integration test.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* fix(room-service): errcode-migration rebase fixups
Fix natsGetRoomKey for the errcode envelope shape (rebase conflict resolution) and drop the deleted sanitizeError reference from the integration test.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* docs(errcode): worker-logging spec; add request_id to message-worker error log
Design spec for the deferred unified worker error logging (LogJobError) plus the message-worker process-message error-log request_id one-liner.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* review(branch): multi-lens review report + apply curated polish
Adds the multi-lens branch review report and applies the curated follow-ups: message-worker slog *Context + request_id correlation, dead ReplyError removal, RequireRequestID reason, and the search-service comment-count fix.
Also folds in the CodeRabbit test-quality nits that align with CLAUDE.md: room-service tests assert error identity via require.ErrorIs against the typed sentinels instead of err.Error() string matching, and errnats/reply_test.go uses testify require/assert (require.NoError on the JSON unmarshals) instead of t.Fatalf with ignored unmarshal errors.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* review(pkg/errcode): header + executive summary
* review(pkg/errcode): code quality
* review(pkg/errcode): api design
* review(pkg/errcode): test coverage
* review(pkg/errcode): maintainability
* review(pkg/errcode): consumer ergonomics
* review(pkg/errcode): performance
* review(pkg/errcode): prioritized action list
* fix(history-service): migrate pin.go to pkg/errcode after merge
main added pin.go using the legacy natsrouter.Err* API that PR #250 removes. Migrate to the same pattern the rest of history-service uses: named errcode constructors for client-facing errors and fmt.Errorf wraps for infra failures (Classify logs once at the boundary, so the local slog.Error calls were redundant). Also fixes a markdownlint H3-after-H1 heading bump in the deck.
* chore(toolchain,pin): bump Go to 1.25.11 and polish pin handler observability
Three independent follow-ups landing together because the prior CI run gated on them:
1. Toolchain bump 1.25.10 -> 1.25.11 (Makefile, go.mod, ci.yml)
Clears two stdlib govulncheck advisories that were the only SAST gate failure (GO-2026-5039 net/textproto, GO-2026-5037 crypto/x509). Neither trace touches anything this PR added. `make sast` is now green locally.
2. Restore per-request log fields in pin handlers
pin.go's PinMessage/UnpinMessage/ListPinnedMessages now call c.WithLogValues("account", account, "room_id", roomID) at handler entry, matching every other handler in messages.go/threads.go.
Without this, the boundary Classify log emits no account/room_id fields for pin-feature errors -- a silent observability regression that landed when the pin.go migration dropped its per-error slog.Error calls in favor of boundary logging.
3. Distinct reason codes for the 5 'forbidden' pin cases
pin.go's 3 unique pin-specific forbidden cases now carry typed Reason constants so the frontend can render specific copy instead of falling through to err.message:
- pin_disabled (kill-switch)
- pin_limit_reached (hard cap)
- pin_room_too_large (size gate, non-admin)
The 'not subscribed' case reuses the existing MessageNotSubscribed.
pkg/errcode/codes_message.go + codes_test.go (allReasons) get the new constants; chat-frontend/.../asyncJob.ts REASON_COPY map gets the user-facing English. docs/client-api.md pin/unpin/list error tables gain a Reason column and the master §6 reason catalog gets the new entries; also fixes the stale 'one of 7 generic categories' line to '8'.
* sast(semgrep): exclude pkg/atrest from no-multi-wrap-errcode rule
pkg/atrest does not import errcode; its sentinels (ErrAuthFailed, ErrPayloadMalformed) are plain errors.New strings. The multi-%w idiom there joins a sentinel with the underlying crypto/json error so errors.Is works for both; there is no errcode error in the chain, so the rule's invariant cannot be violated. Same shape as the existing pkg/errcode/** exclusion.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* docs: backfill subject-naming + CLAUDE.md for recent PRs
nats-subject-naming.md was missing several request/reply entries from recently-merged PRs:
- msg.{get,thread,edit,delete,pin,unpin,pinned.list} (PR #205 pin/unpin, plus pre-existing edit/delete/get/thread that were never documented)
- room.{roomID}.{siteID}.key.get (PR #259, client-callable room key fetch RPC)
- search.{siteID}.{messages,rooms,apps,users} (PR #232, siteID scoping)
Added matching builder + wildcard entries so the Subject Builders catalog matches pkg/subject.
CLAUDE.md had no mention of at-rest message encryption (PR #155). Added a "Project-Specific Patterns" subsection covering the envelope-encrypt contract (what's in EncryptedFields vs. what stays plaintext), which services own which piece (message-worker writes / lazy DEK, history-service decrypts, room-worker eager DEK on sync DM), and the config surface (ATREST_ENABLED + Vault settings) so future service authors know to take an atrest.Cipher rather than touching plaintext columns directly.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* Revert "docs: backfill subject-naming + CLAUDE.md for recent PRs"
This reverts commit c599866.
* docs: remove session-scoped review reports per CLAUDE.md
Working notes from the branch_review skill, not shippable artifacts.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
* ci: retrigger after docker hub registry timeout
The previous test-integration (history-service) run failed with "Get https://registry-1.docker.io/v2/: context deadline exceeded" when testcontainers tried to pull Cassandra/MongoDB images. Re-run.
https://claude.ai/code/session_01L8doEHgS2fgje7Gh6nZQpa
---------
Co-authored-by: Claude <noreply@anthropic.com>
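The Ack-vs-Nak dispatch that the squash commit above attributes to Permanent/IsPermanent can be illustrated with a small standard-library sketch. The real pkg/errcode API is not shown in this PR page, so permanentError, Permanent, IsPermanent, and dispatch below are hypothetical stand-ins that only mirror the described behaviour: Ack on success or on a permanent failure, Nak on anything else so JetStream redelivers.

```go
package main

import (
	"errors"
	"fmt"
)

// permanentError marks an error that no retry can fix (hypothetical type).
type permanentError struct{ err error }

func (p *permanentError) Error() string { return p.err.Error() }
func (p *permanentError) Unwrap() error { return p.err }

// Permanent wraps err so that consumers Ack instead of Nak-ing forever.
func Permanent(err error) error {
	if err == nil {
		return nil
	}
	return &permanentError{err: err}
}

// IsPermanent reports whether any error in the chain was marked permanent.
func IsPermanent(err error) bool {
	var p *permanentError
	return errors.As(err, &p)
}

// dispatch decides what a consume loop does with a handler result.
func dispatch(err error) string {
	switch {
	case err == nil:
		return "ack"
	case IsPermanent(err):
		return "ack" // poison message: redelivery would fail the same way
	default:
		return "nak" // bare wrapped infra error: let JetStream redeliver
	}
}

// dispatchSamples runs a success, a permanent failure, and a transient one.
func dispatchSamples() [3]string {
	transient := fmt.Errorf("cassandra write: %w", errors.New("timeout"))
	poison := fmt.Errorf("handle role_updated: %w", Permanent(errors.New("empty roles")))
	return [3]string{dispatch(nil), dispatch(poison), dispatch(transient)}
}

func main() {
	fmt.Println(dispatchSamples())
}
```

Because the marker is found with errors.As, it survives further fmt.Errorf wrapping with %w, which is why a handler can add context to a permanent error without changing how the consume loop treats it.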
Summary
Implements the Phase 1 vertical for pin/unpin message history in history-service. Adds three new RPC handlers (PinMessage, UnpinMessage, ListPinnedMessages) with authorization gated by a global Mongo kill-switch, room subscription, and large-room role override. Pin state is persisted to Cassandra with a load-bearing timestamp invariant between messages_by_id and pinned_messages_by_room. Emits new canonical events for downstream services (broadcast-worker, search-sync-worker, room-service) to consume in later phases.
Key Changes
Model & Events (pkg/model/):
- EventPinned and EventUnpinned constants to event.go
- PinnedAt and PinnedBy fields to Message struct in message.go
Subject Builders (pkg/subject/):
- MsgPinPattern, MsgUnpinPattern, MsgPinnedListPattern request patterns
- MsgCanonicalPinned, MsgCanonicalUnpinned canonical event subjects
Configuration (history-service/internal/config/):
- LARGE_ROOM_THRESHOLD config (default 500) to gate pin operations in large rooms, mirroring message-gatekeeper's post-restriction
MongoDB Repositories (history-service/internal/mongorepo/):
- SettingsRepo with fail-open kill-switch: reads settings collection doc {_id: "global", pinEnabled: bool}, absent doc treated as enabled
- RoomRepo with GetRoomUserCount method for large-room authorization check
Cassandra Writes (history-service/internal/cassrepo/):
- pin.go with PinMessage and UnpinMessage methods
- messages_by_id.pinned_at == pinned_messages_by_room.created_at
- pinned_messages_by_room for efficient list queries
Service Layer (history-service/internal/service/):
- SettingsRepository, extended RoomRepository and SubscriptionRepository
- pin.go with three handlers: PinMessage, UnpinMessage, ListPinnedMessages
- chat.msg.canonical.{siteID}.pinned and .unpinned (best-effort, JetStream)
Tests:
- SettingsRepo (fail-open behavior, true/false states)
- RoomRepo.GetRoomUserCount
Client API Documentation (docs/client-api.md):
Implementation Details
- pinnedAt timestamp without re-publishing the canonical event.
https://claude.ai/code/session_01HNUy5mngWf4N6VuZoWEhUm
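As an illustration of the fail-open kill-switch and the large-room gate described in the summary above, here is a minimal sketch. It is not the SettingsRepo implementation: settingsStore, errNoDocument, fakeStore, and largeRoomAllows are hypothetical names, the Mongo driver is replaced by an in-memory fake, and two details are assumptions of the sketch (read errors other than "not found" are returned to the caller, and the gate applies only when the user count is strictly above the threshold).

```go
package main

import (
	"errors"
	"fmt"
)

// settingsDoc mirrors the {_id: "global", pinEnabled: bool} document.
type settingsDoc struct {
	PinEnabled bool
}

// errNoDocument is a hypothetical stand-in for the driver's not-found error.
var errNoDocument = errors.New("no document")

// settingsStore is a hypothetical abstraction over the settings collection.
type settingsStore interface {
	FindGlobal() (*settingsDoc, error)
}

// pinEnabled fails open: a missing document means pinning is enabled.
// Surfacing other read errors is an assumption of this sketch.
func pinEnabled(s settingsStore) (bool, error) {
	doc, err := s.FindGlobal()
	if errors.Is(err, errNoDocument) {
		return true, nil
	}
	if err != nil {
		return false, fmt.Errorf("read global settings: %w", err)
	}
	return doc.PinEnabled, nil
}

// largeRoomAllows reports whether a caller may pin given the room size.
// bypass stands for the role override (assumed to be computed elsewhere).
func largeRoomAllows(userCount, threshold int, bypass bool) bool {
	return userCount <= threshold || bypass
}

// fakeStore is an in-memory settingsStore used only for the demo.
type fakeStore struct {
	doc *settingsDoc
	err error
}

func (f fakeStore) FindGlobal() (*settingsDoc, error) { return f.doc, f.err }

// demo returns: absent doc, disabled doc, enabled doc, large room without
// bypass, large room with bypass.
func demo() [5]bool {
	absent, _ := pinEnabled(fakeStore{err: errNoDocument})
	off, _ := pinEnabled(fakeStore{doc: &settingsDoc{PinEnabled: false}})
	on, _ := pinEnabled(fakeStore{doc: &settingsDoc{PinEnabled: true}})
	return [5]bool{
		absent,
		off,
		on,
		largeRoomAllows(501, 500, false),
		largeRoomAllows(501, 500, true),
	}
}

func main() {
	fmt.Println(demo())
}
```

Failing open on an absent document keeps a fresh deployment usable without seeding the settings collection, at the cost that the switch only takes effect once the document exists.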
Summary by CodeRabbit
New Features
Documentation