Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge Feature/task-resource-accounting to dev #3819

Merged
merged 2 commits into from
Jul 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion agent/acs/handler/acs_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,7 +1418,6 @@ func validateAddedTask(expectedTask apitask.Task, addedTask apitask.Task) error
Family: addedTask.Family,
Version: addedTask.Version,
DesiredStatusUnsafe: addedTask.GetDesiredStatus(),
StartSequenceNumber: addedTask.StartSequenceNumber,
}

if !reflect.DeepEqual(expectedTask, taskToCompareFromAdded) {
Expand Down
12 changes: 6 additions & 6 deletions agent/acs/update_handler/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestPerformUpdateWithUpdatesDisabled(t *testing.T) {
Reason: ptr("Updates are disabled").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -182,7 +182,7 @@ func TestFullUpdateFlow(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUndownloadedUpdate(t *testing.T) {
MessageId: ptr("mid").(*string),
}})

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -308,7 +308,7 @@ func TestDuplicateUpdateMessagesWithSuccess(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestDuplicateUpdateMessagesWithFailure(t *testing.T) {

require.Equal(t, "update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down Expand Up @@ -448,7 +448,7 @@ func TestNewerUpdateMessages(t *testing.T) {

require.Equal(t, "newer-update-tar-data", writtenFile.String(), "incorrect data written")

taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil)
taskEngine := engine.NewTaskEngine(cfg, nil, nil, nil, nil, nil, nil, nil, nil, nil, nil)
msg := &ecsacs.PerformUpdateMessage{
ClusterArn: ptr("cluster").(*string),
ContainerInstanceArn: ptr("containerInstance").(*string),
Expand Down
18 changes: 18 additions & 0 deletions agent/api/ecsclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,24 @@ func (client *APIECSClient) getResources() ([]*ecs.Resource, error) {
return []*ecs.Resource{&cpuResource, &memResource, &portResource, &udpPortResource}, nil
}

// GetHostResources calling getHostResources to get a list of CPU, MEMORY, PORTS and PORTS_UPD resources
// and return a resourceMap that map the resource name to each resource
func (client *APIECSClient) GetHostResources() (map[string]*ecs.Resource, error) {
resources, err := client.getResources()
if err != nil {
return nil, err
}
resourceMap := make(map[string]*ecs.Resource)
for _, resource := range resources {
if *resource.Name == "PORTS" {
// Except for RCI, TCP Ports are named as PORTS_TCP in agent for Host Resources purpose
resource.Name = utils.Strptr("PORTS_TCP")
}
resourceMap[*resource.Name] = resource
}
return resourceMap, nil
}

func getCpuAndMemory() (int64, int64) {
memInfo, err := system.ReadMemInfo()
mem := int64(0)
Expand Down
2 changes: 2 additions & 0 deletions agent/api/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type ECSClient interface {
// UpdateContainerInstancesState updates the given container Instance ID with
// the given status. Only valid statuses are ACTIVE and DRAINING.
UpdateContainerInstancesState(instanceARN, status string) error
// GetHostResources retrieves a map that map the resource name to the corresponding resource
GetHostResources() (map[string]*ecs.Resource, error)
}

// ECSSDK is an interface that specifies the subset of the AWS Go SDK's ECS
Expand Down
15 changes: 15 additions & 0 deletions agent/api/mocks/api_mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

168 changes: 144 additions & 24 deletions agent/api/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

apiappmesh "github.com/aws/amazon-ecs-agent/agent/api/appmesh"
"github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainer "github.com/aws/amazon-ecs-agent/agent/api/container"
apicontainerstatus "github.com/aws/amazon-ecs-agent/agent/api/container/status"
"github.com/aws/amazon-ecs-agent/agent/api/serviceconnect"
Expand All @@ -47,6 +48,7 @@ import (
apieni "github.com/aws/amazon-ecs-agent/ecs-agent/api/eni"
apierrors "github.com/aws/amazon-ecs-agent/ecs-agent/api/errors"
"github.com/aws/amazon-ecs-agent/ecs-agent/credentials"
"github.com/aws/amazon-ecs-agent/ecs-agent/ecs_client/model/ecs"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger"
"github.com/aws/amazon-ecs-agent/ecs-agent/logger/field"
"github.com/aws/amazon-ecs-agent/ecs-agent/utils/arn"
Expand Down Expand Up @@ -233,9 +235,6 @@ type Task struct {
// is handled properly so that the state storage continues to work.
SentStatusUnsafe apitaskstatus.TaskStatus `json:"SentStatus"`

StartSequenceNumber int64
StopSequenceNumber int64

// ExecutionCredentialsID is the ID of credentials that are used by agent to
// perform some action at the task level, such as pulling image from ECR
ExecutionCredentialsID string `json:"executionCredentialsID"`
Expand Down Expand Up @@ -311,11 +310,6 @@ func TaskFromACS(acsTask *ecsacs.Task, envelope *ecsacs.PayloadMessage) (*Task,
if err := json.Unmarshal(data, task); err != nil {
return nil, err
}
if task.GetDesiredStatus() == apitaskstatus.TaskRunning && envelope.SeqNum != nil {
task.StartSequenceNumber = *envelope.SeqNum
} else if task.GetDesiredStatus() == apitaskstatus.TaskStopped && envelope.SeqNum != nil {
task.StopSequenceNumber = *envelope.SeqNum
}

// Overrides the container command if it's set
for _, container := range task.Containers {
Expand Down Expand Up @@ -2824,22 +2818,6 @@ func (task *Task) GetAppMesh() *apiappmesh.AppMesh {
return task.AppMesh
}

// GetStopSequenceNumber returns the stop sequence number of a task
func (task *Task) GetStopSequenceNumber() int64 {
task.lock.RLock()
defer task.lock.RUnlock()

return task.StopSequenceNumber
}

// SetStopSequenceNumber sets the stop seqence number of a task
func (task *Task) SetStopSequenceNumber(seqnum int64) {
task.lock.Lock()
defer task.lock.Unlock()

task.StopSequenceNumber = seqnum
}

// SetPullStartedAt sets the task pullstartedat timestamp and returns whether
// this field was updated or not
func (task *Task) SetPullStartedAt(timestamp time.Time) bool {
Expand Down Expand Up @@ -3525,3 +3503,145 @@ func (task *Task) IsServiceConnectConnectionDraining() bool {
defer task.lock.RUnlock()
return task.ServiceConnectConnectionDrainingUnsafe
}

// ToHostResources will convert a task to a map of resources which ECS takes into account when scheduling tasks on instances
// * CPU
// - If task level CPU is set, use that
// - Else add up container CPUs
//
// * Memory
// - If task level memory is set, use that
// - Else add up container level
// - If memoryReservation field is set, use that
// - Else use memory field
//
// * Ports (TCP/UDP)
// - Only account for hostPort
// - Don't need to account for awsvpc mode, each task gets its own namespace
//
// * GPU
// - Return num of gpus requested (len of GPUIDs field)
func (task *Task) ToHostResources() map[string]*ecs.Resource {
resources := make(map[string]*ecs.Resource)
// CPU
if task.CPU > 0 {
// cpu unit is vcpu at task level
// convert to cpushares
taskCPUint64 := int64(task.CPU * 1024)
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskCPUint64,
}
} else {
// cpu unit is cpushares at container level
containerCPUint64 := int64(0)
for _, container := range task.Containers {
containerCPUint64 += int64(container.CPU)
}
resources["CPU"] = &ecs.Resource{
Name: utils.Strptr("CPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerCPUint64,
}
}

// Memory
if task.Memory > 0 {
// memory unit is MiB at task level
taskMEMint64 := task.Memory
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &taskMEMint64,
}
} else {
containerMEMint64 := int64(0)

for _, c := range task.Containers {
// To parse memory reservation / soft limit
hostConfig := &dockercontainer.HostConfig{}

if c.DockerConfig.HostConfig != nil {
err := json.Unmarshal([]byte(*c.DockerConfig.HostConfig), hostConfig)
if err != nil || hostConfig.MemoryReservation <= 0 {
// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
} else {
// Soft limit is specified in MiB units but translated to bytes while being transferred to Agent
// Converting back to MiB
containerMEMint64 += hostConfig.MemoryReservation / (1024 * 1024)
}
} else {
// container memory unit is MiB, keeping as is
containerMEMint64 += int64(c.Memory)
}
}
resources["MEMORY"] = &ecs.Resource{
Name: utils.Strptr("MEMORY"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &containerMEMint64,
}
}

// PORTS_TCP and PORTS_UDP
var tcpPortSet []uint16
var udpPortSet []uint16

// AWSVPC tasks have 'host' ports mapped to task ENI, not to host
// So don't need to keep an 'account' of awsvpc tasks with host ports fields assigned
if !task.IsNetworkModeAWSVPC() {
for _, c := range task.Containers {
for _, port := range c.Ports {
hostPort := port.HostPort
protocol := port.Protocol
if hostPort > 0 && protocol == container.TransportProtocolTCP {
tcpPortSet = append(tcpPortSet, hostPort)
} else if hostPort > 0 && protocol == container.TransportProtocolUDP {
udpPortSet = append(udpPortSet, hostPort)
}
}
}
}
resources["PORTS_TCP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_TCP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(tcpPortSet),
}
resources["PORTS_UDP"] = &ecs.Resource{
Name: utils.Strptr("PORTS_UDP"),
Type: utils.Strptr("STRINGSET"),
StringSetValue: utils.Uint16SliceToStringSlice(udpPortSet),
}

// GPU
var num_gpus int64
num_gpus = 0
for _, c := range task.Containers {
num_gpus += int64(len(c.GPUIDs))
}
resources["GPU"] = &ecs.Resource{
Name: utils.Strptr("GPU"),
Type: utils.Strptr("INTEGER"),
IntegerValue: &num_gpus,
}
logger.Debug("Task host resources to account for", logger.Fields{
"taskArn": task.Arn,
"CPU": *resources["CPU"].IntegerValue,
"MEMORY": *resources["MEMORY"].IntegerValue,
"PORTS_TCP": aws.StringValueSlice(resources["PORTS_TCP"].StringSetValue),
"PORTS_UDP": aws.StringValueSlice(resources["PORTS_UDP"].StringSetValue),
"GPU": *resources["GPU"].IntegerValue,
})
return resources
}

func (task *Task) HasActiveContainers() bool {
for _, container := range task.Containers {
containerStatus := container.GetKnownStatus()
if containerStatus >= apicontainerstatus.ContainerPulled && containerStatus <= apicontainerstatus.ContainerResourcesProvisioned {
return true
}
}
return false
}