compactor: purge suggestions that have live data
While debugging cockroachdb#24029, we discovered that RESTORE generates massive
numbers of suggested compactions as it splits and scatters ranges. As
the cluster rebalances, every removed replica leaves behind a range
deletion tombstone and a suggested compaction over the keys it covered.

Occasionally, the replica chosen for rebalancing will be a member of the
last range of the cluster. This range extends from wherever the restore
has last split the table it's restoring to the very last key. Suppose
we're restoring a table with 10,000 ranges evenly distributed across
primary keys from 1-10,000. If a replica in the last range gets
rebalanced early in the restore—say, after only the first 500 ranges
have been split off—at least one node in the cluster will have a
suggested compaction for a range like the following:

    /Table/51/1/500 - /Max

This creates a huge problem! The restore will eventually create 9,500
more ranges in that keyspan, each about 32MiB in size (roughly 300GiB in
total). Some of those ranges will necessarily rebalance back onto the
node with the suggested compaction. By the time the compaction queue
gets around to processing the suggestion, there might be hundreds of
gigabytes of live data within the suggested span.

In our 2TB (replicated) store dump, snapshotted immediately after a
RESTORE, there were two such massive suggested compactions, each of
which took over 1h to complete. This bogs down the compaction queue with
unnecessary work and makes it especially dangerous to initiate a DROP
TABLE (cockroachdb#24029), as the incoming range deletion tombstones will pile up
until the prior compaction finishes, and the cluster grinds to a halt in
the meantime.

The same problem happens whenever any replica is rebalanced away and
back before the compaction queue has a chance to compact away the range
deletion tombstone, though the impact is limited because the keyspan is
smaller.

This commit prevents the compaction queue from getting bogged down with
suggestions based on outdated information. When a suggestion comes up
for compaction, the queue first checks whether the suggested key span
contains any live keys. The presence of even a single key is a strong
indicator that something has changed—usually that the replica, or one of
its split children, has been rebalanced back onto the node. In that case
the compaction queue deletes the suggestion instead of acting on it.
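
In condensed form (error handling elided; the names are those used by
the new fetchSuggestions helper in the diff below), the check looks
like this:

    dataIter.SetUpperBound(sc.EndKey)
    dataIter.Seek(engine.MakeMVCCMetadataKey(sc.StartKey))
    if ok, _ := dataIter.Valid(); ok &&
        dataIter.UnsafeKey().Less(engine.MakeMVCCMetadataKey(sc.EndKey)) {
        // Live data in the suggested span: purge the suggestion.
        delBatch.Clear(kv.Key)
        c.Metrics.BytesSkipped.Inc(sc.Bytes)
    } else {
        // Span is still empty: keep the suggestion for compaction.
        suggestions = append(suggestions, sc)
        totalBytes += sc.Bytes
    }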

This is one half of the fix for cockroachdb#24029; the other half, cockroachdb#26449,
involves rate-limiting ClearRange requests.

I suspect this change will yield a nice performance boost for RESTORE.
Anecdotally, we've noticed that restores slow down over time; I'm
willing to bet that's because these nonsense suggested compactions
start hogging disk I/O.

Release note: None
benesch committed Jul 9, 2018
1 parent e348256 commit 1fc5f95
Showing 3 changed files with 132 additions and 21 deletions.
86 changes: 65 additions & 21 deletions pkg/storage/compactor/compactor.go
@@ -204,27 +204,8 @@ func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
ctx, cleanup := tracing.EnsureContext(ctx, c.st.Tracer, "process suggested compactions")
defer cleanup()

// Collect all suggestions.
var suggestions []storagebase.SuggestedCompaction
var totalBytes int64
if err := c.eng.Iterate(
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
func(kv engine.MVCCKeyValue) (bool, error) {
var sc storagebase.SuggestedCompaction
var err error
sc.StartKey, sc.EndKey, err = keys.DecodeStoreSuggestedCompactionKey(kv.Key.Key)
if err != nil {
return false, errors.Wrapf(err, "failed to decode suggested compaction key")
}
if err := protoutil.Unmarshal(kv.Value, &sc.Compaction); err != nil {
return false, err
}
suggestions = append(suggestions, sc)
totalBytes += sc.Bytes
return false, nil // continue iteration
},
); err != nil {
suggestions, totalBytes, err := c.fetchSuggestions(ctx)
if err != nil {
return false, err
}

@@ -301,6 +282,69 @@ func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
return true, nil
}

// fetchSuggestions loads the persisted suggested compactions from the store.
func (c *Compactor) fetchSuggestions(
ctx context.Context,
) (suggestions []storagebase.SuggestedCompaction, totalBytes int64, err error) {
dataIter := c.eng.NewIterator(engine.IterOptions{
UpperBound: roachpb.KeyMax, // refined before every seek
})
defer dataIter.Close()

delBatch := c.eng.NewBatch()
defer delBatch.Close()

err = c.eng.Iterate(
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
func(kv engine.MVCCKeyValue) (bool, error) {
var sc storagebase.SuggestedCompaction
var err error
sc.StartKey, sc.EndKey, err = keys.DecodeStoreSuggestedCompactionKey(kv.Key.Key)
if err != nil {
return false, errors.Wrapf(err, "failed to decode suggested compaction key")
}
if err := protoutil.Unmarshal(kv.Value, &sc.Compaction); err != nil {
return false, err
}

dataIter.SetUpperBound(sc.EndKey)
dataIter.Seek(engine.MakeMVCCMetadataKey(sc.StartKey))
if ok, err := dataIter.Valid(); err != nil {
return false, err
} else if ok && dataIter.UnsafeKey().Less(engine.MakeMVCCMetadataKey(sc.EndKey)) {
// The suggested compaction span has live keys remaining. This is a
// strong indicator that compacting this range will be significantly
// more expensive than we expected when the compaction was suggested, as
// compactions are only suggested when a ClearRange request has removed
// all the keys in the span. Perhaps a replica was rebalanced away then
// back?
//
// Since we can't guarantee that this compaction will be an easy win,
// purge it to avoid bogging down the compaction queue.
log.Infof(ctx, "purging suggested compaction for range %s - %s that contains live data",
sc.StartKey, sc.EndKey)
if err := delBatch.Clear(kv.Key); err != nil {
log.Fatal(ctx, err) // should never happen on a batch
}
c.Metrics.BytesSkipped.Inc(sc.Bytes)
} else {
suggestions = append(suggestions, sc)
totalBytes += sc.Bytes
}

return false, nil // continue iteration
},
)
if err != nil {
return nil, 0, err
}
if err := delBatch.Commit(true); err != nil {
log.Warningf(ctx, "unable to delete suggested compaction records: %s", err)
}
return suggestions, totalBytes, nil
}

// processCompaction sends CompactRange requests to the storage engine if the
// aggregated suggestion exceeds size threshold(s). Otherwise, it either skips
// the compaction or skips the compaction *and* deletes the suggested compaction
62 changes: 62 additions & 0 deletions pkg/storage/compactor/compactor_test.go
@@ -66,6 +66,7 @@ func (we *wrappedEngine) GetSSTables() engine.SSTableInfos {
{Level: 2, Size: 100, Start: key("k"), End: key("o")},
{Level: 2, Size: 100, Start: key("r"), End: key("t")},
// Level 6.
{Level: 6, Size: 200, Start: key("0"), End: key("9")},
{Level: 6, Size: 201, Start: key("a"), End: key("c")},
{Level: 6, Size: 200, Start: key("d"), End: key("f")},
{Level: 6, Size: 300, Start: key("h"), End: key("r")},
@@ -162,6 +163,27 @@ func TestCompactorThresholds(t *testing.T) {
{Key: key("a"), EndKey: key("b")},
},
},
// Single suggestion over the absolute bytes threshold should not
// trigger a compaction if the span contains live keys.
{
name: "outdated single suggestion over absolute threshold",
suggestions: []storagebase.SuggestedCompaction{
{
StartKey: key("0"), EndKey: key("9"),
Compaction: storagebase.Compaction{
Bytes: thresholdBytes.Default(),
SuggestedAtNanos: nowNanos,
},
},
},
logicalBytes: thresholdBytes.Default() * 100, // not going to trigger fractional threshold
availableBytes: thresholdBytes.Default() * 100, // not going to trigger fractional threshold
expBytesCompacted: 0,
expCompactions: nil,
expUncompacted: []roachpb.Span{
{Key: key("0"), EndKey: key("9")},
},
},
// Single suggestion over the fractional threshold.
{
name: "single suggestion over fractional threshold",
@@ -331,6 +353,40 @@ func TestCompactorThresholds(t *testing.T) {
{Key: key("a"), EndKey: key("f")},
},
},
// Double suggestion with non-excessive gap, but there are live keys in the
// gap.
//
// NOTE: when a suggestion itself contains live keys, we skip the compaction
// because arbitrary amounts of data may have been added to the span since the
// compaction was proposed. When only the gap contains live keys, however,
// it's still desirable to compact: the individual suggestions are empty, so
// we can assume there's lots of data to reclaim by compacting, and the
// aggregator is very careful not to jump gaps that span too many SSTs.
{
name: "double suggestion over gap with live keys",
suggestions: []storagebase.SuggestedCompaction{
{
StartKey: key("0"), EndKey: key("4"),
Compaction: storagebase.Compaction{
Bytes: thresholdBytes.Default() / 2,
SuggestedAtNanos: nowNanos,
},
},
{
StartKey: key("6"), EndKey: key("9"),
Compaction: storagebase.Compaction{
Bytes: thresholdBytes.Default() - (thresholdBytes.Default() / 2),
SuggestedAtNanos: nowNanos,
},
},
},
logicalBytes: thresholdBytes.Default() * 100, // not going to trigger fractional threshold
availableBytes: thresholdBytes.Default() * 100, // not going to trigger fractional threshold
expBytesCompacted: thresholdBytes.Default(),
expCompactions: []roachpb.Span{
{Key: key("0"), EndKey: key("9")},
},
},
// Double suggestion with excessive gap.
{
name: "double suggestion with excessive gap",
@@ -473,6 +529,12 @@ func TestCompactorThresholds(t *testing.T) {
// Shorten wait times for compactor processing.
minInterval.Override(&compactor.st.SV, time.Millisecond)

// Add a key so we can test that suggestions that span live data are
// ignored.
if err := we.Put(engine.MakeMVCCMetadataKey(key("5")), nil); err != nil {
t.Fatal(err)
}

for _, sc := range test.suggestions {
compactor.Suggest(context.Background(), sc)
}
5 changes: 5 additions & 0 deletions pkg/storage/spanset/batch.go
@@ -193,6 +193,11 @@ func (s *Iterator) MVCCScan(
return s.i.MVCCScan(start, end, max, timestamp, txn, consistent, reverse, tombstones)
}

// SetUpperBound is part of the engine.Iterator interface.
func (s *Iterator) SetUpperBound(key roachpb.Key) {
s.i.SetUpperBound(key)
}

type spanSetReader struct {
r engine.Reader
spans *SpanSet
