Skip to content

Commit

Permalink
Optionally use sidecars (#7087)
Browse files Browse the repository at this point in the history
First, we changed the EcsRunLauncher to only carry forward a single
container definition:

#6850

But this change didn't account for container definitions that defined
sidecars with `dependsOn`. When using the docker compose CLI to deploy
ECS, this introduced a breaking change because docker compose runs each
of its ECS tasks with a dependent sidecar container.

We fixed this with:

#6929

But it was only a partial fix; it removed the ability to intentionally
provide sidecars while still relying on Dagster to generate an ECS task
definition for you.

This change gives users the option to continue to create sidecars by
setting `use_sidecars` to `True` in their dagster.yaml. Perhaps in
the future, we'll change this config to optionally allow a list of
specific sidecars to enable (for example, if you have a monitoring
sidecar that you don't want the run task to inherit but a secrets
sidecar that you do want it to inherit). But for now, this should allow
users to configure their deployments to continue to behave the way
things behaved before 6850.
  • Loading branch information
jmsanders committed Mar 16, 2022
1 parent d07a108 commit 5765c38
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 26 deletions.
2 changes: 2 additions & 0 deletions examples/deploy_ecs/dagster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ run_coordinator:
run_launcher:
module: dagster_aws.ecs
class: EcsRunLauncher
config:
include_sidecars: true

run_storage:
module: dagster_postgres.run_storage
Expand Down
12 changes: 12 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
container_name="run",
secrets=None,
secrets_tag="dagster",
include_sidecars=False,
):
self._inst_data = inst_data
self.ecs = boto3.client("ecs")
Expand All @@ -51,6 +52,7 @@ def __init__(
self.secrets = {secret["name"]: secret["valueFrom"] for secret in self.secrets}

self.secrets_tag = secrets_tag
self.include_sidecars = include_sidecars

if self.task_definition:
task_definition = self.ecs.describe_task_definition(taskDefinition=task_definition)
Expand Down Expand Up @@ -112,6 +114,15 @@ def config_type(cls):
"environment variables in the container. Defaults to 'dagster'."
),
),
"include_sidecars": Field(
bool,
is_required=False,
default_value=False,
description=(
"Whether each run should use the same sidecars as the task that launches it. "
"Defaults to False."
),
),
}

@staticmethod
Expand Down Expand Up @@ -311,6 +322,7 @@ def _task_definition(self, family, metadata, image):
image,
self.container_name,
secrets=secrets_dict,
include_sidecars=self.include_sidecars,
)

def _task_metadata(self):
Expand Down
33 changes: 20 additions & 13 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def default_ecs_task_definition(
container_name,
command=None,
secrets=None,
include_sidecars=False,
):
# Start with the current process's task's definition but remove
# extra keys that aren't useful for creating a new task definition
Expand All @@ -62,19 +63,25 @@ def default_ecs_task_definition(
# entryPoint and containerOverrides are specified, they're concatenated
# and the command will fail
# https://aws.amazon.com/blogs/opensource/demystifying-entrypoint-cmd-docker/
container_definitions = [
merge_dicts(
{
**metadata.container_definition,
"name": container_name,
"image": image,
"entryPoint": [],
"command": command if command else [],
"dependsOn": [], # Remove any other container dependencies as well
},
secrets or {},
)
]
new_container_definition = merge_dicts(
{
**metadata.container_definition,
"name": container_name,
"image": image,
"entryPoint": [],
"command": command if command else [],
},
secrets or {},
{} if include_sidecars else {"dependsOn": []},
)

if include_sidecars:
container_definitions = metadata.task_definition.get("containerDefinitions")
container_definitions.remove(metadata.container_definition)
container_definitions.append(new_container_definition)
else:
container_definitions = [new_container_definition]

task_definition = {
**task_definition,
"family": family,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,16 @@ def other_run(instance, pipeline, other_external_pipeline):
external_pipeline_origin=other_external_pipeline.get_external_origin(),
pipeline_code_origin=other_external_pipeline.get_python_origin(),
)


@pytest.fixture
def launch_run(pipeline, external_pipeline, workspace):
def _launch_run(instance):
run = instance.create_run_for_pipeline(
pipeline,
external_pipeline_origin=external_pipeline.get_external_origin(),
pipeline_code_origin=external_pipeline.get_python_origin(),
)
instance.launch_run(run.run_id, workspace)

return _launch_run
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,6 @@ def configured_secret(secrets_manager):
yield Secret(name, arn)


@pytest.fixture
def launch_run(pipeline, external_pipeline, workspace):
def _launch_run(instance):
run = instance.create_run_for_pipeline(
pipeline,
external_pipeline_origin=external_pipeline.get_external_origin(),
pipeline_code_origin=external_pipeline.get_python_origin(),
)
instance.launch_run(run.run_id, workspace)

return _launch_run


def test_secrets(
ecs,
secrets_manager,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
def test_default(ecs, instance, launch_run):
initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]

launch_run(instance)

# A new task definition is created
task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
assert len(task_definitions) == len(initial_task_definitions) + 1
task_definition_arn = list(set(task_definitions).difference(initial_task_definitions))[0]
task_definition = ecs.describe_task_definition(taskDefinition=task_definition_arn)
container_definitions = task_definition["taskDefinition"]["containerDefinitions"]

assert len(container_definitions) == 1
assert not container_definitions[0].get("dependsOn")


def test_include_sidecars_with_depends_on(ecs, instance_cm, launch_run, task_definition):
with instance_cm({"include_sidecars": True}) as instance:
initial_task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]

launch_run(instance)

# A new task definition is created
task_definitions = ecs.list_task_definitions()["taskDefinitionArns"]
assert len(task_definitions) == len(initial_task_definitions) + 1
task_definition_arn = list(set(task_definitions).difference(initial_task_definitions))[0]
task_definition = ecs.describe_task_definition(taskDefinition=task_definition_arn)
container_definitions = task_definition["taskDefinition"]["containerDefinitions"]

assert len(container_definitions) == 2
for container_definition in container_definitions:
if container_definition.get("name") == "run":
assert container_definition.get("dependsOn")

0 comments on commit 5765c38

Please sign in to comment.