diff --git a/raft.go b/raft.go index 439fe6e2..28c11283 100644 --- a/raft.go +++ b/raft.go @@ -692,8 +692,21 @@ func (r *Raft) leaderLoop() { case err := <-doneCh: if err != nil { r.logger.Debug(err.Error()) + future.respond(err) + } else { + // Wait for up to ElectionTimeout before flagging the + // leadership transfer as done and unblocking applies in + // the leaderLoop. + select { + case <-time.After(r.config().ElectionTimeout): + err := fmt.Errorf("leadership transfer timeout") + r.logger.Debug(err.Error()) + future.respond(err) + case <-leftLeaderLoop: + r.logger.Debug("lost leadership during transfer (expected)") + future.respond(nil) + } } - future.respond(err) } }() diff --git a/raft_test.go b/raft_test.go index 8e1a00e3..3eaf1e3c 100644 --- a/raft_test.go +++ b/raft_test.go @@ -2337,17 +2337,71 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) { } } +func TestRaft_LeadershipTransferWithWrites(t *testing.T) { + conf := inmemConfig(t) + conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace}) + c := MakeCluster(7, t, conf) + defer c.Close() + + doneCh := make(chan struct{}) + var writerErr error + var wg sync.WaitGroup + var writes int + wg.Add(1) + leader := c.Leader() + go func() { + defer wg.Done() + for { + select { + case <-doneCh: + return + default: + future := leader.Apply([]byte("test"), 0) + switch err := future.Error(); { + case errors.Is(err, ErrRaftShutdown): + return + case errors.Is(err, ErrNotLeader): + leader = c.Leader() + case errors.Is(err, ErrLeadershipTransferInProgress): + continue + case errors.Is(err, ErrLeadershipLost): + continue + case err == nil: + writes++ + default: + writerErr = err + } + time.Sleep(time.Millisecond) + } + } + }() + + follower := c.Followers()[0] + future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) + if future.Error() != nil { + t.Fatalf("Didn't expect error: %v", future.Error()) + } + if follower.localID != c.Leader().localID { + t.Error("Leadership should have been transitioned to specified server.") + } + close(doneCh) + wg.Wait() + if writerErr != nil { + t.Fatal(writerErr) + } + t.Logf("writes: %d", writes) +} + func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) { c := MakeCluster(7, t, nil) defer c.Close() - oldLeader := c.Leader().localID follower := c.GetInState(Follower)[0] future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr) if future.Error() != nil { t.Fatalf("Didn't expect error: %v", future.Error()) } - if oldLeader == c.Leader().localID { + if follower.localID != c.Leader().localID { t.Error("Leadership should have been transitioned to specified server.") } } @@ -2510,7 +2564,7 @@ func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) { } func TestRaft_LeadershipTransferStopRightAway(t *testing.T) { - r := Raft{leaderState: leaderState{}} + r := Raft{leaderState: leaderState{}, logger: hclog.New(nil)} r.setupLeaderState() stopCh := make(chan struct{}) diff --git a/testing.go b/testing.go index 91cf6e76..e0885714 100644 --- a/testing.go +++ b/testing.go @@ -433,7 +433,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft { // restart the timer. pollStartTime := time.Now() for { - inState, highestTerm := c.pollState(s) + _, highestTerm := c.pollState(s) inStateTime := time.Now() // Sometimes this routine is called very early on before the @@ -479,8 +479,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft { c.t.Fatalf("timer channel errored") } - c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", - s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) + inState, highestTerm := c.pollState(s) + c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), highestTerm is %d, %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", + s, inStateTime, len(inState), highestTerm, inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime))) return inState } }