Skip to content

Commit

Permalink
Make raft_test.go far more resilient
Browse files Browse the repository at this point in the history
* Add observations - emitted when something happens

* Makefile: Change test timeout in Makefile to 30s

* raft_test.go: Change default commit timeout to 5ms

* raft_test.go: Centralise all references to time in a single
  place.

* raft_test.go: Make logger work consistently and output time in
  microseconds (very useful for debugging). Convert all logging
  to use the cluster logger.

* raft_test.go: provide c.Failf function that consistently
  produces the output, in log format, with timestamps. Convert
  use of panic() and t.Fatalf() to c.Failf()

* raft_test.go: rewrite GetInState() so it is now reliable, i.e.
  by waiting for the state to remain stable for a given period
  of time.

* raft_test.go: provide WaitEventChan() and WaitEvent() which
  wait for 'something to happen' or a timeout.

* raft_test.go: provide WaitForReplication() which waits until
  the FSM has a supplied number of logs on each node.

* raft_test.go: rewrite Leaders() and Followers() to be
  much simpler now that GetInState() is reliable.

* raft_test.go: rewrite EnsureLeader() now Leaders() is
  reliable.

Signed-off-by: Alex Bligh <alex@alex.org.uk>
  • Loading branch information
abligh committed Mar 31, 2016
1 parent e3bce71 commit 6583660
Show file tree
Hide file tree
Showing 4 changed files with 566 additions and 255 deletions.
2 changes: 1 addition & 1 deletion Makefile
@@ -1,7 +1,7 @@
DEPS = $(go list -f '{{range .TestImports}}{{.}} {{end}}' ./...)

test:
go test -timeout=15s ./...
go test -timeout=30s ./...

integ: test
INTEG_TESTS=yes go test -timeout=3s -run=Integ ./...
Expand Down
88 changes: 88 additions & 0 deletions observer.go
@@ -0,0 +1,88 @@
package raft

import (
"sync/atomic"
)

// Observation is the value delivered on an observer's channel: it pairs the
// payload handed to observe with the Raft instance that emitted it.
type Observation struct {
Raft *Raft // the Raft instance on which observe was called
Data interface{} // the payload passed to observe (e.g. a LeaderObservation, a RaftState or a RequestVoteRequest)
}

// LeaderObservation is the payload emitted by setLeader when the leader
// value changes.
//
// NOTE(review): the type is exported but its only field is not, so code
// outside this package can detect a leader change but cannot read the new
// leader from the observation — confirm whether that is intended.
type LeaderObservation struct {
leader string // the new leader value that was passed to setLeader
}

// nextObserverId is the package-wide counter from which NewObserver draws
// observer IDs; NewObserver advances it with atomic.AddUint64.
var nextObserverId uint64

// Observer describes what to do with a given observation: which channel it
// is delivered on, whether delivery may block, and which observations are
// of interest. Instances are created with NewObserver.
type Observer struct {
channel chan Observation // channel on which observations are delivered
blocking bool // if true, delivery blocks until the channel accepts the observation; if false, an observation the channel cannot accept immediately is dropped
numObserved uint64 // number of observations sent on channel (updated atomically)
numDropped uint64 // number of observations dropped by non-blocking delivery (updated atomically)
id uint64 // key of this observer in the Raft observers map
filter func(o *Observation) bool // optional; when non-nil, observations for which it returns false are not sent to channel
}

// RegisterObserver adds or to the observers of this Raft instance, stored
// under the observer's ID. Registering an observer whose ID is already
// present replaces the existing entry. The observers map is modified
// under the write side of observerLock.
func (r *Raft) RegisterObserver(or *Observer) {
	key := or.id

	r.observerLock.Lock()
	defer r.observerLock.Unlock()

	r.observers[key] = or
}

// DeregisterObserver removes the entry stored under or's ID from the
// observers of this Raft instance. Removing an ID that is not present is a
// no-op. The observers map is modified under the write side of
// observerLock.
func (r *Raft) DeregisterObserver(or *Observer) {
	key := or.id

	r.observerLock.Lock()
	defer r.observerLock.Unlock()

	delete(r.observers, key)
}

// observe wraps o in an Observation tagged with r and offers it to every
// registered observer.
//
// For each observer: if it has a filter and the filter rejects the
// observation, the observer is skipped. An observer with a nil channel is
// skipped as well. A blocking observer is sent the observation with a
// plain channel send; a non-blocking observer is sent it only if the
// channel can accept it immediately, otherwise the observation is counted
// as dropped for that observer.
func (r *Raft) observe(o interface{}) {
	ob := Observation{Raft: r, Data: o}

	// We hold this mutex whilst observers (potentially) block. In general
	// observers should not block, but in any case this isn't disastrous as
	// we only hold a read lock, which merely prevents registration /
	// deregistration of observers.
	r.observerLock.RLock()
	defer r.observerLock.RUnlock()

	for _, or := range r.observers {
		if or.filter != nil && !or.filter(&ob) {
			continue
		}
		if or.channel == nil {
			// Skip just this observer. Returning here would abort the whole
			// fan-out, and because map iteration order is unspecified an
			// arbitrary subset of the remaining observers would silently
			// miss the observation.
			continue
		}
		if or.blocking {
			or.channel <- ob
			atomic.AddUint64(&or.numObserved, 1)
			continue
		}
		select {
		case or.channel <- ob:
			atomic.AddUint64(&or.numObserved, 1)
		default:
			atomic.AddUint64(&or.numDropped, 1)
		}
	}
}

// GetCounters returns the observer's performance counters: the number of
// observations sent on its channel, then the number dropped. Both values
// are read atomically. The returned error is always nil.
func (or *Observer) GetCounters() (uint64, uint64, error) {
	observed := atomic.LoadUint64(&or.numObserved)
	dropped := atomic.LoadUint64(&or.numDropped)
	return observed, dropped, nil
}

// NewObserver creates an observer that delivers observations on channel.
// blocking selects whether delivery may block on the channel; filter, which
// may be nil, decides which observations are delivered. Each observer is
// given a fresh ID by atomically incrementing the package-level counter.
func NewObserver(channel chan Observation, blocking bool, filter func(o *Observation) bool) *Observer {
	id := atomic.AddUint64(&nextObserverId, 1)
	return &Observer{
		id:       id,
		channel:  channel,
		blocking: blocking,
		filter:   filter,
	}
}
18 changes: 18 additions & 0 deletions raft.go
Expand Up @@ -75,6 +75,9 @@ type leaderState struct {
type Raft struct {
raftState

// the previously observed raft state
observedRaftState RaftState

// applyCh is used to async send logs to the main thread to
// be committed and applied to the FSM.
applyCh chan *logFuture
Expand Down Expand Up @@ -147,6 +150,10 @@ type Raft struct {
// verifyCh is used to async send verify futures to the main thread
// to verify we are still the leader
verifyCh chan *verifyFuture

// list of observers and the mutex that protects them
observerLock sync.RWMutex
observers map[uint64]*Observer
}

// NewRaft is used to construct a new Raft node. It takes a configuration, as well
Expand Down Expand Up @@ -221,6 +228,7 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
stable: stable,
trans: trans,
verifyCh: make(chan *verifyFuture, 64),
observers: make(map[uint64]*Observer),
}

// Initialize as a follower
Expand Down Expand Up @@ -267,8 +275,12 @@ func (r *Raft) Leader() string {
// setLeader is used to modify the current leader of the cluster. The
// leader field is updated under leaderLock; if the new value differs from
// the previous one, a LeaderObservation carrying the new leader is emitted
// to observers after the lock has been released.
func (r *Raft) setLeader(leader string) {
	r.leaderLock.Lock()
	oldLeader := r.leader
	r.leader = leader
	r.leaderLock.Unlock()

	// Compare against the value we just stored rather than re-reading
	// r.leader: the lock is no longer held, so reading the field here would
	// be a data race and could see another goroutine's write.
	if oldLeader != leader {
		r.observe(LeaderObservation{leader: leader})
	}
}

// Apply is used to apply a command to the FSM in a highly consistent
Expand Down Expand Up @@ -1418,6 +1430,8 @@ func (r *Raft) requestVote(rpc RPC, req *RequestVoteRequest) {
rpc.Respond(resp, rpcErr)
}()

r.observe(*req)

// Check if we have an existing leader [who's not the candidate]
candidate := r.trans.DecodePeer(req.Candidate)
if leader := r.Leader(); leader != "" && leader != candidate {
Expand Down Expand Up @@ -1695,7 +1709,11 @@ func (r *Raft) setCurrentTerm(t uint64) {
// that leader should be set only after updating the state.
func (r *Raft) setState(state RaftState) {
	// The leader is cleared first, then the state is swapped.
	r.setLeader("")
	previous := r.raftState.getState()
	r.raftState.setState(state)

	// Observers only hear about genuine transitions.
	if previous == state {
		return
	}
	r.observe(state)
}

// runSnapshots is a long running goroutine used to manage taking
Expand Down

0 comments on commit 6583660

Please sign in to comment.