Skip to content

Commit

Permalink
Reuse ECS task definitions (#7009)
Browse files Browse the repository at this point in the history
Unless you provide your own task definition in dagster.yaml, every run
that the EcsRunLauncher launches registers a new task definition.

For most runs, this is redundant - the new task definition it creates is
identical to the last one it created. Consequently:

- Users' AWS accounts are slowly filling up with redundant ECS Task
  Definitions
- There's a hard limit of one million revisions per task definition family
  so the first user to hit a million runs on the EcsRunLauncher has a
  nasty ticking time bomb waiting for them

This change aims to alter that behavior by treating the task definition
as a more generic canvas and leaning into Task Overides and Continer
Overrides to vary actual behavior. As long as we have a minimum
sufficient task definition, we should be able to reuse it and customize
it via overrides:

https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_TaskOverride.html
https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_ContainerOverride.html
  • Loading branch information
jmsanders committed Mar 15, 2022
1 parent 8d0319a commit 07e3403
Show file tree
Hide file tree
Showing 8 changed files with 163 additions and 35 deletions.
48 changes: 38 additions & 10 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import warnings
from collections import namedtuple
from contextlib import suppress

import boto3
from botocore.exceptions import ClientError
Expand All @@ -13,6 +14,7 @@

from ..secretsmanager import get_secrets_from_arns, get_tagged_secrets
from .tasks import default_ecs_task_definition, default_ecs_task_metadata
from .utils import sanitize_family

Tags = namedtuple("Tags", ["arn", "cluster", "cpu", "memory"])

Expand Down Expand Up @@ -148,10 +150,13 @@ def launch_run(self, context: LaunchRunContext) -> None:
docker-compose when you use the Dagster ECS reference deployment.
"""
run = context.pipeline_run
family = sanitize_family(
run.external_pipeline_origin.external_repository_origin.repository_location_origin.location_name
)
metadata = self._task_metadata()
pipeline_origin = context.pipeline_code_origin
image = pipeline_origin.repository_origin.container_image
task_definition = self._task_definition(metadata, image)["family"]
task_definition = self._task_definition(family, metadata, image)["family"]

args = ExecuteRunArgs(
pipeline_origin=pipeline_origin,
Expand Down Expand Up @@ -257,7 +262,7 @@ def terminate(self, run_id):
self.ecs.stop_task(task=tags.arn, cluster=tags.cluster)
return True

def _task_definition(self, metadata, image):
def _task_definition(self, family, metadata, image):
"""
Return the launcher's task definition if it's configured.
Expand All @@ -270,19 +275,42 @@ def _task_definition(self, metadata, image):
task_definition = self.ecs.describe_task_definition(taskDefinition=self.task_definition)
return task_definition["taskDefinition"]

secrets = merge_dicts(
(
get_tagged_secrets(self.secrets_manager, self.secrets_tag)
if self.secrets_tag
else {}
),
self.secrets,
)
secrets_dict = (
{"secrets": [{"name": key, "valueFrom": value} for key, value in secrets.items()]}
if secrets
else {}
)

task_definition = {}
with suppress(ClientError):
task_definition = self.ecs.describe_task_definition(taskDefinition=family)[
"taskDefinition"
]

container_definitions = task_definition.get("containerDefinitions", [{}])
for container_definition in container_definitions:
if (
container_definition.get("image") == image
and container_definition.get("name") == self.container_name
and container_definition.get("secrets") == secrets_dict.get("secrets")
):
return task_definition

return default_ecs_task_definition(
self.ecs,
family,
metadata,
image,
self.container_name,
secrets=merge_dicts(
(
get_tagged_secrets(self.secrets_manager, self.secrets_tag)
if self.secrets_tag
else {}
),
self.secrets,
),
secrets=secrets_dict,
)

def _task_metadata(self):
Expand Down
18 changes: 3 additions & 15 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ class EcsNoTasksFound(Exception):

def default_ecs_task_definition(
ecs,
family,
metadata,
image,
container_name,
command=None,
environment=None,
secrets=None,
):
# Start with the current process's task's definition but remove
Expand All @@ -54,17 +54,6 @@ def default_ecs_task_definition(
if key in metadata.task_definition.keys()
)

environment_dict = (
{"environment": [{"key": key, "value": value} for key, value in environment.items()]}
if environment
else {}
)
secrets_dict = (
{"secrets": [{"name": key, "valueFrom": value} for key, value in secrets.items()]}
if secrets
else {}
)

# The current process might not be running in a container that has the
# pipeline's code installed. Inherit most of the process's container
# definition (things like environment, dependencies, etc.) but replace
Expand All @@ -83,13 +72,12 @@ def default_ecs_task_definition(
"entryPoint": [],
"command": command if command else [],
},
environment_dict,
secrets_dict,
secrets or {},
)
]
task_definition = {
**task_definition,
"family": "dagster-run",
"family": family,
"containerDefinitions": container_definitions,
}

Expand Down
6 changes: 6 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import re


def sanitize_family(family):
# Trim the location name and remove special characters
return re.sub(r"[^\w^\-]", "", family)[:255]
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@ def ignore_experimental_warning():

@pytest.fixture
def image():
return "dagster:latest"
return "dagster:first"


@pytest.fixture
def other_image():
return "dagster:second"


@pytest.fixture
Expand Down Expand Up @@ -114,6 +119,28 @@ def instance(instance_cm):
yield dagster_instance


@pytest.fixture
def workspace(instance, image):
with in_process_test_workspace(
instance,
ReconstructableRepository.for_file(
repo.__file__, repo.repository.__name__, container_image=image
),
) as workspace:
yield workspace


@pytest.fixture
def other_workspace(instance, other_image):
with in_process_test_workspace(
instance,
ReconstructableRepository.for_file(
repo.__file__, repo.repository.__name__, container_image=other_image
),
) as workspace:
yield workspace


@pytest.fixture
def pipeline():
return repo.pipeline
Expand All @@ -128,14 +155,11 @@ def external_pipeline(workspace):


@pytest.fixture
def workspace(instance, image):
with in_process_test_workspace(
instance,
ReconstructableRepository.for_file(
repo.__file__, repo.repository.__name__, container_image=image
),
) as workspace:
yield workspace
def other_external_pipeline(other_workspace):
location = other_workspace.get_repository_location(other_workspace.repository_location_names[0])
return location.get_repository(repo.repository.__name__).get_full_external_pipeline(
repo.pipeline.__name__
)


@pytest.fixture
Expand All @@ -145,3 +169,12 @@ def run(instance, pipeline, external_pipeline):
external_pipeline_origin=external_pipeline.get_external_origin(),
pipeline_code_origin=external_pipeline.get_python_origin(),
)


@pytest.fixture
def other_run(instance, pipeline, other_external_pipeline):
return instance.create_run_for_pipeline(
pipeline,
external_pipeline_origin=other_external_pipeline.get_external_origin(),
pipeline_code_origin=other_external_pipeline.get_python_origin(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ def test_default_launcher(
task_definition = task_definition["taskDefinition"]

# It has a new family, name, and image
assert task_definition["family"] == "dagster-run"
# We get the family name from the location name. With the InProcessExecutor that we use in tests,
# the location name is always <<in_process>>. And we sanitize it so it's compatible with the ECS API.
assert task_definition["family"] == "in_process"
assert len(task_definition["containerDefinitions"]) == 1
container_definition = task_definition["containerDefinitions"][0]
assert container_definition["name"] == "run"
Expand Down Expand Up @@ -84,6 +86,47 @@ def test_default_launcher(
assert MetadataEntry.text(run.run_id, "Run ID") in event_metadata


def test_task_definition_registration(
ecs, instance, workspace, run, other_workspace, other_run, secrets_manager
):
initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
initial_tasks = ecs.list_tasks()["taskArns"]

instance.launch_run(run.run_id, workspace)

# A new task definition is created
task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
assert len(task_definitions) == len(initial_task_definitions) + 1

# Launching another run reuses an existing task definition
instance.launch_run(run.run_id, workspace)
assert task_definitions == ecs.list_task_definitions()["taskDefinitionArns"]

# Register a new task definition if the image changes
instance.launch_run(other_run.run_id, other_workspace)
assert len(ecs.list_task_definitions()["taskDefinitionArns"]) == len(task_definitions) + 1

# Relaunching another run with the new image reuses an existing task definition
task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
instance.launch_run(other_run.run_id, other_workspace)
assert task_definitions == ecs.list_task_definitions()["taskDefinitionArns"]

# Register a new task definition if secrets change
secrets_manager.create_secret(
Name="hello",
SecretString="hello",
Tags=[{"Key": "dagster", "Value": "true"}],
)

instance.launch_run(other_run.run_id, other_workspace)
assert len(ecs.list_task_definitions()["taskDefinitionArns"]) == len(task_definitions) + 1

# Relaunching another run with the same secrets reuses an existing task definition
task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
instance.launch_run(other_run.run_id, other_workspace)
assert task_definitions == ecs.list_task_definitions()["taskDefinitionArns"]


def test_launching_custom_task_definition(
ecs, instance_cm, run, workspace, pipeline, external_pipeline
):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from dagster_aws.ecs.utils import sanitize_family


def test_sanitize_family():
assert sanitize_family("abc") == "abc"
assert sanitize_family("abc123") == "abc123"
assert sanitize_family("abc-123") == "abc-123"
assert sanitize_family("abc_123") == "abc_123"
assert sanitize_family("abc 123") == "abc123"
assert sanitize_family("abc~123") == "abc123"
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import itertools
import re
import uuid
from collections import defaultdict
from operator import itemgetter
Expand Down Expand Up @@ -258,6 +259,12 @@ def put_account_setting(self, **kwargs):
@stubbed
def register_task_definition(self, **kwargs):
family = kwargs.get("family")
# Family must be <= 255 characters. Alphanumeric, dash, and underscore only.
if len(family) > 255 or not re.match(r"^[\w\-]+$", family):
self.stubber.add_client_error(
method="register_task_definition", expected_params={**kwargs}
)

# Revisions are 1 indexed
revision = len(self.task_definitions[family]) + 1
arn = self._task_definition_arn(family, revision)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,19 @@ def test_register_task_definition(ecs):
family="dagster", containerDefinitions=[], memory="512", cpu="1"
)

# With invalid names
with pytest.raises(ClientError):
# Special characters
ecs.register_task_definition(
family="boom!", containerDefinitions=[], memory="512", cpu="256"
)

with pytest.raises(ClientError):
# Too long
ecs.register_task_definition(
family=256 * "a", containerDefinitions=[], memory="512", cpu="256"
)

response = ecs.register_task_definition(
family="dagster", containerDefinitions=[], memory="512", cpu="256"
)
Expand Down

0 comments on commit 07e3403

Please sign in to comment.