From 62eaa1c192d4c09c90caa5e227177eb9205a6267 Mon Sep 17 00:00:00 2001 From: Nick Cabatoff Date: Mon, 4 Dec 2023 09:38:08 -0500 Subject: [PATCH] Fix rare leadership transfer failures when writes happen during transfer (#581) After initiating a leadership transfer, for up to ElectionTimeout duration, the old leader will no longer permit writes to occur. Allowing writes to occur in this scenario can result in the transfer failing, due to some node other than the target having the highest index. Moreover, the writes might be rolled back anyway by the new leader if insufficiently widely committed. --- raft.go | 15 ++++++++++++- raft_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++++++++--- testing.go | 7 +++--- 3 files changed, 75 insertions(+), 7 deletions(-) diff --git a/raft.go b/raft.go index 439fe6e2..28c11283 100644 --- a/raft.go +++ b/raft.go @@ -692,8 +692,21 @@ func (r *Raft) leaderLoop() { case err := <-doneCh: if err != nil { r.logger.Debug(err.Error()) + future.respond(err) + } else { + // Wait for up to ElectionTimeout before flagging the + // leadership transfer as done and unblocking applies in + // the leaderLoop. 
+ select { + case <-time.After(r.config().ElectionTimeout): + err := fmt.Errorf("leadership transfer timeout") + r.logger.Debug(err.Error()) + future.respond(err) + case <-leftLeaderLoop: + r.logger.Debug("lost leadership during transfer (expected)") + future.respond(nil) + } } - future.respond(err) } }() diff --git a/raft_test.go b/raft_test.go index 8e1a00e3..3eaf1e3c 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2337,17 +2337,71 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { } } +func TestRaft_LeadershipTransferWithWrites(t *testing.T) { + conf := inmemConfig(t) + conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) + c := MakeCluster(7, t, conf) + defer c.Close() + + doneCh := make(chan struct{}) + var writerErr error + var wg sync.WaitGroup + var writes int + wg.Add(1) + leader := c.Leader() + go func() { + defer wg.Done() + for { + select { + case <-doneCh: + return + default: + future := leader.Apply([]byte("test"), 0) + switch err := future.Error(); { + case errors.Is(err, ErrRaftShutdown): + return + case errors.Is(err, ErrNotLeader): + leader = c.Leader() + case errors.Is(err, ErrLeadershipTransferInProgress): + continue + case errors.Is(err, ErrLeadershipLost): + continue + case err == nil: + writes++ + default: + writerErr = err + } + time.Sleep(time.Millisecond) + } + } + }() + + follower := c.Followers()[0] + future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) + if future.Error() != nil { + t.Fatalf("Didn't expect error: %v", future.Error()) + } + if follower.localID != c.Leader().localID { + t.Error("Leadership should have been transitioned to specified server.") + } + close(doneCh) + wg.Wait() + if writerErr != nil { + t.Fatal(writerErr) + } + t.Logf("writes: %d", writes) +} + func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { c := MakeCluster(7, t, nil) defer c.Close() - oldLeader := c.Leader().localID follower := c.GetInState(Follower)[0] future := 
c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) if future.Error() != nil { t.Fatalf("Didn't expect error: %v", future.Error()) } - if oldLeader == c.Leader().localID { + if follower.localID != c.Leader().localID { t.Error("Leadership should have been transitioned to specified server.") } } @@ -2510,7 +2564,7 @@ func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) { } func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { - r := Raft{leaderState: leaderState{}} + r := Raft{leaderState: leaderState{}, logger: hclog.New(nil)} r.setupLeaderState() stopCh := make(chan struct{}) diff --git a/testing.go b/testing.go index 91cf6e76..e0885714 100644 --- a/testing.go +++ b/testing.go @@ -433,7 +433,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { // restart the timer. pollStartTime := time.Now() for { - inState, highestTerm := c.pollState(s) + _, highestTerm := c.pollState(s) inStateTime := time.Now() // Sometimes this routine is called very early on before the @@ -479,8 +479,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft { c.t.Fatalf("timer channel errored") } - c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", - s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) + inState, highestTerm := c.pollState(s) + c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), highestTerm is %d, %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", + s, inStateTime, len(inState), highestTerm, inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) return inState } }