Skip to content

Commit 71cce7b

Browse files
committed
db: take ScanInternal options through an options struct
Update ScanInternal methods to take options via the (now exported) ScanInternalOptions struct. Previously the function signature was a bit unwieldy.
1 parent ab9ffc9 commit 71cce7b

File tree

8 files changed

+129
-184
lines changed

8 files changed

+129
-184
lines changed

db.go

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1273,35 +1273,13 @@ func finishInitializingIter(ctx context.Context, buf *iterAlloc) *Iterator {
12731273
// creator ID was set (as creator IDs are necessary to enable shared storage)
12741274
// resulting in some lower level SSTs being on non-shared storage. Skip-shared
12751275
// iteration is invalid in those cases.
1276-
func (d *DB) ScanInternal(
1277-
ctx context.Context,
1278-
category block.Category,
1279-
lower, upper []byte,
1280-
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error,
1281-
visitRangeDel func(start, end []byte, seqNum SeqNum) error,
1282-
visitRangeKey func(start, end []byte, keys []rangekey.Key) error,
1283-
visitSharedFile func(sst *SharedSSTMeta) error,
1284-
visitExternalFile func(sst *ExternalFile) error,
1285-
) error {
1286-
scanInternalOpts := &scanInternalOptions{
1287-
category: category,
1288-
visitPointKey: visitPointKey,
1289-
visitRangeDel: visitRangeDel,
1290-
visitRangeKey: visitRangeKey,
1291-
visitSharedFile: visitSharedFile,
1292-
visitExternalFile: visitExternalFile,
1293-
IterOptions: IterOptions{
1294-
KeyTypes: IterKeyTypePointsAndRanges,
1295-
LowerBound: lower,
1296-
UpperBound: upper,
1297-
},
1298-
}
1299-
iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, scanInternalOpts)
1276+
func (d *DB) ScanInternal(ctx context.Context, opts ScanInternalOptions) error {
1277+
iter, err := d.newInternalIter(ctx, snapshotIterOpts{} /* snapshot */, &opts)
13001278
if err != nil {
13011279
return err
13021280
}
13031281
defer iter.close()
1304-
return scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
1282+
return scanInternalImpl(ctx, iter, &opts)
13051283
}
13061284

13071285
// newInternalIter constructs and returns a new scanInternalIterator on this db.
@@ -1312,7 +1290,7 @@ func (d *DB) ScanInternal(
13121290
// finishInitializingIter. Both pairs of methods should be refactored to reduce
13131291
// this duplication.
13141292
func (d *DB) newInternalIter(
1315-
ctx context.Context, sOpts snapshotIterOpts, o *scanInternalOptions,
1293+
ctx context.Context, sOpts snapshotIterOpts, o *ScanInternalOptions,
13161294
) (*scanInternalIterator, error) {
13171295
if err := d.closed.Load(); err != nil {
13181296
panic(err)
@@ -1399,7 +1377,7 @@ func finishInitializingInternalIter(
13991377
}
14001378
i.initializeBoundBufs(i.opts.LowerBound, i.opts.UpperBound)
14011379

1402-
if err := i.constructPointIter(i.opts.category, memtables, buf); err != nil {
1380+
if err := i.constructPointIter(i.opts.Category, memtables, buf); err != nil {
14031381
return nil, err
14041382
}
14051383

@@ -2984,8 +2962,8 @@ func (d *DB) ScanStatistics(
29842962
}
29852963
}
29862964

2987-
scanInternalOpts := &scanInternalOptions{
2988-
visitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
2965+
scanInternalOpts := &ScanInternalOptions{
2966+
VisitPointKey: func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error {
29892967
// If the previous key is equal to the current point key, the current key was
29902968
// pinned by a snapshot.
29912969
size := uint64(key.Size())
@@ -3012,34 +2990,34 @@ func (d *DB) ScanStatistics(
30122990
stats.BytesRead += uint64(key.Size() + value.Len())
30132991
return nil
30142992
},
3015-
visitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
2993+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
30162994
stats.Accumulated.KindsCount[InternalKeyKindRangeDelete]++
30172995
stats.BytesRead += uint64(len(start) + len(end))
30182996
return nil
30192997
},
3020-
visitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
2998+
VisitRangeKey: func(start, end []byte, keys []rangekey.Key) error {
30212999
stats.BytesRead += uint64(len(start) + len(end))
30223000
for _, key := range keys {
30233001
stats.Accumulated.KindsCount[key.Kind()]++
30243002
stats.BytesRead += uint64(len(key.Value) + len(key.Suffix))
30253003
}
30263004
return nil
30273005
},
3028-
includeObsoleteKeys: true,
3006+
IncludeObsoleteKeys: true,
30293007
IterOptions: IterOptions{
30303008
KeyTypes: IterKeyTypePointsAndRanges,
30313009
LowerBound: lower,
30323010
UpperBound: upper,
30333011
},
3034-
rateLimitFunc: rateLimitFunc,
3012+
RateLimitFunc: rateLimitFunc,
30353013
}
30363014
iter, err := d.newInternalIter(ctx, snapshotIterOpts{}, scanInternalOpts)
30373015
if err != nil {
30383016
return LSMKeyStatistics{}, err
30393017
}
30403018
defer iter.close()
30413019

3042-
err = scanInternalImpl(ctx, lower, upper, iter, scanInternalOpts)
3020+
err = scanInternalImpl(ctx, iter, scanInternalOpts)
30433021

30443022
if err != nil {
30453023
return LSMKeyStatistics{}, err

excise_test.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
2323
"github.com/cockroachdb/pebble/objstorage/remote"
2424
"github.com/cockroachdb/pebble/sstable"
25-
"github.com/cockroachdb/pebble/sstable/block"
2625
"github.com/cockroachdb/pebble/vfs"
2726
"github.com/stretchr/testify/require"
2827
)
@@ -583,22 +582,27 @@ func TestConcurrentExcise(t *testing.T) {
583582
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
584583

585584
var sharedSSTs []SharedSSTMeta
586-
err = from.ScanInternal(context.TODO(), block.CategoryUnknown, startKey, endKey,
587-
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
585+
err = from.ScanInternal(context.TODO(), ScanInternalOptions{
586+
IterOptions: IterOptions{
587+
KeyTypes: IterKeyTypePointsAndRanges,
588+
LowerBound: startKey,
589+
UpperBound: endKey,
590+
},
591+
VisitPointKey: func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
588592
val, _, err := value.Value(nil)
589593
require.NoError(t, err)
590594
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false /* forceObsolete */))
591595
return nil
592596
},
593-
func(start, end []byte, seqNum base.SeqNum) error {
597+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
594598
require.NoError(t, w.EncodeSpan(keyspan.Span{
595599
Start: start,
596600
End: end,
597601
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
598602
}))
599603
return nil
600604
},
601-
func(start, end []byte, keys []keyspan.Key) error {
605+
VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
602606
require.NoError(t, w.EncodeSpan(keyspan.Span{
603607
Start: start,
604608
End: end,
@@ -607,12 +611,11 @@ func TestConcurrentExcise(t *testing.T) {
607611
}))
608612
return nil
609613
},
610-
func(sst *SharedSSTMeta) error {
614+
VisitSharedFile: func(sst *SharedSSTMeta) error {
611615
sharedSSTs = append(sharedSSTs, *sst)
612616
return nil
613617
},
614-
nil,
615-
)
618+
})
616619
require.NoError(t, err)
617620
require.NoError(t, w.Close())
618621

ingest_test.go

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -841,22 +841,27 @@ func testIngestSharedImpl(
841841
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
842842

843843
var sharedSSTs []SharedSSTMeta
844-
err = from.ScanInternal(context.TODO(), block.CategoryUnknown, startKey, endKey,
845-
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
844+
err = from.ScanInternal(context.TODO(), ScanInternalOptions{
845+
IterOptions: IterOptions{
846+
KeyTypes: IterKeyTypePointsAndRanges,
847+
LowerBound: startKey,
848+
UpperBound: endKey,
849+
},
850+
VisitPointKey: func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
846851
val, _, err := value.Value(nil)
847852
require.NoError(t, err)
848853
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false /* forceObsolete */))
849854
return nil
850855
},
851-
func(start, end []byte, seqNum base.SeqNum) error {
856+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
852857
require.NoError(t, w.EncodeSpan(keyspan.Span{
853858
Start: start,
854859
End: end,
855860
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
856861
}))
857862
return nil
858863
},
859-
func(start, end []byte, keys []keyspan.Key) error {
864+
VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
860865
require.NoError(t, w.EncodeSpan(keyspan.Span{
861866
Start: start,
862867
End: end,
@@ -865,12 +870,11 @@ func testIngestSharedImpl(
865870
}))
866871
return nil
867872
},
868-
func(sst *SharedSSTMeta) error {
873+
VisitSharedFile: func(sst *SharedSSTMeta) error {
869874
sharedSSTs = append(sharedSSTs, *sst)
870875
return nil
871876
},
872-
nil,
873-
)
877+
})
874878
require.NoError(t, err)
875879
require.NoError(t, w.Close())
876880

@@ -1361,22 +1365,27 @@ func TestIngestExternal(t *testing.T) {
13611365
w := sstable.NewRawWriter(objstorageprovider.NewFileWritable(f), writeOpts)
13621366

13631367
var externalFiles []ExternalFile
1364-
err = from.ScanInternal(context.TODO(), block.CategoryUnknown, startKey, endKey,
1365-
func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
1368+
err = from.ScanInternal(context.TODO(), ScanInternalOptions{
1369+
IterOptions: IterOptions{
1370+
KeyTypes: IterKeyTypePointsAndRanges,
1371+
LowerBound: startKey,
1372+
UpperBound: endKey,
1373+
},
1374+
VisitPointKey: func(key *InternalKey, value LazyValue, _ IteratorLevel) error {
13661375
val, _, err := value.Value(nil)
13671376
require.NoError(t, err)
13681377
require.NoError(t, w.Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false /* forceObsolete */))
13691378
return nil
13701379
},
1371-
func(start, end []byte, seqNum base.SeqNum) error {
1380+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
13721381
require.NoError(t, w.EncodeSpan(keyspan.Span{
13731382
Start: start,
13741383
End: end,
13751384
Keys: []keyspan.Key{{Trailer: base.MakeTrailer(0, base.InternalKeyKindRangeDelete)}},
13761385
}))
13771386
return nil
13781387
},
1379-
func(start, end []byte, keys []keyspan.Key) error {
1388+
VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
13801389
require.NoError(t, w.EncodeSpan(keyspan.Span{
13811390
Start: start,
13821391
End: end,
@@ -1385,12 +1394,11 @@ func TestIngestExternal(t *testing.T) {
13851394
}))
13861395
return nil
13871396
},
1388-
nil,
1389-
func(sst *ExternalFile) error {
1397+
VisitExternalFile: func(sst *ExternalFile) error {
13901398
externalFiles = append(externalFiles, *sst)
13911399
return nil
13921400
},
1393-
)
1401+
})
13941402
require.NoError(t, err)
13951403
require.NoError(t, w.Close())
13961404
_, err = to.IngestAndExcise(context.Background(), []string{sstPath}, nil /* shared */, externalFiles, KeyRange{Start: startKey, End: endKey})

metamorphic/ops.go

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"github.com/cockroachdb/pebble/internal/rangekey"
2626
"github.com/cockroachdb/pebble/objstorage/objstorageprovider"
2727
"github.com/cockroachdb/pebble/sstable"
28-
"github.com/cockroachdb/pebble/sstable/block"
2928
"github.com/cockroachdb/pebble/vfs"
3029
"github.com/cockroachdb/pebble/vfs/errorfs"
3130
)
@@ -2001,30 +2000,34 @@ func (r *replicateOp) runSharedReplicate(
20012000
) {
20022001
var sharedSSTs []pebble.SharedSSTMeta
20032002
var err error
2004-
err = source.ScanInternal(context.TODO(), block.CategoryUnknown, r.start, r.end,
2005-
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
2003+
err = source.ScanInternal(context.TODO(), pebble.ScanInternalOptions{
2004+
IterOptions: pebble.IterOptions{
2005+
KeyTypes: pebble.IterKeyTypePointsAndRanges,
2006+
LowerBound: r.start,
2007+
UpperBound: r.end,
2008+
},
2009+
VisitPointKey: func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
20062010
val, _, err := value.Value(nil)
20072011
if err != nil {
20082012
panic(err)
20092013
}
20102014
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
20112015
},
2012-
func(start, end []byte, seqNum base.SeqNum) error {
2016+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
20132017
return w.DeleteRange(start, end)
20142018
},
2015-
func(start, end []byte, keys []keyspan.Key) error {
2019+
VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
20162020
return w.Raw().EncodeSpan(keyspan.Span{
20172021
Start: start,
20182022
End: end,
20192023
Keys: keys,
20202024
})
20212025
},
2022-
func(sst *pebble.SharedSSTMeta) error {
2026+
VisitSharedFile: func(sst *pebble.SharedSSTMeta) error {
20232027
sharedSSTs = append(sharedSSTs, *sst)
20242028
return nil
20252029
},
2026-
nil,
2027-
)
2030+
})
20282031
if err != nil {
20292032
h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
20302033
return
@@ -2064,21 +2067,26 @@ func (r *replicateOp) runExternalReplicate(
20642067
) {
20652068
var externalSSTs []pebble.ExternalFile
20662069
var err error
2067-
err = source.ScanInternal(context.TODO(), block.CategoryUnknown, r.start, r.end,
2068-
func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
2070+
err = source.ScanInternal(context.TODO(), pebble.ScanInternalOptions{
2071+
IterOptions: pebble.IterOptions{
2072+
KeyTypes: pebble.IterKeyTypePointsAndRanges,
2073+
LowerBound: r.start,
2074+
UpperBound: r.end,
2075+
},
2076+
VisitPointKey: func(key *pebble.InternalKey, value pebble.LazyValue, _ pebble.IteratorLevel) error {
20692077
val, _, err := value.Value(nil)
20702078
if err != nil {
20712079
panic(err)
20722080
}
20732081
t.opts.Comparer.ValidateKey.MustValidate(key.UserKey)
20742082
return w.Raw().Add(base.MakeInternalKey(key.UserKey, 0, key.Kind()), val, false)
20752083
},
2076-
func(start, end []byte, seqNum base.SeqNum) error {
2084+
VisitRangeDel: func(start, end []byte, seqNum base.SeqNum) error {
20772085
t.opts.Comparer.ValidateKey.MustValidate(start)
20782086
t.opts.Comparer.ValidateKey.MustValidate(end)
20792087
return w.DeleteRange(start, end)
20802088
},
2081-
func(start, end []byte, keys []keyspan.Key) error {
2089+
VisitRangeKey: func(start, end []byte, keys []keyspan.Key) error {
20822090
t.opts.Comparer.ValidateKey.MustValidate(start)
20832091
t.opts.Comparer.ValidateKey.MustValidate(end)
20842092
return w.Raw().EncodeSpan(keyspan.Span{
@@ -2087,12 +2095,11 @@ func (r *replicateOp) runExternalReplicate(
20872095
Keys: keys,
20882096
})
20892097
},
2090-
nil,
2091-
func(sst *pebble.ExternalFile) error {
2098+
VisitExternalFile: func(sst *pebble.ExternalFile) error {
20922099
externalSSTs = append(externalSSTs, *sst)
20932100
return nil
20942101
},
2095-
)
2102+
})
20962103
if err != nil {
20972104
h.Recordf("%s // %v", r.formattedString(t.testOpts.KeyFormat), err)
20982105
return

options.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -269,25 +269,25 @@ func (o *IterOptions) SpanIterOptions() keyspan.SpanIterOptions {
269269
}
270270
}
271271

272-
// scanInternalOptions is similar to IterOptions, meant for use with
272+
// ScanInternalOptions is similar to IterOptions, meant for use with
273273
// scanInternalIterator.
274-
type scanInternalOptions struct {
274+
type ScanInternalOptions struct {
275275
IterOptions
276276

277-
category block.Category
277+
Category block.Category
278278

279-
visitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
280-
visitRangeDel func(start, end []byte, seqNum SeqNum) error
281-
visitRangeKey func(start, end []byte, keys []rangekey.Key) error
282-
visitSharedFile func(sst *SharedSSTMeta) error
283-
visitExternalFile func(sst *ExternalFile) error
279+
VisitPointKey func(key *InternalKey, value LazyValue, iterInfo IteratorLevel) error
280+
VisitRangeDel func(start, end []byte, seqNum SeqNum) error
281+
VisitRangeKey func(start, end []byte, keys []rangekey.Key) error
282+
VisitSharedFile func(sst *SharedSSTMeta) error
283+
VisitExternalFile func(sst *ExternalFile) error
284284

285-
// includeObsoleteKeys specifies whether keys shadowed by newer internal keys
285+
// IncludeObsoleteKeys specifies whether keys shadowed by newer internal keys
286286
// are exposed. If false, only one internal key per user key is exposed.
287-
includeObsoleteKeys bool
287+
IncludeObsoleteKeys bool
288288

289-
// rateLimitFunc is used to limit the amount of bytes read per second.
290-
rateLimitFunc func(key *InternalKey, value LazyValue) error
289+
// RateLimitFunc is used to limit the amount of bytes read per second.
290+
RateLimitFunc func(key *InternalKey, value LazyValue) error
291291
}
292292

293293
// RangeKeyMasking configures automatic hiding of point keys by range keys. A

0 commit comments

Comments
 (0)