Permalink
Browse files

fix(*.go): rename HeartbeatTimeout to HeartbeatInterval

While re-reading the etcd tuning guide I realized that perhaps it is a
bit of a misnomer to call this a "Timeout" since unlike the "Election
Timeout" there is no immediate consequence of it being missed.

Perhaps naming it interval would be better?
  • Loading branch information...
1 parent 3a5d909 commit 6ce38579a4e8d42dc6d6074665f6907010636a54 @philips philips committed Jan 23, 2014
Showing with 59 additions and 56 deletions.
  1. +1 −1 event.go
  2. +4 −4 http_transporter_test.go
  3. +17 −17 peer.go
  4. +18 −15 server.go
  5. +11 −11 server_test.go
  6. +1 −1 snapshot_recovery_response.go
  7. +3 −3 snapshot_test.go
  8. +1 −1 statemachine_test.go
  9. +3 −3 test.go
View
@@ -7,7 +7,7 @@ const (
AddPeerEventType = "addPeer"
RemovePeerEventType = "removePeer"
- HeartbeatTimeoutEventType = "heartbeatTimeout"
+ HeartbeatIntervalEventType = "heartbeatInterval"
ElectionTimeoutThresholdEventType = "electionTimeoutThreshold"
HeartbeatEventType = "heartbeat"
View
@@ -43,7 +43,7 @@ func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransp
// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
- server.SetHeartbeatTimeout(testHeartbeatTimeout)
+ server.SetHeartbeatInterval(testHeartbeatInterval)
server.SetElectionTimeout(testElectionTimeout)
server.Start()
@@ -74,7 +74,7 @@ func runTestHttpServers(t *testing.T, servers *[]Server, transporter *HTTPTransp
}
// Wait for configuration to propagate.
- time.Sleep(testHeartbeatTimeout * 2)
+ time.Sleep(testHeartbeatInterval * 2)
// Execute all the callbacks at the same time.
for _i, _f := range callbacks {
@@ -101,7 +101,7 @@ func BenchmarkSpeed(b *testing.B) {
// Create raft server.
server := newTestServer(fmt.Sprintf("localhost:%d", port), transporter)
- server.SetHeartbeatTimeout(testHeartbeatTimeout)
+ server.SetHeartbeatInterval(testHeartbeatInterval)
server.SetElectionTimeout(testElectionTimeout)
server.Start()
@@ -131,7 +131,7 @@ func BenchmarkSpeed(b *testing.B) {
c := make(chan bool)
// Wait for configuration to propagate.
- time.Sleep(testHeartbeatTimeout * 2)
+ time.Sleep(testHeartbeatInterval * 2)
b.ResetTimer()
for n := 0; n < b.N; n++ {
View
34 peer.go
@@ -13,13 +13,13 @@ import (
// A peer is a reference to another server involved in the consensus protocol.
type Peer struct {
- server *server
- Name string `json:"name"`
- ConnectionString string `json:"connectionString"`
- prevLogIndex uint64
- mutex sync.RWMutex
- stopChan chan bool
- heartbeatTimeout time.Duration
+ server *server
+ Name string `json:"name"`
+ ConnectionString string `json:"connectionString"`
+ prevLogIndex uint64
+ mutex sync.RWMutex
+ stopChan chan bool
+ heartbeatInterval time.Duration
}
//------------------------------------------------------------------------------
@@ -29,12 +29,12 @@ type Peer struct {
//------------------------------------------------------------------------------
// Creates a new peer.
-func newPeer(server *server, name string, connectionString string, heartbeatTimeout time.Duration) *Peer {
+func newPeer(server *server, name string, connectionString string, heartbeatInterval time.Duration) *Peer {
return &Peer{
- server: server,
- Name: name,
- ConnectionString: connectionString,
- heartbeatTimeout: heartbeatTimeout,
+ server: server,
+ Name: name,
+ ConnectionString: connectionString,
+ heartbeatInterval: heartbeatInterval,
}
}
@@ -45,8 +45,8 @@ func newPeer(server *server, name string, connectionString string, heartbeatTime
//------------------------------------------------------------------------------
// Sets the heartbeat timeout.
-func (p *Peer) setHeartbeatTimeout(duration time.Duration) {
- p.heartbeatTimeout = duration
+func (p *Peer) setHeartbeatInterval(duration time.Duration) {
+ p.heartbeatInterval = duration
}
//--------------------------------------
@@ -116,9 +116,9 @@ func (p *Peer) heartbeat(c chan bool) {
c <- true
- ticker := time.Tick(p.heartbeatTimeout)
+ ticker := time.Tick(p.heartbeatInterval)
- debugln("peer.heartbeat: ", p.Name, p.heartbeatTimeout)
+ debugln("peer.heartbeat: ", p.Name, p.heartbeatInterval)
for {
select {
@@ -170,7 +170,7 @@ func (p *Peer) sendAppendEntriesRequest(req *AppendEntriesRequest) {
resp := p.server.Transporter().SendAppendEntriesRequest(p.server, p, req)
if resp == nil {
- p.server.DispatchEvent(newEvent(HeartbeatTimeoutEventType, p, nil))
+ p.server.DispatchEvent(newEvent(HeartbeatIntervalEventType, p, nil))
debugln("peer.append.timeout: ", p.server.Name(), "->", p.Name)
return
}
View
@@ -33,8 +33,11 @@ const (
)
const (
- DefaultHeartbeatTimeout = 50 * time.Millisecond
- DefaultElectionTimeout = 150 * time.Millisecond
+ // DefaultHeartbeatInterval is the interval that the leader will send
+ // AppendEntriesRequests to followers to maintain leadership.
+ DefaultHeartbeatInterval = 50 * time.Millisecond
+
+ DefaultElectionTimeout = 150 * time.Millisecond
)
// ElectionTimeoutThresholdPercent specifies the threshold at which the server
@@ -82,8 +85,8 @@ type Server interface {
GetState() string
ElectionTimeout() time.Duration
SetElectionTimeout(duration time.Duration)
- HeartbeatTimeout() time.Duration
- SetHeartbeatTimeout(duration time.Duration)
+ HeartbeatInterval() time.Duration
+ SetHeartbeatInterval(duration time.Duration)
Transporter() Transporter
SetTransporter(t Transporter)
AppendEntries(req *AppendEntriesRequest) *AppendEntriesResponse
@@ -119,10 +122,10 @@ type server struct {
mutex sync.RWMutex
syncedPeer map[string]bool
- stopped chan bool
- c chan *ev
- electionTimeout time.Duration
- heartbeatTimeout time.Duration
+ stopped chan bool
+ c chan *ev
+ electionTimeout time.Duration
+ heartbeatInterval time.Duration
currentSnapshot *Snapshot
lastSnapshot *Snapshot
@@ -170,7 +173,7 @@ func NewServer(name string, path string, transporter Transporter, stateMachine S
stopped: make(chan bool),
c: make(chan *ev, 256),
electionTimeout: DefaultElectionTimeout,
- heartbeatTimeout: DefaultHeartbeatTimeout,
+ heartbeatInterval: DefaultHeartbeatInterval,
maxLogEntriesPerRequest: MaxLogEntriesPerRequest,
connectionString: connectionString,
}
@@ -378,20 +381,20 @@ func (s *server) SetElectionTimeout(duration time.Duration) {
//--------------------------------------
// Retrieves the heartbeat timeout.
-func (s *server) HeartbeatTimeout() time.Duration {
+func (s *server) HeartbeatInterval() time.Duration {
s.mutex.RLock()
defer s.mutex.RUnlock()
- return s.heartbeatTimeout
+ return s.heartbeatInterval
}
// Sets the heartbeat timeout.
-func (s *server) SetHeartbeatTimeout(duration time.Duration) {
+func (s *server) SetHeartbeatInterval(duration time.Duration) {
s.mutex.Lock()
defer s.mutex.Unlock()
- s.heartbeatTimeout = duration
+ s.heartbeatInterval = duration
for _, peer := range s.peers {
- peer.setHeartbeatTimeout(duration)
+ peer.setHeartbeatInterval(duration)
}
}
@@ -1028,7 +1031,7 @@ func (s *server) AddPeer(name string, connectiongString string) error {
// Skip the Peer if it has the same name as the Server
if s.name != name {
- peer := newPeer(s, name, connectiongString, s.heartbeatTimeout)
+ peer := newPeer(s, name, connectiongString, s.heartbeatInterval)
if s.State() == Leader {
peer.startHeartbeat()
View
@@ -193,7 +193,7 @@ func TestServerPromote(t *testing.T) {
func TestServerAppendEntries(t *testing.T) {
s := newTestServer("1", &testTransporter{})
- s.SetHeartbeatTimeout(time.Second * 10)
+ s.SetHeartbeatInterval(time.Second * 10)
s.Start()
defer s.Stop()
@@ -393,14 +393,14 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
if name == "1" {
leader = s
- s.SetHeartbeatTimeout(testHeartbeatTimeout)
+ s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
- time.Sleep(testHeartbeatTimeout)
+ time.Sleep(testHeartbeatInterval)
} else {
s.SetElectionTimeout(testElectionTimeout)
- s.SetHeartbeatTimeout(testHeartbeatTimeout)
+ s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
- time.Sleep(testHeartbeatTimeout)
+ time.Sleep(testHeartbeatInterval)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
@@ -415,7 +415,7 @@ func TestServerRecoverFromPreviousLogAndConf(t *testing.T) {
}
}
- time.Sleep(2 * testHeartbeatTimeout)
+ time.Sleep(2 * testHeartbeatInterval)
for _, name := range names {
s := servers[name]
@@ -473,7 +473,7 @@ func TestServerSingleNode(t *testing.T) {
s.Start()
- time.Sleep(testHeartbeatTimeout)
+ time.Sleep(testHeartbeatInterval)
// Join the server to itself.
if _, err := s.Do(&DefaultJoinCommand{Name: "1"}); err != nil {
@@ -573,14 +573,14 @@ func TestServerMultiNode(t *testing.T) {
if name == "1" {
leader = s
- s.SetHeartbeatTimeout(testHeartbeatTimeout)
+ s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
- time.Sleep(testHeartbeatTimeout)
+ time.Sleep(testHeartbeatInterval)
} else {
s.SetElectionTimeout(testElectionTimeout)
- s.SetHeartbeatTimeout(testHeartbeatTimeout)
+ s.SetHeartbeatInterval(testHeartbeatInterval)
s.Start()
- time.Sleep(testHeartbeatTimeout)
+ time.Sleep(testHeartbeatInterval)
}
if _, err := leader.Do(&DefaultJoinCommand{Name: name}); err != nil {
t.Fatalf("Unable to join server[%s]: %v", name, err)
@@ -40,7 +40,7 @@ func (req *SnapshotRecoveryResponse) Encode(w io.Writer) (int, error) {
return w.Write(p)
}
-// Decodes the SnapshotRecoveryResponse from a buffer.
+// Decodes the SnapshotRecoveryResponse from a buffer.
func (req *SnapshotRecoveryResponse) Decode(r io.Reader) (int, error) {
data, err := ioutil.ReadAll(r)
View
@@ -9,7 +9,7 @@ import (
// Ensure that a snapshot occurs when there are existing logs.
func TestSnapshot(t *testing.T) {
- runServerWithMockStateMachine(Leader, func (s Server, m *mock.Mock) {
+ runServerWithMockStateMachine(Leader, func(s Server, m *mock.Mock) {
m.On("Save").Return([]byte("foo"), nil)
m.On("Recovery", []byte("foo")).Return(nil)
@@ -36,7 +36,7 @@ func TestSnapshot(t *testing.T) {
// Ensure that a snapshot request can be sent and received.
func TestSnapshotRequest(t *testing.T) {
- runServerWithMockStateMachine(Follower, func (s Server, m *mock.Mock) {
+ runServerWithMockStateMachine(Follower, func(s Server, m *mock.Mock) {
m.On("Recovery", []byte("bar")).Return(nil)
// Send snapshot request.
@@ -56,7 +56,7 @@ func TestSnapshotRequest(t *testing.T) {
})
}
-func runServerWithMockStateMachine(state string, fn func (s Server, m *mock.Mock)) {
+func runServerWithMockStateMachine(state string, fn func(s Server, m *mock.Mock)) {
var m mockStateMachine
s := newTestServer("1", &testTransporter{})
s.(*server).stateMachine = &m
View
@@ -13,7 +13,7 @@ func (m *mockStateMachine) Save() ([]byte, error) {
return args.Get(0).([]byte), args.Error(1)
}
-func (m *mockStateMachine) Recovery(b []byte) (error) {
+func (m *mockStateMachine) Recovery(b []byte) error {
args := m.Called(b)
return args.Error(0)
}
View
@@ -8,8 +8,8 @@ import (
)
const (
- testHeartbeatTimeout = 50 * time.Millisecond
- testElectionTimeout = 200 * time.Millisecond
+ testHeartbeatInterval = 50 * time.Millisecond
+ testElectionTimeout = 200 * time.Millisecond
)
const (
@@ -115,7 +115,7 @@ func newTestCluster(names []string, transporter Transporter, lookup map[string]S
lookup[name] = server
}
for _, server := range servers {
- server.SetHeartbeatTimeout(testHeartbeatTimeout)
+ server.SetHeartbeatInterval(testHeartbeatInterval)
server.Start()
for _, peer := range servers {
server.AddPeer(peer.Name(), "")

0 comments on commit 6ce3857

Please sign in to comment.