From 65836607f75050df5d8925d568dd23bf3dce0bf3 Mon Sep 17 00:00:00 2001 From: Alex Bligh Date: Mon, 21 Mar 2016 18:12:45 +0000 Subject: [PATCH] Make raft_test.go far more resilient * Add observations - emitted when something happens * Makefile: Change test timeout in Makefile to 30s * raft_test.go: Change default commit timeout to 5ms * raft_test.go: Centralise all references to time in a single place. * raft_test.go: Make logger work consistently and output time in microseconds (very useful for debugging). Convert all logging to use the cluster logger. * raft_test.go: provide c.Failf function that consistently produces the output, in log format, with timestamps. Convert use of panic() and t.Fatalf() to c.Failf() * raft_test.go: rewrite GetInState() so it is now reliable, i.e. by waiting for the state to remain stable for a given period of time. * raft_test.go: provide WaitEventChan() and WaitEvent() which wait for 'something to happen' or a timeout. * raft_test.go: provide WaitForReplication() which waits until the FSM has a supplied number of logs on each node. * raft_test.go: rewrite Leaders() and Followers() to be much more simple now GetInState() is reliable. * raft_test.go: rewrite EnsureLeader() now Leaders() is reliable. Signed-off-by: Alex Bligh --- Makefile | 2 +- observer.go | 88 +++++++ raft.go | 18 ++ raft_test.go | 713 +++++++++++++++++++++++++++++++++------------------ 4 files changed, 566 insertions(+), 255 deletions(-) create mode 100644 observer.go diff --git a/Makefile b/Makefile index bf8b1e3ee..556aa2e20 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...) test: - go test -timeout=15s ./... + go test -timeout=30s ./... integ: test INTEG_TESTS=yes go test -timeout=3s -run=Integ ./... diff --git a/observer.go b/observer.go new file mode 100644 index 000000000..cf950dd6a --- /dev/null +++ b/observer.go @@ -0,0 +1,88 @@ +package raft + +import ( + "sync/atomic" +) + +type Observation struct { + Raft *Raft + Data interface{} +} + +type LeaderObservation struct { + leader string +} + +var nextObserverId uint64 + +// Observer describes what to do with a given observation +type Observer struct { + channel chan Observation // channel of observations + blocking bool // whether it should block in order to write an observation (generally no) + numObserved uint64 // number observed + numDropped uint64 // number dropped + id uint64 // ID of this observer in the raft map + filter func(o *Observation) bool // filter to apply to determine whether observation should be sent to channel +} + +// Register a new observer +func (r *Raft) RegisterObserver(or *Observer) { + r.observerLock.Lock() + defer r.observerLock.Unlock() + r.observers[or.id] = or +} + +// Deregister an observer +func (r *Raft) DeregisterObserver(or *Observer) { + r.observerLock.Lock() + defer r.observerLock.Unlock() + delete(r.observers, or.id) +} + +// Send an observation to every observer +func (r *Raft) observe(o interface{}) { + // we hold this mutex whilst observers (potentially) block. + // In general observers should not block. 
But in any case this isn't
+	// disastrous as we only hold a read lock, which merely prevents
+	// registration / deregistration of observers
+	ob := Observation{Raft: r, Data: o}
+	r.observerLock.RLock()
+	defer r.observerLock.RUnlock()
+	for _, or := range r.observers {
+		if or.filter != nil {
+			if !or.filter(&ob) {
+				continue
+			}
+		}
+		if or.channel == nil {
+			continue
+		}
+		if or.blocking {
+			or.channel <- ob
+			atomic.AddUint64(&or.numObserved, 1)
+		} else {
+			select {
+			case or.channel <- ob:
+				atomic.AddUint64(&or.numObserved, 1)
+			default:
+				atomic.AddUint64(&or.numDropped, 1)
+			}
+		}
+	}
+}
+
+// GetCounters returns the number of observations sent to and dropped from this observer's channel
+func (or *Observer) GetCounters() (uint64, uint64, error) {
+	return atomic.LoadUint64(&or.numObserved), atomic.LoadUint64(&or.numDropped), nil
+}
+
+// NewObserver creates a new observer with the specified channel, blocking status, and filter (a nil filter matches everything)
+func NewObserver(channel chan Observation, blocking bool, filter func(o *Observation) bool) *Observer {
+	ob := &Observer{
+		channel:  channel,
+		blocking: blocking,
+		filter:   filter,
+		id:       atomic.AddUint64(&nextObserverId, 1),
+	}
+	return ob
+}
diff --git a/raft.go b/raft.go
index 034dcfc60..e4b8a5c58 100644
--- a/raft.go
+++ b/raft.go
@@ -75,6 +75,9 @@ type leaderState struct {
 type Raft struct {
 	raftState
 
+	// the previously observed raft state
+	observedRaftState RaftState
+
 	// applyCh is used to async send logs to the main thread to
 	// be committed and applied to the FSM.
 	applyCh chan *logFuture
@@ -147,6 +150,10 @@ type Raft struct {
 	// verifyCh is used to async send verify futures to the main thread
 	// to verify we are still the leader
 	verifyCh chan *verifyFuture
+
+	// list of observers and the mutex that protects them
+	observerLock sync.RWMutex
+	observers    map[uint64]*Observer
 }
 
 // NewRaft is used to construct a new Raft node. It takes a configuration, as well
@@ -221,6 +228,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
 		stable:        stable,
 		trans:         trans,
 		verifyCh:      make(chan *verifyFuture, 64),
+		observers:     make(map[uint64]*Observer),
 	}
 
 	// Initialize as a follower
@@ -267,8 +275,12 @@ func (r *Raft) Leader() string {
 // setLeader is used to modify the current leader of the cluster
 func (r *Raft) setLeader(leader string) {
 	r.leaderLock.Lock()
+	oldLeader := r.leader
 	r.leader = leader
 	r.leaderLock.Unlock()
+	if oldLeader != leader {
+		r.observe(LeaderObservation{leader: leader})
+	}
 }
 
 // Apply is used to apply a command to the FSM in a highly consistent
@@ -1418,6 +1430,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
 		rpc.Respond(resp, rpcErr)
 	}()
 
+	r.observe(*req)
+
 	// Check if we have an existing leader [who's not the candidate]
 	candidate := r.trans.DecodePeer(req.Candidate)
 	if leader := r.Leader(); leader != "" && leader != candidate {
@@ -1695,7 +1709,11 @@ func (r *Raft) setCurrentTerm(t uint64) {
 // that leader should be set only after updating the state. 
 func (r *Raft) setState(state RaftState) {
 	r.setLeader("")
+	oldState := r.raftState.getState()
 	r.raftState.setState(state)
+	if oldState != state {
+		r.observe(state)
+	}
 }
 
 // runSnapshots is a long running goroutine used to manage taking
diff --git a/raft_test.go b/raft_test.go
index 5b33b2cbf..2397addbc 100644
--- a/raft_test.go
+++ b/raft_test.go
@@ -72,7 +72,7 @@ func inmemConfig(t *testing.T) *Config {
 	conf.HeartbeatTimeout = 50 * time.Millisecond
 	conf.ElectionTimeout = 50 * time.Millisecond
 	conf.LeaderLeaseTimeout = 50 * time.Millisecond
-	conf.CommitTimeout = time.Millisecond
+	conf.CommitTimeout = 5 * time.Millisecond
 	conf.Logger = newTestLogger(t)
 	return conf
 }
@@ -81,29 +81,50 @@ func inmemConfig(t *testing.T) *Config {
 // map them into calls to testing.T.Log, so that you only see
 // the logging for failed tests.
 type testLoggerAdapter struct {
-	t *testing.T
+	t      *testing.T
+	prefix string
 }
 
 func (a *testLoggerAdapter) Write(d []byte) (int, error) {
 	if d[len(d)-1] == '\n' {
 		d = d[:len(d)-1]
 	}
-	a.t.Log(string(d))
-	return len(d), nil
+	if a.prefix != "" {
+		l := a.prefix + ": " + string(d)
+		a.t.Log(l)
+	} else {
+		a.t.Log(string(d))
+	}
+	// report all of d as consumed, per the io.Writer contract
+	return len(d), nil
 }
 
 func newTestLogger(t *testing.T) *log.Logger {
-	return log.New(&testLoggerAdapter{t}, "", 0)
+	return log.New(&testLoggerAdapter{t: t}, "", log.Lmicroseconds)
+}
+
+func newTestLoggerWithPrefix(t *testing.T, prefix string) *log.Logger {
+	return log.New(&testLoggerAdapter{t: t, prefix: prefix}, "", log.Lmicroseconds)
 }
 
 type cluster struct {
-	dirs   []string
-	stores []*InmemStore
-	fsms   []*MockFSM
-	snaps  []*FileSnapshotStore
-	trans  []LoopbackTransport
-	rafts  []*Raft
-	t      *testing.T
+	dirs             []string
+	stores           []*InmemStore
+	fsms             []*MockFSM
+	snaps            []*FileSnapshotStore
+	trans            []LoopbackTransport
+	rafts            []*Raft
+	t                *testing.T
+	observationCh    chan Observation
+	conf             *Config
+	propagateTimeout time.Duration
+	longstopTimeout  time.Duration
+	logger           *log.Logger
+	startTime        time.Time
+
+	failedMutex sync.Mutex
+	failedCh    chan struct{}
+	failed      bool
 }
 
 func (c *cluster) Merge(other *cluster) {
@@ -115,6 +136,42 @@ func (c *cluster) Merge(other *cluster) {
 	c.rafts = append(c.rafts, other.rafts...)
 }
 
+func (c *cluster) markFailed() {
+	c.failedMutex.Lock()
+	defer c.failedMutex.Unlock()
+	if !c.failed {
+		c.failed = true
+		close(c.failedCh)
+	}
+}
+
+// Failf fails the test, prints the output in log format with
+// microsecond timestamps, and does not mysteriously eat the string
+func (c *cluster) Failf(format string, args ...interface{}) {
+	c.logger.Printf(format, args...)
+	c.t.Fail()
+	c.markFailed()
+}
+
+// FailNowf is like Failf, but also aborts the test immediately; it must
+// be called from the goroutine running the test, not a spawned goroutine
+func (c *cluster) FailNowf(format string, args ...interface{}) {
+	c.logger.Printf(format, args...)
+	c.t.FailNow()
+}
+
+// Check whether something has failed (main thread only) and exit if so
+func (c *cluster) CheckFailed() {
+	if c.t.Failed() {
+		c.t.FailNow()
+	}
+	select {
+	case <-c.failedCh:
+		c.t.FailNow()
+	default:
+	}
+}
+
 func (c *cluster) Close() {
 	var futures []Future
 	for _, r := range c.rafts {
@@ -122,69 +179,196 @@ func (c *cluster) Close() {
 	}
 
 	// Wait for shutdown
-	timer := time.AfterFunc(200*time.Millisecond, func() {
+	timer := time.AfterFunc(c.longstopTimeout, func() {
+		// we can't FailNowf here (wrong goroutine), and c.Failf alone won't stop a hang, so panic
+		c.Failf("[ERROR] timed out waiting for shutdown")
 		panic("timed out waiting for shutdown")
 	})
+	defer timer.Stop()
 
 	for _, f := range futures {
 		if err := f.Error(); err != nil {
-			panic(fmt.Errorf("shutdown future err: %v", err))
+			c.FailNowf("[ERROR] shutdown future err: %v", err)
 		}
 	}
-	timer.Stop()
 
 	for _, d := range c.dirs {
 		os.RemoveAll(d)
 	}
+
+	c.markFailed() // this doesn't fail the test, merely closes the channel
 }
 
-func (c *cluster) GetInState(s RaftState) []*Raft {
+// WaitEventChan returns a channel that is closed when an observation matching
+// the filter arrives or the timeout expires. A nil filter matches any
+// observation; a timeout of 0 means wait indefinitely for something to happen.
+func (c *cluster) WaitEventChan(filter func(ob *Observation) bool, timeout time.Duration) <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		var timeoutCh <-chan time.Time
+		if timeout > 0 {
+			timeoutCh = time.After(timeout)
+		}
+		for {
+			select {
+			case <-timeoutCh:
+				return
+			case o, ok := <-c.observationCh:
+				if !ok || filter == nil || filter(&o) {
+					return
+				}
+			}
+		}
+	}()
+	return ch
+}
+
+// WaitEvent waits until an observation matching the filter arrives or the
+// timeout expires. A nil filter matches any observation; a timeout of 0
+// means wait indefinitely for something to happen.
+func (c *cluster) WaitEvent(filter func(ob *Observation) bool, timeout time.Duration) {
+	select {
+	case <-c.WaitEventChan(filter, timeout):
+	case <-c.failedCh:
+		c.t.FailNow()
+	}
+}
+
+// Wait until every FSM in the cluster has the specified number of logs applied
+func (c *cluster) WaitForReplication(fsmLength int) {
+	longstopTimeout := time.AfterFunc(c.longstopTimeout, func() { c.Failf("[ERROR] Timeout waiting for replication") })
+	defer longstopTimeout.Stop()
+
+checking:
+	for {
+		// wait for a commit timeout's worth of events, then re-check every FSM
+		c.WaitEvent(nil, c.conf.CommitTimeout)
+		for _, fsm := range c.fsms {
+			fsm.Lock()
+			num := len(fsm.logs)
+			fsm.Unlock()
+			if num != fsmLength {
+				continue checking
+			}
+		}
+		return
+	}
+}
+
+// internal routine to get a snapshot of state - may not be stable
+func (c *cluster) GetInStateInternal(s RaftState) ([]*Raft, uint64) {
+	var highestTerm uint64
 	in := make([]*Raft, 0, 1)
 	for _, r := range c.rafts {
 		if r.State() == s {
 			in = append(in, r)
 		}
+		term := r.getCurrentTerm()
+		if term > highestTerm {
+			highestTerm = term
+		}
+	}
+	return in, highestTerm
+}
+
+// get a stable snapshot of which rafts are in the given state
+func (c *cluster) GetInState(s RaftState) []*Raft {
+	// this is the longstop timeout
+	longstopTimeout := time.AfterFunc(c.longstopTimeout, func() {
+		c.Failf("[ERROR] longstop timeout waiting for stable state %s", s)
+	})
+	defer longstopTimeout.Stop()
+
+	c.logger.Printf("[INFO] Starting stability test for raft state: %+v", s)
+	var pollStartTime = time.Now()
+
+	inState, highestTerm := c.GetInStateInternal(s)
+
+	inStateTime := pollStartTime
+
+	// an election should complete after 2 * max(HeartbeatTimeout, ElectionTimeout)
+	// because of the randomised timer expiring in 1 x interval ... 2 x interval.
+	// we add a bit for propagation delay. If the election fails (e.g. because
+	// two elections start at once), we will have got something through our
+	// Observer channel indicating a different state (i.e. one of the nodes
+	// will have moved to candidate state) which will reset the timer.
+	//
+	// because of an implementation peculiarity, it can actually be 3 x timeout
+	//
+	electionTimeout := c.conf.HeartbeatTimeout
+	if electionTimeout < c.conf.ElectionTimeout {
+		electionTimeout = c.conf.ElectionTimeout
+	}
+	timeout := electionTimeout*2 + c.conf.CommitTimeout
+	timer := time.NewTimer(timeout)
+	defer timer.Stop()
+	// wait until we have a stable in-state slice. Each time we see an
+	// observation that a state has changed, recheck the slice and, if it
+	// has changed, restart the timer.
+	for {
+		inState, highestTerm = c.GetInStateInternal(s)
+		inStateTime = time.Now()
+		// sometimes this routine is called very early on before the rafts have started
+		// up. We then timeout even though no one has even started an election. So if the
+		// highest term in use is zero, we know there are no raft processes that have yet
+		// issued a RequestVote, and we set a long timeout. 
This is fixed when we hear + // the first RequestVote, at which point we reset the timer + if highestTerm == 0 { + timer.Reset(c.longstopTimeout) + } else { + timer.Reset(timeout) + } + select { + case <-c.WaitEventChan(func(ob *Observation) bool { + switch ob.Data.(type) { + case RaftState: + return true + case RequestVoteRequest: + return true + default: + return false + } + }, time.Duration(0)): + c.logger.Printf("[DEBUG] Resetting stability timeout") + case t, ok := <-timer.C: + if !ok { + c.FailNowf("[ERROR] Timer channel errored") + } + c.logger.Printf("[INFO] Stable state for %s reached at %s (%d nodes), %s from start of poll, %s from cluster start. Timeout at %s, %s after stability", s, inStateTime, len(inState), inStateTime.Sub(pollStartTime), inStateTime.Sub(c.startTime), t, t.Sub(inStateTime)) + return inState + case <-c.failedCh: + c.t.FailNow() + } } - return in } func (c *cluster) Leader() *Raft { - timeout := time.AfterFunc(400*time.Millisecond, func() { - panic("timeout waiting for leader") + // this is the longstop timeout + longstopTimeout := time.AfterFunc(c.longstopTimeout, func() { + c.Failf("[ERROR] longstop timeout waiting for leader") }) - defer timeout.Stop() + defer longstopTimeout.Stop() - for len(c.GetInState(Leader)) < 1 { - time.Sleep(time.Millisecond) - } leaders := c.GetInState(Leader) if len(leaders) != 1 { - panic(fmt.Errorf("expected one leader: %v", leaders)) + c.FailNowf("[ERROR] expected one leader: %v", leaders) } return leaders[0] } // Waits for there to be cluster size -1 followers, and then returns them -// If you just wait for a Leader sometimes you can get timing scenarios -// where a 2nd node starts an election just as the first leader was elected -// so even though you waited on the leader, it might become not leader soon -// by waiting on the followers you can be in a more stable state func (c *cluster) Followers() []*Raft { expFollowers := len(c.rafts) - 1 followers := c.GetInState(Follower) - limit := time.Now().Add(200 * time.Millisecond) - for time.Now().Before(limit) && len(followers) != expFollowers { - time.Sleep(time.Millisecond) - followers = c.GetInState(Follower) - } if len(followers) != expFollowers { - c.t.Fatalf("timeout waiting for %d followers (followers are %v)", expFollowers, followers) + c.FailNowf("[ERROR] timeout waiting for %d followers (followers are %v)", expFollowers, followers) } return followers } func (c *cluster) FullyConnect() { - c.t.Logf("[WARN] Fully Connecting") + c.logger.Printf("[WARN] Fully Connecting") for i, t1 := range c.trans { for j, t2 := range c.trans { if i != j { @@ -196,7 +380,7 @@ func (c *cluster) FullyConnect() { } func (c *cluster) Disconnect(a string) { - c.t.Logf("[WARN] Disconnecting %v", a) + c.logger.Printf("[WARN] Disconnecting %v", a) for _, t := range c.trans { if t.LocalAddr() == a { t.DisconnectAll() @@ -215,38 +399,33 @@ func (c *cluster) IndexOf(r *Raft) int { return -1 } +// This checks that ALL the nodes think the leader is 'expect' func (c *cluster) EnsureLeader(t *testing.T, expect string) { - limit := time.Now().Add(400 * time.Millisecond) -CHECK: + // We assume c.Leader() has been called already + // Now check all the rafts think the leader is correct + fail := false for _, r := range c.rafts { leader := r.Leader() - if expect == "" { - if leader != "" { - if time.Now().After(limit) { - t.Fatalf("leader %v expected nil", leader) - } else { - goto WAIT - } + if leader != expect { + if leader == "" { + leader = "[none]" } - } else { - if leader == "" || leader != expect { - if 
time.Now().After(limit) { - t.Fatalf("leader %v expected %v", leader, expect) - } else { - goto WAIT - } + if expect == "" { + c.logger.Printf("[ERROR] Peer %s sees leader %v expected [none]", r, leader) + } else { + c.logger.Printf("[ERROR] Peer %s sees leader %v expected %v", r, leader, expect) } + fail = true } } - + if fail { + c.FailNowf("[ERROR] At least one peer has the wrong notion of leader") + } return -WAIT: - time.Sleep(10 * time.Millisecond) - goto CHECK } func (c *cluster) EnsureSame(t *testing.T) { - limit := time.Now().Add(400 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) first := c.fsms[0] CHECK: @@ -260,7 +439,7 @@ CHECK: if len(first.logs) != len(fsm.logs) { fsm.Unlock() if time.Now().After(limit) { - t.Fatalf("FSM log length mismatch: %d %d", + c.FailNowf("[ERROR] FSM log length mismatch: %d %d", len(first.logs), len(fsm.logs)) } else { goto WAIT @@ -271,7 +450,7 @@ CHECK: if bytes.Compare(first.logs[idx], fsm.logs[idx]) != 0 { fsm.Unlock() if time.Now().After(limit) { - t.Fatalf("log mismatch at index %d", idx) + c.FailNowf("[ERROR] log mismatch at index %d", idx) } else { goto WAIT } @@ -285,7 +464,7 @@ CHECK: WAIT: first.Unlock() - time.Sleep(20 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) goto CHECK } @@ -301,7 +480,7 @@ func raftToPeerSet(r *Raft) map[string]struct{} { } func (c *cluster) EnsureSamePeers(t *testing.T) { - limit := time.Now().Add(400 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) peerSet := raftToPeerSet(c.rafts[0]) CHECK: @@ -313,7 +492,7 @@ CHECK: otherSet := raftToPeerSet(raft) if !reflect.DeepEqual(peerSet, otherSet) { if time.Now().After(limit) { - t.Fatalf("peer mismatch: %v %v", peerSet, otherSet) + c.FailNowf("[ERROR] peer mismatch: %v %v", peerSet, otherSet) } else { goto WAIT } @@ -322,12 +501,24 @@ CHECK: return WAIT: - time.Sleep(20 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) goto CHECK } func MakeCluster(n int, t *testing.T, conf *Config) *cluster { - c := &cluster{} + if conf == nil { + conf = inmemConfig(t) + } + c := &cluster{ + observationCh: make(chan Observation, 1024), + conf: conf, + // Propagation takes a maximum of 2 heartbeat timeouts (time to get a new heartbeat that would cause a commit) + // plus a bit + propagateTimeout: conf.HeartbeatTimeout*2 + conf.CommitTimeout, + longstopTimeout: 5 * time.Second, + logger: newTestLoggerWithPrefix(t, "cluster"), + failedCh: make(chan struct{}), + } c.t = t peers := make([]string, 0, n) @@ -335,7 +526,7 @@ func MakeCluster(n int, t *testing.T, conf *Config) *cluster { for i := 0; i < n; i++ { dir, err := ioutil.TempDir("", "raft") if err != nil { - t.Fatalf("err: %v ", err) + c.FailNowf("[ERROR] err: %v ", err) } store := NewInmemStore() c.dirs = append(c.dirs, dir) @@ -354,11 +545,10 @@ func MakeCluster(n int, t *testing.T, conf *Config) *cluster { // Wire the transports together c.FullyConnect() + c.startTime = time.Now() + // Create all the rafts for i := 0; i < n; i++ { - if conf == nil { - conf = inmemConfig(t) - } if n == 1 { conf.EnableSingleNode = true } @@ -369,9 +559,16 @@ func MakeCluster(n int, t *testing.T, conf *Config) *cluster { trans := c.trans[i] peerStore := &StaticPeers{StaticPeers: peers} - raft, err := NewRaft(conf, c.fsms[i], logs, store, snap, peerStore, trans) + peerConf := conf + peerConf.Logger = newTestLoggerWithPrefix(t, peers[i]) + + raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, peerStore, trans) + if err != nil { + c.FailNowf("[ERROR] NewRaft failed: %v", err) + } + 
raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) } @@ -380,13 +577,24 @@ func MakeCluster(n int, t *testing.T, conf *Config) *cluster { } func MakeClusterNoPeers(n int, t *testing.T, conf *Config) *cluster { - c := &cluster{} + if conf == nil { + conf = inmemConfig(t) + } + c := &cluster{ + observationCh: make(chan Observation, 1024), + conf: conf, + propagateTimeout: conf.CommitTimeout * 4, + longstopTimeout: 5 * time.Second, + logger: newTestLoggerWithPrefix(t, "cluster"), + failedCh: make(chan struct{}), + } c.t = t + peers := make([]string, 0, n) // Setup the stores and transports for i := 0; i < n; i++ { dir, err := ioutil.TempDir("", "raft") if err != nil { - t.Fatalf("err: %v ", err) + c.FailNowf("[ERROR] err: %v ", err) } store := NewInmemStore() c.dirs = append(c.dirs, dir) @@ -397,28 +605,34 @@ func MakeClusterNoPeers(n int, t *testing.T, conf *Config) *cluster { c.dirs = append(c.dirs, dir2) c.snaps = append(c.snaps, snap) - _, trans := NewInmemTransport("") + addr, trans := NewInmemTransport("") c.trans = append(c.trans, trans) + peers = append(peers, addr) } // Wire the transports together c.FullyConnect() + c.startTime = time.Now() + // Create all the rafts for i := 0; i < n; i++ { - if conf == nil { - conf = inmemConfig(t) - } - logs := c.stores[i] store := c.stores[i] snap := c.snaps[i] trans := c.trans[i] peerStore := &StaticPeers{} - raft, err := NewRaft(conf, c.fsms[i], logs, store, snap, peerStore, trans) + peerConf := conf + peerConf.Logger = newTestLoggerWithPrefix(t, peers[i]) + + raft, err := NewRaft(peerConf, c.fsms[i], logs, store, snap, peerStore, trans) + if err != nil { + c.FailNowf("[ERROR] NewRaft failed: %v", err) + } + raft.RegisterObserver(NewObserver(c.observationCh, false, nil)) if err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] RegisterObserver failed: %v", err) } c.rafts = append(c.rafts, raft) } @@ -438,21 +652,21 @@ func TestRaft_AfterShutdown(t *testing.T) { // Everything should fail now if f := raft.Apply(nil, 0); f.Error() != ErrRaftShutdown { - t.Fatalf("should be shutdown: %v", f.Error()) + c.FailNowf("[ERROR] should be shutdown: %v", f.Error()) } if f := raft.AddPeer(NewInmemAddr()); f.Error() != ErrRaftShutdown { - t.Fatalf("should be shutdown: %v", f.Error()) + c.FailNowf("[ERROR] should be shutdown: %v", f.Error()) } if f := raft.RemovePeer(NewInmemAddr()); f.Error() != ErrRaftShutdown { - t.Fatalf("should be shutdown: %v", f.Error()) + c.FailNowf("[ERROR] should be shutdown: %v", f.Error()) } if f := raft.Snapshot(); f.Error() != ErrRaftShutdown { - t.Fatalf("should be shutdown: %v", f.Error()) + c.FailNowf("[ERROR] should be shutdown: %v", f.Error()) } // Should be idempotent if f := raft.Shutdown(); f.Error() != nil { - t.Fatalf("shutdown should be idempotent") + c.FailNowf("[ERROR] shutdown should be idempotent") } } @@ -467,36 +681,36 @@ func TestRaft_SingleNode(t *testing.T) { select { case v := <-raft.LeaderCh(): if !v { - t.Fatalf("should become leader") + c.FailNowf("[ERROR] should become leader") } case <-time.After(conf.HeartbeatTimeout * 3): - t.Fatalf("timeout becoming leader") + c.FailNowf("[ERROR] timeout becoming leader") } // Should be leader if s := raft.State(); s != Leader { - t.Fatalf("expected leader: %v", s) + c.FailNowf("[ERROR] expected leader: %v", s) } // Should be able to apply - future := raft.Apply([]byte("test"), time.Millisecond) + future := 
raft.Apply([]byte("test"), c.conf.HeartbeatTimeout) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Check the response if future.Response().(int) != 1 { - t.Fatalf("bad response: %v", future.Response()) + c.FailNowf("[ERROR] bad response: %v", future.Response()) } // Check the index if idx := future.Index(); idx == 0 { - t.Fatalf("bad index: %d", idx) + c.FailNowf("[ERROR] bad index: %d", idx) } // Check that it is applied to the FSM if len(c.fsms[0].logs) != 1 { - t.Fatalf("did not apply to FSM!") + c.FailNowf("[ERROR] did not apply to FSM!") } } @@ -506,28 +720,17 @@ func TestRaft_TripleNode(t *testing.T) { defer c.Close() // Should be one leader - c.Followers() leader := c.Leader() + c.Followers() c.EnsureLeader(t, leader.localAddr) // Should be able to apply - future := leader.Apply([]byte("test"), time.Millisecond) + future := leader.Apply([]byte("test"), c.conf.CommitTimeout) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } - // Wait for replication - time.Sleep(30 * time.Millisecond) - - // Check that it is applied to the FSM - for _, fsm := range c.fsms { - fsm.Lock() - num := len(fsm.logs) - fsm.Unlock() - if num != 1 { - t.Fatalf("did not apply to FSM!") - } - } + c.WaitForReplication(1) } func TestRaft_LeaderFail(t *testing.T) { @@ -540,13 +743,13 @@ func TestRaft_LeaderFail(t *testing.T) { leader := c.Leader() // Should be able to apply - future := leader.Apply([]byte("test"), time.Millisecond) + future := leader.Apply([]byte("test"), c.conf.CommitTimeout) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait for replication - time.Sleep(30 * time.Millisecond) + c.WaitForReplication(1) // Disconnect the leader now t.Logf("[INFO] Disconnecting %v", leader) @@ -554,33 +757,34 @@ func TestRaft_LeaderFail(t *testing.T) { c.Disconnect(leader.localAddr) // Wait for new leader - limit := time.Now().Add(300 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var newLead *Raft for time.Now().Before(limit) && newLead == nil { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) leaders := c.GetInState(Leader) if len(leaders) == 1 && leaders[0] != leader { newLead = leaders[0] } } + if newLead == nil { - t.Fatalf("expected new leader") + c.FailNowf("[ERROR] expected new leader") } // Ensure the term is greater if newLead.getCurrentTerm() <= leaderTerm { - t.Fatalf("expected newer term! %d %d (%v, %v)", newLead.getCurrentTerm(), leaderTerm, newLead, leader) + c.FailNowf("[ERROR] expected newer term! 
%d %d (%v, %v)", newLead.getCurrentTerm(), leaderTerm, newLead, leader) } // Apply should work not work on old leader - future1 := leader.Apply([]byte("fail"), time.Millisecond) + future1 := leader.Apply([]byte("fail"), c.conf.CommitTimeout) // Apply should work on newer leader - future2 := newLead.Apply([]byte("apply"), time.Millisecond) + future2 := newLead.Apply([]byte("apply"), c.conf.CommitTimeout) // Future2 should work if err := future2.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Reconnect the networks @@ -589,7 +793,7 @@ func TestRaft_LeaderFail(t *testing.T) { // Future1 should fail if err := future1.Error(); err != ErrLeadershipLost && err != ErrNotLeader { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait for log replication @@ -599,13 +803,13 @@ func TestRaft_LeaderFail(t *testing.T) { for _, fsm := range c.fsms { fsm.Lock() if len(fsm.logs) != 2 { - t.Fatalf("did not apply both to FSM! %v", fsm.logs) + c.FailNowf("[ERROR] did not apply both to FSM! %v", fsm.logs) } if bytes.Compare(fsm.logs[0], []byte("test")) != 0 { - t.Fatalf("first entry should be 'test'") + c.FailNowf("[ERROR] first entry should be 'test'") } if bytes.Compare(fsm.logs[1], []byte("apply")) != 0 { - t.Fatalf("second entry should be 'apply'") + c.FailNowf("[ERROR] second entry should be 'apply'") } fsm.Unlock() } @@ -630,14 +834,14 @@ func TestRaft_BehindFollower(t *testing.T) { // Wait for the last future to apply if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } else { t.Logf("[INFO] Finished apply without behind follower") } // Check that we have a non zero last contact if behind.LastContact().IsZero() { - t.Fatalf("expected previous contact") + c.FailNowf("[ERROR] expected previous contact") } // Reconnect the behind node @@ -658,33 +862,31 @@ func TestRaft_ApplyNonLeader(t *testing.T) { // Wait for a leader c.Leader() - time.Sleep(10 * time.Millisecond) - // Try to apply to them followers := c.GetInState(Follower) if len(followers) != 2 { - t.Fatalf("Expected 2 followers") + c.FailNowf("[ERROR] Expected 2 followers") } follower := followers[0] // Try to apply - future := follower.Apply([]byte("test"), time.Millisecond) + future := follower.Apply([]byte("test"), c.conf.CommitTimeout) if future.Error() != ErrNotLeader { - t.Fatalf("should not apply on follower") + c.FailNowf("[ERROR] should not apply on follower") } // Should be cached if future.Error() != ErrNotLeader { - t.Fatalf("should not apply on follower") + c.FailNowf("[ERROR] should not apply on follower") } } func TestRaft_ApplyConcurrent(t *testing.T) { // Make the cluster conf := inmemConfig(t) - conf.HeartbeatTimeout = 80 * time.Millisecond - conf.ElectionTimeout = 80 * time.Millisecond + conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout + conf.ElectionTimeout = 2 * conf.ElectionTimeout c := MakeCluster(3, t, conf) defer c.Close() @@ -699,7 +901,7 @@ func TestRaft_ApplyConcurrent(t *testing.T) { defer group.Done() future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), 0) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.Failf("[ERROR] err: %v", err) } } @@ -716,10 +918,10 @@ func TestRaft_ApplyConcurrent(t *testing.T) { }() select { case <-doneCh: - case <-time.After(time.Second): - t.Fatalf("timeout") + case <-time.After(c.longstopTimeout): + c.Failf("[ERROR] timeout") } - + c.CheckFailed() // Check the FSMs c.EnsureSame(t) } @@ -727,8 +929,9 @@ func TestRaft_ApplyConcurrent(t *testing.T) { func 
TestRaft_ApplyConcurrent_Timeout(t *testing.T) { // Make the cluster conf := inmemConfig(t) - conf.HeartbeatTimeout = 80 * time.Millisecond - conf.ElectionTimeout = 80 * time.Millisecond + conf.CommitTimeout = 1 * time.Millisecond + conf.HeartbeatTimeout = 2 * conf.HeartbeatTimeout + conf.ElectionTimeout = 2 * conf.ElectionTimeout c := MakeCluster(1, t, conf) defer c.Close() @@ -737,7 +940,7 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { // Enough enqueues should cause at least one timeout... var didTimeout int32 = 0 - for i := 0; (i < 500) && (atomic.LoadInt32(&didTimeout) == 0); i++ { + for i := 0; (i < 5000) && (atomic.LoadInt32(&didTimeout) == 0); i++ { go func(i int) { future := leader.Apply([]byte(fmt.Sprintf("test%d", i)), time.Microsecond) if future.Error() == ErrEnqueueTimeout { @@ -750,12 +953,14 @@ func TestRaft_ApplyConcurrent_Timeout(t *testing.T) { } } - // Wait - time.Sleep(20 * time.Millisecond) + longstopTimeout := time.AfterFunc(c.longstopTimeout, func() { c.Failf("[ERROR] Timeout waiting to detect apply timeouts") }) + defer longstopTimeout.Stop() - // Some should have failed - if atomic.LoadInt32(&didTimeout) == 0 { - t.Fatalf("expected a timeout") + for { + if atomic.LoadInt32(&didTimeout) != 0 { + return + } + c.WaitEvent(nil, c.propagateTimeout) } } @@ -769,7 +974,7 @@ func TestRaft_JoinNode(t *testing.T) { leader := c.Leader() future = leader.Apply([]byte("first"), 0) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } else { t.Logf("[INFO] Applied log") } @@ -782,31 +987,31 @@ func TestRaft_JoinNode(t *testing.T) { c.FullyConnect() // Wait until we have 2 leaders - limit := time.Now().Add(200 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var leaders []*Raft for time.Now().Before(limit) && len(leaders) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) leaders = c.GetInState(Leader) } if len(leaders) != 2 { - t.Fatalf("expected two leader: %v", leaders) + c.FailNowf("[ERROR] expected two leader: %v", leaders) } // Join the new node in future = leader.AddPeer(c1.rafts[0].localAddr) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait until we have 2 followers - limit = time.Now().Add(200 * time.Millisecond) + limit = time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 2 { - t.Fatalf("expected two followers: %v", followers) + c.FailNowf("[ERROR] expected two followers: %v", followers) } // Check the FSMs @@ -829,32 +1034,32 @@ func TestRaft_RemoveFollower(t *testing.T) { leader := c.Leader() // Wait until we have 2 followers - limit := time.Now().Add(300 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 2 { - t.Fatalf("expected two followers: %v", followers) + c.FailNowf("[ERROR] expected two followers: %v", followers) } // Remove a follower follower := followers[0] future := leader.RemovePeer(follower.localAddr) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait a while - time.Sleep(20 * time.Millisecond) + 
time.Sleep(c.propagateTimeout) // Other nodes should have fewer peers if peers, _ := leader.peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers") + c.FailNowf("[ERROR] too many peers") } if peers, _ := followers[1].peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers") + c.FailNowf("[ERROR] too many peers") } } @@ -867,41 +1072,41 @@ func TestRaft_RemoveLeader(t *testing.T) { leader := c.Leader() // Wait until we have 2 followers - limit := time.Now().Add(200 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 2 { - t.Fatalf("expected two followers: %v", followers) + c.FailNowf("[ERROR] expected two followers: %v", followers) } // Remove the leader leader.RemovePeer(leader.localAddr) // Wait a while - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Should have a new leader newLeader := c.Leader() // Wait a bit for log application - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Other nodes should have fewer peers if peers, _ := newLeader.peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers") + c.FailNowf("[ERROR] too many peers") } // Old leader should be shutdown if leader.State() != Shutdown { - t.Fatalf("leader should be shutdown") + c.FailNowf("[ERROR] leader should be shutdown") } // Old leader should have no peers if peers, _ := leader.peerStore.Peers(); len(peers) != 1 { - t.Fatalf("leader should have no peers") + c.FailNowf("[ERROR] leader should have no peers") } } @@ -925,37 +1130,37 @@ func TestRaft_RemoveLeader_NoShutdown(t *testing.T) { } if i > 80 { if err := future.Error(); err == nil || err != ErrNotLeader { - t.Fatalf("err: %v, future entries should fail", err) + c.FailNowf("[ERROR] err: %v, future entries should fail", err) } } } if err := removeFuture.Error(); err != nil { - t.Fatalf("RemovePeer failed with error %v", err) + c.FailNowf("[ERROR] RemovePeer failed with error %v", err) } // Wait a while - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Should have a new leader newLeader := c.Leader() // Wait a bit for log application - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Other nodes should have fewer peers if peers, _ := newLeader.peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers") + c.FailNowf("[ERROR] too many peers") } // Old leader should be a follower if leader.State() != Follower { - t.Fatalf("leader should be shutdown") + c.FailNowf("[ERROR] leader should be shutdown") } // Old leader should have no peers if peers, _ := leader.peerStore.Peers(); len(peers) != 1 { - t.Fatalf("leader should have no peers") + c.FailNowf("[ERROR] leader should have no peers") } // Other nodes should have the same state @@ -981,19 +1186,19 @@ func TestRaft_RemoveLeader_SplitCluster(t *testing.T) { leader.RemovePeer(leader.localAddr) // Wait until we have 2 leaders - limit := time.Now().Add(300 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var leaders []*Raft for time.Now().Before(limit) && len(leaders) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) leaders = c.GetInState(Leader) } if len(leaders) != 2 { - t.Fatalf("expected two leader: %v", leaders) + c.FailNowf("[ERROR] expected two leader: %v", leaders) } // Old leader should have no peers if len(leader.peers) != 0 
{ - t.Fatalf("leader should have no peers") + c.FailNowf("[ERROR] leader should have no peers") } } @@ -1011,7 +1216,7 @@ func TestRaft_AddKnownPeer(t *testing.T) { // Should be already added if err := future.Error(); err != ErrKnownPeer { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1028,7 +1233,7 @@ func TestRaft_RemoveUnknownPeer(t *testing.T) { // Should be already added if err := future.Error(); err != ErrUnknownPeer { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1048,29 +1253,29 @@ func TestRaft_SnapshotRestore(t *testing.T) { // Wait for the last future to apply if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Take a snapshot snapFuture := leader.Snapshot() if err := snapFuture.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Check for snapshot if snaps, _ := leader.snapshots.List(); len(snaps) != 1 { - t.Fatalf("should have a snapshot") + c.FailNowf("[ERROR] should have a snapshot") } // Logs should be trimmed if idx, _ := leader.logs.FirstIndex(); idx != 92 { - t.Fatalf("should trim logs to 92: %d", idx) + c.FailNowf("[ERROR] should trim logs to 92: %d", idx) } // Shutdown shutdown := leader.Shutdown() if err := shutdown.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Restart the Raft @@ -1080,13 +1285,13 @@ func TestRaft_SnapshotRestore(t *testing.T) { r, err := NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, r.peerStore, trans2) if err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } c.rafts[0] = r // We should have restored from the snapshot! if last := r.getLastApplied(); last != 101 { - t.Fatalf("bad last: %v", last) + c.FailNowf("[ERROR] bad last: %v", last) } } @@ -1106,19 +1311,19 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { // Wait for the last future to apply if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Take a snapshot snapFuture := leader.Snapshot() if err := snapFuture.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Shutdown shutdown := leader.Shutdown() if err := shutdown.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Make a separate cluster @@ -1146,7 +1351,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { r, err := NewRaft(r.conf, r.fsm, r.logs, r.stable, r.snapshots, peerStore, trans2) if err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } c.rafts[0] = r c2.rafts = append(c2.rafts, r) @@ -1155,7 +1360,7 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { c2.FullyConnect() // Wait a while - time.Sleep(50 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Ensure we elect a leader, and that we replicate // to our new followers @@ -1163,14 +1368,14 @@ func TestRaft_SnapshotRestore_PeerChange(t *testing.T) { // We should have restored from the snapshot! 
if last := r.getLastApplied(); last != 102 { - t.Fatalf("bad last: %v", last) + c.FailNowf("[ERROR] bad last: %v", last) } } func TestRaft_AutoSnapshot(t *testing.T) { // Make the cluster conf := inmemConfig(t) - conf.SnapshotInterval = 5 * time.Millisecond + conf.SnapshotInterval = conf.CommitTimeout * 2 conf.SnapshotThreshold = 50 conf.TrailingLogs = 10 c := MakeCluster(1, t, conf) @@ -1185,15 +1390,15 @@ func TestRaft_AutoSnapshot(t *testing.T) { // Wait for the last future to apply if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait for a snapshot to happen - time.Sleep(50 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Check for snapshot if snaps, _ := leader.snapshots.List(); len(snaps) == 0 { - t.Fatalf("should have a snapshot") + c.FailNowf("[ERROR] should have a snapshot") } } @@ -1217,7 +1422,7 @@ func TestRaft_ManualSnapshot(t *testing.T) { future = leader.Apply([]byte(fmt.Sprintf("test %d", i)), 0) } if err := future.Error(); err != nil { - t.Fatalf("Error Apply new log entries: %v", err) + c.FailNowf("[ERROR] Error Apply new log entries: %v", err) } // now we should be able to ask for a snapshot without getting an error ssErr = leader.Snapshot().Error() @@ -1247,7 +1452,7 @@ func TestRaft_SendSnapshotFollower(t *testing.T) { // Wait for the last future to apply if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } else { t.Logf("[INFO] Finished apply without behind follower") } @@ -1257,7 +1462,7 @@ func TestRaft_SendSnapshotFollower(t *testing.T) { future = r.Snapshot() // the disconnected node will have nothing to snapshot, so that's expected if err := future.Error(); err != nil && err != ErrNothingNewToSnapshot { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1281,36 +1486,36 @@ func TestRaft_ReJoinFollower(t *testing.T) { leader := c.Leader() // Wait until we have 2 followers - limit := time.Now().Add(200 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 2 { - t.Fatalf("expected two followers: %v", followers) + c.FailNowf("[ERROR] expected two followers: %v", followers) } // Remove a follower follower := followers[0] future := leader.RemovePeer(follower.localAddr) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait a while - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Other nodes should have fewer peers if peers, _ := leader.peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers: %v", peers) + c.FailNowf("[ERROR] too many peers: %v", peers) } if peers, _ := followers[1].peerStore.Peers(); len(peers) != 2 { - t.Fatalf("too many peers: %v", peers) + c.FailNowf("[ERROR] too many peers: %v", peers) } // Get the leader - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) leader = c.Leader() // Rejoin. The follower will have a higher term than the leader, @@ -1318,23 +1523,23 @@ func TestRaft_ReJoinFollower(t *testing.T) { // to take place. We should eventually re-stabilize. 
future = leader.AddPeer(follower.localAddr) if err := future.Error(); err != nil && err != ErrLeadershipLost { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Wait a while - time.Sleep(40 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Other nodes should have fewer peers if peers, _ := leader.peerStore.Peers(); len(peers) != 3 { - t.Fatalf("missing peers: %v", peers) + c.FailNowf("[ERROR] missing peers: %v", peers) } if peers, _ := followers[1].peerStore.Peers(); len(peers) != 3 { - t.Fatalf("missing peers: %v", peers) + c.FailNowf("[ERROR] missing peers: %v", peers) } // Should be a follower now if follower.State() != Follower { - t.Fatalf("bad state: %v", follower.State()) + c.FailNowf("[ERROR] bad state: %v", follower.State()) } } @@ -1348,14 +1553,14 @@ func TestRaft_LeaderLeaseExpire(t *testing.T) { leader := c.Leader() // Wait until we have a followers - limit := time.Now().Add(200 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 1 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 1 { - t.Fatalf("expected a followers: %v", followers) + c.FailNowf("[ERROR] expected a followers: %v", followers) } // Disconnect the follower now @@ -1367,37 +1572,37 @@ func TestRaft_LeaderLeaseExpire(t *testing.T) { select { case v := <-leader.LeaderCh(): if v { - t.Fatalf("should step down as leader") + c.FailNowf("[ERROR] should step down as leader") } case <-time.After(conf.LeaderLeaseTimeout * 2): - t.Fatalf("timeout stepping down as leader") + c.FailNowf("[ERROR] timeout stepping down as leader") } // Ensure the last contact of the leader is non-zero if leader.LastContact().IsZero() { - t.Fatalf("expected non-zero contact time") + c.FailNowf("[ERROR] expected non-zero contact time") } // Should be no leaders if len(c.GetInState(Leader)) != 0 { - t.Fatalf("expected step down") + c.FailNowf("[ERROR] expected step down") } // Verify no further contact last := follower.LastContact() - time.Sleep(110 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Check that last contact has not changed if last != follower.LastContact() { - t.Fatalf("unexpected further contact") + c.FailNowf("[ERROR] unexpected further contact") } // Ensure both have cleared their leader if l := leader.Leader(); l != "" { - t.Fatalf("bad: %v", l) + c.FailNowf("[ERROR] bad: %v", l) } if l := follower.Leader(); l != "" { - t.Fatalf("bad: %v", l) + c.FailNowf("[ERROR] bad: %v", l) } } @@ -1419,13 +1624,13 @@ func TestRaft_Barrier(t *testing.T) { // Wait for the barrier future to apply if err := barrier.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Ensure all the logs are the same c.EnsureSame(t) if len(c.fsms[0].logs) != 100 { - t.Fatalf("Bad log length") + c.FailNowf("[ERROR] Bad log length") } } @@ -1442,7 +1647,7 @@ func TestRaft_VerifyLeader(t *testing.T) { // Wait for the verify to apply if err := verify.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1459,7 +1664,7 @@ func TestRaft_VerifyLeader_Single(t *testing.T) { // Wait for the verify to apply if err := verify.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1484,12 +1689,12 @@ func TestRaft_VerifyLeader_Fail(t *testing.T) { // Wait for the leader to step down if err := verify.Error(); err != ErrNotLeader && err != ErrLeadershipLost { - 
t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Ensure the known leader is cleared if l := leader.Leader(); l != "" { - t.Fatalf("bad: %v", l) + c.FailNowf("[ERROR] bad: %v", l) } } @@ -1503,14 +1708,14 @@ func TestRaft_VerifyLeader_ParitalConnect(t *testing.T) { leader := c.Leader() // Wait until we have a followers - limit := time.Now().Add(200 * time.Millisecond) + limit := time.Now().Add(c.longstopTimeout) var followers []*Raft for time.Now().Before(limit) && len(followers) != 2 { - time.Sleep(10 * time.Millisecond) + c.WaitEvent(nil, c.conf.CommitTimeout) followers = c.GetInState(Follower) } if len(followers) != 2 { - t.Fatalf("expected a followers: %v", followers) + c.FailNowf("[ERROR] expected two followers but got: %v", followers) } // Force partial disconnect @@ -1523,7 +1728,7 @@ func TestRaft_VerifyLeader_ParitalConnect(t *testing.T) { // Wait for the leader to step down if err := verify.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } } @@ -1540,16 +1745,16 @@ func TestRaft_SettingPeers(t *testing.T) { for _, v := range c.rafts { future := v.SetPeers(peers) if err := future.Error(); err != nil { - t.Fatalf("error setting peers: %v", err) + c.FailNowf("[ERROR] error setting peers: %v", err) } } // Wait a while - time.Sleep(20 * time.Millisecond) + time.Sleep(c.propagateTimeout) // Should have a new leader if leader := c.Leader(); leader == nil { - t.Fatalf("no leader?") + c.FailNowf("[ERROR] no leader?") } } @@ -1564,36 +1769,36 @@ func TestRaft_StartAsLeader(t *testing.T) { select { case v := <-raft.LeaderCh(): if !v { - t.Fatalf("should become leader") + c.FailNowf("[ERROR] should become leader") } - case <-time.After(5 * time.Millisecond): - t.Fatalf("timeout becoming leader") + case <-time.After(c.conf.HeartbeatTimeout * 4): // Longer than you think as possibility of multiple elections + c.FailNowf("[ERROR] timeout becoming leader") } // Should be leader if s := raft.State(); s != Leader { - t.Fatalf("expected leader: %v", s) + c.FailNowf("[ERROR] expected leader: %v", s) } // Should be able to apply - future := raft.Apply([]byte("test"), time.Millisecond) + future := raft.Apply([]byte("test"), c.conf.CommitTimeout) if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) + c.FailNowf("[ERROR] err: %v", err) } // Check the response if future.Response().(int) != 1 { - t.Fatalf("bad response: %v", future.Response()) + c.FailNowf("[ERROR] bad response: %v", future.Response()) } // Check the index if idx := future.Index(); idx == 0 { - t.Fatalf("bad index: %d", idx) + c.FailNowf("[ERROR] bad index: %d", idx) } // Check that it is applied to the FSM if len(c.fsms[0].logs) != 1 { - t.Fatalf("did not apply to FSM!") + c.FailNowf("[ERROR] did not apply to FSM!") } } @@ -1608,10 +1813,10 @@ func TestRaft_NotifyCh(t *testing.T) { select { case v := <-ch: if !v { - t.Fatalf("should become leader") + c.FailNowf("[ERROR] should become leader") } - case <-time.After(conf.HeartbeatTimeout * 6): - t.Fatalf("timeout becoming leader") + case <-time.After(conf.HeartbeatTimeout * 8): + c.FailNowf("[ERROR] timeout becoming leader") } // Close the cluster @@ -1621,10 +1826,10 @@ func TestRaft_NotifyCh(t *testing.T) { select { case v := <-ch: if v { - t.Fatalf("should step down as leader") + c.FailNowf("[ERROR] should step down as leader") } case <-time.After(conf.HeartbeatTimeout * 6): - t.Fatalf("timeout on step down as leader") + c.FailNowf("[ERROR] timeout on step down as leader") } } @@ -1644,17 +1849,17 @@ func 
TestRaft_Voting(t *testing.T) { // a follower that thinks there's a leader should vote for that leader. var resp RequestVoteResponse if err := ldrT.RequestVote(followers[0].localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) + c.FailNowf("[ERROR] RequestVote RPC failed %v", err) } if !resp.Granted { - t.Fatalf("expected vote to be granted, but wasn't %+v", resp) + c.FailNowf("[ERROR] expected vote to be granted, but wasn't %+v", resp) } // a follow that thinks there's a leader shouldn't vote for a different candidate reqVote.Candidate = ldrT.EncodePeer(followers[0].localAddr) if err := ldrT.RequestVote(followers[1].localAddr, &reqVote, &resp); err != nil { - t.Fatalf("RequestVote RPC failed %v", err) + c.FailNowf("[ERROR] RequestVote RPC failed %v", err) } if resp.Granted { - t.Fatalf("expected vote not to be granted, but was %+v", resp) + c.FailNowf("[ERROR] expected vote not to be granted, but was %+v", resp) } }
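-- 
Usage sketch for the observer API introduced above, placed behind the
signature marker so it is ignored by git am. It is illustrative only: the
helper name waitForLeaderChange and its timeout parameter are hypothetical,
and it has to live in package raft because LeaderObservation's leader field
is unexported.

package raft

import (
	"fmt"
	"time"
)

// waitForLeaderChange registers a temporary observer and blocks until a
// LeaderObservation arrives or the timeout expires. Hypothetical helper,
// shown only to illustrate the API added by this patch.
func waitForLeaderChange(r *Raft, timeout time.Duration) (string, error) {
	// Buffered channel: the observer is registered as non-blocking, so a
	// slow reader drops observations rather than stalling raft's main loop.
	obsCh := make(chan Observation, 64)

	// The filter passes only leadership observations to the channel.
	observer := NewObserver(obsCh, false, func(o *Observation) bool {
		_, ok := o.Data.(LeaderObservation)
		return ok
	})
	r.RegisterObserver(observer)
	defer r.DeregisterObserver(observer)

	select {
	case o := <-obsCh:
		return o.Data.(LeaderObservation).leader, nil
	case <-time.After(timeout):
		observed, dropped, _ := observer.GetCounters()
		return "", fmt.Errorf("no leader change (observed=%d, dropped=%d)", observed, dropped)
	}
}

Registering non-blocking (blocking=false) trades completeness for safety:
a slow consumer loses observations instead of stalling the raft goroutine,
and GetCounters reports how many were delivered versus dropped.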