Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

198 lines (162 sloc) 5.014 kB
package raft
import (
"fmt"
"io/ioutil"
"os"
"time"
)
// Timing and logging settings shared by the test helpers in this file.
const (
	// testHeartbeatInterval is the heartbeat interval newTestCluster sets on
	// every server it starts.
	testHeartbeatInterval = 50 * time.Millisecond

	// testElectionTimeout is the election timeout newTestCluster sets on
	// every server it creates.
	testElectionTimeout = 200 * time.Millisecond

	// testListenerLoggerEnabled makes newTestServer attach event listeners
	// that log state, leader and term changes when set to true.
	testListenerLoggerEnabled = false
)
func init() {
RegisterCommand(&testCommand1{})
RegisterCommand(&testCommand2{})
}
//------------------------------------------------------------------------------
//
// Helpers
//
//------------------------------------------------------------------------------
//--------------------------------------
// Logs
//--------------------------------------
// getLogPath returns a fresh path in the default temp directory whose base
// name starts with "raft-log-". A temp file is created to reserve a unique
// name and is then closed and removed, so nothing exists at the returned path.
//
// It panics with the underlying error if the temp file cannot be created,
// closed or removed.
func getLogPath() string {
	f, err := ioutil.TempFile("", "raft-log-")
	if err != nil {
		// Without this check the nil *os.File would be dereferenced below and
		// the real cause of the failure would be lost.
		panic(err)
	}
	path := f.Name()
	if err := f.Close(); err != nil {
		panic(err)
	}
	if err := os.Remove(path); err != nil {
		panic(err)
	}
	return path
}
// setupLog encodes entries into a new temp file, opens a freshly created Log
// on that file and returns the log together with the file's path. The log's
// ApplyFunc is replaced with a stub that always returns (nil, nil).
//
// It panics if the temp file cannot be created or closed, or if the log
// cannot be opened.
func setupLog(entries []*LogEntry) (*Log, string) {
	f, err := ioutil.TempFile("", "raft-log-")
	if err != nil {
		// Fail with the real cause instead of a nil dereference further down.
		panic(err)
	}
	for _, entry := range entries {
		// NOTE(review): whatever Encode returns is discarded here; its
		// signature is not visible in this file, so it is left unchecked.
		entry.Encode(f)
	}
	if err := f.Close(); err != nil {
		panic(err)
	}

	log := newLog()
	log.ApplyFunc = func(e *LogEntry, c Command) (interface{}, error) {
		return nil, nil
	}
	if err := log.open(f.Name()); err != nil {
		panic(err)
	}
	return log, f.Name()
}
//--------------------------------------
// Servers
//--------------------------------------
// newTestServer creates a Server named name that uses transporter and a fresh
// temp directory as its path. When testListenerLoggerEnabled is true it also
// attaches listeners that log state, leader and term change events via warnf.
//
// It panics if the temp directory cannot be created or if NewServer reports
// an error.
func newTestServer(name string, transporter Transporter) Server {
	p, err := ioutil.TempDir("", "raft-server-")
	if err != nil {
		panic(err)
	}
	// TempDir has already created p; MkdirAll is kept as a safety net. A
	// directory needs the search (execute) bit, so 0755 is used rather than
	// the file-style 0644.
	if err := os.MkdirAll(p, 0755); err != nil {
		panic(err.Error())
	}

	server, err := NewServer(name, p, transporter, nil, nil, "")
	if err != nil {
		// Surface construction failures here rather than handing back a
		// server that was never built.
		panic(err)
	}

	if testListenerLoggerEnabled {
		logEvent := func(e Event) {
			// Use a distinct name so the outer server is not shadowed.
			src := e.Source().(Server)
			warnf("[%s] %s %v -> %v\n", src.Name(), e.Type(), e.PrevValue(), e.Value())
		}
		server.AddEventListener(StateChangeEventType, logEvent)
		server.AddEventListener(LeaderChangeEventType, logEvent)
		server.AddEventListener(TermChangeEventType, logEvent)
	}
	return server
}
// newTestServerWithPath creates a Server named name that uses transporter and
// the caller-supplied path p.
//
// It panics if NewServer reports an error, so callers never receive a server
// that failed to construct.
func newTestServerWithPath(name string, transporter Transporter, p string) Server {
	server, err := NewServer(name, p, transporter, nil, nil, "")
	if err != nil {
		panic(err)
	}
	return server
}
// newTestServerWithLog creates a test server via newTestServer and writes the
// encoded entries to the file at the server's LogPath, creating or truncating
// that file first.
//
// It panics if the log file cannot be created or closed.
func newTestServerWithLog(name string, transporter Transporter, entries []*LogEntry) Server {
	server := newTestServer(name, transporter)

	f, err := os.Create(server.LogPath())
	if err != nil {
		panic(err)
	}
	for _, entry := range entries {
		// NOTE(review): whatever Encode returns is discarded here; its
		// signature is not visible in this file, so it is left unchecked.
		entry.Encode(f)
	}
	// A failed Close can mean the entries never reached disk, so treat it
	// the same way setupLog does.
	if err := f.Close(); err != nil {
		panic(err)
	}
	return server
}
// newTestCluster builds one test server per entry in names, each pre-seeded
// with a single log entry, and records every server in lookup under its name.
// After all servers exist, each one gets the test heartbeat interval, is
// started, and is told about every server in the cluster via AddPeer.
//
// lookup must be a non-nil map. The function panics if a name is already
// present in lookup or if the seed log entry cannot be created.
func newTestCluster(names []string, transporter Transporter, lookup map[string]Server) []Server {
	servers := make([]Server, 0, len(names))

	e0, err := newLogEntry(newLog(), nil, 1, 1, &testCommand1{Val: "foo", I: 20})
	if err != nil {
		panic(err)
	}

	for _, name := range names {
		if lookup[name] != nil {
			panic(fmt.Sprintf("raft: Duplicate server in test cluster! %v", name))
		}
		// Create the server under its own name. Previously every server was
		// created as "1", so all cluster members shared one name.
		server := newTestServerWithLog(name, transporter, []*LogEntry{e0})
		server.SetElectionTimeout(testElectionTimeout)
		servers = append(servers, server)
		lookup[name] = server
	}

	for _, server := range servers {
		server.SetHeartbeatInterval(testHeartbeatInterval)
		// NOTE(review): the results of Start and AddPeer are discarded; their
		// signatures are not visible in this file.
		server.Start()
		for _, peer := range servers {
			// The server's own name is passed too, as in the original loop.
			server.AddPeer(peer.Name(), "")
		}
	}
	return servers
}
//--------------------------------------
// Transporter
//--------------------------------------
// testTransporter is a test double whose Send* methods forward to function
// fields set on the instance, letting each test script the responses.
//
// sendVoteRequestFunc, sendAppendEntriesRequestFunc and
// sendSnapshotRequestFunc must be set before the matching method is called.
// sendSnapshotRecoveryRequestFunc is optional.
type testTransporter struct {
	sendVoteRequestFunc             func(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse
	sendAppendEntriesRequestFunc    func(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse
	sendSnapshotRequestFunc         func(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse
	sendSnapshotRecoveryRequestFunc func(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse
}

// SendVoteRequest forwards the call to sendVoteRequestFunc.
func (t *testTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
	return t.sendVoteRequestFunc(server, peer, req)
}

// SendAppendEntriesRequest forwards the call to sendAppendEntriesRequestFunc.
func (t *testTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
	return t.sendAppendEntriesRequestFunc(server, peer, req)
}

// SendSnapshotRequest forwards the call to sendSnapshotRequestFunc.
func (t *testTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
	return t.sendSnapshotRequestFunc(server, peer, req)
}

// SendSnapshotRecoveryRequest forwards the call to
// sendSnapshotRecoveryRequestFunc when that hook is set and returns nil
// otherwise.
//
// The method used to call itself, which recursed until the stack overflowed.
func (t *testTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
	if t.sendSnapshotRecoveryRequestFunc == nil {
		return nil
	}
	return t.sendSnapshotRecoveryRequestFunc(server, peer, req)
}
// testStateMachine is a test double whose Save and Recovery methods forward
// to the function fields set on the instance. Both fields must be set before
// the matching method is called.
type testStateMachine struct {
	saveFunc     func() ([]byte, error)
	recoveryFunc func([]byte) error
}

// Save returns exactly what saveFunc returns.
func (m *testStateMachine) Save() ([]byte, error) {
	snapshot, err := m.saveFunc()
	return snapshot, err
}

// Recovery hands state to recoveryFunc and returns its error.
func (m *testStateMachine) Recovery(state []byte) error {
	err := m.recoveryFunc(state)
	return err
}
//--------------------------------------
// Command1
//--------------------------------------
// testCommand1 is a no-op command used by the tests. Its fields are encoded
// to JSON under the keys "val" and "i".
type testCommand1 struct {
	Val string `json:"val"`
	I   int    `json:"i"`
}

// CommandName returns the name "cmd_1".
func (cmd *testCommand1) CommandName() string {
	const name = "cmd_1"
	return name
}

// Apply does nothing and returns a nil result and a nil error.
func (cmd *testCommand1) Apply(server Server) (interface{}, error) {
	var result interface{}
	return result, nil
}
//--------------------------------------
// Command2
//--------------------------------------
// testCommand2 is a second no-op command used by the tests. Its field is
// encoded to JSON under the key "x".
type testCommand2 struct {
	X int `json:"x"`
}

// CommandName returns the name "cmd_2".
func (cmd *testCommand2) CommandName() string {
	const name = "cmd_2"
	return name
}

// Apply does nothing and returns a nil result and a nil error.
func (cmd *testCommand2) Apply(server Server) (interface{}, error) {
	var result interface{}
	return result, nil
}
Jump to Line
Something went wrong with that request. Please try again.