Change reconcile/container update order on init and waitForHostResources/emitCurrentStatus order #3747

Merged
9 changes: 6 additions & 3 deletions agent/engine/docker_task_engine.go
@@ -597,15 +597,18 @@ func (engine *DockerTaskEngine) synchronizeState() {
 	}

 	tasks := engine.state.AllTasks()
+	// Pre-consume resources for tasks that have progressed beyond the resource check (waitingTaskQueue) stage.
+	// Call reconcileHostResources before
+	// - filterTasksToStartUnsafe, which will reconcile container statuses
+	//   for the duration the agent was stopped
+	// - starting managedTask goroutines
+	engine.reconcileHostResources()
 	tasksToStart := engine.filterTasksToStartUnsafe(tasks)
 	for _, task := range tasks {
 		task.InitializeResources(engine.resourceFields)
 		engine.saveTaskData(task)
 	}

-	// Before starting managedTask goroutines, pre-allocate resources for tasks
-	// which have progressed beyond resource check (waitingTaskQueue) stage
-	engine.reconcileHostResources()
 	for _, task := range tasksToStart {
 		engine.startTask(task)
 	}
16 changes: 14 additions & 2 deletions agent/engine/host_resource_manager.go
@@ -67,13 +67,25 @@ func (e *ResourceIsNilForTask) Error() string {
 	return fmt.Sprintf("resource %s is nil in task resources", e.resource)
 }

+func toStringSlice(s []*string) []string {
+	var t []string
+	for _, ptr := range s {
+		if ptr == nil {
+			t = append(t, "nil")
+		} else {
+			t = append(t, *ptr)
+		}
+	}
+	return t
+}
+
 func (h *HostResourceManager) logResources(msg string, taskArn string) {
 	logger.Debug(msg, logger.Fields{
 		"taskArn":   taskArn,
 		"CPU":       *h.consumedResource[CPU].IntegerValue,
 		"MEMORY":    *h.consumedResource[MEMORY].IntegerValue,
-		"PORTS_TCP": h.consumedResource[PORTSTCP].StringSetValue,
-		"PORTS_UDP": h.consumedResource[PORTSUDP].StringSetValue,
+		"PORTS_TCP": toStringSlice(h.consumedResource[PORTSTCP].StringSetValue),
singholt marked this conversation as resolved.
+		"PORTS_UDP": toStringSlice(h.consumedResource[PORTSUDP].StringSetValue),
 		"GPU":       *h.consumedResource[GPU].IntegerValue,
 	})
 }
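To see why the logging change helps, here is a small standalone sketch. It copies `toStringSlice` from the diff above and compares it with formatting the raw `[]*string`. The port values and the `main` function are made up for illustration, and the sketch assumes the agent's logger renders field values roughly the way `fmt`'s `%v` verb does, which this diff does not confirm.

```go
package main

import "fmt"

// toStringSlice mirrors the helper added in this PR: it dereferences each
// pointer and substitutes the literal string "nil" for nil entries.
func toStringSlice(s []*string) []string {
	var t []string
	for _, ptr := range s {
		if ptr == nil {
			t = append(t, "nil")
		} else {
			t = append(t, *ptr)
		}
	}
	return t
}

func main() {
	// Hypothetical port values, not taken from the agent.
	p80, p443 := "80", "443"
	ports := []*string{&p80, nil, &p443}

	// Formatting the raw slice prints pointer addresses, which differ per run.
	fmt.Printf("raw:       %v\n", ports)
	// The converted slice prints the port values themselves: [80 nil 443]
	fmt.Printf("converted: %v\n", toStringSlice(ports))
}
```

Note that an empty or nil input yields a nil `[]string`, which `%v` prints as `[]`.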
10 changes: 7 additions & 3 deletions agent/engine/task_manager.go
@@ -199,12 +199,16 @@ func (mtask *managedTask) overseeTask() {
 	// `desiredstatus`es which are a construct of the engine used only here,
 	// not present on the backend
 	mtask.UpdateStatus()
-	// If this was a 'state restore', send all unsent statuses
-	mtask.emitCurrentStatus()

-	// Wait for host resources required by this task to become available
+	// Wait here until enough resources are available on host for the task to progress
+	// - Waits until host resource manager successfully 'consume's task resources and returns
+	// - For tasks which have crossed this stage before (on agent restarts), resources are pre-consumed - returns immediately
+	// (resources are later 'release'd on Stopped task emitTaskEvent call)
 	mtask.waitForHostResources()
Contributor

Maybe we can add a small optimization here to skip invoking waitForHostResources if task known status is not STOPPED? Since we know in that case the host resources will be released immediately anyway.

Contributor Author

Yes, let me add that. Although for this particular 'restart case', resources would be detected as 'STOPPED', and the waitForHostResources behavior is that resources are pre-consumed, so it returns immediately, i.e. without queueing.

+
+	// If this was a 'state restore', send all unsent statuses
+	mtask.emitCurrentStatus()

 	// Main infinite loop. This is where we receive messages and dispatch work.
 	for {
 		if mtask.shouldExit() {
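The "pre-consumed, returns immediately" behavior discussed in the review thread can be pictured with a toy model. This is only a sketch under stated assumptions: `toyManager`, its fields, and its `consume`/`release` signatures are hypothetical and track a single CPU number, whereas the real host resource manager also tracks memory, ports, and GPUs, and its API is not shown in this diff.

```go
package main

import "fmt"

// toyManager is a hypothetical stand-in for the host resource manager.
type toyManager struct {
	freeCPU  int
	consumed map[string]int // taskArn -> CPU units currently held
}

// consume reserves cpu for taskArn and reports whether the task holds a
// reservation afterwards. It is idempotent: a task that already holds a
// reservation succeeds immediately without reserving a second time.
func (m *toyManager) consume(taskArn string, cpu int) bool {
	if _, ok := m.consumed[taskArn]; ok {
		return true
	}
	if cpu > m.freeCPU {
		return false
	}
	m.freeCPU -= cpu
	m.consumed[taskArn] = cpu
	return true
}

// release returns a task's reservation to the pool; it is a no-op for
// tasks that hold nothing.
func (m *toyManager) release(taskArn string) {
	if cpu, ok := m.consumed[taskArn]; ok {
		m.freeCPU += cpu
		delete(m.consumed, taskArn)
	}
}

func main() {
	m := &toyManager{freeCPU: 1024, consumed: map[string]int{}}

	// Startup reconciliation pre-consumes for a task that had already
	// passed the resource check before the restart.
	m.consume("task-a", 512)

	// The task's own later wait finds the reservation and does not queue.
	fmt.Println(m.consume("task-a", 512), m.freeCPU) // true 512

	// A new task that does not fit has to wait.
	fmt.Println(m.consume("task-b", 1024), m.freeCPU) // false 512

	// Once the first task stops and releases, the waiting task fits.
	m.release("task-a")
	fmt.Println(m.consume("task-b", 1024), m.freeCPU) // true 0
}
```

Keeping both calls idempotent is what makes it safe in this model to call `consume` once during startup reconciliation and again from the task's own goroutine.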