diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 0e486a6f00e..0dfa0119419 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -1,6 +1,6 @@ { "ImportPath": "github.com/coreos/etcd", - "GoVersion": "go1.3.1", + "GoVersion": "go1.4.1", "Packages": [ "./..." ], diff --git a/raft/rafttest/network.go b/raft/rafttest/network.go new file mode 100644 index 00000000000..006711ab34f --- /dev/null +++ b/raft/rafttest/network.go @@ -0,0 +1,73 @@ +package rafttest + +import ( + "time" + + "github.com/coreos/etcd/raft/raftpb" +) + +type network interface { + send(m raftpb.Message) + recv() chan raftpb.Message + // drop message at given rate (1.0 drops all messages) + drop(from, to uint64, rate float64) + // delay message for (0, d] randomly at given rate (1.0 delay all messages) + // do we need rate here? + delay(from, to uint64, d time.Duration, rate float64) +} + +type raftNetwork struct { + recvQueues map[uint64]chan raftpb.Message +} + +func newRaftNetwork(nodes ...uint64) *raftNetwork { + pn := &raftNetwork{ + recvQueues: make(map[uint64]chan raftpb.Message, 0), + } + + for _, n := range nodes { + pn.recvQueues[n] = make(chan raftpb.Message, 1024) + } + return pn +} + +func (rn *raftNetwork) nodeNetwork(id uint64) *nodeNetwork { + return &nodeNetwork{id: id, raftNetwork: rn} +} + +func (rn *raftNetwork) send(m raftpb.Message) { + to := rn.recvQueues[m.To] + if to == nil { + panic("sent to nil") + } + to <- m +} + +func (rn *raftNetwork) recvFrom(from uint64) chan raftpb.Message { + fromc := rn.recvQueues[from] + if fromc == nil { + panic("recv from nil") + } + return fromc +} + +func (rn *raftNetwork) drop(from, to uint64, rate float64) { + panic("unimplemented") +} + +func (rn *raftNetwork) delay(from, to uint64, d time.Duration, rate float64) { + panic("unimplemented") +} + +type nodeNetwork struct { + id uint64 + *raftNetwork +} + +func (nt *nodeNetwork) send(m raftpb.Message) { + nt.raftNetwork.send(m) +} + +func (nt *nodeNetwork) recv() chan raftpb.Message { + return nt.recvFrom(nt.id) +} diff --git a/raft/rafttest/node.go b/raft/rafttest/node.go new file mode 100644 index 00000000000..56cd7411f1f --- /dev/null +++ b/raft/rafttest/node.go @@ -0,0 +1,84 @@ +package rafttest + +import ( + "log" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/raft" + "github.com/coreos/etcd/raft/raftpb" +) + +type node struct { + raft.Node + paused bool + nt network + stopc chan struct{} + + // stable + storage *raft.MemoryStorage + state raftpb.HardState +} + +func startNode(id uint64, peers []raft.Peer, nt network) *node { + st := raft.NewMemoryStorage() + rn := raft.StartNode(id, peers, 10, 1, st) + n := &node{ + Node: rn, + storage: st, + nt: nt, + stopc: make(chan struct{}), + } + + ticker := time.Tick(5 * time.Millisecond) + go func() { + for { + select { + case <-ticker: + n.Tick() + case rd := <-n.Ready(): + if !raft.IsEmptyHardState(rd.HardState) { + n.state = rd.HardState + } + n.storage.Append(rd.Entries) + go func() { + for _, m := range rd.Messages { + nt.send(m) + } + }() + n.Advance() + case m := <-n.nt.recv(): + n.Step(context.TODO(), m) + case <-n.stopc: + log.Printf("raft.%d: stop", id) + return + } + } + }() + return n +} + +func (n *node) stop() { close(n.stopc) } + +// restart restarts the node with the given delay. +// All in memory state of node is reset to initialized state. +// All stable MUST be unchanged. +func (n *node) restart(delay time.Duration) { + panic("unimplemented") +} + +// pause pauses the node. +// The paused node buffers the received messages and replies +// all of them when it resumes. +func (n *node) pause() { + panic("unimplemented") +} + +// resume resumes the paused node. +func (n *node) resume() { + panic("unimplemented") +} + +func (n *node) isPaused() bool { + return n.paused +} diff --git a/raft/rafttest/node_test.go b/raft/rafttest/node_test.go new file mode 100644 index 00000000000..a38865016a5 --- /dev/null +++ b/raft/rafttest/node_test.go @@ -0,0 +1,34 @@ +package rafttest + +import ( + "testing" + "time" + + "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context" + "github.com/coreos/etcd/raft" +) + +func TestBasicProgress(t *testing.T) { + peers := []raft.Peer{{1, nil}, {2, nil}, {3, nil}, {4, nil}, {5, nil}} + nt := newRaftNetwork(1, 2, 3, 4, 5) + + nodes := make([]*node, 0) + + for i := 1; i <= 5; i++ { + n := startNode(uint64(i), peers, nt.nodeNetwork(uint64(i))) + nodes = append(nodes, n) + } + + time.Sleep(50 * time.Millisecond) + for i := 0; i < 1000; i++ { + nodes[0].Propose(context.TODO(), []byte("somedata")) + } + + time.Sleep(100 * time.Millisecond) + for _, n := range nodes { + n.stop() + if n.state.Commit < 1000 { + t.Errorf("commit = %d, want > 1000", n.state.Commit) + } + } +}