Skip to content

Commit

Permalink
db: fix incorrect SeekPrefixGE call in merging iterator
Browse files Browse the repository at this point in the history
We also add assertions checking the prefix at various levels.
  • Loading branch information
RaduBerinde committed Feb 21, 2024
1 parent 21f5b6f commit e3d2888
Show file tree
Hide file tree
Showing 6 changed files with 31 additions and 10 deletions.
16 changes: 9 additions & 7 deletions get_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,12 @@ func (g *getIter) Next() (*InternalKey, base.LazyValue) {

// Compute the key prefix for bloom filtering if split function is
// specified, or use the user key as default.
prefix := g.key
if g.comparer.Split != nil {
prefix = g.key[:g.comparer.Split(g.key)]
prefix := g.key[:g.comparer.Split(g.key)]
g.iterKey, g.iterValue = g.iter.SeekPrefixGE(prefix, g.key, base.SeekGEFlagsNone)
} else {
g.iterKey, g.iterValue = g.iter.SeekGE(g.key, base.SeekGEFlagsNone)
}
g.iterKey, g.iterValue = g.iter.SeekPrefixGE(prefix, g.key, base.SeekGEFlagsNone)
if err := g.iter.Error(); err != nil {
g.err = err
return nil, base.LazyValue{}
Expand Down Expand Up @@ -230,12 +231,13 @@ func (g *getIter) Next() (*InternalKey, base.LazyValue) {
g.iter = &g.levelIter

// Compute the key prefix for bloom filtering if split function is
// specified, or use the user key as default.
prefix := g.key
// specified.
if g.comparer.Split != nil {
prefix = g.key[:g.comparer.Split(g.key)]
prefix := g.key[:g.comparer.Split(g.key)]
g.iterKey, g.iterValue = g.iter.SeekPrefixGE(prefix, g.key, base.SeekGEFlagsNone)
} else {
g.iterKey, g.iterValue = g.iter.SeekGE(g.key, base.SeekGEFlagsNone)
}
g.iterKey, g.iterValue = g.iter.SeekPrefixGE(prefix, g.key, base.SeekGEFlagsNone)
if err := g.iter.Error(); err != nil {
g.err = err
return nil, base.LazyValue{}
Expand Down
4 changes: 3 additions & 1 deletion iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,9 @@ func TestIteratorSeekOpt(t *testing.T) {
return &minSeqNumPropertyCollector{}
},
}
comparer := *base.DefaultComparer
comparer.Split = func(a []byte) int { return len(a) }
opts.Comparer = &comparer

var err error
if d, err = runDBDefineCmd(td, opts); err != nil {
Expand Down Expand Up @@ -944,7 +947,6 @@ func TestIteratorSeekOpt(t *testing.T) {
}
iter, _ = snap.NewIter(nil)
iter.readSampling.forceReadSampling = true
iter.comparer.Split = func(a []byte) int { return len(a) }
iter.forceEnableSeekOpt = true
iter.merging.forceEnableSeekOpt = true
}
Expand Down
6 changes: 6 additions & 0 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package pebble

import (
"bytes"
"context"
"fmt"
"runtime/debug"
Expand Down Expand Up @@ -714,6 +715,11 @@ func (l *levelIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey, ba
func (l *levelIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
if invariants.Enabled {
if !bytes.HasPrefix(key, prefix) || len(prefix) != l.comparer.Split(key) {
panic(fmt.Sprintf("invalid SeekPrefixGE prefix %q for key %q", prefix, key))
}
}
l.err = nil // clear cached iteration error
if l.boundaryContext != nil {
l.boundaryContext.isSyntheticIterBoundsKey = false
Expand Down
8 changes: 8 additions & 0 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,6 +1098,9 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
// was greater than or equal to m.lower, the new key will
// continue to be greater than or equal to m.lower.
key = l.tombstone.End
if m.prefix != nil && !bytes.Equal(m.prefix, key[:m.split(key)]) {
m.prefix = nil
}
}
}
}
Expand Down Expand Up @@ -1126,6 +1129,11 @@ func (m *mergingIter) SeekGE(key []byte, flags base.SeekGEFlags) (*InternalKey,
func (m *mergingIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
if invariants.Enabled {
if !bytes.HasPrefix(key, prefix) || len(prefix) != m.split(key) {
panic(fmt.Sprintf("invalid SeekPrefixGE prefix %q for key %q", prefix, key))
}
}
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
Expand Down
3 changes: 3 additions & 0 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -810,6 +810,9 @@ func (i *singleLevelIterator) seekGEHelper(
func (i *singleLevelIterator) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, base.LazyValue) {
if invariants.Enabled && !bytes.HasPrefix(key, prefix) {
panic(fmt.Sprintf("invalid SeekPrefixGE prefix %q for key %q", prefix, key))
}
if i.vState != nil {
// Callers of SeekPrefixGE aren't aware of virtual sstable bounds, so
// we may have to internally restrict the bounds.
Expand Down
4 changes: 2 additions & 2 deletions testdata/merging_iter
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ seek-prefix-ge d true
----
a#10,1:a10
.
.
d#10,1:d10
d#10,1:d10
d#10,1:d10

Expand All @@ -603,7 +603,7 @@ next
----
a#10,1:a10
.
.
d#10,1:d10
d#10,1:d10
.

Expand Down

0 comments on commit e3d2888

Please sign in to comment.