Skip to content
Browse files

remove command timeout

  • Loading branch information...
1 parent 7b9d282 commit 42e4b8d3f5acadcf5c44654c07cce748a2ea41cc @xiang90 xiang90 committed
Showing with 59 additions and 110 deletions.
  1. +1 −1 append_entries_request_test.go
  2. +24 −43 log.go
  3. +3 −3 log_entry.go
  4. +15 −15 log_test.go
  5. +1 −33 server.go
  6. +14 −14 server_test.go
  7. +1 −1 test.go
View
2 append_entries_request_test.go
@@ -28,7 +28,7 @@ func createTestAppendEntriesRequest(entryCount int) (*AppendEntriesRequest, []by
entries := make([]*LogEntry, 0)
for i := 0; i < entryCount; i++ {
command := &DefaultJoinCommand{Name: "localhost:1000"}
- entry, _ := newLogEntry(nil, 1, 2, command)
+ entry, _ := newLogEntry(nil, nil, 1, 2, command)
entries = append(entries, entry)
}
req := newAppendEntriesRequest(1, 1, 1, 1, "leader", entries)
View
67 log.go
@@ -23,7 +23,6 @@ type Log struct {
file *os.File
path string
entries []*LogEntry
- results []*logResult
commitIndex uint64
mutex sync.RWMutex
startIndex uint64 // the index before the first entry in the Log entries
@@ -162,7 +161,7 @@ func (l *Log) open(path string) error {
// Read the file and decode entries.
for {
// Instantiate log entry and decode into it.
- entry, _ := newLogEntry(l, 0, 0, nil)
+ entry, _ := newLogEntry(l, nil, 0, 0, nil)
entry.Position, _ = l.file.Seek(0, os.SEEK_CUR)
n, err := entry.decode(l.file)
@@ -191,8 +190,6 @@ func (l *Log) open(path string) error {
readBytes += int64(n)
}
- l.results = make([]*logResult, len(l.entries))
-
debugln("open.log.recovery number of log ", len(l.entries))
return nil
}
@@ -207,7 +204,6 @@ func (l *Log) close() {
l.file = nil
}
l.entries = make([]*LogEntry, 0)
- l.results = make([]*logResult, 0)
}
//--------------------------------------
@@ -215,8 +211,8 @@ func (l *Log) close() {
//--------------------------------------
// Creates a log entry associated with this log.
-func (l *Log) createEntry(term uint64, command Command) (*LogEntry, error) {
- return newLogEntry(l, l.nextIndex(), term, command)
+func (l *Log) createEntry(term uint64, command Command, e *ev) (*LogEntry, error) {
+ return newLogEntry(l, e, l.nextIndex(), term, command)
}
// Retrieves an entry from the log. If the entry has been eliminated because
@@ -276,35 +272,6 @@ func (l *Log) getEntriesAfter(index uint64, maxLogEntriesPerRequest uint64) ([]*
}
}
-// Retrieves the return value and error for an entry. The result can only exist
-// after the entry has been committed.
-func (l *Log) getEntryResult(entry *LogEntry, clear bool) (interface{}, error) {
- l.mutex.RLock()
- defer l.mutex.RUnlock()
-
- if entry == nil {
- panic("raft: Log entry required for error retrieval")
- }
- debugln("getEntryResult.result index: ", entry.Index-l.startIndex-1)
- // If a result exists for the entry then return it with its error.
- if entry.Index > l.startIndex && entry.Index <= l.startIndex+uint64(len(l.results)) {
- if result := l.results[entry.Index-l.startIndex-1]; result != nil {
-
- // keep the records before remove it
- returnValue, err := result.returnValue, result.err
-
- // Remove reference to result if it's being cleared after retrieval.
- if clear {
- result.returnValue = nil
- }
-
- return returnValue, err
- }
- }
-
- return nil, nil
-}
-
//--------------------------------------
// Commit
//--------------------------------------
@@ -402,7 +369,10 @@ func (l *Log) setCommitIndex(index uint64) error {
// Apply the changes to the state machine and store the error code.
returnValue, err := l.ApplyFunc(command)
debugln("setCommitIndex.set.result index: ", entryIndex)
- l.results[entryIndex] = &logResult{returnValue: returnValue, err: err}
+ if entry.event != nil {
+ entry.event.returnValue = returnValue
+ entry.event.c <- err
+ }
}
return nil
}
@@ -443,6 +413,14 @@ func (l *Log) truncate(index uint64, term uint64) error {
debugln("log.truncate.clear")
l.file.Truncate(0)
l.file.Seek(0, os.SEEK_SET)
+
+ // notify clients if this node is the previous leader
+ for _, entry := range l.entries {
+ if entry.event != nil {
+ entry.event.c <- errors.New("command failed to be committed due to node failure")
+ }
+ }
+
l.entries = []*LogEntry{}
} else {
// Do not truncate if the entry at index does not have the matching term.
@@ -458,6 +436,15 @@ func (l *Log) truncate(index uint64, term uint64) error {
position := l.entries[index-l.startIndex].Position
l.file.Truncate(position)
l.file.Seek(position, os.SEEK_SET)
+
+ // notify clients if this node is the previous leader
+ for i := index - l.startIndex; i < uint64(len(l.entries)); i++ {
+ entry := l.entries[i]
+ if entry.event != nil {
+ entry.event.c <- errors.New("command failed to be committed due to node failure")
+ }
+ }
+
l.entries = l.entries[0 : index-l.startIndex]
}
}
@@ -529,7 +516,6 @@ func (l *Log) appendEntry(entry *LogEntry) error {
// Append to entries list if stored on disk.
l.entries = append(l.entries, entry)
- l.results = append(l.results, nil)
return nil
}
@@ -558,7 +544,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
// Append to entries list if stored on disk.
l.entries = append(l.entries, entry)
- l.results = append(l.results, nil)
return int64(size), nil
}
@@ -570,7 +555,6 @@ func (l *Log) writeEntry(entry *LogEntry, w io.Writer) (int64, error) {
// compact the log before index (including index)
func (l *Log) compact(index uint64, term uint64) error {
var entries []*LogEntry
- var results []*logResult
l.mutex.Lock()
defer l.mutex.Unlock()
@@ -583,11 +567,9 @@ func (l *Log) compact(index uint64, term uint64) error {
// we just recovery from on snapshot
if index >= l.internalCurrentIndex() {
entries = make([]*LogEntry, 0)
- results = make([]*logResult, 0)
} else {
// get all log entries after index
entries = l.entries[index-l.startIndex:]
- results = l.results[index-l.startIndex:]
}
// create a new log file and add all the entries
@@ -621,7 +603,6 @@ func (l *Log) compact(index uint64, term uint64) error {
// compaction the in memory log
l.entries = entries
- l.results = results
l.startIndex = index
l.startTerm = term
return nil
View
6 log_entry.go
@@ -17,11 +17,11 @@ type LogEntry struct {
CommandName string
Command []byte
Position int64 // position in the log file
- commit chan bool
+ event *ev
}
// Creates a new log entry associated with a log.
-func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntry, error) {
+func newLogEntry(log *Log, event *ev, index uint64, term uint64, command Command) (*LogEntry, error) {
var buf bytes.Buffer
var commandName string
if command != nil {
@@ -41,7 +41,7 @@ func newLogEntry(log *Log, index uint64, term uint64, command Command) (*LogEntr
Term: term,
CommandName: commandName,
Command: buf.Bytes(),
- commit: make(chan bool, 5),
+ event: event,
}
return e, nil
View
30 log_test.go
@@ -30,15 +30,15 @@ func TestLogNewLog(t *testing.T) {
defer log.close()
defer os.Remove(path)
- e, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
+ e, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
if err := log.appendEntry(e); err != nil {
t.Fatalf("Unable to append: %v", err)
}
- e, _ = newLogEntry(log, 2, 1, &testCommand2{X: 100})
+ e, _ = newLogEntry(log, nil, 2, 1, &testCommand2{X: 100})
if err := log.appendEntry(e); err != nil {
t.Fatalf("Unable to append: %v", err)
}
- e, _ = newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
+ e, _ = newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
if err := log.appendEntry(e); err != nil {
t.Fatalf("Unable to append: %v", err)
}
@@ -63,9 +63,9 @@ func TestLogNewLog(t *testing.T) {
// Ensure that we can decode and encode to an existing log.
func TestLogExistingLog(t *testing.T) {
tmpLog := newLog()
- e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
- e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
- e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
+ e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
+ e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
+ e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
@@ -88,9 +88,9 @@ func TestLogExistingLog(t *testing.T) {
// Ensure that we can check the contents of the log by index/term.
func TestLogContainsEntries(t *testing.T) {
tmpLog := newLog()
- e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
- e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
- e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
+ e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
+ e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
+ e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
log, path := setupLog([]*LogEntry{e0, e1, e2})
defer log.close()
defer os.Remove(path)
@@ -115,8 +115,8 @@ func TestLogContainsEntries(t *testing.T) {
// Ensure that we can recover from an incomplete/corrupt log and continue logging.
func TestLogRecovery(t *testing.T) {
tmpLog := newLog()
- e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
- e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
+ e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
+ e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
f, _ := ioutil.TempFile("", "raft-log-")
e0.encode(f)
@@ -134,7 +134,7 @@ func TestLogRecovery(t *testing.T) {
defer log.close()
defer os.Remove(f.Name())
- e, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bat", I: -5})
+ e, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bat", I: -5})
if err := log.appendEntry(e); err != nil {
t.Fatalf("Unable to append: %v", err)
}
@@ -167,15 +167,15 @@ func TestLogTruncate(t *testing.T) {
defer os.Remove(path)
- entry1, _ := newLogEntry(log, 1, 1, &testCommand1{Val: "foo", I: 20})
+ entry1, _ := newLogEntry(log, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
if err := log.appendEntry(entry1); err != nil {
t.Fatalf("Unable to append: %v", err)
}
- entry2, _ := newLogEntry(log, 2, 1, &testCommand2{X: 100})
+ entry2, _ := newLogEntry(log, nil, 2, 1, &testCommand2{X: 100})
if err := log.appendEntry(entry2); err != nil {
t.Fatalf("Unable to append: %v", err)
}
- entry3, _ := newLogEntry(log, 3, 2, &testCommand1{Val: "bar", I: 0})
+ entry3, _ := newLogEntry(log, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
if err := log.appendEntry(entry3); err != nil {
t.Fatalf("Unable to append: %v", err)
}
View
34 server.go
@@ -837,7 +837,7 @@ func (s *server) processCommand(command Command, e *ev) {
s.debugln("server.command.process")
// Create an entry for the command in the log.
- entry, err := s.log.createEntry(s.currentTerm, command)
+ entry, err := s.log.createEntry(s.currentTerm, command, e)
if err != nil {
s.debugln("server.command.log.entry.error:", err)
@@ -851,23 +851,6 @@ func (s *server) processCommand(command Command, e *ev) {
return
}
- // Issue a callback for the entry once it's committed.
- go func() {
- // Wait for the entry to be committed.
- select {
- case <-entry.commit:
- var err error
- s.debugln("server.command.commit")
- e.returnValue, err = s.log.getEntryResult(entry, true)
- e.c <- err
- case <-time.After(time.Second):
- s.debugln("server.command.timeout")
- e.c <- CommandTimeoutError
- }
-
- entry.commit = nil
- }()
-
// Issue an append entries response for the server.
resp := newAppendEntriesResponse(s.currentTerm, true, s.log.currentIndex(), s.log.CommitIndex())
resp.append = true
@@ -972,21 +955,6 @@ func (s *server) processAppendEntriesResponse(resp *AppendEntriesResponse) {
if commitIndex > committedIndex {
s.log.setCommitIndex(commitIndex)
s.debugln("commit index ", commitIndex)
- for i := committedIndex; i < commitIndex; i++ {
- if entry := s.log.getEntry(i + 1); entry != nil {
- // if the leader is a new one and the entry came from the
- // old leader, the commit channel will be nil and no go routine
- // is waiting from this channel
- // if we try to send to it, the new leader will get stuck
- if entry.commit != nil {
- select {
- case entry.commit <- true:
- default:
- panic("server unable to send signal to commit channel")
- }
- }
- }
- }
}
}
View
28 server_test.go
@@ -111,9 +111,9 @@ func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
tmpLog := newLog()
- e0, _ := newLogEntry(tmpLog, 1, 1, &testCommand1{Val: "foo", I: 20})
- e1, _ := newLogEntry(tmpLog, 2, 1, &testCommand2{X: 100})
- e2, _ := newLogEntry(tmpLog, 3, 2, &testCommand1{Val: "bar", I: 0})
+ e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
+ e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
+ e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
// start as a follower with term 2 and index 3
@@ -151,7 +151,7 @@ func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
- e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
+ e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
// start as a follower
@@ -204,7 +204,7 @@ func TestServerAppendEntries(t *testing.T) {
defer s.Stop()
// Append single entry.
- e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
+ e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term != 1 || !resp.Success {
@@ -215,8 +215,8 @@ func TestServerAppendEntries(t *testing.T) {
}
// Append multiple entries + commit the last one.
- e1, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
- e2, _ := newLogEntry(nil, 3, 1, &testCommand1{Val: "baz", I: 30})
+ e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
+ e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30})
entries = []*LogEntry{e1, e2}
resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
if resp.Term != 1 || !resp.Success {
@@ -248,7 +248,7 @@ func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
s.(*server).mutex.Unlock()
// Append single entry.
- e, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
+ e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term != 2 || resp.Success {
@@ -266,8 +266,8 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
defer s.Stop()
// Append single entry + commit.
- e1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- e2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
+ e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
+ e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
entries := []*LogEntry{e1, e2}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
if resp.Term != 1 || !resp.Success {
@@ -275,7 +275,7 @@ func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
}
// Append entry again (post-commit).
- e, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "bar", I: 20})
+ e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
entries = []*LogEntry{e}
resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
if resp.Term != 1 || resp.Success {
@@ -289,9 +289,9 @@ func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
s.Start()
defer s.Stop()
- entry1, _ := newLogEntry(nil, 1, 1, &testCommand1{Val: "foo", I: 10})
- entry2, _ := newLogEntry(nil, 2, 1, &testCommand1{Val: "foo", I: 15})
- entry3, _ := newLogEntry(nil, 2, 2, &testCommand1{Val: "bar", I: 20})
+ entry1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
+ entry2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
+ entry3, _ := newLogEntry(nil, nil, 2, 2, &testCommand1{Val: "bar", I: 20})
// Append single entry + commit.
entries := []*LogEntry{entry1, entry2}
View
2 test.go
@@ -103,7 +103,7 @@ func newTestServerWithLog(name string, transporter Transporter, entries []*LogEn
func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
servers := []Server{}
- e0, _ := newLogEntry(newLog(), 1, 1, &testCommand1{Val: "foo", I: 20})
+ e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
for _, name := range names {
if lookup[name] != nil {

0 comments on commit 42e4b8d

Please sign in to comment.
Something went wrong with that request. Please try again.