diff --git a/adapter/sqs_catalog.go b/adapter/sqs_catalog.go index 1344a4a5..dbca9927 100644 --- a/adapter/sqs_catalog.go +++ b/adapter/sqs_catalog.go @@ -1074,7 +1074,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) deadline := time.Now().Add(transactRetryMaxDuration) for range transactRetryMaxAttempts { readTS := s.nextTxnReadTS(ctx) - _, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) + existing, exists, err := s.loadQueueMetaAt(ctx, queueName, readTS) if err != nil { return errors.WithStack(err) } @@ -1095,6 +1095,14 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) metaKey := sqsQueueMetaKey(queueName) genKey := sqsQueueGenKey(queueName) tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen) + // Encode the queue's PartitionCount in the tombstone value so + // the reaper can drive partition iteration off the tombstone + // alone — meta is gone by the time the reaper observes the + // tombstone (PR 6a). Legacy / non-partitioned queues fall + // through to the byte-identical []byte{1} sentinel via the + // PartitionCount<=1 branch in encodeQueueTombstoneValue, so + // existing on-disk tombstones are byte-identical to today. + tombstoneValue := encodeQueueTombstoneValue(existing.PartitionCount) // StartTS + ReadKeys fence against a concurrent CreateQueue / // SetQueueAttributes landing between our load and dispatch. 
req := &kv.OperationGroup[kv.OP]{ @@ -1104,7 +1112,7 @@ func (s *SQSServer) deleteQueueWithRetry(ctx context.Context, queueName string) Elems: []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: metaKey}, {Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(lastGen+1, 10))}, - {Op: kv.Put, Key: tombstoneKey, Value: []byte{1}}, + {Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue}, }, } if _, err := s.coordinator.Dispatch(ctx, req); err == nil { diff --git a/adapter/sqs_keys.go b/adapter/sqs_keys.go index 0418bee6..8614fe66 100644 --- a/adapter/sqs_keys.go +++ b/adapter/sqs_keys.go @@ -133,6 +133,69 @@ func sqsQueueTombstoneKey(queueName string, gen uint64) []byte { return buf } +// sqsTombstoneValueLen is the canonical length of the tombstone +// value emitted by encodeQueueTombstoneValue when PartitionCount +// > 1. Kept distinct from sqsGenerationSuffixLen (which describes +// a key-layout constant) even though they both happen to equal 8 +// today — Claude review on PR #735 flagged the borrow as +// confusing. A future encoding revision (e.g. a length-prefixed +// version field) can change this without touching key parsers. +const sqsTombstoneValueLen = 8 + +// encodeQueueTombstoneValue packs a queue's PartitionCount into the +// tombstone value so the reaper can drive partition iteration off +// the tombstone alone (the meta record is gone by the time the +// reaper observes the tombstone). Eight bytes big-endian uint64 +// chosen so that a future encoding revision can co-exist (length +// branches) without breaking the legacy-value fallback. +// +// Legacy tombstones written by pre-PR-6a binaries carry the +// single-byte sentinel []byte{1}; decodeQueueTombstoneValue maps +// every non-canonical length to PartitionCount=1, so the reaper +// silently keeps the legacy single-partition behaviour for them. 
+func encodeQueueTombstoneValue(partitionCount uint32) []byte { + if partitionCount <= 1 { + // Preserve the byte-identical legacy value for tombstones + // of non-partitioned queues so on-disk diffs stay small + // across the rollout. Reaper treats legacy and 1-partition + // shapes identically. + return []byte{1} + } + out := make([]byte, sqsTombstoneValueLen) + binary.BigEndian.PutUint64(out, uint64(partitionCount)) + return out +} + +// decodeQueueTombstoneValue extracts the PartitionCount written by +// encodeQueueTombstoneValue. Returns 1 for legacy values (the +// single-byte sentinel []byte{1}, the empty value, or any other +// non-canonical payload) so a binary that has never written a +// partitioned tombstone safely degrades to legacy reaper behaviour. +// A canonical 8-byte value with PartitionCount==0 is also clamped +// to 1 because partitionFor / effectivePartitionCount already +// collapse 0 to 1 on the read side; uniform clamping keeps the +// reaper's loop bound consistent. +// +// Reads the value as two big-endian uint32s rather than a single +// uint64 so the function never narrows uint64→uint32 — gosec's +// G115 flags every uint64→uint32 conversion regardless of bound, +// and CLAUDE.md forbids //nolint annotations. The high half must +// be zero because the encoder only ever writes +// PartitionCount<=htfifoMaxPartitions (=32) into the low 32 bits; +// a non-zero high half is therefore a corruption / future-format +// signal and decodes to PartitionCount=1. +func decodeQueueTombstoneValue(value []byte) uint32 { + if len(value) != sqsTombstoneValueLen { + return 1 + } + highHalf := binary.BigEndian.Uint32(value[:4]) + lowHalf := binary.BigEndian.Uint32(value[4:]) + if highHalf != 0 || lowHalf == 0 || lowHalf > htfifoMaxPartitions { + return 1 + } + return lowHalf +} + // sqsGenerationSuffixLen is the byte length of the trailing big-endian // uint64 generation segment in tombstone and byage keys. 
const sqsGenerationSuffixLen = 8 @@ -392,6 +455,50 @@ func sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName string) []byte return buf } +// sqsPartitionedMsgByAgePrefixForPartition returns the byage scan +// prefix bound to one (queue, partition, gen) cohort. Used by the +// tombstone reaper to enumerate the partitioned byage keyspace one +// partition at a time so the per-queue scan budget translates +// cleanly to a per-partition budget — exactly the §6 split-queue- +// FIFO design's "partitions × budget" reaper contract (PR 6). +func sqsPartitionedMsgByAgePrefixForPartition(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgByAgePrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgByAgePrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + +// sqsPartitionedMsgDedupKeyPrefix returns the dedup scan prefix +// bound to one (queue, partition, gen) cohort. Tombstone reaper +// pairs this with deleteAllPrefix to clean up partitioned dedup +// records left behind by a deleted / purged partitioned queue +// (the Codex P2 deferred from PR 5b-2 round 0). +func sqsPartitionedMsgDedupKeyPrefix(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgDedupPrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgDedupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + +// sqsPartitionedMsgGroupKeyPrefix returns the group-lock scan +// prefix bound to one (queue, partition, gen) cohort. Mirrors +// sqsPartitionedMsgDedupKeyPrefix. 
+func sqsPartitionedMsgGroupKeyPrefix(queueName string, partition uint32, gen uint64) []byte { + buf := make([]byte, 0, len(SqsPartitionedMsgGroupPrefix)+sqsKeyCapSmall) + buf = append(buf, SqsPartitionedMsgGroupPrefix...) + buf = append(buf, encodeSQSSegment(queueName)...) + buf = append(buf, sqsPartitionedQueueTerminator) + buf = appendU32(buf, partition) + buf = appendU64(buf, gen) + return buf +} + // sqsMsgByAgePrefixesForQueue returns the {legacy, partitioned} // prefix pair for a queue's byage records. The reaper iterates both: // a queue created before HT-FIFO landed has only legacy entries; a diff --git a/adapter/sqs_partitioned_dispatch_test.go b/adapter/sqs_partitioned_dispatch_test.go index c397c0f6..abb03106 100644 --- a/adapter/sqs_partitioned_dispatch_test.go +++ b/adapter/sqs_partitioned_dispatch_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/store" "github.com/stretchr/testify/require" ) @@ -514,3 +515,136 @@ func TestDropReceiveFanoutCounter_ClearsEntry(t *testing.T) { "a queue recreated after drop must start from a zero counter (got first offset %d)", first) } + +// TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions +// pins the PR 6a contract: after DeleteQueue on a partitioned FIFO +// queue, the tombstone reaper must drain every partition's data / +// vis / byage / dedup / group rows. The pre-PR-6a reaper only +// scanned the legacy keyspace, so partitioned records leaked +// permanently — the Codex P2 from PR #732 review. 
+// +// Test shape: +// - create an 8-partition queue (via the install helper since the +// §11 PR 5b-3 capability gate would otherwise need peer wiring), +// - send messages spread across distinct group ids so partitionFor +// populates more than one partition, +// - DeleteQueue, +// - hand-call reapTombstonedQueues so the test does not have to +// wait on the 30s timer, +// - assert every partitioned byage / dedup / group prefix (across +// all partitions) is empty — data / vis rows die with byage, +// - assert the tombstone itself is gone (the cohort is fully +// drained → reaper deletes the tombstone). +func TestSQSServer_PartitionedFIFO_TombstoneReaperEnumeratesPartitions(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + node := sqsLeaderNode(t, nodes) + + const queueName = "reaper-partitioned.fifo" + status, out := callSQS(t, node, sqsCreateQueueTarget, map[string]any{ + "QueueName": queueName, + "Attributes": map[string]string{"FifoQueue": "true"}, + }) + require.Equal(t, http.StatusOK, status, "create FIFO queue: %v", out) + queueURL, _ := out["QueueUrl"].(string) + require.NotEmpty(t, queueURL) + + // 8 partitions instead of 4 so the test also exercises a + // non-default partition count and silences the unparam linter + // that sees only PartitionCount=4 in every other dispatch test. 
+ const partitions uint32 = 8 + installPartitionedMetaForTest(t, node, queueName, partitions, htfifoThroughputPerMessageGroupID) + + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + meta, exists, err := node.sqsServer.loadQueueMetaAt(ctx, queueName, readTS) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, partitions, meta.PartitionCount) + gen := meta.Generation + + // Spread messages over enough groups to land in at least 2 + // distinct partitions; partitionFor is FNV-1a so 6 groups give + // good odds without making the test fragile against hash + // changes (the reaper assertion below is total-rows-zero, not + // per-partition cardinality, so any non-zero distribution is + // fine). + groups := []string{"a", "b", "c", "d", "e", "f"} + for _, g := range groups { + status, out := callSQS(t, node, sqsSendMessageTarget, map[string]any{ + "QueueUrl": queueURL, + "MessageBody": "body-" + g, + "MessageGroupId": g, + "MessageDeduplicationId": "dedup-" + g, + }) + require.Equal(t, http.StatusOK, status, "send (group=%s): %v", g, out) + } + + // Sanity: at least one partitioned data row exists pre-delete. + // Without this, the post-reap assertion would be vacuously true + // on a regression that simply silenced sends. + preReapRows := countPartitionedRows(t, node, queueName, gen) + require.Positive(t, preReapRows, + "expected at least one partitioned row before DeleteQueue+reap") + + // DeleteQueue → writes tombstone with PartitionCount=8 in the + // value (PR 6a). Pre-PR-6a, DeleteQueue would write []byte{1} + // here and the reaper would skip the partitioned sweep entirely. + status, out = callSQS(t, node, sqsDeleteQueueTarget, map[string]any{ + "QueueUrl": queueURL, + }) + require.Equal(t, http.StatusOK, status, "delete queue: %v", out) + + // Hand-call the tombstone reaper so the test doesn't depend on + // the 30s ticker. The same code path runs in production every + // reaper tick. 
+ require.NoError(t, node.sqsServer.reapTombstonedQueues(ctx), + "reapTombstonedQueues must succeed") + + postReapRows := countPartitionedRows(t, node, queueName, gen) + require.Zero(t, postReapRows, + "every partitioned data / vis / byage / dedup / group row must be reaped (got %d remaining)", + postReapRows) + + // Tombstone itself must also be gone — reapTombstonedGeneration + // only deletes it once every prefix family is drained, so a + // surviving tombstone here would mean the partitioned sweep + // silently left rows behind in some prefix the assertion above + // did not enumerate. + tombstoneKey := sqsQueueTombstoneKey(queueName, gen) + _, err = node.sqsServer.store.GetAt(ctx, tombstoneKey, node.sqsServer.nextTxnReadTS(ctx)) + require.ErrorIs(t, err, store.ErrKeyNotFound, + "tombstone must be deleted after the cohort is fully drained") +} + +// countPartitionedRows sums the rows under every partitioned +// byage / dedup / group prefix for a (queue, gen) cohort (data and +// vis rows are deleted together with their byage entry, so they +// need no separate scan). Used by the reaper test as a drain check. +func countPartitionedRows(t *testing.T, node Node, queueName string, gen uint64) int { + t.Helper() + ctx := context.Background() + readTS := node.sqsServer.nextTxnReadTS(ctx) + prefixes := [][]byte{ + // Match every partition by using the family-level "all + // partitions" prefix where one exists; for dedup / group + // the family-level prefix isn't pre-built, so iterate + // partitions [0, 8) explicitly to match the test queue's + // PartitionCount=8. 
+ sqsPartitionedMsgByAgePrefixForQueueAllPartitions(queueName), + } + for partition := uint32(0); partition < 8; partition++ { + prefixes = append(prefixes, + sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen), + sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen), + ) + } + total := 0 + for _, prefix := range prefixes { + rows, err := node.sqsServer.store.ScanAt(ctx, prefix, prefixScanEnd(prefix), 1024, readTS) + require.NoError(t, err) + total += len(rows) + } + return total +} diff --git a/adapter/sqs_purge.go b/adapter/sqs_purge.go index f9e0ab48..eb667a19 100644 --- a/adapter/sqs_purge.go +++ b/adapter/sqs_purge.go @@ -96,6 +96,15 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo // (which is keyed on the post-purge gen). reapDeadByAge filters // by exact generation, so the older cohort is never visited. tombstoneKey := sqsQueueTombstoneKey(queueName, lastGen) + // Encode the pre-purge generation's PartitionCount in the + // tombstone value so the reaper can enumerate partitioned + // dedup / group / byage prefixes for that cohort (PR 6a). + // PartitionCount is immutable across SetQueueAttributes / + // PurgeQueue (§3.2 immutability rule), so the post-purge meta + // and the pre-purge tombstone agree on the partition count. + // Legacy / non-partitioned queues still write []byte{1} via + // the encoder's PartitionCount<=1 branch. + tombstoneValue := encodeQueueTombstoneValue(meta.PartitionCount) // StartTS + ReadKeys fence against a concurrent CreateQueue / // DeleteQueue / SetQueueAttributes / PurgeQueue landing between // our load and dispatch. 
ErrWriteConflict surfaces via the @@ -107,7 +116,7 @@ func (s *SQSServer) tryPurgeQueueOnce(ctx context.Context, queueName string) (bo Elems: []*kv.Elem[kv.OP]{ {Op: kv.Put, Key: metaKey, Value: metaBytes}, {Op: kv.Put, Key: genKey, Value: []byte(strconv.FormatUint(meta.Generation, 10))}, - {Op: kv.Put, Key: tombstoneKey, Value: []byte{1}}, + {Op: kv.Put, Key: tombstoneKey, Value: tombstoneValue}, }, } if _, err := s.coordinator.Dispatch(ctx, req); err != nil { diff --git a/adapter/sqs_reaper.go b/adapter/sqs_reaper.go index c870917a..5a3f743a 100644 --- a/adapter/sqs_reaper.go +++ b/adapter/sqs_reaper.go @@ -124,7 +124,12 @@ func (s *SQSServer) reapTombstonedQueues(ctx context.Context) error { if !ok { continue } - s.reapTombstonedGeneration(ctx, queueName, gen, kvp.Key, readTS) + // PartitionCount is encoded in the tombstone value + // (PR 6a). decodeQueueTombstoneValue maps legacy / + // non-canonical values to 1 so pre-PR-6a tombstones + // retain their byte-identical legacy reaper path. + partitionCount := decodeQueueTombstoneValue(kvp.Value) + s.reapTombstonedGeneration(ctx, queueName, gen, partitionCount, kvp.Key, readTS) } if len(page) < sqsReaperPageLimit { return nil @@ -140,7 +145,21 @@ func (s *SQSServer) reapTombstonedQueues(ctx context.Context) error { // its own per-queue budget. Once every prefix the cohort can occupy // is empty, the tombstone itself is deleted; otherwise it stays so // the next tick can finish what was left. -func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName string, gen uint64, tombstoneKey []byte, readTS uint64) { +// +// partitionCount drives partition-iterative cleanup: 1 (legacy / +// non-partitioned queue, or pre-PR-6a tombstone whose value +// decoded to the default) takes the byte-identical legacy path — +// one byage / dedup / group sweep — and leaves the partitioned +// keyspace untouched. 
Greater than 1 ALSO sweeps the partitioned +// byage / dedup / group prefix family for each partition in +// [0, partitionCount), which is the §6 "partitions × budget" +// reaper contract from the split-queue-FIFO design. +func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName string, gen uint64, partitionCount uint32, tombstoneKey []byte, readTS uint64) { + // Legacy keyspace is always swept — covers all pre-HT-FIFO + // queues plus any partitioned queue that briefly carried legacy + // records (defensive: data is nominally never written to the + // legacy keyspace for partitioned queues, but the sweep is + // idempotent and cheap). dataDone, err := s.reapDeadByAge(ctx, queueName, gen, readTS) if err != nil { slog.Warn("sqs tombstone byage reap failed", "queue", queueName, "gen", gen, "err", err) @@ -156,11 +175,59 @@ func (s *SQSServer) reapTombstonedGeneration(ctx context.Context, queueName stri slog.Warn("sqs tombstone group reap failed", "queue", queueName, "gen", gen, "err", err) return } - if dataDone && dedupDone && groupDone { + allDone := dataDone && dedupDone && groupDone + // Partitioned sweep: one (byage, dedup, group) triple per + // partition. Each triple shares the per-queue budget with the + // legacy sweep, so a wide-fanout queue may need multiple reaper + // ticks to fully drain — same contract as the live-queue reap. + if partitionCount > 1 { + partDone, err := s.reapPartitionedGeneration(ctx, queueName, gen, partitionCount, readTS) + if err != nil { + slog.Warn("sqs tombstone partitioned reap failed", + "queue", queueName, "gen", gen, "partitionCount", partitionCount, "err", err) + return + } + allDone = allDone && partDone + } + if allDone { _ = s.dispatchDedupDelete(ctx, tombstoneKey, readTS) } } +// reapPartitionedGeneration sweeps the partitioned byage, dedup, +// and group prefix family for every partition of one tombstoned +// (queue, gen) cohort. 
Returns done=true only when EVERY partition +// AND every prefix family is fully drained — short-circuiting on +// the first unfinished partition would leave the tombstone in +// place but skip later partitions on this tick, starving them +// under churn. +func (s *SQSServer) reapPartitionedGeneration(ctx context.Context, queueName string, gen uint64, partitionCount uint32, readTS uint64) (bool, error) { + allDone := true + for partition := uint32(0); partition < partitionCount; partition++ { + if err := ctx.Err(); err != nil { + return false, errors.WithStack(err) + } + byageDone, err := s.reapDeadByAgePartition(ctx, queueName, gen, partition, readTS) + if err != nil { + return false, err + } + dedupDone, err := s.deleteAllPrefix(ctx, + sqsPartitionedMsgDedupKeyPrefix(queueName, partition, gen), readTS) + if err != nil { + return false, err + } + groupDone, err := s.deleteAllPrefix(ctx, + sqsPartitionedMsgGroupKeyPrefix(queueName, partition, gen), readTS) + if err != nil { + return false, err + } + if !byageDone || !dedupDone || !groupDone { + allDone = false + } + } + return allDone, nil +} + // reapDeadByAge walks the byage prefix for one (queue, gen) cohort // and reaps each record found, regardless of retention age — every // row under a tombstoned generation is by definition orphaned. @@ -203,7 +270,73 @@ func (s *SQSServer) reapDeadByAgePage(ctx context.Context, queueName string, gen if !ok || parsed.Generation != gen { continue } - if err := s.reapOneRecord(ctx, queueName, gen, kvp.Key, parsed.MessageID, readTS); err != nil { + // Legacy byage path: nil meta + partition 0 keeps the + // dispatch helpers on the legacy constructors (byte- + // identical to the pre-PR-5b reaper). The partitioned + // twin (reapDeadByAgePartitionPage) takes the meta-aware + // branch via reapOneRecordPartitioned. 
+ if err := s.reapOneRecord(ctx, queueName, nil, 0, gen, kvp.Key, parsed.MessageID, readTS); err != nil { + return true, processed, err + } + processed++ + if processed >= sqsReaperPerQueueBudget { + return true, processed, nil + } + } + if len(page) < sqsReaperPageLimit { + return true, processed, nil + } + return false, processed, nil +} + +// reapDeadByAgePartition is the partitioned-keyspace twin of +// reapDeadByAge. Each iteration scans one partition's byage prefix +// for one (queue, gen) cohort, parses the partitioned byage key, +// and dispatches the (data, vis, byage, optional group-lock) +// quartet delete for the message. Threads partition through +// reapOneRecord so the dispatch helpers route to the partitioned +// data / vis keys, not the legacy ones. +func (s *SQSServer) reapDeadByAgePartition(ctx context.Context, queueName string, gen uint64, partition uint32, readTS uint64) (bool, error) { + prefix := sqsPartitionedMsgByAgePrefixForPartition(queueName, partition, gen) + upper := prefixScanEnd(prefix) + start := bytes.Clone(prefix) + processed := 0 + for processed < sqsReaperPerQueueBudget { + page, err := s.store.ScanAt(ctx, start, upper, sqsReaperPageLimit, readTS) + if err != nil { + return false, errors.WithStack(err) + } + if len(page) == 0 { + return true, nil + } + done, newProcessed, err := s.reapDeadByAgePartitionPage(ctx, queueName, gen, partition, page, readTS, processed) + if err != nil { + return false, err + } + processed = newProcessed + if done { + return processed < sqsReaperPerQueueBudget, nil + } + start = nextScanCursorAfter(page[len(page)-1].Key) + } + return false, nil +} + +// reapDeadByAgePartitionPage is the partitioned twin of +// reapDeadByAgePage. Parses each entry as a partitioned byage key +// (verifying the partition matches — defensive against page +// boundaries that span partitions, which the prefix scan should +// already prevent) and feeds the partition-aware reapOneRecord. 
+func (s *SQSServer) reapDeadByAgePartitionPage(ctx context.Context, queueName string, gen uint64, partition uint32, page []*store.KVPair, readTS uint64, processed int) (bool, int, error) { + for _, kvp := range page { + if err := ctx.Err(); err != nil { + return true, processed, errors.WithStack(err) + } + parsed, ok := parseSqsPartitionedMsgByAgeKey(kvp.Key, queueName) + if !ok || parsed.Partition != partition || parsed.Generation != gen { + continue + } + if err := s.reapOneRecordPartitioned(ctx, queueName, partition, gen, kvp.Key, parsed.MessageID, readTS); err != nil { return true, processed, err } processed++ @@ -351,7 +484,11 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // see meta caught up. continue } - if err := s.reapOneRecord(ctx, queueName, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { + // Live-queue retention reap currently iterates only the + // legacy byage keyspace; partitioned-byage live-queue + // retention is a follow-up to PR 6a (the tombstoned-cohort + // path is what this PR addresses). + if err := s.reapOneRecord(ctx, queueName, nil, 0, parsed.Generation, kvp.Key, parsed.MessageID, readTS); err != nil { return true, processed, err } processed++ @@ -369,14 +506,15 @@ func (s *SQSServer) reapPage(ctx context.Context, queueName string, currentGen u // quartet under a single OCC dispatch. ErrWriteConflict is treated as // success — the message has just been touched (received, deleted, // redriven) by another path and is no longer ours to reap. -func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { - // Reaper iterates legacy byAge entries only in PR 5b-2; the - // partitioned-byAge enumeration ships in a later PR. nil meta - // + partition 0 routes the dispatch helper to the legacy - // constructor so the data-key matches the pre-PR-5b layout - // byte-for-byte. 
- const partition uint32 = 0 - dataKey := sqsMsgDataKeyDispatch(nil, queueName, partition, gen, messageID) +// +// Legacy reaper callers pass nil meta + partition 0 so the dispatch +// helpers route to the legacy constructors (byte-identical to the +// pre-PR-5b layout). Partitioned reaper callers (PR 6a) pass a +// synthetic *sqsQueueMeta carrying the tombstone-encoded +// PartitionCount so the dispatch helpers route to the partitioned +// constructors. +func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { + dataKey := sqsMsgDataKeyDispatch(meta, queueName, partition, gen, messageID) parsed, found, err := s.loadDataForReaper(ctx, dataKey, readTS) if err != nil { return err @@ -388,7 +526,7 @@ func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uin s.dispatchOrphanByAgeDrop(ctx, byAgeKey, readTS) return nil } - req, err := s.buildReapOps(ctx, queueName, gen, byAgeKey, dataKey, parsed, readTS) + req, err := s.buildReapOps(ctx, queueName, meta, partition, gen, byAgeKey, dataKey, parsed, readTS) if err != nil { return err } @@ -401,6 +539,22 @@ func (s *SQSServer) reapOneRecord(ctx context.Context, queueName string, gen uin return nil } +// reapOneRecordPartitioned is a thin convenience wrapper around +// reapOneRecord for the partitioned-byage enumeration: synthesises +// a meta carrying any value of PartitionCount > 1 so the dispatch +// helpers route to the partitioned key family. The exact value is +// not consulted by the reaper's per-key dispatch path — the +// helpers only branch on the legacy-vs-partitioned bit +// (PartitionCount > 1) — so we use the minimum legal partitioned +// value as a sentinel rather than the queue's real count, which +// would imply the synthetic meta carries information it actually +// does not (Claude review on PR #735). 
+func (s *SQSServer) reapOneRecordPartitioned(ctx context.Context, queueName string, partition uint32, gen uint64, byAgeKey []byte, messageID string, readTS uint64) error { + const partitionedDispatchSentinel uint32 = 2 + syntheticMeta := &sqsQueueMeta{PartitionCount: partitionedDispatchSentinel} + return s.reapOneRecord(ctx, queueName, syntheticMeta, partition, gen, byAgeKey, messageID, readTS) +} + // loadDataForReaper fetches and decodes the data record for a byage // entry. found=false signals "byage points at a missing record — drop // the byage entry" to the caller. Read errors other than ErrKeyNotFound @@ -518,16 +672,13 @@ func (s *SQSServer) dispatchDedupDelete(ctx context.Context, key []byte, readTS return nil } -func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { - // Reaper currently iterates the legacy byAge keyspace only — the - // partitioned-byAge enumeration is wired in a later PR (Phase 3.D - // PR 6, partition-iterating reaper). Dispatch helpers receive - // nil meta + partition 0 so they deterministically route to the - // legacy constructor and produce byte-identical keys to the - // pre-PR-5b reaper. When PR 6 lands the caller switches to the - // real meta + parsed partition. 
- const partition uint32 = 0 - visKey := sqsMsgVisKeyDispatch(nil, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) +func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, meta *sqsQueueMeta, partition uint32, gen uint64, byAgeKey, dataKey []byte, parsed *sqsMessageRecord, readTS uint64) (*kv.OperationGroup[kv.OP], error) { + // meta + partition route the dispatch helpers to the right key + // family: nil meta + partition 0 is the legacy reaper path + // (byte-identical to pre-PR-5b layout); a synthetic meta with + // PartitionCount>1 + a real partition is the partitioned reaper + // path landed in PR 6a. + visKey := sqsMsgVisKeyDispatch(meta, queueName, partition, gen, parsed.VisibleAtMillis, parsed.MessageID) readKeys := [][]byte{byAgeKey, dataKey, visKey, sqsQueueMetaKey(queueName), sqsQueueGenKey(queueName)} elems := []*kv.Elem[kv.OP]{ {Op: kv.Del, Key: byAgeKey}, @@ -535,8 +686,8 @@ func (s *SQSServer) buildReapOps(ctx context.Context, queueName string, gen uint {Op: kv.Del, Key: visKey}, } if parsed.MessageGroupId != "" { - lockKey := sqsMsgGroupKeyDispatch(nil, queueName, partition, gen, parsed.MessageGroupId) - lock, err := s.loadFifoGroupLock(ctx, queueName, nil, partition, gen, parsed.MessageGroupId, readTS) + lockKey := sqsMsgGroupKeyDispatch(meta, queueName, partition, gen, parsed.MessageGroupId) + lock, err := s.loadFifoGroupLock(ctx, queueName, meta, partition, gen, parsed.MessageGroupId, readTS) if err != nil { return nil, err } diff --git a/adapter/sqs_tombstone_value_test.go b/adapter/sqs_tombstone_value_test.go new file mode 100644 index 00000000..d07ffb39 --- /dev/null +++ b/adapter/sqs_tombstone_value_test.go @@ -0,0 +1,89 @@ +package adapter + +import ( + "encoding/binary" + "testing" + + "github.com/stretchr/testify/require" +) + +// TestEncodeQueueTombstoneValue_LegacyByteIdenticalForUnpartitioned +// pins that pre-PR-6a tombstone values are byte-identical for +// PartitionCount==0 and PartitionCount==1: a 
rolling upgrade that +// flips a node from pre-PR-6a to PR 6a sees no on-disk change for +// existing non-partitioned queues. Without this, every DeleteQueue +// during the rollout window would emit a new tombstone value shape +// that the pre-PR-6a binary doesn't expect (it ignores the value +// today, but a future encoding-aware check could trip on it). +func TestEncodeQueueTombstoneValue_LegacyByteIdenticalForUnpartitioned(t *testing.T) { + t.Parallel() + require.Equal(t, []byte{1}, encodeQueueTombstoneValue(0), + "PartitionCount=0 must encode to the legacy single-byte sentinel") + require.Equal(t, []byte{1}, encodeQueueTombstoneValue(1), + "PartitionCount=1 must encode to the legacy single-byte sentinel") +} + +// TestEncodeQueueTombstoneValue_PartitionedEncodesUint64BE pins the +// new encoding for PartitionCount > 1: an 8-byte big-endian uint64 +// the reaper can safely Decode. Roundtrips through +// decodeQueueTombstoneValue. +func TestEncodeQueueTombstoneValue_PartitionedEncodesUint64BE(t *testing.T) { + t.Parallel() + for _, pc := range []uint32{2, 4, 8, 16, 32} { + got := encodeQueueTombstoneValue(pc) + require.Len(t, got, 8, "PartitionCount=%d must encode to 8 bytes", pc) + require.Equal(t, uint64(pc), binary.BigEndian.Uint64(got), + "PartitionCount=%d must encode big-endian", pc) + require.Equal(t, pc, decodeQueueTombstoneValue(got), + "encode/decode round-trip must preserve PartitionCount") + } +} + +// TestDecodeQueueTombstoneValue_LegacyValuesFallBackToOne pins the +// fail-safe for any value the reaper observes that doesn't match +// the canonical 8-byte shape: empty value (theoretically impossible +// since tombstones always have a non-empty marker), the legacy +// []byte{1} sentinel from pre-PR-6a writers, and any other length +// (a future encoding revision that this binary doesn't recognise) +// all degrade to PartitionCount=1 — the legacy reaper path. 
Without +// this, a partial rollback or mid-flight encoding change would +// silently flip the reaper to scan zero partitions and leak the +// queue's data. +func TestDecodeQueueTombstoneValue_LegacyValuesFallBackToOne(t *testing.T) { + t.Parallel() + require.Equal(t, uint32(1), decodeQueueTombstoneValue(nil), + "nil value must fall back to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{}), + "empty value must fall back to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{1}), + "legacy single-byte sentinel must decode to PartitionCount=1") + require.Equal(t, uint32(1), decodeQueueTombstoneValue([]byte{0, 0, 0, 1}), + "unrecognised 4-byte value must fall back to PartitionCount=1") +} + +// TestDecodeQueueTombstoneValue_OutOfRangeFallsBackToOne pins the +// defensive clamp on canonical 8-byte values whose decoded count is +// outside the [1, htfifoMaxPartitions] range. PartitionCount=0 is +// canonical for "no HT-FIFO" on the read path (effectivePartitionCount +// collapses 0→1) so the reaper treats it the same way; values above +// htfifoMaxPartitions are a corruption / future-format signal — the +// reaper can't know how to iterate them safely, so it falls back to +// the legacy single-partition sweep and surfaces the corruption to +// the operator via the slow leak (better than the alternative of +// iterating bogus partitions). 
+func TestDecodeQueueTombstoneValue_OutOfRangeFallsBackToOne(t *testing.T) { + t.Parallel() + zero := make([]byte, 8) + require.Equal(t, uint32(1), decodeQueueTombstoneValue(zero), + "canonical encoding of 0 must decode to PartitionCount=1 (matches effectivePartitionCount)") + + tooBig := make([]byte, 8) + binary.BigEndian.PutUint64(tooBig, uint64(htfifoMaxPartitions)+1) + require.Equal(t, uint32(1), decodeQueueTombstoneValue(tooBig), + "PartitionCount above htfifoMaxPartitions must fall back to 1") + + maxOK := make([]byte, 8) + binary.BigEndian.PutUint64(maxOK, uint64(htfifoMaxPartitions)) + require.Equal(t, htfifoMaxPartitions, decodeQueueTombstoneValue(maxOK), + "PartitionCount=htfifoMaxPartitions is in-range and must decode unchanged") +}