Skip to content

Commit

Permalink
Merge #58138
Browse files Browse the repository at this point in the history
58138: storage: cleanup how we enable the TBI optimization during backup r=sumeerbhola a=adityamaru

Previously, we would set the Max/MinTSHint in an IterOptions struct,
which in turn was nested in IncIterOptions. This was set quite high up
the stack, and plumbed through various methods. This information was
only really required when the MVCCIncrementalIterator was setting up its
underlying iterators.

This change switches to plumbing an explicit bool, and resolving the min
and max ts hints only when setting up the iters. This reduces the
possibility of mistakenly updating the options struct somewhere along
the stack.

Release note: None

Co-authored-by: Aditya Maru <adityamaru@gmail.com>
  • Loading branch information
craig[bot] and adityamaru committed Dec 27, 2020
2 parents 4e3a9cc + 87e02ca commit 65fe158
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 88 deletions.
19 changes: 4 additions & 15 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,30 +145,19 @@ func evalExport(
return result.Result{}, errors.Errorf("unknown MVCC filter: %s", args.MVCCFilter)
}

io := storage.IterOptions{
UpperBound: args.EndKey,
}

// Time-bound iterators only make sense to use if the start time is set.
if args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty() {
// The call to startTime.Next() converts our exclusive start bound into the
// inclusive start bound that MinTimestampHint expects. This is strictly a
// performance optimization; omitting the call would still return correct
// results.
io.MinTimestampHint = args.StartTime.Next()
io.MaxTimestampHint = h.Timestamp
}

e := spanset.GetDBEngine(batch, roachpb.Span{Key: args.Key, EndKey: args.EndKey})
targetSize := uint64(args.TargetFileSize)
var maxSize uint64
allowedOverage := ExportRequestMaxAllowedFileSizeOverage.Get(&cArgs.EvalCtx.ClusterSettings().SV)
if targetSize > 0 && allowedOverage > 0 {
maxSize = targetSize + uint64(allowedOverage)
}

// Time-bound iterators only make sense to use if the start time is set.
useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
for start := args.Key; start != nil; {
data, summary, resume, err := e.ExportMVCCToSst(start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, io)
h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI)
if err != nil {
return result.Result{}, err
}
Expand Down
35 changes: 12 additions & 23 deletions pkg/ccl/storageccl/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func TestExportGCThreshold(t *testing.T) {
}

// exportUsingGoIterator uses the legacy implementation of export, and is used
// as an oracle to check the correctness of the new C++ implementation.
// as an oracle to check the correctness of pebbleExportToSst.
func exportUsingGoIterator(
filter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
Expand All @@ -312,17 +312,11 @@ func exportUsingGoIterator(
return nil, nil
}

io := storage.IterOptions{
UpperBound: endKey,
}
if enableTimeBoundIteratorOptimization {
io.MaxTimestampHint = endTime
io.MinTimestampHint = startTime.Next()
}
iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
IterOptions: io,
StartTime: startTime,
EndTime: endTime,
EndKey: endKey,
EnableTimeBoundIteratorOptimization: enableTimeBoundIteratorOptimization,
StartTime: startTime,
EndTime: endTime,
})
defer iter.Close()
for iter.SeekGE(storage.MakeMVCCMetadataKey(startKey)); ; iterFn(iter) {
Expand Down Expand Up @@ -412,29 +406,23 @@ func assertEqualKVs(
filter = roachpb.MVCCFilter_Latest
}

// Run oracle (go implementation of the IncrementalIterator).
// Run the oracle which is a legacy implementation of pebbleExportToSst
// backed by an MVCCIncrementalIterator.
expected, err := exportUsingGoIterator(filter, startTime, endTime,
startKey, endKey, enableTimeBoundIteratorOptimization, e)
if err != nil {
t.Fatalf("Oracle failed to export provided key range.")
}

// Run new C++ implementation of IncrementalIterator.
io := storage.IterOptions{
UpperBound: endKey,
}
if enableTimeBoundIteratorOptimization {
io.MaxTimestampHint = endTime
io.MinTimestampHint = startTime.Next()
}
// Run the actual code path used when exporting MVCCs to SSTs.
var kvs []storage.MVCCKeyValue
for start := startKey; start != nil; {
var sst []byte
var summary roachpb.BulkOpSummary
maxSize := uint64(0)
prevStart := start
sst, summary, start, err = e.ExportMVCCToSst(start, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, io)
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization)
require.NoError(t, err)
loaded := loadSST(t, sst, startKey, endKey)
// Ensure that the pagination worked properly.
Expand Down Expand Up @@ -473,14 +461,15 @@ func assertEqualKVs(
maxSize--
}
_, _, _, err = e.ExportMVCCToSst(prevStart, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, io)
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization)
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
kvs = append(kvs, loaded...)
}

// Compare new C++ implementation against the oracle.
// Compare the output of the current export MVCC to SST logic against the
// legacy oracle output.
expectedKVS := loadSST(t, expected, startKey, endKey)
if len(kvs) != len(expectedKVS) {
t.Fatalf("got %d kvs but expected %d:\n%v\n%v", len(kvs), len(expectedKVS), kvs, expectedKVS)
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/spanset/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,10 @@ func (s spanSetReader) ExportMVCCToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io storage.IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
return s.r.ExportMVCCToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
return s.r.ExportMVCCToSst(startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI)
}

func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
Expand Down
6 changes: 2 additions & 4 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1066,10 +1066,8 @@ func runExportToSst(
for i := 0; i < b.N; i++ {
startTS := hlc.Timestamp{WallTime: int64(numRevisions / 2)}
endTS := hlc.Timestamp{WallTime: int64(numRevisions + 2)}
_, _, _, err := engine.ExportMVCCToSst(roachpb.KeyMin, roachpb.KeyMax, startTS, endTS, exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, IterOptions{
LowerBound: roachpb.KeyMin,
UpperBound: roachpb.KeyMax,
})
_, _, _, err := engine.ExportMVCCToSst(roachpb.KeyMin, roachpb.KeyMax, startTS, endTS,
exportAllRevisions, 0 /* targetSize */, 0 /* maxSize */, false /* useTBI */)
if err != nil {
b.Fatal(err)
}
Expand Down
8 changes: 6 additions & 2 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,12 +293,16 @@ type Reader interface {
// to an SST that exceeds maxSize, an error will be returned. This parameter
// exists to prevent creating SSTs which are too large to be used.
//
// If useTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
// outside our time bounds, while locating the KVs to export.
//
// This function looks at MVCC versions and intents, and returns an error if an
// intent is found.
ExportMVCCToSst(
startKey, endKey roachpb.Key, startTS, endTS hlc.Timestamp,
exportAllRevisions bool, targetSize uint64, maxSize uint64,
io IterOptions,
exportAllRevisions bool, targetSize uint64, maxSize uint64, useTBI bool,
) (sst []byte, _ roachpb.BulkOpSummary, resumeKey roachpb.Key, _ error)
// Get returns the value for the given key, nil otherwise. Semantically, it
// behaves as if an iterator with MVCCKeyAndIntentsIterKind was used.
Expand Down
19 changes: 14 additions & 5 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@ var _ SimpleMVCCIterator = &MVCCIncrementalIterator{}

// MVCCIncrementalIterOptions bundles options for NewMVCCIncrementalIterator.
type MVCCIncrementalIterOptions struct {
IterOptions IterOptions
EnableTimeBoundIteratorOptimization bool
EndKey roachpb.Key
// Keys visible by the MVCCIncrementalIterator must be within (StartTime,
// EndTime]. Note that if {Min,Max}TimestampHints are specified in
// IterOptions, the timestamp hints interval should include the start and end
Expand All @@ -112,17 +113,25 @@ func NewMVCCIncrementalIterator(
) *MVCCIncrementalIterator {
var iter MVCCIterator
var timeBoundIter MVCCIterator
if !opts.IterOptions.MinTimestampHint.IsEmpty() && !opts.IterOptions.MaxTimestampHint.IsEmpty() {
if opts.EnableTimeBoundIteratorOptimization {
// An iterator without the timestamp hints is created to ensure that the
// iterator visits every required version of every key that has changed.
iter = reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
UpperBound: opts.IterOptions.UpperBound,
UpperBound: opts.EndKey,
})
// The timeBoundIter is only required to see versioned keys, since the
// intents will be found by iter.
timeBoundIter = reader.NewMVCCIterator(MVCCKeyIterKind, opts.IterOptions)
timeBoundIter = reader.NewMVCCIterator(MVCCKeyIterKind, IterOptions{
UpperBound: opts.EndKey,
// The call to startTime.Next() converts our exclusive start bound into
// the inclusive start bound that MinTimestampHint expects.
MinTimestampHint: opts.StartTime.Next(),
MaxTimestampHint: opts.EndTime,
})
} else {
iter = reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, opts.IterOptions)
iter = reader.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{
UpperBound: opts.EndKey,
})
}

return &MVCCIncrementalIterator{
Expand Down
47 changes: 21 additions & 26 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,7 @@ func iterateExpectErr(
return func(t *testing.T) {
t.Helper()
iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{
IterOptions: IterOptions{
UpperBound: endKey,
},
EndKey: endKey,
StartTime: startTime,
EndTime: endTime,
})
Expand Down Expand Up @@ -75,11 +73,12 @@ func assertExportedKVs(
startKey, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
revisions bool,
io IterOptions,
expected []MVCCKeyValue,
useTBI bool,
) {
const big = 1 << 30
data, _, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big, io)
data, _, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big,
useTBI)
require.NoError(t, err)

if data == nil {
Expand Down Expand Up @@ -114,13 +113,14 @@ func assertIteratedKVs(
startKey, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
revisions bool,
io IterOptions,
expected []MVCCKeyValue,
useTBI bool,
) {
iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{
IterOptions: io,
StartTime: startTime,
EndTime: endTime,
EndKey: endKey,
EnableTimeBoundIteratorOptimization: useTBI,
StartTime: startTime,
EndTime: endTime,
})
defer iter.Close()
var iterFn func()
Expand Down Expand Up @@ -161,25 +161,22 @@ func assertEqualKVs(
) func(*testing.T) {
return func(t *testing.T) {
t.Helper()
io := IterOptions{UpperBound: endKey}
t.Run("iterate", func(t *testing.T) {
assertIteratedKVs(t, e, startKey, endKey, startTime, endTime, revisions, io, expected)
assertIteratedKVs(t, e, startKey, endKey, startTime, endTime, revisions, expected,
false /* useTBI */)
})
t.Run("iterate-tbi", func(t *testing.T) {
io := io
io.MinTimestampHint = startTime.Next()
io.MaxTimestampHint = endTime
assertIteratedKVs(t, e, startKey, endKey, startTime, endTime, revisions, io, expected)
assertIteratedKVs(t, e, startKey, endKey, startTime, endTime, revisions, expected,
true /* useTBI */)
})

t.Run("export", func(t *testing.T) {
assertExportedKVs(t, e, startKey, endKey, startTime, endTime, revisions, io, expected)
assertExportedKVs(t, e, startKey, endKey, startTime, endTime, revisions, expected,
false /* useTBI */)
})
t.Run("export-tbi", func(t *testing.T) {
io := io
io.MinTimestampHint = startTime.Next()
io.MaxTimestampHint = endTime
assertExportedKVs(t, e, startKey, endKey, startTime, endTime, revisions, io, expected)
assertExportedKVs(t, e, startKey, endKey, startTime, endTime, revisions, expected,
true /* useTBI */)
})
}
}
Expand Down Expand Up @@ -403,9 +400,7 @@ func slurpKVsInTimeRange(
) ([]MVCCKeyValue, error) {
endKey := prefix.PrefixEnd()
iter := NewMVCCIncrementalIterator(reader, MVCCIncrementalIterOptions{
IterOptions: IterOptions{
UpperBound: endKey,
},
EndKey: endKey,
StartTime: startTime,
EndTime: endTime,
})
Expand Down Expand Up @@ -703,9 +698,9 @@ func TestMVCCIncrementalIteratorIntentStraddlesSStables(t *testing.T) {
// inclusive on the end time. The expectation is that we'll see a write
// intent error.
it := NewMVCCIncrementalIterator(db2, MVCCIncrementalIterOptions{
IterOptions: IterOptions{UpperBound: keys.MaxKey},
StartTime: hlc.Timestamp{WallTime: 1},
EndTime: hlc.Timestamp{WallTime: 2},
EndKey: keys.MaxKey,
StartTime: hlc.Timestamp{WallTime: 1},
EndTime: hlc.Timestamp{WallTime: 2},
})
defer it.Close()
for it.SeekGE(MVCCKey{Key: keys.LocalMax}); ; it.Next() {
Expand Down
24 changes: 14 additions & 10 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,10 +645,11 @@ func (p *Pebble) ExportMVCCToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
r, _ := tryWrapReader(p, MVCCKeyAndIntentsIterKind)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI)
}

// MVCCGet implements the Engine interface.
Expand Down Expand Up @@ -1230,10 +1231,11 @@ func (p *pebbleReadOnly) ExportMVCCToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
r, _ := tryWrapReader(p, MVCCKeyAndIntentsIterKind)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI)
}

func (p *pebbleReadOnly) MVCCGet(key MVCCKey) ([]byte, error) {
Expand Down Expand Up @@ -1435,10 +1437,11 @@ func (p *pebbleSnapshot) ExportMVCCToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
r, _ := tryWrapReader(p, MVCCKeyAndIntentsIterKind)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io)
return pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI)
}

// Get implements the Reader interface.
Expand Down Expand Up @@ -1521,7 +1524,7 @@ func pebbleExportToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
sstFile := &MemFile{}
sstWriter := MakeBackupSSTWriter(sstFile)
Expand All @@ -1531,9 +1534,10 @@ func pebbleExportToSst(
iter := NewMVCCIncrementalIterator(
reader,
MVCCIncrementalIterOptions{
IterOptions: io,
StartTime: startTS,
EndTime: endTS,
EndKey: endKey,
EnableTimeBoundIteratorOptimization: useTBI,
StartTime: startTS,
EndTime: endTS,
})
defer iter.Close()
var curKey roachpb.Key // only used if exportAllRevisions
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/pebble_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (p *pebbleBatch) ExportMVCCToSst(
startTS, endTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
io IterOptions,
useTBI bool,
) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) {
panic("unimplemented")
}
Expand Down

0 comments on commit 65fe158

Please sign in to comment.