Skip to content

Commit

Permalink
storage: add limits to skipped data iteration
Browse files Browse the repository at this point in the history
Previously when exporting data from pebble iterator could spend unbouded
time skipping entries regardless of export size limits.
This is becoming a problem for resource constrained clusters where low
priority requests like export that are used by backups to interfere with
high priority workloads. If we want to throttle backups we need to be able
to limit how many underlying operations we want to perform per request.
This change adds an optional iteration limit to export. Once the
limit is reached, export will end its current chunk and return a resume
span even if desired size is not reached.
Current limiter uses wall clock time to stop interation.

Release note: None
  • Loading branch information
aliher1911 committed Oct 5, 2021
1 parent 6622d0f commit e2f01e4
Show file tree
Hide file tree
Showing 12 changed files with 469 additions and 72 deletions.
10 changes: 10 additions & 0 deletions pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
load("//build:STRINGER.bzl", "stringer")

go_library(
name = "storage",
Expand Down Expand Up @@ -28,6 +29,7 @@ go_library(
"pebble_iterator.go",
"pebble_merge.go",
"pebble_mvcc_scanner.go",
"resource_limiter.go",
"row_counter.go",
"slice.go",
"slice_go1.9.go",
Expand All @@ -37,6 +39,7 @@ go_library(
"temp_dir.go",
"temp_engine.go",
"testing_knobs.go",
":gen-resourcelimitreached-stringer", # keep
],
importpath = "github.com/cockroachdb/cockroach/pkg/storage",
visibility = ["//visibility:public"],
Expand Down Expand Up @@ -109,6 +112,7 @@ go_test(
"pebble_file_registry_test.go",
"pebble_mvcc_scanner_test.go",
"pebble_test.go",
"resource_limiter_test.go",
"sst_info_test.go",
"sst_iterator_test.go",
"sst_writer_test.go",
Expand Down Expand Up @@ -160,3 +164,9 @@ go_test(
"@org_golang_x_sync//errgroup",
],
)

stringer(
name = "gen-resourcelimitreached-stringer",
src = "resource_limiter.go",
typ = "ResourceLimitReached",
)
7 changes: 4 additions & 3 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,11 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int

var batch Batch
var minWallTime int64
batchSize := len(keys) / numBatches
for i, key := range keys {
if scaled := len(keys) / numBatches; (i % scaled) == 0 {
if (i % batchSize) == 0 {
if i > 0 {
log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches)
log.Infof(ctx, "committing (%d/~%d)", i/batchSize, numBatches)
if err := batch.Commit(false /* sync */); err != nil {
return nil, err
}
Expand All @@ -666,7 +667,7 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int
}
}
batch = eng.NewBatch()
minWallTime = sstTimestamps[i/scaled]
minWallTime = sstTimestamps[i/batchSize]
}
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,10 @@ type ExportOptions struct {
// would stop immediately when targetSize is reached and return the next versions
// timestamp in resumeTs so that subsequent operation can pass it to firstKeyTs.
StopMidKey bool
// ResourceLimiter limits how long iterator could run until it exhausts allocated
// resources. Expot queries limiter in its iteration loop to break out once
// resources are exhausted.
ResourceLimiter ResourceLimiter
// If UseTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
Expand Down
13 changes: 3 additions & 10 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,7 @@ func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
}
}
i.iter.SeekGE(startKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
i.err = nil
Expand Down Expand Up @@ -463,10 +461,7 @@ func (i *MVCCIncrementalIterator) advance() {
// done.
break
}

if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
}
Expand Down Expand Up @@ -511,9 +506,7 @@ func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
func (i *MVCCIncrementalIterator) NextIgnoringTime() {
for {
i.iter.Next()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}

Expand Down
117 changes: 61 additions & 56 deletions pkg/storage/mvcc_incremental_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,36 @@ import (

const all, latest = true, false

func makeKVT(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue {
return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value}
}

func makeKVTxn(
key roachpb.Key, val []byte, ts hlc.Timestamp,
) (roachpb.Transaction, roachpb.Value, roachpb.Intent) {
txnID := uuid.MakeV4()
txnMeta := enginepb.TxnMeta{
Key: key,
ID: txnID,
Epoch: 1,
WriteTimestamp: ts,
}
return roachpb.Transaction{
TxnMeta: txnMeta,
ReadTimestamp: ts,
}, roachpb.Value{
RawBytes: val,
}, roachpb.MakeIntent(&txnMeta, key)
}

func intents(intents ...roachpb.Intent) []roachpb.Intent {
return intents
}

func kvs(kvs ...MVCCKeyValue) []MVCCKeyValue {
return kvs
}

func iterateExpectErr(
e Engine,
startKey, endKey roachpb.Key,
Expand Down Expand Up @@ -413,16 +443,11 @@ func TestMVCCIncrementalIteratorNextIgnoringTime(t *testing.T) {
tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
)

makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue {
return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value}
}

kv1_1_1 := makeKVT(testKey1, testValue1, ts1)
kv1_2_2 := makeKVT(testKey1, testValue2, ts2)
kv2_2_2 := makeKVT(testKey2, testValue3, ts2)
kv2_4_4 := makeKVT(testKey2, testValue4, ts4)
kv1_3Deleted := makeKVT(testKey1, nil, ts3)
kvs := func(kvs ...MVCCKeyValue) []MVCCKeyValue { return kvs }

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name, func(t *testing.T) {
Expand Down Expand Up @@ -549,9 +574,6 @@ func TestMVCCIncrementalIteratorInlinePolicy(t *testing.T) {
tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
)

makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue {
return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value}
}
inline1_1_1 := makeKVT(testKey1, testValue1, hlc.Timestamp{})
kv2_1_1 := makeKVT(testKey2, testValue1, ts1)
kv2_2_2 := makeKVT(testKey2, testValue2, ts2)
Expand Down Expand Up @@ -635,9 +657,6 @@ func TestMVCCIncrementalIteratorIntentPolicy(t *testing.T) {
}, roachpb.MakeIntent(&txnMeta, key)
}

makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue {
return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value}
}
kv1_1_1 := makeKVT(testKey1, testValue1, ts1)
kv1_2_2 := makeKVT(testKey1, testValue2, ts2)
kv1_3_3 := makeKVT(testKey1, testValue3, ts3)
Expand Down Expand Up @@ -803,33 +822,12 @@ func TestMVCCIncrementalIterator(t *testing.T) {
tsMax = hlc.Timestamp{WallTime: math.MaxInt64, Logical: 0}
)

makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue {
return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value}
}
makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp) (roachpb.Transaction, roachpb.Value, roachpb.Intent) {
txnID := uuid.MakeV4()
txnMeta := enginepb.TxnMeta{
Key: key,
ID: txnID,
Epoch: 1,
WriteTimestamp: ts,
}
return roachpb.Transaction{
TxnMeta: txnMeta,
ReadTimestamp: ts,
}, roachpb.Value{
RawBytes: val,
}, roachpb.MakeIntent(&txnMeta, key)
}
intents := func(intents ...roachpb.Intent) []roachpb.Intent { return intents }

// Keys are named as kv<key>_<value>_<ts>.
kv1_1_1 := makeKVT(testKey1, testValue1, ts1)
kv1_4_4 := makeKVT(testKey1, testValue4, ts4)
kv1_2_2 := makeKVT(testKey1, testValue2, ts2)
kv2_2_2 := makeKVT(testKey2, testValue3, ts2)
kv1Deleted3 := makeKVT(testKey1, nil, ts3)
kvs := func(kvs ...MVCCKeyValue) []MVCCKeyValue { return kvs }

for _, engineImpl := range mvccEngineImpls {
t.Run(engineImpl.name+"-latest", func(t *testing.T) {
Expand Down Expand Up @@ -864,11 +862,11 @@ func TestMVCCIncrementalIterator(t *testing.T) {
t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, latest, kvs(kv1Deleted3, kv2_2_2)))

// Exercise intent handling.
txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4)
txn1, txn1Val, intentErr1 := makeKVTxn(testKey1, testValue4, ts4)
if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil {
t.Fatal(err)
}
txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4)
txn2, txn2Val, intentErr2 := makeKVTxn(testKey2, testValue4, ts4)
if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -932,11 +930,11 @@ func TestMVCCIncrementalIterator(t *testing.T) {
t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, all, kvs(kv1Deleted3, kv1_2_2, kv2_2_2)))

// Exercise intent handling.
txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4)
txn1, txn1Val, intentErr1 := makeKVTxn(testKey1, testValue4, ts4)
if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil {
t.Fatal(err)
}
txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4)
txn2, txn2Val, intentErr2 := makeKVTxn(testKey2, testValue4, ts4)
if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -1349,32 +1347,39 @@ func TestMVCCIterateTimeBound(t *testing.T) {
t.Run(fmt.Sprintf("%s-%s", testCase.start, testCase.end), func(t *testing.T) {
defer leaktest.AfterTest(t)()

var expectedKVs []MVCCKeyValue
iter := eng.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iter.SeekGE(MVCCKey{Key: localMax})
for {
ok, err := iter.Valid()
if err != nil {
t.Fatal(err)
} else if !ok {
break
}
ts := iter.Key().Timestamp
if (ts.Less(testCase.end) || testCase.end == ts) && testCase.start.Less(ts) {
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
}
iter.Next()
}
if len(expectedKVs) < 1 {
t.Fatalf("source of truth had no expected KVs; likely a bug in the test itself")
}
expectedKVs := collectMatchingWithMVCCIterator(t, eng, testCase.start, testCase.end)

assertEqualKVs(eng, keys.LocalMax, keys.MaxKey, testCase.start, testCase.end, latest, expectedKVs)(t)
})
}
}

func collectMatchingWithMVCCIterator(
t *testing.T, eng Engine, start, end hlc.Timestamp,
) []MVCCKeyValue {
var expectedKVs []MVCCKeyValue
iter := eng.NewMVCCIterator(MVCCKeyAndIntentsIterKind, IterOptions{UpperBound: roachpb.KeyMax})
defer iter.Close()
iter.SeekGE(MVCCKey{Key: localMax})
for {
ok, err := iter.Valid()
if err != nil {
t.Fatal(err)
} else if !ok {
break
}
ts := iter.Key().Timestamp
if (ts.Less(end) || end == ts) && start.Less(ts) {
expectedKVs = append(expectedKVs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()})
}
iter.Next()
}
if len(expectedKVs) < 1 {
t.Fatalf("source of truth had no expected KVs; likely a bug in the test itself")
}
return expectedKVs
}

func runIncrementalBenchmark(
b *testing.B, emk engineMaker, useTBI bool, ts hlc.Timestamp, opts benchDataOptions,
) {
Expand Down
33 changes: 30 additions & 3 deletions pkg/storage/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -2013,11 +2013,11 @@ func pebbleExportToSst(
var resumeKey roachpb.Key
var resumeTS hlc.Timestamp
paginated := options.TargetSize > 0
trackKeyBoundary := paginated || options.ResourceLimiter != nil
firstIteration := true
for iter.SeekGE(options.StartKey); ; {
ok, err := iter.Valid()
if err != nil {
// This is an underlying iterator error, return it to the caller to deal
// with.
return roachpb.BulkOpSummary{}, MVCCKey{}, err
}
if !ok {
Expand All @@ -2034,10 +2034,37 @@ func pebbleExportToSst(

unsafeValue := iter.UnsafeValue()
isNewKey := !options.ExportAllRevisions || !unsafeKey.Key.Equal(curKey)
if paginated && options.ExportAllRevisions && isNewKey {
if trackKeyBoundary && options.ExportAllRevisions && isNewKey {
curKey = append(curKey[:0], unsafeKey.Key...)
}

if options.ResourceLimiter != nil {
// Don't check resources on first iteration to ensure we can make some progress regardless
// of starvation. Otherwise operations could spin indefinitely.
if firstIteration {
firstIteration = false
} else {
// In happy day case we want to only stop at key boundaries as it allows callers to use
// produced sst's directly. But if we can't find key boundary within reasonable number of
// iterations we would split mid key.
// To achieve that we use soft and hard thresholds in limiter. Once soft limit is reached
// we would start searching for key boundary and return as soon as it is reached. If we
// can't find it before hard limit is reached and caller requested mid key stop we would
// immediately return.
limit := options.ResourceLimiter.IsExhausted()
// We can stop at key once any threshold is reached or force stop at hard limit if midkey
// split is allowed.
if limit >= ResourceLimitReachedSoft && isNewKey || limit == ResourceLimitReachedHard && options.StopMidKey {
// Reached iteration limit, stop with resume span
resumeKey = append(make(roachpb.Key, 0, len(unsafeKey.Key)), unsafeKey.Key...)
if !isNewKey {
resumeTS = unsafeKey.Timestamp
}
break
}
}
}

// Skip tombstone (len=0) records when start time is zero (non-incremental)
// and we are not exporting all versions.
skipTombstones := !options.ExportAllRevisions && options.StartTS.IsEmpty()
Expand Down
Loading

0 comments on commit e2f01e4

Please sign in to comment.