From 5543f973c22853ea7bfd05dbc925b682050e6d9d Mon Sep 17 00:00:00 2001 From: Xiang Li Date: Sat, 31 Jan 2015 20:51:50 -0800 Subject: [PATCH] raft: add raft test suite --- raft/rafttest/network.go | 73 +++++++++++++++++++++++++++++++++ raft/rafttest/node.go | 83 ++++++++++++++++++++++++++++++++++++++ raft/rafttest/node_test.go | 34 ++++++++++++++++ 3 files changed, 190 insertions(+) create mode 100644 raft/rafttest/network.go create mode 100644 raft/rafttest/node.go create mode 100644 raft/rafttest/node_test.go diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go new file mode 100644 index 000000000000..4c556d8c9fb1 --- /dev/null +++ b/raft/rafttest/network.go @@ -0,0 +1,73 @@ +package rafttest + +import ( + "time" + + "github.com/coreos/etcd/raft/raftpb" +) + +type network interface { + send(m raftpb.Message) + recv() chan raftpb.Message + // drop message at given rate (1.0 drops all messages) + drop(from, to uint64, rate float64) + // delay message for (0, d] randomly at given rate (1.0 delay all messages) + // do we need rate here? + delay(from, to uint64, d time.Duration, rate float64) +} + +type raftNetwork struct { + recvQueues map[uint64]chan raftpb.Message +} + +func newRaftNetwork(nodes ...uint64) *raftNetwork { + pn := &raftNetwork{ + recvQueues: make(map[uint64]chan raftpb.Message, 0), + } + + for _, a := range nodes { + pn.recvQueues[a] = make(chan raftpb.Message, 1024) + } + return pn +} + +func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork { + return &nodeNetwork{id: id, raftNetwork: rn} +} + +func (rn *raftNetwork) send(m raftpb.Message) { + to := rn.recvQueues[m.To] + if to == nil { + panic("sent to nil") + } + to <- m +} + +func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message { + fromc := rn.recvQueues[from] + if fromc == nil { + panic("recv from nil") + } + return fromc +} + +func (rn *raftNetwork) drop(from, to uint64, rate float64) { + panic("unimplemented") +} + +func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) { + panic("unimplemented") +} + +type nodeNetwork struct { + id uint64 + *raftNetwork +} + +func (nt *nodeNetwork) send(m raftpb.Message) { + nt.raftNetwork.send(m) +} + +func (nt *nodeNetwork) recv() chan raftpb.Message { + return nt.recvFrom(nt.id) +} diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go new file mode 100644 index 000000000000..28b0327c2f2d --- /dev/null +++ b/raft/rafttest/node.go @@ -0,0 +1,83 @@ +package rafttest + +import ( + "log" + "time" + + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" + "golang.org/x/net/context" +) + +type node struct { + raft.Node + paused bool + nt network + stopc chan struct{} + + // stable + storage *raft.MemoryStorage + state raftpb.HardState +} + +func startNode(id uint64, peers []raft.Peer, nt network) *node { + st := raft.NewMemoryStorage() + rn := raft.StartNode(id, peers, 10, 1, st) + n := &node{ + Node: rn, + storage: st, + nt: nt, + stopc: make(chan struct{}), + } + + ticker := time.Tick(5 * time.Millisecond) + go func() { + for { + select { + case <-ticker: + n.Tick() + case rd := <-n.Ready(): + if !raft.IsEmptyHardState(rd.HardState) { + n.state = rd.HardState + } + n.storage.Append(rd.Entries) + go func() { + for _, m := range rd.Messages { + nt.send(m) + } + }() + n.Advance() + case m := <-n.nt.recv(): + n.Step(context.TODO(), m) + case <-n.stopc: + log.Printf("raft.%d: stop", id) + return + } + } + }() + return n +} + +func (n *node) stop() { close(n.stopc) } + +// restart restarts the node. +// All in memory state of node is rested to initialized state. +func (n *node) restart() { + panic("unimplemented") +} + +// pause pauses the node. +// The paused node buffers the received messages and replies +// all of them when it resumes. +func (n *node) pause() { + panic("unimplemented") +} + +// resume resumes the paused node. +func (n *node) resume() { + panic("unimplemented") +} + +func (n *node) isPaused() bool { + return n.paused +} diff --git a/raft/rafttest/node_test.go b/raft/rafttest/node_test.go new file mode 100644 index 000000000000..07c944f8fa24 --- /dev/null +++ b/raft/rafttest/node_test.go @@ -0,0 +1,34 @@ +package rafttest + +import ( + "testing" + "time" + + "github.com/coreos/etcd/raft" + "golang.org/x/net/context" +) + +func TestBasicProgress(t *testing.T) { + peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}} + nt := newRaftNetwork(1, 2, 3, 4, 5) + + nodes := make([]*node, 0) + + for i := 1; i <= 5; i++ { + n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i))) + nodes = append(nodes, n) + } + + time.Sleep(50 * time.Millisecond) + for i := 0; i < 1000; i++ { + nodes[0].Propose(context.TODO(), []byte("somedata")) + } + + time.Sleep(100 * time.Millisecond) + for _, n := range nodes { + n.stop() + if n.state.Commit < 1000 { + t.Errorf("commit = %d, want > 1000", n.state.Commit) + } + } +}