Skip to content

Commit 4d30835

Browse files
committed
iterv2: fix invalid TrySeekUsingNext in mergingIterV2 batch-refresh path
`mergingIterV2.seekGEAfterBatchRefresh` handles the `SeekGE(TrySeekUsingNext|BatchJustRefreshed)` case by pre-seeking the batch level to decide whether the merging iterator must move back. Previously, when the pre-seek showed that the iterator did not need to move back, the code fell through to a full slab rebuild (`m.seekGE(key, flags)`) with `TrySeekUsingNext` still enabled. That is unsound when `seekKey < prevTopKey`: lower levels may have been advanced past `seekKey` across one or more slab boundaries (see the analogous case in `SeekGE` at the non-`BatchJustRefreshed` fast path), so re-seeking them at `seekKey` with TSUN is illegal and can return incorrect results. This commit handles the `seekKey < prevTopKey` case directly: if the batch level's span keys are unchanged (so the existing slab state — per-level `minSeqNum`/`maxSeqNum`/`parked` — remains valid) the heap is simply rebuilt in place and the iterator returns its current top. Otherwise we fall through with TSUN disabled. The early exit at the `SeekGE` call site is routed through `findNextEntry` so any `SpanBoundary` or visibility-filtered KV at the new heap top is handled correctly. A small additional fast path skips the pre-seek entirely when the batch level is already at or past `seekKey`. `TestSetOptionsBatchRefreshRand` is extended to exercise the affected paths: RANGEDELs are added to the flushed sstables and to the batch (the batch gets an inert RANGEDEL up front so `SetOptions` doesn't tear down `pointIter`), RANGEDEL bounds may be either bare prefixes or suffixed keys so boundary user keys can collide with point keys, and the iteration count is raised.
1 parent 29f2090 commit 4d30835

2 files changed

Lines changed: 93 additions & 27 deletions

File tree

iterator_test.go

Lines changed: 54 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1302,32 +1302,79 @@ func TestSetOptionsBatchRefreshRand(t *testing.T) {
13021302
rng := rand.New(rand.NewPCG(seed, seed))
13031303

13041304
mem := vfs.NewMem()
1305-
d, err := Open("", &Options{FS: mem, Comparer: testkeys.Comparer})
1305+
d, err := Open("", &Options{
1306+
FS: mem,
1307+
Comparer: testkeys.Comparer,
1308+
DisableAutomaticCompactions: true,
1309+
})
13061310
require.NoError(t, err)
13071311
defer func() { require.NoError(t, d.Close()) }()
13081312

1309-
// Generate keys via iterv2.KeyGenConfig. A small prefix length and suffix
1310-
// range encourage collisions between batch and LSM keys.
1311-
keyCfg := iterv2.KeyGenConfig{MaxPrefixLen: 2, MaxSuffix: 3, MinSeqNum: 1, MaxSeqNum: 1}
1312-
prefixCfg := iterv2.KeyGenConfig{MaxPrefixLen: 2, MinSeqNum: 1, MaxSeqNum: 1}
1313+
// Generate keys via iterv2.KeyGenConfig. A moderate prefix length keeps the
1314+
// key space sparse enough that a seek to one key often lands on a
1315+
// significantly different key (so prevTopKey > newSeekKey can hold), while
1316+
// still producing collisions between batch and LSM keys.
1317+
keyCfg := iterv2.KeyGenConfig{MaxPrefixLen: 4, MaxSuffix: 3, MinSeqNum: 1, MaxSeqNum: 1}
1318+
prefixCfg := iterv2.KeyGenConfig{MaxPrefixLen: 4, MinSeqNum: 1, MaxSeqNum: 1}
13131319
randKey := func() []byte { return keyCfg.RandKey(rng) }
13141320
randPrefix := func() []byte { return prefixCfg.RandKey(rng) }
13151321

1322+
// addRangeDel writes a random RANGEDEL to the given writer. Each bound is
1323+
// independently either a bare prefix or a full suffixed key, so RANGEDEL
1324+
// boundary user keys can collide with point-key user keys generated by
1325+
// randKey() — this is required to exercise the merging iterator's heap
1326+
// tiebreak path when the batch level lands at the same user key as the
1327+
// previous top.
1328+
randBound := func() []byte {
1329+
if rng.IntN(2) == 0 {
1330+
return slices.Clone(randPrefix())
1331+
}
1332+
return slices.Clone(randKey())
1333+
}
1334+
addRangeDel := func(w interface {
1335+
DeleteRange([]byte, []byte, *WriteOptions) error
1336+
}) {
1337+
a := randBound()
1338+
b := randBound()
1339+
if bytes.Compare(a, b) > 0 {
1340+
a, b = b, a
1341+
}
1342+
if bytes.Equal(a, b) {
1343+
// Extend the prefix portion of b so it remains a valid testkeys
1344+
// key (possibly bare-prefix) that is strictly greater than a.
1345+
// Appending to a suffixed key directly would yield an invalid
1346+
// testkeys encoding.
1347+
b = append(slices.Clip(testkeys.Comparer.Prefix(b)), byte('a')+byte(rng.IntN(8)))
1348+
}
1349+
require.NoError(t, w.DeleteRange(a, b, nil))
1350+
}
1351+
13161352
// Seed the LSM with multiple flushed sstables so the merging iterator has
13171353
// several non-batch levels beneath the batch. Each flush writes a random
1318-
// scattering of keys; the final memtable layer adds another level.
1354+
// scattering of keys plus a couple of RANGEDELs so the lower levels have
1355+
// non-trivial slab boundaries; the final memtable layer adds another level.
13191356
for f := 0; f < 3; f++ {
13201357
for k := 0; k < 8; k++ {
13211358
require.NoError(t, d.Set(randKey(), randValue(4, rng), nil))
13221359
}
1360+
for r := 0; r < 1+rng.IntN(2); r++ {
1361+
addRangeDel(d)
1362+
}
13231363
require.NoError(t, d.Flush())
13241364
}
13251365
for k := 0; k < 8; k++ {
13261366
require.NoError(t, d.Set(randKey(), randValue(4, rng), nil))
13271367
}
1368+
addRangeDel(d)
13281369

13291370
batch := d.NewIndexedBatch()
13301371
defer batch.Close()
1372+
// Add an inert RANGEDEL so the batch's range-del iterator is wired into the
1373+
// InterleavingIter at iterator construction time. Without this, the first
1374+
// batch RANGEDEL added during a refresh would cause SetOptions to close and
1375+
// rebuild pointIter (see iterator.go SetOptions), bypassing the
1376+
// BatchJustRefreshed+TSUN path under test.
1377+
require.NoError(t, batch.DeleteRange([]byte("\xff\xfe"), []byte("\xff\xff"), nil))
13311378

13321379
iter, _ := batch.NewIter(nil)
13331380
defer iter.Close()
@@ -1376,17 +1423,7 @@ func TestSetOptionsBatchRefreshRand(t *testing.T) {
13761423
case 0:
13771424
require.NoError(t, batch.Set(randKey(), randValue(4, rng), nil))
13781425
case 1:
1379-
// Range del bounds use bare prefixes so we don't have to deal
1380-
// with testkeys suffix encoding when extending the upper bound.
1381-
a := slices.Clone(randPrefix())
1382-
b := slices.Clone(randPrefix())
1383-
if bytes.Compare(a, b) > 0 {
1384-
a, b = b, a
1385-
}
1386-
if bytes.Equal(a, b) {
1387-
b = append(b, byte('a')+byte(rng.IntN(8)))
1388-
}
1389-
require.NoError(t, batch.DeleteRange(a, b, nil))
1426+
addRangeDel(batch)
13901427
}
13911428
}
13921429

merging_iter_v2.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -437,7 +437,10 @@ func (m *mergingIterV2) SeekGE(key []byte, flags base.SeekGEFlags) (kv *base.Int
437437
if flags.BatchJustRefreshed() {
438438
var earlyExit bool
439439
if flags, earlyExit = m.seekGEAfterBatchRefresh(key, flags); earlyExit {
440-
return nil
440+
if m.heap.Len() == 0 {
441+
return nil
442+
}
443+
return m.findNextEntry()
441444
}
442445
// We fall back to rebuilding the entire slab, with the updated flags.
443446
// TODO(radu): the batch level is already in the right place; we could
@@ -509,13 +512,19 @@ func (m *mergingIterV2) SeekGE(key []byte, flags base.SeekGEFlags) (kv *base.Int
509512
// a backward move is required, and BatchJustRefreshed is always cleared
510513
// (the batch level was just seeked).
511514
func (m *mergingIterV2) seekGEAfterBatchRefresh(
512-
key []byte, flags base.SeekGEFlags,
515+
seekKey []byte, flags base.SeekGEFlags,
513516
) (newFlags base.SeekGEFlags, earlyExit bool) {
514-
newFlags = flags.DisableBatchJustRefreshed()
515517
if invariants.Enabled && m.slab.batchSnapshot == 0 {
516518
panic(errors.AssertionFailedf("BatchJustRefreshed with no batch iterator"))
517519
}
518520
level := &m.levels[m.slab.batchLevelIdx]
521+
522+
if level.iterKV != nil && m.heap.cmp(level.iterKV.K.UserKey, seekKey) <= 0 {
523+
// Fast path: we are seeking past the current batch iterator key anyway, so
524+
// no special handling needed. We must leave BatchJustRefreshed set.
525+
return flags, false
526+
}
527+
519528
var prevTopKey []byte
520529
// Whether the heap is empty or non-empty, we need to seek the batch level to
521530
// see if it causes the mergingIter to move back. If the heap is non-empty, we
@@ -530,24 +539,44 @@ func (m *mergingIterV2) seekGEAfterBatchRefresh(
530539
prevTopKey = m.keyBuf
531540
}
532541
}
542+
spanKeysDetector, _ := makeSpanKeysChangeDetector(level.span.Keys)
533543
m.prepareForLevelOp(level)
534-
level.iterKV = level.iter.SeekGE(key, flags)
544+
level.iterKV = level.iter.SeekGE(seekKey, flags)
545+
newFlags = flags.DisableBatchJustRefreshed()
535546
switch {
536547
case level.iterKV == nil:
537548
// The entire operation is done if there was an error or the merging
538549
// iterator is still exhausted.
539550
if m.levelHasError(level) || prevTopKey == nil {
540-
return newFlags, true
551+
return 0, true
541552
}
542-
// The heap is not moving back, we can use TrySeekUsingNext on other levels.
543553
case prevTopKey == nil || m.heap.cmp(level.iterKV.K.UserKey, prevTopKey) < 0:
544554
// Either the heap was empty (so other levels are exhausted and need
545555
// absolute re-seeks) or the slab moves back. Fall back to the general
546556
// seek path.
547-
newFlags = newFlags.DisableTrySeekUsingNext()
548-
default:
549-
// The heap is not moving back, we can use TrySeekUsingNext on other levels.
550-
}
557+
return newFlags.DisableTrySeekUsingNext(), false
558+
}
559+
// The heap is not moving back. Check if the iterator is already at the right
560+
// position. We have to handle this case because seeking all levels at seekKey
561+
// with TrySeekUsingNext might not be legal. See the corresponding case in
562+
// SeekGE for an example.
563+
if m.heap.cmp(seekKey, prevTopKey) < 0 {
564+
if spanKeysDetector.MayHaveChanged(level.span.Keys) {
565+
// The batch level's span may have changed. This could affect the
566+
// per-level visibility (minSeqNum) and parked status of lower levels,
567+
// requiring a proper slab rebuild. Fall back to the general seek path
568+
// with TSUN disabled.
569+
return newFlags.DisableTrySeekUsingNext(), false
570+
}
571+
// Rebuild the heap, since level.iterKV may have changed.
572+
level.maxSeqNum = m.slab.batchSnapshot
573+
m.initHeap(+1)
574+
// The batch level's span boundary may have changed, so the slab's
575+
// nextBoundary is potentially stale. Recompute it.
576+
m.slab.calcNextBoundary(+1)
577+
return 0, true
578+
}
579+
// We can use TrySeekUsingNext on other levels.
551580
return newFlags, false
552581
}
553582

0 commit comments

Comments
 (0)