feat(sqs): partition resolver for HT-FIFO routing (Phase 3.D PR 4-B-2) #715
Conversation
Adds the routing-layer half of PR 4-B: a PartitionResolver that ShardRouter consults BEFORE falling through to the byte-range engine. SQS HT-FIFO needs partition-aware dispatch — partition K of queue Q lives on a different Raft group than partition K+1 — but the engine's non-overlapping-cover model cannot express overlay routes without breaking legacy keys (a partition route would leave a lexicographic gap that legacy keys fall into). The resolver-first dispatch sidesteps this: the resolver answers only for partitioned-keyspace keys, returns (0, false) otherwise, and the engine handles everything else exactly as today.

What changes

- kv.PartitionResolver interface: ResolveGroup([]byte) (uint64, bool).
- kv.ShardRouter.WithPartitionResolver: option, nil-safe, idempotent.
- kv.ShardRouter.resolveGroup: tries the resolver first, falls through to engine.GetRoute. groupRequests and Get both call this unified path so reads and writes share the same dispatch logic.
- kv.ShardedCoordinator.WithPartitionResolver: delegates to the router so main.go can install the resolver via the existing fluent-construction style.
- adapter.SQSPartitionResolver: parses (queue, partition) from the partitioned key shape, looks up the operator-chosen group from the runtime config map. Defensive copy at construction; nil-safe ResolveGroup; returns (0, false) for legacy / non-matching keys.
- main.go: builds the resolver from runtimeConfig.sqsFifoPartitionMap (canonicalized to numeric uint64 group IDs by parseSQSFifoGroupList) and installs it via WithPartitionResolver. The resolver is nil for a non-partitioned cluster — the request hot path stays engine-only.

What does NOT change yet

- htfifoCapabilityAdvertised stays false. PR 4-B-3 wires the §8 leadership-refusal hook + catalog-polling helper for the CreateQueue capability gate, then flips the constant to true.
- Send / Receive partition fanout is still PR 5.
PR 5 lifts the PartitionCount > 1 dormancy gate from PR 2 in the same commit that wires the data-plane fanout.

Self-review (per CLAUDE.md)

1. Data loss — routing layer only; no FSM/Pebble/retention path. No issue.
2. Concurrency — partitionResolver field is set once at startup before any request lands. ResolveGroup reads from a constructor-time defensive copy, so a hot-reload of --sqsFifoPartitionMap (a future capability) cannot perturb in-flight requests. No issue.
3. Performance — one map lookup + a 4-byte BigEndian decode per resolver hit, which only fires on partitioned-prefix matches. The engine-only path (the common case for non-partitioned clusters) adds a single `if s.partitionResolver != nil` branch — negligible. No issue.
4. Data consistency — the resolver's output strictly OVERRIDES the engine. A partitioned-FIFO key always reaches the operator-chosen partition group; legacy keys still flow through the engine. The resolver's "queue not found" / "partition out of range" branches return (0, false) so the router surfaces an error rather than silently mis-routing. No issue.
5. Test coverage — adapter/sqs_partition_resolver_test.go: 9 top-level tests (nil-on-empty, defensive-copy, partition dispatch across all 5 families, queue-name prefix isolation, legacy fall-through, unknown queue, out-of-range partition, nil receiver, prefix alignment). kv/shard_router_partition_test.go: 4 tests (resolver wins, engine fallthrough for unknown keys, nil resolver no-op, Get path also uses resolver). Existing TestShardRouter* tests unchanged.
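The resolver-first dispatch described above can be sketched as follows. This is a minimal illustration, not the PR's code: `mapResolver`, `rangeEngine`, and `router` are hypothetical stand-ins for the real `adapter.SQSPartitionResolver`, byte-range engine, and `kv.ShardRouter`.

```go
package main

import "fmt"

// PartitionResolver mirrors the interface added in this PR: it answers
// only for partitioned-keyspace keys and returns (0, false) otherwise.
type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

// mapResolver is a hypothetical resolver that claims only the keys
// present in its map (standing in for the operator-chosen partition map).
type mapResolver map[string]uint64

func (m mapResolver) ResolveGroup(key []byte) (uint64, bool) {
	gid, ok := m[string(key)]
	return gid, ok
}

// rangeEngine is a stand-in for the byte-range engine's GetRoute.
type rangeEngine struct{ defaultGroup uint64 }

func (e rangeEngine) GetRoute(key []byte) (uint64, bool) {
	return e.defaultGroup, true
}

// router shows the dispatch order: resolver first, engine fallback.
type router struct {
	resolver PartitionResolver // nil on a non-partitioned cluster
	engine   rangeEngine
}

func (r *router) resolveGroup(key []byte) (uint64, bool) {
	if r.resolver != nil {
		if gid, ok := r.resolver.ResolveGroup(key); ok {
			return gid, true // partitioned key: resolver wins
		}
	}
	return r.engine.GetRoute(key) // legacy key: engine handles it as today
}

func main() {
	r := &router{
		resolver: mapResolver{"!sqs|msg|orders|p|0": 42},
		engine:   rangeEngine{defaultGroup: 1},
	}
	gid, _ := r.resolveGroup([]byte("!sqs|msg|orders|p|0"))
	fmt.Println("partitioned:", gid) // resolver-chosen group 42
	gid, _ = r.resolveGroup([]byte("legacy-key"))
	fmt.Println("legacy:", gid) // engine default group 1
}
```

When `resolver` is nil (non-partitioned cluster), the only overhead on the hot path is the single nil check, matching the performance claim in the self-review.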
@claude review Phase 3.D PR 4-B-2: routing-layer resolver for HT-FIFO.
📝 Walkthrough

The changes introduce partitioned SQS routing by adding a new partition-resolver layer that the ShardRouter consults before the byte-range engine.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client
    participant ShardRouter
    participant PartitionResolver
    participant ByteRangeEngine
    participant RaftGroup
    Client->>ShardRouter: Commit(key, value)
    ShardRouter->>ShardRouter: resolveGroup(key)
    alt Partition Resolver Installed
        ShardRouter->>PartitionResolver: ResolveGroup(key)
        alt Partitioned SQS Key
            PartitionResolver->>PartitionResolver: Parse queue & partition
            PartitionResolver-->>ShardRouter: (groupID, true)
        else Unrecognized/Legacy Key
            PartitionResolver-->>ShardRouter: (0, false)
        end
    end
    alt Resolver Returned Valid Group
        ShardRouter->>RaftGroup: Dispatch to resolved group
    else Resolver Returned (0, false) or Not Installed
        ShardRouter->>ByteRangeEngine: GetRoute(key)
        ByteRangeEngine-->>ShardRouter: groupID from byte range
        ShardRouter->>RaftGroup: Dispatch to engine group
    end
    RaftGroup-->>Client: Response
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Claude finished @bootjp's task in 6m 32s — Code Review: PR #715
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1f48eeb15d
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```diff
 if len(key) == 0 {
 	return nil, ErrInvalidRequest
 }
-route, ok := s.engine.GetRoute(key)
+gid, ok := s.resolveGroup(key)
```
Resolve partition routes before normalizing SQS keys
groupRequests passes routeKey(r.Mutations[0].Key) into resolveGroup, but routeKey collapses every !sqs|... key to !sqs|route|global (kv/shard_key.go), so SQSPartitionResolver never sees the partitioned prefixes it parses (!sqs|msg|...|p|...). In practice this makes the new partition resolver a no-op for Commit/Abort (and similarly in Get), causing partitioned HT-FIFO traffic to keep following the engine fallback instead of the configured per-partition group map.
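The collapse this finding describes can be illustrated with a toy `routeKey`. This is a hypothetical sketch of the normalization's effect (the real logic lives in kv/shard_key.go), showing why a resolver that runs after normalization never sees the partition-bearing suffix:

```go
package main

import (
	"bytes"
	"fmt"
)

// routeKey mimics the normalization the finding describes: every
// "!sqs|" key collapses to one global route key, discarding the
// partition-bearing suffix before the resolver could ever see it.
func routeKey(key []byte) []byte {
	if bytes.HasPrefix(key, []byte("!sqs|")) {
		return []byte("!sqs|route|global")
	}
	return key
}

func main() {
	raw := []byte("!sqs|msg|orders|p|00000003")
	fmt.Println(string(routeKey(raw))) // the partition suffix is gone
	fmt.Println(string(routeKey([]byte("user-key"))))
}
```

Hence the fix in the round-2 commit: run the resolver on the raw key, and only normalize for the engine fallback.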
Code Review
This pull request introduces a PartitionResolver interface and a specific implementation for SQS partitioned queues, allowing the ShardRouter to override default byte-range routing for specific keyspaces. The review feedback identifies a critical issue where several methods in ShardedCoordinator still consult the engine directly, which could lead to misrouting for transactions and certain read paths. Additionally, there are suggestions to optimize the SQS resolver by pre-converting family prefixes to byte slices and to improve documentation regarding the thread-safety limitations of installing the resolver at runtime.
```go
func (c *ShardedCoordinator) WithPartitionResolver(r PartitionResolver) *ShardedCoordinator {
	c.router.WithPartitionResolver(r)
	return c
}
```
While WithPartitionResolver correctly wires the resolver into the ShardRouter, the ShardedCoordinator itself contains several critical methods that bypass the router and consult c.engine directly for routing decisions. Specifically, groupMutations, engineGroupIDForKey, groupForKey, and routeAndGroupForKey all use c.engine.GetRoute without considering the PartitionResolver.
This will cause transactions (2PC) and certain read paths for partitioned SQS queues to be misrouted to the engine's default group instead of the resolver-chosen group. These methods should be refactored to use c.router.resolveGroup to ensure consistent routing across the entire coordinator.
```go
var sqsResolverFamilyPrefixes = []string{
	SqsPartitionedMsgDataPrefix,
	SqsPartitionedMsgVisPrefix,
	SqsPartitionedMsgDedupPrefix,
	SqsPartitionedMsgGroupPrefix,
	SqsPartitionedMsgByAgePrefix,
}
```
Defining sqsResolverFamilyPrefixes as a slice of strings leads to repeated string-to-byte conversions in stripPartitionedFamilyPrefix on the request hot path. Pre-calculating these as a slice of byte slices would improve performance and avoid unnecessary allocations.
```diff
-var sqsResolverFamilyPrefixes = []string{
-	SqsPartitionedMsgDataPrefix,
-	SqsPartitionedMsgVisPrefix,
-	SqsPartitionedMsgDedupPrefix,
-	SqsPartitionedMsgGroupPrefix,
-	SqsPartitionedMsgByAgePrefix,
-}
+var sqsResolverFamilyPrefixes = [][]byte{
+	[]byte(SqsPartitionedMsgDataPrefix),
+	[]byte(SqsPartitionedMsgVisPrefix),
+	[]byte(SqsPartitionedMsgDedupPrefix),
+	[]byte(SqsPartitionedMsgGroupPrefix),
+	[]byte(SqsPartitionedMsgByAgePrefix),
+}
```
```go
func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
	for _, prefix := range sqsResolverFamilyPrefixes {
		if bytes.HasPrefix(key, []byte(prefix)) {
			return key[len(prefix):], true
		}
	}
	return nil, false
}
```
With sqsResolverFamilyPrefixes updated to [][]byte, the loop here can avoid the []byte(prefix) conversion.
```diff
 func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
 	for _, prefix := range sqsResolverFamilyPrefixes {
-		if bytes.HasPrefix(key, []byte(prefix)) {
+		if bytes.HasPrefix(key, prefix) {
 			return key[len(prefix):], true
 		}
 	}
 	return nil, false
 }
```
```go
// WithPartitionResolver installs a partition-keyspace resolver that
// is consulted before the byte-range engine on every dispatch. A
// nil resolver clears any previously-installed resolver. Returns
// the receiver so callers can chain.
//
// Setting the resolver is idempotent — re-installing the same value
// is a no-op. Concurrent reads against ResolveGroup remain safe
// because both the read in resolveGroup and the assignment here
// happen against the same field; routine startup wires the resolver
// once before any request lands, so the rare write does not need a
// lock.
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
	s.partitionResolver = r
	return s
}
```
The comment regarding idempotency and thread safety is misleading. Interface assignment in Go is not atomic; if this method is called while requests are being processed, concurrent readers in resolveGroup could observe a torn interface. As per repository guidelines for limited-scope implementations, the documentation should explicitly reflect that this is intended for startup use to prevent misuse in a general context, or the implementation should use atomic operations if concurrent updates are intended to be supported safely.
```go
// WithPartitionResolver installs a partition-keyspace resolver that
// is consulted before the byte-range engine on every dispatch. A
// nil resolver clears any previously-installed resolver. Returns
// the receiver so callers can chain.
//
// Note: This method is intended for use during startup. Interface
// assignment is not atomic in Go; if the resolver is updated while
// requests are in flight, concurrent readers in resolveGroup might
// observe a partially updated interface value.
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
	s.partitionResolver = r
	return s
}
```

References
- For prototype or limited-scope implementations, explicitly document the limitations (e.g., single-node only, blocking startup) and add runtime validation to prevent misuse in a general context.
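If post-startup swaps were ever needed, one tear-free alternative is to publish the resolver through `sync/atomic`. This is a sketch under the assumption that `atomic.Pointer` (Go 1.19+) is acceptable; `resolverSlot` and `constResolver` are illustrative names, not the PR's code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

// constResolver is a trivial resolver used only for the demo.
type constResolver uint64

func (c constResolver) ResolveGroup(key []byte) (uint64, bool) {
	return uint64(c), true
}

// resolverSlot publishes the resolver via atomic.Pointer, so a swap
// after startup can never be observed as a torn interface value.
type resolverSlot struct {
	p atomic.Pointer[PartitionResolver]
}

func (s *resolverSlot) store(r PartitionResolver) {
	if r == nil {
		s.p.Store(nil)
		return
	}
	s.p.Store(&r)
}

func (s *resolverSlot) load() PartitionResolver {
	if ptr := s.p.Load(); ptr != nil {
		return *ptr
	}
	return nil
}

func main() {
	var slot resolverSlot
	fmt.Println(slot.load() == nil) // true before installation
	slot.store(constResolver(7))
	gid, ok := slot.load().ResolveGroup([]byte("k"))
	fmt.Println(gid, ok)
}
```

For the startup-only contract the PR actually documents, the plain field plus a doc comment (as suggested above) is the simpler choice.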
Actionable comments posted: 1
🧹 Nitpick comments (1)
adapter/sqs_partition_resolver.go (1)
127-131: ⚡ Quick win: avoid repeated string→byte conversion in the prefix-matching hot path.

bytes.HasPrefix(key, []byte(prefix)) inside the loop adds avoidable overhead on every resolve. Precompute the byte prefixes once.

Proposed refactor:

```diff
 var sqsResolverFamilyPrefixes = []string{
 	SqsPartitionedMsgDataPrefix,
 	SqsPartitionedMsgVisPrefix,
 	SqsPartitionedMsgDedupPrefix,
 	SqsPartitionedMsgGroupPrefix,
 	SqsPartitionedMsgByAgePrefix,
 }
+
+var sqsResolverFamilyPrefixBytes = [][]byte{
+	[]byte(SqsPartitionedMsgDataPrefix),
+	[]byte(SqsPartitionedMsgVisPrefix),
+	[]byte(SqsPartitionedMsgDedupPrefix),
+	[]byte(SqsPartitionedMsgGroupPrefix),
+	[]byte(SqsPartitionedMsgByAgePrefix),
+}

 func stripPartitionedFamilyPrefix(key []byte) ([]byte, bool) {
-	for _, prefix := range sqsResolverFamilyPrefixes {
-		if bytes.HasPrefix(key, []byte(prefix)) {
+	for _, prefix := range sqsResolverFamilyPrefixBytes {
+		if bytes.HasPrefix(key, prefix) {
 			return key[len(prefix):], true
 		}
 	}
 	return nil, false
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@adapter/sqs_partition_resolver.go` around lines 127 - 131, The hot path in stripPartitionedFamilyPrefix repeatedly converts strings to []byte by calling bytes.HasPrefix(key, []byte(prefix)) for each prefix in sqsResolverFamilyPrefixes; precompute a parallel slice of byte-prefixes (e.g. sqsResolverFamilyPrefixBytes [][]byte) at package init time and replace the loop in stripPartitionedFamilyPrefix to iterate over those byte slices and call bytes.HasPrefix(key, bytePrefix) instead, keeping the function signature and return semantics the same and ensuring the new precomputed variable is populated once (init or var block) so no per-call allocations occur.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@kv/shard_router.go`:
- Around line 68-80: Protect s.partitionResolver with the shard router mutex
when writing and reading: in WithPartitionResolver(lock using the existing
mutex) set s.partitionResolver under the lock, and in resolveGroup(lock the
mutex while loading the resolver). Also guard against a typed-nil resolver by
checking the interface value for nilness via reflect (e.g.,
reflect.ValueOf(r).IsNil()) before storing or before using the loaded interface;
if the provided resolver is a typed nil treat it as nil (do not store) so
resolveGroup’s nil-check behaves correctly.
---
Nitpick comments:
In `@adapter/sqs_partition_resolver.go`:
- Around line 127-131: The hot path in stripPartitionedFamilyPrefix repeatedly
converts strings to []byte by calling bytes.HasPrefix(key, []byte(prefix)) for
each prefix in sqsResolverFamilyPrefixes; precompute a parallel slice of
byte-prefixes (e.g. sqsResolverFamilyPrefixBytes [][]byte) at package init time
and replace the loop in stripPartitionedFamilyPrefix to iterate over those byte
slices and call bytes.HasPrefix(key, bytePrefix) instead, keeping the function
signature and return semantics the same and ensuring the new precomputed
variable is populated once (init or var block) so no per-call allocations occur.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5c2dd747-e994-411b-b0c5-bcbb6824fef7
📒 Files selected for processing (6)
- adapter/sqs_partition_resolver.go
- adapter/sqs_partition_resolver_test.go
- kv/shard_router.go
- kv/shard_router_partition_test.go
- kv/sharded_coordinator.go
- main.go
```go
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
	s.partitionResolver = r
	return s
}

// resolveGroup tries the partition resolver first (when installed),
// then falls through to the byte-range engine. Returns the resolved
// Raft group ID and a found flag; (0, false) means no route in either
// the resolver or the engine — the caller surfaces this as an
// "unknown group" error.
func (s *ShardRouter) resolveGroup(key []byte) (uint64, bool) {
	if s.partitionResolver != nil {
		if gid, ok := s.partitionResolver.ResolveGroup(key); ok {
```
Add synchronization for partition resolver access; prevent typed-nil bypass.
WithPartitionResolver directly mutates s.partitionResolver while resolveGroup reads it on the request hot path without any synchronization. Although the code documents an assumption that the resolver is set only during startup before requests arrive, this architectural constraint is not enforced and Go's interface semantics do not guarantee safety: interface assignments involve multiple words and are not atomic. If resolver updates ever occur after startup, a data race becomes possible. Additionally, a typed-nil resolver (e.g., var r *MyResolver; r == nil is true, but assigning r to an interface field makes the interface check fail) can bypass the nil check in resolveGroup, allowing stale resolver behavior.
Protect the field with the existing mutex during both read and write:
Proposed fix

```diff
 func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
-	s.partitionResolver = r
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.partitionResolver = r
 	return s
 }

 func (s *ShardRouter) resolveGroup(key []byte) (uint64, bool) {
-	if s.partitionResolver != nil {
-		if gid, ok := s.partitionResolver.ResolveGroup(key); ok {
+	s.mu.RLock()
+	resolver := s.partitionResolver
+	s.mu.RUnlock()
+	if resolver != nil {
+		if gid, ok := resolver.ResolveGroup(key); ok {
 			return gid, true
 		}
 	}
```

📝 Committable suggestion
```go
func (s *ShardRouter) WithPartitionResolver(r PartitionResolver) *ShardRouter {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.partitionResolver = r
	return s
}

// resolveGroup tries the partition resolver first (when installed),
// then falls through to the byte-range engine. Returns the resolved
// Raft group ID and a found flag; (0, false) means no route in either
// the resolver or the engine — the caller surfaces this as an
// "unknown group" error.
func (s *ShardRouter) resolveGroup(key []byte) (uint64, bool) {
	s.mu.RLock()
	resolver := s.partitionResolver
	s.mu.RUnlock()
	if resolver != nil {
		if gid, ok := resolver.ResolveGroup(key); ok {
```
…lver

PR #715 round 1 review caught two correctness bugs and two minor items. This commit addresses all four.

1) P1 (Codex): routeKey collapsed every !sqs|... key to !sqs|route|global before resolveGroup ran, so the resolver never saw a partitioned-prefix key — partitioned-FIFO traffic silently fell through to the engine's default group. ResolveGroup now runs on the RAW key. The fallback to the engine still uses routeKey-normalised input (so the engine's per-shard layout is unchanged for legacy traffic). New regression test TestShardRouter_ResolverSeesRawKeyNotNormalized uses a recording resolver to assert the raw key reaches it.

2) HIGH (Gemini): ShardedCoordinator's per-key helpers (groupForKey, routeAndGroupForKey, engineGroupIDForKey, groupMutations) called c.engine.GetRoute directly, bypassing the resolver. This left 2PC + read paths with unrouted partitioned traffic. Each helper now consults c.router.ResolveGroup for the gid. The engine is queried separately for the catalog RouteID (keyviz observation) — the resolver is opaque to keyviz today; a partition-aware heatmap is a Phase 3.D follow-up.

3) MEDIUM (Gemini, perf): sqsResolverFamilyPrefixes was []string, forcing a []byte(prefix) conversion per check on the request hot path. Pre-converted to [][]byte; stripPartitionedFamilyPrefix loops over byte slices directly.

4) MEDIUM (Gemini, doc): WithPartitionResolver claimed concurrent reads were safe, which is incorrect — interface assignment is not atomic in Go. The comment now reflects the startup-only intent.

ResolveGroup is now exported so ShardedCoordinator's helpers can share the resolver-first dispatch path with Commit / Abort / Get.
@claude review Round 2 — addresses all four review findings.
Claude finished @bootjp's task in 3m 48s — Code Review: PR #715 Round 2
Round 1 Claude review on PR #715 caught a typed-nil interface bug: buildSQSPartitionResolver had return type *adapter.SQSPartitionResolver, so on a non-partitioned cluster it returned a typed-nil pointer. When that pointer was passed into kv.ShardRouter.WithPartitionResolver (parameter type kv.PartitionResolver), Go wrapped it into a NON-NIL interface — the resolver-first short-circuit `s.partitionResolver != nil` would always pass on every request, defeating the "non-partitioned cluster keeps engine-only hot path" contract from the PR description. The (*SQSPartitionResolver).ResolveGroup nil-receiver guard kept this functionally safe (correct routing) but not free (an extra map lookup per request).

Fix

- Return type changed from *adapter.SQSPartitionResolver to kv.PartitionResolver. Untyped `nil` returns now propagate as a true nil interface, the short-circuit fires correctly, and the hot path stays engine-only.
- Defensive nil guard after NewSQSPartitionResolver in case canonicalisation collapses every entry — the typed pointer from the constructor would otherwise wrap to a non-nil interface even when its underlying pointer is nil.
- New main_sqs_resolver_test.go regression test: TestBuildSQSPartitionResolver_NilOnEmpty uses requireNilInterface to force the kv.PartitionResolver conversion at the call boundary (a plain require.Nil on the concrete pointer would pass even with the bug present, since the pointer itself IS nil — only the interface wrap exposes the failure mode).
- TestBuildSQSPartitionResolver_NonEmptyReturnsResolver pins the happy path so a future "always return nil" regression is caught.
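The typed-nil pitfall itself is standard Go behavior and easy to reproduce in isolation. The names below are hypothetical; only the interface mechanics matter:

```go
package main

import "fmt"

type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

// resolver is a throwaway concrete type for the demo.
type resolver struct{}

func (r *resolver) ResolveGroup(key []byte) (uint64, bool) { return 0, false }

// buildTyped returns the concrete pointer type: a nil *resolver wraps
// into a NON-nil interface at the call boundary (the bug described above).
func buildTyped() *resolver { return nil }

// buildIface returns the interface type: an untyped nil propagates as a
// true nil interface (the fix).
func buildIface() PartitionResolver { return nil }

func main() {
	var a PartitionResolver = buildTyped()
	var b PartitionResolver = buildIface()
	fmt.Println(a == nil) // false: typed nil defeats the nil short-circuit
	fmt.Println(b == nil) // true: hot path stays engine-only
}
```

An interface value compares equal to nil only when both its type and value words are nil, which is exactly what returning the interface type (rather than the concrete pointer) guarantees.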
@claude review Round 3 — addresses Round 1 Claude's typed-nil interface bug (MEDIUM).
Claude finished @bootjp's task in 6m 49s — Code Review: PR #715 Round 3
Round 2 Claude review on PR #715 flagged that the Gemini-HIGH fix (ShardedCoordinator's groupMutations / groupForKey / etc. now route through c.router.ResolveGroup) had no coordinator-level regression test — the existing TestShardRouter_* tests pin the dispatch logic at the router layer but don't exercise the path Dispatch → groupMutations → c.router.ResolveGroup. Per CLAUDE.md ("when code review surfaces a defect, first add a failing test that reproduces the issue, then make it pass with the fix"), this commit lands the missing test.

Two new tests in kv/sharded_coordinator_partition_test.go:

- TestShardedCoordinator_DispatchHonoursPartitionResolver pins the Gemini HIGH fix: with the engine routing everything to group 1 but the resolver claiming a specific key for group 42, Dispatch on that key MUST hit group 42's recordingTransactional. Before the round-2 fix the request would have landed on group 1 because groupMutations called c.engine.GetRoute directly. Also asserts the resolver received the RAW partitioned key — pins the codex-P1 fix at the coordinator-call boundary.
- TestShardedCoordinator_DispatchFallsThroughForUnclaimedKeys pins the inverse: keys NOT claimed by the resolver continue to route via the byte-range engine. Without this, the resolver-first short-circuit could mask engine routing decisions.

stubResolver is a kv-internal PartitionResolver double so the tests don't pull in the adapter package. Each call records the raw key bytes (defensive copy) so concurrent reads stay race-safe under -race.
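The grouping behavior these tests pin can be sketched in miniature. This is a simplified stand-in (plain strings instead of protobuf mutations, a map instead of the real resolver), not the coordinator's code:

```go
package main

import "fmt"

type mutation struct{ key string }

// resolveGroup: resolver-claimed keys win; everything else falls back
// to the engine-default group (simplified stand-in for the router path).
func resolveGroup(resolver map[string]uint64, engineDefault uint64, key string) uint64 {
	if gid, ok := resolver[key]; ok {
		return gid
	}
	return engineDefault
}

// groupMutations splits a batch by resolver-chosen group, as the
// round-2 fix does, instead of routing the whole batch by the engine.
func groupMutations(resolver map[string]uint64, engineDefault uint64, muts []mutation) map[uint64][]mutation {
	grouped := make(map[uint64][]mutation)
	for _, m := range muts {
		gid := resolveGroup(resolver, engineDefault, m.key)
		grouped[gid] = append(grouped[gid], m)
	}
	return grouped
}

func main() {
	resolver := map[string]uint64{"keyP0": 42, "keyP1": 43}
	g := groupMutations(resolver, 1, []mutation{{"keyP0"}, {"keyP1"}, {"legacy"}})
	fmt.Println(len(g[42]), len(g[43]), len(g[1])) // one mutation per group
}
```

A bypassing implementation would put all three mutations under the engine-default group, which is the mis-grouping the coordinator tests are designed to expose.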
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 7f3a643406
```go
// Engine routes against the user-key view of the byte-range
// space; routeKey may rewrite SQS / DynamoDB / Redis-internal
// keys to a stable per-table or per-namespace route key so the
// engine sees one route per logical entity.
route, ok := s.engine.GetRoute(routeKey(rawKey))
```
Block engine fallback on unresolved partitioned SQS keys
When a partition resolver is installed, unresolved partitioned keys currently fall through to engine.GetRoute(routeKey(rawKey)). For !sqs|... keys, routeKey normalizes to the global SQS route, so cases like unknown queue or out-of-range partition (which SQSPartitionResolver returns as ok=false) get silently routed to the engine default shard instead of failing closed. This can misroute HT-FIFO traffic during partition-map drift or partial rollout, causing writes/reads to land on the wrong group rather than returning a routing error.
@claude review Round 4 — addresses Round 2 Claude's missing-regression-test note (CLAUDE.md gap): the Gemini HIGH fix (ShardedCoordinator's helpers route through c.router.ResolveGroup) had no coordinator-level regression test. New tests in kv/sharded_coordinator_partition_test.go.

Round 3's typed-nil fix (return type kv.PartitionResolver) is unchanged.
Claude finished @bootjp's task in 4m 39s — Code Review: PR #715 Round 4
Round 2 Codex P1 on PR #715: when a partition resolver is installed, an unresolved partitioned-shape key (unknown queue / out-of-range partition) silently falls through to engine.GetRoute(routeKey(rawKey)). For !sqs|... keys, routeKey collapses to !sqs|route|global, so the engine routes the misconfiguration to the SQS catalog default group instead of returning a routing error. During partition-map drift / partial rollout this would silently mis-route HT-FIFO traffic. Fix - kv.PartitionResolver gains RecognisesPartitionedKey(key) — a shape-only predicate. Implementations answer purely on prefix / structural inspection so the router can use it independently of the in-memory mapping. - ShardRouter.ResolveGroup: after ResolveGroup returns ok=false, consult RecognisesPartitionedKey. If the resolver recognises the shape, return (0, false) WITHOUT engine fallback — the caller surfaces a routing error. If the shape is not recognised, fall through to the engine as before. - adapter.SQSPartitionResolver: implements RecognisesPartitionedKey via parsePartitionedSQSKey (the same parser ResolveGroup uses). Nil-receiver and empty-key return false. Tests - kv/shard_router_partition_test.go: new TestShardRouter_FailClosedOnRecognisedButUnresolved — pins that a recognised-but-unresolved partitioned key returns (0, false) AND that a non-recognised key still falls through to the engine. - adapter/sqs_partition_resolver_test.go: new TestSQSPartitionResolver_RecognisesPartitionedKey covers 8 shape cases (data/vis/byage families with known + unknown queues, OOR partition, legacy SQS, queue meta, non-SQS, empty, nil). New TestSQSPartitionResolver_RecognisesPartitionedKey_NilReceiver pins the typed-nil-safe branch. - Renamed and expanded TestSQSPartitionResolver_UnknownQueueRecognisedButUnresolved / TestSQSPartitionResolver_OutOfRangePartitionRecognisedButUnresolved to assert RecognisesPartitionedKey == true, pinning the router- side fail-closed contract. 
- Existing fakePartitionResolver / stubResolver / recordingResolver test doubles get the new method (the kv-internal tests don't depend on the adapter resolver).
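The resolver-first, fail-closed dispatch order described above can be sketched as a self-contained Go program. All types here (`ShardRouter`, the `PartitionResolver` interface, the `!sqs|p|` prefix, the toy engine) are simplified stand-ins for the repo's `kv` package, not its actual definitions — only the dispatch ordering is the point.

```go
package main

import (
	"bytes"
	"fmt"
)

type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
	RecognisesPartitionedKey(key []byte) bool // shape-only predicate
}

type routeEngine interface {
	GetRoute(key []byte) (uint64, bool)
}

type ShardRouter struct {
	resolver PartitionResolver
	engine   routeEngine
}

// ResolveGroup: resolver first; fail closed on recognised-but-unresolved
// partitioned keys; everything else falls through to the byte-range engine.
func (r *ShardRouter) ResolveGroup(key []byte) (uint64, bool) {
	if r.resolver != nil {
		if gid, ok := r.resolver.ResolveGroup(key); ok {
			return gid, true
		}
		if r.resolver.RecognisesPartitionedKey(key) {
			return 0, false // caller surfaces a routing error, no engine fallback
		}
	}
	return r.engine.GetRoute(key)
}

// --- toy doubles for the demo ---

type mapResolver struct{ groups map[string]uint64 }

func (m mapResolver) RecognisesPartitionedKey(key []byte) bool {
	return bytes.HasPrefix(key, []byte("!sqs|p|")) // hypothetical prefix
}

func (m mapResolver) ResolveGroup(key []byte) (uint64, bool) {
	gid, ok := m.groups[string(key)]
	return gid, ok
}

type defaultEngine struct{}

func (defaultEngine) GetRoute([]byte) (uint64, bool) { return 1, true }

func main() {
	r := &ShardRouter{
		resolver: mapResolver{groups: map[string]uint64{"!sqs|p|q1|0": 42}},
		engine:   defaultEngine{},
	}
	for _, k := range []string{"!sqs|p|q1|0", "!sqs|p|ghost|0", "legacy-key"} {
		gid, ok := r.ResolveGroup([]byte(k))
		fmt.Printf("%-16s -> gid=%d ok=%v\n", k, gid, ok)
	}
}
```

The middle case (`!sqs|p|ghost|0`) is the drift scenario from the P1: recognised shape, no mapping entry, so the router returns `(0, false)` instead of letting the engine route it to the catalog default group.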
@claude review Round 5 — addresses Codex round 2 P1: recognised-but-unresolved partitioned keys now fail closed at the router instead of falling through to the engine. Fix and tests as described above.
Claude finished @bootjp's task in 4m 47s — Code Review: PR #715 Round 5
Round 3 + Round 4 review on PR #715 caught two test gaps. Both were real — the regressions they claim to catch were not actually caught by the tests as written.

Round 3: `requireNilInterface` used testify's `require.Nil`, which reflects through to the underlying pointer and considers a nil pointer wrapped in a non-nil interface as "nil". So if `buildSQSPartitionResolver`'s return type were reverted to `*adapter.SQSPartitionResolver` (the typed-nil bug), the regression test would still pass — `require.Nil` on the typed nil returns true. Switch to `require.True(t, r == nil, msg)`. Go's `==` operator on an interface checks BOTH the type tag AND the value — it returns true only for a true nil interface, which is exactly the invariant the typed-nil fix produces.

Round 4: `TestShardedCoordinator_DispatchHonoursPartitionResolver` did not actually regress the `groupMutations` bypass. For a single-mutation batch, the test passes even if `groupMutations` bypasses the resolver, because `rawLogs` produces one `pb.Request` and `router.Commit`'s `groupRequests` re-routes by the raw key — the router rescues the mis-routing the coordinator would have introduced. To genuinely regress the bypass, the test must dispatch TWO mutations belonging to TWO different partition groups. With the buggy `groupMutations` both end up under the engine-default group, `rawLogs` produces one request, and `router.Commit` puts both mutations on whichever group claims `Mutations[0].Key` — the second group receives nothing.

Added `TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup`:
- Engine routes everything to group 1.
- Resolver claims keyP0 → group 42, keyP1 → group 43.
- Dispatch with [Put keyP0, Put keyP1].
- Asserts BOTH g42 and g43 each receive exactly one request.
- Pre-fix: g43 receives ZERO (the bypass put both under group 1's rawLog, then the router put them under g42 because of Mutations[0]).
- Post-fix: `groupMutations` splits via `c.router.ResolveGroup` — two separate requests, each group gets its own.

Updated `TestShardedCoordinator_DispatchHonoursPartitionResolver`'s comment to reflect what it actually pins (WithPartitionResolver wiring + raw-key dispatch, not groupMutations).
@claude review Round 6 — addresses the Round 3 + Round 4 test gaps (the typed-nil `require.Nil` check and the `groupMutations` bypass). Added `TestShardedCoordinator_DispatchSplitsMutationsByResolverGroup` and updated the original test's comment to reflect what it actually pins (WithPartitionResolver wiring + raw-key dispatch, not groupMutations).
Claude finished @bootjp's task in 2m 50s — Code Review: PR #715 Round 6
Round 5 review on PR #715 raised a nit: `RecognisesPartitionedKey` delegated to `parsePartitionedSQSKey`, which fails when the queue segment is malformed base64 or the partition segment is truncated. For such corrupt-shape keys the predicate returned false, so the router fell through to the engine and silently routed via `routeKey`'s `!sqs|route|global` collapse to the SQS catalog default group — exactly the failure mode the round 5 fail-closed change was introduced to prevent.

The fix is the structural-only intent the `kv.PartitionResolver` contract already documents: "Implementations answer purely on prefix / structural inspection". Make `RecognisesPartitionedKey` match that intent — accept ANY key that starts with one of the partitioned family prefixes, regardless of subsequent corruption. `ResolveGroup` still returns `(0, false)` for malformed keys, and the router pairs that with Recognised=true to fail closed.

Tests
- `TestSQSPartitionResolver_RecognisesMalformedPartitionedKey`: three sub-cases pin the new contract — prefix-only, prefix + invalid base64 queue segment, prefix + valid queue + `'|'` but truncated partition bytes. All assert Recognised=true and ResolveGroup ok=false, exactly the fail-closed pairing the router consumes.
- Existing `TestSQSPartitionResolver_RecognisesPartitionedKey` cases stay valid (the well-formed shapes still match).
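A structural-only predicate of this kind reduces to a prefix scan. The family prefixes below are hypothetical placeholders — the real constants live in `sqs_keys.go` and may differ — but the shape of the check is the point: claim anything under a partitioned prefix, even if the rest of the key is corrupt.

```go
package main

import (
	"bytes"
	"fmt"
)

// Hypothetical partitioned family prefixes (stand-ins for the
// sqs_keys.go constants).
var partitionedPrefixes = [][]byte{
	[]byte("!sqs|pdata|"),
	[]byte("!sqs|pvis|"),
	[]byte("!sqs|pbyage|"),
}

// RecognisesPartitionedKey is purely structural: any key under a
// partitioned family prefix is claimed, even when the queue segment is
// malformed base64 or the partition bytes are truncated. ResolveGroup
// separately returns (0, false) for such keys, and the router pairs the
// two answers to fail closed instead of routing via the engine.
func RecognisesPartitionedKey(key []byte) bool {
	for _, p := range partitionedPrefixes {
		if bytes.HasPrefix(key, p) {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(RecognisesPartitionedKey([]byte("!sqs|pdata|%%notbase64"))) // true: corrupt but recognised
	fmt.Println(RecognisesPartitionedKey([]byte("!sqs|pvis|")))             // true: prefix-only
	fmt.Println(RecognisesPartitionedKey([]byte("!sqs|legacy|q1")))         // false: engine handles it
}
```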
@claude review Round 7 — addresses Round 5's malformed-queue-encoding nit: `RecognisesPartitionedKey` is now purely structural, with a new malformed-key test pinning the fail-closed pairing.
Claude finished @bootjp's task in 2m 28s — Code Review: PR #715 Round 7
💡 Codex Review
Reviewed commit: 87561fd9f3
 func (c *ShardedCoordinator) engineGroupIDForKey(key []byte) uint64 {
-	route, ok := c.engine.GetRoute(routeKey(key))
+	gid, ok := c.router.ResolveGroup(key)
 	if !ok {
 		return 0
Reject unresolved read keys from partition resolver
engineGroupIDForKey now maps resolver misses to 0, but the txn read-key paths (allReadKeysInShard and groupReadKeysByShardID) treat gid==0 as non-fatal and simply skip those keys. With the new fail-closed resolver behavior, a partitioned key that is recognized but unresolved (e.g., queue missing from sqsFifoPartitionMap during drift/rollout) can be dropped from read-key validation instead of failing the transaction, so OCC checks no longer cover that read set and the write can commit with an incomplete conflict check.
Round 5 introduced fail-closed semantics in `ShardRouter.ResolveGroup` (recognised-but-unresolved partitioned keys return ok=false instead of falling through to the engine), but the read-key path inside `ShardedCoordinator` was not audited for the new contract. `engineGroupIDForKey` discards the resolver's ok flag and returns 0 for any failure. `groupReadKeysByShardID` then loops `if gid == 0 { continue }`, silently dropping unrouted read keys from the prewrite payload. With the new fail-closed resolver, a partitioned read key whose queue has drifted out of `--sqsFifoPartitionMap` (partial rollout / config drift) gets dropped from OCC validation. The FSM never sees that key in ReadKeys, a concurrent write to the same key commits without conflict, and SSI is broken. Codex round-2 P1 on PR #715 caught this — addressed here.

Fix
`groupReadKeysByShardID` returns `(map, error)`. Any read key that fails to route surfaces as `ErrInvalidRequest`, `dispatchTxn` propagates the error, and the transaction aborts before prewrite. Calls `c.router.ResolveGroup` directly (rather than via `engineGroupIDForKey`) so the `(gid, ok)` signal is preserved through the boundary.

Tests
- `TestGroupReadKeysByShardID_FailsClosedOnUnroutable` replaces `TestGroupReadKeysByShardID_SkipsUnroutableKeys` (which had been pinning the buggy skip-silently behaviour). Asserts the new fail-closed contract: unroutable keys → error, no partial map.
- `TestShardedCoordinator_TxnFailsClosedForUnresolvedReadKey` is the coordinator-level regression: a transaction with a recognised-but-unresolved partitioned read key MUST abort before any prewrite. Asserts no group received an RPC.
- The three existing `TestGroupReadKeysByShardID_*` tests are updated for the new `(map, error)` signature.
@claude review Round 8 — addresses a Codex P1 missed in earlier rounds (sorry for the oversight): Round 5 introduced fail-closed semantics in `ShardRouter.ResolveGroup`, but the txn read-key path still dropped unroutable read keys silently. Fix and tests as described above.
Claude finished @bootjp's task in 3m 55s — Code Review: PR #715 Round 8
## Summary

Phase 3.D PR 4-B-3a — adds the stateless `htfifo` capability poller that PR 5's CreateQueue gate consumes. Stacks on the now-merged #715 (PR 4-B-2, partition resolver). This PR is purely additive: new helper file, new test file, no existing code touched. Next is PR 4-B-3b (leadership-refusal hook + flag flip).

## What's added

- `adapter/sqs_capability_poller.go`:
  - `HTFIFOCapabilityReport{AllAdvertise, Peers}` — binary go/no-go signal + per-peer detail for operator triage.
  - `HTFIFOCapabilityPeerStatus{Address, HasHTFIFO, Capabilities, Error}` — one peer's polling result.
  - `PollSQSHTFIFOCapability(ctx, client, peers)` — concurrent goroutine-per-peer poll, indexed-channel result aggregation (race-free).
  - Per-peer timeout `defaultSQSCapabilityPollTimeout = 3s` so a single hung peer can't stall the cluster-wide poll.
  - Body capped at 1 KiB via `io.LimitReader` so a misconfigured peer can't drain memory.
  - Bare `host:port` and full `http://…` / `https://…` URLs both accepted.
  - Fail-closed on every failure mode: timeout, transport error, non-200, malformed JSON, missing capability. Empty peer list → vacuously `AllAdvertise=true` (caller validates list completeness).

## What's NOT added (deferred)

- `htfifoCapabilityAdvertised` stays `false`. PR 4-B-3b adds the §8 leadership-refusal hook + per-acquisition observer in `kv/raftengine/etcd` and flips the flag.
- `CreateQueue` does NOT yet call this helper. PR 5 lifts the `PartitionCount > 1` dormancy gate AND wires the capability check in the same commit (per the §11 rollout plan's "gate-and-lift atomically" rule).

## Test plan

9 top-level tests covering the contract surface:

- [x] `TestPollSQSHTFIFOCapability_AllAdvertise` — happy path, multiple peers.
- [x] `TestPollSQSHTFIFOCapability_OneMissingFailsClosed` — old-binary peer with empty capabilities drops `AllAdvertise`.
- [x] `TestPollSQSHTFIFOCapability_HTTPErrorFailsClosed` — HTTP 500, connection refused, malformed JSON all surface as `Error`.
- [x] `TestPollSQSHTFIFOCapability_TimeoutFailsClosed` — hung peer respects per-peer timeout, full poll bounded.
- [x] `TestPollSQSHTFIFOCapability_EmptyPeersIsVacuouslyTrue` — empty peer list contract.
- [x] `TestPollSQSHTFIFOCapability_EmptyPeerAddressFailsClosed` — `""` entry in peers slice surfaces explicit Error.
- [x] `TestPollSQSHTFIFOCapability_FullURLPeer` — `http://` and `https://` URLs accepted alongside bare `host:port`.
- [x] `TestPollSQSHTFIFOCapability_ConcurrentPolling` — 5×200ms peers finish in well under 1s.
- [x] `TestPollSQSHTFIFOCapability_RespectsBodyLimit` — 10 KiB response truncated mid-string surfaces as JSON parse error, not garbage decode.
- [x] `TestBuildSQSHealthURL` — URL construction edge cases.
- [x] `go test -race ./adapter/...` pass.
- [x] `golangci-lint ./adapter/...` clean.

## Self-review (per CLAUDE.md)

1. **Data loss** — read-only HTTP poll; no FSM/Pebble/retention path. No issue.
2. **Concurrency / distributed failures** — peer polls run in independent goroutines; results land via an indexed channel so slice writes are obviously race-free. Per-peer timeout enforced via `context.WithTimeout` so a slow peer can't stall the rest. Body capped via `io.LimitReader`. No issue.
3. **Performance** — N peers polled concurrently, not serially; the test pins this. Per-peer cost is one HTTP round-trip + a JSON parse of a tiny body. No hot-path impact (CreateQueue is a control-plane operation, not request hot path). No issue.
4. **Data consistency** — fail-closed on every failure mode preserves the §8.5 "any peer that doesn't respond is treated as not-yet-upgraded" rule. The vacuously-true empty-peer-list case is documented and the caller's responsibility. No issue.
5. **Test coverage** — every documented failure path (HTTP error, transport error, JSON parse, timeout, missing capability, empty peer, body-size cap) is pinned. Concurrent polling is pinned (would have caught a regression to serial). URL construction edges pinned.
## Summary

Routing-layer half of PR 4-B. Adds a `PartitionResolver` that `ShardRouter` consults BEFORE falling through to the byte-range engine. SQS HT-FIFO needs partition-aware dispatch, but the engine's non-overlapping-cover model can't express overlay routes — the resolver-first dispatch sidesteps this cleanly.

Stacks on top of #708 (PR 4-B-1, capability JSON). Next is PR 4-B-3 (leadership-refusal + catalog polling + flip `htfifoCapabilityAdvertised` to `true`).

## What's added

- `kv.PartitionResolver` interface — `ResolveGroup([]byte) (uint64, bool)`.
- `kv.ShardRouter.WithPartitionResolver(...)` — fluent option, nil-safe.
- `kv.ShardRouter.resolveGroup(...)` — unified dispatch path: resolver first, engine fallback. Both `groupRequests` (Commit/Abort) and `Get` route through it.
- `kv.ShardedCoordinator.WithPartitionResolver(...)` — delegates to the router so `main.go` can install via the existing fluent-construction style.
- `adapter.SQSPartitionResolver` — parses `(queue, partition)` from the partitioned key shape, looks up the operator-chosen group. Defensive copy at construction, nil-safe `ResolveGroup`, returns `(0, false)` for legacy / non-matching keys.
- `main.go` — builds the resolver from `runtimeConfig.sqsFifoPartitionMap` and installs it. Resolver is `nil` on a non-partitioned cluster — hot path stays engine-only.

## What's NOT added (deferred to PR 4-B-3)

- Leadership-refusal hook in `kv` (refuses leadership for an SQS Raft group hosting a partitioned queue when the binary lacks `htfifo`).
- Flip of `htfifoCapabilityAdvertised` from `false` to `true`.

The design's "advertise htfifo only when both routing AND leadership-refusal are in place" rule keeps the constant `false` in this PR — PR 4-B-3 flips it.

## Test plan

- `adapter/sqs_partition_resolver_test.go` — 9 top-level tests: nil-on-empty, defensive-copy, partition dispatch across all 5 families, queue-name prefix isolation (`"queue"` vs `"queue1"`), legacy fall-through (8 sub-cases), unknown queue, out-of-range partition, nil receiver, prefix alignment with `sqs_keys.go` constants.
- `kv/shard_router_partition_test.go` — 4 tests: resolver wins over engine, engine fallthrough on resolver-miss, nil resolver no-op, `Get` path also routes through the resolver.
- `go test -race ./kv/...` pass.
- `go test -race ./adapter/...` pass.
- `golangci-lint ./kv/... ./adapter/... .` clean.

## Self-review (per CLAUDE.md)

- `partitionResolver` is set once at startup before any request. `ResolveGroup` reads a constructor-time defensive copy, so a future hot-reload of `--sqsFifoPartitionMap` cannot perturb in-flight requests. No issue.
- Non-partitioned clusters skip the resolver entirely via the `if s.partitionResolver != nil` branch. No issue.
- Unresolved partitioned keys return `(0, false)` so the router surfaces an explicit error rather than silently mis-routing. No issue.
- Existing `TestShardRouter*` tests unchanged. Both override and fall-through paths pinned, plus the queue-name prefix-isolation invariant from PR feat(sqs): HT-FIFO partitioned-keyspace constructors (Phase 3.D PR 3) #703.
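The nil-safe fluent option and its coordinator-to-router delegation described in the summary can be sketched as a toy program. `ShardRouter`, `ShardedCoordinator`, and `staticResolver` here are simplified stand-ins, not the repo's real types.

```go
package main

import "fmt"

type PartitionResolver interface {
	ResolveGroup(key []byte) (uint64, bool)
}

type ShardRouter struct{ resolver PartitionResolver }

// WithPartitionResolver is nil-safe and returns the receiver so it chains
// in the existing fluent-construction style. Installing nil is a no-op,
// which keeps the request hot path engine-only on non-partitioned clusters.
func (r *ShardRouter) WithPartitionResolver(p PartitionResolver) *ShardRouter {
	if p != nil {
		r.resolver = p
	}
	return r
}

type ShardedCoordinator struct{ router *ShardRouter }

// The coordinator option just delegates to the router.
func (c *ShardedCoordinator) WithPartitionResolver(p PartitionResolver) *ShardedCoordinator {
	c.router.WithPartitionResolver(p)
	return c
}

type staticResolver struct{}

func (staticResolver) ResolveGroup([]byte) (uint64, bool) { return 42, true }

func main() {
	c := &ShardedCoordinator{router: &ShardRouter{}}
	c.WithPartitionResolver(nil) // non-partitioned cluster: no-op
	fmt.Println("after nil install:", c.router.resolver != nil)
	c.WithPartitionResolver(staticResolver{})
	fmt.Println("after real install:", c.router.resolver != nil)
}
```

Because the option accepts the interface type (not a concrete pointer), `main.go` can pass the builder's result directly without tripping the typed-nil pitfall the Round 5/6 discussion covers.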