Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage/storageccl: Add an option to break export mid key #68102

Merged
merged 2 commits into from Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 13 additions & 2 deletions pkg/ccl/storageccl/export.go
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -177,11 +178,19 @@ func evalExport(

// Time-bound iterators only make sense to use if the start time is set.
useTBI := args.EnableTimeBoundIteratorOptimization && !args.StartTime.IsEmpty()
// Only use resume timestamp if splitting mid key is enabled.
resumeKeyTS := hlc.Timestamp{}
if args.SplitMidKey {
if !args.ReturnSST {
return result.Result{}, errors.New("SplitMidKey could only be used with ReturnSST option")
}
resumeKeyTS = args.ResumeKeyTS
}
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, exportAllRevisions, targetSize, maxSize, useTBI, destFile)
summary, resume, resumeTS, err := reader.ExportMVCCToSst(ctx, start, args.EndKey, args.StartTime,
h.Timestamp, resumeKeyTS, exportAllRevisions, targetSize, maxSize, args.SplitMidKey, useTBI, destFile)
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
Expand All @@ -206,6 +215,7 @@ func evalExport(
}
exported := roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resumeTS,
Exported: summary,
LocalityKV: localityKV,
}
Expand Down Expand Up @@ -249,6 +259,7 @@ func evalExport(
}
reply.Files = append(reply.Files, exported)
start = resume
resumeKeyTS = resumeTS

// If we are not returning the SSTs to the processor, there is no need to
// paginate the ExportRequest since the reply size will not grow large
Expand Down
102 changes: 65 additions & 37 deletions pkg/ccl/storageccl/export_test.go
Expand Up @@ -444,7 +444,7 @@ func exportUsingGoIterator(
filter roachpb.MVCCFilter,
startTime, endTime hlc.Timestamp,
startKey, endKey roachpb.Key,
enableTimeBoundIteratorOptimization bool,
enableTimeBoundIteratorOptimization timeBoundOptimisation,
reader storage.Reader,
) ([]byte, error) {
memFile := &storage.MemFile{}
Expand All @@ -466,7 +466,7 @@ func exportUsingGoIterator(

iter := storage.NewMVCCIncrementalIterator(reader, storage.MVCCIncrementalIterOptions{
EndKey: endKey,
EnableTimeBoundIteratorOptimization: enableTimeBoundIteratorOptimization,
EnableTimeBoundIteratorOptimization: bool(enableTimeBoundIteratorOptimization),
StartTime: startTime,
EndTime: endTime,
})
Expand Down Expand Up @@ -539,13 +539,29 @@ func loadSST(t *testing.T, data []byte, start, end roachpb.Key) []storage.MVCCKe
return kvs
}

type exportRevisions bool
type batchBoundaries bool
type timeBoundOptimisation bool

const (
exportAll exportRevisions = true
exportLatest exportRevisions = false

stopAtTimestamps batchBoundaries = true
stopAtKeys batchBoundaries = false

optimizeTimeBounds timeBoundOptimisation = true
dontOptimizeTimeBounds timeBoundOptimisation = false
)

func assertEqualKVs(
ctx context.Context,
e storage.Engine,
startKey, endKey roachpb.Key,
startTime, endTime hlc.Timestamp,
exportAllRevisions bool,
enableTimeBoundIteratorOptimization bool,
exportAllRevisions exportRevisions,
stopMidKey batchBoundaries,
enableTimeBoundIteratorOptimization timeBoundOptimisation,
targetSize uint64,
) func(*testing.T) {
return func(t *testing.T) {
Expand All @@ -568,14 +584,16 @@ func assertEqualKVs(

// Run the actual code path used when exporting MVCCs to SSTs.
var kvs []storage.MVCCKeyValue
var resumeTs hlc.Timestamp
for start := startKey; start != nil; {
var sst []byte
var summary roachpb.BulkOpSummary
maxSize := uint64(0)
prevStart := start
prevTs := resumeTs
sstFile := &storage.MemFile{}
summary, start, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, sstFile)
summary, start, resumeTs, err = e.ExportMVCCToSst(ctx, start, endKey, startTime, endTime, resumeTs,
bool(exportAllRevisions), targetSize, maxSize, bool(stopMidKey), bool(enableTimeBoundIteratorOptimization), sstFile)
require.NoError(t, err)
sst = sstFile.Data()
loaded := loadSST(t, sst, startKey, endKey)
Expand Down Expand Up @@ -614,8 +632,8 @@ func assertEqualKVs(
if dataSizeWhenExceeded == maxSize {
maxSize--
}
_, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime,
exportAllRevisions, targetSize, maxSize, enableTimeBoundIteratorOptimization, &storage.MemFile{})
_, _, _, err = e.ExportMVCCToSst(ctx, prevStart, endKey, startTime, endTime, prevTs,
bool(exportAllRevisions), targetSize, maxSize, false, bool(enableTimeBoundIteratorOptimization), &storage.MemFile{})
require.Regexp(t, fmt.Sprintf("export size \\(%d bytes\\) exceeds max size \\(%d bytes\\)",
dataSizeWhenExceeded, maxSize), err)
}
Expand All @@ -639,6 +657,7 @@ func assertEqualKVs(
}
}
}

func TestRandomKeyAndTimestampExport(t *testing.T) {
defer leaktest.AfterTest(t)()

Expand All @@ -658,12 +677,13 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
cleanupDir()
}
}
const keySize = 100
const bytesPerValue = 300
getNumKeys := func(t *testing.T, rnd *rand.Rand, targetSize uint64) (numKeys int) {
const (
targetPages = 10
bytesPerValue = 300
minNumKeys = 2 // need > 1 keys for random key test
maxNumKeys = 5000
targetPages = 10
minNumKeys = 2 // need > 1 keys for random key test
maxNumKeys = 5000
)
numKeys = maxNumKeys
if targetSize > 0 {
Expand Down Expand Up @@ -695,11 +715,13 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
timestamps = append(timestamps, ts)

// Make keys unique and ensure they are monotonically increasing.
key := roachpb.Key(randutil.RandBytes(rnd, 100))
key := roachpb.Key(randutil.RandBytes(rnd, keySize))
key = append([]byte(fmt.Sprintf("#%d", i)), key...)
keys = append(keys, key)

value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, 200))
averageValueSize := bytesPerValue - keySize
valueSize := randutil.RandIntInRange(rnd, averageValueSize-100, averageValueSize+100)
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rnd, valueSize))
value.InitChecksum(key)
if err := storage.MVCCPut(ctx, batch, nil, key, ts, value, nil); err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -741,28 +763,35 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
)

t.Run("ts (0-∞], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, false, targetSize))
t.Run("ts (0-∞], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, false, targetSize))
t.Run("ts (0-∞], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, false, true, targetSize))
t.Run("ts (0-∞], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, tsMin, tsMax, true, true, targetSize))

upperBound := randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound := rnd.Intn(upperBound)

// Exercise random key ranges.
t.Run("kv [randLower, randUpper), latest, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, false, targetSize))
t.Run("kv [randLower, randUpper), all, nontimebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, false, targetSize))
t.Run("kv [randLower, randUpper), latest, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, false, true, targetSize))
t.Run("kv [randLower, randUpper), all, timebound", assertEqualKVs(ctx, e, keys[lowerBound], keys[upperBound], tsMin, tsMax, true, true, targetSize))

upperBound = randutil.RandIntInRange(rnd, 1, numKeys)
lowerBound = rnd.Intn(upperBound)

// Exercise random timestamps.
t.Run("kv (randLowerTime, randUpperTime], latest, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, nontimebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, false, targetSize))
t.Run("kv (randLowerTime, randUpperTime], latest, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], false, true, targetSize))
t.Run("kv (randLowerTime, randUpperTime], all, timebound", assertEqualKVs(ctx, e, keyMin, keyMax, timestamps[lowerBound], timestamps[upperBound], true, true, targetSize))
keyUpperBound := randutil.RandIntInRange(rnd, 1, numKeys)
keyLowerBound := rnd.Intn(keyUpperBound)
tsUpperBound := randutil.RandIntInRange(rnd, 1, numKeys)
tsLowerBound := rnd.Intn(tsUpperBound)

for _, s := range []struct {
name string
keyMin roachpb.Key
keyMax roachpb.Key
tsMin hlc.Timestamp
tsMax hlc.Timestamp
}{
{"ts (0-∞]", keyMin, keyMax, tsMin, tsMax},
{"kv [randLower, randUpper)", keys[keyLowerBound], keys[keyUpperBound], tsMin, tsMax},
{"kv (randLowerTime, randUpperTime]", keyMin, keyMax, timestamps[tsLowerBound], timestamps[tsUpperBound]},
} {
t.Run(fmt.Sprintf("%s, latest, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportLatest, stopAtKeys, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtKeys, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, split rows, nontimebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtTimestamps, dontOptimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, latest, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportLatest, stopAtKeys, optimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtKeys, optimizeTimeBounds, targetSize))
t.Run(fmt.Sprintf("%s, all, split rows, timebound", s.name),
assertEqualKVs(ctx, e, s.keyMin, s.keyMax, s.tsMin, s.tsMax, exportAll, stopAtTimestamps, optimizeTimeBounds, targetSize))
}
}
// Exercise min to max time and key ranges.
for _, targetSize := range []uint64{
Expand All @@ -772,5 +801,4 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
testWithTargetSize(t, targetSize)
})
}

}
10 changes: 5 additions & 5 deletions pkg/kv/kvserver/spanset/batch.go
Expand Up @@ -401,14 +401,14 @@ func (s spanSetReader) Closed() bool {
func (s spanSetReader) ExportMVCCToSst(
ctx context.Context,
startKey, endKey roachpb.Key,
startTS, endTS hlc.Timestamp,
startTS, endTS, firstKeyTS hlc.Timestamp,
exportAllRevisions bool,
targetSize, maxSize uint64,
useTBI bool,
stopMidKey, useTBI bool,
dest io.Writer,
) (roachpb.BulkOpSummary, roachpb.Key, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize,
maxSize, useTBI, dest)
) (roachpb.BulkOpSummary, roachpb.Key, hlc.Timestamp, error) {
return s.r.ExportMVCCToSst(ctx, startKey, endKey, startTS, endTS, firstKeyTS, exportAllRevisions, targetSize,
maxSize, stopMidKey, useTBI, dest)
}

func (s spanSetReader) MVCCGet(key storage.MVCCKey) ([]byte, error) {
Expand Down