Skip to content

Commit

Permalink
add custom k8s labels to dagster k8s jobs, not just dagster k8s pods (#…
Browse files Browse the repository at this point in the history
…8381)

Summary:
User reported that when they use container_context to override labels, it is only applied to the created pod, not the job. Fix that by passing it through to the job metadata as well.

Test Plan: New test case
  • Loading branch information
gibsondan committed Jun 14, 2022
1 parent a20b02b commit c97ddaa
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 11 deletions.
12 changes: 10 additions & 2 deletions python_modules/libraries/dagster-k8s/dagster_k8s/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,9 @@ def construct_dagster_k8s_job(
{"template": template},
)

user_defined_job_metadata = copy.deepcopy(user_defined_k8s_config.job_metadata)
user_defined_job_labels = user_defined_job_metadata.pop("labels", {})

job = k8s_model_from_dict(
kubernetes.client.V1Job,
merge_dicts(
Expand All @@ -700,8 +703,13 @@ def construct_dagster_k8s_job(
"api_version": "batch/v1",
"kind": "Job",
"metadata": merge_dicts(
user_defined_k8s_config.job_metadata,
{"name": job_name, "labels": dagster_labels},
user_defined_job_metadata,
{
"name": job_name,
"labels": merge_dicts(
dagster_labels, user_defined_job_labels, job_config.labels
),
},
),
"spec": job_spec_config,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
from dagster import __version__ as dagster_version
from dagster import graph
from dagster.core.test_utils import environ, remove_none_recursively
from dagster.utils import merge_dicts


def test_job_serialization():
Expand Down Expand Up @@ -629,6 +628,7 @@ def test_construct_dagster_k8s_job_with_labels():
).to_dict()
expected_labels1 = dict(
**common_labels,
**job_config_labels,
**{
"dagster/job": "some_job",
"dagster/op": "some_op",
Expand All @@ -637,10 +637,7 @@ def test_construct_dagster_k8s_job_with_labels():
)

assert job1["metadata"]["labels"] == expected_labels1
assert job1["spec"]["template"]["metadata"]["labels"] == merge_dicts(
expected_labels1,
job_config_labels,
)
assert job1["spec"]["template"]["metadata"]["labels"] == expected_labels1

job2 = construct_dagster_k8s_job(
cfg,
Expand All @@ -654,6 +651,7 @@ def test_construct_dagster_k8s_job_with_labels():
).to_dict()
expected_labels2 = dict(
**common_labels,
**job_config_labels,
**{
# The last character should be truncated.
"dagster/job": "long_job_name_64____0123456789012345678901234567890123456789012",
Expand All @@ -662,10 +660,7 @@ def test_construct_dagster_k8s_job_with_labels():
},
)
assert job2["metadata"]["labels"] == expected_labels2
assert job2["spec"]["template"]["metadata"]["labels"] == merge_dicts(
expected_labels2,
job_config_labels,
)
assert job2["spec"]["template"]["metadata"]["labels"] == expected_labels2


def test_sanitize_labels():
Expand Down

0 comments on commit c97ddaa

Please sign in to comment.