Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions adapter/sqs_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -1074,7 +1074,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string)
deadline := time.Now().Add(transactRetryMaxDuration)
for range transactRetryMaxAttempts {
readTS := s.nextTxnReadTS(ctx)
_, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS)
existing, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS)
if err != nil {
return errors.WithStack(err)
}
Expand All @@ -1095,6 +1095,14 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string)
metaKey := sqsQueueMetaKey(queueName)
genKey := sqsQueueGenKey(queueName)
tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen)
// Encode the queue's PartitionCount in the tombstone value so
// the reaper can drive partition iteration off the tombstone
// alone — meta is gone by the time the reaper observes the
// tombstone (PR 6a). Legacy / non-partitioned queues fall
// through to the byte-identical []byte{1} sentinel via the
// PartitionCount<=1 branch in encodeQueueTombstoneValue, so
// existing on-disk tombstones are byte-identical to today.
tombstoneValue := encodeQueueTombstoneValue(existing.PartitionCount)
// StartTS + ReadKeys fence against a concurrent CreateQueue /
// SetQueueAttributes landing between our load and dispatch.
req := &kv.OperationGroup[kv.OP]{
Expand All @@ -1104,7 +1112,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string)
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Del, Key: metaKey},
{Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(lastGen+1, 10))},
{Op: kv.Put, Key: tombstoneKey, Value: []byte{1}},
{Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue},
},
}
if _, err := s.coordinator.Dispatch(ctx, req); err == nil {
Expand Down
107 changes: 107 additions & 0 deletions adapter/sqs_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,69 @@ func sqsQueueTombstoneKey(queueName string, gen uint64) []byte {
return buf
}

// sqsTombstoneValueLen is the canonical byte length of the
// tombstone value produced by encodeQueueTombstoneValue for
// partitioned queues (PartitionCount > 1). Deliberately its own
// constant rather than a borrow of sqsGenerationSuffixLen: both
// happen to equal 8 today, but one describes a value encoding and
// the other a key layout (Claude review on PR #735 flagged the
// borrow as confusing), and a future encoding revision (e.g. a
// length-prefixed version field) must be able to change this
// without touching key parsers.
const sqsTombstoneValueLen = 8

// encodeQueueTombstoneValue packs a queue's PartitionCount into
// the tombstone value so the reaper can drive partition iteration
// off the tombstone alone — the meta record is already gone by the
// time the reaper observes the tombstone. The canonical shape is
// eight big-endian bytes holding the count as a uint64, leaving
// room for future length-discriminated encodings to co-exist
// without breaking the legacy-value fallback.
//
// Queues with PartitionCount <= 1 (legacy / non-partitioned) get
// the historical single-byte sentinel []byte{1}, byte-identical to
// what pre-PR-6a binaries wrote; decodeQueueTombstoneValue maps
// every non-canonical length back to PartitionCount=1, so the
// reaper silently keeps legacy single-partition behaviour for them.
func encodeQueueTombstoneValue(partitionCount uint32) []byte {
	if partitionCount > 1 {
		encoded := make([]byte, sqsTombstoneValueLen)
		binary.BigEndian.PutUint64(encoded, uint64(partitionCount))
		return encoded
	}
	// Legacy sentinel: keeps tombstones of non-partitioned queues
	// byte-identical across the rollout so on-disk diffs stay
	// small; the reaper treats the legacy and 1-partition shapes
	// identically.
	return []byte{1}
}

// decodeQueueTombstoneValue recovers the PartitionCount written by
// encodeQueueTombstoneValue. Any payload that is not the canonical
// 8-byte shape — the legacy single-byte sentinel []byte{1}, an
// empty value, or any other non-canonical bytes — decodes to 1, so
// a binary that never wrote a partitioned tombstone safely
// degrades to legacy reaper behaviour. A canonical value carrying
// PartitionCount==0 is likewise clamped to 1, matching the
// read-side collapse in partitionFor / effectivePartitionCount so
// the reaper's loop bound stays consistent.
//
// The value is split into two big-endian uint32 halves rather than
// read as a single uint64 so no uint64→uint32 narrowing ever
// happens: gosec's G115 fires on every such conversion regardless
// of proven bounds, and CLAUDE.md forbids //nolint annotations.
// The encoder only writes counts <= htfifoMaxPartitions (=32) into
// the low 32 bits, so a non-zero high half signals corruption or a
// future format and also decodes to PartitionCount=1.
func decodeQueueTombstoneValue(value []byte) uint32 {
	if len(value) != sqsTombstoneValueLen {
		return 1
	}
	hi := binary.BigEndian.Uint32(value[:4])
	lo := binary.BigEndian.Uint32(value[4:])
	switch {
	case hi != 0, lo == 0, lo > htfifoMaxPartitions:
		return 1
	}
	return lo
}

// sqsGenerationSuffixLen is the byte length of the trailing big-endian
// uint64 generation segment in tombstone and byage keys. A key-layout
// constant — kept distinct from sqsTombstoneValueLen (a value-encoding
// constant) even though both equal 8 today, so the two can diverge
// independently.
const sqsGenerationSuffixLen = 8
Expand Down Expand Up @@ -392,6 +455,50 @@ func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte
return buf
}

// sqsPartitionedMsgByAgePrefixForPartition builds the byage scan
// prefix scoped to a single (queue, partition, gen) cohort. The
// tombstone reaper walks the partitioned byage keyspace one
// partition at a time with this prefix, so the per-queue scan
// budget translates cleanly into a per-partition budget — the §6
// split-queue-FIFO design's "partitions × budget" reaper contract
// (PR 6).
func sqsPartitionedMsgByAgePrefixForPartition(queueName string, partition uint32, gen uint64) []byte {
	key := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall)
	key = append(key, SqsPartitionedMsgByAgePrefix...)
	key = append(key, encodeSQSSegment(queueName)...)
	key = append(key, sqsPartitionedQueueTerminator)
	return appendU64(appendU32(key, partition), gen)
}

// sqsPartitionedMsgDedupKeyPrefix builds the dedup scan prefix
// scoped to a single (queue, partition, gen) cohort. The tombstone
// reaper pairs it with deleteAllPrefix to clean up partitioned
// dedup records left behind by a deleted / purged partitioned
// queue (the Codex P2 deferred from PR 5b-2 round 0).
func sqsPartitionedMsgDedupKeyPrefix(queueName string, partition uint32, gen uint64) []byte {
	key := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapSmall)
	key = append(key, SqsPartitionedMsgDedupPrefix...)
	key = append(key, encodeSQSSegment(queueName)...)
	key = append(key, sqsPartitionedQueueTerminator)
	return appendU64(appendU32(key, partition), gen)
}

// sqsPartitionedMsgGroupKeyPrefix builds the group-lock scan
// prefix scoped to a single (queue, partition, gen) cohort.
// Mirrors sqsPartitionedMsgDedupKeyPrefix.
func sqsPartitionedMsgGroupKeyPrefix(queueName string, partition uint32, gen uint64) []byte {
	key := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapSmall)
	key = append(key, SqsPartitionedMsgGroupPrefix...)
	key = append(key, encodeSQSSegment(queueName)...)
	key = append(key, sqsPartitionedQueueTerminator)
	return appendU64(appendU32(key, partition), gen)
}

// sqsMsgByAgePrefixesForQueue returns the {legacy, partitioned}
// prefix pair for a queue's byage records. The reaper iterates both:
// a queue created before HT-FIFO landed has only legacy entries; a
Expand Down
134 changes: 134 additions & 0 deletions adapter/sqs_partitioned_dispatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/bootjp/elastickv/kv"
"github.com/bootjp/elastickv/store"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -514,3 +515,136 @@ func TestDropReceiveFanoutCounter_ClearsEntry(t *testing.T) {
"a queue recreated after drop must start from a zero counter (got first offset %d)",
first)
}

// TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions
// pins the PR 6a contract: after DeleteQueue on a partitioned FIFO
// queue, the tombstone reaper must drain every partition's data /
// vis / byage / dedup / group rows. The pre-PR-6a reaper only
// scanned the legacy keyspace, so partitioned records leaked
// permanently — the Codex P2 from PR #732 review.
//
// Test shape:
// - create an 8-partition queue (via the install helper since the
// §11 PR 5b-3 capability gate would otherwise need peer wiring),
// - send messages spread across distinct group ids so partitionFor
// populates more than one partition,
// - DeleteQueue,
// - hand-call reapTombstonedQueues so the test does not have to
// wait on the 30s timer,
// - assert every partitioned data / vis / byage / dedup / group
// prefix (across all partitions) is empty,
// - assert the tombstone itself is gone (the cohort is fully
// drained → reaper deletes the tombstone).
func TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions(t *testing.T) {
	t.Parallel()
	nodes, _, _ := createNode(t, 1)
	defer shutdown(nodes)
	node := sqsLeaderNode(t, nodes)

	const queueName = "reaper-partitioned.fifo"
	status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{
		"QueueName":  queueName,
		"Attributes": map[string]string{"FifoQueue": "true"},
	})
	require.Equal(t, http.StatusOK, status, "create FIFO queue: %v", out)
	queueURL, _ := out["QueueUrl"].(string)
	require.NotEmpty(t, queueURL)

	// 8 partitions instead of 4 so the test also exercises a
	// non-default partition count and silences the unparam linter
	// that sees only PartitionCount=4 in every other dispatch test.
	const partitions uint32 = 8
	installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID)

	ctx := context.Background()
	readTS := node.sqsServer.nextTxnReadTS(ctx)
	meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS)
	require.NoError(t, err)
	require.True(t, exists)
	require.Equal(t, partitions, meta.PartitionCount)
	gen := meta.Generation

	// Spread messages over enough groups to land in at least 2
	// distinct partitions; partitionFor is FNV-1a so 6 groups give
	// good odds without making the test fragile against hash
	// changes (the reaper assertion below is total-rows-zero, not
	// per-partition cardinality, so any non-zero distribution is
	// fine).
	groups := []string{"a", "b", "c", "d", "e", "f"}
	for _, g := range groups {
		status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{
			"QueueUrl":               queueURL,
			"MessageBody":            "body-" + g,
			"MessageGroupId":         g,
			"MessageDeduplicationId": "dedup-" + g,
		})
		require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out)
	}

	// Sanity: at least one partitioned data row exists pre-delete.
	// Without this, the post-reap assertion would be vacuously true
	// on a regression that simply silenced sends.
	preReapRows := countPartitionedRows(t, node, queueName, gen)
	require.Positive(t, preReapRows,
		"expected at least one partitioned row before DeleteQueue+reap")

	// DeleteQueue → writes a tombstone carrying PartitionCount=8
	// in the value (PR 6a). A pre-PR-6a DeleteQueue would write
	// the []byte{1} sentinel here and the reaper would skip the
	// partitioned sweep entirely.
	status, out = callSQS(t, node, sqsDeleteQueueTarget, map[string]any{
		"QueueUrl": queueURL,
	})
	require.Equal(t, http.StatusOK, status, "delete queue: %v", out)

	// Hand-call the tombstone reaper so the test doesn't depend on
	// the 30s ticker. The same code path runs in production every
	// reaper tick.
	require.NoError(t, node.sqsServer.reapTombstonedQueues(ctx),
		"reapTombstonedQueues must succeed")

	postReapRows := countPartitionedRows(t, node, queueName, gen)
	require.Zero(t, postReapRows,
		"every partitioned data / vis / byage / dedup / group row must be reaped (got %d remaining)",
		postReapRows)

	// Tombstone itself must also be gone — reapTombstonedGeneration
	// only deletes it once every prefix family is drained, so a
	// surviving tombstone here would mean the partitioned sweep
	// silently left rows behind in some prefix the assertion above
	// did not enumerate.
	tombstoneKey := sqsQueueTombstoneKey(queueName, gen)
	_, err = node.sqsServer.store.GetAt(ctx, tombstoneKey, node.sqsServer.nextTxnReadTS(ctx))
	require.ErrorIs(t, err, store.ErrKeyNotFound,
		"tombstone must be deleted after the cohort is fully drained")
}

// countPartitionedRows sums the rows under every partitioned
// data / vis / byage / dedup / group prefix for a (queue, gen)
// cohort. Used by the tombstone-reaper integration test to assert
// the cohort is fully drained without enumerating individual keys.
//
// NOTE(review): despite the doc above, only the byage / dedup /
// group families are actually scanned below — no data or vis
// prefix is enumerated, so a reaper that leaked message data / vis
// rows while draining these three families would still pass the
// zero-rows assertion. Confirm whether all-partitions data / vis
// prefix builders exist and add them here.
//
// NOTE(review): the partition bound is hard-coded to 8 to match
// the single caller's PartitionCount=8; a future caller with a
// different count would silently under- or over-scan. Consider
// passing the partition count as a parameter.
func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int {
	t.Helper()
	ctx := context.Background()
	readTS := node.sqsServer.nextTxnReadTS(ctx)
	prefixes := [][]byte{
		// Match every partition by using the family-level "all
		// partitions" prefix where one exists; for dedup / group
		// the family-level prefix isn't pre-built, so iterate
		// partitions [0, 8) explicitly to match the test queue's
		// PartitionCount=8.
		sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName),
	}
	for partition := uint32(0); partition < 8; partition++ {
		prefixes = append(prefixes,
			sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen),
			sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen),
		)
	}
	total := 0
	for _, prefix := range prefixes {
		rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS)
		require.NoError(t, err)
		total += len(rows)
	}
	return total
}
Comment on lines +621 to +650
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Expand this helper to scan the actual data/vis keyspace.

countPartitionedRows currently ignores partitioned data and vis rows, so this test can still pass if the reaper drops byage/dedup/group but leaks message records. The hard-coded 8 also makes the helper undercount future callers with a different partition count.

💡 Tighten the helper to cover the full cohort
-func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int {
+func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64, partitionCount uint32) int {
 	t.Helper()
 	ctx := context.Background()
 	readTS := node.sqsServer.nextTxnReadTS(ctx)
-	prefixes := [][]byte{
-		// Match every partition by using the family-level "all
-		// partitions" prefix where one exists; for dedup / group
-		// the family-level prefix isn't pre-built, so iterate
-		// partitions [0, 4) explicitly.
-		sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName),
-	}
-	for partition := uint32(0); partition < 8; partition++ {
+	prefixes := make([][]byte, 0, int(partitionCount)*5)
+	for partition := uint32(0); partition < partitionCount; partition++ {
 		prefixes = append(prefixes,
+			sqsPartitionedMsgDataKey(queueName, partition, gen, ""),
+			sqsPartitionedMsgVisPrefixForQueue(queueName, partition, gen),
+			sqsPartitionedMsgByAgePrefixForPartition(queueName, partition, gen),
 			sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen),
 			sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen),
 		)
 	}
 	total := 0

Please also pass partitions at the two call sites in this test.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@adapter/sqs_partitioned_dispatch_test.go` around lines 621 - 649, Modify
countPartitionedRows to accept a partitions int parameter (replace the
hard-coded 8) and include scans for the data and vis keyspaces in addition to
byage/dedup/group. Keep the existing readTS logic (node.sqsServer.nextTxnReadTS)
and prefixScanEnd usage, add the family-level all-partitions prefixes for
data/vis (e.g. sqsPartitionedMsgDataPrefixForQueueAllPartitions(queueName) and
sqsPartitionedMsgVisPrefixForQueueAllPartitions(queueName)) to prefixes, and
iterate partition from 0 to partitions-1 to append
sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen) and
sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen). Update the two call
sites in this test to pass the actual partitions value. Ensure require.NoError
remains around each ScanAt call and total sums len(rows) as before.

11 changes: 10 additions & 1 deletion adapter/sqs_purge.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,15 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo
// (which is keyed on the post-purge gen). reapDeadByAge filters
// by exact generation, so the older cohort is never visited.
tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen)
// Encode the pre-purge generation's PartitionCount in the
// tombstone value so the reaper can enumerate partitioned
// dedup / group / byage prefixes for that cohort (PR 6a).
// PartitionCount is immutable across SetQueueAttributes /
// PurgeQueue (§3.2 immutability rule), so the post-purge meta
// and the pre-purge tombstone agree on the partition count.
// Legacy / non-partitioned queues still write []byte{1} via
// the encoder's PartitionCount<=1 branch.
tombstoneValue := encodeQueueTombstoneValue(meta.PartitionCount)
// StartTS + ReadKeys fence against a concurrent CreateQueue /
// DeleteQueue / SetQueueAttributes / PurgeQueue landing between
// our load and dispatch. ErrWriteConflict surfaces via the
Expand All @@ -107,7 +116,7 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo
Elems: []*kv.Elem[kv.OP]{
{Op: kv.Put, Key: metaKey, Value: metaBytes},
{Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(meta.Generation, 10))},
{Op: kv.Put, Key: tombstoneKey, Value: []byte{1}},
{Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue},
},
}
if _, err := s.coordinator.Dispatch(ctx, req); err != nil {
Expand Down
Loading
Loading