diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index e9a082b8b..be7012eda 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -126,7 +126,11 @@ func (m *manager) Start() (<-chan struct{}, error) { log.Infoln("Manager stopped.") return - case leader := <-notify: + case leader, open := <-notify: + + if !open { + return + } // This channel has data only when there's been a leadership change. @@ -150,11 +154,17 @@ func (m *manager) Start() (<-chan struct{}, error) { case <-m.stop: log.Infoln("Stopping..") + m.stop = nil close(stopWorkQueue) close(notify) return - case evt := <-leaderChan: + case evt, open := <-leaderChan: + + if !open { + return + } + // This here handles possible duplicated events about leadership and fires only when there // is a change. @@ -198,8 +208,8 @@ func (m *manager) Stop() { if m.stop == nil { return } - m.leader.Stop() close(m.stop) + m.leader.Stop() } func (m *manager) getCurrentState() (GlobalSpec, error) { diff --git a/pkg/manager/manager_test.go b/pkg/manager/manager_test.go index 2b012075b..f61d576fd 100644 --- a/pkg/manager/manager_test.go +++ b/pkg/manager/manager_test.go @@ -121,6 +121,12 @@ func testToStruct(m *json.RawMessage) interface{} { return &o } +func testCloseAll(c []chan string) { + for _, cc := range c { + close(cc) + } +} + func TestNoCallsToGroupWhenNoLeader(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -145,11 +151,15 @@ func TestNoCallsToGroupWhenNoLeader(t *testing.T) { manager1.Start() manager2.Start() + testSetLeader(t, leaderChans, "nobody") + manager1.Stop() manager2.Stop() stoppable1.Stop() stoppable2.Stop() + + testCloseAll(leaderChans) } func TestStartOneLeader(t *testing.T) { @@ -209,6 +219,7 @@ func TestStartOneLeader(t *testing.T) { stoppable1.Stop() stoppable2.Stop() + testCloseAll(leaderChans) } func TestChangeLeadership(t *testing.T) { @@ -306,4 +317,6 @@ func TestChangeLeadership(t *testing.T) { stoppable1.Stop() stoppable2.Stop() + + testCloseAll(leaderChans) }