Skip to content

Commit

Permalink
prevent dependencies from being added after stop
Browse files Browse the repository at this point in the history
A Nomad user reported an issue where `View.poll` goroutines were being leaked
when using large templates. When the runner's `Run` method is in-flight and the
runner concurrently receives `Stop`, it's possible for the watcher to stop
dependencies via stop and then immediately recreate them. Because the `Stop`
method has returned, the caller will assume that all goroutines have been
stopped, and this causes the `View.poll` goroutines to be leaked. The race
condition for this case is very narrow, but is most likely to occur when the
user has both a very large number of dependencies, short secret leases, and the
`Wait` configuration is nil.

Add a `stopped` flag to the `Watcher` which is set on `Stop`, and check whether
this field has been set before adding a new dependency.
  • Loading branch information
tgross committed Mar 19, 2024
1 parent 062dd60 commit 7b0ce5b
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
97 changes: 97 additions & 0 deletions manager/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,16 @@ package manager

import (
"bytes"
"context"
"fmt"
"net/http"
"net/http/httptest"
"os"
"os/exec"
"path/filepath"
"reflect"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -1023,6 +1027,7 @@ func TestRunner_Start(t *testing.T) {
t.Fatal("expected parse only to stop runner")
}
})

}

func TestRunner_quiescence(t *testing.T) {
Expand Down Expand Up @@ -1278,3 +1283,95 @@ func TestRunner_commandPath(t *testing.T) {
t.Fatalf("unexpected shell: %#v\n", cmd)
}
}

// TestRunner_stoppedWatcher verifies that dependencies can't be added to a
// watcher that's been stopped after the first template rendering
func TestRunner_stoppedWatcher(t *testing.T) {

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
t.Cleanup(cancel)

stopCtx, stopNow := context.WithCancel(ctx)
t.Cleanup(stopNow)

// Test server to simulate Vault cluster responses.
reqNum := atomic.Uintptr{}
vaultServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
t.Logf("GET %q", req.URL.Path)
if req.URL.Path == "/v1/secret/data/test1" {
if reqNum.Add(1) > 1 {
stopNow()
}

w.WriteHeader(http.StatusOK)

// We can't configure the default lease in tests because the
// field is global, so ensure there's a rotation_period and ttl
// that causes the runner to make multiple requests
w.Write([]byte(`{
"data": {
"data": {
"secret": "foo"
},
"rotation_period": "10ms",
"ttl": 1
}
}`))
} else {
w.WriteHeader(http.StatusOK)
}

}))

t.Cleanup(vaultServer.Close)

out, err := os.CreateTemp(t.TempDir(), "")
if err != nil {
t.Fatal(err)
}
defer os.Remove(out.Name())

c := config.DefaultConfig().Merge(&config.Config{
Vault: &config.VaultConfig{
Address: &vaultServer.URL,
},
Templates: &config.TemplateConfigs{
&config.TemplateConfig{
Contents: config.String(
`{{with secret "secret/data/test1"}}{{ if .Data}}{{.Data.data.secret}}{{end}}{{end}}`),
Destination: config.String(out.Name()),
},
},
})
c.Finalize()

r, err := NewRunner(c, false)
if err != nil {
t.Fatal(err)
}
t.Cleanup(r.Stop)

go r.Start()

// the mock vault server controls the stopCtx so that we can be sure we've
// rendered the empty template, set up the watcher, and have started the
// next poll
select {
case <-stopCtx.Done():
r.Stop()
case <-ctx.Done():
t.Fatal("test timed out")
}

// there's no way for us to control whether the Start goroutine's select
// takes the watcher's DataCh or the runner's doneCh. So once the runner has
// been stopped, we fake that we've received on the DataCh so that we hit
// the code path that calls Run. This should never add a dependency to the
// watcher.
r.Run()

if r.watcher.Size() != 0 {
t.Fatal("watcher had dependencies added after stop")
}
}
17 changes: 13 additions & 4 deletions watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type Watcher struct {
// one time instead of polling infinitely.
once bool

// stopped signals if this watcher should stop adding any new dependencies
stopped bool

// retryFuncs specifies the different ways to retry based on the upstream.
retryFuncConsul RetryFunc
retryFuncDefault RetryFunc
Expand Down Expand Up @@ -125,13 +128,18 @@ func (w *Watcher) ErrCh() <-chan error {
// and start the associated view. If the dependency already exists, no action is
// taken.
//
// If the Dependency already existed, it this function will return false. If the
// view was successfully created, it will return true. If an error occurs while
// creating the view, it will be returned here (but future errors returned by
// the view will happen on the channel).
// If the Dependency already existed or the watcher was concurrently stopped, it
// this function will return false. If the view was successfully created, it
// will return true. If an error occurs while creating the view, it will be
// returned here (but future errors returned by the view will happen on the
// channel).
func (w *Watcher) Add(d dep.Dependency) (bool, error) {
w.Lock()
defer w.Unlock()
if w.stopped {
log.Printf("[TRACE] (watcher) did not add %s because watcher is stopped", d)
return false, nil
}

log.Printf("[DEBUG] (watcher) adding %s", d)

Expand Down Expand Up @@ -248,4 +256,5 @@ func (w *Watcher) Stop() {

// Close any idle TCP connections
w.clients.Stop()
w.stopped = true
}
23 changes: 23 additions & 0 deletions watch/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ func TestAdd_exists(t *testing.T) {
}
}

func TestAdd_stopped(t *testing.T) {
w := NewWatcher(&NewWatcherInput{
Clients: dep.NewClientSet(),
Once: true,
})

w.Stop()

d := &TestDep{}
added, err := w.Add(d)
if err != nil {
t.Fatal(err)
}

if added != false {
t.Errorf("expected Add to return false when watcher is stopped")
}

if w.Watching(d) {
t.Errorf("expected Add not to add dependency when watcher is stopped")
}
}

func TestAdd_startsViewPoll(t *testing.T) {
w := NewWatcher(&NewWatcherInput{
Clients: dep.NewClientSet(),
Expand Down

0 comments on commit 7b0ce5b

Please sign in to comment.