Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order #3747

Merged
4 changes: 2 additions & 2 deletions agent/api/task/task.go
@@ -3617,8 +3617,8 @@ func (task *Task) ToHostResources() map[string]*ecs.Resource {
 		"taskArn":   task.Arn,
 		"CPU":       *resources["CPU"].IntegerValue,
 		"MEMORY":    *resources["MEMORY"].IntegerValue,
-		"PORTS_TCP": resources["PORTS_TCP"].StringSetValue,
-		"PORTS_UDP": resources["PORTS_UDP"].StringSetValue,
+		"PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue),
+		"PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue),
 		"GPU":       *resources["GPU"].IntegerValue,
 	})
 	return resources
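For context, this log-field change swaps the raw []*string held in ecs.Resource.StringSetValue for its dereferenced form. The following is a minimal standalone sketch, not part of the PR, using the aws-sdk-go v1 helpers aws.StringSlice and aws.StringValueSlice, to show why the dereferenced slice logs readably:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
)

func main() {
	// StringSetValue fields on ecs.Resource are []*string.
	ports := aws.StringSlice([]string{"80", "443"})

	// Printing the pointer slice shows addresses, not port values.
	fmt.Println(ports) // e.g. [0xc000010230 0xc000010240]

	// aws.StringValueSlice dereferences into a plain []string.
	fmt.Println(aws.StringValueSlice(ports)) // [80 443]
}
```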
11 changes: 8 additions & 3 deletions agent/engine/docker_task_engine.go
@@ -597,15 +597,20 @@ func (engine *DockerTaskEngine) synchronizeState() {
 	}
 
 	tasks := engine.state.AllTasks()
+	// For normal task progress, overseeTask 'consume's resources through waitForHostResources in host_resource_manager before progressing.
+	// For agent restarts (state restore), we pre-consume resources for tasks that had progressed beyond the waitForHostResources stage,
+	// so these tasks do not wait during the 'waitForHostResources' call again and do not go through queuing again.
+	//
+	// Call reconcileHostResources before
+	// - filterTasksToStartUnsafe, which will reconcile container statuses for the duration the agent was stopped
+	// - starting the managedTask overseeTask goroutines
+	engine.reconcileHostResources()
 	tasksToStart := engine.filterTasksToStartUnsafe(tasks)
 	for _, task := range tasks {
 		task.InitializeResources(engine.resourceFields)
 		engine.saveTaskData(task)
 	}
 
-	// Before starting managedTask goroutines, pre-allocate resources for tasks which
-	// which have progressed beyond resource check (waitingTaskQueue) stage
-	engine.reconcileHostResources()
 	for _, task := range tasksToStart {
 		engine.startTask(task)
 	}
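The point of moving the reconcile call is ordering: pre-consume resources for restored tasks before container statuses are reconciled and before any overseeTask goroutine can reach waitForHostResources. Below is a hedged, self-contained sketch of that idea; the types, fields, and helper names are illustrative stand-ins, not the agent's real ones:

```go
package main

import "fmt"

// Hypothetical stand-ins for the engine's task and host-resource bookkeeping.
type task struct {
	arn         string
	pastQueuing bool // had already passed waitForHostResources before the restart
}

type engine struct {
	consumed map[string]bool // taskArn -> resources pre-consumed
}

// reconcileHostResources pre-consumes resources for restored tasks that had
// already progressed beyond the queuing stage, so their later
// waitForHostResources call can return immediately.
func (e *engine) reconcileHostResources(tasks []task) {
	for _, t := range tasks {
		if t.pastQueuing {
			e.consumed[t.arn] = true
		}
	}
}

// waitForHostResources here only illustrates the short-circuit: a task whose
// resources were pre-consumed does not queue again.
func (e *engine) waitForHostResources(t task) {
	if e.consumed[t.arn] {
		fmt.Println(t.arn, "resources already consumed, not queuing")
		return
	}
	fmt.Println(t.arn, "would be enqueued and wait for host capacity")
}

func main() {
	e := &engine{consumed: map[string]bool{}}
	tasks := []task{{arn: "task-1", pastQueuing: true}, {arn: "task-2"}}

	// Order matters: reconcile before the per-task goroutines would start.
	e.reconcileHostResources(tasks)
	for _, t := range tasks {
		e.waitForHostResources(t)
	}
}
```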
5 changes: 3 additions & 2 deletions agent/engine/host_resource_manager.go
@@ -23,6 +23,7 @@ import (
 	"github.com/aws/amazon-ecs-agent/agent/utils"
 	"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
 	"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
+	"github.com/aws/aws-sdk-go/aws"
 )
 
 const (
@@ -72,8 +73,8 @@ func (h *HostResourceManager) logResources(msg string, taskArn string) {
 		"taskArn":   taskArn,
 		"CPU":       *h.consumedResource[CPU].IntegerValue,
 		"MEMORY":    *h.consumedResource[MEMORY].IntegerValue,
-		"PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue,
-		"PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue,
+		"PORTS_TCP": aws.StringValueSlice(h.consumedResource[PORTSTCP].StringSetValue),
+		"PORTS_UDP": aws.StringValueSlice(h.consumedResource[PORTSUDP].StringSetValue),
 		"GPU":       *h.consumedResource[GPU].IntegerValue,
 	})
 }
18 changes: 15 additions & 3 deletions agent/engine/task_manager.go
@@ -199,12 +199,17 @@ func (mtask *managedTask) overseeTask() {
 	// `desiredstatus`es which are a construct of the engine used only here,
 	// not present on the backend
 	mtask.UpdateStatus()
-	// If this was a 'state restore', send all unsent statuses
-	mtask.emitCurrentStatus()
 
-	// Wait for host resources required by this task to become available
+	// Wait here until enough resources are available on the host for the task to progress
+	// - Waits until the host resource manager successfully 'consume's task resources and returns
+	// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
+	// - If the task is already stopped (knownStatus is STOPPED), does not attempt to consume resources - returns immediately
+	//   (resources are later 'release'd on the stopped task's emitTaskEvent call)
 	mtask.waitForHostResources()

Reviewer comment (Contributor) on the mtask.waitForHostResources() line:

Maybe we can add a small optimization here to skip invoking waitForHostResources if the task's known status is STOPPED? Since we know in that case the host resources will be released immediately anyway.

Author reply (Contributor):

Yes, let me add that. Although for this particular 'restart case', resources would be detected as 'STOPPED', and the waitForHostResources behavior is that resources are pre-consumed, so it returns immediately, i.e. without queueing.


+	// If this was a 'state restore', send all unsent statuses
+	mtask.emitCurrentStatus()
 
 	// Main infinite loop. This is where we receive messages and dispatch work.
 	for {
 		if mtask.shouldExit() {
@@ -272,6 +277,13 @@ func (mtask *managedTask) emitCurrentStatus() {
 // the task. It will wait for event on this task's consumedHostResourceEvent
 // channel from monitorQueuedTasks routine to wake up
 func (mtask *managedTask) waitForHostResources() {
+	if mtask.GetKnownStatus().Terminal() {
+		// Task's known status is STOPPED. No need to wait in this case; proceed to cleanup.
+		// This is relevant when the agent restarts and a task has stopped - do not attempt
+		// to consume resources in the host resource manager
+		return
+	}
+
 	if !mtask.IsInternal && !mtask.engine.hostResourceManager.checkTaskConsumed(mtask.Arn) {
 		// Internal tasks are started right away as their resources are not accounted for
 		mtask.engine.enqueueTask(mtask)
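For readers unfamiliar with the enqueue-then-wait pattern referenced above (enqueueTask plus a wake-up from the monitorQueuedTasks routine), here is a hedged, self-contained sketch of the idea, including the new early return for tasks whose known status is already STOPPED. The channel and type names are illustrative, not the agent's real ones:

```go
package main

import "fmt"

// queuedTask stands in for a managed task waiting on host resources.
type queuedTask struct {
	arn      string
	consumed chan struct{} // signaled once resources are consumed on the task's behalf
	stopped  bool          // stands in for GetKnownStatus().Terminal()
}

// waitForHostResources sketch: stopped tasks skip consumption entirely;
// everything else enqueues itself and blocks until the monitor wakes it up.
func waitForHostResources(t *queuedTask, queue chan<- *queuedTask) {
	if t.stopped {
		// Known status is STOPPED: no need to consume; any held resources are
		// released later on the stopped-task event path.
		fmt.Println(t.arn, "already stopped, skipping resource wait")
		return
	}
	queue <- t   // hand the task to the monitor (enqueueTask analogue)
	<-t.consumed // block until resources are consumed for this task
	fmt.Println(t.arn, "resources consumed, progressing")
}

// monitor plays the role of a monitorQueuedTasks routine: it consumes
// resources for queued tasks in order and wakes each one up.
func monitor(queue <-chan *queuedTask) {
	for t := range queue {
		// ...check and consume host resources here...
		t.consumed <- struct{}{}
	}
}

func main() {
	queue := make(chan *queuedTask, 1)
	go monitor(queue)

	stopped := &queuedTask{arn: "task-1", consumed: make(chan struct{}), stopped: true}
	running := &queuedTask{arn: "task-2", consumed: make(chan struct{})}

	waitForHostResources(stopped, queue)
	waitForHostResources(running, queue)
}
```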