Skip to content

Commit

Permalink
Register a new task definition for IAM changes (#7564)
Browse files Browse the repository at this point in the history
We no longer register a new task definition for every run:

#7009

This introduced a regression where changing the IAM role used by a task
definition doesn't re-register the task definition. So it tries to run
the task with an IAM role that doesn't exist. The easiest way to
reproduce this is to use our example `docker compose` functionality:

#7358

This change extends our logic for deciding when to reuse a task
definition. Now, we register a new task definition (instead of reusing
the existing one) if:

- The container definition's image, name, or secrets change
- The task definition's task role or execution role changes

For now, I've pulled the logic out into its own private method with a
separate test so we can more easily extend the logic. I think
eventually, this would benefit from a broader refactor - perhaps with
the introduction of a `TaskDefinition` class that implements `__eq__`.
  • Loading branch information
jmsanders committed Apr 26, 2022
1 parent b87ef89 commit b49aadc
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 14 deletions.
33 changes: 24 additions & 9 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,15 +307,9 @@ def _task_definition(self, family, metadata, image, container_context):
task_definition = self.ecs.describe_task_definition(taskDefinition=family)[
"taskDefinition"
]

container_definitions = task_definition.get("containerDefinitions", [{}])
for container_definition in container_definitions:
if (
container_definition.get("image") == image
and container_definition.get("name") == self.container_name
and container_definition.get("secrets") == secrets_definition.get("secrets", [])
):
return task_definition
secrets = secrets_definition.get("secrets", [])
if self._reuse_task_definition(task_definition, metadata, image, secrets):
return task_definition

return default_ecs_task_definition(
self.ecs,
Expand All @@ -327,5 +321,26 @@ def _task_definition(self, family, metadata, image, container_context):
include_sidecars=self.include_sidecars,
)

def _reuse_task_definition(self, task_definition, metadata, image, secrets):
    """Decide whether an existing task definition can be reused as-is.

    Args:
        task_definition: Dict describing the existing task definition. Its
            "containerDefinitions", "executionRoleArn" and "taskRoleArn"
            entries are inspected; every other key is ignored.
        metadata: Object whose ``task_definition`` attribute is a dict
            holding the expected "executionRoleArn" and "taskRoleArn".
        image: Image the primary container is expected to use.
        secrets: Secrets the primary container is expected to declare.

    Returns:
        bool: True only when some container definition has the expected
        image, name (``self.container_name``) and secrets, and both role
        ARNs equal the ones in ``metadata.task_definition``.
    """
    container_definitions = task_definition.get("containerDefinitions", [{}])
    # Only check for diffs to the primary container. This ignores changes to sidecars.
    container_definitions_match = any(
        container_definition.get("image") == image
        and container_definition.get("name") == self.container_name
        and container_definition.get("secrets") == secrets
        for container_definition in container_definitions
    )

    # A role that is absent on both sides compares as None == None and
    # therefore counts as a match.
    expected_task_definition = metadata.task_definition
    task_definitions_match = all(
        task_definition.get(key) == expected_task_definition.get(key)
        for key in ("executionRoleArn", "taskRoleArn")
    )

    # Logical ``and`` rather than bitwise ``&``: both operands are plain bools.
    return container_definitions_match and task_definitions_match

def _task_metadata(self):
    """Return the result of ``default_ecs_task_metadata`` for this launcher's clients.

    The helper is called with ``self.ec2`` and ``self.ecs``, in that order.
    """
    task_metadata = default_ecs_task_metadata(self.ec2, self.ecs)
    return task_metadata
4 changes: 0 additions & 4 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ def default_ecs_task_definition(

# Register the overridden task definition as a revision to the
# "dagster-run" family.
# TODO: Only register the task definition if a matching one doesn't
# already exist. Otherwise, we risk exhausting the revisions limit
# (1,000,000 per family) with unnecessary revisions:
# https://docs.aws.amazon.com/AmazonECS/latest/developerguide/service-quotas.html
ecs.register_task_definition(**task_definition)

return task_definition
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
# pylint: disable=protected-access
# pylint: disable=unused-variable
import copy

import dagster_aws
import pytest
from botocore.exceptions import ClientError
from dagster_aws.ecs import EcsEventualConsistencyTimeout
from dagster_aws.ecs.tasks import TaskMetadata

from dagster.check import CheckError
from dagster.core.events import MetadataEntry
Expand Down Expand Up @@ -88,7 +91,7 @@ def test_default_launcher(


def test_task_definition_registration(
ecs, instance, workspace, run, other_workspace, other_run, secrets_manager
ecs, instance, workspace, run, other_workspace, other_run, secrets_manager, monkeypatch
):
initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
initial_tasks = ecs.list_tasks()["taskArns"]
Expand Down Expand Up @@ -127,6 +130,79 @@ def test_task_definition_registration(
instance.launch_run(other_run.run_id, other_workspace)
assert task_definitions == ecs.list_task_definitions()["taskDefinitionArns"]

# Register a new task definition if _reuse_task_definition returns False
# for any other reason
monkeypatch.setattr(instance.run_launcher, "_reuse_task_definition", lambda *_: False)

instance.launch_run(other_run.run_id, other_workspace)
assert len(ecs.list_task_definitions()["taskDefinitionArns"]) == len(task_definitions) + 1


def test_reuse_task_definition(instance):
    """Check ``_reuse_task_definition`` against one-field-at-a-time diffs.

    An unmodified copy of the task definition must be reusable. Changing the
    primary container's image, name or secrets, or either role ARN, must
    prevent reuse. An unrelated extra key must not.
    """
    image = "image"
    secrets = []
    primary_container = {
        "image": image,
        "name": instance.run_launcher.container_name,
        "secrets": secrets,
    }
    original_task_definition = {"containerDefinitions": [primary_container]}
    metadata = TaskMetadata(
        cluster="cluster",
        subnets=[],
        security_groups=[],
        task_definition=original_task_definition,
        container_definition={},
        assign_public_ip=True,
    )

    def can_reuse(mutate=None):
        # Each scenario starts from a fresh deep copy so that a mutation
        # never leaks into the original or into a later scenario.
        candidate = copy.deepcopy(original_task_definition)
        if mutate is not None:
            mutate(candidate)
        return instance.run_launcher._reuse_task_definition(candidate, metadata, image, secrets)

    # The same task definition passes
    assert can_reuse()

    # Each of these changes, applied on its own, must fail, in order:
    # image, container name, secrets, execution role, task role.
    blocking_changes = [
        lambda td: td["containerDefinitions"][0].update(image="new-image"),
        lambda td: td["containerDefinitions"][0].update(name="new-container"),
        lambda td: td["containerDefinitions"][0]["secrets"].append("new-secrets"),
        lambda td: td.update(executionRoleArn="new-role"),
        lambda td: td.update(taskRoleArn="new-role"),
    ]
    for change in blocking_changes:
        assert not can_reuse(change)

    # Any other diff passes
    assert can_reuse(lambda td: td.update(somethingElse="boom"))


def test_launching_custom_task_definition(
ecs, instance_cm, run, workspace, pipeline, external_pipeline
Expand Down

0 comments on commit b49aadc

Please sign in to comment.