Skip to content

Commit

Permalink
deployment watcher: fix goroutine leak when job is purged (#20348)
Browse files Browse the repository at this point in the history
The deployment watcher on the leader makes blocking queries to detect when the
set of active deployments changes. It takes the resulting list of deployments
and adds or removes watchers based on whether the deployment is active. But when
a job is purged, the deployment will be deleted. This unblocks the query but
the query result only shows the remaining deployments.

When the query unblocks, ensure that all active watchers have a corresponding
deployment in state. If not, remove the watcher so that the goroutine stops.

Fixes: #19988
  • Loading branch information
tgross committed Apr 11, 2024
1 parent ca22f34 commit a13e455
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 4 deletions.
3 changes: 3 additions & 0 deletions .changelog/20348.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
deployments: Fixed a goroutine leak when jobs are purged
```
35 changes: 31 additions & 4 deletions nomad/deploymentwatcher/deployments_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
// Update the latest index
dindex = idx

// Ensure we are tracking the things we should and not tracking what we
// shouldn't be
// Ensure we are tracking only active deployments
for _, d := range deployments {
if d.Active() {
if err := w.add(d); err != nil {
Expand All @@ -191,6 +190,9 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
w.remove(d)
}
}

// Ensure we've removed deployments for purged jobs
w.removeDeletedDeployments(deployments)
}
}

Expand Down Expand Up @@ -236,6 +238,28 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (in
return deploys, index, nil
}

// removeDeletedDeployments removes any watchers that aren't in the list of
// deployments we got from state
func (w *Watcher) removeDeletedDeployments(deployments []*structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()

// note we can't optimize this by checking the lengths first because some
// deployments might not be active
for _, watcher := range w.watchers {
var found bool
for _, d := range deployments {
if watcher.deploymentID == d.ID {
found = true
break
}
}
if !found {
w.removeByIDLocked(watcher.deploymentID)
}
}
}

// add adds a deployment to the watch list
func (w *Watcher) add(d *structs.Deployment) error {
w.l.Lock()
Expand Down Expand Up @@ -287,15 +311,18 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) {
func (w *Watcher) remove(d *structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()
w.removeByIDLocked(d.ID)
}

func (w *Watcher) removeByIDLocked(id string) {
// Not enabled so no-op
if !w.enabled {
return
}

if watcher, ok := w.watchers[d.ID]; ok {
if watcher, ok := w.watchers[id]; ok {
watcher.StopWatch()
delete(w.watchers, d.ID)
delete(w.watchers, id)
}
}

Expand Down
51 changes: 51 additions & 0 deletions nomad/deploymentwatcher/deployments_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
mocker "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -2081,3 +2082,53 @@ func watchersCount(w *Watcher) int {

return len(w.watchers)
}

// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged
func TestWatcher_PurgeDeployment(t *testing.T) {
ci.Parallel(t)
w, m := defaultTestDeploymentWatcher(t)

// clear UpdateDeploymentStatus default expectation
m.Mock.ExpectedCalls = nil

// Create a job and a deployment
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j))
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d))

// require that we get a call to UpsertDeploymentStatusUpdate
matchConfig := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusPaused,
StatusDescription: structs.DeploymentStatusDescriptionPaused,
}
matcher := matchDeploymentStatusUpdateRequest(matchConfig)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil)

w.SetEnabled(true, m.state)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 1 {
return fmt.Errorf("expected 1 deployment")
}
return nil
}),
wait.Attempts(100),
wait.Gap(10*time.Millisecond),
))

must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID))

must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 0 {
return fmt.Errorf("expected deployment watcher to be stopped")
}
return nil
}),
wait.Attempts(500),
wait.Gap(10*time.Millisecond),
))
}

0 comments on commit a13e455

Please sign in to comment.