Merge pull request #100686 from erikgrinaker/backport23.1-100392-100430
release-23.1: kvserver: eagerly extend expiration leases in Raft scheduler
erikgrinaker committed Apr 12, 2023
2 parents f1c637f + 27030bf commit 285fc42
Showing 12 changed files with 343 additions and 55 deletions.
10 changes: 8 additions & 2 deletions pkg/kv/kvserver/replica.go
@@ -1646,8 +1646,14 @@ func (r *Replica) checkExecutionCanProceedBeforeStorageSnapshot(
}
}

if shouldExtend {
// If we're asked to extend the lease, trigger (async) lease renewal.
if shouldExtend && !ExpirationLeasesOnly.Get(&r.ClusterSettings().SV) {
// If we're asked to extend the lease, trigger (async) lease renewal. We
// don't do this if kv.expiration_leases_only.enabled is true, since we in
// that case eagerly extend expiration leases during Raft ticks instead.
//
// TODO(erikgrinaker): Remove this when we always eagerly extend
// expiration leases.
//
// Kicking this off requires an exclusive lock, and we hold a read-only lock
// already, so we jump through a hoop to run it in a suitably positioned
// defer.
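
A minimal, self-contained sketch of the gating pattern in the hunk above (hypothetical types, not part of this commit): the read path only triggers async lease renewal when expiration-only mode is off, since in that mode the Raft tick loop extends the lease instead.

// Illustrative sketch only -- not part of this commit. All types below are
// hypothetical stand-ins, not CockroachDB's.
package main

import "fmt"

type replica struct {
	expirationLeasesOnly bool // stands in for kv.expiration_leases_only.enabled
}

// maybeRenewAsync models the read-path hook: it requests an async lease
// renewal only when the tick-based extension path is not responsible for it.
func (r *replica) maybeRenewAsync(shouldExtend bool, renew func()) {
	if shouldExtend && !r.expirationLeasesOnly {
		renew() // async in the real code; called inline here for simplicity
	}
}

func main() {
	r := &replica{expirationLeasesOnly: true}
	r.maybeRenewAsync(true, func() { fmt.Println("renewing lease") }) // no-op: Raft ticks extend the lease
	r.expirationLeasesOnly = false
	r.maybeRenewAsync(true, func() { fmt.Println("renewing lease") }) // prints
}
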
206 changes: 196 additions & 10 deletions pkg/kv/kvserver/replica_lease_renewal_test.go
@@ -8,7 +8,7 @@
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test
package kvserver

import (
"context"
@@ -17,36 +17,222 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// TestLeaseRenewer tests that the store lease renewer or the Raft scheduler
// correctly tracks and extends expiration-based leases. Which of the two is
// responsible is controlled by kv.expiration_leases_only.enabled.
func TestLeaseRenewer(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// stressrace and deadlock make the test too slow, resulting in an inability
// to maintain leases and Raft leadership.
skip.UnderStressRace(t)
skip.UnderDeadlock(t)

// When kv.expiration_leases_only.enabled is true, the Raft scheduler is
// responsible for extensions, but we still track expiration leases for system
// ranges in the store lease renewer in case the setting changes.
testutils.RunTrueAndFalse(t, "kv.expiration_leases_only.enabled", func(t *testing.T, expOnly bool) {
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
ExpirationLeasesOnly.Override(ctx, &st.SV, expOnly)
tc := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Settings: st,
// Speed up lease extensions to speed up the test, but adjust tick-based
// timeouts to retain their default wall-time values.
RaftConfig: base.RaftConfig{
RangeLeaseRenewalFraction: 0.95,
RaftTickInterval: 100 * time.Millisecond,
RaftElectionTimeoutTicks: 20,
RaftReproposalTimeoutTicks: 30,
},
Knobs: base.TestingKnobs{
Store: &StoreTestingKnobs{
LeaseRenewalDurationOverride: 100 * time.Millisecond,
},
},
},
})
defer tc.Stopper().Stop(ctx)

require.NoError(t, tc.WaitForFullReplication())

lookupNode := func(nodeID roachpb.NodeID) int {
for idx, id := range tc.NodeIDs() {
if id == nodeID {
return idx
}
}
t.Fatalf("couldn't look up node %d", nodeID)
return 0
}

getNodeStore := func(nodeID roachpb.NodeID) *Store {
srv := tc.Server(lookupNode(nodeID))
s, err := srv.GetStores().(*Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)
return s
}

getNodeReplica := func(nodeID roachpb.NodeID, rangeID roachpb.RangeID) *Replica {
repl, err := getNodeStore(nodeID).GetReplica(rangeID)
require.NoError(t, err)
return repl
}

getLeaseRenewers := func(rangeID roachpb.RangeID) []roachpb.NodeID {
var renewers []roachpb.NodeID
for _, nodeID := range tc.NodeIDs() {
if _, ok := getNodeStore(nodeID).renewableLeases.Load(int64(rangeID)); ok {
renewers = append(renewers, nodeID)
}
}
return renewers
}

// assertLeaseExtension asserts that the given range has an expiration-based
// lease that is eagerly extended.
assertLeaseExtension := func(rangeID roachpb.RangeID) {
repl := getNodeReplica(1, rangeID)
lease, _ := repl.GetLease()
require.Equal(t, roachpb.LeaseExpiration, lease.Type())

var extensions int
require.Eventually(t, func() bool {
newLease, _ := repl.GetLease()
require.Equal(t, roachpb.LeaseExpiration, newLease.Type())
if *newLease.Expiration != *lease.Expiration {
extensions++
lease = newLease
t.Logf("r%d lease extended: %v", rangeID, lease)
}
return extensions >= 3
}, 20*time.Second, 100*time.Millisecond)
}

// assertLeaseUpgrade asserts that the range is eventually upgraded
// to an epoch lease.
assertLeaseUpgrade := func(rangeID roachpb.RangeID) {
repl := getNodeReplica(1, rangeID)
require.Eventually(t, func() bool {
lease, _ := repl.GetLease()
return lease.Type() == roachpb.LeaseEpoch
}, 20*time.Second, 100*time.Millisecond)
}

// assertStoreLeaseRenewer asserts that the range is tracked by the store lease
// renewer on the leaseholder.
assertStoreLeaseRenewer := func(rangeID roachpb.RangeID) {
repl := getNodeReplica(1, rangeID)
require.Eventually(t, func() bool {
lease, _ := repl.GetLease()
renewers := getLeaseRenewers(rangeID)
renewedByLeaseholder := len(renewers) == 1 && renewers[0] == lease.Replica.NodeID
// If kv.expiration_leases_only.enabled is true, then the store lease
// renewer is disabled -- it will still track the ranges in case the
// setting changes, but won't detect the new lease and untrack them as
// long as it has a replica. We therefore allow multiple nodes to track
// it, as long as the leaseholder is one of them.
if expOnly {
for _, renewer := range renewers {
if renewer == lease.Replica.NodeID {
renewedByLeaseholder = true
break
}
}
}
if !renewedByLeaseholder {
t.Logf("r%d renewers: %v", rangeID, renewers)
}
return renewedByLeaseholder
}, 20*time.Second, 100*time.Millisecond)
}

// assertNoStoreLeaseRenewer asserts that the range is not tracked by any
// lease renewer.
assertNoStoreLeaseRenewer := func(rangeID roachpb.RangeID) {
require.Eventually(t, func() bool {
renewers := getLeaseRenewers(rangeID)
if len(renewers) > 0 {
t.Logf("r%d renewers: %v", rangeID, renewers)
return false
}
return true
}, 20*time.Second, 100*time.Millisecond)
}

// The meta range should always be eagerly renewed.
firstRangeID := tc.LookupRangeOrFatal(t, keys.MinKey).RangeID
assertLeaseExtension(firstRangeID)
assertStoreLeaseRenewer(firstRangeID)

// Split off an expiration-based range, and assert that the lease is extended.
desc := tc.LookupRangeOrFatal(t, tc.ScratchRangeWithExpirationLease(t))
assertLeaseExtension(desc.RangeID)
assertStoreLeaseRenewer(desc.RangeID)

// Transfer the lease to a different leaseholder, and assert that the lease is
// still extended. Wait for the split to apply on all nodes first.
require.NoError(t, tc.WaitForFullReplication())
lease, _ := getNodeReplica(1, desc.RangeID).GetLease()
target := tc.Target(lookupNode(lease.Replica.NodeID%3 + 1))
tc.TransferRangeLeaseOrFatal(t, desc, target)
assertLeaseExtension(desc.RangeID)
assertStoreLeaseRenewer(desc.RangeID)

// Merge the range back. This should unregister it from the lease renewer.
require.NoError(t, tc.Server(0).DB().AdminMerge(ctx, desc.StartKey.AsRawKey().Prevish(16)))
assertNoStoreLeaseRenewer(desc.RangeID)

// Split off a regular non-system range. This should only be eagerly
// extended if kv.expiration_leases_only.enabled is true, and should never
// be tracked by the store lease renewer (which only handles system ranges).
desc = tc.LookupRangeOrFatal(t, tc.ScratchRange(t))
if expOnly {
assertLeaseExtension(desc.RangeID)
} else {
assertLeaseUpgrade(desc.RangeID)
}
assertNoStoreLeaseRenewer(desc.RangeID)
})
}

func setupLeaseRenewerTest(
ctx context.Context, t *testing.T, init func(*base.TestClusterArgs),
) (
cycles *int32, /* atomic */
_ *testcluster.TestCluster,
_ serverutils.TestClusterInterface,
) {
cycles = new(int32)
var args base.TestClusterArgs
args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{
args.ServerArgs.Knobs.Store = &StoreTestingKnobs{
LeaseRenewalOnPostCycle: func() {
atomic.AddInt32(cycles, 1)
},
}
init(&args)
tc := testcluster.StartTestCluster(t, 1, args)
tc := serverutils.StartNewTestCluster(t, 1, args)
t.Cleanup(func() { tc.Stopper().Stop(ctx) })

desc := tc.LookupRangeOrFatal(t, tc.ScratchRangeWithExpirationLease(t))
s := tc.GetFirstStoreFromServer(t, 0)
srv := tc.Server(0)
s, err := srv.GetStores().(*Stores).GetStore(srv.GetFirstStoreID())
require.NoError(t, err)

_, err := s.DB().Get(ctx, desc.StartKey)
_, err = s.DB().Get(ctx, desc.StartKey)
require.NoError(t, err)

repl, err := s.GetReplica(desc.RangeID)
@@ -67,7 +253,7 @@ func TestLeaseRenewerExtendsExpirationBasedLeases(t *testing.T) {
t.Run("triggered", func(t *testing.T) {
renewCh := make(chan struct{})
cycles, tc := setupLeaseRenewerTest(ctx, t, func(args *base.TestClusterArgs) {
args.ServerArgs.Knobs.Store.(*kvserver.StoreTestingKnobs).LeaseRenewalSignalChan = renewCh
args.ServerArgs.Knobs.Store.(*StoreTestingKnobs).LeaseRenewalSignalChan = renewCh
})
defer tc.Stopper().Stop(ctx)

@@ -93,7 +279,7 @@ func TestLeaseRenewerExtendsExpirationBasedLeases(t *testing.T) {

t.Run("periodic", func(t *testing.T) {
cycles, tc := setupLeaseRenewerTest(ctx, t, func(args *base.TestClusterArgs) {
args.ServerArgs.Knobs.Store.(*kvserver.StoreTestingKnobs).LeaseRenewalDurationOverride = 10 * time.Millisecond
args.ServerArgs.Knobs.Store.(*StoreTestingKnobs).LeaseRenewalDurationOverride = 10 * time.Millisecond
})
defer tc.Stopper().Stop(ctx)

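
The assertions in this test poll with testify's require.Eventually until a condition holds. A reduced sketch of the same pattern (the fakeLease type is a hypothetical stand-in, not part of this commit); a real caller would have some background process bumping the expiration:

// Illustrative sketch only -- not part of this commit. fakeLease is a
// hypothetical stand-in; only require.Eventually comes from a real library.
package example

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

// fakeLease stands in for a lease whose expiration (as unix nanos) is bumped
// by some background renewal process under test.
type fakeLease struct{ expiration atomic.Int64 }

// assertExtended polls the lease and requires that its expiration moves
// forward at least three times before the deadline, mirroring
// assertLeaseExtension above.
func assertExtended(t *testing.T, l *fakeLease) {
	t.Helper()
	last := l.expiration.Load()
	var extensions int
	require.Eventually(t, func() bool {
		if cur := l.expiration.Load(); cur > last {
			extensions++
			last = cur
		}
		return extensions >= 3
	}, 20*time.Second, 100*time.Millisecond)
}
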
19 changes: 13 additions & 6 deletions pkg/kv/kvserver/replica_proposal.go
@@ -231,6 +231,10 @@ func (r *Replica) leasePostApplyLocked(
// Everything we do before then doesn't need to worry about requests being
// evaluated under the new lease.

// maybeSplit is true if we may have been called during splitPostApply, where
// prevLease equals newLease and we're applying the RHS lease.
var maybeSplit bool

// Sanity check to make sure that the lease sequence is moving in the right
// direction.
if s1, s2 := prevLease.Sequence, newLease.Sequence; s1 != 0 {
@@ -248,6 +252,7 @@
log.Fatalf(ctx, "sequence identical for different leases, prevLease=%s, newLease=%s",
redact.Safe(prevLease), redact.Safe(newLease))
}
maybeSplit = prevLease.Equal(newLease)
case s2 == s1+1:
// Lease sequence incremented by 1. Expected case.
case s2 > s1+1 && jumpOpt == assertNoLeaseJump:
@@ -366,21 +371,23 @@
r.gossipFirstRangeLocked(ctx)
}

hasExpirationLease := newLease.Type() == roachpb.LeaseExpiration
if leaseChangingHands && iAmTheLeaseHolder && hasExpirationLease && r.ownsValidLeaseRLocked(ctx, now) {
isExpirationLease := newLease.Type() == roachpb.LeaseExpiration
if isExpirationLease && (leaseChangingHands || maybeSplit) && iAmTheLeaseHolder {
if r.requiresExpirationLeaseRLocked() {
// Whenever we first acquire an expiration-based lease for a range that
// requires it (i.e. the liveness or meta ranges), notify the lease
// renewer worker that we want it to keep proactively renewing the lease
// before it expires. We don't eagerly renew other expiration leases,
// because a more sophisticated scheduler is needed to handle large
// numbers of expiration leases.
// before it expires.
//
// Other expiration leases are only proactively renewed if
// kv.expiration_leases_only.enabled is true, but in that case the renewal
// is handled by the Raft scheduler during Raft ticks.
r.store.renewableLeases.Store(int64(r.RangeID), unsafe.Pointer(r))
select {
case r.store.renewableLeasesSignal <- struct{}{}:
default:
}
} else if !r.shouldUseExpirationLeaseRLocked() {
} else if !r.shouldUseExpirationLeaseRLocked() && r.ownsValidLeaseRLocked(ctx, now) {
// We received an expiration lease for a range that shouldn't keep using
// it, most likely as part of a lease transfer (which is always
// expiration-based). We've also applied it before it has expired. Upgrade
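
The post-apply branch above can be summarized as a small decision function. A sketch with hypothetical stand-in types (not part of this commit): ranges that must use expiration leases are registered with the store lease renewer; other ranges upgrade to an epoch lease unless kv.expiration_leases_only.enabled keeps them on expiration leases, in which case extension is left to Raft ticks.

// Illustrative sketch only -- not part of this commit; all names are
// hypothetical stand-ins for the inputs to the branch above.
package example

type leaseDecision struct {
	isExpirationLease    bool // the newly applied lease is expiration-based
	leaseChangedOrSplit  bool // lease changed hands, or the RHS lease of a split was applied
	iAmTheLeaseholder    bool
	requiresExpiration   bool // e.g. the liveness or meta ranges
	expirationLeasesOnly bool // stands in for kv.expiration_leases_only.enabled
	ownsValidLease       bool // the lease is still valid at apply time
}

type action int

const (
	doNothing action = iota
	registerWithStoreRenewer
	upgradeToEpochLease
)

// decide mirrors the shape of the post-apply branch: system ranges that must
// use expiration leases go to the store renewer, while other ranges upgrade
// to an epoch lease unless expiration-only mode leaves renewal to Raft ticks.
func decide(d leaseDecision) action {
	if !d.isExpirationLease || !d.leaseChangedOrSplit || !d.iAmTheLeaseholder {
		return doNothing
	}
	if d.requiresExpiration {
		return registerWithStoreRenewer
	}
	if !d.expirationLeasesOnly && d.ownsValidLease {
		return upgradeToEpochLease
	}
	return doNothing // the Raft scheduler extends the lease on ticks
}
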
20 changes: 20 additions & 0 deletions pkg/kv/kvserver/replica_raft.go
@@ -1210,6 +1210,26 @@ func (r *Replica) tick(

r.maybeTransferRaftLeadershipToLeaseholderLocked(ctx, now)

// Eagerly extend expiration leases if kv.expiration_leases_only.enabled is
// set. We can do this here because we don't allow ranges with expiration
// leases to quiesce in this case. We check the lease type and owner first,
// since leaseStatusAtRLocked() is moderately expensive.
//
// TODO(erikgrinaker): we should remove the store lease renewer and always
// do this, but we keep it for now out of caution.
//
// TODO(erikgrinaker): the replicate queue is responsible for acquiring leases
// for ranges that don't have one, and for switching the lease type when e.g.
// kv.expiration_leases_only.enabled changes. We should do this here when we
// remove quiescence.
if !r.store.cfg.TestingKnobs.DisableAutomaticLeaseRenewal {
if l := r.mu.state.Lease; l.Type() == roachpb.LeaseExpiration && l.OwnedBy(r.StoreID()) {
if ExpirationLeasesOnly.Get(&r.ClusterSettings().SV) {
r.maybeExtendLeaseAsyncLocked(ctx, r.leaseStatusAtRLocked(ctx, now))
}
}
}

// For followers, we update lastUpdateTimes when we step a message from them
// into the local Raft group. The leader won't hit that path, so we update
// it whenever it ticks. In effect, this makes sure it always sees itself as
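
The tick hook above orders its checks from cheap to expensive: lease type and ownership first, then the cluster setting, and only then the moderately expensive lease-status computation. A sketch of that ordering, with hypothetical types and an illustrative renewal threshold (not part of this commit):

// Illustrative sketch only -- not part of this commit; the types and the
// renewal threshold are hypothetical.
package example

import "time"

type tickLease struct {
	isExpirationBased bool
	ownedByThisStore  bool
	expiration        time.Time
}

type tickReplica struct {
	lease                tickLease
	expirationLeasesOnly bool // stands in for kv.expiration_leases_only.enabled
}

// remaining models the moderately expensive lease-status computation; it is
// only reached once the cheap checks have passed.
func (r *tickReplica) remaining(now time.Time) time.Duration {
	return r.lease.expiration.Sub(now)
}

// maybeExtendOnTick mirrors the check ordering in the tick hook above: cheap
// lease-type and ownership checks, then the cluster setting, and only then
// the status computation that decides whether to extend.
func (r *tickReplica) maybeExtendOnTick(now time.Time, extend func()) {
	if !r.lease.isExpirationBased || !r.lease.ownedByThisStore {
		return
	}
	if !r.expirationLeasesOnly {
		return // renewal is handled by the store lease renewer / read path instead
	}
	if r.remaining(now) < 3*time.Second { // illustrative threshold, not the real policy
		extend() // async in the real code
	}
}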