Skip to content

Commit

Permalink
add an "environment" key to EcsRunLauncher / EcsContainerContext that…
Browse files Browse the repository at this point in the history
… sets env vars (#8243)

Summary:
Lets you pass in a set of key-value pairs to be injected into the container.
  • Loading branch information
gibsondan committed Jun 8, 2022
1 parent 9567e1e commit ff3392a
Show file tree
Hide file tree
Showing 18 changed files with 259 additions and 43 deletions.
3 changes: 2 additions & 1 deletion helm/dagster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ runLauncher:
# Example:
#
# envVars:
# - "ENV_VAR"
# - "FOO_ENV_VAR" (Will pull the value of FOO_ENV_VAR from the calling process)
# - "BAR_ENV_VAR=baz_value" (Will set the value of BAR_ENV_VAR to baz_value)
envVars: []

# Additional volumes that should be included in the Job's Pod. See:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ def helm_chart_for_k8s_run_launcher(
"envConfigMaps": [{"name": TEST_CONFIGMAP_NAME}]
+ ([{"name": TEST_AWS_CONFIGMAP_NAME}] if not IS_BUILDKITE else []),
"envSecrets": [{"name": TEST_SECRET_NAME}],
"envVars": ["BUILDKITE"],
"envVars": (["BUILDKITE=1"] if os.getenv("BUILDKITE") else []),
"imagePullPolicy": image_pull_policy(),
"volumeMounts": [
{
Expand Down
14 changes: 13 additions & 1 deletion python_modules/dagster/dagster/core/utils.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import os
import random
import string
import uuid
import warnings
from collections import OrderedDict
from typing import Union
from typing import Tuple, Union, cast

import toposort as toposort_

Expand Down Expand Up @@ -80,3 +81,14 @@ def check_dagster_package_version(library_name, library_version):
__version__, library_name, library_version
)
warnings.warn(message)


def parse_env_var(env_var_str: str) -> Tuple[str, str]:
if "=" in env_var_str:
split = env_var_str.split("=", maxsplit=1)
return (split[0], split[1])
else:
env_var_value = os.getenv(env_var_str)
if env_var_value == None:
raise Exception(f"Tried to load environment variable {env_var_str}, but it was not set")
return (env_var_str, cast(str, env_var_value))
28 changes: 28 additions & 0 deletions python_modules/dagster/dagster_tests/core_tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import pytest

from dagster.core.test_utils import environ
from dagster.core.utils import parse_env_var


def test_parse_env_var_no_equals():

env_var = "FOO_ENV_VAR"

with pytest.raises(
Exception, match="Tried to load environment variable FOO_ENV_VAR, but it was not set"
):
parse_env_var(env_var)

with environ({"FOO_ENV_VAR": "BAR_VALUE"}):
assert parse_env_var(env_var) == ("FOO_ENV_VAR", "BAR_VALUE")


def test_parse_env_var_equals():
env_var = "FOO_ENV_VAR=BAR_VALUE"
assert parse_env_var(env_var) == ("FOO_ENV_VAR", "BAR_VALUE")


def test_parse_env_var_containing_equals():
env_var = "FOO_ENV_VAR=HERE_COMES_THE_EQUALS=THERE_IT_WENT"

assert parse_env_var(env_var) == ("FOO_ENV_VAR", "HERE_COMES_THE_EQUALS=THERE_IT_WENT")
Original file line number Diff line number Diff line change
@@ -1,25 +1,36 @@
from typing import TYPE_CHECKING, Any, List, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, NamedTuple, Optional

from dagster import Array, Field, Noneable, Shape, StringSource
from dagster import _check as check
from dagster.config.validate import process_config
from dagster.core.errors import DagsterInvalidConfigError
from dagster.core.storage.pipeline_run import PipelineRun
from dagster.core.utils import parse_env_var
from dagster.utils import merge_dicts

from ..secretsmanager import get_tagged_secrets

if TYPE_CHECKING:
from . import EcsRunLauncher

# Config shared between EcsRunLauncher and EcsContainerContext
SHARED_ECS_SCHEMA = {
"env_vars": Field(
[StringSource],
is_required=False,
description="List of environment variable names to include in the ECS task. "
"Each can be of the form KEY=VALUE or just KEY (in which case the value will be pulled "
"from the current process)",
),
}

ECS_CONTAINER_CONTEXT_SCHEMA = {
"secrets": Field(
Noneable(Array(Shape({"name": StringSource, "valueFrom": StringSource}))),
is_required=False,
description=(
"An array of AWS Secrets Manager secrets. These secrets will "
"be mounted as environment variabls in the container. See "
"be mounted as environment variables in the container. See "
"https://docs.aws.amazon.com/AmazonECS/latest/APIReference/API_Secret.html."
),
),
Expand All @@ -31,13 +42,18 @@
"environment variables in the container."
),
),
**SHARED_ECS_SCHEMA,
}


class EcsContainerContext(
NamedTuple(
"_EcsContainerContext",
[("secrets", List[Any]), ("secrets_tags", List[str])],
[
("secrets", List[Any]),
("secrets_tags", List[str]),
("env_vars", List[str]),
],
)
):
"""Encapsulates configuration that can be applied to an ECS task running Dagster code."""
Expand All @@ -46,17 +62,20 @@ def __new__(
cls,
secrets: Optional[List[Any]] = None,
secrets_tags: Optional[List[str]] = None,
env_vars: Optional[List[str]] = None,
):
return super(EcsContainerContext, cls).__new__(
cls,
secrets=check.opt_list_param(secrets, "secrets"),
secrets_tags=check.opt_list_param(secrets_tags, "secrets_tags"),
env_vars=check.opt_list_param(env_vars, "env_vars"),
)

def merge(self, other: "EcsContainerContext") -> "EcsContainerContext":
return EcsContainerContext(
secrets=other.secrets + self.secrets,
secrets_tags=other.secrets_tags + self.secrets_tags,
env_vars=other.env_vars + self.env_vars,
)

def get_secrets_dict(self, secrets_manager) -> Mapping[str, str]:
Expand All @@ -65,6 +84,10 @@ def get_secrets_dict(self, secrets_manager) -> Mapping[str, str]:
{secret["name"]: secret["valueFrom"] for secret in self.secrets},
)

def get_environment_dict(self) -> Dict[str, str]:
parsed_env_var_tuples = [parse_env_var(env_var) for env_var in self.env_vars]
return {env_var_tuple[0]: env_var_tuple[1] for env_var_tuple in parsed_env_var_tuples}

@staticmethod
def create_for_run(pipeline_run: PipelineRun, run_launcher: Optional["EcsRunLauncher"]):
context = EcsContainerContext()
Expand All @@ -73,6 +96,7 @@ def create_for_run(pipeline_run: PipelineRun, run_launcher: Optional["EcsRunLaun
EcsContainerContext(
secrets=run_launcher.secrets,
secrets_tags=run_launcher.secrets_tags,
env_vars=run_launcher.env_vars,
)
)

Expand Down Expand Up @@ -112,4 +136,5 @@ def create_from_config(run_container_context):
return EcsContainerContext(
secrets=processed_context_value.get("secrets"),
secrets_tags=processed_context_value.get("secrets_tags"),
env_vars=processed_context_value.get("env_vars"),
)
17 changes: 14 additions & 3 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from dagster.serdes import ConfigurableClass

from ..secretsmanager import get_secrets_from_arns
from .container_context import EcsContainerContext
from .container_context import SHARED_ECS_SCHEMA, EcsContainerContext
from .tasks import default_ecs_task_definition, default_ecs_task_metadata
from .utils import sanitize_family

Expand Down Expand Up @@ -47,6 +47,7 @@ def __init__(
container_name="run",
secrets=None,
secrets_tag="dagster",
env_vars=None,
include_sidecars=False,
):
self._inst_data = inst_data
Expand All @@ -60,6 +61,8 @@ def __init__(

self.secrets = check.opt_list_param(secrets, "secrets")

self.env_vars = check.opt_list_param(env_vars, "env_vars")

if self.secrets and all(isinstance(secret, str) for secret in self.secrets):
warnings.warn(
"Setting secrets as a list of ARNs is deprecated. "
Expand Down Expand Up @@ -146,6 +149,7 @@ def config_type(cls):
"Defaults to False."
),
),
**SHARED_ECS_SCHEMA,
}

@staticmethod
Expand Down Expand Up @@ -312,6 +316,11 @@ def _task_definition(self, family, metadata, image, container_context):
task_definition = self.ecs.describe_task_definition(taskDefinition=self.task_definition)
return task_definition["taskDefinition"]

environment = [
{"name": key, "value": value}
for key, value in container_context.get_environment_dict().items()
]

secrets = container_context.get_secrets_dict(self.secrets_manager)
secrets_definition = (
{"secrets": [{"name": key, "valueFrom": value} for key, value in secrets.items()]}
Expand All @@ -325,7 +334,7 @@ def _task_definition(self, family, metadata, image, container_context):
"taskDefinition"
]
secrets = secrets_definition.get("secrets", [])
if self._reuse_task_definition(task_definition, metadata, image, secrets):
if self._reuse_task_definition(task_definition, metadata, image, secrets, environment):
return task_definition

return default_ecs_task_definition(
Expand All @@ -334,11 +343,12 @@ def _task_definition(self, family, metadata, image, container_context):
metadata,
image,
self.container_name,
environment=environment,
secrets=secrets_definition,
include_sidecars=self.include_sidecars,
)

def _reuse_task_definition(self, task_definition, metadata, image, secrets):
def _reuse_task_definition(self, task_definition, metadata, image, secrets, environment):
container_definitions_match = False
task_definitions_match = False

Expand All @@ -349,6 +359,7 @@ def _reuse_task_definition(self, task_definition, metadata, image, secrets):
container_definition.get("image") == image
and container_definition.get("name") == self.container_name
and container_definition.get("secrets") == secrets
and ((not environment) or container_definition.get("environment") == environment)
):
container_definitions_match = True

Expand Down
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-aws/dagster_aws/ecs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def default_ecs_task_definition(
metadata,
image,
container_name,
environment,
command=None,
secrets=None,
include_sidecars=False,
Expand Down Expand Up @@ -71,6 +72,7 @@ def default_ecs_task_definition(
"entryPoint": [],
"command": command if command else [],
},
({"environment": environment} if environment else {}),
secrets or {},
{} if include_sidecars else {"dependsOn": []},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ def container_context_config(configured_secret):
}
],
"secrets_tags": ["dagster"],
"env_vars": ["FOO_ENV_VAR=BAR_VALUE"],
}
}

Expand All @@ -289,6 +290,7 @@ def other_container_context_config(other_configured_secret):
}
],
"secrets_tags": ["other_secret_tag"],
"env_vars": ["OTHER_FOO_ENV_VAR"],
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from dagster_aws.ecs.container_context import EcsContainerContext

from dagster.core.errors import DagsterInvalidConfigError
from dagster.core.test_utils import environ


@pytest.fixture
Expand All @@ -24,6 +25,7 @@ def other_secrets_container_context(other_container_context_config):
def test_empty_container_context(empty_container_context):
assert empty_container_context.secrets == []
assert empty_container_context.secrets_tags == []
assert empty_container_context.env_vars == []


def test_invalid_config():
Expand All @@ -46,12 +48,22 @@ def test_merge(
{"name": "HELLO", "valueFrom": configured_secret.arn + "/hello"},
]
assert secrets_container_context.secrets_tags == ["dagster"]
assert secrets_container_context.get_environment_dict() == {"FOO_ENV_VAR": "BAR_VALUE"}

assert other_secrets_container_context.secrets == [
{"name": "GOODBYE", "valueFrom": other_configured_secret.arn + "/goodbye"},
]

assert other_secrets_container_context.secrets_tags == ["other_secret_tag"]
with pytest.raises(
Exception, match="Tried to load environment variable OTHER_FOO_ENV_VAR, but it was not set"
):
other_secrets_container_context.get_environment_dict()

with environ({"OTHER_FOO_ENV_VAR": "OTHER_BAR_VALUE"}):
assert other_secrets_container_context.get_environment_dict() == {
"OTHER_FOO_ENV_VAR": "OTHER_BAR_VALUE"
}

merged = other_secrets_container_context.merge(secrets_container_context)

Expand All @@ -62,6 +74,17 @@ def test_merge(

assert merged.secrets_tags == ["dagster", "other_secret_tag"]

with pytest.raises(
Exception, match="Tried to load environment variable OTHER_FOO_ENV_VAR, but it was not set"
):
merged.get_environment_dict()

with environ({"OTHER_FOO_ENV_VAR": "OTHER_BAR_VALUE"}):
assert merged.get_environment_dict() == {
"FOO_ENV_VAR": "BAR_VALUE",
"OTHER_FOO_ENV_VAR": "OTHER_BAR_VALUE",
}

assert (
empty_container_context.merge(secrets_container_context).secrets
== secrets_container_context.secrets
Expand Down

0 comments on commit ff3392a

Please sign in to comment.