Merge #42379 #42724
42379: storage: remove the replica up-to-date check when transferring lease r=darinpp a=darinpp

Previously, a check filtered out every replica that was lagging behind the
raft leader as a candidate for a lease transfer.

We remove that check so that, when a lease preference targets a node whose
replica is constantly lagging, the lease transfer can still occur without delay.

Candidates for a lease transfer are therefore no longer restricted to
replicas that are not lagging behind. This is safe because etcd's raft
library implements the leadership transfer extension (section 3.10 of the
Raft thesis), in which the old leader brings the new leader's log up to date
while blocking any new requests.
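
For illustration only, a minimal sketch of the kind of "behind" pre-filter this change removes (the function name and shape are hypothetical, not CockroachDB's actual filterBehindReplicas): candidates whose raft Match index trails the leader's commit index are dropped. With leadership transfer, the outgoing leader catches the transferee up first, so such a pre-filter is no longer needed.

```go
package main

import (
	"fmt"

	"go.etcd.io/etcd/raft"
)

// filterCaughtUp is a hypothetical stand-in for the removed check: it keeps
// only the replica IDs whose raft log is caught up to the leader's commit
// index, according to the leader's raft status.
func filterCaughtUp(status *raft.Status, ids []uint64) []uint64 {
	if status == nil {
		return ids // no leader status available; don't filter
	}
	var caughtUp []uint64
	for _, id := range ids {
		if pr, ok := status.Progress[id]; ok && pr.Match >= status.Commit {
			caughtUp = append(caughtUp, id)
		}
	}
	return caughtUp
}

func main() {
	// Without a raft status, this sketch leaves the candidate list untouched.
	fmt.Println(filterCaughtUp(nil, []uint64{1, 2, 3}))
}
```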

Release note (bug fix): Range leases can now be transferred to replicas that are lagging behind the raft leader.

42724: storage: create cluster setting for a minimum lease transfer interval r=nvanbenschoten a=nvanbenschoten

This minimum interval was previously hard-coded to 1 second as a constant, which was frustrating when setting up datasets with lease preferences. This change turns the interval into a configurable cluster setting so that it can be changed on demand.

The default also seems fairly high, but to avoid causing any instability or thrashing, I opted not to change it for the new cluster setting.

Release note (sql change): A new kv.allocator.min_lease_transfer_interval cluster setting was introduced, which allows the minimum interval between lease transfers initiated from each node to be configured.
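
As a usage sketch (assuming a locally running insecure cluster and the lib/pq driver; the connection string and the '100ms' value are illustrative only, not recommendations), the new setting can be changed on demand from any SQL session:

```go
package main

import (
	"database/sql"
	"log"

	_ "github.com/lib/pq" // Postgres-wire driver commonly used with CockroachDB
)

func main() {
	// Assumed connection string for a local, insecure single-node cluster.
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Lower the minimum interval between lease transfers initiated by each node.
	if _, err := db.Exec(
		"SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '100ms'",
	); err != nil {
		log.Fatal(err)
	}
}
```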

Co-authored-by: Darin <darinp@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
3 people committed Nov 25, 2019
3 parents 400299a + 371738c + 39f24ea commit a75663e
Showing 3 changed files with 230 additions and 8 deletions.
12 changes: 12 additions & 0 deletions pkg/storage/helpers_test.go
@@ -44,6 +44,18 @@ import (
"go.etcd.io/etcd/raft"
)

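// Transport returns the store's raft transport. Exposed for testing.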
func (s *Store) Transport() *RaftTransport {
return s.cfg.Transport
}

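// FindTargetAndTransferLease exposes replicateQueue.findTargetAndTransferLease
// to tests, using default transferLeaseOptions.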
func (s *Store) FindTargetAndTransferLease(
ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, zone *config.ZoneConfig,
) (bool, error) {
return s.replicateQueue.findTargetAndTransferLease(
ctx, repl, desc, zone, transferLeaseOptions{},
)
}

// AddReplica adds the replica to the store's replica map and to the sorted
// replicasByKey slice. To be used only by unittests.
func (s *Store) AddReplica(repl *Replica) error {
23 changes: 15 additions & 8 deletions pkg/storage/replicate_queue.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/config"
"github.com/cockroachdb/cockroach/pkg/gossip"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -37,11 +38,6 @@ const (
// replicas.
replicateQueueTimerDuration = 0 // zero duration to process replication greedily

// minLeaseTransferInterval controls how frequently leases can be transferred
// for rebalancing. It does not prevent transferring leases in order to allow
// a replica to be removed from a range.
minLeaseTransferInterval = time.Second

// newReplicaGracePeriod is the amount of time that we allow for a new
// replica's raft state to catch up to the leader's before we start
// considering it to be behind for the sake of rebalancing. We choose a
@@ -51,6 +47,17 @@
newReplicaGracePeriod = 5 * time.Minute
)

// minLeaseTransferInterval controls how frequently leases can be transferred
// for rebalancing. It does not prevent transferring leases in order to allow
// a replica to be removed from a range.
var minLeaseTransferInterval = settings.RegisterNonNegativeDurationSetting(
"kv.allocator.min_lease_transfer_interval",
"controls how frequently leases can be transferred for rebalancing. "+
"It does not prevent transferring leases in order to allow a "+
"replica to be removed from a range.",
1*time.Second,
)

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
@@ -934,11 +941,10 @@ func (rq *replicateQueue) findTargetAndTransferLease(
) (bool, error) {
// Learner replicas aren't allowed to become the leaseholder or raft leader,
// so only consider the `Voters` replicas.
candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas().Voters())
target := rq.allocator.TransferLeaseTarget(
ctx,
zone,
candidates,
desc.Replicas().Voters(),
repl.store.StoreID(),
desc.RangeID,
repl.leaseholderStats,
@@ -1003,7 +1009,8 @@ func (rq *replicateQueue) changeReplicas(

func (rq *replicateQueue) canTransferLease() bool {
if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
return timeutil.Since(lastLeaseTransfer.(time.Time)) > minLeaseTransferInterval
minInterval := minLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV)
return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval
}
return true
}
203 changes: 203 additions & 0 deletions pkg/storage/replicate_queue_test.go
@@ -28,13 +28,15 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/raft/tracker"
)

func TestReplicateQueueRebalance(t *testing.T) {
@@ -478,3 +480,204 @@ func TestLargeUnsplittableRangeReplicate(t *testing.T) {
return nil
})
}

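// delayingRaftMessageHandler wraps a store's RaftMessageHandler and delays
// delivery of raft messages for a single range so that the local replica of
// that range falls behind the leader.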
type delayingRaftMessageHandler struct {
storage.RaftMessageHandler
leaseHolderNodeID uint64
rangeID roachpb.RangeID
}

const (
queryInterval = 10 * time.Millisecond
raftDelay = 175 * time.Millisecond
)

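// HandleRaftRequest forwards messages for other ranges immediately and
// delivers messages for the watched range only after an artificial delay.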
func (h delayingRaftMessageHandler) HandleRaftRequest(
ctx context.Context,
req *storage.RaftMessageRequest,
respStream storage.RaftMessageResponseStream,
) *roachpb.Error {
if h.rangeID != req.RangeID {
return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
}
go func() {
time.Sleep(raftDelay)
err := h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
if err != nil {
log.Infof(ctx, "HandleRaftRequest returned err %s", err)
}
}()

return nil
}

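// TestTransferLeaseToLaggingNode verifies that a range lease can be
// transferred to a node whose replica is lagging behind the raft leader,
// as a lease preference may require now that lagging replicas are no longer
// filtered out of the transfer candidates.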
func TestTransferLeaseToLaggingNode(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
clusterArgs := base.TestClusterArgs{
ServerArgsPerNode: map[int]base.TestServerArgs{
0: {
ScanMaxIdleTime: time.Millisecond,
StoreSpecs: []base.StoreSpec{{
InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n1"}},
}},
},
1: {
ScanMaxIdleTime: time.Millisecond,
StoreSpecs: []base.StoreSpec{{
InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n2"}},
}},
},
2: {
ScanMaxIdleTime: time.Millisecond,
StoreSpecs: []base.StoreSpec{{
InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n3"}},
}},
},
},
}

tc := testcluster.StartTestCluster(t,
len(clusterArgs.ServerArgsPerNode), clusterArgs)
defer tc.Stopper().Stop(ctx)

if err := tc.WaitForFullReplication(); err != nil {
t.Fatal(err)
}

// Get the range and lease holder for the system.comments table.
var rangeID roachpb.RangeID
var leaseHolderNodeID uint64
s := sqlutils.MakeSQLRunner(tc.Conns[0])
s.Exec(t, "insert into system.comments values(0,0,0,'abc')")
s.QueryRow(t,
"select range_id, lease_holder from "+
"[show ranges from table system.comments] limit 1",
).Scan(&rangeID, &leaseHolderNodeID)
remoteNodeID := uint64(1)
if leaseHolderNodeID == 1 {
remoteNodeID = 2
}
log.Infof(ctx, "RangeID %d, RemoteNodeID %d, LeaseHolderNodeID %d",
rangeID, remoteNodeID, leaseHolderNodeID)
leaseHolderSrv := tc.Servers[leaseHolderNodeID-1]
leaseHolderStoreID := leaseHolderSrv.GetFirstStoreID()
leaseHolderStore, err := leaseHolderSrv.Stores().GetStore(leaseHolderStoreID)
if err != nil {
t.Fatal(err)
}

// Start delaying Raft messages to the remote node
remoteSrv := tc.Servers[remoteNodeID-1]
remoteStoreID := remoteSrv.GetFirstStoreID()
remoteStore, err := remoteSrv.Stores().GetStore(remoteStoreID)
if err != nil {
t.Fatal(err)
}
remoteStore.Transport().Listen(
remoteStoreID,
delayingRaftMessageHandler{remoteStore, leaseHolderNodeID, rangeID},
)

workerReady := make(chan bool)
// Create persistent range load.
tc.Stopper().RunWorker(ctx, func(ctx context.Context) {
s = sqlutils.MakeSQLRunner(tc.Conns[remoteNodeID-1])
workerReady <- true
for {
s.Exec(t, "update system.comments set comment='abc' "+
"where type=0 and object_id=0 and sub_id=0")

select {
case <-ctx.Done():
return
case <-tc.Stopper().ShouldQuiesce():
return
case <-time.After(queryInterval):
}
}
})
<-workerReady
// Wait until we see the remote replica making progress.
leaseHolderRepl, err := leaseHolderStore.GetReplica(roachpb.RangeID(rangeID))
if err != nil {
t.Fatal(err)
}

var remoteRepl *storage.Replica
testutils.SucceedsSoon(t, func() error {
remoteRepl, err = remoteStore.GetReplica(roachpb.RangeID(rangeID))
return err
})
testutils.SucceedsSoon(t, func() error {
status := leaseHolderRepl.RaftStatus()
progress := status.Progress[uint64(remoteRepl.ReplicaID())]
if progress.Match > 0 {
return nil
}
return errors.Errorf(
"remote is not making progress: %+v", progress.Match,
)
})

// Wait until we see the remote replica lagging behind
for {
// Ensure that the replica on the remote node is lagging.
status := leaseHolderRepl.RaftStatus()
progress := status.Progress[uint64(remoteRepl.ReplicaID())]
if progress.State == tracker.StateReplicate &&
(status.Commit-progress.Match) > 0 {
break
}
time.Sleep(13 * time.Millisecond)
}

// Set a lease preference in the zone config so that the lease must be moved
// to the remote node.
desc, zone := leaseHolderRepl.DescAndZone()
newZone := *zone
newZone.LeasePreferences = []config.LeasePreference{
{
Constraints: []config.Constraint{
{
Type: config.Constraint_REQUIRED,
Value: fmt.Sprintf("n%d", remoteNodeID),
},
},
},
}

// By now the lease holder may have changed.
testutils.SucceedsSoon(t, func() error {
leaseBefore, _ := leaseHolderRepl.GetLease()
log.Infof(ctx, "Lease before transfer %+v\n", leaseBefore)

if uint64(leaseBefore.Replica.NodeID) == remoteNodeID {
log.Infof(
ctx,
"Lease successfully transferred to desired node %d\n",
remoteNodeID,
)
return nil
}
currentSrv := tc.Servers[leaseBefore.Replica.NodeID-1]
leaseStore, err := currentSrv.Stores().GetStore(currentSrv.GetFirstStoreID())
if err != nil {
return err
}
leaseRepl, err := leaseStore.GetReplica(rangeID)
if err != nil {
return err
}
transferred, err := leaseStore.FindTargetAndTransferLease(
ctx, leaseRepl, desc, &newZone)
if err != nil {
return err
}
if !transferred {
return errors.Errorf("unable to transfer")
}
return errors.Errorf("Repeat check for correct leaseholder")
})
}
