Skip to content
This repository has been archived by the owner on Feb 11, 2021. It is now read-only.

Commit

Permalink
Merge branch 'master' of https://github.com/goraft/raft
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Feb 11, 2014
2 parents ef3280c + bfee414 commit cff0a00
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 17 deletions.
1 change: 1 addition & 0 deletions event.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const (
StateChangeEventType = "stateChange" StateChangeEventType = "stateChange"
LeaderChangeEventType = "leaderChange" LeaderChangeEventType = "leaderChange"
TermChangeEventType = "termChange" TermChangeEventType = "termChange"
CommitEventType = "commit"
AddPeerEventType = "addPeer" AddPeerEventType = "addPeer"
RemovePeerEventType = "removePeer" RemovePeerEventType = "removePeer"


Expand Down
18 changes: 18 additions & 0 deletions event_dispatcher.go
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft package raft


import ( import (
"reflect"
"sync" "sync"
) )


Expand Down Expand Up @@ -33,6 +34,23 @@ func (d *eventDispatcher) AddEventListener(typ string, listener EventListener) {
d.listeners[typ] = append(d.listeners[typ], listener) d.listeners[typ] = append(d.listeners[typ], listener)
} }


// RemoveEventListener removes a listener function for a given event type.
func (d *eventDispatcher) RemoveEventListener(typ string, listener EventListener) {
d.Lock()
defer d.Unlock()

// Grab a reference to the function pointer once.
ptr := reflect.ValueOf(listener).Pointer()

// Find listener by pointer and remove it.
listeners := d.listeners[typ]
for i, l := range listeners {
if reflect.ValueOf(l).Pointer() == ptr {
d.listeners[typ] = append(listeners[:i], listeners[i+1:]...)
}
}
}

// DispatchEvent dispatches an event. // DispatchEvent dispatches an event.
func (d *eventDispatcher) DispatchEvent(e Event) { func (d *eventDispatcher) DispatchEvent(e Event) {
d.RLock() d.RLock()
Expand Down
19 changes: 19 additions & 0 deletions event_dispatcher_test.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ func TestDispatchEvent(t *testing.T) {
assert.Equal(t, 11, count) assert.Equal(t, 11, count)
} }


// Ensure that we can add and remove a listener.
func TestRemoveEventListener(t *testing.T) {
var count int
f0 := func(e Event) {
count += 1
}
f1 := func(e Event) {
count += 10
}

dispatcher := newEventDispatcher(nil)
dispatcher.AddEventListener("foo", f0)
dispatcher.AddEventListener("foo", f1)
dispatcher.DispatchEvent(&event{typ: "foo"})
dispatcher.RemoveEventListener("foo", f0)
dispatcher.DispatchEvent(&event{typ: "foo"})
assert.Equal(t, 21, count)
}

// Ensure that event is properly passed to listener. // Ensure that event is properly passed to listener.
func TestEventListener(t *testing.T) { func TestEventListener(t *testing.T) {
dispatcher := newEventDispatcher("X") dispatcher := newEventDispatcher("X")
Expand Down
14 changes: 7 additions & 7 deletions log.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (


// A log is a collection of log entries that are persisted to durable storage. // A log is a collection of log entries that are persisted to durable storage.
type Log struct { type Log struct {
ApplyFunc func(Command) (interface{}, error) ApplyFunc func(*LogEntry, Command) (interface{}, error)
file *os.File file *os.File
path string path string
entries []*LogEntry entries []*LogEntry
Expand Down Expand Up @@ -160,7 +160,7 @@ func (l *Log) open(path string) error {
entry, _ := newLogEntry(l, nil, 0, 0, nil) entry, _ := newLogEntry(l, nil, 0, 0, nil)
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR) entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)


n, err := entry.decode(l.file) n, err := entry.Decode(l.file)
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
debugln("open.log.append: finish ") debugln("open.log.append: finish ")
Expand All @@ -179,7 +179,7 @@ func (l *Log) open(path string) error {
if err != nil { if err != nil {
continue continue
} }
l.ApplyFunc(command) l.ApplyFunc(entry, command)
} }
debugln("open.log.append log index ", entry.Index()) debugln("open.log.append log index ", entry.Index())
} }
Expand Down Expand Up @@ -368,7 +368,7 @@ func (l *Log) setCommitIndex(index uint64) error {
} }


// Apply the changes to the state machine and store the error code. // Apply the changes to the state machine and store the error code.
returnValue, err := l.ApplyFunc(command) returnValue, err := l.ApplyFunc(entry, command)


debugf("setCommitIndex.set.result index: %v, entries index: %v", i, entryIndex) debugf("setCommitIndex.set.result index: %v, entries index: %v", i, entryIndex)
if entry.event != nil { if entry.event != nil {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (l *Log) appendEntry(entry *LogEntry) error {
entry.Position = position entry.Position = position


// Write to storage. // Write to storage.
if _, err := entry.encode(l.file); err != nil { if _, err := entry.Encode(l.file); err != nil {
return err return err
} }


Expand All @@ -544,7 +544,7 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
} }


// Write to storage. // Write to storage.
size, err := entry.encode(w) size, err := entry.Encode(w)
if err != nil { if err != nil {
return -1, err return -1, err
} }
Expand Down Expand Up @@ -589,7 +589,7 @@ func (l *Log) compact(index uint64, term uint64) error {
position, _ := l.file.Seek(0, os.SEEK_CUR) position, _ := l.file.Seek(0, os.SEEK_CUR)
entry.Position = position entry.Position = position


if _, err = entry.encode(file); err != nil { if _, err = entry.Encode(file); err != nil {
file.Close() file.Close()
os.Remove(new_file_path) os.Remove(new_file_path)
return err return err
Expand Down
4 changes: 2 additions & 2 deletions log_entry.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (e *LogEntry) Command() []byte {


// Encodes the log entry to a buffer. Returns the number of bytes // Encodes the log entry to a buffer. Returns the number of bytes
// written and any error that may have occurred. // written and any error that may have occurred.
func (e *LogEntry) encode(w io.Writer) (int, error) { func (e *LogEntry) Encode(w io.Writer) (int, error) {
b, err := proto.Marshal(e.pb) b, err := proto.Marshal(e.pb)
if err != nil { if err != nil {
return -1, err return -1, err
Expand All @@ -82,7 +82,7 @@ func (e *LogEntry) encode(w io.Writer) (int, error) {


// Decodes the log entry from a buffer. Returns the number of bytes read and // Decodes the log entry from a buffer. Returns the number of bytes read and
// any error that occurs. // any error that occurs.
func (e *LogEntry) decode(r io.Reader) (int, error) { func (e *LogEntry) Decode(r io.Reader) (int, error) {


var length int var length int
_, err := fmt.Fscanf(r, "%8x\n", &length) _, err := fmt.Fscanf(r, "%8x\n", &length)
Expand Down
8 changes: 4 additions & 4 deletions log_test.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func TestLogNewLog(t *testing.T) { func TestLogNewLog(t *testing.T) {
path := getLogPath() path := getLogPath()
log := newLog() log := newLog()
log.ApplyFunc = func(c Command) (interface{}, error) { log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
return nil, nil return nil, nil
} }
if err := log.open(path); err != nil { if err := log.open(path); err != nil {
Expand Down Expand Up @@ -119,13 +119,13 @@ func TestLogRecovery(t *testing.T) {
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100}) e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-") f, _ := ioutil.TempFile("", "raft-log-")


e0.encode(f) e0.Encode(f)
e1.encode(f) e1.Encode(f)
f.WriteString("CORRUPT!") f.WriteString("CORRUPT!")
f.Close() f.Close()


log := newLog() log := newLog()
log.ApplyFunc = func(c Command) (interface{}, error) { log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
return nil, nil return nil, nil
} }
if err := log.open(f.Name()); err != nil { if err := log.open(f.Name()); err != nil {
Expand Down
6 changes: 5 additions & 1 deletion server.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -180,7 +180,11 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
s.eventDispatcher = newEventDispatcher(s) s.eventDispatcher = newEventDispatcher(s)


// Setup apply function. // Setup apply function.
s.log.ApplyFunc = func(c Command) (interface{}, error) { s.log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
// Dispatch commit event.
s.DispatchEvent(newEvent(CommitEventType, e, nil))

// Apply command to the state machine.
switch c := c.(type) { switch c := c.(type) {
case CommandApply: case CommandApply:
return c.Apply(&context{ return c.Apply(&context{
Expand Down
6 changes: 3 additions & 3 deletions test.go
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func setupLog(entries []*LogEntry) (*Log, string) {
f, _ := ioutil.TempFile("", "raft-log-") f, _ := ioutil.TempFile("", "raft-log-")


for _, entry := range entries { for _, entry := range entries {
entry.encode(f) entry.Encode(f)
} }
err := f.Close() err := f.Close()


Expand All @@ -51,7 +51,7 @@ func setupLog(entries []*LogEntry) (*Log, string) {
} }


log := newLog() log := newLog()
log.ApplyFunc = func(c Command) (interface{}, error) { log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
return nil, nil return nil, nil
} }
if err := log.open(f.Name()); err != nil { if err := log.open(f.Name()); err != nil {
Expand Down Expand Up @@ -95,7 +95,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
} }


for _, entry := range entries { for _, entry := range entries {
entry.encode(f) entry.Encode(f)
} }
f.Close() f.Close()
return server return server
Expand Down

0 comments on commit cff0a00

Please sign in to comment.