diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go
index d54de75ad769..ddd996e23309 100644
--- a/pkg/storage/helpers_test.go
+++ b/pkg/storage/helpers_test.go
@@ -44,6 +44,18 @@ import (
 	"go.etcd.io/etcd/raft"
 )
 
+func (s *Store) Transport() *RaftTransport {
+	return s.cfg.Transport
+}
+
+func (s *Store) FindTargetAndTransferLease(
+	ctx context.Context, repl *Replica, desc *roachpb.RangeDescriptor, zone *config.ZoneConfig,
+) (bool, error) {
+	return s.replicateQueue.findTargetAndTransferLease(
+		ctx, repl, desc, zone, transferLeaseOptions{},
+	)
+}
+
 // AddReplica adds the replica to the store's replica map and to the sorted
 // replicasByKey slice. To be used only by unittests.
 func (s *Store) AddReplica(repl *Replica) error {
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index edcdac1307c5..c71be5a8b2ed 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -21,6 +21,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/config"
 	"github.com/cockroachdb/cockroach/pkg/gossip"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -37,11 +38,6 @@ const (
 	// replicas.
 	replicateQueueTimerDuration = 0 // zero duration to process replication greedily
 
-	// minLeaseTransferInterval controls how frequently leases can be transferred
-	// for rebalancing. It does not prevent transferring leases in order to allow
-	// a replica to be removed from a range.
-	minLeaseTransferInterval = time.Second
-
 	// newReplicaGracePeriod is the amount of time that we allow for a new
 	// replica's raft state to catch up to the leader's before we start
 	// considering it to be behind for the sake of rebalancing. We choose a
@@ -51,6 +47,17 @@ const (
 	newReplicaGracePeriod = 5 * time.Minute
 )
 
+// minLeaseTransferInterval controls how frequently leases can be transferred
+// for rebalancing. It does not prevent transferring leases in order to allow
+// a replica to be removed from a range.
+var minLeaseTransferInterval = settings.RegisterNonNegativeDurationSetting(
+	"kv.allocator.min_lease_transfer_interval",
+	"controls how frequently leases can be transferred for rebalancing. "+
+		"It does not prevent transferring leases in order to allow a "+
+		"replica to be removed from a range.",
+	1*time.Second,
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",
@@ -934,11 +941,10 @@ func (rq *replicateQueue) findTargetAndTransferLease(
 ) (bool, error) {
 	// Learner replicas aren't allowed to become the leaseholder or raft leader,
 	// so only consider the `Voters` replicas.
-	candidates := filterBehindReplicas(repl.RaftStatus(), desc.Replicas().Voters())
 	target := rq.allocator.TransferLeaseTarget(
 		ctx,
 		zone,
-		candidates,
+		desc.Replicas().Voters(),
 		repl.store.StoreID(),
 		desc.RangeID,
 		repl.leaseholderStats,
@@ -1003,7 +1009,8 @@ func (rq *replicateQueue) changeReplicas(
 
 func (rq *replicateQueue) canTransferLease() bool {
 	if lastLeaseTransfer := rq.lastLeaseTransfer.Load(); lastLeaseTransfer != nil {
-		return timeutil.Since(lastLeaseTransfer.(time.Time)) > minLeaseTransferInterval
+		minInterval := minLeaseTransferInterval.Get(&rq.store.cfg.Settings.SV)
+		return timeutil.Since(lastLeaseTransfer.(time.Time)) > minInterval
 	}
 	return true
 }
diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go
index ac9c643c0523..99456480a363 100644
--- a/pkg/storage/replicate_queue_test.go
+++ b/pkg/storage/replicate_queue_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
 	"github.com/cockroachdb/cockroach/pkg/util"
 	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -35,6 +36,7 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/pkg/errors"
 	"github.com/stretchr/testify/require"
+	"go.etcd.io/etcd/raft/tracker"
 )
 
 func TestReplicateQueueRebalance(t *testing.T) {
@@ -478,3 +480,204 @@ func TestLargeUnsplittableRangeReplicate(t *testing.T) {
 		return nil
 	})
 }
+
+type delayingRaftMessageHandler struct {
+	storage.RaftMessageHandler
+	leaseHolderNodeID uint64
+	rangeID           roachpb.RangeID
+}
+
+const (
+	queryInterval = 10 * time.Millisecond
+	raftDelay     = 175 * time.Millisecond
+)
+
+func (h delayingRaftMessageHandler) HandleRaftRequest(
+	ctx context.Context,
+	req *storage.RaftMessageRequest,
+	respStream storage.RaftMessageResponseStream,
+) *roachpb.Error {
+	if h.rangeID != req.RangeID {
+		return h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
+	}
+	go func() {
+		time.Sleep(raftDelay)
+		err := h.RaftMessageHandler.HandleRaftRequest(ctx, req, respStream)
+		if err != nil {
+			log.Infof(ctx, "HandleRaftRequest returned err %s", err)
+		}
+	}()
+
+	return nil
+}
+
+func TestTransferLeaseToLaggingNode(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	clusterArgs := base.TestClusterArgs{
+		ServerArgsPerNode: map[int]base.TestServerArgs{
+			0: {
+				ScanMaxIdleTime: time.Millisecond,
+				StoreSpecs: []base.StoreSpec{{
+					InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n1"}},
+				}},
+			},
+			1: {
+				ScanMaxIdleTime: time.Millisecond,
+				StoreSpecs: []base.StoreSpec{{
+					InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n2"}},
+				}},
+			},
+			2: {
+				ScanMaxIdleTime: time.Millisecond,
+				StoreSpecs: []base.StoreSpec{{
+					InMemory: true, Attributes: roachpb.Attributes{Attrs: []string{"n3"}},
+				}},
+			},
+		},
+	}
+
+	tc := testcluster.StartTestCluster(t,
+		len(clusterArgs.ServerArgsPerNode), clusterArgs)
+	defer tc.Stopper().Stop(ctx)
+
+	if err := tc.WaitForFullReplication(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Get the range for system.comments and its leaseholder.
+	var rangeID roachpb.RangeID
+	var leaseHolderNodeID uint64
+	s := sqlutils.MakeSQLRunner(tc.Conns[0])
+	s.Exec(t, "insert into system.comments values(0,0,0,'abc')")
+	s.QueryRow(t,
+		"select range_id, lease_holder from "+
+			"[show ranges from table system.comments] limit 1",
+	).Scan(&rangeID, &leaseHolderNodeID)
+	remoteNodeID := uint64(1)
+	if leaseHolderNodeID == 1 {
+		remoteNodeID = 2
+	}
+	log.Infof(ctx, "RangeID %d, RemoteNodeID %d, LeaseHolderNodeID %d",
+		rangeID, remoteNodeID, leaseHolderNodeID)
+	leaseHolderSrv := tc.Servers[leaseHolderNodeID-1]
+	leaseHolderStoreID := leaseHolderSrv.GetFirstStoreID()
+	leaseHolderStore, err := leaseHolderSrv.Stores().GetStore(leaseHolderStoreID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Start delaying Raft messages to the remote node.
+	remoteSrv := tc.Servers[remoteNodeID-1]
+	remoteStoreID := remoteSrv.GetFirstStoreID()
+	remoteStore, err := remoteSrv.Stores().GetStore(remoteStoreID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	remoteStore.Transport().Listen(
+		remoteStoreID,
+		delayingRaftMessageHandler{remoteStore, leaseHolderNodeID, rangeID},
+	)
+
+	workerReady := make(chan bool)
+	// Create persistent range load.
+	tc.Stopper().RunWorker(ctx, func(ctx context.Context) {
+		s = sqlutils.MakeSQLRunner(tc.Conns[remoteNodeID-1])
+		workerReady <- true
+		for {
+			s.Exec(t, "update system.comments set comment='abc' "+
+				"where type=0 and object_id=0 and sub_id=0")
+
+			select {
+			case <-ctx.Done():
+				return
+			case <-tc.Stopper().ShouldQuiesce():
+				return
+			case <-time.After(queryInterval):
+			}
+		}
+	})
+	<-workerReady
+	// Wait until the remote replica is making progress.
+	leaseHolderRepl, err := leaseHolderStore.GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var remoteRepl *storage.Replica
+	testutils.SucceedsSoon(t, func() error {
+		remoteRepl, err = remoteStore.GetReplica(rangeID)
+		return err
+	})
+	testutils.SucceedsSoon(t, func() error {
+		status := leaseHolderRepl.RaftStatus()
+		progress := status.Progress[uint64(remoteRepl.ReplicaID())]
+		if progress.Match > 0 {
+			return nil
+		}
+		return errors.Errorf(
+			"remote replica is not making progress: match index %d", progress.Match,
+		)
+	})
+
+	// Wait until the remote replica is lagging behind the leaseholder.
+	for {
+		// Ensure that the replica on the remote node is lagging.
+		status := leaseHolderRepl.RaftStatus()
+		progress := status.Progress[uint64(remoteRepl.ReplicaID())]
+		if progress.State == tracker.StateReplicate &&
+			(status.Commit-progress.Match) > 0 {
+			break
+		}
+		time.Sleep(13 * time.Millisecond)
+	}
+
+	// Set a lease preference in the zone config so that the lease has to be
+	// moved to the remote node.
+	desc, zone := leaseHolderRepl.DescAndZone()
+	newZone := *zone
+	newZone.LeasePreferences = []config.LeasePreference{
+		{
+			Constraints: []config.Constraint{
+				{
+					Type:  config.Constraint_REQUIRED,
+					Value: fmt.Sprintf("n%d", remoteNodeID),
+				},
+			},
+		},
+	}
+
+	// By now the leaseholder may have changed.
+	testutils.SucceedsSoon(t, func() error {
+		leaseBefore, _ := leaseHolderRepl.GetLease()
+		log.Infof(ctx, "lease before transfer: %+v", leaseBefore)
+
+		if uint64(leaseBefore.Replica.NodeID) == remoteNodeID {
+			log.Infof(
+				ctx,
+				"lease successfully transferred to desired node %d",
+				remoteNodeID,
+			)
+			return nil
+		}
+		currentSrv := tc.Servers[leaseBefore.Replica.NodeID-1]
+		leaseStore, err := currentSrv.Stores().GetStore(currentSrv.GetFirstStoreID())
+		if err != nil {
+			return err
+		}
+		leaseRepl, err := leaseStore.GetReplica(rangeID)
+		if err != nil {
+			return err
+		}
+		transferred, err := leaseStore.FindTargetAndTransferLease(
+			ctx, leaseRepl, desc, &newZone)
+		if err != nil {
+			return err
+		}
+		if !transferred {
+			return errors.Errorf("unable to transfer the lease")
+		}
+		return errors.Errorf("lease transferred; retry to confirm the new leaseholder")
+	})
+}
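
Note on the pattern above (not part of the patch): RegisterNonNegativeDurationSetting creates an ordinary cluster setting, so the rebalancing interval should be tunable at runtime, e.g. with SET CLUSTER SETTING kv.allocator.min_lease_transfer_interval = '250ms'. The Go sketch below is a minimal, self-contained illustration of the rate-limiting idiom that canTransferLease implements: the time of the last transfer is held in an atomic.Value, and another transfer is allowed only once the configured minimum interval has elapsed. The names here (leaseTransferLimiter, canTransfer, recordTransfer) are hypothetical, and unlike the real code, which updates rq.lastLeaseTransfer elsewhere, the sketch records the time itself.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// leaseTransferLimiter mirrors the relevant replicateQueue state: a minimum
// interval (the cluster setting in the real code) and the time of the last
// transfer, held in an atomic.Value so concurrent readers need no lock.
type leaseTransferLimiter struct {
	minInterval  time.Duration
	lastTransfer atomic.Value // holds a time.Time
}

// canTransfer follows canTransferLease: a transfer is allowed if none has
// been recorded yet, or if more than minInterval has passed since the last.
func (l *leaseTransferLimiter) canTransfer() bool {
	if last := l.lastTransfer.Load(); last != nil {
		return time.Since(last.(time.Time)) > l.minInterval
	}
	return true
}

// recordTransfer stamps the time of a completed transfer. (In the real code
// this happens separately, after the lease transfer actually succeeds.)
func (l *leaseTransferLimiter) recordTransfer() {
	l.lastTransfer.Store(time.Now())
}

func main() {
	l := &leaseTransferLimiter{minInterval: time.Second}
	fmt.Println(l.canTransfer()) // true: no transfer recorded yet
	l.recordTransfer()
	fmt.Println(l.canTransfer()) // false: still within the minimum interval
	time.Sleep(1100 * time.Millisecond)
	fmt.Println(l.canTransfer()) // true: the interval has elapsed
}

Keeping a time.Time in an atomic.Value, as the real code does, keeps the check cheap and lock-free on the replicate queue's processing path; the trade-off is the interface type assertion on Load.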