Skip to content

Commit

Permalink
Merge pull request #172 from aalda/remove-term-checks
Browse files Browse the repository at this point in the history
Remove term checks in fsm
  • Loading branch information
gdiazlo committed Oct 1, 2019
2 parents 4f50b5c + dce22d2 commit 60ace2e
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
10 changes: 3 additions & 7 deletions consensus/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (m *VersionMetadata) decode(value []byte) error {
}

type fsmState struct {
Index, Term, BalloonVersion uint64
Index, BalloonVersion uint64
}

func (s *fsmState) encode() ([]byte, error) {
Expand All @@ -61,11 +61,7 @@ func (s *fsmState) decode(value []byte) error {

func (s *fsmState) shouldApply(f *fsmState) bool {

if s.Term > f.Term {
return false
}

if s.Term == f.Term && s.Index >= f.Index && s.Index != 0 {
if s.Index >= f.Index && s.Index != 0 {
return false
}

Expand Down Expand Up @@ -190,7 +186,7 @@ func (n *RaftNode) Apply(l *raft.Log) interface{} {
if err := cmd.decode(&eventDigests); err != nil {
panic(fmt.Sprintf("Unable to decode command: %v", err))
}
newState := &fsmState{l.Index, l.Term, n.balloon.Version() + uint64(len(eventDigests)) - 1}
newState := &fsmState{l.Index, n.balloon.Version() + uint64(len(eventDigests)) - 1}
if n.state.shouldApply(newState) {
return n.applyAdd(eventDigests, newState)
}
Expand Down
12 changes: 6 additions & 6 deletions consensus/snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestRestoreFromLeaderWAL(t *testing.T) {
//wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// take a snapshot to inspect its values
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestRestoreFromLeaderSnapshot(t *testing.T) {
//wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// take a snapshot to inspect its values
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestRestoreNewNodeFromLeader(t *testing.T) {
// wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// take a snapshot to inspect its values
Expand Down Expand Up @@ -439,7 +439,7 @@ func TestRestoreNewNodeFromChangedLeader(t *testing.T) {
// wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// take a snapshot to inspect its values
Expand Down Expand Up @@ -488,7 +488,7 @@ func TestRestoreOldNodeFromChangedLeader(t *testing.T) {
// wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// stop the follower
Expand Down Expand Up @@ -530,7 +530,7 @@ func TestRestoreOldNodeFromChangedLeader(t *testing.T) {
// wait for WAL replication
require.Truef(t,
retryTrue(20, 200*time.Millisecond, func() bool {
return node1.state.Term == node3.state.Term && node1.state.Index == node3.state.Index
return node1.state.Index == node3.state.Index
}), "WAL not in sync")

// take a snapshot to inspect its values
Expand Down

0 comments on commit 60ace2e

Please sign in to comment.