Skip to content

Commit

Permalink
storage: add limits to skipped data iteration
Browse files Browse the repository at this point in the history
Previously when iterating engine using  MVCCIncrementalIterator caller
could skip large amounts of non-matching data which would result in
"unbounded" resource usage.
This is becoming a problem for resource constrained clusters where low
priority requests like export that are used by backups to interfere with
high priority workloads. If we want to throttle backups we need to be able
to limit how many underlying operations we want to perform per request.
This change adds an optional iteration limit to the iterator. Once the
limit is reached, iterator will return an error. Error will provide a
resume key to continue iteration in next request.
Current limiter uses wall clock time to stop interation.

Release note: None

Release justification:
  • Loading branch information
aliher1911 committed Sep 14, 2021
1 parent c3307e0 commit a25bb90
Show file tree
Hide file tree
Showing 5 changed files with 460 additions and 63 deletions.
7 changes: 4 additions & 3 deletions pkg/storage/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,10 +653,11 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int

var batch Batch
var minWallTime int64
batchSize := len(keys) / numBatches
for i, key := range keys {
if scaled := len(keys) / numBatches; (i % scaled) == 0 {
if (i % batchSize) == 0 {
if i > 0 {
log.Infof(ctx, "committing (%d/~%d)", i/scaled, numBatches)
log.Infof(ctx, "committing (%d/~%d)", i/batchSize, numBatches)
if err := batch.Commit(false /* sync */); err != nil {
return nil, err
}
Expand All @@ -666,7 +667,7 @@ func loadTestData(dir string, numKeys, numBatches, batchTimeSpan, valueBytes int
}
}
batch = eng.NewBatch()
minWallTime = sstTimestamps[i/scaled]
minWallTime = sstTimestamps[i/batchSize]
}
timestamp := hlc.Timestamp{WallTime: minWallTime + rand.Int63n(int64(batchTimeSpan))}
value := roachpb.MakeValueFromBytes(randutil.RandBytes(rng, valueBytes))
Expand Down
7 changes: 7 additions & 0 deletions pkg/storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,13 @@ type ExportOptions struct {
// would stop immediately when targetSize is reached and return the next versions
// timestamp in resumeTs so that subsequent operation can pass it to firstKeyTs.
StopMidKey bool
// ResourceLimiter limits how long iterator could run until it exhausts allocated
// resources. Iterator queries limiter in its internal advance loop to break out
// of it once resources are exhausted.
// This option could only be used together with StopMidKey as it could break
// iteration over data on any version and even on a key and timestamp that would
// not be included in the resulting export.
ResourceLimiter ResourceLimiter
// If UseTBI is true, the backing MVCCIncrementalIterator will initialize a
// time-bound iterator along with its regular iterator. The TBI will be used
// as an optimization to skip over swaths of uninteresting keys i.e. keys
Expand Down
127 changes: 117 additions & 10 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,14 @@
package storage

import (
"fmt"
"time"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -33,6 +37,11 @@ import (
// MVCCIncrementalIterIntentPolicy. By default, an error will be
// returned.
//
// If iteration resource limit is requested, iterator would return an error as
// soon as limit is reached. The error will contain a resume key which could be
// outside of requested boundaries and should be used to resume iteration on
// subsequent requests.
//
// Note: The endTime is inclusive to be consistent with the non-incremental
// iterator, where reads at a given timestamp return writes at that
// timestamp. The startTime is then made exclusive so that iterating time 1 to
Expand Down Expand Up @@ -94,6 +103,74 @@ type MVCCIncrementalIterator struct {

// Optional collection of intents created on demand when first intent encountered.
intents []roachpb.Intent

// Limit iterator resources. Safe to leave as nil to use unbounded resources.
limiter ResourceLimiter
startKey MVCCKey
}

// ResourceLimiterOptions is defining limits for resource limiter to restrict number
// of iterations.
type ResourceLimiterOptions struct {
MaxRunTime time.Duration
}

// ResourceLimiter provides an optional facility to stop mvcc iterator without
// waiting for the next value to be returned.
type ResourceLimiter interface {
// IsExhausted returns true when limited resource is exhausted. Iterator is
// checking the exhaustion status of resource limiter every time it advances
// to the next underlying key value pair.
IsExhausted() bool
// Report provides an explanation string which resource was exhausted and could
// be used for logging and error messages.
Report() string
}

// ClockCheckEveryNIterations defines for how many iterations we could cache
// current time when performing iteration wall clock time limiting.
const ClockCheckEveryNIterations = 100

// TimeResourceLimiter provides limiter based on wall clock time.
type TimeResourceLimiter struct {
// Iteration time limit
maxRunTime time.Duration
startTime time.Time
ts timeutil.TimeSource

// Check throttling to decrease time query usage.
checkThrottle int
}

var _ ResourceLimiter = &TimeResourceLimiter{}

// NewResourceLimiter create new default resource limiter. Current implementation is wall clock time based.
// Timer starts as soon as limiter is created.
// If no limits are specified in opts nil is returned.
func NewResourceLimiter(opts ResourceLimiterOptions, ts timeutil.TimeSource) ResourceLimiter {
if opts.MaxRunTime == 0 {
return nil
}
return &TimeResourceLimiter{maxRunTime: opts.MaxRunTime, startTime: ts.Now(), ts: ts}
}

// IsExhausted implements ResourceLimiter interface.
func (l *TimeResourceLimiter) IsExhausted() bool {
if l.checkThrottle >= ClockCheckEveryNIterations && l.maxRunTime > 0 {
// Check if we exhausted allocated time.
now := l.ts.Now()
if now.Sub(l.startTime) > l.maxRunTime {
return true
}
l.checkThrottle = 0
}
l.checkThrottle++
return false
}

// Report implements ResourceLimiter interface.
func (l *TimeResourceLimiter) Report() string {
return fmt.Sprintf("iteration exhausted time limit of %v", l.maxRunTime)
}

var _ SimpleMVCCIterator = &MVCCIncrementalIterator{}
Expand Down Expand Up @@ -153,6 +230,18 @@ type MVCCIncrementalIterOptions struct {

IntentPolicy MVCCIncrementalIterIntentPolicy
InlinePolicy MVCCIncrementalIterInlinePolicy

// If Limiter is non nil iterator would only advance or seek up to a certain
// number of times before bailing out with ResourceLimitError. When using this feature
// it is important to use resume key from the error to ensure forward progress could
// be made.
// StartKey must also be populated with resume key. This is needed to ensure progress
// for cases when initial seek would exhaust resources and that subsequent call would
// restart from further position.
// Note that resume key is not necessarily a valid iteration key as we could stop in
// between eligible keys because of timestamp range limits.
Limiter ResourceLimiter
StartKey MVCCKey
}

// NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the
Expand Down Expand Up @@ -194,6 +283,8 @@ func NewMVCCIncrementalIterator(
timeBoundIter: timeBoundIter,
intentPolicy: opts.IntentPolicy,
inlinePolicy: opts.InlinePolicy,
limiter: opts.Limiter,
startKey: opts.StartKey,
}
}

Expand All @@ -217,9 +308,7 @@ func (i *MVCCIncrementalIterator) SeekGE(startKey MVCCKey) {
}
}
i.iter.SeekGE(startKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
i.err = nil
Expand Down Expand Up @@ -423,6 +512,14 @@ func (i *MVCCIncrementalIterator) advance() {
return
}

// If we reached requested iteration limit here, stop iteration with an error and
// provide a resume key.
if i.limiter != nil && i.limiter.IsExhausted() && i.startKey.Less(i.iter.UnsafeKey()) {
i.err = &ResourceLimitError{Message: i.limiter.Report(), ResumeKey: i.Key()}
i.valid = false
return
}

// If we have an inline value and the policy was to error, we
// would have errored in the call above. If our policy is to
// emit inline values, we don't want to advance past it. Inline
Expand Down Expand Up @@ -463,10 +560,7 @@ func (i *MVCCIncrementalIterator) advance() {
// done.
break
}

if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
}
Expand Down Expand Up @@ -511,9 +605,7 @@ func (i *MVCCIncrementalIterator) UnsafeValue() []byte {
func (i *MVCCIncrementalIterator) NextIgnoringTime() {
for {
i.iter.Next()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}

Expand Down Expand Up @@ -551,3 +643,18 @@ func (i *MVCCIncrementalIterator) TryGetIntentError() error {
Intents: i.intents,
}
}

// ResourceLimitError is returned by MVCCIncrementalIterator when iterator reaches
// maximum number of iterations in the underlying storage iterator. Error will
// provide used Limit hint as well as ResumeKey that must be used to resume iteration
// on the subsequent attempt.
type ResourceLimitError struct {
Message string
ResumeKey MVCCKey
}

var _ error = &ResourceLimitError{}

func (e *ResourceLimitError) Error() string {
return e.Message
}
Loading

0 comments on commit a25bb90

Please sign in to comment.