Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix rare leadership transfer failures when writes happen during transfer #581

Merged
merged 7 commits into from
Dec 4, 2023
15 changes: 14 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,8 +692,21 @@ func (r *Raft) leaderLoop() {
case err := <-doneCh:
if err != nil {
r.logger.Debug(err.Error())
future.respond(err)
} else {
// Wait for up to ElectionTimeout before flagging the
// leadership transfer as done and unblocking applies in
// the leaderLoop.
select {
case <-time.After(r.config().ElectionTimeout):
err := fmt.Errorf("leadership transfer timeout")
r.logger.Debug(err.Error())
future.respond(err)
case <-leftLeaderLoop:
r.logger.Debug("lost leadership during transfer (expected)")
Comment on lines +697 to +706
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about just staying in the loop here instead of starting a new ElectionTimeout ticker and having to duplicate the leftLeaderLoop code path too?

I was originally expecting that we'd not wait for another ElectionTimeout but rather keep waiting for up to the original one we started for the transfer to complete. I don't think it's terrible to do it this way but it strikes me that it's less code and maybe simpler to reason able if the behaviour is just: block until leadership transfer works or one election timeout from handling the request...

The rationale for using ElectionTimeout here presumably was that, LeadershipTransfer is only an optimization to avoid just ungracefully stopping and having to wait for an election timeout... so letting leader transfer take longer than a whole election timeout is a little bit defeating the point and we should probably return fast and let the old leader just shut down if it wants to.

I think that rationale gets a bit more subtle in cases like Autopilot upgrade where we rely on Leadership Transfer working before we move on to the next phase rather than just shutting down anyway.

tl;dr, I don't think the behaviour here is bad or wrong especially and it beats the bug without this wait. If it turns out easier to follow to duplicate the code and make the behavior more explicit like you have here I'm OK with it. Just curious if you tried simply replacing this else branch with continue which I think is also a correct fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I tried that. It got messy due to the other cases wanting to read from doneCh. In the end I decided this was clearer.

future.respond(nil)
}
}
future.respond(err)
}
}()

Expand Down
60 changes: 57 additions & 3 deletions raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2337,17 +2337,71 @@ func TestRaft_LeadershipTransferWithOneNode(t *testing.T) {
}
}

func TestRaft_LeadershipTransferWithWrites(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have anecdata about roughly how frequently this reproduced a bug before your fix locally? I assume it's still non-deterministic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't give you numbers, but it didn't take many tries to provoke a failure. It can still fail if I up the concurrency enough, e.g. if I run 16 parallel -race instances. But I suspect the same is true for many other tests, as the failures are things like timeouts due to heartbeats not being processed quickly enough.

conf := inmemConfig(t)
conf.Logger = hclog.New(&hclog.LoggerOptions{Level: hclog.Trace})
c := MakeCluster(7, t, conf)
defer c.Close()

doneCh := make(chan struct{})
var writerErr error
var wg sync.WaitGroup
var writes int
wg.Add(1)
leader := c.Leader()
go func() {
defer wg.Done()
for {
select {
case <-doneCh:
return
default:
future := leader.Apply([]byte("test"), 0)
switch err := future.Error(); {
case errors.Is(err, ErrRaftShutdown):
return
case errors.Is(err, ErrNotLeader):
leader = c.Leader()
case errors.Is(err, ErrLeadershipTransferInProgress):
continue
case errors.Is(err, ErrLeadershipLost):
continue
case err == nil:
writes++
default:
writerErr = err
}
time.Sleep(time.Millisecond)
}
}
}()

follower := c.Followers()[0]
future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr)
if future.Error() != nil {
t.Fatalf("Didn't expect error: %v", future.Error())
}
if follower.localID != c.Leader().localID {
t.Error("Leadership should have been transitioned to specified server.")
}
close(doneCh)
wg.Wait()
if writerErr != nil {
t.Fatal(writerErr)
}
t.Logf("writes: %d", writes)
}

func TestRaft_LeadershipTransferWithSevenNodes(t *testing.T) {
c := MakeCluster(7, t, nil)
defer c.Close()

oldLeader := c.Leader().localID
follower := c.GetInState(Follower)[0]
future := c.Leader().LeadershipTransferToServer(follower.localID, follower.localAddr)
if future.Error() != nil {
t.Fatalf("Didn't expect error: %v", future.Error())
}
if oldLeader == c.Leader().localID {
if follower.localID != c.Leader().localID {
t.Error("Leadership should have been transitioned to specified server.")
}
}
Expand Down Expand Up @@ -2510,7 +2564,7 @@ func TestRaft_LeadershipTransferIgnoresNonvoters(t *testing.T) {
}

func TestRaft_LeadershipTransferStopRightAway(t *testing.T) {
r := Raft{leaderState: leaderState{}}
r := Raft{leaderState: leaderState{}, logger: hclog.New(nil)}
r.setupLeaderState()

stopCh := make(chan struct{})
Expand Down
7 changes: 4 additions & 3 deletions testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
// restart the timer.
pollStartTime := time.Now()
for {
inState, highestTerm := c.pollState(s)
_, highestTerm := c.pollState(s)
inStateTime := time.Now()

// Sometimes this routine is called very early on before the
Expand Down Expand Up @@ -479,8 +479,9 @@ func (c *cluster) GetInState(s RaftState) []*Raft {
c.t.Fatalf("timer channel errored")
}

c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
inState, highestTerm := c.pollState(s)
c.logger.Info(fmt.Sprintf("stable state for %s reached at %s (%d nodes), highestTerm is %d, %s from start of poll, %s from cluster start. Timeout at %s, %s after stability",
s, inStateTime, len(inState), highestTerm, inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)))
return inState
}
}
Expand Down