Skip to content

Commit

Permalink
raft: add raft test suite
Browse files Browse the repository at this point in the history
  • Loading branch information
xiang90 committed Feb 1, 2015
1 parent bdcae31 commit 5543f97
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 0 deletions.
73 changes: 73 additions & 0 deletions raft/rafttest/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package rafttest

import (
"time"

"github.com/coreos/etcd/raft/raftpb"
)

type network interface {
send(m raftpb.Message)
recv() chan raftpb.Message
// drop message at given rate (1.0 drops all messages)
drop(from, to uint64, rate float64)
// delay message for (0, d] randomly at given rate (1.0 delay all messages)
// do we need rate here?
delay(from, to uint64, d time.Duration, rate float64)
}

type raftNetwork struct {
recvQueues map[uint64]chan raftpb.Message
}

func newRaftNetwork(nodes ...uint64) *raftNetwork {
pn := &raftNetwork{
recvQueues: make(map[uint64]chan raftpb.Message, 0),
}

for _, a := range nodes {
pn.recvQueues[a] = make(chan raftpb.Message, 1024)
}
return pn
}

func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork {
return &nodeNetwork{id: id, raftNetwork: rn}
}

func (rn *raftNetwork) send(m raftpb.Message) {
to := rn.recvQueues[m.To]
if to == nil {
panic("sent to nil")
}
to <- m
}

func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message {
fromc := rn.recvQueues[from]
if fromc == nil {
panic("recv from nil")
}
return fromc
}

func (rn *raftNetwork) drop(from, to uint64, rate float64) {
panic("unimplemented")
}

func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) {
panic("unimplemented")
}

type nodeNetwork struct {
id uint64
*raftNetwork
}

func (nt *nodeNetwork) send(m raftpb.Message) {
nt.raftNetwork.send(m)
}

func (nt *nodeNetwork) recv() chan raftpb.Message {
return nt.recvFrom(nt.id)
}
83 changes: 83 additions & 0 deletions raft/rafttest/node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package rafttest

import (
"log"
"time"

"github.com/coreos/etcd/raft"
"github.com/coreos/etcd/raft/raftpb"
"golang.org/x/net/context"
)

type node struct {
raft.Node
paused bool
nt network
stopc chan struct{}

// stable
storage *raft.MemoryStorage
state raftpb.HardState
}

func startNode(id uint64, peers []raft.Peer, nt network) *node {
st := raft.NewMemoryStorage()
rn := raft.StartNode(id, peers, 10, 1, st)
n := &node{
Node: rn,
storage: st,
nt: nt,
stopc: make(chan struct{}),
}

ticker := time.Tick(5 * time.Millisecond)
go func() {
for {
select {
case <-ticker:
n.Tick()
case rd := <-n.Ready():
if !raft.IsEmptyHardState(rd.HardState) {
n.state = rd.HardState
}
n.storage.Append(rd.Entries)
go func() {
for _, m := range rd.Messages {
nt.send(m)
}
}()
n.Advance()
case m := <-n.nt.recv():
n.Step(context.TODO(), m)
case <-n.stopc:
log.Printf("raft.%d: stop", id)
return
}
}
}()
return n
}

func (n *node) stop() { close(n.stopc) }

// restart restarts the node.
// All in memory state of node is rested to initialized state.
func (n *node) restart() {
panic("unimplemented")
}

// pause pauses the node.
// The paused node buffers the received messages and replies
// all of them when it resumes.
func (n *node) pause() {
panic("unimplemented")
}

// resume resumes the paused node.
func (n *node) resume() {
panic("unimplemented")
}

func (n *node) isPaused() bool {
return n.paused
}
34 changes: 34 additions & 0 deletions raft/rafttest/node_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package rafttest

import (
"testing"
"time"

"github.com/coreos/etcd/raft"
"golang.org/x/net/context"
)

func TestBasicProgress(t *testing.T) {
peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}}
nt := newRaftNetwork(1, 2, 3, 4, 5)

nodes := make([]*node, 0)

for i := 1; i <= 5; i++ {
n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i)))
nodes = append(nodes, n)
}

time.Sleep(50 * time.Millisecond)
for i := 0; i < 1000; i++ {
nodes[0].Propose(context.TODO(), []byte("somedata"))
}

time.Sleep(100 * time.Millisecond)
for _, n := range nodes {
n.stop()
if n.state.Commit < 1000 {
t.Errorf("commit = %d, want > 1000", n.state.Commit)
}
}
}

0 comments on commit 5543f97

Please sign in to comment.