Skip to content

Commit

Permalink
Add an interface to allow overriding of the reconnect timeout on a pe…
Browse files Browse the repository at this point in the history
…r-node basis
  • Loading branch information
mkeeler committed Oct 7, 2020
1 parent a030342 commit 943cc43
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 1 deletion.
4 changes: 4 additions & 0 deletions serf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,10 @@ type Config struct {
// WARNING: this should ONLY be used in tests
messageDropper func(typ messageType) bool

// ReconnectTimeoutOverride is an optional interface which when present allows
// the application to cause reaping of a node to happen when it otherwise wouldn't
ReconnectTimeoutOverride ReconnectTimeoutOverrider

// ValidateNodeNames controls whether nodenames only
// contain alphanumeric, dashes and '.'characters
// and sets maximum length to 128 characters
Expand Down
13 changes: 12 additions & 1 deletion serf/serf.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// ReconnectTimeoutOverrider is an interface that can be implemented to allow overriding
// the reconnect timeout for individual members.
type ReconnectTimeoutOverrider interface {
ReconnectTimeout(member *Member, timeout time.Duration) time.Duration
}

// Serf is a single node that is part of a single cluster that gets
// events about joins/leaves/failures/etc. It is created with the Create
// method.
Expand Down Expand Up @@ -1577,8 +1583,13 @@ func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []
for i := 0; i < n; i++ {
m := old[i]

memberTimeout := timeout
if s.config.ReconnectTimeoutOverride != nil {
memberTimeout = s.config.ReconnectTimeoutOverride.ReconnectTimeout(&m.Member, memberTimeout)
}

// Skip if the timeout is not yet reached
if now.Sub(m.leaveTime) <= timeout {
if now.Sub(m.leaveTime) <= memberTimeout {
continue
}

Expand Down
65 changes: 65 additions & 0 deletions serf/serf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2999,3 +2999,68 @@ func TestSerf_ValidateNodeName(t *testing.T) {
}

}

type reconnectOverride struct {
timeout time.Duration
called bool
}

func (r *reconnectOverride) ReconnectTimeout(_ *Member, _ time.Duration) time.Duration {
r.called = true
return r.timeout
}

func TestSerf_perNodeReconnectTimeout(t *testing.T) {
ip1, returnFn1 := testutil.TakeIP()
defer returnFn1()

ip2, returnFn2 := testutil.TakeIP()
defer returnFn2()

override := reconnectOverride{timeout: 1 * time.Microsecond}

// Create the s1 config with an event channel so we can listen
eventCh := make(chan Event, 4)
s1Config := testConfig(t, ip1)
s1Config.ReconnectTimeout = 30 * time.Second
s1Config.ReconnectTimeoutOverride = &override
s1Config.EventCh = eventCh

s2Config := testConfig(t, ip2)

s1, err := Create(s1Config)
if err != nil {
t.Fatalf("err: %v", err)
}
defer s1.Shutdown()

s2, err := Create(s2Config)
if err != nil {
t.Fatalf("err: %v", err)
}
defer s2.Shutdown()

waitUntilNumNodes(t, 1, s1, s2)

_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
if err != nil {
t.Fatalf("err: %v", err)
}

waitUntilNumNodes(t, 2, s1, s2)

err = s2.Shutdown()
if err != nil {
t.Fatalf("err: %v", err)
}

waitUntilNumNodes(t, 1, s1)

// Since s2 shutdown, we check the events to make sure we got failures.
testEvents(t, eventCh, s2Config.NodeName,
[]EventType{EventMemberJoin, EventMemberFailed, EventMemberReap})

if !override.called {
t.Fatalf("The reconnect override was not used")
}
}

0 comments on commit 943cc43

Please sign in to comment.