Skip to content

Commit

Permalink
feat(dagster-k8s): add run id label to run/step workers (#7167)
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed Mar 23, 2022
1 parent b986894 commit 0c7f490
Show file tree
Hide file tree
Showing 5 changed files with 8 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ def _execute_step_k8s_job(
labels={
"dagster/job": execute_step_args.pipeline_origin.pipeline_name,
"dagster/op": step_key,
"dagster/run-id": execute_step_args.pipeline_run_id,
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def launch_run(self, context: LaunchRunContext) -> None:
env_vars=env_vars,
labels={
"dagster/job": pipeline_origin.pipeline_name,
"dagster/run-id": run.run_id,
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def launch_step(self, step_handler_context: StepHandlerContext):
labels={
"dagster/job": step_handler_context.execute_step_args.pipeline_origin.pipeline_name,
"dagster/op": step_key,
"dagster/run-id": step_handler_context.execute_step_args.pipeline_run_id,
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ def _launch_k8s_job_with_args(self, job_name, args, run, pipeline_origin):
user_defined_k8s_config=user_defined_k8s_config,
labels={
"dagster/job": pipeline_origin.pipeline_name,
"dagster/run-id": run.run_id,
},
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,13 +619,15 @@ def test_construct_dagster_k8s_job_with_labels():
labels={
"dagster/job": "some_job",
"dagster/op": "some_op",
"dagster/run-id": "some_run_id",
},
).to_dict()
expected_labels1 = dict(
**common_labels,
**{
"dagster/job": "some_job",
"dagster/op": "some_op",
"dagster/run-id": "some_run_id",
},
)

Expand All @@ -642,6 +644,7 @@ def test_construct_dagster_k8s_job_with_labels():
labels={
"dagster/job": "long_job_name_64____01234567890123456789012345678901234567890123",
"dagster/op": "long_op_name_64_____01234567890123456789012345678901234567890123",
"dagster/run_id": "long_run_id_64______01234567890123456789012345678901234567890123",
},
).to_dict()
expected_labels2 = dict(
Expand All @@ -650,6 +653,7 @@ def test_construct_dagster_k8s_job_with_labels():
# The last character should be truncated.
"dagster/job": "long_job_name_64____0123456789012345678901234567890123456789012",
"dagster/op": "long_op_name_64_____0123456789012345678901234567890123456789012",
"dagster/run_id": "long_run_id_64______0123456789012345678901234567890123456789012",
},
)
assert job2["metadata"]["labels"] == expected_labels2
Expand Down

0 comments on commit 0c7f490

Please sign in to comment.