Skip to content

Commit

Permalink
Allow the use of environment variables in k8s_job_executor (#7031)
Browse files Browse the repository at this point in the history
Duplicate of #6992, I think I couldn't push to that since it was a fork
  • Loading branch information
johannkm committed Mar 14, 2022
1 parent b60c070 commit 8e6bccd
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 14 deletions.
2 changes: 2 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def k8s_job_executor(init_context: InitExecutorContext) -> Executor:
service_account_name: ...
env_config_maps: ...
env_secrets: ...
env_vars: ...
job_image: ... # leave out if using userDeployments
Configuration set on the Kubernetes Jobs and Pods created by the `K8sRunLauncher` will also be
Expand Down Expand Up @@ -89,6 +90,7 @@ def k8s_job_executor(init_context: InitExecutorContext) -> Executor:
),
env_config_maps=run_launcher.env_config_maps + (exc_cfg.get("env_config_maps") or []),
env_secrets=run_launcher.env_secrets + (exc_cfg.get("env_secrets") or []),
env_vars=run_launcher.env_vars + (exc_cfg.get("env_vars") or []),
volume_mounts=run_launcher.volume_mounts + (exc_cfg.get("volume_mounts") or []),
volumes=run_launcher.volumes + (exc_cfg.get("volumes") or []),
labels=merge_dicts(run_launcher.labels, exc_cfg.get("labels", {})),
Expand Down
4 changes: 4 additions & 0 deletions python_modules/libraries/dagster-k8s/dagster_k8s/launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ def volume_mounts(self):
def volumes(self):
return self._volumes

@property
def env_vars(self):
return self._env_vars

@property
def labels(self):
return self._labels
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import pytest

from dagster.core.test_utils import environ, instance_for_test

MINIMAL_KUBECONFIG_CONTENT = """
apiVersion: v1
kind: Config
Expand All @@ -25,3 +27,27 @@ def kubeconfig_file(tmp_path):
config_path = dir_path / "config"
config_path.write_text(MINIMAL_KUBECONFIG_CONTENT)
return str(config_path)


@pytest.fixture
def k8s_run_launcher_instance(kubeconfig_file): # pylint: disable=redefined-outer-name
with environ({"BAR_TEST": "bar"}):
with instance_for_test(
overrides={
"run_launcher": {
"class": "K8sRunLauncher",
"module": "dagster_k8s",
"config": {
"service_account_name": "dagit-admin",
"instance_config_map": "dagster-instance",
"postgres_password_secret": "dagster-postgresql-secret",
"dagster_home": "/opt/dagster/dagster_home",
"job_image": "fake_job_image",
"load_incluster_config": False,
"kubeconfig_file": kubeconfig_file,
"env_vars": ["BAR_TEST"],
},
},
}
) as instance:
yield instance
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# pylint: disable=redefined-outer-name
import json
from unittest import mock

Expand All @@ -7,11 +8,13 @@

from dagster import execute_pipeline, pipeline, solid
from dagster.core.definitions.mode import ModeDefinition
from dagster.core.definitions.pipeline_base import InMemoryPipeline
from dagster.core.definitions.reconstructable import reconstructable
from dagster.core.errors import DagsterUnmetExecutorRequirementsError
from dagster.core.executor.init import InitExecutorContext
from dagster.core.executor.step_delegating.step_handler.base import StepHandlerContext
from dagster.core.storage.fs_io_manager import fs_io_manager
from dagster.core.test_utils import create_run_for_test, instance_for_test
from dagster.core.test_utils import create_run_for_test, environ, instance_for_test
from dagster.grpc.types import ExecuteStepArgs


Expand Down Expand Up @@ -40,6 +43,21 @@ def test_requires_k8s_launcher_fail():
execute_pipeline(reconstructable(bar), instance=instance)


def test_executor_init(k8s_run_launcher_instance):
executor = k8s_job_executor.executor_creation_fn(
InitExecutorContext(
job=InMemoryPipeline(bar),
executor_def=k8s_job_executor,
executor_config={"env_vars": ["FOO_TEST"], "retries": {}},
instance=k8s_run_launcher_instance,
)
)

# env vars from both launcher and the executor
# pylint: disable=protected-access
assert executor._step_handler._job_config.env_vars == ["BAR_TEST", "FOO_TEST"]


def test_step_handler(kubeconfig_file):

mock_k8s_client_batch_api = mock.MagicMock()
Expand Down Expand Up @@ -78,7 +96,9 @@ def test_step_handler_user_defined_config(kubeconfig_file):

mock_k8s_client_batch_api = mock.MagicMock()
handler = K8sStepHandler(
job_config=DagsterK8sJobConfig(instance_config_map="foobar", job_image="bizbuz"),
job_config=DagsterK8sJobConfig(
instance_config_map="foobar", job_image="bizbuz", env_vars=["FOO_TEST"]
),
job_namespace="foo",
load_incluster_config=False,
kubeconfig_file=kubeconfig_file,
Expand All @@ -96,20 +116,21 @@ def test_step_handler_user_defined_config(kubeconfig_file):
user_defined_k8s_config_json = json.dumps(user_defined_k8s_config.to_dict())
tags = {"dagster-k8s/config": user_defined_k8s_config_json}

with instance_for_test() as instance:
run = create_run_for_test(
instance,
pipeline_name="bar",
)
handler.launch_step(
StepHandlerContext(
with environ({"FOO_TEST": "bar"}):
with instance_for_test() as instance:
run = create_run_for_test(
instance,
ExecuteStepArgs(
reconstructable(bar).get_python_origin(), run.run_id, ["foo_solid"]
),
{"foo_solid": tags},
pipeline_name="bar",
)
handler.launch_step(
StepHandlerContext(
instance,
ExecuteStepArgs(
reconstructable(bar).get_python_origin(), run.run_id, ["foo_solid"]
),
{"foo_solid": tags},
)
)
)

# Check that user defined k8s config was passed down to the k8s job.
mock_method_calls = mock_k8s_client_batch_api.method_calls
Expand All @@ -119,6 +140,8 @@ def test_step_handler_user_defined_config(kubeconfig_file):
assert kwargs["body"].spec.template.spec.containers[0].image == "bizbuz"
job_resources = kwargs["body"].spec.template.spec.containers[0].resources
assert job_resources.to_dict() == expected_resources
assert kwargs["body"].spec.template.spec.containers[0].env[1].name == "FOO_TEST"
assert kwargs["body"].spec.template.spec.containers[0].env[1].value == "bar"


def test_step_handler_image_override(kubeconfig_file):
Expand Down

0 comments on commit 8e6bccd

Please sign in to comment.