Skip to content

Commit

Permalink
Fix leaking tasks.db
Browse files Browse the repository at this point in the history
For a long time, it's been a known fault that tasks.db grows out of
control. The reason is that this database is only cleaned up, and old
tasks removed, during initialization.

When an assignment to a worker is removed, previously, the assignment
was just marked as removed in the tasks database. However, an assignment
is only removed from a worker when the task is removed from the manager.
The worker does not need to continue to keep track of the task.

Instead of marking a task as no longer assigned, when a task is removed
as an assignment, we'll simply delete it from the database.

I'm not 100% sure of what all the task database is responsible for, or
why it needs to be persisted, so this change is targeted to have the
minimal impact on the system.

Signed-off-by: Drew Erny <derny@mirantis.com>
  • Loading branch information
dperny committed Mar 23, 2020
1 parent 49e3561 commit af58e4d
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 13 deletions.
10 changes: 8 additions & 2 deletions agent/worker.go
Expand Up @@ -278,8 +278,14 @@ func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.Assig

removeTaskAssignment := func(taskID string) error {
ctx := log.WithLogger(ctx, log.G(ctx).WithField("task.id", taskID))
if err := SetTaskAssignment(tx, taskID, false); err != nil {
log.G(ctx).WithError(err).Error("error setting task assignment in database")
// if a task is no longer assigned, then we do not have to keep track
// of it. a task will only be unassigned when it is deleted on the
// manager. insteaad of SetTaskAssginment to true, we'll just remove
// the task now.
if err := DeleteTask(tx, taskID); err != nil {
log.G(ctx).WithError(err).Errorf(
"error removing de-assigned task %v", taskID,
)
}
return err
}
Expand Down
14 changes: 4 additions & 10 deletions agent/worker_test.go
Expand Up @@ -143,7 +143,6 @@ func TestWorkerAssign(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand All @@ -153,15 +152,14 @@ func TestWorkerAssign(t *testing.T) {
{ID: "config-2"},
},
expectedAssigned: []*api.Task{
// task-1 should be cleaned up and deleted.
{ID: "task-2"},
},
},
{
// remove assigned tasks, secret and config no longer present
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
// there should be no tasks in the tasks db after this.
expectedTasks: nil,
},

// TODO(stevvooe): There are a few more states here we need to get
Expand All @@ -173,6 +171,7 @@ func TestWorkerAssign(t *testing.T) {
tasks []*api.Task
assigned []*api.Task
)

assert.NoError(t, worker.db.View(func(tx *bolt.Tx) error {
return WalkTasks(tx, func(task *api.Task) error {
tasks = append(tasks, task)
Expand Down Expand Up @@ -491,7 +490,6 @@ func TestWorkerUpdate(t *testing.T) {
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
expectedSecrets: []*api.Secret{
Expand Down Expand Up @@ -556,10 +554,6 @@ func TestWorkerUpdate(t *testing.T) {
Action: api.AssignmentChange_AssignmentActionRemove,
},
},
expectedTasks: []*api.Task{
{ID: "task-1"},
{ID: "task-2"},
},
},
} {
assert.NoError(t, worker.Update(ctx, testcase.changeSet))
Expand Down
2 changes: 1 addition & 1 deletion direct.mk
Expand Up @@ -123,7 +123,7 @@ uninstall:
coverage: ## generate coverprofiles from the unit tests
@echo "🐳 $@"
@( for pkg in $(filter-out ${INTEGRATION_PACKAGE},${PACKAGES}); do \
go test ${RACE} -tags "${DOCKER_BUILDTAGS}" -test.short -coverprofile="../../../$$pkg/coverage.txt" -covermode=atomic $$pkg || exit; \
go test -v ${RACE} -tags "${DOCKER_BUILDTAGS}" -test.short -coverprofile="../../../$$pkg/coverage.txt" -covermode=atomic $$pkg || exit; \
go test ${RACE} -tags "${DOCKER_BUILDTAGS}" -test.short -coverprofile="../../../$$pkg/coverage.txt" -covermode=atomic $$pkg || exit; \
done )

Expand Down

0 comments on commit af58e4d

Please sign in to comment.