Skip to content

Commit

Permalink
Fix task reaper not cleaning up COMPLETE tasks
Browse files Browse the repository at this point in the history
Tasks in the complete state were not cleaned up because the task reaper
was only looking at task.Status.State >= api.TaskStateShutdown, but the
COMPLETE state (api.TaskStateCompleted) is the actual "first" terminal
state.

Added a test for the initialization stage of the task reaper. This runs
the logic of which tasks to clean up through its paces, but doesn't test
the run-time logic of the task reaper (its ability to respond to
updates to tasks).

Signed-off-by: Drew Erny <drew.erny@docker.com>
  • Loading branch information
dperny committed Dec 18, 2017
1 parent a6519e2 commit e5b3107
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 4 deletions.
10 changes: 6 additions & 4 deletions manager/orchestrator/taskreaper/task_reaper.go
Expand Up @@ -96,10 +96,10 @@ func (tr *TaskReaper) Run(ctx context.Context) {
// Serviceless tasks can be cleaned up right away since they are not attached to a service.
tr.cleanup = append(tr.cleanup, t.ID)
}
// tasks with desired state REMOVE that have progressed beyond SHUTDOWN can be cleaned up
// tasks with desired state REMOVE that have progressed beyond COMPLETE can be cleaned up
// right away
for _, t := range removeTasks {
if t.Status.State >= api.TaskStateShutdown {
if t.Status.State >= api.TaskStateCompleted {
tr.cleanup = append(tr.cleanup, t.ID)
}
}
Expand Down Expand Up @@ -138,10 +138,10 @@ func (tr *TaskReaper) Run(ctx context.Context) {
if t.Status.State >= api.TaskStateOrphaned && t.ServiceID == "" {
tr.cleanup = append(tr.cleanup, t.ID)
}
// add tasks that have progressed beyond SHUTDOWN and have desired state REMOVE. These
// add tasks that have progressed beyond COMPLETE and have desired state REMOVE. These
// tasks are associated with slots that were removed as part of a service scale down
// or service removal.
if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateShutdown {
if t.DesiredState == api.TaskStateRemove && t.Status.State >= api.TaskStateCompleted {
tr.cleanup = append(tr.cleanup, t.ID)
}
case api.EventUpdateCluster:
Expand Down Expand Up @@ -282,6 +282,8 @@ func (tr *TaskReaper) tick() {

// Stop stops the TaskReaper and waits for the main loop to exit.
// It blocks until Run has observed the stop signal and closed doneChan.
func (tr *TaskReaper) Stop() {
	// TODO(dperny) calling stop on the task reaper twice will cause a panic
	// because we try to close a channel that will already have been closed.
	close(tr.stopChan)
	// block until Run signals completion by closing doneChan
	<-tr.doneChan
}
165 changes: 165 additions & 0 deletions manager/orchestrator/taskreaper/task_reaper_test.go
@@ -0,0 +1,165 @@
package taskreaper

import (
"context"

"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/state/store"
)

// TestTaskReaperInit tests that the task reaper correctly cleans up tasks when
// it is initialized. This will happen every time cluster leadership changes.
func TestTaskReaperInit(t *testing.T) {
	// start up the memory store
	ctx := context.Background()
	s := store.NewMemoryStore(nil)
	require.NotNil(t, s)
	defer s.Close()

	// Create the basic cluster with precooked tasks we need for the taskreaper
	cluster := &api.Cluster{
		Spec: api.ClusterSpec{
			Annotations: api.Annotations{
				Name: store.DefaultClusterName,
			},
			Orchestration: api.OrchestrationConfig{
				TaskHistoryRetentionLimit: 2,
			},
		},
	}

	// this service is alive and active, has no tasks to clean up
	service := &api.Service{
		ID: "cleanservice",
		Spec: api.ServiceSpec{
			Annotations: api.Annotations{
				Name: "cleanservice",
			},
			Task: api.TaskSpec{
				// the runtime spec isn't looked at and doesn't really need to
				// be filled in
				Runtime: &api.TaskSpec_Container{
					Container: &api.ContainerSpec{},
				},
			},
			Mode: &api.ServiceSpec_Replicated{
				Replicated: &api.ReplicatedService{
					Replicas: 2,
				},
			},
		},
	}

	// Two clean tasks, these should not be removed
	cleantask1 := &api.Task{
		ID:           "cleantask1",
		Slot:         1,
		DesiredState: api.TaskStateRunning,
		Status: api.TaskStatus{
			State: api.TaskStateRunning,
		},
		ServiceID: "cleanservice",
	}

	cleantask2 := &api.Task{
		ID:           "cleantask2",
		Slot:         2,
		DesiredState: api.TaskStateRunning,
		Status: api.TaskStatus{
			State: api.TaskStateRunning,
		},
		ServiceID: "cleanservice",
	}

	// this is an old task from when an earlier task failed. It should not be
	// removed because it's retained history
	retainedtask := &api.Task{
		ID:           "retainedtask",
		Slot:         1,
		DesiredState: api.TaskStateShutdown,
		Status: api.TaskStatus{
			State: api.TaskStateFailed,
		},
		ServiceID: "cleanservice",
	}

	// This is a removed task after cleanservice was scaled down
	removedtask := &api.Task{
		ID:           "removedtask",
		Slot:         3,
		DesiredState: api.TaskStateRemove,
		Status: api.TaskStatus{
			State: api.TaskStateShutdown,
		},
		ServiceID: "cleanservice",
	}

	// two tasks belonging to a service that does not exist.
	// this first one is still running and should not be cleaned up
	terminaltask1 := &api.Task{
		ID:           "terminaltask1",
		Slot:         1,
		DesiredState: api.TaskStateRemove,
		Status: api.TaskStatus{
			State: api.TaskStateRunning,
		},
		ServiceID: "goneservice",
	}

	// this second task is shutdown, and can be cleaned up
	terminaltask2 := &api.Task{
		ID:           "terminaltask2",
		Slot:         2,
		DesiredState: api.TaskStateRemove,
		Status: api.TaskStatus{
			// use COMPLETE because it's the earliest terminal state
			State: api.TaskStateCompleted,
		},
		ServiceID: "goneservice",
	}

	err := s.Update(func(tx store.Tx) error {
		require.NoError(t, store.CreateCluster(tx, cluster))
		require.NoError(t, store.CreateService(tx, service))
		require.NoError(t, store.CreateTask(tx, cleantask1))
		require.NoError(t, store.CreateTask(tx, cleantask2))
		require.NoError(t, store.CreateTask(tx, retainedtask))
		require.NoError(t, store.CreateTask(tx, removedtask))
		require.NoError(t, store.CreateTask(tx, terminaltask1))
		require.NoError(t, store.CreateTask(tx, terminaltask2))
		return nil
	})
	require.NoError(t, err, "Error setting up test fixtures")

	// set up the task reaper we'll use for this test
	reaper := New(s)

	// Now, start the reaper
	go reaper.Run(ctx)

	// And then stop the reaper. This will cause the reaper to run through its
	// whole init phase and then immediately enter the loop body, get the stop
	// signal, and exit. plus, it will block until that loop body has been
	// reached and the reaper is stopped.
	reaper.Stop()

	// Now check that all of the tasks are in the state we expect
	s.View(func(tx store.ReadTx) {
		// the first two clean tasks should exist
		// (fixed: the original asserted "cleantask1" twice and never
		// verified "cleantask2")
		assert.NotNil(t, store.GetTask(tx, "cleantask1"))
		assert.NotNil(t, store.GetTask(tx, "cleantask2"))
		// the retained task should still exist
		assert.NotNil(t, store.GetTask(tx, "retainedtask"))
		// the removed task should be gone
		assert.Nil(t, store.GetTask(tx, "removedtask"))
		// the first terminal task, which has not yet shut down, should exist
		assert.NotNil(t, store.GetTask(tx, "terminaltask1"))
		// and the second terminal task should have been removed
		assert.Nil(t, store.GetTask(tx, "terminaltask2"))
	})
}

0 comments on commit e5b3107

Please sign in to comment.