Skip to content

Commit

Permalink
kvstorage: speed up scan for range descriptors
Browse files Browse the repository at this point in the history
On startup we currently scan through all `/Local/Range/` keys in order
to find the range descriptors. This involves going through all
`/Local/Range/Transaction` records and tombstones so we end up reading
through a lot of data.

This change switches to using `SeekGE`:
 - when we encounter a key whose suffix sorts before `rdsc`, like
   `/Local/Range/foo/prbe`, we seek directly to the corresponding
   descriptor key `/Local/Range/foo/rdsc`;
 - after we decode a descriptor, we seek straight to the next possible
   range descriptor `/Local/Range/EndKey/rdsc` using the `EndKey` in
   the descriptor.

Note that inside Pebble, seeks to a key that is very close to the
current position are optimized to try to iterate through the next few
keys before doing a real seek. So this change should not be
detrimental even when there aren't a lot of keys to skip.

I experimented on a store captured after running kv0 with batch size 2
against a single node. Before this change we step 353 times, but the
iterator has to internally step through roughly 170k tombstones:
```
range descriptor iteration done: 339 keys, 65 range descriptors (by suffix: map[qlpt:81 rdsc:65 txn-:193]);
  scan stats: stepped 353 times (169.743k internal); seeked 2 times (2 internal)...
```

After this change, we seek past the transaction records and end up doing
roughly one seek per range:
```
range descriptor iteration done: 65 range descriptors, 0 intents, 0 tombstones
+‹(interface (dir, seek, step): (fwd, 67, 0), (rev, 0, 0)), (internal (dir, seek, step): (fwd, 67, 0), (rev, 0, 0)),›
+‹(internal-stats: (block-bytes: (total 563KB, cached 26KB, read-time 755.128µs))...
```

Fixes: cockroachdb#109740
  • Loading branch information
RaduBerinde committed May 12, 2024
1 parent 4c2e776 commit 9232118
Show file tree
Hide file tree
Showing 4 changed files with 157 additions and 48 deletions.
197 changes: 153 additions & 44 deletions pkg/kv/kvserver/kvstorage/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ package kvstorage
import (
"bytes"
"context"
"fmt"
"sort"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/keys"
Expand Down Expand Up @@ -245,76 +247,183 @@ func ReadStoreIdent(ctx context.Context, eng storage.Engine) (roachpb.StoreIdent
}

// IterateRangeDescriptorsFromDisk discovers the initialized replicas and calls
// the provided function with each such descriptor from the provided Engine. The
// return values of this method and fn have semantics similar to
// storage.MVCCIterate.
// the provided function with each such descriptor from the provided Engine. If
// the function returns an error, IterateRangeDescriptorsFromDisk fails with
// that error.
func IterateRangeDescriptorsFromDisk(
ctx context.Context, reader storage.Reader, fn func(desc roachpb.RangeDescriptor) error,
) error {
log.Info(ctx, "beginning range descriptor iteration")
// MVCCIterator over all range-local key-based data.
start := keys.RangeDescriptorKey(roachpb.RKeyMin)
end := keys.RangeDescriptorKey(roachpb.RKeyMax)

allCount := 0
matchCount := 0
bySuffix := make(map[redact.RedactableString]int)

var scanStats kvpb.ScanStats
opts := storage.MVCCScanOptions{
Inconsistent: true,
ScanStats: &scanStats,
iter, err := reader.NewMVCCIterator(ctx, storage.MVCCKeyIterKind, storage.IterOptions{
UpperBound: keys.LocalRangeMax,
})
if err != nil {
return err
}
defer iter.Close()

var descriptorCount, intentCount, tombstoneCount int
lastReportTime := timeutil.Now()
kvToDesc := func(kv roachpb.KeyValue) error {

iter.SeekGE(storage.MVCCKey{Key: keys.LocalRangePrefix})
for {
if valid, err := iter.Valid(); err != nil {
return err
} else if !valid {
break
}

const reportPeriod = 10 * time.Second
if timeutil.Since(lastReportTime) >= reportPeriod {
// Note: MVCCIterate scans and buffers 1000 keys at a time which could
// make the scan stats confusing. However, because this callback can't
// take a long time, it's very unlikely that we will log twice for the
// same batch of keys.
log.Infof(ctx, "range descriptor iteration in progress: %d keys, %d range descriptors (by suffix: %v); %v",
allCount, matchCount, bySuffix, &scanStats)
lastReportTime = timeutil.Now()
}

allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
stats := iter.Stats().Stats
log.Infof(ctx, "range descriptor iteration in progress: %d range descriptors, %d intents, %d tombstones\n%s",
descriptorCount, intentCount, tombstoneCount, stats.String())
}

key := iter.UnsafeKey()
startKey, suffix, _, err := keys.DecodeRangeKey(key.Key)
if err != nil {
return err
}
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil
switch suffixCmp := bytes.Compare(suffix, keys.LocalRangeDescriptorSuffix); {
case suffixCmp < 0:
// Seek to the range descriptor key for this range.
//
// Note that inside Pebble, SeekGE will try to iterate through the next
// few keys so it's ok to seek even if there are very few keys before the
// descriptor.
iter.SeekGE(storage.MVCCKey{Key: keys.RangeDescriptorKey(keys.MustAddr(startKey))})
continue

case suffixCmp > 0:
// This case should be rare in practice: we have a key that isn't
// associated with any range descriptor.
iter.NextKey()
continue

case key.Timestamp.IsEmpty():
// This is an intent for a range descriptor key; ignore it.
intentCount++
iter.Next()
continue
}
var desc roachpb.RangeDescriptor
if err := kv.Value.GetProto(&desc); err != nil {

// This is what we are looking for: the latest version of a range
// descriptor key.
rawValue, err := iter.UnsafeValue()
if err != nil {
return err
}
value, err := storage.DecodeValueFromMVCCValue(rawValue)
if err != nil {
return errors.Wrap(err, "decoding range descriptor MVCC value")
}
if len(value.RawBytes) == 0 {
// This is a tombstone; ignore it and skip over any older versions of
// this key.
tombstoneCount++
iter.NextKey()
continue
}

var desc roachpb.RangeDescriptor
if err := value.GetProto(&desc); err != nil {
var buf strings.Builder
for _, b := range value.RawBytes {
fmt.Fprintf(&buf, "%02x ", b)
}
log.Infof(ctx, "value: %s\n", buf.String())
return errors.Wrap(err, "decoding range descriptor proto")
}
// Descriptor for range `[a,z)` must be found at `/Local/Range/a/RangeDescriptor`.
if !startKey.Equal(desc.StartKey.AsRawKey()) {
return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s",
kv.Key, desc.StartKey)
return errors.AssertionFailedf("descriptor stored at %q but has StartKey %q",
key.Key, desc.StartKey)
}
if !desc.IsInitialized() {
return errors.AssertionFailedf("uninitialized descriptor: %s", desc)
}
matchCount++
err = fn(desc)
if err == nil {
return nil

descriptorCount++
nextRangeDescKey := keys.RangeDescriptorKey(desc.EndKey)
if err := fn(desc); err != nil {
return err
}
if iterutil.Map(err) == nil {
return iterutil.StopIteration()
// Seek to the next possible descriptor key. This seek is important as it
// can skip over a large amount of txn record keys or tombstones.
iter.SeekGE(storage.MVCCKey{Key: nextRangeDescKey})
}
stats := iter.Stats().Stats
log.Infof(ctx, "range descriptor iteration done: %d range descriptors, %d intents, %d tombstones\n%s",
descriptorCount, intentCount, tombstoneCount, stats.String())

if false {
allCount := 0
matchCount := 0
bySuffix := make(map[redact.RedactableString]int)

var scanStats kvpb.ScanStats
opts := storage.MVCCScanOptions{
Inconsistent: true,
ScanStats: &scanStats,
}
lastReportTime := timeutil.Now()
kvToDesc := func(kv roachpb.KeyValue) error {
const reportPeriod = 10 * time.Second
if timeutil.Since(lastReportTime) >= reportPeriod {
// Note: MVCCIterate scans and buffers 1000 keys at a time which could
// make the scan stats confusing. However, because this callback can't
// take a long time, it's very unlikely that we will log twice for the
// same batch of keys.
log.Infof(ctx, "range descriptor iteration in progress: %d keys, %d range descriptors (by suffix: %v); %v",
allCount, matchCount, bySuffix, &scanStats)
lastReportTime = timeutil.Now()
}

allCount++
// Only consider range metadata entries; ignore others.
startKey, suffix, _, err := keys.DecodeRangeKey(kv.Key)
if err != nil {
return err
}
bySuffix[redact.RedactableString(suffix)]++
if !bytes.Equal(suffix, keys.LocalRangeDescriptorSuffix) {
return nil
}
var desc roachpb.RangeDescriptor
//if err := kv.Value.GetProto(&desc); err != nil {
if err := protoutil.Unmarshal(kv.Value.RawBytes, &desc); err != nil {
return err
}
// Descriptor for range `[a,z)` must be found at `/Local/Range/a/RangeDescriptor`.
if !startKey.Equal(desc.StartKey.AsRawKey()) {
return errors.AssertionFailedf("descriptor stored at %s but has StartKey %s",
kv.Key, desc.StartKey)
}
if !desc.IsInitialized() {
return errors.AssertionFailedf("uninitialized descriptor: %s", desc)
}
matchCount++
err = fn(desc)
if err == nil {
return nil
}
if iterutil.Map(err) == nil {
return iterutil.StopIteration()
}
return err
}

start := keys.RangeDescriptorKey(roachpb.RKeyMin)
end := keys.RangeDescriptorKey(roachpb.RKeyMax)
_, err := storage.MVCCIterate(ctx, reader, start, end, hlc.MaxTimestamp, opts, kvToDesc)
log.Infof(ctx, "range descriptor iteration done: %d keys, %d range descriptors (by suffix: %v); %s",
allCount, matchCount, bySuffix, &scanStats)
return err
}

_, err := storage.MVCCIterate(ctx, reader, start, end, hlc.MaxTimestamp, opts, kvToDesc)
log.Infof(ctx, "range descriptor iteration done: %d keys, %d range descriptors (by suffix: %v); %s",
allCount, matchCount, bySuffix, &scanStats)
return err
return nil
}

// A Replica references a CockroachDB Replica. The data in this struct does not
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/kvstorage/testdata/assert_replicaid
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load-and-reconcile trace=true
----
no RaftReplicaID for r5:{a-c} [(n1,s1):50, next=51, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 seeks, 0 skipped intents
loaded replica ID for 1/1 replicas
loaded Raft state for 1/1 replicas
loaded 1 replicas
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ load-and-reconcile trace=true
----
no RaftReplicaID for <nil>
beginning range descriptor iteration
range descriptor iteration done: 0 keys, 0 range descriptors (by suffix: map[]); scan stats: <redacted>
range descriptor iteration done: 0 range descriptors, 0 seeks, 0 skipped intents
loaded replica ID for 0/0 replicas
loaded Raft state for 1/1 replicas
loaded 1 replicas
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/kvstorage/testdata/init
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ load-and-reconcile trace=true
r6/60: uninitialized
r8/80: r8:{c-f} [(n1,s1):80, next=81, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 seeks, 0 skipped intents
loaded replica ID for 2/2 replicas
loaded Raft state for 2/2 replicas
loaded 2 replicas
Expand All @@ -26,7 +26,7 @@ load-and-reconcile trace=true
r6/60: uninitialized
r8/80: r8:{c-f} [(n1,s1):80, next=81, gen=0]
beginning range descriptor iteration
range descriptor iteration done: 1 keys, 1 range descriptors (by suffix: map[rdsc:1]); scan stats: <redacted>
range descriptor iteration done: 1 range descriptors, 0 seeks, 0 skipped intents
loaded replica ID for 2/2 replicas
loaded Raft state for 2/2 replicas
loaded 2 replicas
Expand Down

0 comments on commit 9232118

Please sign in to comment.