Skip to content

Commit

Permalink
Also override cpu/memory in containerOverrides (#6836)
Browse files Browse the repository at this point in the history
Previously, we only set our cpu/memory overrides as a taskOverride.

This sets the limit for the entire task. If the containers don't
have their own limit set, they'll expand to consume all available
cpu/memory in the task. This is the case when we rely on Dagster to
register its own task definitions because the task definitions it
registers don't set container cpu/memory.

But if the containers have their own limits defined, they'll continue to
respect that limit even if we override the task itself. This can happen
when you provide a custom task definition that explicitly sets
cpu/memory for containers.

This fix changes the behavior of the EcsRunLauncher to set cpu/memory
overrides as both a taskOverride and a containerOverride. This way, both
the task will be upsized and the container will be allowed to consume
cpu/memory beyond its initial allocation.

One small gotcha in the PR that I've called out in a comment - AWS
expects that cpu/memory be an integer in containerOverrides but a string
in taskOverrides.
  • Loading branch information
jmsanders committed Feb 28, 2022
1 parent ab08410 commit a3ecc86
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 6 deletions.
18 changes: 13 additions & 5 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,21 +143,29 @@ def launch_run(self, context: LaunchRunContext) -> None:

# Set cpu or memory overrides
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/task-cpu-memory-error.html
overrides = {}
cpu_and_memory_overrides = {}
tags = self._get_run_tags(run.run_id)
if tags.cpu:
overrides["cpu"] = tags.cpu
cpu_and_memory_overrides["cpu"] = tags.cpu
if tags.memory:
overrides["memory"] = tags.memory
cpu_and_memory_overrides["memory"] = tags.memory

# Run a task using the same network configuration as this processes's
# task.
response = self.ecs.run_task(
taskDefinition=task_definition,
cluster=metadata.cluster,
overrides={
"containerOverrides": [{"name": self.container_name, "command": command}],
**overrides,
"containerOverrides": [
{
"name": self.container_name,
"command": command,
# containerOverrides expects cpu/memory as integers
**{k: int(v) for k, v in cpu_and_memory_overrides.items()},
}
],
# taskOverrides expects cpu/memory as strings
**cpu_and_memory_overrides,
},
networkConfiguration={
"awsvpcConfiguration": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,13 @@ def test_memory_and_cpu(ecs, instance, workspace, run, task_definition):
task = ecs.describe_tasks(tasks=[task_arn])["tasks"][0]

assert task.get("memory") == task_definition.get("memory")
# taskOverrides expects cpu/memory as strings
assert task.get("overrides").get("memory") == "1024"
# taskOverrides expects cpu/memory as integers
assert task.get("overrides").get("containerOverrides")[0].get("memory") == 1024
assert task.get("cpu") == task_definition.get("cpu")
assert not task.get("overrides").get("cpu")
assert not task.get("overrides").get("containerOverrides")[0].get("cpu")

# Also override cpu
existing_tasks = ecs.list_tasks()["taskArns"]
Expand All @@ -246,8 +250,10 @@ def test_memory_and_cpu(ecs, instance, workspace, run, task_definition):

assert task.get("memory") == task_definition.get("memory")
assert task.get("overrides").get("memory") == "1024"
assert task.get("overrides").get("containerOverrides")[0].get("memory") == 1024
assert task.get("cpu") == task_definition.get("cpu")
assert task.get("overrides").get("cpu") == "512"
assert task.get("overrides").get("containerOverrides")[0].get("cpu") == 512

# Override with invalid constraints
instance.add_run_tags(run.run_id, {"ecs/memory": "999"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def run_task(self, **kwargs):
"clusterArn": self._cluster_arn(cluster),
"containers": containers,
"lastStatus": "RUNNING",
"overrides": kwargs.get("overrides", {}),
"overrides": overrides,
"taskArn": arn,
"taskDefinitionArn": task_definition["taskDefinitionArn"],
"cpu": task_definition["cpu"],
Expand Down

0 comments on commit a3ecc86

Please sign in to comment.