Permalink
Switch branches/tags
Nothing to show
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
723 lines (603 sloc) 20.9 KB
package raft
import (
"bytes"
"encoding/json"
"fmt"
"strconv"
"sync"
"testing"
"time"
)
//--------------------------------------
// Request Vote
//--------------------------------------
// Ensure that we can request a vote from a server that has not voted.
func TestServerRequestVote(t *testing.T) {
server := newTestServer("1", &testTransporter{})
server.Start()
if _, err := server.Do(&DefaultJoinCommand{Name: server.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", server.Name(), err)
}
defer server.Stop()
resp := server.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
if resp.Term != 1 || !resp.VoteGranted {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
}
}
// // Ensure that a vote request is denied if it comes from an old term.
func TestServerRequestVoteDeniedForStaleTerm(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
s.(*server).mutex.Lock()
s.(*server).currentTerm = 2
s.(*server).mutex.Unlock()
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(1, "foo", 1, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Invalid request vote response: %v/%v", resp.Term, resp.VoteGranted)
}
if s.Term() != 2 && s.State() != Follower {
t.Fatalf("Server did not update term and demote: %v / %v", s.Term(), s.State())
}
}
// Ensure that a vote request is denied if we've already voted for a different candidate.
func TestServerRequestVoteDeniedIfAlreadyVoted(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
s.(*server).mutex.Lock()
s.(*server).currentTerm = 2
s.(*server).mutex.Unlock()
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(2, "foo", 1, 0))
if resp.Term != 2 || !resp.VoteGranted {
t.Fatalf("First vote should not have been denied")
}
resp = s.RequestVote(newRequestVoteRequest(2, "bar", 1, 0))
if resp.Term != 2 || resp.VoteGranted {
t.Fatalf("Second vote should have been denied")
}
}
// Ensure that a vote request is approved if vote occurs in a new term.
func TestServerRequestVoteApprovedIfAlreadyVotedInOlderTerm(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
if _, err := s.Do(&DefaultJoinCommand{Name: s.Name()}); err != nil {
t.Fatalf("Server %s unable to join: %v", s.Name(), err)
}
time.Sleep(time.Millisecond * 100)
s.(*server).mutex.Lock()
s.(*server).currentTerm = 2
s.(*server).mutex.Unlock()
defer s.Stop()
resp := s.RequestVote(newRequestVoteRequest(2, "foo", 2, 1))
if resp.Term != 2 || !resp.VoteGranted || s.VotedFor() != "foo" {
t.Fatalf("First vote should not have been denied")
}
resp = s.RequestVote(newRequestVoteRequest(3, "bar", 2, 1))
if resp.Term != 3 || !resp.VoteGranted || s.VotedFor() != "bar" {
t.Fatalf("Second vote should have been approved")
}
}
// Ensure that a vote request is denied if the log is out of date.
func TestServerRequestVoteDenyIfCandidateLogIsBehind(t *testing.T) {
tmpLog := newLog()
e0, _ := newLogEntry(tmpLog, nil, 1, 1, &testCommand1{Val: "foo", I: 20})
e1, _ := newLogEntry(tmpLog, nil, 2, 1, &testCommand2{X: 100})
e2, _ := newLogEntry(tmpLog, nil, 3, 2, &testCommand1{Val: "bar", I: 0})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0, e1, e2})
// start as a follower with term 2 and index 3
s.Start()
defer s.Stop()
// request vote from term 3 with last log entry 2, 2
resp := s.RequestVote(newRequestVoteRequest(3, "foo", 2, 2))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale index vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
// request vote from term 2 with last log entry 2, 3
resp = s.RequestVote(newRequestVoteRequest(2, "foo", 3, 2))
if resp.Term != 3 || resp.VoteGranted {
t.Fatalf("Stale term vote should have been denied [%v/%v]", resp.Term, resp.VoteGranted)
}
// request vote from term 3 with last log entry 2, 3
resp = s.RequestVote(newRequestVoteRequest(3, "foo", 3, 2))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Matching log vote should have been granted")
}
// request vote from term 3 with last log entry 2, 4
resp = s.RequestVote(newRequestVoteRequest(3, "foo", 4, 2))
if resp.Term != 3 || !resp.VoteGranted {
t.Fatalf("Ahead-of-log vote should have been granted")
}
}
func TestProcessVoteResponse(t *testing.T) {
// server Term: 0, status: Leader
// response Term : 1, granted
// Expectation: not success
// Server Term 1 status:Leader
server := &server{}
server.eventDispatcher = newEventDispatcher(server)
server.currentTerm = 0
server.state = Leader
response := &RequestVoteResponse{
VoteGranted: true,
Term: 1,
}
if success := server.processVoteResponse(response); success {
t.Fatal("Process should fail if the resp's term is larger than server's")
}
if server.state != Follower {
t.Fatal("Server should stepdown")
}
// server Term: 1, status: Follower
// response Term: 2, granted
// Expectation: not success
response.Term = 2
if success := server.processVoteResponse(response); success {
t.Fatal("Process should fail if the resp's term is larger than server's")
}
if server.state != Follower {
t.Fatal("Server should still be Follower")
}
server.currentTerm = 2
// server Term: 2, status: Follower
// response Term: 2
// Expectation: success
if success := server.processVoteResponse(response); !success {
t.Fatal("Process should success if the server's term is larger than resp's")
}
}
// //--------------------------------------
// // Promotion
// //--------------------------------------
// // Ensure that we can self-promote a server to candidate, obtain votes and become a fearless leader.
func TestServerPromoteSelf(t *testing.T) {
e0, _ := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
s := newTestServerWithLog("1", &testTransporter{}, []*LogEntry{e0})
// start as a follower
s.Start()
defer s.Stop()
time.Sleep(2 * testElectionTimeout)
if s.State() != Leader {
t.Fatalf("Server self-promotion failed: %v", s.State())
}
}
//Ensure that we can promote a server within a cluster to a leader.
func TestServerPromote(t *testing.T) {
lookup := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return lookup[peer.Name].RequestVote(req)
}
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return lookup[peer.Name].AppendEntries(req)
}
servers := newTestCluster([]string{"1", "2", "3"}, transporter, lookup)
servers[0].Start()
servers[1].Start()
servers[2].Start()
time.Sleep(2 * testElectionTimeout)
if servers[0].State() != Leader && servers[1].State() != Leader && servers[2].State() != Leader {
t.Fatalf("No leader elected: (%s, %s, %s)", servers[0].State(), servers[1].State(), servers[2].State())
}
for _, s := range servers {
s.Stop()
}
}
//--------------------------------------
// Append Entries
//--------------------------------------
// Ensure we can append entries to a server.
func TestServerAppendEntries(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.SetHeartbeatInterval(time.Second * 10)
s.Start()
defer s.Stop()
// Append single entry.
e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term() != 1 || !resp.Success() {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
// Append multiple entries + commit the last one.
e1, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
e2, _ := newLogEntry(nil, nil, 3, 1, &testCommand1{Val: "baz", I: 30})
entries = []*LogEntry{e1, e2}
resp = s.AppendEntries(newAppendEntriesRequest(1, 1, 1, 1, "ldr", entries))
if resp.Term() != 1 || !resp.Success() {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := s.(*server).log.commitInfo(); index != 1 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
// Send zero entries and commit everything.
resp = s.AppendEntries(newAppendEntriesRequest(2, 3, 1, 3, "ldr", []*LogEntry{}))
if resp.Term() != 2 || !resp.Success() {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
if index, term := s.(*server).log.commitInfo(); index != 3 || term != 1 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
//Ensure that entries with stale terms are rejected.
func TestServerAppendEntriesWithStaleTermsAreRejected(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
s.(*server).mutex.Lock()
s.(*server).currentTerm = 2
s.(*server).mutex.Unlock()
// Append single entry.
e, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entries := []*LogEntry{e}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 0, "ldr", entries))
if resp.Term() != 2 || resp.Success() {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
if index, term := s.(*server).log.commitInfo(); index != 0 || term != 0 {
t.Fatalf("Invalid commit info [IDX=%v, TERM=%v]", index, term)
}
}
// Ensure that we reject entries if the commit log is different.
func TestServerAppendEntriesRejectedIfAlreadyCommitted(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
// Append single entry + commit.
e1, _ := newLogEntry(nil, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
e2, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
entries := []*LogEntry{e1, e2}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 2, "ldr", entries))
if resp.Term() != 1 || !resp.Success() {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
// Append entry again (post-commit).
e, _ := newLogEntry(nil, nil, 2, 1, &testCommand1{Val: "bar", I: 20})
entries = []*LogEntry{e}
resp = s.AppendEntries(newAppendEntriesRequest(1, 2, 1, 1, "ldr", entries))
if resp.Term() != 1 || resp.Success() {
t.Fatalf("AppendEntries should have failed: %v/%v", resp.Term, resp.Success)
}
}
// Ensure that we uncommitted entries are rolled back if new entries overwrite them.
func TestServerAppendEntriesOverwritesUncommittedEntries(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
entry1, _ := newLogEntry(s.(*server).log, nil, 1, 1, &testCommand1{Val: "foo", I: 10})
entry2, _ := newLogEntry(s.(*server).log, nil, 2, 1, &testCommand1{Val: "foo", I: 15})
entry3, _ := newLogEntry(s.(*server).log, nil, 2, 2, &testCommand1{Val: "bar", I: 20})
// Append single entry + commit.
entries := []*LogEntry{entry1, entry2}
resp := s.AppendEntries(newAppendEntriesRequest(1, 0, 0, 1, "ldr", entries))
if resp.Term() != 1 || !resp.Success() || s.(*server).log.commitIndex != 1 {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
for i, entry := range s.(*server).log.entries {
if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
}
// Append entry that overwrites the second (uncommitted) entry.
entries = []*LogEntry{entry3}
resp = s.AppendEntries(newAppendEntriesRequest(2, 1, 1, 2, "ldr", entries))
if resp.Term() != 2 || !resp.Success() || s.(*server).log.commitIndex != 2 {
t.Fatalf("AppendEntries should have succeeded: %v/%v", resp.Term, resp.Success)
}
entries = []*LogEntry{entry1, entry3}
for i, entry := range s.(*server).log.entries {
if entry.Term() != entries[i].Term() || entry.Index() != entries[i].Index() || !bytes.Equal(entry.Command(), entries[i].Command()) {
t.Fatalf("AppendEntries failed: %v/%v", resp.Term, resp.Success)
}
}
}
//--------------------------------------
// Command Execution
//--------------------------------------
// Ensure that a follower cannot execute a command.
func TestServerDenyCommandExecutionWhenFollower(t *testing.T) {
s := newTestServer("1", &testTransporter{})
s.Start()
defer s.Stop()
var err error
if _, err = s.Do(&testCommand1{Val: "foo", I: 10}); err != NotLeaderError {
t.Fatalf("Expected error: %v, got: %v", NotLeaderError, err)
}
}
//--------------------------------------
// Recovery
//--------------------------------------
// Ensure that a follower cannot execute a command.
func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
target := servers[peer.Name]
mutex.RUnlock()
b, _ := json.Marshal(req)
clonedReq := &RequestVoteRequest{}
json.Unmarshal(b, clonedReq)
return target.RequestVote(clonedReq)
}
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
target := servers[peer.Name]
mutex.RUnlock()
b, _ := json.Marshal(req)
clonedReq := &AppendEntriesRequest{}
json.Unmarshal(b, clonedReq)
return target.AppendEntries(clonedReq)
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
var paths = make(map[string]string)
n := 5
// add n servers
for i := 1; i <= n; i++ {
names = append(names, strconv.Itoa(i))
}
var leader Server
for _, name := range names {
s := newTestServer(name, transporter)
mutex.Lock()
servers[name] = s
mutex.Unlock()
paths[name] = s.Path()
if name == "1" {
leader = s
s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
time.Sleep(testHeartbeatInterval)
} else {
s.SetElectionTimeout(testElectionTimeout)
s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
time.Sleep(testHeartbeatInterval)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
}
}
// commit some commands
for i := 0; i < 10; i++ {
if _, err := leader.Do(&testCommand2{X: 1}); err != nil {
t.Fatalf("cannot commit command: %s", err.Error())
}
}
time.Sleep(2 * testHeartbeatInterval)
for _, name := range names {
s := servers[name]
if s.CommitIndex() != 16 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 16)
}
s.Stop()
}
for _, name := range names {
// with old path and disable transportation
s := newTestServerWithPath(name, disTransporter, paths[name])
servers[name] = s
s.Start()
// should only commit to the last join command
if s.CommitIndex() != 6 {
t.Fatalf("%s recover phase 1 commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 6)
}
// peer conf should be recovered
if len(s.Peers()) != 4 {
t.Fatalf("%s recover phase 1 peer failed! [%d/%d]", name, len(s.Peers()), 4)
}
}
// let nodes talk to each other
for _, name := range names {
servers[name].SetTransporter(transporter)
}
time.Sleep(2 * testElectionTimeout)
// should commit to the previous index + 1(nop command when new leader elected)
for _, name := range names {
s := servers[name]
if s.CommitIndex() != 17 {
t.Fatalf("%s commitIndex is invalid [%d/%d]", name, s.CommitIndex(), 17)
}
s.Stop()
}
}
//--------------------------------------
// Membership
//--------------------------------------
// Ensure that we can start a single server and append to its log.
func TestServerSingleNode(t *testing.T) {
s := newTestServer("1", &testTransporter{})
if s.State() != Stopped {
t.Fatalf("Unexpected server state: %v", s.State())
}
s.Start()
time.Sleep(testHeartbeatInterval)
// Join the server to itself.
if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
t.Fatalf("Unable to join: %v", err)
}
debugln("finish command")
if s.State() != Leader {
t.Fatalf("Unexpected server state: %v", s.State())
}
s.Stop()
if s.State() != Stopped {
t.Fatalf("Unexpected server state: %v", s.State())
}
}
// Ensure that we can start multiple servers and determine a leader.
func TestServerMultiNode(t *testing.T) {
// Initialize the servers.
var mutex sync.RWMutex
servers := map[string]Server{}
transporter := &testTransporter{}
transporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
mutex.RLock()
target := servers[peer.Name]
mutex.RUnlock()
b, _ := json.Marshal(req)
clonedReq := &RequestVoteRequest{}
json.Unmarshal(b, clonedReq)
c := make(chan *RequestVoteResponse)
go func() {
c <- target.RequestVote(clonedReq)
}()
select {
case resp := <-c:
return resp
case <-time.After(time.Millisecond * 200):
return nil
}
}
transporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
mutex.RLock()
target := servers[peer.Name]
mutex.RUnlock()
b, _ := json.Marshal(req)
clonedReq := &AppendEntriesRequest{}
json.Unmarshal(b, clonedReq)
c := make(chan *AppendEntriesResponse)
go func() {
c <- target.AppendEntries(clonedReq)
}()
select {
case resp := <-c:
return resp
case <-time.After(time.Millisecond * 200):
return nil
}
}
disTransporter := &testTransporter{}
disTransporter.sendVoteRequestFunc = func(s Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
return nil
}
disTransporter.sendAppendEntriesRequestFunc = func(s Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
return nil
}
var names []string
n := 5
// add n servers
for i := 1; i <= n; i++ {
names = append(names, strconv.Itoa(i))
}
var leader Server
for _, name := range names {
s := newTestServer(name, transporter)
defer s.Stop()
mutex.Lock()
servers[name] = s
mutex.Unlock()
if name == "1" {
leader = s
s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
time.Sleep(testHeartbeatInterval)
} else {
s.SetElectionTimeout(testElectionTimeout)
s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
time.Sleep(testHeartbeatInterval)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
}
}
time.Sleep(2 * testElectionTimeout)
// Check that two peers exist on leader.
mutex.RLock()
if leader.MemberCount() != n {
t.Fatalf("Expected member count to be %v, got %v", n, leader.MemberCount())
}
if servers["2"].State() == Leader || servers["3"].State() == Leader {
t.Fatalf("Expected leader should be 1: 2=%v, 3=%v\n", servers["2"].State(), servers["3"].State())
}
mutex.RUnlock()
for i := 0; i < 20; i++ {
retry := 0
fmt.Println("Round ", i)
num := strconv.Itoa(i%(len(servers)) + 1)
num_1 := strconv.Itoa((i+3)%(len(servers)) + 1)
toStop := servers[num]
toStop_1 := servers[num_1]
// Stop the first server and wait for a re-election.
time.Sleep(2 * testElectionTimeout)
debugln("Disconnect ", toStop.Name())
debugln("disconnect ", num, " ", num_1)
toStop.SetTransporter(disTransporter)
toStop_1.SetTransporter(disTransporter)
time.Sleep(2 * testElectionTimeout)
// Check that either server 2 or 3 is the leader now.
//mutex.Lock()
leader := 0
for key, value := range servers {
debugln("Play begin")
if key != num && key != num_1 {
if value.State() == Leader {
debugln("Found leader")
for i := 0; i < 10; i++ {
debugln("[Test] do ", value.Name())
if _, err := value.Do(&testCommand2{X: 1}); err != nil {
break
}
debugln("[Test] Done")
}
debugln("Leader is ", value.Name(), " Index ", value.(*server).log.commitIndex)
}
debugln("Not Found leader")
}
}
for {
for key, value := range servers {
if key != num && key != num_1 {
if value.State() == Leader {
leader++
}
debugln(value.Name(), " ", value.(*server).Term(), " ", value.State())
}
}
if leader > 1 {
if retry < 300 {
debugln("retry")
retry++
leader = 0
time.Sleep(2 * testElectionTimeout)
continue
}
t.Fatalf("wrong leader number %v", leader)
}
if leader == 0 {
if retry < 300 {
retry++
fmt.Println("retry 0")
leader = 0
time.Sleep(2 * testElectionTimeout)
continue
}
t.Fatalf("wrong leader number %v", leader)
}
if leader == 1 {
break
}
}
//mutex.Unlock()
toStop.SetTransporter(transporter)
toStop_1.SetTransporter(transporter)
}
}