diff --git a/pkg/storage/abortspan/abortspan.go b/pkg/storage/abortspan/abortspan.go
index 243efb79bf91..9b93ce7d6b82 100644
--- a/pkg/storage/abortspan/abortspan.go
+++ b/pkg/storage/abortspan/abortspan.go
@@ -17,14 +17,11 @@ package abortspan
 import (
 	"context"
 
-	"github.com/pkg/errors"
-
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
-	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
 
@@ -105,91 +102,23 @@ func (sc *AbortSpan) Get(
 }
 
 // Iterate walks through the AbortSpan, invoking the given callback for
-// each unmarshaled entry with the key, the transaction ID and the decoded
-// entry.
-// TODO(tschottdorf): should not use a pointer to UUID.
+// each entry, passing the key and the decoded AbortSpanEntry.
 func (sc *AbortSpan) Iterate(
-	ctx context.Context, e engine.Reader, f func([]byte, roachpb.AbortSpanEntry),
-) {
-	_, _ = engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
+	ctx context.Context, e engine.Reader, f func(roachpb.Key, roachpb.AbortSpanEntry) error,
+) error {
+	_, err := engine.MVCCIterate(ctx, e, sc.min(), sc.max(), hlc.Timestamp{},
 		true /* consistent */, false /* tombstones */, nil /* txn */, false, /* reverse */
 		func(kv roachpb.KeyValue) (bool, error) {
 			var entry roachpb.AbortSpanEntry
 			if _, err := keys.DecodeAbortSpanKey(kv.Key, nil); err != nil {
-				panic(err) // TODO(tschottdorf): ReplicaCorruptionError
+				return false, err
 			}
 			if err := kv.Value.GetProto(&entry); err != nil {
-				panic(err) // TODO(tschottdorf): ReplicaCorruptionError
-			}
-			f(kv.Key, entry)
-			return false, nil
-		})
-}
-
-func copyAbortSpan(
-	e engine.ReadWriter, ms *enginepb.MVCCStats, dstID roachpb.RangeID, keyMin, keyMax engine.MVCCKey,
-) (int, error) {
-	var scratch [64]byte
-	var count int
-	var meta enginepb.MVCCMetadata
-	// TODO(spencer): look into making this an MVCCIteration and writing
-	// the values using MVCC so we can avoid the ugliness of updating
-	// the MVCCStats by hand below.
-	err := e.Iterate(keyMin, keyMax,
-		func(kv engine.MVCCKeyValue) (bool, error) {
-			// Decode the key, skipping on error. Otherwise, write it to the
-			// corresponding key in the new cache.
-			txnID, err := decodeAbortSpanMVCCKey(kv.Key, scratch[:0])
-			if err != nil {
-				return false, errors.Errorf("could not decode an AbortSpan key %s: %s", kv.Key, err)
-			}
-			key := keys.AbortSpanKey(dstID, txnID)
-			encKey := engine.MakeMVCCMetadataKey(key)
-			// Decode the MVCCMetadata value.
-			if err := protoutil.Unmarshal(kv.Value, &meta); err != nil {
-				return false, errors.Errorf("could not decode mvcc metadata %s [% x]: %s", kv.Key, kv.Value, err)
-			}
-			value := engine.MakeValue(meta)
-			value.ClearChecksum()
-			value.InitChecksum(key)
-			meta.RawBytes = value.RawBytes
-
-			keyBytes, valBytes, err := engine.PutProto(e, encKey, &meta)
-			if err != nil {
 				return false, err
 			}
-			count++
-			if ms != nil {
-				ms.SysBytes += keyBytes + valBytes
-				ms.SysCount++
-			}
-			return false, nil
+			return false, f(kv.Key, entry)
 		})
-	return count, err
-}
-
-// CopyInto copies all the results from this AbortSpan into the destRangeID
-// AbortSpan. Failures decoding individual cache entries return an error.
-// On success, returns the number of entries (key-value pairs) copied.
-func (sc *AbortSpan) CopyInto(
-	e engine.ReadWriter, ms *enginepb.MVCCStats, destRangeID roachpb.RangeID,
-) (int, error) {
-	return copyAbortSpan(e, ms, destRangeID,
-		engine.MakeMVCCMetadataKey(sc.min()), engine.MakeMVCCMetadataKey(sc.max()))
-}
-
-// CopyFrom copies all the persisted results from the originRangeID
-// AbortSpan into this one. Note that the cache will not be
-// locked while copying is in progress. Failures decoding individual
-// entries return an error. The copy is done directly using the engine
-// instead of interpreting values through MVCC for efficiency.
-// On success, returns the number of entries (key-value pairs) copied.
-func (sc *AbortSpan) CopyFrom(
-	ctx context.Context, e engine.ReadWriter, ms *enginepb.MVCCStats, originRangeID roachpb.RangeID,
-) (int, error) {
-	originMin := engine.MakeMVCCMetadataKey(keys.AbortSpanKey(originRangeID, txnIDMin))
-	originMax := engine.MakeMVCCMetadataKey(keys.AbortSpanKey(originRangeID, txnIDMax))
-	return copyAbortSpan(e, ms, sc.rangeID, originMin, originMax)
+	return err
 }
 
 // Del removes all AbortSpan entries for the given transaction.
@@ -211,10 +140,3 @@ func (sc *AbortSpan) Put(
 	key := keys.AbortSpanKey(sc.rangeID, txnID)
 	return engine.MVCCPutProto(ctx, e, ms, key, hlc.Timestamp{}, nil /* txn */, entry)
 }
-
-func decodeAbortSpanMVCCKey(encKey engine.MVCCKey, dest []byte) (uuid.UUID, error) {
-	if encKey.IsValue() {
-		return uuid.UUID{}, errors.Errorf("key %s is not a raw MVCC value", encKey)
-	}
-	return keys.DecodeAbortSpanKey(encKey.Key, dest)
-}
diff --git a/pkg/storage/abortspan/abortspan_test.go b/pkg/storage/abortspan/abortspan_test.go
index 40addab109ee..86d80e4e8be7 100644
--- a/pkg/storage/abortspan/abortspan_test.go
+++ b/pkg/storage/abortspan/abortspan_test.go
@@ -111,73 +111,3 @@ func TestAbortSpanEmptyParams(t *testing.T) {
 		t.Errorf("unexpected error putting response: %s", err)
 	}
 }
-
-// TestAbortSpanCopyInto tests that entries in one cache get
-// transferred correctly to another cache using CopyInto().
-func TestAbortSpanCopyInto(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
-	rc1, e := createTestAbortSpan(t, 1, stopper)
-	rc2, _ := createTestAbortSpan(t, 2, stopper)
-
-	entry := roachpb.AbortSpanEntry{
-		Key:       testTxnKey,
-		Timestamp: testTxnTimestamp,
-		Priority:  testTxnPriority,
-	}
-	if err := rc1.Put(context.Background(), e, nil, testTxnID, &entry); err != nil {
-		t.Errorf("unexpected error putting entry: %s", err)
-	}
-	// Copy the first cache into the second.
-	if count, err := rc1.CopyInto(e, nil, rc2.rangeID); err != nil {
-		t.Fatal(err)
-	} else if expCount := 1; count != expCount {
-		t.Errorf("unexpected number of copied entries: %d", count)
-	}
-	for _, cache := range []*AbortSpan{rc1, rc2} {
-		var actual roachpb.AbortSpanEntry
-		// Get should return 1 for both caches.
-		if aborted, readErr := cache.Get(context.Background(), e, testTxnID, &actual); !aborted || readErr != nil {
-			t.Errorf("unexpected failure getting response from source: %t, %s", aborted, readErr)
-		} else if !reflect.DeepEqual(entry, actual) {
-			t.Fatalf("wanted %v, got %v", entry, actual)
-		}
-	}
-}
-
-// TestAbortSpanCopyFrom tests that entries in one cache get
-// transferred correctly to another cache using CopyFrom().
-func TestAbortSpanCopyFrom(t *testing.T) {
-	defer leaktest.AfterTest(t)()
-	stopper := stop.NewStopper()
-	defer stopper.Stop(context.Background())
-	rc1, e := createTestAbortSpan(t, 1, stopper)
-	rc2, _ := createTestAbortSpan(t, 2, stopper)
-
-	entry := roachpb.AbortSpanEntry{
-		Key:       testTxnKey,
-		Timestamp: testTxnTimestamp,
-		Priority:  testTxnPriority,
-	}
-	if err := rc1.Put(context.Background(), e, nil, testTxnID, &entry); err != nil {
-		t.Errorf("unexpected error putting response: %s", err)
-	}
-
-	// Copy the first cache into the second.
-	if count, err := rc2.CopyFrom(context.Background(), e, nil, rc1.rangeID); err != nil {
-		t.Fatal(err)
-	} else if expCount := 1; count != expCount {
-		t.Errorf("unexpected number of copied entries: %d", count)
-	}
-
-	// Get should hit both caches.
-	for i, cache := range []*AbortSpan{rc1, rc2} {
-		var actual roachpb.AbortSpanEntry
-		if aborted, readErr := cache.Get(context.Background(), e, testTxnID, &actual); !aborted || readErr != nil {
-			t.Fatalf("%d: unexpected read error: %t, %s", i, aborted, readErr)
-		} else if !reflect.DeepEqual(entry, actual) {
-			t.Fatalf("expected %v, got %v", entry, actual)
-		}
-	}
-}
diff --git a/pkg/storage/client_split_test.go b/pkg/storage/client_split_test.go
index 63c8bfbf2e8a..1933d5096342 100644
--- a/pkg/storage/client_split_test.go
+++ b/pkg/storage/client_split_test.go
@@ -21,6 +21,8 @@ import (
 	"math"
 	"math/rand"
 	"reflect"
+	"sort"
+	"strconv"
 	"sync/atomic"
 	"testing"
 	"time"
@@ -38,6 +40,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
 	"github.com/cockroachdb/cockroach/pkg/storage"
+	"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
 	"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine"
 	"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
@@ -96,6 +99,127 @@ func TestStoreRangeSplitAtIllegalKeys(t *testing.T) {
 	}
 }
 
+// Verify that on a split, only the non-expired abort span records are copied
+// into the right hand side of the split.
+func TestStoreSplitAbortSpan(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	manualClock := hlc.NewManualClock(2400 * time.Hour.Nanoseconds())
+	clock := hlc.NewClock(manualClock.UnixNano, time.Millisecond)
+	storeCfg := storage.TestStoreConfig(clock)
+	storeCfg.TestingKnobs.DisableSplitQueue = true
+
+	stopper := stop.NewStopper()
+	defer stopper.Stop(context.TODO())
+	store := createTestStoreWithConfig(t, stopper, storeCfg)
+	ctx := context.Background()
+
+	left, middle, right := roachpb.Key("a"), roachpb.Key("b"), roachpb.Key("c")
+
+	txn := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.Transaction {
+		txn := roachpb.MakeTransaction("test", key, 0, enginepb.SERIALIZABLE, ts, 0)
+		return &txn
+	}
+
+	var expAll []roachpb.AbortSpanEntry
+
+	populateAbortSpan := func(key roachpb.Key, ts hlc.Timestamp) *roachpb.ResolveIntentRequest {
+		pushee := txn(key, ts)
+		expAll = append(expAll, roachpb.AbortSpanEntry{
+			Key:       key,
+			Timestamp: ts,
+		})
+		return &roachpb.ResolveIntentRequest{
+			RequestHeader: roachpb.RequestHeader{
+				Key: key,
+			},
+			IntentTxn: pushee.TxnMeta,
+			Status:    roachpb.ABORTED,
+			Poison:    true,
+		}
+	}
+
+	key := func(k roachpb.Key, i int) roachpb.Key {
+		var r []byte
+		r = append(r, k...)
+		r = append(r, []byte(strconv.Itoa(i))...)
+		return r
+	}
+
+	thresh := storage.GetGCQueueTxnCleanupThreshold().Nanoseconds()
+	// Pick a non-gcable and gcable timestamp, respectively. Avoid the clock's
+	// exact timestamp because of unpredictable logical ticks.
+	tsFresh := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh + 1}
+	tsStale := hlc.Timestamp{WallTime: manualClock.UnixNano() - thresh - 1}
+
+	args := []roachpb.Request{
+		populateAbortSpan(key(left, 1), tsFresh),
+		populateAbortSpan(key(left, 2), tsStale),
+		populateAbortSpan(key(middle, 1), tsFresh),
+		populateAbortSpan(key(middle, 2), tsStale),
+		populateAbortSpan(key(right, 1), tsFresh),
+		populateAbortSpan(key(right, 2), tsStale),
+		adminSplitArgs(middle),
+	}
+
+	// Nothing gets removed from the LHS during the split. This could be done,
+	// but it would have to be done carefully to avoid large Raft proposals,
+	// and the stats computation would need to be adjusted accordingly.
+	expL := []roachpb.AbortSpanEntry{
+		{Key: key(left, 1), Timestamp: tsFresh},
+		{Key: key(left, 2), Timestamp: tsStale},
+		{Key: key(middle, 1), Timestamp: tsFresh},
+		{Key: key(middle, 2), Timestamp: tsStale},
+		{Key: key(right, 1), Timestamp: tsFresh},
+		{Key: key(right, 2), Timestamp: tsStale},
+	}
+
+	// But we don't blindly copy everything over to the RHS. Only entries with
+	// a recent timestamp are duplicated. This is important because otherwise
+	// the Raft command size can blow up and splits fail.
+	expR := []roachpb.AbortSpanEntry{
+		{Key: key(left, 1), Timestamp: tsFresh},
+		{Key: key(middle, 1), Timestamp: tsFresh},
+		{Key: key(right, 1), Timestamp: tsFresh},
+	}
+
+	for _, arg := range args {
+		_, pErr := client.SendWrapped(context.Background(), store.TestSender(), arg)
+		if pErr != nil {
+			t.Fatalf("while sending %+v: %s", arg, pErr)
+		}
+	}
+
+	collect := func(as *abortspan.AbortSpan) []roachpb.AbortSpanEntry {
+		var results []roachpb.AbortSpanEntry
+		if err := as.Iterate(ctx, store.Engine(), func(_ roachpb.Key, entry roachpb.AbortSpanEntry) error {
+			entry.Priority = 0 // the priority is irrelevant to this test
+			results = append(results, entry)
+			return nil
+		}); err != nil {
+			t.Fatal(err)
+		}
+		sort.Slice(results, func(i, j int) bool {
+			c := bytes.Compare(results[i].Key, results[j].Key)
+			if c == 0 {
+				return results[i].Timestamp.Less(results[j].Timestamp)
+			}
+			return c < 0
+		})
+		return results
+	}
+
+	l := collect(store.LookupReplica(keys.MustAddr(left), nil).AbortSpan())
+	r := collect(store.LookupReplica(keys.MustAddr(right), nil).AbortSpan())
+
+	if !reflect.DeepEqual(expL, l) {
+		t.Fatalf("left hand side: expected %+v, got %+v", expL, l)
+	}
+	if !reflect.DeepEqual(expR, r) {
+		t.Fatalf("right hand side: expected %+v, got %+v", expR, r)
+	}
+}
+
 // TestStoreRangeSplitAtTablePrefix verifies a range can be split at
 // UserTableDataMin and still gossip the SystemConfig properly.
 func TestStoreRangeSplitAtTablePrefix(t *testing.T) {
diff --git a/pkg/storage/gc_queue.go b/pkg/storage/gc_queue.go
index 03aceaed1983..684cbdc7e0e3 100644
--- a/pkg/storage/gc_queue.go
+++ b/pkg/storage/gc_queue.go
@@ -465,13 +465,17 @@ func processAbortSpan(
 	abortSpan := abortspan.New(rangeID)
 	infoMu.Lock()
 	defer infoMu.Unlock()
-	abortSpan.Iterate(ctx, snap, func(key []byte, v roachpb.AbortSpanEntry) {
+	if err := abortSpan.Iterate(ctx, snap, func(key roachpb.Key, v roachpb.AbortSpanEntry) error {
 		infoMu.AbortSpanTotal++
 		if v.Timestamp.Less(threshold) {
 			infoMu.AbortSpanGCNum++
 			gcKeys = append(gcKeys, roachpb.GCRequest_GCKey{Key: key})
 		}
-	})
+		return nil
+	}); err != nil {
+		// Still return whatever we managed to collect.
+		log.Warning(ctx, err)
+	}
 	return gcKeys
 }
 
diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go
index 0a1d5c82f75d..5af396c0d5b9 100644
--- a/pkg/storage/replica_command.go
+++ b/pkg/storage/replica_command.go
@@ -45,6 +45,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/protoutil"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
+	"github.com/cockroachdb/cockroach/pkg/util/uuid"
 )
 
 // evaluateCommand delegates to the eval method for the given
@@ -1125,8 +1126,8 @@ func splitTrigger(
 	// and compute stats for it instead of having a constraint that the
 	// left hand side is smaller.
-	// Compute (absolute) stats for LHS range. This means that no more writes
-	// to the LHS must happen below this point.
+	// Compute (absolute) stats for LHS range. Don't write to the LHS below
+	// this point; any writes to the LHS must precede this stats computation.
 	leftMS, err := rditer.ComputeStatsForRange(&split.LeftDesc, batch, ts.WallTime)
 	if err != nil {
 		return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to compute stats for LHS range after split")
 	}
@@ -1144,13 +1145,46 @@ func splitTrigger(
 		return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy last replica GC timestamp")
 	}
 
-	// Initialize the RHS range's AbortSpan by copying the LHS's.
-	seqCount, err := rec.AbortSpan().CopyInto(batch, &bothDeltaMS, split.RightDesc.RangeID)
-	if err != nil {
-		// TODO(tschottdorf): ReplicaCorruptionError.
-		return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy AbortSpan to RHS split range")
+	// Initialize the RHS range's AbortSpan by copying the LHS's. Put a little extra
+	// effort into copying only the records that are actually required: certain
+	// workloads create sizable abort spans, and repeated splitting can blow them up
+	// further. Once an abort span reaches approximately the Raft MaxCommandSize,
+	// splits become impossible, which is pretty bad (see #25233).
+	{
+		var abortSpanCopyCount, abortSpanSkipCount int
+		// Abort span entries with timestamps before this threshold are eligible
+		// for GC, so we don't copy them into the new range. We could try to
+		// delete them from the LHS as well, but that could create a large Raft
+		// command in itself. Plus, we'd have to adjust the stats computations.
+		threshold := ts.Add(-txnCleanupThreshold.Nanoseconds(), 0)
+		var scratch [64]byte
+		if err := rec.AbortSpan().Iterate(ctx, batch, func(k roachpb.Key, entry roachpb.AbortSpanEntry) error {
+			if entry.Timestamp.Less(threshold) {
+				// The entry would be garbage collected (if GC had run), so
+				// don't bother copying it. Note that we can't filter on the
+				// key: it only tells us where the txn record lives, not
+				// whether the intents that triggered the abort span record
+				// were on the LHS, RHS, or both.
+				abortSpanSkipCount++
+				return nil
+			}
+
+			abortSpanCopyCount++
+			var txnID uuid.UUID
+			txnID, err = keys.DecodeAbortSpanKey(k, scratch[:0])
+			if err != nil {
+				return err
+			}
+			return engine.MVCCPutProto(ctx, batch, &bothDeltaMS,
+				keys.AbortSpanKey(split.RightDesc.RangeID, txnID),
+				hlc.Timestamp{}, nil, &entry,
+			)
+		}); err != nil {
+			// TODO(tschottdorf): ReplicaCorruptionError.
+			return enginepb.MVCCStats{}, result.Result{}, errors.Wrap(err, "unable to copy AbortSpan to RHS split range")
+		}
+		log.Eventf(ctx, "abort span: copied %d entries, skipped %d", abortSpanCopyCount, abortSpanSkipCount)
 	}
-	log.Eventf(ctx, "copied AbortSpan (%d entries)", seqCount)
 
 	// Compute (absolute) stats for RHS range.
 	var rightMS enginepb.MVCCStats
@@ -1482,10 +1516,8 @@ func mergeTrigger(
 	rightMS.SysBytes, rightMS.SysCount = 0, 0
 	mergedMS.Add(rightMS)
 
-	// Copy the RHS range's AbortSpan to the new LHS one.
-	if _, err := rec.AbortSpan().CopyFrom(ctx, batch, &mergedMS, rightRangeID); err != nil {
-		return result.Result{}, errors.Errorf("unable to copy AbortSpan to new split range: %s", err)
-	}
+	// TODO(benesch): copy the non-expired abort span records from the RHS into the LHS.
+	// See the corresponding code for splits.
 
 	// Remove the RHS range's metadata. Note that we don't need to
 	// keep track of stats here, because we already set the right range's
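
Reviewer note (not part of the patch): the Iterate rewrite changes the callback
from func([]byte, roachpb.AbortSpanEntry) to func(roachpb.Key,
roachpb.AbortSpanEntry) error and makes Iterate itself return an error, so
decoding failures now propagate to callers instead of panicking. Below is a
minimal sketch of a caller against that signature. The package name "example"
and the function collectAbortSpanKeys are hypothetical; the Iterate call and
the imports use only APIs that appear in the patch.

	package example // hypothetical package; sketch only

	import (
		"context"

		"github.com/cockroachdb/cockroach/pkg/roachpb"
		"github.com/cockroachdb/cockroach/pkg/storage/abortspan"
		"github.com/cockroachdb/cockroach/pkg/storage/engine"
	)

	// collectAbortSpanKeys gathers the key of every AbortSpan entry in the
	// given reader. With the new signature, a corrupt entry surfaces here as
	// an error rather than taking down the process.
	func collectAbortSpanKeys(
		ctx context.Context, as *abortspan.AbortSpan, e engine.Reader,
	) ([]roachpb.Key, error) {
		var ks []roachpb.Key
		err := as.Iterate(ctx, e, func(key roachpb.Key, _ roachpb.AbortSpanEntry) error {
			// A non-nil error returned from this callback ends the iteration
			// and becomes Iterate's return value.
			ks = append(ks, key)
			return nil
		})
		return ks, err
	}

This mirrors how gc_queue.go and splitTrigger consume the new API: both treat
the callback's error as "stop and report" rather than asserting that abort
span contents are always well-formed.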