Skip to content

Commit

Permalink
kvstorage: speed up scan for range descriptors
Browse files Browse the repository at this point in the history
On startup we currently scan through all `/Local/Range/` keys in order
to find the range descriptors. This involves going through all
`/Local/Range/Transaction` records and tombstones so we end up reading
through a lot of data.

This change switches to using `SeekGE`:
 - when we encounter a key that has a suffix before `rdsc` like
   `/Local/Range/foo/prbe`, we seek directly to the corresponding
   descriptor key `/Local/Range/foo/rdsc`;
 - after we decode a descriptor, we seek straight to the next possible
   range descriptor `/Local/Range/EndKey/rdsc` using the `EndKey` in
   the descriptor.

Note that inside Pebble, seeks to keys that are very close to the
current position are optimized to try to iterate through the next few
keys before doing a real seek. So this change should not be
detrimental even when there aren't a lot of keys to skip.

I experimented on a store captured after running kv0 with batch size 2
against a single node. Before this change we step through 353 keys but
the iterator has to internally step through 160k tombstones:
```
range descriptor iteration done: 339 keys, 65 range descriptors (by suffix: map[qlpt:81 rdsc:65 txn-:193]);
  scan stats: stepped 353 times (169.743k internal); seeked 2 times (2 internal)...
```

After, we seek around the transaction records and end up doing just a
seek per range:
```
range descriptor iteration done: 65 range descriptors, 0 intents, 0 tombstones
+‹(interface (dir, seek, step): (fwd, 67, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 67, 0), (rev, 0, 0)),›
+‹(internal-stats: (block-bytes: (total 563KB, cached 26KB, read-time 755.128µs))...
```

Fixes: #109740
  • Loading branch information
RaduBerinde committed May 14, 2024
1 parent b185402 commit 47d286b
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 68 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvserver/kvstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ go_library(
"//pkg/raft/raftpb",
"//pkg/roachpb",
"//pkg/storage",
"//pkg/storage/enginepb",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/log",
Expand Down
7 changes: 3 additions & 4 deletions pkg/kv/kvserver/kvstorage/datadriven_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func TestDataDriven(t *testing.T) {
defer leaktest.AfterTest(t)()

reStripFileLinePrefix := regexp.MustCompile(`^[^ ]+ `)
// Scan stats (shown after loading the range descriptors) can be different in
// race builds.
reStripScanStats := regexp.MustCompile(`scan stats: .*$`)
// Scan stats (shown after loading the range descriptors) can be non-deterministic.
reStripScanStats := regexp.MustCompile(`internal-stats: .*$`)

datadriven.Walk(t, datapathutils.TestDataPath(t), func(t *testing.T, path string) {
e := newEnv(t)
Expand All @@ -129,7 +128,7 @@ func TestDataDriven(t *testing.T) {
}
msg := string(l.Message)
msg = reStripFileLinePrefix.ReplaceAllString(msg, ``)
msg = reStripScanStats.ReplaceAllString(msg, `scan stats: <redacted>`)
msg = reStripScanStats.ReplaceAllString(msg, `internal-stats: <redacted>`)

fmt.Fprintln(&buf, msg)
}
Expand Down
149 changes: 102 additions & 47 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,18 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader"
"github.com/cockroachdb/cockroach/pkg/raft/raftpb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// FirstNodeID is the NodeID assigned to the node bootstrapping a new cluster.
Expand Down Expand Up @@ -245,76 +244,132 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
}

// IterateRangeDescriptorsFromDisk discovers the initialized replicas and calls
// the provided function with each such descriptor from the provided Engine. The
// return values of this method and fn have semantics similar to
// storage.MVCCIterate.
// the provided function with each such descriptor from the provided Engine. If
// the function returns an error, IterateRangeDescriptorsFromDisk fails with
// that error.
func IterateRangeDescriptorsFromDisk(
ctx context.Context, reader storage.Reader, fn func(desc roachpb.RangeDescriptor) error,
) error {
log.Info(ctx, "beginning range descriptor iteration")
// MVCCIterator over all range-local key-based data.
start := keys.RangeDescriptorKey(roachpb.RKeyMin)
end := keys.RangeDescriptorKey(roachpb.RKeyMax)

allCount := 0
matchCount := 0
bySuffix := make(map[redact.RedactableString]int)

var scanStats kvpb.ScanStats
opts := storage.MVCCScanOptions{
Inconsistent: true,
ScanStats: &scanStats,

// We are going to find all range descriptor keys. This code is equivalent to
// using MVCCIterate on all range-local keys in Inconsistent mode and with
// hlc.MaxTimestamp, but is faster in that it completely skips over areas with
// transaction records (these areas can contain large numbers of LSM
// tombstones).
iter, err := reader.NewMVCCIterator(ctx, storage.MVCCKeyAndIntentsIterKind, storage.IterOptions{
UpperBound: keys.LocalRangeMax,
})
if err != nil {
return err
}
defer iter.Close()

var descriptorCount, intentCount, tombstoneCount int
lastReportTime := timeutil.Now()
kvToDesc := func(kv roachpb.KeyValue) error {

iter.SeekGE(storage.MVCCKey{Key: keys.LocalRangePrefix})
for {
if valid, err := iter.Valid(); err != nil {
return err
} else if !valid {
break
}

const reportPeriod = 10 * time.Second
if timeutil.Since(lastReportTime) >= reportPeriod {
// Note: MVCCIterate scans and buffers 1000 keys at a time which could
// make the scan stats confusing. However, because this callback can't
// take a long time, it's very unlikely that we will log twice for the
// same batch of keys.
log.Infof(ctx, "range descriptor iteration in progress: %d keys, %d range descriptors (by suffix: %v); %v",
allCount, matchCount, bySuffix, &scanStats)
lastReportTime = timeutil.Now()
stats := iter.Stats().Stats
log.Infof(ctx, "range descriptor iteration in progress: %d range descriptors, %d intents, %d tombstones\n%s",
descriptorCount, intentCount, tombstoneCount, stats.String())
}

allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
key := iter.UnsafeKey()
startKey, suffix, _, err := keys.DecodeRangeKey(key.Key)
if err != nil {
return err
}
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil

if suffixCmp := bytes.Compare(suffix, keys.LocalRangeDescriptorSuffix); suffixCmp != 0 {
if suffixCmp < 0 {
// Seek to the range descriptor key for this range.
//
// Note that inside Pebble, SeekGE will try to iterate through the next
// few keys so it's ok to seek even if there are very few keys before the
// descriptor.
iter.SeekGE(storage.MVCCKey{Key: keys.RangeDescriptorKey(keys.MustAddr(startKey))})
} else {
// This case should be rare in practice: we have a key that isn't
// associated with any range descriptor.
iter.NextKey()
}
continue
}
var desc roachpb.RangeDescriptor
if err := kv.Value.GetProto(&desc); err != nil {

// We are at a descriptor key.
rawValue, err := iter.UnsafeValue()
if err != nil {
return err
}

if key.Timestamp.IsEmpty() {
// This is an intent. We want to get its timestamp and read the latest
// version before that timestamp. This is consistent with what MVCCIterate
// does in Inconsistent mode.
intentCount++
var meta enginepb.MVCCMetadata
if err := protoutil.Unmarshal(rawValue, &meta); err != nil {
return err
}
metaTS := meta.Timestamp.ToTimestamp()
if metaTS.IsEmpty() {
return errors.AssertionFailedf("range key has intent with no timestamp")
}
// Seek to the latest value below the intent timestamp.
iter.SeekGE(storage.MVCCKey{Key: key.Key, Timestamp: metaTS.Prev()})
continue
}

value, err := storage.DecodeValueFromMVCCValue(rawValue)
if err != nil {
return errors.Wrap(err, "decoding range descriptor MVCC value")
}
if len(value.RawBytes) == 0 {
// This is a tombstone, so this key no longer exists; skip over any older
// versions of this key.
tombstoneCount++
iter.NextKey()
continue
}

// This is what we are looking for: the latest version of a range
// descriptor key.
var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
return errors.Wrap(err, "decoding range descriptor proto")
}
// Descriptor for range `[a,z)` must be found at `/Local/Range/a/RangeDescriptor`.
if !startKey.Equal(desc.StartKey.AsRawKey()) {
return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s",
kv.Key, desc.StartKey)
return errors.AssertionFailedf("descriptor stored at %q but has StartKey %q",
key.Key, desc.StartKey)
}
if !desc.IsInitialized() {
return errors.AssertionFailedf("uninitialized descriptor: %s", desc)
}
matchCount++
err = fn(desc)
if err == nil {
return nil
}
if iterutil.Map(err) == nil {
return iterutil.StopIteration()

descriptorCount++
nextRangeDescKey := keys.RangeDescriptorKey(desc.EndKey)
if err := fn(desc); err != nil {
return err
}
return err
// Seek to the next possible descriptor key. This seek is important as it
// can skip over a large amount of txn record keys or tombstones.
iter.SeekGE(storage.MVCCKey{Key: nextRangeDescKey})
}

_, err := storage.MVCCIterate(ctx, reader, start, end, hlc.MaxTimestamp, opts, kvToDesc)
log.Infof(ctx, "range descriptor iteration done: %d keys, %d range descriptors (by suffix: %v); %s",
allCount, matchCount, bySuffix, &scanStats)
return err
stats := iter.Stats().Stats
log.Infof(ctx, "range descriptor iteration done: %d range descriptors, %d intents, %d tombstones\n%s",
descriptorCount, intentCount, tombstoneCount, stats.String())
return nil
}

// A Replica references a CockroachDB Replica. The data in this struct does not
Expand Down
13 changes: 0 additions & 13 deletions pkg/kv/kvserver/kvstorage/testdata/assert_overlapping_replica

This file was deleted.

4 changes: 3 additions & 1 deletion pkg/kv/kvserver/kvstorage/testdata/assert_replicaid
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ load-and-reconcile trace=true
----
no RaftReplicaID for r5:{a-c} [(n1,s1):50, next=51, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 intents, 0 tombstones
‹(interface (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)),›
‹(internal-stats: <redacted>
loaded replica ID for 1/1 replicas
loaded Raft state for 1/1 replicas
loaded 1 replicas
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ load-and-reconcile trace=true
----
no RaftReplicaID for <nil>
beginning range descriptor iteration
range descriptor iteration done: 0 keys, 0 range descriptors (by suffix: map[]); scan stats: <redacted>
range descriptor iteration done: 0 range descriptors, 0 intents, 0 tombstones
‹(interface (dir, seek, step): (fwd, 1, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 1, 0), (rev, 0, 0))›
loaded replica ID for 0/0 replicas
loaded Raft state for 1/1 replicas
loaded 1 replicas
8 changes: 6 additions & 2 deletions pkg/kv/kvserver/kvstorage/testdata/init
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ load-and-reconcile trace=true
r6/60: uninitialized
r8/80: r8:{c-f} [(n1,s1):80, next=81, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 intents, 0 tombstones
‹(interface (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)),›
‹(internal-stats: <redacted>
loaded replica ID for 2/2 replicas
loaded Raft state for 2/2 replicas
loaded 2 replicas
Expand All @@ -26,7 +28,9 @@ load-and-reconcile trace=true
r6/60: uninitialized
r8/80: r8:{c-f} [(n1,s1):80, next=81, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 intents, 0 tombstones
‹(interface (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 2, 0), (rev, 0, 0)),›
‹(internal-stats: <redacted>
loaded replica ID for 2/2 replicas
loaded Raft state for 2/2 replicas
loaded 2 replicas
Expand Down
16 changes: 16 additions & 0 deletions pkg/kv/kvserver/kvstorage/testdata/overlapping_replica
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Test the handling of an invalid case of overlapping ranges.

new-replica range-id=1 replica-id=10 k=a ek=c
----
r1:{a-c} [(n1,s1):10, next=11, gen=0]

new-replica range-id=2 replica-id=20 k=b ek=d
----
r2:{b-d} [(n1,s1):20, next=21, gen=0]

# The initialization process will skip over the range descriptor which is inside
# the first range.
load-and-reconcile
----
r1/10: r1:{a-c} [(n1,s1):10, next=11, gen=0]
r2/20: uninitialized

0 comments on commit 47d286b

Please sign in to comment.