Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

db: refactor table cache newIters #3247

Merged
merged 1 commit into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 4 additions & 20 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1427,38 +1427,22 @@ func (c *compaction) newRangeDelIter(
bytesIterated *uint64,
) (keyspan.FragmentIterator, io.Closer, error) {
opts.level = l
iter, rangeDelIter, err := newIters(context.Background(), f.FileMetadata,
iterSet, err := newIters(context.Background(), f.FileMetadata,
&opts, internalIterOpts{
bytesIterated: &c.bytesIterated,
bufferPool: &c.bufferPool,
})
}, iterRangeDeletions)
if err != nil {
return nil, nil, err
}
// TODO(peter): It is mildly wasteful to open the point iterator only to
// immediately close it. One way to solve this would be to add new
// methods to tableCache for creating point and range-deletion iterators
// independently. We'd only want to use those methods here,
// though. Doesn't seem worth the hassle in the near term.
if err = iter.Close(); err != nil {
if rangeDelIter != nil {
err = errors.CombineErrors(err, rangeDelIter.Close())
}
return nil, nil, err
}
if rangeDelIter == nil {
} else if iterSet.rangeDeletion == nil {
// The file doesn't contain any range deletions.
return nil, nil, nil
}

// Ensure that rangeDelIter is not closed until the compaction is
// finished. This is necessary because range tombstone processing
// requires the range tombstones to be held in memory for up to the
// lifetime of the compaction.
closer := rangeDelIter
rangeDelIter = noCloseIter{rangeDelIter}

return rangeDelIter, closer, nil
return noCloseIter{iterSet.rangeDeletion}, iterSet.rangeDeletion, nil
}

func (c *compaction) String() string {
Expand Down
6 changes: 3 additions & 3 deletions compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2891,9 +2891,9 @@ func TestCompactionCheckOrdering(t *testing.T) {
}

newIters := func(
_ context.Context, _ *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts,
) (internalIterator, keyspan.FragmentIterator, error) {
return &errorIter{}, nil, nil
_ context.Context, _ *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
return iterSet{point: &errorIter{}}, nil
}
result := "OK"
_, err := c.newInputIter(newIters, nil, nil)
Expand Down
69 changes: 26 additions & 43 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -3077,42 +3077,33 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
return
}

if m.HasPointKeys {
pointIter, rangeDelIter, err := d.newIters(context.TODO(), m, nil, internalIterOpts{})
if err != nil {
panic(errors.Wrap(err, "pebble: error creating point iterator"))
}
iters, err := d.newIters(context.TODO(), m, nil, internalIterOpts{}, iterPointKeys|iterRangeDeletions|iterRangeKeys)
if err != nil {
panic(errors.Wrap(err, "pebble: error creating iterators"))
}
defer iters.CloseAll()

defer pointIter.Close()
if rangeDelIter != nil {
defer rangeDelIter.Close()
}
if m.HasPointKeys {
pointIter := iters.Point()
rangeDelIter := iters.RangeDeletion()

// Check that the lower bound is tight.
pointKey, _ := pointIter.First()
var rangeDel *keyspan.Span
if rangeDelIter != nil {
rangeDel, err = rangeDelIter.First()
if err != nil {
panic(err)
}
rangeDel, err := rangeDelIter.First()
if err != nil {
panic(err)
}

// Check that the lower bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.SmallestKey().UserKey, m.SmallestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.SmallestPointKey.UserKey) != 0) {
panic(errors.Newf("pebble: virtual sstable %s lower point key bound is not tight", m.FileNum))
}

// Check that the upper bound is tight.
pointKey, _ = pointIter.Last()
rangeDel = nil
if rangeDelIter != nil {
rangeDel, err = rangeDelIter.Last()
if err != nil {
panic(err)
}
rangeDel, err = rangeDelIter.Last()
if err != nil {
panic(err)
}

// Check that the upper bound is tight.
if (rangeDel == nil || d.cmp(rangeDel.LargestKey().UserKey, m.LargestPointKey.UserKey) != 0) &&
(pointKey == nil || d.cmp(pointKey.UserKey, m.LargestPointKey.UserKey) != 0) {
panic(errors.Newf("pebble: virtual sstable %s upper point key bound is not tight", m.FileNum))
Expand All @@ -3124,32 +3115,24 @@ func (d *DB) checkVirtualBounds(m *fileMetadata) {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, key.UserKey))
}
}

if rangeDelIter != nil {
s, err := rangeDelIter.First()
for ; s != nil; s, err = rangeDelIter.Next() {
if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
}
if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
}
s, err := rangeDelIter.First()
for ; s != nil; s, err = rangeDelIter.Next() {
if d.cmp(s.SmallestKey().UserKey, m.SmallestPointKey.UserKey) < 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.SmallestKey().UserKey))
}
if err != nil {
panic(err)
if d.cmp(s.LargestKey().UserKey, m.LargestPointKey.UserKey) > 0 {
panic(errors.Newf("pebble: virtual sstable %s point key %s is not within bounds", m.FileNum, s.LargestKey().UserKey))
}
}
if err != nil {
panic(err)
}
}

if !m.HasRangeKeys {
return
}

rangeKeyIter, err := d.tableNewRangeKeyIter(m, keyspan.SpanIterOptions{})
if err != nil {
panic(errors.Wrap(err, "pebble: error creating range key iterator"))
}
defer rangeKeyIter.Close()
rangeKeyIter := iters.RangeKey()

// Check that the lower bound is tight.
if s, err := rangeKeyIter.First(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,8 @@ func NewExternalIterWithContext(
// Add the readers to the Iterator so that Close closes them, and
// SetOptions can re-construct iterators from them.
externalReaders: readers,
newIters: func(
ctx context.Context, f *manifest.FileMetadata, opts *IterOptions,
internalOpts internalIterOpts) (internalIterator, keyspan.FragmentIterator, error) {
newIters: func(context.Context, *manifest.FileMetadata, *IterOptions,
internalIterOpts, iterKinds) (iterSet, error) {
// NB: External iterators are currently constructed without any
// `levelIters`. newIters should never be called. When we support
// organizing multiple non-overlapping files into a single level
Expand Down
2 changes: 1 addition & 1 deletion flushable.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func (s *ingestedFlushable) constructRangeDelIter(
) (keyspan.FragmentIterator, error) {
// Note that the keyspan level iter expects a non-nil iterator to be
// returned even if there is an error. So, we return the emptyKeyspanIter.
iter, rangeDelIter, err := s.newIters(context.Background(), file, nil, internalIterOpts{})
iter, rangeDelIter, err := s.newIters.TODO(context.Background(), file, nil, internalIterOpts{})
if err != nil {
return emptyKeyspanIter, err
}
Expand Down
9 changes: 4 additions & 5 deletions get_iter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/testkeys"
)
Expand Down Expand Up @@ -387,13 +386,13 @@ func TestGetIter(t *testing.T) {
// m is a map from file numbers to DBs.
m := map[FileNum]*memTable{}
newIter := func(
_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts,
) (internalIterator, keyspan.FragmentIterator, error) {
_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds,
) (iterSet, error) {
d, ok := m[file.FileNum]
if !ok {
return nil, nil, errors.New("no such file")
return iterSet{}, errors.New("no such file")
}
return d.newIter(nil), nil, nil
return iterSet{point: d.newIter(nil)}, nil
}

var files [numLevels][]*fileMetadata
Expand Down
4 changes: 2 additions & 2 deletions ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -1663,7 +1663,7 @@ func (d *DB) excise(
// This file will contain point keys
smallestPointKey := m.SmallestPointKey
var err error
iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{
iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
Expand Down Expand Up @@ -1781,7 +1781,7 @@ func (d *DB) excise(
largestPointKey := m.LargestPointKey
var err error
if iter == nil && rangeDelIter == nil {
iter, rangeDelIter, err = d.newIters(context.TODO(), m, &IterOptions{
iter, rangeDelIter, err = d.newIters.TODO(context.TODO(), m, &IterOptions{
CategoryAndQoS: sstable.CategoryAndQoS{
Category: "pebble-ingest",
QoSLevel: sstable.LatencySensitiveQoSLevel,
Expand Down
11 changes: 5 additions & 6 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/bytealloc"
"github.com/cockroachdb/pebble/internal/invalidating"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/internal/testkeys"
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
Expand Down Expand Up @@ -919,14 +918,14 @@ func TestIteratorSeekOpt(t *testing.T) {
oldNewIters := d.newIters
d.newIters = func(
ctx context.Context, file *manifest.FileMetadata, opts *IterOptions,
internalOpts internalIterOpts) (internalIterator, keyspan.FragmentIterator, error) {
iter, rangeIter, err := oldNewIters(ctx, file, opts, internalOpts)
iterWrapped := &iterSeekOptWrapper{
internalIterator: iter,
internalOpts internalIterOpts, kinds iterKinds) (iterSet, error) {
iters, err := oldNewIters(ctx, file, opts, internalOpts, kinds)
iters.point = &iterSeekOptWrapper{
internalIterator: iters.point,
seekGEUsingNext: &seekGEUsingNext,
seekPrefixGEUsingNext: &seekPrefixGEUsingNext,
}
return iterWrapped, rangeIter, err
return iters, err
}
return s

Expand Down
2 changes: 1 addition & 1 deletion level_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func checkRangeTombstones(c *checkConfig) error {
for f := files.First(); f != nil; f = files.Next() {
lf := files.Take()
//lower, upper := manifest.KeyRange(c.cmp, lf.Iter())
iterToClose, iter, err := c.newIters(
iterToClose, iter, err := c.newIters.TODO(
context.Background(), lf.FileMetadata, &IterOptions{level: manifest.Level(lsmLevel)}, internalIterOpts{})
if err != nil {
return err
Expand Down
11 changes: 7 additions & 4 deletions level_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,20 @@ func TestCheckLevelsCornerCases(t *testing.T) {

var fileNum FileNum
newIters :=
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts) (internalIterator, keyspan.FragmentIterator, error) {
func(_ context.Context, file *manifest.FileMetadata, _ *IterOptions, _ internalIterOpts, _ iterKinds) (iterSet, error) {
r := readers[file.FileNum]
rangeDelIter, err := r.NewRawRangeDelIter()
if err != nil {
return nil, nil, err
return iterSet{}, err
}
iter, err := r.NewIter(nil /* lower */, nil /* upper */)
if err != nil {
return nil, nil, err
return iterSet{}, err
}
return iter, rangeDelIter, nil
return iterSet{
point: iter,
rangeDeletion: rangeDelIter,
}, nil
}

fm := &failMerger{}
Expand Down
34 changes: 1 addition & 33 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,38 +16,6 @@ import (
"github.com/cockroachdb/pebble/sstable"
)

// tableNewIters creates a new point and range-del iterator for the given file
// number.
//
// On success, the internalIterator is not-nil and must be closed; the
// FragmentIterator can be nil.
// TODO(radu): always return a non-nil FragmentIterator.
//
// On error, the iterators are nil.
//
// The only (non-test) implementation of tableNewIters is tableCacheContainer.newIters().
type tableNewIters func(
ctx context.Context,
file *manifest.FileMetadata,
opts *IterOptions,
internalOpts internalIterOpts,
) (internalIterator, keyspan.FragmentIterator, error)

// tableNewRangeDelIter takes a tableNewIters and returns a TableNewSpanIter
// for the rangedel iterator returned by tableNewIters.
func tableNewRangeDelIter(ctx context.Context, newIters tableNewIters) keyspan.TableNewSpanIter {
return func(file *manifest.FileMetadata, iterOptions keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
iter, rangeDelIter, err := newIters(ctx, file, nil, internalIterOpts{})
if iter != nil {
_ = iter.Close()
}
if rangeDelIter == nil {
rangeDelIter = emptyKeyspanIter
}
return rangeDelIter, err
}
}

type internalIterOpts struct {
bytesIterated *uint64
bufferPool *sstable.BufferPool
Expand Down Expand Up @@ -676,7 +644,7 @@ func (l *levelIter) loadFile(file *fileMetadata, dir int) loadFileReturnIndicato

var rangeDelIter keyspan.FragmentIterator
var iter internalIterator
iter, rangeDelIter, l.err = l.newIters(l.ctx, l.iterFile, &l.tableOpts, l.internalOpts)
iter, rangeDelIter, l.err = l.newIters.TODO(l.ctx, l.iterFile, &l.tableOpts, l.internalOpts)
l.iter = iter
if l.err != nil {
return noFileLoaded
Expand Down
Loading
Loading