Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add Event Dispatch.

  • Loading branch information...
commit ce8ada2f9afb702918d38881e0de5fecc8edaede 1 parent faee071
Ben Johnson benbjohnson authored
1  .travis.yml
View
@@ -5,5 +5,6 @@ go:
- 1.2
install:
+ - go get github.com/stretchr/testify/assert
- make dependencies
55 event.go
View
@@ -0,0 +1,55 @@
+package raft
+
+const (
+ StateChangeEventType = "stateChange"
+ LeaderChangeEventType = "leaderChange"
+ TermChangeEventType = "termChange"
+ AddPeerEventType = "addPeer"
+ RemovePeerEventType = "removePeer"
+)
+
+// Event represents an action that occurred within the Raft library.
+// Listeners can subscribe to event types by using the Server.AddEventListener() function.
+type Event interface {
+ Type() string
+ Source() interface{}
+ Value() interface{}
+ PrevValue() interface{}
+}
+
+// event is the concrete implementation of the Event interface.
+type event struct {
+ typ string
+ source interface{}
+ value interface{}
+ prevValue interface{}
+}
+
+// newEvent creates a new event.
+func newEvent(typ string, value interface{}, prevValue interface{}) *event {
+ return &event{
+ typ: typ,
+ value: value,
+ prevValue: prevValue,
+ }
+}
+
+// Type returns the type of event that occurred.
+func (e *event) Type() string {
+ return e.typ
+}
+
+// Source returns the object that dispatched the event.
+func (e *event) Source() interface{} {
+ return e.source
+}
+
+// Value returns the current value associated with the event, if applicable.
+func (e *event) Value() interface{} {
+ return e.value
+}
+
+// PrevValue returns the previous value associated with the event, if applicable.
+func (e *event) PrevValue() interface{} {
+ return e.prevValue
+}
50 event_dispatcher.go
View
@@ -0,0 +1,50 @@
+package raft
+
+import (
+ "sync"
+)
+
+// eventDispatcher is responsible for managing listeners for named events
+// and dispatching event notifications to those listeners.
+type eventDispatcher struct {
+ sync.RWMutex
+ source interface{}
+ listeners map[string]eventListeners
+}
+
+// EventListener is a function that can receive event notifications.
+type EventListener func(Event)
+
+// EventListeners represents a collection of individual listeners.
+type eventListeners []EventListener
+
+// newEventDispatcher creates a new eventDispatcher instance.
+func newEventDispatcher(source interface{}) *eventDispatcher {
+ return &eventDispatcher{
+ source: source,
+ listeners: make(map[string]eventListeners),
+ }
+}
+
+// AddEventListener adds a listener function for a given event type.
+func (d *eventDispatcher) AddEventListener(typ string, listener EventListener) {
+ d.Lock()
+ defer d.Unlock()
+ d.listeners[typ] = append(d.listeners[typ], listener)
+}
+
+// DispatchEvent dispatches an event.
+func (d *eventDispatcher) DispatchEvent(e Event) {
+ d.RLock()
+ defer d.RUnlock()
+
+ // Automatically set the event source.
+ if e, ok := e.(*event); ok {
+ e.source = d.source
+ }
+
+ // Dispatch the event to all listeners.
+ for _, l := range d.listeners[e.Type()] {
+ l(e)
+ }
+}
45 event_dispatcher_test.go
View
@@ -0,0 +1,45 @@
+package raft
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+// Ensure that we can listen and dispatch events.
+func TestDispatchEvent(t *testing.T) {
+ var count int
+ dispatcher := newEventDispatcher(nil)
+ dispatcher.AddEventListener("foo", func(e Event) {
+ count += 1
+ })
+ dispatcher.AddEventListener("foo", func(e Event) {
+ count += 10
+ })
+ dispatcher.AddEventListener("bar", func(e Event) {
+ count += 100
+ })
+ dispatcher.DispatchEvent(&event{typ: "foo", value: nil, prevValue: nil})
+ assert.Equal(t, 11, count)
+}
+
+// Ensure that event is properly passed to listener.
+func TestEventListener(t *testing.T) {
+ dispatcher := newEventDispatcher("X")
+ dispatcher.AddEventListener("foo", func(e Event) {
+ assert.Equal(t, "foo", e.Type())
+ assert.Equal(t, "X", e.Source())
+ assert.Equal(t, 10, e.Value())
+ assert.Equal(t, 20, e.PrevValue())
+ })
+ dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20})
+}
+
+// Benchmark the performance of event dispatch.
+func BenchmarkEventDispatch(b *testing.B) {
+ dispatcher := newEventDispatcher(nil)
+ dispatcher.AddEventListener("xxx", func(e Event) {})
+ for i := 0; i < b.N; i++ {
+ dispatcher.DispatchEvent(&event{typ: "foo", value: 10, prevValue: 20})
+ }
+}
68 server.go
View
@@ -94,9 +94,12 @@ type Server interface {
Do(command Command) (interface{}, error)
TakeSnapshot() error
LoadSnapshot() error
+ AddEventListener(string, EventListener)
}
type server struct {
+ *eventDispatcher
+
name string
path string
state string
@@ -111,7 +114,7 @@ type server struct {
mutex sync.RWMutex
syncedPeer map[string]bool
- c chan *event
+ c chan *ev
electionTimeout time.Duration
heartbeatTimeout time.Duration
@@ -123,8 +126,8 @@ type server struct {
connectionString string
}
-// An event to be processed by the server's event loop.
-type event struct {
+// An internal event to be processed by the server's event loop.
+type ev struct {
target interface{}
returnValue interface{}
c chan error
@@ -158,12 +161,13 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
state: Stopped,
peers: make(map[string]*Peer),
log: newLog(),
- c: make(chan *event, 256),
+ c: make(chan *ev, 256),
electionTimeout: DefaultElectionTimeout,
heartbeatTimeout: DefaultHeartbeatTimeout,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectionString,
}
+ s.eventDispatcher = newEventDispatcher(s)
// Setup apply function.
s.log.ApplyFunc = func(c Command) (interface{}, error) {
@@ -250,10 +254,24 @@ func (s *server) State() string {
func (s *server) setState(state string) {
s.mutex.Lock()
defer s.mutex.Unlock()
+
+ // Temporarily store previous values.
+ prevState := s.state
+ prevLeader := s.leader
+
+ // Update state and leader.
s.state = state
if state == Leader {
s.leader = s.Name()
}
+
+ // Dispatch state and leader change events.
+ if prevState != state {
+ s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
+ }
+ if prevLeader != s.leader {
+ s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+ }
}
// Retrieves the current term of the server.
@@ -451,22 +469,34 @@ func (s *server) setCurrentTerm(term uint64, leaderName string, append bool) {
s.mutex.Lock()
defer s.mutex.Unlock()
- // update the term and clear vote for
+ // Store previous values temporarily.
+ prevState := s.state
+ prevTerm := s.currentTerm
+ prevLeader := s.leader
+
if term > s.currentTerm {
+ // update the term and clear vote for
s.state = Follower
s.currentTerm = term
s.leader = leaderName
s.votedFor = ""
- return
- }
-
- // discover new leader when candidate
- // save leader name when follower
- if term == s.currentTerm && s.state != Leader && append {
+ } else if term == s.currentTerm && s.state != Leader && append {
+ // discover new leader when candidate
+ // save leader name when follower
s.state = Follower
s.leader = leaderName
}
+ // Dispatch change events.
+ if prevState != s.state {
+ s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
+ }
+ if prevLeader != s.leader {
+ s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+ }
+ if prevTerm != s.currentTerm {
+ s.DispatchEvent(newEvent(TermChangeEventType, s.currentTerm, prevTerm))
+ }
}
//--------------------------------------
@@ -520,8 +550,8 @@ func (s *server) send(value interface{}) (interface{}, error) {
return event.returnValue, err
}
-func (s *server) sendAsync(value interface{}) *event {
- event := &event{target: value, c: make(chan error, 1)}
+func (s *server) sendAsync(value interface{}) *ev {
+ event := &ev{target: value, c: make(chan error, 1)}
s.c <- event
return event
}
@@ -596,7 +626,13 @@ func (s *server) followerLoop() {
// The event loop that is run when the server is in a Candidate state.
func (s *server) candidateLoop() {
lastLogIndex, lastLogTerm := s.log.lastInfo()
+
+ // Clear leader value.
+ prevLeader := s.leader
s.leader = ""
+ if prevLeader != s.leader {
+ s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
+ }
for {
// Increment current term, vote for self.
@@ -773,7 +809,7 @@ func (s *server) Do(command Command) (interface{}, error) {
}
// Processes a command.
-func (s *server) processCommand(command Command, e *event) {
+func (s *server) processCommand(command Command, e *ev) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
@@ -998,6 +1034,8 @@ func (s *server) AddPeer(name string, connectiongString string) error {
}
s.peers[peer.Name] = peer
+
+ s.DispatchEvent(newEvent(AddPeerEventType, name, nil))
}
// Write the configuration to file.
@@ -1024,6 +1062,8 @@ func (s *server) RemovePeer(name string) error {
}
delete(s.peers, name)
+
+ s.DispatchEvent(newEvent(RemovePeerEventType, name, nil))
}
// Write the configuration to file.
13 test.go
View
@@ -12,6 +12,10 @@ const (
testElectionTimeout = 200 * time.Millisecond
)
+const (
+ testListenerLoggerEnabled = false
+)
+
func init() {
RegisterCommand(&testCommand1{})
RegisterCommand(&testCommand2{})
@@ -66,6 +70,15 @@ func newTestServer(name string, transporter Transporter) Server {
panic(err.Error())
}
server, _ := NewServer(name, p, transporter, nil, nil, "")
+ if testListenerLoggerEnabled {
+ fn := func(e Event) {
+ server := e.Source().(Server)
+ warnf("[%s] %s %v -> %v\n", server.Name(), e.Type(), e.PrevValue(), e.Value())
+ }
+ server.AddEventListener(StateChangeEventType, fn)
+ server.AddEventListener(LeaderChangeEventType, fn)
+ server.AddEventListener(TermChangeEventType, fn)
+ }
return server
}
Please sign in to comment.
Something went wrong with that request. Please try again.