From 24bc17eaa19ab5dbac87901f29e616f59ef83c50 Mon Sep 17 00:00:00 2001 From: Wojciech Bederski Date: Fri, 11 Sep 2015 17:52:57 +0200 Subject: [PATCH 1/3] failing test showing that nested Pause()/Resume() release too early see: #1173 / https://github.com/hashicorp/consul/issues/1173 --- command/agent/local_test.go | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/command/agent/local_test.go b/command/agent/local_test.go index 997860279345..bc525d2784de 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -664,6 +664,29 @@ func TestAgent_checkTokens(t *testing.T) { } } +func TestAgent_nestedPauseResume(t *testing.T) { + l := new(localState) + if l.isPaused() != false { + t.Fatal("localState should be unPaused after init") + } + l.Pause() + if l.isPaused() != true { + t.Fatal("localState should be Paused after first call to Pause()") + } + l.Pause() + if l.isPaused() != true { + t.Fatal("localState should STILL be Paused after second call to Pause()") + } + l.Resume() + if l.isPaused() != true { + t.Fatal("localState should STILL be Paused after FIRST call to Resume()") + } + l.Resume() + if l.isPaused() != false { + t.Fatal("localState should NOT be Paused after SECOND call to Resume()") + } +} + var testRegisterRules = ` service "api" { policy = "write" From b014c0f91bb65c6f1039dab980093d652fee5253 Mon Sep 17 00:00:00 2001 From: Wojciech Bederski Date: Fri, 11 Sep 2015 18:28:06 +0200 Subject: [PATCH 2/3] make Pause()/Resume()/isPaused() behave more like a semaphore see: https://github.com/hashicorp/consul/issues/1173 #1173 Reasoning: somewhere during consul development Pause()/Resume() and PauseSync()/ResumeSync() were added to protect larger changes to agent's localState. A few of the places that it tries to protect are: - (a *Agent) AddService(...) # part of the method - (c *Command) handleReload(...) # almost the whole method - (l *localState) antiEntropy(...)# isPaused() prevents syncChanges() The main problem is, that in the middle of handleReload(...)'s critical section it indirectly (loadServices()) calls AddService(...). AddService() in turn calls Pause() to protect itself against syncChanges(). At the end of AddService() a defered call to Resume() is made. With the current implementation, this releases isPaused() "lock" in the middle of handleReload() allowing antiEntropy to kick in while configuration reload is still in progress. Specifically almost all services and probably all check are unloaded when syncChanges() is allowed to run. This in turn can causes massive service/check de-/re-registration, and since checks are by default registered in the critical state, a majority of services on a node can be marked as failing. It's made worse with automation, often calling `consul reload` in close proximity on many nodes in the cluster. This change basically turns Pause()/Resume() into P()/V() of a garden-variety semaphore. Allowing Pause() to be called multiple times, and releasing isPaused() only after all matching/defered Resumes() are called as well. TODO/NOTE: as with many semaphore implementations, it might be reasonable to panic() if l.paused ever becomes negative. --- command/agent/local.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/command/agent/local.go b/command/agent/local.go index dbe38e79d025..310b055ab43f 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -109,18 +109,18 @@ func (l *localState) ConsulServerUp() { // Pause is used to pause state synchronization, this can be // used to make batch changes func (l *localState) Pause() { - atomic.StoreInt32(&l.paused, 1) + atomic.AddInt32(&l.paused, 1) } // Resume is used to resume state synchronization func (l *localState) Resume() { - atomic.StoreInt32(&l.paused, 0) + atomic.AddInt32(&l.paused, -1) l.changeMade() } // isPaused is used to check if we are paused func (l *localState) isPaused() bool { - return atomic.LoadInt32(&l.paused) == 1 + return atomic.LoadInt32(&l.paused) > 0 } // ServiceToken returns the configured ACL token for the given From c4537ed26f832479ed55965945b0a6c5a10fd707 Mon Sep 17 00:00:00 2001 From: Wojciech Bederski Date: Thu, 17 Sep 2015 11:32:08 +0200 Subject: [PATCH 3/3] panic when unbalanced localState.Resume() is detected --- command/agent/local.go | 5 ++++- command/agent/local_test.go | 9 +++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/command/agent/local.go b/command/agent/local.go index 310b055ab43f..3dd8f64717d9 100644 --- a/command/agent/local.go +++ b/command/agent/local.go @@ -114,7 +114,10 @@ func (l *localState) Pause() { // Resume is used to resume state synchronization func (l *localState) Resume() { - atomic.AddInt32(&l.paused, -1) + paused := atomic.AddInt32(&l.paused, -1) + if paused < 0 { + panic("unbalanced localState.Resume() detected") + } l.changeMade() } diff --git a/command/agent/local_test.go b/command/agent/local_test.go index bc525d2784de..47d8f8f3eed6 100644 --- a/command/agent/local_test.go +++ b/command/agent/local_test.go @@ -685,6 +685,15 @@ func TestAgent_nestedPauseResume(t *testing.T) { if l.isPaused() != false { t.Fatal("localState should NOT be Paused after SECOND call to Resume()") } + + defer func() { + err := recover() + if err == nil { + t.Fatal("unbalanced Resume() should cause a panic()") + } + }() + l.Resume() + } var testRegisterRules = `