kvcoord,kvclient,gossip: gossip range lease acquisition
Before this patch, it was possible for a range cache to contain an
outdated lease if the node with the lease was restarted or went
AWOL.

This patch introduces a mechanism by which the new owner of a range
lease announces this ownership to other nodes via gossip.
Any cached lease for that range gets updated from the gossip update
if the gossiped lease is more recent than the one known.

Release note (general change): CockroachDB nodes now learn more
actively of range leadership transfers from other nodes. This makes
query performance generally more resilient to routine node restarts,
as fewer queries now get routed to an outdated or unavailable node.
knz committed Aug 11, 2020
1 parent 3b2078b commit f94d76a
Showing 13 changed files with 225 additions and 32 deletions.
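
For orientation before the per-file diffs: the sketch below is a minimal, self-contained illustration of the mechanism this commit introduces, using simplified stand-in types rather than the real gossip and roachpb APIs. The new leaseholder gossips its lease under a per-range key, and a callback on other nodes overwrites a cached lease only when the gossiped one is more recent.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// lease is a simplified stand-in for roachpb.Lease; only the fields needed
// to illustrate the mechanism are kept.
type lease struct {
	StoreID  int32
	Sequence int64
}

// cacheEntry is a simplified stand-in for a range cache entry.
type cacheEntry struct {
	rangeID int64
	lease   lease
}

// rangeCache holds cached leases and plays the role of the range descriptor
// cache that registers a gossip callback in this commit.
type rangeCache struct {
	byRangeID map[int64]*cacheEntry
}

// gossipKey builds the per-range gossip key, e.g. "leases:42" (the real key
// is produced by gossip.MakeRangeIDKey).
func gossipKey(rangeID int64) string {
	return "leases:" + strconv.FormatInt(rangeID, 10)
}

// onLeaseGossip mimics the registered callback: parse the range ID out of the
// key, then update the cached lease only if the gossiped one is newer.
func (rc *rangeCache) onLeaseGossip(key string, gossiped lease) {
	id, err := strconv.ParseInt(strings.TrimPrefix(key, "leases:"), 10, 64)
	if err != nil {
		return // ignore malformed keys
	}
	if e, ok := rc.byRangeID[id]; ok && gossiped.Sequence > e.lease.Sequence {
		e.lease = gossiped
	}
}

func main() {
	rc := &rangeCache{byRangeID: map[int64]*cacheEntry{
		42: {rangeID: 42, lease: lease{StoreID: 1, Sequence: 3}},
	}}
	// The new leaseholder (store 2) gossips its lease; the cached entry for
	// r42 is refreshed because sequence 4 is newer than sequence 3.
	rc.onLeaseGossip(gossipKey(42), lease{StoreID: 2, Sequence: 4})
	fmt.Printf("%+v\n", rc.byRangeID[42].lease) // {StoreID:2 Sequence:4}
}
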
40 changes: 40 additions & 0 deletions pkg/cli/debug.go
@@ -777,6 +777,9 @@ func runDebugGossipValues(cmd *cobra.Command, args []string) error {

func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) {
var output []string
leaseByStoreID := make(map[roachpb.StoreID][]string)
var allStores []int

for key, info := range gossipInfo.Infos {
bytes, err := info.Value.GetBytes()
if err != nil {
@@ -845,9 +848,46 @@ func parseGossipValues(gossipInfo *gossip.InfoStatus) (string, error) {
output = append(output, fmt.Sprintf("%q: %v", key, gossipedTime))
} else if strings.HasPrefix(key, gossip.KeyGossipClientsPrefix) {
output = append(output, fmt.Sprintf("%q: %v", key, string(bytes)))
} else if strings.HasPrefix(key, gossip.KeyRangeLeases) {
// For range lease details, we don't want to print one line of output per
// gossip entry, because there is one entry per range and potentially a lot
// of ranges. Instead, we print one line of output per store, and for each
// store we list the ranges for which gossip announces a lease on that store.
//
// Example output:
// "leases": s1=[r1/1:12 r2/1:2 r21/1:6 r23/1:2 r24/1:6 r26/1:2 r27/1:2 r6/1:2 r7/1:2 r8/1:2]
// "leases": s2=[r10/2:5 r11/2:9 r22/2:5 r3/2:4 r31/2:5 r33/2:5 r4/2:10 r9/2:7]
//
// (The number after the colon is the lease sequence number, which
// enables detection of leases that don't transfer properly.)
//
rangeID, err := gossip.RangeIDFromKey(key, gossip.KeyRangeLeases)
if err != nil {
return "", err
}
var lease roachpb.Lease
if err := protoutil.Unmarshal(bytes, &lease); err != nil {
return "", errors.Wrapf(err, "failed to parse value for key %q", key)
}
if _, ok := leaseByStoreID[lease.Replica.StoreID]; !ok {
allStores = append(allStores, int(lease.Replica.StoreID))
}
leaseByStoreID[lease.Replica.StoreID] = append(leaseByStoreID[lease.Replica.StoreID],
fmt.Sprintf("r%d/%d:%d", rangeID, lease.Replica.ReplicaID, lease.Sequence))
} else {
output = append(output, fmt.Sprintf("unknown key %q: %v", key, bytes))
}
}

sort.Ints(allStores)
for _, sid := range allStores {
storeID := roachpb.StoreID(sid)
ranges := leaseByStoreID[storeID]
sort.Strings(ranges)
output = append(output, fmt.Sprintf("%q: s%d=[%s]", gossip.KeyRangeLeases, storeID, strings.Join(ranges, " ")))
}

sort.Strings(output)
return strings.Join(output, "\n"), nil
}
26 changes: 26 additions & 0 deletions pkg/gossip/keys.go
@@ -51,6 +51,12 @@ const (
// info.
KeyNodeLivenessPrefix = "liveness"

// KeyRangeLeases is the key prefix for gossiping the lease of each range.
// The range ID is appended as a suffix to KeyRangeLeases; see
// RangeIDFromKey below to extract it.
// The value is the acquired roachpb.Lease for that range.
KeyRangeLeases = "leases"

// KeySentinel is a key for gossip which must not expire or
// else the node considers itself partitioned and will retry with
// bootstrap hosts. The sentinel is gossiped by the node that holds
@@ -108,6 +114,26 @@ func MakePrefixPattern(prefix string) string {
return regexp.QuoteMeta(prefix+separator) + ".*"
}

// MakeRangeIDKey returns the gossip key for the lease of the given range ID.
func MakeRangeIDKey(rangeID roachpb.RangeID) string {
return MakeKey(KeyRangeLeases, rangeID.String())
}

// RangeIDFromKey attempts to extract a range ID from the provided key after
// stripping the provided prefix. Returns an error if the key is not of the
// correct type or is not parsable.
func RangeIDFromKey(key string, prefix string) (roachpb.RangeID, error) {
trimmedKey, err := removePrefixFromKey(key, prefix)
if err != nil {
return 0, err
}
rangeID, err := strconv.ParseInt(trimmedKey, 10 /* base */, 64 /* bitSize */)
if err != nil {
return 0, errors.Wrapf(err, "failed parsing RangeID from key %q", key)
}
return roachpb.RangeID(rangeID), nil
}

// MakeNodeIDKey returns the gossip key for node ID info.
func MakeNodeIDKey(nodeID roachpb.NodeID) string {
return MakeKey(KeyNodeIDPrefix, nodeID.String())
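
As a sanity check on the new key helpers above, the following self-contained sketch shows the intended round-trip between MakeRangeIDKey and RangeIDFromKey. The real helpers delegate to the gossip package's MakeKey and removePrefixFromKey, so the ":" separator used here is an assumption made purely for illustration.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

const keyRangeLeases = "leases"

// separator is assumed to be ":" for this sketch; the actual separator is
// defined inside the gossip package.
const separator = ":"

// makeRangeIDKey mirrors MakeRangeIDKey: prefix, separator, then range ID.
func makeRangeIDKey(rangeID int64) string {
	return keyRangeLeases + separator + strconv.FormatInt(rangeID, 10)
}

// rangeIDFromKey mirrors RangeIDFromKey: strip the prefix (and separator),
// then parse the remainder as a base-10 integer.
func rangeIDFromKey(key, prefix string) (int64, error) {
	trimmed := strings.TrimPrefix(key, prefix+separator)
	if trimmed == key {
		return 0, fmt.Errorf("key %q does not start with prefix %q", key, prefix)
	}
	return strconv.ParseInt(trimmed, 10, 64)
}

func main() {
	key := makeRangeIDKey(42)
	fmt.Println(key) // leases:42
	id, err := rangeIDFromKey(key, keyRangeLeases)
	fmt.Println(id, err) // 42 <nil>
	_, err = rangeIDFromKey("liveness:1", keyRangeLeases)
	fmt.Println(err) // key "liveness:1" does not start with prefix "leases"
}
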
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -163,7 +163,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
// splits to exercise that codepath, but we also want to make sure we
// still handle an unexpected split, so we make our own range cache and
// only populate it with one of our two splits.
mockCache := kvcoord.NewRangeDescriptorCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, s.Stopper())
mockCache := kvcoord.NewRangeDescriptorCache(s.ClusterSettings(), nil, func() int64 { return 2 << 10 }, nil, s.Stopper())
addr, err := keys.Addr(key(0))
if err != nil {
t.Fatal(err)
6 changes: 5 additions & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -273,6 +273,10 @@ type DistSenderConfig struct {
RPCContext *rpc.Context
NodeDialer *nodedialer.Dialer

// Gossip, if provided, is used to update range cache entries when a
// lease is acquired on a different node.
Gossip *gossip.Gossip

// One of the following two must be provided, but not both.
//
// If only FirstRangeProvider is supplied, DistSender will use itself as a
@@ -330,7 +334,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
getRangeDescCacheSize := func() int64 {
return rangeDescriptorCacheSize.Get(&ds.st.SV)
}
ds.rangeCache = NewRangeDescriptorCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
ds.rangeCache = NewRangeDescriptorCache(ds.st, rdb, getRangeDescCacheSize, cfg.Gossip, cfg.RPCContext.Stopper)
if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
ds.transportFactory = tf
} else {
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4170,7 +4170,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
getRangeDescCacheSize := func() int64 {
return 1 << 20
}
rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, stopper)
rc := NewRangeDescriptorCache(st, nil /* db */, getRangeDescCacheSize, nil, stopper)
rc.Insert(ctx, roachpb.RangeInfo{
Desc: desc,
Lease: roachpb.Lease{
82 changes: 81 additions & 1 deletion pkg/kv/kvclient/kvcoord/range_cache.go
@@ -19,6 +19,7 @@ import (
"time"

"github.com/biogo/store/llrb"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvbase"
"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -27,6 +28,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/cache"
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
@@ -177,8 +179,15 @@ func makeLookupRequestKey(
// NewRangeDescriptorCache returns a new RangeDescriptorCache which
// uses the given RangeDescriptorDB as the underlying source of range
// descriptors.
//
// The Gossip instance, if provided, is used to update cached
// entries when a lease is acquired on a different node.
func NewRangeDescriptorCache(
st *cluster.Settings, db RangeDescriptorDB, size func() int64, stopper *stop.Stopper,
st *cluster.Settings,
db RangeDescriptorDB,
size func() int64,
g *gossip.Gossip,
stopper *stop.Stopper,
) *RangeDescriptorCache {
rdc := &RangeDescriptorCache{st: st, db: db, stopper: stopper}
rdc.rangeCache.cache = cache.NewOrderedCache(cache.Config{
@@ -187,6 +196,14 @@ func NewRangeDescriptorCache(
return int64(n) > size()
},
})

if g != nil {
// If we have a gossip instance, support updating entries from
// incoming gossip notifications.
callback := gossip.Callback(rdc.updateFromGossip)
g.RegisterCallback(gossip.KeyRangeLeases, callback)
}

return rdc
}

@@ -1261,3 +1278,66 @@ func (e *rangeCacheEntry) updateLease(l *roachpb.Lease) (updated bool, newEntry
lease: *l,
}
}

func (rdc *RangeDescriptorCache) updateFromGossip(key string, v roachpb.Value) {
// TODO(knz): assign a proper context. This requires overhauling the gossip package.
ctx := context.Background()

// The range ID is in the gossip key. Extract it.
rangeID, err := gossip.RangeIDFromKey(key, gossip.KeyRangeLeases)
if err != nil {
log.Warningf(ctx, "can't parse gossip update: %v", err)
return
}
// Now that the range ID is known, tag the context with it; this
// clarifies any subsequent log entries below.
ctx = logtags.AddTag(ctx, "r", rangeID)

// Retrieve the lease payload.
b, err := v.GetBytes()
if err != nil {
log.Warningf(ctx, "no bytes in lease gossip update: %v", err)
return
}
var newLease roachpb.Lease
if err := protoutil.Unmarshal(b, &newLease); err != nil {
log.Warningf(ctx, "error decoding lease gossip update: %v", err)
return
}
if log.V(1) {
log.Infof(ctx, "received lease gossip update: %+v", &newLease)
}

// If we have an entry in the cache for this range already,
// then update the lease for that entry.
rdc.rangeCache.Lock()
defer rdc.rangeCache.Unlock()

// The work here is rendered a bit inefficient (linear lookup)
// because the cache is organized by key span, and we need to look
// up by Range ID.
//
// TODO(knz,andrei): If this is found to be a bottleneck, accelerate
// by providing by-ID lookup for cache entries.
rdc.rangeCache.cache.DoEntry(func(e *cache.Entry) (exit bool) {
cached := rdc.getValue(e)
if cached.desc.RangeID != rangeID {
// Not our range; move on to the next entry.
return false
}
// Update the lease that's already there. updateLease() is a no-op
// if the lease in the cache is more recent than the one received
// via gossip.
updated, newEntry := cached.updateLease(&newLease)
if updated && newEntry != nil {
// Entry was updated. Overwrite what we have in the cache.
e.Value = newEntry
if log.V(1) {
log.Infof(ctx, "updated cached lease from gossip, now at n%d,s%d",
newLease.Replica.NodeID, newLease.Replica.StoreID)
}
}
// There won't be another entry in the cache with the same range
// ID. Stop the iteration here.
return true
})
}
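
The following is a hedged, self-contained sketch of the invariant updateFromGossip relies on: applying a gossiped lease to the cache must be a no-op unless that lease is strictly newer than the cached one. The sequence number stands in for the fuller comparison done by the real updateLease; the helper and cases below are illustrative and not part of the commit.

package main

import "fmt"

// fakeLease is a stand-in for roachpb.Lease; only the sequence number is kept.
type fakeLease struct {
	Sequence int64
}

// applyGossipedLease returns the lease the cache should end up with: the
// gossiped lease wins only when it is strictly newer than the cached one.
func applyGossipedLease(cached, gossiped fakeLease) fakeLease {
	if gossiped.Sequence > cached.Sequence {
		return gossiped
	}
	return cached
}

func main() {
	cases := []struct {
		name             string
		cached, gossiped fakeLease
		want             int64
	}{
		{"newer gossiped lease replaces the cached one", fakeLease{3}, fakeLease{4}, 4},
		{"stale gossip (out-of-order delivery) is ignored", fakeLease{5}, fakeLease{4}, 5},
		{"equal sequence numbers are a no-op", fakeLease{4}, fakeLease{4}, 4},
	}
	for _, tc := range cases {
		got := applyGossipedLease(tc.cached, tc.gossiped)
		fmt.Printf("%s: got %d, want %d\n", tc.name, got.Sequence, tc.want)
	}
}
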
14 changes: 7 additions & 7 deletions pkg/kv/kvclient/kvcoord/range_cache_test.go
@@ -256,7 +256,7 @@ func initTestDescriptorDB(t *testing.T) *testDescriptorDB {
}
}
// TODO(andrei): don't leak this Stopper. Someone needs to Stop() it.
db.cache = NewRangeDescriptorCache(st, db, staticSize(2<<10), stop.NewStopper())
db.cache = NewRangeDescriptorCache(st, db, staticSize(2<<10), nil, stop.NewStopper())
return db
}

@@ -973,7 +973,7 @@ func TestRangeCacheClearOverlapping(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), nil, stopper)
cache.rangeCache.cache.Add(rangeCacheKey(keys.RangeMetaKey(roachpb.RKeyMax)), &rangeCacheEntry{desc: *defDesc})

// Now, add a new, overlapping set of descriptors.
@@ -1149,7 +1149,7 @@ func TestRangeCacheClearOlderOverlapping(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeDescriptorCache(st, nil /* db */, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil /* db */, staticSize(2<<10), nil, stopper)
for _, d := range tc.cachedDescs {
cache.Insert(ctx, roachpb.RangeInfo{Desc: d})
}
@@ -1201,7 +1201,7 @@ func TestRangeCacheClearOverlappingMeta(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), nil, stopper)
cache.Insert(ctx,
roachpb.RangeInfo{Desc: firstDesc},
roachpb.RangeInfo{Desc: restDesc})
@@ -1236,7 +1236,7 @@ func TestGetCachedRangeDescriptorInverted(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(context.Background())
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), nil, stopper)
for _, rd := range testData {
cache.rangeCache.cache.Add(
rangeCacheKey(keys.RangeMetaKey(rd.EndKey)), &rangeCacheEntry{desc: rd})
@@ -1374,7 +1374,7 @@ func TestRangeCacheGeneration(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), nil, stopper)
cache.Insert(ctx, roachpb.RangeInfo{Desc: *descAM2}, roachpb.RangeInfo{Desc: *descMZ4})
cache.Insert(ctx, roachpb.RangeInfo{Desc: *tc.insertDesc})

@@ -1445,7 +1445,7 @@ func TestRangeCacheUpdateLease(t *testing.T) {
st := cluster.MakeTestingClusterSettings()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), stopper)
cache := NewRangeDescriptorCache(st, nil, staticSize(2<<10), nil, stopper)

cache.Insert(ctx, roachpb.RangeInfo{
Desc: desc1,
32 changes: 32 additions & 0 deletions pkg/kv/kvserver/replica_gossip.go
@@ -52,6 +52,38 @@ func (r *Replica) gossipFirstRange(ctx context.Context) {
}
}

// gossipLeaseAcquired informs other nodes of the new leaseholder, so
// that any stale entries in their leaseholder caches can be
// invalidated.
func (r *Replica) gossipLeaseAcquired(ctx context.Context, newLease *roachpb.Lease) {
r.mu.RLock()
defer r.mu.RUnlock()
// Gossip is not provided for the bootstrap store and for some tests.
if r.store.Gossip() == nil {
return
}
// If gossip is not yet ready, don't even try.
select {
case <-r.store.Gossip().Connected:
default:
if log.V(1) {
log.Infof(ctx, "not gossiping lease acquisition: cluster not yet initialized")
}
return
}
if log.V(1) {
log.Infof(ctx, "gossiping lease acquisition")
}

if err := r.store.Gossip().AddInfoProto(
gossip.MakeRangeIDKey(r.RangeID), /* the range we're gossiping for */
newLease, /* tell the world about our fresh new lease */
0, /* gossip forever */
); err != nil {
log.Warningf(ctx, "failed to gossip lease acquisition: %v", err)
}
}

// shouldGossip returns true if this replica should be gossiping. Gossip is
// inherently inconsistent and asynchronous, we're using the lease as a way to
// ensure that only one node gossips at a time.
40 changes: 25 additions & 15 deletions pkg/kv/kvserver/replica_proposal.go
@@ -409,21 +409,31 @@ func (r *Replica) leasePostApply(ctx context.Context, newLease roachpb.Lease, pe
expirationBasedLease := r.requiresExpiringLeaseRLocked()
r.mu.Unlock()

// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
// lease request and attempt to gossip the first range.
if leaseChangingHands && iAmTheLeaseHolder && r.IsFirstRange() && r.IsLeaseValid(ctx, newLease, r.store.Clock().Now()) {
r.gossipFirstRange(ctx)
}

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if leaseChangingHands && iAmTheLeaseHolder && expirationBasedLease && r.IsLeaseValid(ctx, newLease, r.store.Clock().Now()) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
default:
if leaseChangingHands && iAmTheLeaseHolder {
// Gossip the first range whenever its lease is acquired. We check to make
// sure the lease is active so that a trailing replica won't process an old
// lease request and attempt to gossip the first range.
if r.IsFirstRange() && r.IsLeaseValid(ctx, newLease, r.store.Clock().Now()) {
r.gossipFirstRange(ctx)
}

// Gossip that we have the new lease, to
// update any cached entry on other nodes.
// FIXME(andrei): do we want to move this call under the condition
// expirationBasedLease && r.IsLeaseValid below?
// Are there non-expiration leases we want to invalidate in other
// caches in this way? (my opinion: yes)
r.gossipLeaseAcquired(ctx, &newLease)

// Whenever we first acquire an expiration-based lease, notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires.
if expirationBasedLease && r.IsLeaseValid(ctx, newLease, r.store.Clock().Now()) {
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
default:
}
}
}
