Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Special case the error returned when we have a Raft leader but are not tracking it in the ServerLookup #9487

Merged
merged 1 commit into from Jan 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
29 changes: 21 additions & 8 deletions agent/consul/rpc.go
Expand Up @@ -593,15 +593,14 @@ CHECK_LEADER:
}

// Find the leader
isLeader, leader := s.getLeader()
isLeader, leader, rpcErr := s.getLeader()

// Handle the case we are the leader
if isLeader {
return false, nil
}

// Handle the case of a known leader
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
method, args, reply)
Expand Down Expand Up @@ -632,24 +631,38 @@ RETRY:

// getLeader returns if the current node is the leader, and if not then it
// returns the leader which is potentially nil if the cluster has not yet
// elected a leader.
func (s *Server) getLeader() (bool, *metadata.Server) {
// elected a leader. In the case of not having a leader elected yet
// then a NoClusterLeader error gets returned. In the case of Raft having
// a leader but our internal tracking failing to find the leader we
// return a LeaderNotTracked error. Therefore if the err is nil AND
// the bool is false then the Server will be non-nil.
func (s *Server) getLeader() (bool, *metadata.Server, error) {
// Check if we are the leader
if s.IsLeader() {
return true, nil
return true, nil, nil
}

// Get the leader
leader := s.raft.Leader()
if leader == "" {
return false, nil
return false, nil, structs.ErrNoLeader
}

// Lookup the server
server := s.serverLookup.Server(leader)

// Server could be nil
return false, server
// if server is nil this indicates that while we have a Raft leader
// something has caused that node to be considered unhealthy which
// cascades into its removal from the serverLookup struct. In this case
// we should not report no cluster leader but instead report a different
// error so as not to confuse our users as to what the root cause of
// an issue might be.
if server == nil {
s.logger.Warn("Raft has a leader but other tracking of the node would indicate that the node is unhealthy or does not exist. The network may be misconfigured.", "leader", leader)
return false, nil, structs.ErrLeaderNotTracked
}

return false, server, nil
}

// forwardDC is used to forward an RPC call to a remote DC, or fail if no servers
Expand Down
51 changes: 50 additions & 1 deletion agent/consul/rpc_test.go
Expand Up @@ -3,6 +3,7 @@ package consul
import (
"bytes"
"encoding/binary"
"errors"
"math"
"net"
"os"
Expand Down Expand Up @@ -130,7 +131,7 @@ func TestRPC_NoLeader_Retry(t *testing.T) {

// This isn't sure-fire but tries to check that we don't have a
// leader going into the RPC, so we exercise the retry logic.
if ok, _ := s1.getLeader(); ok {
if ok, _, _ := s1.getLeader(); ok {
t.Fatalf("should not have a leader yet")
}

Expand All @@ -142,6 +143,54 @@ func TestRPC_NoLeader_Retry(t *testing.T) {
}
}

func TestRPC_getLeader_ErrLeaderNotTracked(t *testing.T) {
mkeeler marked this conversation as resolved.
Show resolved Hide resolved
if testing.Short() {
t.Skip("too slow for testing.Short")
}

cluster := newTestCluster(t, &testClusterConfig{
Datacenter: "dc1",
Servers: 3,
ServerWait: func(t *testing.T, srv *Server) {
// The test cluster waits for a leader to be established
// but not for the RPC tracking of all servers to be updated,
// so we also want to wait for that here
retry.Run(t, func(r *retry.R) {
if !srv.IsLeader() {
_, _, err := srv.getLeader()
require.NoError(r, err)
}
})

},
})

// At this point we know we have a cluster with a leader and all followers are tracking that
// leader in the serverLookup struct. We need to find a follower to hack its server lookup
// to force the error we desire

var follower *Server
for _, srv := range cluster.Servers {
if !srv.IsLeader() {
follower = srv
break
}
}

_, leaderMeta, err := follower.getLeader()
require.NoError(t, err)

// now do some behind-the-scenes trickery on the follower's server lookup
// to remove the leader from it so that we can force an ErrLeaderNotTracked error
follower.serverLookup.RemoveServer(leaderMeta)

isLeader, meta, err := follower.getLeader()
require.Error(t, err)
require.True(t, errors.Is(err, structs.ErrLeaderNotTracked))
require.Nil(t, meta)
require.False(t, isLeader)
}

type MockSink struct {
*bytes.Buffer
cancel bool
Expand Down
6 changes: 3 additions & 3 deletions agent/consul/snapshot_endpoint.go
Expand Up @@ -48,9 +48,9 @@ func (s *Server) dispatchSnapshotRequest(args *structs.SnapshotRequest, in io.Re

// Perform leader forwarding if required.
if !args.AllowStale {
if isLeader, server := s.getLeader(); !isLeader {
if server == nil {
return nil, structs.ErrNoLeader
if isLeader, server, err := s.getLeader(); !isLeader {
if err != nil {
return nil, err
}
return SnapshotRPC(s.connPool, args.Datacenter, server.ShortName, server.Addr, args, in, reply)
}
Expand Down
2 changes: 2 additions & 0 deletions agent/structs/errors.go
Expand Up @@ -15,6 +15,7 @@ const (
errRPCRateExceeded = "RPC rate limit exceeded"
errServiceNotFound = "Service not found: "
errQueryNotFound = "Query not found"
errLeaderNotTracked = "Raft leader not found in server lookup mapping"
)

var (
Expand All @@ -26,6 +27,7 @@ var (
ErrRPCRateExceeded = errors.New(errRPCRateExceeded)
ErrDCNotAvailable = errors.New(errDCNotAvailable)
ErrQueryNotFound = errors.New(errQueryNotFound)
ErrLeaderNotTracked = errors.New(errLeaderNotTracked)
)

func IsErrNoDCPath(err error) bool {
Expand Down