Merge #26934
26934: storage: filter abort span during split copy r=nvanbenschoten a=tschottdorf

We suspect that some workloads create a sufficient number of
abort span records to cause split commands that are too large to be
proposed to Raft, in effect rendering the range permanently
unsplittable. This is a problem since the range will at some point
start to backpressure writes (and even without that, it's a resource
hog). Most of the problematic abort span records would very likely
be removed during garbage collection; however, abort span records
aren't counted in any quantity that triggers GC.

Instead of copying the entirety of the abort span, restrict the copy to
the entries that would not be removed by a GC operation. In practice,
this means that unless a high number of abort span records is created
every hour, very few records will actually be copied, and as a result
the split command size should stay small.
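
A rough sketch of the approach (illustrative names only; the actual filter
lives in the split's evaluation path, which is one of the changed files not
reproduced in this excerpt):

import (
    "context"
    "time"

    "github.com/cockroachdb/cockroach/pkg/keys"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
    "github.com/cockroachdb/cockroach/pkg/storage/abortspan"
    "github.com/cockroachdb/cockroach/pkg/storage/engine"
    "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
    "github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// copyRecentAbortSpanEntries copies only those abort span entries from the
// left-hand side that the GC queue would still keep, i.e. entries whose
// timestamp is younger than the transaction cleanup threshold.
func copyRecentAbortSpanEntries(
    ctx context.Context,
    lhs, rhs *abortspan.AbortSpan, // abort spans of the existing and the new range
    batch engine.ReadWriter,
    ms *enginepb.MVCCStats,
    now hlc.Timestamp,
    cleanupThreshold time.Duration, // e.g. the GC queue's txn cleanup threshold
) error {
    // Entries older than this would be removed by the next abort span GC
    // anyway; skipping them keeps the split's Raft proposal small.
    threshold := hlc.Timestamp{WallTime: now.WallTime - cleanupThreshold.Nanoseconds()}
    return lhs.Iterate(ctx, batch, func(key roachpb.Key, entry roachpb.AbortSpanEntry) error {
        if entry.Timestamp.Less(threshold) {
            return nil // expired; do not copy
        }
        txnID, err := keys.DecodeAbortSpanKey(key, nil)
        if err != nil {
            return err
        }
        return rhs.Put(ctx, batch, ms, txnID, &entry)
    })
}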

See #26830.

Release note (bug fix): Avoid a situation in which ranges repeatedly
fail to perform a split.

Co-authored-by: Tobias Schottdorf <tobias.schottdorf@gmail.com>
craig[bot] and tbg committed Jun 25, 2018
2 parents 03e5dbc + 46d6c47 commit e0164df
Showing 5 changed files with 181 additions and 169 deletions.
92 changes: 7 additions & 85 deletions pkg/storage/abortspan/abortspan.go
@@ -17,14 +17,11 @@ package abortspan
import (
"context"

"github.com/pkg/errors"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
)

@@ -105,91 +102,23 @@ func (sc *AbortSpan) Get(
}

// Iterate walks through the AbortSpan, invoking the given callback for
// each unmarshaled entry with the key, the transaction ID and the decoded
// entry.
// TODO(tschottdorf): should not use a pointer to UUID.
// each unmarshaled entry with the MVCC key and the decoded entry.
func (sc *AbortSpan) Iterate(
ctx context.Context, e engine.Reader, f func([]byte, roachpb.AbortSpanEntry),
) {
_, _ = engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
ctx context.Context, e engine.Reader, f func(roachpb.Key, roachpb.AbortSpanEntry) error,
) error {
_, err := engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
true /* consistent */, false /* tombstones */, nil /* txn */, false, /* reverse */
func(kv roachpb.KeyValue) (bool, error) {
var entry roachpb.AbortSpanEntry
if _, err := keys.DecodeAbortSpanKey(kv.Key, nil); err != nil {
panic(err) // TODO(tschottdorf): ReplicaCorruptionError
return false, err
}
if err := kv.Value.GetProto(&entry); err != nil {
panic(err) // TODO(tschottdorf): ReplicaCorruptionError
}
f(kv.Key, entry)
return false, nil
})
}

func copyAbortSpan(
e engine.ReadWriter, ms *enginepb.MVCCStats, dstID roachpb.RangeID, keyMin, keyMax engine.MVCCKey,
) (int, error) {
var scratch [64]byte
var count int
var meta enginepb.MVCCMetadata
// TODO(spencer): look into making this an MVCCIteration and writing
// the values using MVCC so we can avoid the ugliness of updating
// the MVCCStats by hand below.
err := e.Iterate(keyMin, keyMax,
func(kv engine.MVCCKeyValue) (bool, error) {
// Decode the key, skipping on error. Otherwise, write it to the
// corresponding key in the new cache.
txnID, err := decodeAbortSpanMVCCKey(kv.Key, scratch[:0])
if err != nil {
return false, errors.Errorf("could not decode an AbortSpan key %s: %s", kv.Key, err)
}
key := keys.AbortSpanKey(dstID, txnID)
encKey := engine.MakeMVCCMetadataKey(key)
// Decode the MVCCMetadata value.
if err := protoutil.Unmarshal(kv.Value, &meta); err != nil {
return false, errors.Errorf("could not decode mvcc metadata %s [% x]: %s", kv.Key, kv.Value, err)
}
value := engine.MakeValue(meta)
value.ClearChecksum()
value.InitChecksum(key)
meta.RawBytes = value.RawBytes

keyBytes, valBytes, err := engine.PutProto(e, encKey, &meta)
if err != nil {
return false, err
}
count++
if ms != nil {
ms.SysBytes += keyBytes + valBytes
ms.SysCount++
}
return false, nil
return false, f(kv.Key, entry)
})
return count, err
}

// CopyInto copies all the results from this AbortSpan into the destRangeID
// AbortSpan. Failures decoding individual cache entries return an error.
// On success, returns the number of entries (key-value pairs) copied.
func (sc *AbortSpan) CopyInto(
e engine.ReadWriter, ms *enginepb.MVCCStats, destRangeID roachpb.RangeID,
) (int, error) {
return copyAbortSpan(e, ms, destRangeID,
engine.MakeMVCCMetadataKey(sc.min()), engine.MakeMVCCMetadataKey(sc.max()))
}

// CopyFrom copies all the persisted results from the originRangeID
// AbortSpan into this one. Note that the cache will not be
// locked while copying is in progress. Failures decoding individual
// entries return an error. The copy is done directly using the engine
// instead of interpreting values through MVCC for efficiency.
// On success, returns the number of entries (key-value pairs) copied.
func (sc *AbortSpan) CopyFrom(
ctx context.Context, e engine.ReadWriter, ms *enginepb.MVCCStats, originRangeID roachpb.RangeID,
) (int, error) {
originMin := engine.MakeMVCCMetadataKey(keys.AbortSpanKey(originRangeID, txnIDMin))
originMax := engine.MakeMVCCMetadataKey(keys.AbortSpanKey(originRangeID, txnIDMax))
return copyAbortSpan(e, ms, sc.rangeID, originMin, originMax)
return err
}

// Del removes all AbortSpan entries for the given transaction.
@@ -211,10 +140,3 @@ func (sc *AbortSpan) Put(
key := keys.AbortSpanKey(sc.rangeID, txnID)
return engine.MVCCPutProto(ctx, e, ms, key, hlc.Timestamp{}, nil /* txn */, entry)
}

func decodeAbortSpanMVCCKey(encKey engine.MVCCKey, dest []byte) (uuid.UUID, error) {
if encKey.IsValue() {
return uuid.UUID{}, errors.Errorf("key %s is not a raw MVCC value", encKey)
}
return keys.DecodeAbortSpanKey(encKey.Key, dest)
}
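
As a usage note, the reworked Iterate takes an error-returning callback and
itself returns an error. A hypothetical caller (assuming the same imports as
in the sketch above) might look like this:

// countAbortSpanEntries is a hypothetical caller of the reworked Iterate:
// the callback now returns an error, which aborts the iteration and is
// propagated by Iterate instead of panicking inside the AbortSpan.
func countAbortSpanEntries(
    ctx context.Context, as *abortspan.AbortSpan, r engine.Reader,
) (int, error) {
    var n int
    err := as.Iterate(ctx, r, func(_ roachpb.Key, _ roachpb.AbortSpanEntry) error {
        n++
        return nil
    })
    return n, err
}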
70 changes: 0 additions & 70 deletions pkg/storage/abortspan/abortspan_test.go
@@ -111,73 +111,3 @@ func TestAbortSpanEmptyParams(t *testing.T) {
t.Errorf("unexpected error putting response: %s", err)
}
}

// TestAbortSpanCopyInto tests that entries in one cache get
// transferred correctly to another cache using CopyInto().
func TestAbortSpanCopyInto(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
rc1, e := createTestAbortSpan(t, 1, stopper)
rc2, _ := createTestAbortSpan(t, 2, stopper)

entry := roachpb.AbortSpanEntry{
Key: testTxnKey,
Timestamp: testTxnTimestamp,
Priority: testTxnPriority,
}
if err := rc1.Put(context.Background(), e, nil, testTxnID, &entry); err != nil {
t.Errorf("unexpected error putting entry: %s", err)
}
// Copy the first cache into the second.
if count, err := rc1.CopyInto(e, nil, rc2.rangeID); err != nil {
t.Fatal(err)
} else if expCount := 1; count != expCount {
t.Errorf("unexpected number of copied entries: %d", count)
}
for _, cache := range []*AbortSpan{rc1, rc2} {
var actual roachpb.AbortSpanEntry
// Get should return 1 for both caches.
if aborted, readErr := cache.Get(context.Background(), e, testTxnID, &actual); !aborted || readErr != nil {
t.Errorf("unexpected failure getting response from source: %t, %s", aborted, readErr)
} else if !reflect.DeepEqual(entry, actual) {
t.Fatalf("wanted %v, got %v", entry, actual)
}
}
}

// TestAbortSpanCopyFrom tests that entries in one cache get
// transferred correctly to another cache using CopyFrom().
func TestAbortSpanCopyFrom(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
rc1, e := createTestAbortSpan(t, 1, stopper)
rc2, _ := createTestAbortSpan(t, 2, stopper)

entry := roachpb.AbortSpanEntry{
Key: testTxnKey,
Timestamp: testTxnTimestamp,
Priority: testTxnPriority,
}
if err := rc1.Put(context.Background(), e, nil, testTxnID, &entry); err != nil {
t.Errorf("unexpected error putting response: %s", err)
}

// Copy the first cache into the second.
if count, err := rc2.CopyFrom(context.Background(), e, nil, rc1.rangeID); err != nil {
t.Fatal(err)
} else if expCount := 1; count != expCount {
t.Errorf("unexpected number of copied entries: %d", count)
}

// Get should hit both caches.
for i, cache := range []*AbortSpan{rc1, rc2} {
var actual roachpb.AbortSpanEntry
if aborted, readErr := cache.Get(context.Background(), e, testTxnID, &actual); !aborted || readErr != nil {
t.Fatalf("%d: unexpected read error: %t, %s", i, aborted, readErr)
} else if !reflect.DeepEqual(entry, actual) {
t.Fatalf("expected %v, got %v", entry, actual)
}
}
}
124 changes: 124 additions & 0 deletions pkg/storage/client_split_test.go
@@ -21,6 +21,8 @@ import (
"math"
"math/rand"
"reflect"
"sort"
"strconv"
"sync/atomic"
"testing"
"time"
@@ -38,6 +40,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
@@ -96,6 +99,127 @@ func TestStoreRangeSplitAtIllegalKeys(t *testing.T) {
}
}

// Verify that on a split, only the non-expired abort span records are copied
// into the right hand side of the split.
func TestStoreSplitAbortSpan(t *testing.T) {
defer leaktest.AfterTest(t)()

manualClock := hlc.NewManualClock(2400 * time.Hour.Nanoseconds())
clock := hlc.NewClock(manualClock.UnixNano, time.Millisecond)
storeCfg := storage.TestStoreConfig(clock)
storeCfg.TestingKnobs.DisableSplitQueue = true

stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
store := createTestStoreWithConfig(t, stopper, storeCfg)
ctx := context.Background()

left, middle, right := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")

txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
txn := roachpb.MakeTransaction("test", key, 0, enginepb.SERIALIZABLE, ts, 0)
return &txn
}

var expAll []roachpb.AbortSpanEntry

populateAbortSpan := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.ResolveIntentRequest {
pushee := txn(key, ts)
expAll = append(expAll, roachpb.AbortSpanEntry{
Key: key,
Timestamp: ts,
})
return &roachpb.ResolveIntentRequest{
RequestHeader: roachpb.RequestHeader{
Key: key,
},
IntentTxn: pushee.TxnMeta,
Status: roachpb.ABORTED,
Poison: true,
}
}

key := func(k roachpb.Key, i int) roachpb.Key {
var r []byte
r = append(r, k...)
r = append(r, []byte(strconv.Itoa(i))...)
return r
}

thresh := storage.GetGCQueueTxnCleanupThreshold().Nanoseconds()
// Pick a non-gcable and gcable timestamp, respectively. Avoid the clock's
// exact timestamp because of unpredictable logical ticks.
tsFresh := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh + 1}
tsStale := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh - 1}

args := []roachpb.Request{
populateAbortSpan(key(left, 1), tsFresh),
populateAbortSpan(key(left, 2), tsStale),
populateAbortSpan(key(middle, 1), tsFresh),
populateAbortSpan(key(middle, 2), tsStale),
populateAbortSpan(key(right, 1), tsFresh),
populateAbortSpan(key(right, 2), tsStale),
adminSplitArgs(middle),
}

// Nothing gets removed from the LHS during the split. This could
// be done but has to be done carefully to avoid large Raft proposals,
// and the stats computation needs to be checked carefully.
expL := []roachpb.AbortSpanEntry{
{Key: key(left, 1), Timestamp: tsFresh},
{Key: key(left, 2), Timestamp: tsStale},
{Key: key(middle, 1), Timestamp: tsFresh},
{Key: key(middle, 2), Timestamp: tsStale},
{Key: key(right, 1), Timestamp: tsFresh},
{Key: key(right, 2), Timestamp: tsStale},
}

// But we don't blindly copy everything over to the RHS. Only entries with
// recent timestamp are duplicated. This is important because otherwise the
// Raft command size can blow up and splits fail.
expR := []roachpb.AbortSpanEntry{
{Key: key(left, 1), Timestamp: tsFresh},
{Key: key(middle, 1), Timestamp: tsFresh},
{Key: key(right, 1), Timestamp: tsFresh},
}

for _, arg := range args {
_, pErr := client.SendWrapped(context.Background(), store.TestSender(), arg)
if pErr != nil {
t.Fatalf("while sending +%v: %s", arg, pErr)
}
}

collect := func(as *abortspan.AbortSpan) []roachpb.AbortSpanEntry {
var results []roachpb.AbortSpanEntry
if err := as.Iterate(ctx, store.Engine(), func(_ roachpb.Key, entry roachpb.AbortSpanEntry) error {
entry.Priority = 0 // don't care about that
results = append(results, entry)
return nil
}); err != nil {
t.Fatal(err)
}
sort.Slice(results, func(i, j int) bool {
c := bytes.Compare(results[i].Key, results[j].Key)
if c == 0 {
return results[i].Timestamp.Less(results[j].Timestamp)
}
return c < 0
})
return results
}

l := collect(store.LookupReplica(keys.MustAddr(left), nil).AbortSpan())
r := collect(store.LookupReplica(keys.MustAddr(right), nil).AbortSpan())

if !reflect.DeepEqual(expL, l) {
t.Fatalf("left hand side: expected %+v, got %+v", expL, l)
}
if !reflect.DeepEqual(expR, r) {
t.Fatalf("right hand side: expected %+v, got %+v", expR, r)
}
}

// TestStoreRangeSplitAtTablePrefix verifies a range can be split at
// UserTableDataMin and still gossip the SystemConfig properly.
func TestStoreRangeSplitAtTablePrefix(t *testing.T) {
8 changes: 6 additions & 2 deletions pkg/storage/gc_queue.go
@@ -465,13 +465,17 @@ func processAbortSpan(
abortSpan := abortspan.New(rangeID)
infoMu.Lock()
defer infoMu.Unlock()
abortSpan.Iterate(ctx, snap, func(key []byte, v roachpb.AbortSpanEntry) {
if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
infoMu.AbortSpanTotal++
if v.Timestamp.Less(threshold) {
infoMu.AbortSpanGCNum++
gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key})
}
})
return nil
}); err != nil {
// Still return whatever we managed to collect.
log.Warning(ctx, err)
}
return gcKeys
}
