diff --git a/api.go b/api.go
index 01f573ce..cff2eaac 100644
--- a/api.go
+++ b/api.go
@@ -213,6 +213,10 @@ type Raft struct {
 	// mainThreadSaturation measures the saturation of the main raft goroutine.
 	mainThreadSaturation *saturationMetric
+
+	// preVoteDisabled controls whether the pre-vote feature is used;
+	// pre-vote is disabled when set to true.
+	preVoteDisabled bool
 }
 
 // BootstrapCluster initializes a server's storage with the given cluster
@@ -531,6 +535,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		applyCh = make(chan *logFuture, conf.MaxAppendEntries)
 	}
 
+	_, transportSupportPreVote := trans.(WithPreVote)
 	// Create Raft struct.
 	r := &Raft{
 		protocolVersion: protocolVersion,
@@ -560,6 +565,10 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		leaderNotifyCh:       make(chan struct{}, 1),
 		followerNotifyCh:     make(chan struct{}, 1),
 		mainThreadSaturation: newSaturationMetric([]string{"raft", "thread", "main", "saturation"}, 1*time.Second),
+		preVoteDisabled:      conf.PreVoteDisabled || !transportSupportPreVote,
+	}
+	if !transportSupportPreVote && !conf.PreVoteDisabled {
+		r.logger.Warn("pre-vote is disabled because it is not supported by the Transport")
 	}
 
 	r.conf.Store(*conf)
diff --git a/commands.go b/commands.go
index 1a74e052..1ec76cb2 100644
--- a/commands.go
+++ b/commands.go
@@ -120,6 +120,40 @@ func (r *RequestVoteResponse) GetRPCHeader() RPCHeader {
 	return r.RPCHeader
 }
 
+// RequestPreVoteRequest is the command used by a candidate to ask a Raft peer
+// for a pre-vote before starting an election.
+type RequestPreVoteRequest struct {
+	RPCHeader
+
+	// Provide the proposed next term
+	Term uint64
+
+	// Used to ensure safety
+	LastLogIndex uint64
+	LastLogTerm  uint64
+}
+
+// GetRPCHeader - See WithRPCHeader.
+func (r *RequestPreVoteRequest) GetRPCHeader() RPCHeader {
+	return r.RPCHeader
+}
+
+// RequestPreVoteResponse is the response returned from a RequestPreVoteRequest.
+type RequestPreVoteResponse struct {
+	RPCHeader
+
+	// Newer term if leader is out of date.
+	Term uint64
+
+	// Is the vote granted.
+	Granted bool
+}
+
+// GetRPCHeader - See WithRPCHeader.
+func (r *RequestPreVoteResponse) GetRPCHeader() RPCHeader {
+	return r.RPCHeader
+}
+
 // InstallSnapshotRequest is the command sent to a Raft peer to bootstrap its
 // log (and state machine) from a snapshot on another peer.
 type InstallSnapshotRequest struct {
diff --git a/config.go b/config.go
index b97b4338..d14392fc 100644
--- a/config.go
+++ b/config.go
@@ -232,6 +232,9 @@ type Config struct {
 	// raft's configuration and index values.
 	NoSnapshotRestoreOnStart bool
 
+	// PreVoteDisabled deactivates the pre-vote feature when set to true.
+	PreVoteDisabled bool
+
 	// skipStartup allows NewRaft() to bypass all background work goroutines
 	skipStartup bool
 }
diff --git a/fuzzy/go.mod b/fuzzy/go.mod
index c1c51543..196abb44 100644
--- a/fuzzy/go.mod
+++ b/fuzzy/go.mod
@@ -3,7 +3,7 @@ module github.com/hashicorp/raft/fuzzy
 go 1.20
 
 require (
-	github.com/hashicorp/go-hclog v1.5.0
+	github.com/hashicorp/go-hclog v1.6.2
 	github.com/hashicorp/go-msgpack/v2 v2.1.1
 	github.com/hashicorp/raft v1.2.0
 	github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
diff --git a/fuzzy/go.sum b/fuzzy/go.sum
index 1f99677f..de7a9127 100644
--- a/fuzzy/go.sum
+++ b/fuzzy/go.sum
@@ -33,6 +33,7 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
 github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80=
 github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c=
 github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
+github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M=
 github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0=
 github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60=
 github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
@@ -91,7 +92,10 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
 github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
-github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
+github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
+github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
diff --git a/fuzzy/transport.go b/fuzzy/transport.go
index 5fdb4c7b..d6030d9b 100644
--- a/fuzzy/transport.go
+++ b/fuzzy/transport.go
@@ -221,6 +221,11 @@ func (t *transport) RequestVote(id raft.ServerID, target raft.ServerAddress, arg
 	return t.sendRPC(string(target), args, resp)
 }
 
+// RequestPreVote sends the appropriate RPC to the target node.
+func (t *transport) RequestPreVote(id raft.ServerID, target raft.ServerAddress, args *raft.RequestPreVoteRequest, resp *raft.RequestPreVoteResponse) error {
+	return t.sendRPC(string(target), args, resp)
+}
+
 // InstallSnapshot is used to push a snapshot down to a follower. The data is read from
 // the ReadCloser and streamed to the client.
func (t *transport) InstallSnapshot(id raft.ServerID, target raft.ServerAddress, args *raft.InstallSnapshotRequest, resp *raft.InstallSnapshotResponse, data io.Reader) error { diff --git a/inmem_transport.go b/inmem_transport.go index 5d9365b7..561ba73d 100644 --- a/inmem_transport.go +++ b/inmem_transport.go @@ -125,6 +125,18 @@ func (i *InmemTransport) RequestVote(id ServerID, target ServerAddress, args *Re return nil } +func (i *InmemTransport) RequestPreVote(id ServerID, target ServerAddress, args *RequestPreVoteRequest, resp *RequestPreVoteResponse) error { + rpcResp, err := i.makeRPC(target, args, nil, i.timeout) + if err != nil { + return err + } + + // Copy the result back + out := rpcResp.Response.(*RequestPreVoteResponse) + *resp = *out + return nil +} + // InstallSnapshot implements the Transport interface. func (i *InmemTransport) InstallSnapshot(id ServerID, target ServerAddress, args *InstallSnapshotRequest, resp *InstallSnapshotResponse, data io.Reader) error { rpcResp, err := i.makeRPC(target, args, data, 10*i.timeout) diff --git a/net_transport.go b/net_transport.go index bc34285e..5d91fb97 100644 --- a/net_transport.go +++ b/net_transport.go @@ -24,6 +24,7 @@ const ( rpcRequestVote rpcInstallSnapshot rpcTimeoutNow + rpcRequestPreVote // DefaultTimeoutScale is the default TimeoutScale in a NetworkTransport. DefaultTimeoutScale = 256 * 1024 // 256KB @@ -470,6 +471,11 @@ func (n *NetworkTransport) RequestVote(id ServerID, target ServerAddress, args * return n.genericRPC(id, target, rpcRequestVote, args, resp) } +// RequestPreVote implements the Transport interface. +func (n *NetworkTransport) RequestPreVote(id ServerID, target ServerAddress, args *RequestPreVoteRequest, resp *RequestPreVoteResponse) error { + return n.genericRPC(id, target, rpcRequestPreVote, args, resp) +} + // genericRPC handles a simple request/response RPC. 
func (n *NetworkTransport) genericRPC(id ServerID, target ServerAddress, rpcType uint8, args interface{}, resp interface{}) error { // Get a conn @@ -682,6 +688,13 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *codec.Decoder, en } rpc.Command = &req labels = []metrics.Label{{Name: "rpcType", Value: "RequestVote"}} + case rpcRequestPreVote: + var req RequestPreVoteRequest + if err := dec.Decode(&req); err != nil { + return err + } + rpc.Command = &req + labels = []metrics.Label{{Name: "rpcType", Value: "RequestPreVote"}} case rpcInstallSnapshot: var req InstallSnapshotRequest if err := dec.Decode(&req); err != nil { diff --git a/raft-compat/go.mod b/raft-compat/go.mod index 23ff53c7..5d86c2a9 100644 --- a/raft-compat/go.mod +++ b/raft-compat/go.mod @@ -7,13 +7,14 @@ require github.com/stretchr/testify v1.8.4 require ( github.com/armon/go-metrics v0.4.1 // indirect github.com/fatih/color v1.13.0 // indirect - github.com/hashicorp/go-hclog v1.5.0 // indirect + github.com/hashicorp/go-hclog v1.6.2 // indirect github.com/hashicorp/go-immutable-radix v1.0.0 // indirect github.com/hashicorp/go-msgpack v0.5.5 // indirect + github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect github.com/hashicorp/golang-lru v0.5.0 // indirect github.com/mattn/go-colorable v0.1.12 // indirect github.com/mattn/go-isatty v0.0.14 // indirect - golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 // indirect + golang.org/x/sys v0.13.0 // indirect ) replace github.com/hashicorp/raft-previous-version => ./raft-previous-version @@ -22,7 +23,7 @@ replace github.com/hashicorp/raft => ../ require ( github.com/davecgh/go-spew v1.1.1 // indirect - github.com/hashicorp/raft v1.2.0 + github.com/hashicorp/raft v1.6.1 github.com/hashicorp/raft-previous-version v1.2.0 github.com/pmezard/go-difflib v1.0.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/raft-compat/go.sum b/raft-compat/go.sum index ed3b7f04..9608c05c 100644 --- a/raft-compat/go.sum +++ b/raft-compat/go.sum @@ -31,10 +31,14 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/ github.com/hashicorp/go-cleanhttp v0.5.0/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtngrth3wmdIIUrZ80= github.com/hashicorp/go-hclog v1.5.0 h1:bI2ocEMgcVlz55Oj1xZNBsVi900c7II+fWDyV9o+13c= github.com/hashicorp/go-hclog v1.5.0/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= +github.com/hashicorp/go-hclog v1.6.2 h1:NOtoftovWkDheyUM/8JW3QMiXyxJK3uHRK7wV04nD2I= +github.com/hashicorp/go-hclog v1.6.2/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0 h1:AKDB1HM5PWEA7i4nhcpwOrO2byshxBjXVn/J/3+z5/0= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI= github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= +github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I= +github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= github.com/hashicorp/go-uuid v1.0.0 h1:RS8zrF7PhGwyNPOtxSClXXj9HA8feRnJzgnI1RJCSnM= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -113,6 +117,8 @@ golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod 
h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6 h1:nonptSpoQ4vQjyraW20DXPAglgQfVnM9ZC6MmNLMR60= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= diff --git a/raft-compat/prevote_test.go b/raft-compat/prevote_test.go new file mode 100644 index 00000000..b21cc02d --- /dev/null +++ b/raft-compat/prevote_test.go @@ -0,0 +1,290 @@ +package raft_compat + +import ( + "github.com/hashicorp/raft" + raftprevious "github.com/hashicorp/raft-previous-version" + "github.com/hashicorp/raft/compat/testcluster" + "github.com/hashicorp/raft/compat/utils" + "github.com/stretchr/testify/require" + "testing" + "time" +) + +func TestRaft_PreVote_BootStrap_PreVote(t *testing.T) { + leaveTransfer := func(t *testing.T, cluster testcluster.RaftCluster, id string) { + if cluster.GetLeader().GetLocalID() == id { + transfer := cluster.Raft(id).(*raftprevious.Raft).LeadershipTransfer() + utils.WaitFuture(t, transfer) + } + f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() + utils.WaitFuture(t, f) + } + leaveNoTransfer := func(t *testing.T, cluster testcluster.RaftCluster, id string) { + fr := cluster.GetLeader().GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(id), 0, 0) + utils.WaitFuture(t, fr) + f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() + utils.WaitFuture(t, f) + } + tcs := []struct { + name string + numNodes int + preVote bool + Leave func(t *testing.T, cluster testcluster.RaftCluster, id string) + }{ + {"no prevote -> prevote (leave transfer)", 3, true, leaveTransfer}, + {"no prevote -> prevote (leave no transfer)", 3, true, leaveNoTransfer}, + {"no prevote -> prevote (leave transfer) 5", 5, true, leaveTransfer}, + {"no prevote -> prevote (leave no transfer) 5", 5, true, leaveNoTransfer}, + {"no prevote -> no prevote (leave transfer)", 3, false, leaveTransfer}, + {"no prevote -> no prevote (leave no transfer)", 3, false, leaveNoTransfer}, + {"no prevote -> no prevote (leave transfer) 5", 5, false, leaveTransfer}, + {"no prevote -> no prevote (leave no transfer) 5", 5, false, leaveNoTransfer}, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + + cluster := testcluster.NewPreviousRaftCluster(t, tc.numNodes, "raftNode") + configuration := raftprevious.Configuration{} + + for i := 0; i < tc.numNodes; i++ { + var err error + require.NoError(t, err) + configuration.Servers = append(configuration.Servers, raftprevious.Server{ + ID: raftprevious.ServerID(cluster.ID(i)), + Address: raftprevious.ServerAddress(cluster.Addr(i)), + }) + } + raft0 := cluster.Raft(cluster.ID(0)).(*raftprevious.Raft) + boot := raft0.BootstrapCluster(configuration) + if err := boot.Error(); err != nil { + t.Fatalf("bootstrap err: %v", err) + } + utils.WaitForNewLeader(t, "", cluster) + getLeader := cluster.GetLeader() + require.NotEmpty(t, getLeader) + a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.NotEmpty(t, a) + future := getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test"), time.Second) + utils.WaitFuture(t, future) + 
+ leader, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.NotEmpty(t, leader) + // Upgrade all the followers + for i := 0; i < tc.numNodes; i++ { + if getLeader.GetLocalID() == cluster.ID(i) { + continue + } + + // Check Leader haven't changed + a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.Equal(t, a, leader) + tc.Leave(t, cluster, cluster.ID(i)) + + // Keep the store, to be passed to the upgraded node. + store := cluster.Store(cluster.ID(i)) + id := cluster.ID(i) + + //Delete the node from the cluster + cluster.DeleteNode(cluster.ID(i)) + + //Create an upgraded node with the store + rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore), func(config *raft.Config) { + config.PreVoteDisabled = !tc.preVote + }) + future := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.GetLocalID()), raftprevious.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, future) + //Add the new node to the cluster + cluster.AddNode(rUIT) + + // Wait enough to have the configuration propagated. + time.Sleep(time.Second) + + //Apply some logs + future = getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test2"), time.Second) + require.NoError(t, future.Error()) + + // Check Leader haven't changed as we haven't replaced the leader yet + a, _ = getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.Equal(t, a, leader) + } + // keep a reference to the store + store := cluster.Store(getLeader.GetLocalID()) + id := getLeader.GetLocalID() + + //Remove and shutdown the leader node + tc.Leave(t, cluster, getLeader.GetLocalID()) + + // Delete the old leader node from the cluster + cluster.DeleteNode(getLeader.GetLocalID()) + oldLeaderID := getLeader.GetLocalID() + + // Wait for a new leader to be elected + utils.WaitForNewLeader(t, oldLeaderID, cluster) + getLeader = cluster.GetLeader() + require.NotEmpty(t, getLeader) + + // Create a new node to replace the deleted one + rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore), func(config *raft.Config) { config.PreVoteDisabled = false }) + fa := getLeader.GetRaft().(*raft.Raft).AddVoter(raft.ServerID(rUIT.GetLocalID()), raft.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, fa) + + // Wait for new leader, (this happens because of not having prevote) + utils.WaitForNewLeader(t, "", cluster) + newLeaderID := rUIT.GetLeaderID() + require.NotEmpty(t, newLeaderID) + + require.NotEqual(t, newLeaderID, leader) + + newLeader := cluster.GetLeader() + //Apply some logs + future = newLeader.GetRaft().(*raft.Raft).Apply([]byte("test2"), time.Second) + require.NoError(t, future.Error()) + + // Check Leader haven't changed as we haven't replaced the leader yet + newAddr, _ := newLeader.GetRaft().(*raft.Raft).LeaderWithID() + require.Equal(t, string(newAddr), newLeader.GetLocalAddr()) + + require.Equal(t, tc.numNodes, rUIT.NumLogs()) + }) + } + +} + +func TestRaft_PreVote_Rollback(t *testing.T) { + leaveTransfer := func(t *testing.T, cluster testcluster.RaftCluster, id string) { + if cluster.GetLeader().GetLocalID() == id { + transfer := cluster.Raft(id).(*raft.Raft).LeadershipTransfer() + utils.WaitFuture(t, transfer) + } + f := cluster.Raft(id).(*raft.Raft).Shutdown() + utils.WaitFuture(t, f) + } + leaveNoTransfer := func(t *testing.T, cluster testcluster.RaftCluster, id string) { + fr := cluster.GetLeader().GetRaft().(*raft.Raft).RemoveServer(raft.ServerID(id), 0, 0) + utils.WaitFuture(t, fr) + f := 
cluster.Raft(id).(*raft.Raft).Shutdown() + utils.WaitFuture(t, f) + } + tcs := []struct { + name string + numNodes int + preVote bool + Leave func(t *testing.T, cluster testcluster.RaftCluster, id string) + }{ + {"no prevote -> prevote (leave transfer)", 3, true, leaveTransfer}, + {"no prevote -> prevote (leave no transfer)", 3, true, leaveNoTransfer}, + {"no prevote -> prevote (leave transfer) 5", 5, true, leaveTransfer}, + {"no prevote -> prevote (leave no transfer) 5", 5, true, leaveNoTransfer}, + {"no prevote -> no prevote (leave transfer)", 3, false, leaveTransfer}, + {"no prevote -> no prevote (leave no transfer)", 3, false, leaveNoTransfer}, + {"no prevote -> no prevote (leave transfer) 5", 5, false, leaveTransfer}, + {"no prevote -> no prevote (leave no transfer) 5", 5, false, leaveNoTransfer}, + } + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + + cluster := testcluster.NewUITRaftCluster(t, tc.numNodes, "raftIUTNode") + configuration := raft.Configuration{} + + for i := 0; i < tc.numNodes; i++ { + var err error + require.NoError(t, err) + configuration.Servers = append(configuration.Servers, raft.Server{ + ID: raft.ServerID(cluster.ID(i)), + Address: raft.ServerAddress(cluster.Addr(i)), + }) + } + raft0 := cluster.Raft(cluster.ID(0)).(*raft.Raft) + boot := raft0.BootstrapCluster(configuration) + if err := boot.Error(); err != nil { + t.Fatalf("bootstrap err: %v", err) + } + utils.WaitForNewLeader(t, "", cluster) + getLeader := cluster.GetLeader() + require.NotEmpty(t, getLeader) + a, _ := getLeader.GetRaft().(*raft.Raft).LeaderWithID() + require.NotEmpty(t, a) + future := getLeader.GetRaft().(*raft.Raft).Apply([]byte("test"), time.Second) + utils.WaitFuture(t, future) + + leader, _ := getLeader.GetRaft().(*raft.Raft).LeaderWithID() + require.NotEmpty(t, leader) + // Upgrade all the followers + for i := 0; i < tc.numNodes; i++ { + if getLeader.GetLocalID() == cluster.ID(i) { + continue + } + + // Check Leader haven't changed + a, _ := getLeader.GetRaft().(*raft.Raft).LeaderWithID() + require.Equal(t, a, leader) + tc.Leave(t, cluster, cluster.ID(i)) + + // Keep the store, to be passed to the upgraded node. + store := cluster.Store(cluster.ID(i)) + id := cluster.ID(i) + + //Delete the node from the cluster + cluster.DeleteNode(cluster.ID(i)) + + //Create an upgraded node with the store + rUIT := testcluster.InitPreviousWithStore(t, id, store.(*raft.InmemStore), func(config *raftprevious.Config) { + }) + future := getLeader.GetRaft().(*raft.Raft).AddVoter(raft.ServerID(rUIT.GetLocalID()), raft.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, future) + //Add the new node to the cluster + cluster.AddNode(rUIT) + + // Wait enough to have the configuration propagated. 
+ time.Sleep(time.Second) + + //Apply some logs + future = getLeader.GetRaft().(*raft.Raft).Apply([]byte("test2"), time.Second) + require.NoError(t, future.Error()) + + // Check Leader haven't changed as we haven't replaced the leader yet + a, _ = getLeader.GetRaft().(*raft.Raft).LeaderWithID() + require.Equal(t, a, leader) + } + // keep a reference to the store + store := cluster.Store(getLeader.GetLocalID()) + id := getLeader.GetLocalID() + + //Remove and shutdown the leader node + tc.Leave(t, cluster, getLeader.GetLocalID()) + + // Delete the old leader node from the cluster + cluster.DeleteNode(getLeader.GetLocalID()) + oldLeaderID := getLeader.GetLocalID() + + // Wait for a new leader to be elected + utils.WaitForNewLeader(t, oldLeaderID, cluster) + getLeader = cluster.GetLeader() + require.NotEmpty(t, getLeader) + + // Create a new node to replace the deleted one + rUIT := testcluster.InitPreviousWithStore(t, id, store.(*raft.InmemStore), func(config *raftprevious.Config) {}) + fa := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.GetLocalID()), raftprevious.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, fa) + + // Wait for new leader, (this happens because of not having prevote) + utils.WaitForNewLeader(t, "", cluster) + newLeaderID := rUIT.GetLeaderID() + require.NotEmpty(t, newLeaderID) + + require.NotEqual(t, newLeaderID, leader) + + newLeader := cluster.GetLeader() + //Apply some logs + future = newLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test2"), time.Second) + require.NoError(t, future.Error()) + + // Check Leader haven't changed as we haven't replaced the leader yet + newAddr, _ := newLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.Equal(t, string(newAddr), newLeader.GetLocalAddr()) + + require.Equal(t, tc.numNodes, rUIT.NumLogs()) + }) + } + +} diff --git a/raft-compat/raft-previous-version b/raft-compat/raft-previous-version index 8fdc4ce5..b96f998f 160000 --- a/raft-compat/raft-previous-version +++ b/raft-compat/raft-previous-version @@ -1 +1 @@ -Subproject commit 8fdc4ce5b75cd34904974859fb8e57a2fc4dc145 +Subproject commit b96f998ff7e752c7eb68615f086a9c52008a40b6 diff --git a/raft-compat/rolling_upgrade_test.go b/raft-compat/rolling_upgrade_test.go index a18eaa25..516e3da4 100644 --- a/raft-compat/rolling_upgrade_test.go +++ b/raft-compat/rolling_upgrade_test.go @@ -1,6 +1,7 @@ package raft_compat import ( + "fmt" "github.com/hashicorp/raft" raftprevious "github.com/hashicorp/raft-previous-version" "github.com/hashicorp/raft/compat/testcluster" @@ -14,73 +15,143 @@ import ( // wait for it to join the cluster and remove one of the old nodes, until all nodes // are cycled func TestRaft_RollingUpgrade(t *testing.T) { + tcs := []struct { + Name string + Leave func(t *testing.T, cluster testcluster.RaftCluster, id string) + }{ + { + Name: "leave before shutdown", + Leave: func(t *testing.T, cluster testcluster.RaftCluster, id string) { + fr := cluster.GetLeader().GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(id), 0, 0) + utils.WaitFuture(t, fr) + f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() + utils.WaitFuture(t, f) + }, + }, + { + Name: "leader transfer", + Leave: func(t *testing.T, cluster testcluster.RaftCluster, id string) { + if cluster.GetLeader().GetLocalID() == id { + transfer := cluster.Raft(id).(*raftprevious.Raft).LeadershipTransfer() + utils.WaitFuture(t, transfer) + utils.WaitForNewLeader(t, id, cluster) + } + switch cluster.GetLeader().GetRaft().(type) { + case 
*raftprevious.Raft: + fr := cluster.GetLeader().GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(id), 0, 0) + utils.WaitFuture(t, fr) + f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() + utils.WaitFuture(t, f) + case *raft.Raft: + fr := cluster.GetLeader().GetRaft().(*raft.Raft).RemoveServer(raft.ServerID(id), 0, 0) + utils.WaitFuture(t, fr) + f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() + utils.WaitFuture(t, f) + } + + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.Name, func(t *testing.T) { + initCount := 3 + cluster := testcluster.NewPreviousRaftCluster(t, initCount, "raftNode") + configuration := raftprevious.Configuration{} - initCount := 3 - rLatest := testcluster.NewRaftCluster(t, testcluster.InitLatest, initCount, "raftOld") - configuration := raftprevious.Configuration{} + for i := 0; i < initCount; i++ { + var err error + require.NoError(t, err) + configuration.Servers = append(configuration.Servers, raftprevious.Server{ + ID: raftprevious.ServerID(cluster.ID(i)), + Address: raftprevious.ServerAddress(cluster.Addr(i)), + }) + } + raft0 := cluster.Raft(cluster.ID(0)).(*raftprevious.Raft) + boot := raft0.BootstrapCluster(configuration) + if err := boot.Error(); err != nil { + t.Fatalf("bootstrap err: %v", err) + } + utils.WaitForNewLeader(t, "", cluster) + getLeader := cluster.GetLeader() + require.NotEmpty(t, getLeader) + a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.NotEmpty(t, a) + future := getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test"), time.Second) + utils.WaitFuture(t, future) + + leader, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.NotEmpty(t, leader) + // Upgrade all the followers + leaderIdx := 0 - for i := 0; i < initCount; i++ { - var err error - require.NoError(t, err) - configuration.Servers = append(configuration.Servers, raftprevious.Server{ - ID: raftprevious.ServerID(rLatest.ID(i)), - Address: raftprevious.ServerAddress(rLatest.Addr(i)), + followers := make([]string, 0) + for i := 0; i < initCount; i++ { + if getLeader.GetLocalID() == cluster.ID(i) { + leaderIdx = i + continue + } + followers = append(followers, cluster.ID(i)) + } + + for _, f := range followers { + require.NotEqual(t, f, getLeader.GetLocalID()) + // Check Leader haven't changed + a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.Equal(t, a, leader) + + //Create an upgraded node with the store + rUIT := testcluster.InitUIT(t, fmt.Sprintf("%s-new", f)) + future := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.GetLocalID()), raftprevious.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, future) + + //Add the new node to the cluster + + tc.Leave(t, cluster, f) + + //Delete the node from the cluster + cluster.AddNode(rUIT) + cluster.DeleteNode(f) + } + + // Wait enough to have the configuration propagated. 
+ time.Sleep(time.Second) + + //Apply some logs + future = getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test2"), time.Second) + require.NoError(t, future.Error()) + + // Check Leader haven't changed as we haven't replaced the leader yet + a, _ = getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() + require.Equal(t, a, leader) + + //Remove and shutdown the leader node + tc.Leave(t, cluster, getLeader.GetLocalID()) + + // Delete the old leader node from the cluster + + oldLeaderID := getLeader.GetLocalID() + + // Wait for a new leader to be elected + utils.WaitForNewLeader(t, oldLeaderID, cluster) + getLeader = cluster.GetLeader() + require.NotEmpty(t, getLeader) + + // Create a new node to replace the deleted one + rUIT := testcluster.InitUIT(t, fmt.Sprintf("raftNew-%d", leaderIdx)) + fa := getLeader.GetRaft().(*raft.Raft).AddVoter(raft.ServerID(rUIT.GetLocalID()), raft.ServerAddress(rUIT.GetLocalAddr()), 0, 0) + utils.WaitFuture(t, fa) + + // Wait for new leader, (this happens because of not having prevote) + utils.WaitForNewLeader(t, "", cluster) + newLeader := rUIT.GetLeaderID() + require.NotEmpty(t, newLeader) + require.NotEqual(t, newLeader, leader) + + cluster.DeleteNode(getLeader.GetLocalID()) + require.Equal(t, rUIT.NumLogs(), 2) }) } - raft0 := rLatest.Raft(rLatest.ID(0)).(*raftprevious.Raft) - boot := raft0.BootstrapCluster(configuration) - if err := boot.Error(); err != nil { - t.Fatalf("bootstrap err: %v", err) - } - utils.WaitForNewLeader(t, "", rLatest) - getLeader := rLatest.GetLeader() - require.NotEmpty(t, getLeader) - a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() - require.NotEmpty(t, a) - future := getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test"), time.Second) - utils.WaitFuture(t, future) - - rUIT := testcluster.NewRaftCluster(t, testcluster.InitUIT, initCount, "raftNew") - leader, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() - require.NotEmpty(t, leader) - - // Upgrade all the followers - leaderIdx := 0 - for i := 0; i < initCount; i++ { - if getLeader.GetLocalID() == rLatest.ID(i) { - leaderIdx = i - continue - } - - future := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.ID(i)), raftprevious.ServerAddress(rUIT.Addr(i)), 0, 0) - - utils.WaitFuture(t, future) - // Check Leader haven't changed as we are not replacing the leader - a, _ := getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() - require.Equal(t, a, leader) - getLeader.GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(rLatest.ID(i)), 0, 0) - rLatest.Raft(rLatest.ID(i)).(*raftprevious.Raft).Shutdown() - } - future = getLeader.GetRaft().(*raftprevious.Raft).Apply([]byte("test2"), time.Second) - require.NoError(t, future.Error()) - - fa := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.ID(leaderIdx)), raftprevious.ServerAddress(rUIT.Addr(leaderIdx)), 0, 0) - utils.WaitFuture(t, fa) - - // Check Leader haven't changed as we haven't replaced it yet - a, _ = getLeader.GetRaft().(*raftprevious.Raft).LeaderWithID() - require.Equal(t, a, leader) - fr := getLeader.GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(rLatest.ID(leaderIdx)), 0, 0) - utils.WaitFuture(t, fr) - rLatest.Raft(getLeader.GetLocalID()).(*raftprevious.Raft).Shutdown() - utils.WaitForNewLeader(t, getLeader.GetLocalID(), rUIT) - newLeader := rUIT.GetLeader() - require.NotEmpty(t, newLeader) - aNew, _ := newLeader.GetRaft().(*raft.Raft).LeaderWithID() - require.NotEqual(t, aNew, leader) - - 
require.Equal(t, newLeader.NumLogs(), 2) - } // TestRaft_ReplaceUpgrade This test perform a rolling upgrade by removing an old node, @@ -124,7 +195,7 @@ func TestRaft_ReplaceUpgrade(t *testing.T) { for _, tc := range tcs { t.Run(tc.Name, func(t *testing.T) { initCount := 3 - cluster := testcluster.NewRaftCluster(t, testcluster.InitLatest, initCount, "raftOld") + cluster := testcluster.NewPreviousRaftCluster(t, initCount, "raftNode") configuration := raftprevious.Configuration{} for i := 0; i < initCount; i++ { @@ -171,7 +242,7 @@ func TestRaft_ReplaceUpgrade(t *testing.T) { cluster.DeleteNode(cluster.ID(i)) //Create an upgraded node with the store - rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore)) + rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore), func(config *raft.Config) {}) future := getLeader.GetRaft().(*raftprevious.Raft).AddVoter(raftprevious.ServerID(rUIT.GetLocalID()), raftprevious.ServerAddress(rUIT.GetLocalAddr()), 0, 0) utils.WaitFuture(t, future) //Add the new node to the cluster @@ -206,7 +277,7 @@ func TestRaft_ReplaceUpgrade(t *testing.T) { require.NotEmpty(t, getLeader) // Create a new node to replace the deleted one - rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore)) + rUIT := testcluster.InitUITWithStore(t, id, store.(*raftprevious.InmemStore), func(config *raft.Config) {}) fa := getLeader.GetRaft().(*raft.Raft).AddVoter(raft.ServerID(rUIT.GetLocalID()), raft.ServerAddress(rUIT.GetLocalAddr()), 0, 0) utils.WaitFuture(t, fa) @@ -221,10 +292,3 @@ func TestRaft_ReplaceUpgrade(t *testing.T) { }) } } - -func leave(t *testing.T, cluster testcluster.RaftCluster, id string) { - fr := cluster.GetLeader().GetRaft().(*raftprevious.Raft).RemoveServer(raftprevious.ServerID(id), 0, 0) - utils.WaitFuture(t, fr) - f := cluster.Raft(id).(*raftprevious.Raft).Shutdown() - utils.WaitFuture(t, f) -} diff --git a/raft-compat/testcluster/cluster.go b/raft-compat/testcluster/cluster.go index 4096b606..8930287a 100644 --- a/raft-compat/testcluster/cluster.go +++ b/raft-compat/testcluster/cluster.go @@ -117,6 +117,14 @@ func NewRaftCluster(t *testing.T, f func(t *testing.T, id string) RaftNode, coun return rc } +func NewPreviousRaftCluster(t *testing.T, count int, name string) RaftCluster { + return NewRaftCluster(t, InitPrevious, count, name) +} + +func NewUITRaftCluster(t *testing.T, count int, name string) RaftCluster { + return NewRaftCluster(t, InitUIT, count, name) +} + func (r *RaftCluster) GetLeader() RaftNode { for _, n := range r.rafts { if n.GetLocalID() == n.GetLeaderID() { @@ -151,12 +159,13 @@ func (r *RaftCluster) GetIndex(id string) int { } func InitUIT(t *testing.T, id string) RaftNode { - return InitUITWithStore(t, id, nil) + return InitUITWithStore(t, id, nil, func(config *raft.Config) {}) } -func InitUITWithStore(t *testing.T, id string, store *raftprevious.InmemStore) RaftNode { +func InitUITWithStore(t *testing.T, id string, store *raftprevious.InmemStore, cfgMod func(config *raft.Config)) RaftNode { node := RaftUIT{} node.Config = raft.DefaultConfig() + cfgMod(node.Config) node.Config.HeartbeatTimeout = 50 * time.Millisecond node.Config.ElectionTimeout = 50 * time.Millisecond node.Config.LeaderLeaseTimeout = 50 * time.Millisecond @@ -164,7 +173,7 @@ func InitUITWithStore(t *testing.T, id string, store *raftprevious.InmemStore) R node.id = raft.ServerID(id) node.Config.LocalID = node.id if store != nil { - node.Store = convertInMemStore(store) + node.Store = convertInMemStoreToUIT(store) } 
else { node.Store = raft.NewInmemStore() } @@ -180,7 +189,12 @@ func InitUITWithStore(t *testing.T, id string, store *raftprevious.InmemStore) R return node } -func InitLatest(t *testing.T, id string) RaftNode { +func InitPrevious(t *testing.T, id string) RaftNode { + return InitPreviousWithStore(t, id, nil, func(config *raftprevious.Config) { + }) +} + +func InitPreviousWithStore(t *testing.T, id string, store *raft.InmemStore, f func(config *raftprevious.Config)) RaftNode { node := RaftLatest{} node.Config = raftprevious.DefaultConfig() node.Config.HeartbeatTimeout = 50 * time.Millisecond @@ -189,8 +203,13 @@ func InitLatest(t *testing.T, id string) RaftNode { node.Config.CommitTimeout = 5 * time.Millisecond node.id = raftprevious.ServerID(id) node.Config.LocalID = node.id + f(node.Config) - node.Store = raftprevious.NewInmemStore() + if store != nil { + node.Store = convertInMemStoreToPrevious(store) + } else { + node.Store = raftprevious.NewInmemStore() + } node.Snap = raftprevious.NewInmemSnapshotStore() node.fsm = &raftprevious.MockFSM{} var err error @@ -202,7 +221,7 @@ func InitLatest(t *testing.T, id string) RaftNode { return node } -func convertLog(ll *raftprevious.Log) *raft.Log { +func convertLogToUIT(ll *raftprevious.Log) *raft.Log { l := new(raft.Log) l.Index = ll.Index l.AppendedAt = ll.AppendedAt @@ -212,6 +231,16 @@ func convertLog(ll *raftprevious.Log) *raft.Log { l.Extensions = ll.Extensions return l } +func convertLogToPrevious(ll *raft.Log) *raftprevious.Log { + l := new(raftprevious.Log) + l.Index = ll.Index + l.AppendedAt = ll.AppendedAt + l.Type = raftprevious.LogType(ll.Type) + l.Term = ll.Term + l.Data = ll.Data + l.Extensions = ll.Extensions + return l +} var ( keyCurrentTerm = []byte("CurrentTerm") @@ -219,14 +248,45 @@ var ( keyLastVoteCand = []byte("LastVoteCand") ) -func convertInMemStore(s *raftprevious.InmemStore) *raft.InmemStore { +func convertInMemStoreToPrevious(s *raft.InmemStore) *raftprevious.InmemStore { + ss := raftprevious.NewInmemStore() + fi, _ := s.FirstIndex() + li, _ := s.LastIndex() + for i := fi; i <= li; i++ { + log := new(raft.Log) + s.GetLog(i, log) + ss.StoreLog(convertLogToPrevious(log)) + } + + get, _ := ss.Get(keyCurrentTerm) + ss.Set(keyCurrentTerm, get) + + get, _ = ss.Get(keyLastVoteTerm) + ss.Set(keyLastVoteTerm, get) + + get, _ = ss.Get(keyLastVoteCand) + ss.Set(keyLastVoteCand, get) + + get64, _ := ss.GetUint64(keyCurrentTerm) + ss.SetUint64(keyCurrentTerm, get64) + + get64, _ = ss.GetUint64(keyLastVoteTerm) + ss.SetUint64(keyLastVoteTerm, get64) + + get64, _ = ss.GetUint64(keyLastVoteCand) + ss.SetUint64(keyLastVoteCand, get64) + + return ss +} + +func convertInMemStoreToUIT(s *raftprevious.InmemStore) *raft.InmemStore { ss := raft.NewInmemStore() fi, _ := s.FirstIndex() li, _ := s.LastIndex() for i := fi; i <= li; i++ { log := new(raftprevious.Log) s.GetLog(i, log) - ss.StoreLog(convertLog(log)) + ss.StoreLog(convertLogToUIT(log)) } get, _ := ss.Get(keyCurrentTerm) diff --git a/raft.go b/raft.go index 28c11283..183f041a 100644 --- a/raft.go +++ b/raft.go @@ -8,6 +8,7 @@ import ( "container/list" "fmt" "io" + "strings" "sync/atomic" "time" @@ -17,8 +18,9 @@ import ( ) const ( - minCheckInterval = 10 * time.Millisecond - oldestLogGaugeInterval = 10 * time.Second + minCheckInterval = 10 * time.Millisecond + oldestLogGaugeInterval = 10 * time.Second + rpcUnexpectedCommandError = "unexpected command" ) var ( @@ -286,7 +288,16 @@ func (r *Raft) runCandidate() { metrics.IncrCounter([]string{"raft", "state", "candidate"}, 1) // Start 
vote for us, and set a timeout
-	voteCh := r.electSelf()
+	var voteCh <-chan *voteResult
+	var prevoteCh <-chan *preVoteResult
+
+	// Check if pre-vote is active and that this is not a leader transfer.
+	// Leader transfers do not perform a pre-vote by design.
+	if !r.preVoteDisabled && !r.candidateFromLeadershipTransfer.Load() {
+		prevoteCh = r.preElectSelf()
+	} else {
+		voteCh = r.electSelf()
+	}
 
 	// Make sure the leadership transfer flag is reset after each run. Having this
 	// flag will set the field LeadershipTransfer in a RequestVoteRequst to true,
@@ -299,6 +310,8 @@ func (r *Raft) runCandidate() {
 	electionTimer := randomTimeout(electionTimeout)
 
 	// Tally the votes, need a simple majority
+	preVoteGrantedVotes := 0
+	preVoteRefusedVotes := 0
 	grantedVotes := 0
 	votesNeeded := r.quorumSize()
 	r.logger.Debug("calculated votes needed", "needed", votesNeeded, "term", term)
@@ -310,7 +323,43 @@ func (r *Raft) runCandidate() {
 		case rpc := <-r.rpcCh:
 			r.mainThreadSaturation.working()
 			r.processRPC(rpc)
+		case preVote := <-prevoteCh:
+			// This is a pre-vote response; it should trigger a "real" election if the pre-vote is won.
+			r.mainThreadSaturation.working()
+			r.logger.Debug("pre-vote received", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
+			// Check if the term is greater than ours, bail
+			if preVote.Term > term {
+				r.logger.Debug("pre-vote denied: found newer term, falling back to follower", "term", preVote.Term)
+				r.setState(Follower)
+				r.setCurrentTerm(preVote.Term)
+				return
+			}
+			// Check if the pre-vote is granted
+			if preVote.Granted {
+				preVoteGrantedVotes++
+				r.logger.Debug("pre-vote granted", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
+			} else {
+				preVoteRefusedVotes++
+				r.logger.Debug("pre-vote denied", "from", preVote.voterID, "term", preVote.Term, "tally", preVoteGrantedVotes)
+			}
+
+			// Check if we've won the pre-vote and proceed to the election if so
+			if preVoteGrantedVotes >= votesNeeded {
+				r.logger.Info("pre-vote successful, starting election", "term", preVote.Term,
+					"tally", preVoteGrantedVotes, "refused", preVoteRefusedVotes, "votesNeeded", votesNeeded)
+				preVoteGrantedVotes = 0
+				preVoteRefusedVotes = 0
+				electionTimer = randomTimeout(electionTimeout)
+				prevoteCh = nil
+				voteCh = r.electSelf()
+			}
+			// Check if we've lost the pre-vote and wait for the election to time out so we can run another round of
+			// pre-vote.
+			if preVoteRefusedVotes >= votesNeeded {
+				r.logger.Info("pre-vote campaign failed, waiting for election timeout", "term", preVote.Term,
+					"tally", preVoteGrantedVotes, "refused", preVoteRefusedVotes, "votesNeeded", votesNeeded)
+			}
 		case vote := <-voteCh:
 			r.mainThreadSaturation.working()
 			// Check if the term is greater than ours, bail
@@ -334,7 +383,6 @@ func (r *Raft) runCandidate() {
 				r.setLeader(r.localAddr, r.localID)
 				return
 			}
-
 		case c := <-r.configurationChangeCh:
 			r.mainThreadSaturation.working()
 			// Reject any operations since we are not the leader
@@ -1350,6 +1398,8 @@ func (r *Raft) processRPC(rpc RPC) {
 		r.appendEntries(rpc, cmd)
 	case *RequestVoteRequest:
 		r.requestVote(rpc, cmd)
+	case *RequestPreVoteRequest:
+		r.requestPreVote(rpc, cmd)
 	case *InstallSnapshotRequest:
 		r.installSnapshot(rpc, cmd)
 	case *TimeoutNowRequest:
@@ -1357,7 +1407,8 @@ func (r *Raft) processRPC(rpc RPC) {
 	default:
 		r.logger.Error("got unexpected command", "command", hclog.Fmt("%#v", rpc.Command))
-		rpc.Respond(nil, fmt.Errorf("unexpected command"))
+
+		rpc.Respond(nil, fmt.Errorf(rpcUnexpectedCommandError))
 	}
 }
 
@@ -1615,6 +1666,7 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
 		r.logger.Debug("lost leadership because received a requestVote with a newer term")
 		r.setState(Follower)
 		r.setCurrentTerm(req.Term)
+		resp.Term = req.Term
 	}
 
@@ -1680,6 +1732,82 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
 	r.setLastContact()
 }
 
+// requestPreVote is invoked when we get a RequestPreVote RPC call.
+func (r *Raft) requestPreVote(rpc RPC, req *RequestPreVoteRequest) {
+	defer metrics.MeasureSince([]string{"raft", "rpc", "requestVote"}, time.Now())
+	r.observe(*req)
+
+	// Setup a response
+	resp := &RequestPreVoteResponse{
+		RPCHeader: r.getRPCHeader(),
+		Term:      r.getCurrentTerm(),
+		Granted:   false,
+	}
+	var rpcErr error
+	defer func() {
+		rpc.Respond(resp, rpcErr)
+	}()
+
+	// Check if we have an existing leader [who's not the candidate]
+	var candidate ServerAddress
+	candidateID := ServerID(req.ID)
+
+	// If the Servers list is empty, the cluster is very likely trying to bootstrap,
+	// so grant the pre-vote.
+	if len(r.configurations.latest.Servers) > 0 && !inConfiguration(r.configurations.latest, candidateID) {
+		r.logger.Warn("rejecting pre-vote request since node is not in configuration",
+			"from", candidate)
+		return
+	}
+
+	if leaderAddr, leaderID := r.LeaderWithID(); leaderAddr != "" && leaderAddr != candidate {
+		r.logger.Warn("rejecting pre-vote request since we have a leader",
+			"from", candidate,
+			"leader", leaderAddr,
+			"leader-id", string(leaderID))
+		return
+	}
+
+	// Ignore an older term
+	if req.Term < r.getCurrentTerm() {
+		return
+	}
+
+	if req.Term > r.getCurrentTerm() {
+		// Continue processing here to possibly grant the pre-vote; in a "real" vote this would transition us to follower.
+		r.logger.Debug("received a requestPreVote with a newer term, grant the pre-vote")
+		resp.Term = req.Term
+	}
+
+	// If we get a pre-vote request from a non-voter and the request term is higher, do not grant the pre-vote.
+	// This could happen when a node that was previously a voter is converted to a non-voter.
+	if len(r.configurations.latest.Servers) > 0 && !hasVote(r.configurations.latest, candidateID) {
+		r.logger.Warn("rejecting pre-vote request since node is not a voter", "from", candidate)
+		return
+	}
+
+	// Reject if the candidate's last log term is older than ours
+	lastIdx, lastTerm := r.getLastEntry()
+	if lastTerm > req.LastLogTerm {
+		r.logger.Warn("rejecting pre-vote request since our last term is greater",
+			"candidate", candidate,
+			"last-term", lastTerm,
+			"last-candidate-term", req.LastLogTerm)
+		return
+	}
+
+	if lastTerm == req.LastLogTerm && lastIdx > req.LastLogIndex {
+		r.logger.Warn("rejecting pre-vote request since our last index is greater",
+			"candidate", candidate,
+			"last-index", lastIdx,
+			"last-candidate-index", req.LastLogIndex)
+		return
+	}
+
+	resp.Granted = true
+	r.setLastContact()
+}
+
 // installSnapshot is invoked when we get a InstallSnapshot RPC call.
 // We must be in the follower state for this, since it means we are
 // too far behind a leader for log replay. This must only be called
@@ -1837,6 +1965,11 @@ type voteResult struct {
 	voterID ServerID
 }
 
+type preVoteResult struct {
+	RequestPreVoteResponse
+	voterID ServerID
+}
+
 // electSelf is used to send a RequestVote RPC to all peers, and vote for
 // ourself. This has the side affecting of incrementing the current term. The
 // response channel returned is used to wait for all the responses (including a
@@ -1846,13 +1979,14 @@ func (r *Raft) electSelf() <-chan *voteResult {
 	respCh := make(chan *voteResult, len(r.configurations.latest.Servers))
 
 	// Increment the term
-	r.setCurrentTerm(r.getCurrentTerm() + 1)
+	newTerm := r.getCurrentTerm() + 1
+	r.setCurrentTerm(newTerm)
 
 	// Construct the request
 	lastIdx, lastTerm := r.getLastEntry()
 	req := &RequestVoteRequest{
 		RPCHeader: r.getRPCHeader(),
-		Term:      r.getCurrentTerm(),
+		Term:      newTerm,
 		// this is needed for retro compatibility, before RPCHeader.Addr was added
 		Candidate:    r.trans.EncodePeer(r.localID, r.localAddr),
 		LastLogIndex: lastIdx,
@@ -1883,10 +2017,12 @@ func (r *Raft) electSelf() <-chan *voteResult {
 		if server.Suffrage == Voter {
 			if server.ID == r.localID {
 				r.logger.Debug("voting for self", "term", req.Term, "id", r.localID)
+
 				// Persist a vote for ourselves
 				if err := r.persistVote(req.Term, req.RPCHeader.Addr); err != nil {
 					r.logger.Error("failed to persist vote", "error", err)
 					return nil
+
 				}
 				// Include our own vote
 				respCh <- &voteResult{
@@ -1907,6 +2043,90 @@ func (r *Raft) electSelf() <-chan *voteResult {
 	return respCh
 }
 
+// preElectSelf is used to send a RequestPreVote RPC to all peers, and vote for
+// ourself. This will not increment the current term. The
+// response channel returned is used to wait for all the responses (including a
+// vote for ourself).
+// This must only be called from the main thread.
+func (r *Raft) preElectSelf() <-chan *preVoteResult {
+
+	// At this point the transport should support pre-vote,
+	// but check just in case.
+	prevoteTrans, prevoteTransSupported := r.trans.(WithPreVote)
+	if !prevoteTransSupported {
+		panic("preElection is not possible if the transport doesn't support pre-vote")
+	}
+
+	// Create a response channel
+	respCh := make(chan *preVoteResult, len(r.configurations.latest.Servers))
+
+	// Propose the next term without actually changing our state
+	newTerm := r.getCurrentTerm() + 1
+
+	// Construct the request
+	lastIdx, lastTerm := r.getLastEntry()
+	req := &RequestPreVoteRequest{
+		RPCHeader:    r.getRPCHeader(),
+		Term:         newTerm,
+		LastLogIndex: lastIdx,
+		LastLogTerm:  lastTerm,
+	}
+
+	// Construct a function to ask for a vote
+	askPeer := func(peer Server) {
+		r.goFunc(func() {
+			defer metrics.MeasureSince([]string{"raft", "candidate", "preElectSelf"}, time.Now())
+			resp := &preVoteResult{voterID: peer.ID}
+
+			err := prevoteTrans.RequestPreVote(peer.ID, peer.Address, req, &resp.RequestPreVoteResponse)
+
+			// If the target server does not support the pre-vote RPC, we count this as a granted vote to allow
+			// the cluster to progress.
+			if err != nil && strings.Contains(err.Error(), rpcUnexpectedCommandError) {
+				r.logger.Error("target does not support pre-vote RPC, treating as granted",
+					"target", peer,
+					"error", err,
+					"term", req.Term)
+				resp.Term = req.Term
+				resp.Granted = true
+			} else if err != nil {
+				r.logger.Error("failed to make requestPreVote RPC",
+					"target", peer,
+					"error", err,
+					"term", req.Term)
+				resp.Term = req.Term
+				resp.Granted = false
+			}
+			respCh <- resp
+
+		})
+	}
+
+	// For each peer, request a vote
+	for _, server := range r.configurations.latest.Servers {
+		if server.Suffrage == Voter {
+			if server.ID == r.localID {
+				r.logger.Debug("pre-voting for self", "term", req.Term, "id", r.localID)
+
+				// Cast a pre-vote for ourselves
+				respCh <- &preVoteResult{
+					RequestPreVoteResponse: RequestPreVoteResponse{
+						RPCHeader: r.getRPCHeader(),
+						Term:      req.Term,
+						Granted:   true,
+					},
+					voterID: r.localID,
+				}
+			} else {
+				r.logger.Debug("asking for pre-vote", "term", req.Term, "from", server.ID, "address", server.Address)
+				askPeer(server)
+			}
+		}
+	}
+
+	return respCh
+}
+
 // persistVote is used to persist our vote for safety.
 func (r *Raft) persistVote(term uint64, candidate []byte) error {
 	if err := r.stable.SetUint64(keyLastVoteTerm, term); err != nil {
diff --git a/raft_test.go b/raft_test.go
index 3eaf1e3c..d6d2c030 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -2061,6 +2061,99 @@ func TestRaft_AppendEntry(t *testing.T) {
 	require.True(t, resp2.Success)
 }
 
+// TestRaft_PreVoteMixedCluster focuses on testing a cluster with
+// a mix of nodes that have pre-vote activated and deactivated.
+// Once the cluster is created, we force an election by partitioning the leader
+// and verify that the cluster regains stability.
+func TestRaft_PreVoteMixedCluster(t *testing.T) {
+
+	tcs := []struct {
+		name         string
+		prevoteNum   int
+		noprevoteNum int
+	}{
+		{"majority no pre-vote", 2, 3},
+		{"majority pre-vote", 3, 2},
+		{"majority no pre-vote", 1, 2},
+		{"majority pre-vote", 2, 1},
+		{"all pre-vote", 3, 0},
+		{"all no pre-vote", 0, 3},
+	}
+	for _, tc := range tcs {
+		t.Run(tc.name, func(t *testing.T) {
+
+			// Make the majority side of the cluster.
+			majority := tc.prevoteNum
+			minority := tc.noprevoteNum
+			if tc.prevoteNum < tc.noprevoteNum {
+				majority = tc.noprevoteNum
+				minority = tc.prevoteNum
+			}
+
+			conf := inmemConfig(t)
+			conf.PreVoteDisabled = tc.prevoteNum <= tc.noprevoteNum
+			c := MakeCluster(majority, t, conf)
+			defer c.Close()
+
+			// Set up a second group of servers with the opposite pre-vote setting.
+			conf = inmemConfig(t)
+			conf.PreVoteDisabled = tc.prevoteNum >= tc.noprevoteNum
+			c1 := MakeClusterNoBootstrap(minority, t, conf)
+
+			// Merge clusters.
+			c.Merge(c1)
+			c.FullyConnect()
+
+			for _, r := range c1.rafts {
+				future := c.Leader().AddVoter(r.localID, r.localAddr, 0, 0)
+				if err := future.Error(); err != nil {
+					t.Fatalf("err: %v", err)
+				}
+			}
+			time.Sleep(c.propagateTimeout * 10)
+
+			leaderOld := c.Leader()
+			c.Followers()
+			c.Partition([]ServerAddress{leaderOld.localAddr})
+			time.Sleep(c.propagateTimeout * 3)
+			leader := c.Leader()
+			require.NotEqual(t, leader.leaderID, leaderOld.leaderID)
+		})
+	}
+
+}
+
+func TestRaft_PreVoteAvoidElectionWithPartition(t *testing.T) {
+	// Make a pre-vote cluster.
+	conf := inmemConfig(t)
+	conf.PreVoteDisabled = false
+	c := MakeCluster(5, t, conf)
+	defer c.Close()
+
+	oldLeaderTerm := c.Leader().getCurrentTerm()
+	followers := c.Followers()
+	require.Len(t, followers, 4)
+
+	// Partition a node and wait long enough for it to increase its term
+	c.Partition([]ServerAddress{followers[0].localAddr})
+	time.Sleep(10 * c.propagateTimeout)
+
+	// Check the leader is stable and the followers are as expected
+	leaderTerm := c.Leader().getCurrentTerm()
+	require.Equal(t, leaderTerm, oldLeaderTerm)
+	require.Len(t, c.WaitForFollowers(3), 3)
+
+	// Reconnect the partitioned node
+	c.FullyConnect()
+	time.Sleep(3 * c.propagateTimeout)
+
+	// Check that the number of followers increases and the term has not increased
+	require.Len(t, c.Followers(), 4)
+	leaderTerm = c.Leader().getCurrentTerm()
+	require.Equal(t, leaderTerm, oldLeaderTerm)
+
+}
+
 func TestRaft_VotingGrant_WhenLeaderAvailable(t *testing.T) {
 	conf := inmemConfig(t)
 	conf.ProtocolVersion = 3
@@ -3065,7 +3158,6 @@ func TestRaft_VoteWithNoIDNoAddr(t *testing.T) {
 	var resp RequestVoteResponse
 	followerT := c.trans[c.IndexOf(followers[1])]
 	c.Partition([]ServerAddress{leader.localAddr})
-	time.Sleep(c.propagateTimeout)
 
 	// wait for the remaining follower to trigger an election
 	waitForState(follower, Candidate)
diff --git a/testing.go b/testing.go
index e0885714..351a9aba 100644
--- a/testing.go
+++ b/testing.go
@@ -21,13 +21,13 @@ import (
 var userSnapshotErrorsOnNoData = true
 
 // Return configurations optimized for in-memory
-func inmemConfig(tb testing.TB) *Config {
+func inmemConfig(t testing.TB) *Config {
 	conf := DefaultConfig()
 	conf.HeartbeatTimeout = 50 * time.Millisecond
 	conf.ElectionTimeout = 50 * time.Millisecond
 	conf.LeaderLeaseTimeout = 50 * time.Millisecond
 	conf.CommitTimeout = 5 * time.Millisecond
-	conf.Logger = newTestLogger(tb)
+	conf.Logger = newTestLogger(t)
 	return conf
 }
 
@@ -211,7 +211,7 @@ func newTestLogger(tb testing.TB) hclog.Logger {
 // is logged after the test is complete.
 func newTestLoggerWithPrefix(tb testing.TB, prefix string) hclog.Logger {
 	if testing.Verbose() {
-		return hclog.New(&hclog.LoggerOptions{Name: prefix})
+		return hclog.New(&hclog.LoggerOptions{Name: prefix, Level: hclog.Trace})
 	}
 
 	return hclog.New(&hclog.LoggerOptions{
@@ -501,6 +501,12 @@ func (c *cluster) Leader() *Raft {
 // state.
 func (c *cluster) Followers() []*Raft {
 	expFollowers := len(c.rafts) - 1
+	return c.WaitForFollowers(expFollowers)
+}
+
+// WaitForFollowers waits for the cluster to have a given number of followers and stay in a stable
+// state.
+func (c *cluster) WaitForFollowers(expFollowers int) []*Raft {
 	followers := c.GetInState(Follower)
 	if len(followers) != expFollowers {
 		c.t.Fatalf("timeout waiting for %d followers (followers are %v)", expFollowers, followers)
diff --git a/transport.go b/transport.go
index 054fa624..c64fff6e 100644
--- a/transport.go
+++ b/transport.go
@@ -66,6 +66,16 @@ type Transport interface {
 	TimeoutNow(id ServerID, target ServerAddress, args *TimeoutNowRequest, resp *TimeoutNowResponse) error
 }
 
+// WithPreVote is an interface that a transport may provide which
+// allows a transport to support a PreVote request.
+//
+// It is defined separately from Transport as unfortunately it wasn't in the
+// original interface specification.
+type WithPreVote interface {
+	// RequestPreVote sends the appropriate RPC to the target node.
+	RequestPreVote(id ServerID, target ServerAddress, args *RequestPreVoteRequest, resp *RequestPreVoteResponse) error
+}
+
 // WithClose is an interface that a transport may provide which
 // allows a transport to be shut down cleanly when a Raft instance
 // shuts down.
@@ -81,9 +91,10 @@ type WithClose interface {
 // LoopbackTransport is an interface that provides a loopback transport suitable for testing
 // e.g. InmemTransport. It's there so we don't have to rewrite tests.
 type LoopbackTransport interface {
-	Transport // Embedded transport reference
-	WithPeers // Embedded peer management
-	WithClose // with a close routine
+	Transport   // Embedded transport reference
+	WithPeers   // Embedded peer management
+	WithClose   // with a close routine
+	WithPreVote // with a prevote
 }
 
 // WithPeers is an interface that a transport may provide which allows for connection and
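
A minimal usage sketch, not part of the diff above, of where the new Config.PreVoteDisabled knob and the WithPreVote capability check in NewRaft come into play when wiring up a node. The helper name newNode, the TCP transport parameters, and the in-memory stores are illustrative assumptions only.

// prevote_example.go: illustrative sketch, not part of this change.
package prevoteexample

import (
	"net"
	"os"
	"time"

	"github.com/hashicorp/raft"
)

// newNode is a hypothetical helper showing where PreVoteDisabled fits.
func newNode(fsm raft.FSM, bindAddr string) (*raft.Raft, error) {
	conf := raft.DefaultConfig()
	conf.LocalID = raft.ServerID(bindAddr)
	// Pre-vote stays on by default; set this to true to opt out, for example
	// while the cluster still contains nodes that predate the pre-vote RPC.
	conf.PreVoteDisabled = false

	addr, err := net.ResolveTCPAddr("tcp", bindAddr)
	if err != nil {
		return nil, err
	}
	// NetworkTransport implements WithPreVote, so NewRaft keeps pre-vote enabled.
	// A transport without that interface would disable it and log a warning.
	trans, err := raft.NewTCPTransport(bindAddr, addr, 3, 10*time.Second, os.Stderr)
	if err != nil {
		return nil, err
	}

	store := raft.NewInmemStore() // serves as both LogStore and StableStore
	snaps := raft.NewInmemSnapshotStore()
	return raft.NewRaft(conf, fsm, store, store, snaps, trans)
}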
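
Likewise, a sketch of what a custom transport wrapper would need so that pre-vote keeps working; wrappedTransport is hypothetical, while the WithPreVote interface and the trans.(WithPreVote) assertion in NewRaft are the ones added in this diff. A wrapper that forwards only the base Transport methods would silently lose pre-vote for that node.

// transport_wrapper_example.go: illustrative sketch, not part of this change.
package prevoteexample

import (
	"github.com/hashicorp/raft"
)

// wrappedTransport forwards every RPC to an inner transport that already
// supports pre-vote (such as *raft.NetworkTransport).
type wrappedTransport struct {
	raft.Transport // regular RPCs are delegated through the embedded interface
	inner raft.WithPreVote
}

// RequestPreVote satisfies raft.WithPreVote by delegating to the wrapped
// transport. Without this method the trans.(WithPreVote) assertion in NewRaft
// fails and pre-vote is disabled for the node, with a warning logged.
func (t *wrappedTransport) RequestPreVote(id raft.ServerID, target raft.ServerAddress,
	args *raft.RequestPreVoteRequest, resp *raft.RequestPreVoteResponse) error {
	return t.inner.RequestPreVote(id, target, args, resp)
}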