From 5b5399560151e444d4b49cb3d005e69afdb9d7d4 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 17:13:42 +0000 Subject: [PATCH 1/8] Add ability to add vai pipeline labels --- python/gigl/common/services/vertex_ai.py | 9 ++++ .../kubeflow/kfp_orchestrator.py | 4 +- python/gigl/orchestration/kubeflow/runner.py | 47 ++++++++++++++++++- .../common/services/vertex_ai_test.py | 6 ++- .../orchestration/kubeflow/kfp_runner_test.py | 39 ++++++++++++++- 5 files changed, 99 insertions(+), 6 deletions(-) diff --git a/python/gigl/common/services/vertex_ai.py b/python/gigl/common/services/vertex_ai.py index 4eb8dd75d..04e285c4a 100644 --- a/python/gigl/common/services/vertex_ai.py +++ b/python/gigl/common/services/vertex_ai.py @@ -252,6 +252,7 @@ def run_pipeline( template_path: Uri, run_keyword_args: dict[str, str], job_id: Optional[str] = None, + labels: Optional[dict[str, str]] = None, experiment: Optional[str] = None, ) -> aiplatform.PipelineJob: """ @@ -267,6 +268,7 @@ def run_pipeline( Note: The pipeline_name and display_name are *not* the same. Note: pipeline_name is defined in the `template_path` and ultimately comes from the Python pipeline definition. If provided, must be unique. + labels (Optional[dict[str, str]]): Labels to associate with the run. experiment (Optional[str]): The name of the experiment to associate the run with. Returns: The PipelineJob created. 
@@ -278,9 +280,16 @@ def run_pipeline( job_id=job_id, project=self._project, location=self._location, + labels=labels, ) job.submit(service_account=self._service_account, experiment=experiment) logger.info(f"Created run: {job.resource_name}") + if experiment: + logger.info( + f"Associated run {job.resource_name} with experiment: {experiment}" + ) + if labels: + logger.info(f"Associated run {job.resource_name} with labels: {labels}") return job diff --git a/python/gigl/orchestration/kubeflow/kfp_orchestrator.py b/python/gigl/orchestration/kubeflow/kfp_orchestrator.py index 8e4f56977..3d8e7bb3d 100644 --- a/python/gigl/orchestration/kubeflow/kfp_orchestrator.py +++ b/python/gigl/orchestration/kubeflow/kfp_orchestrator.py @@ -116,6 +116,7 @@ def run( start_at: str = DEFAULT_START_AT_COMPONENT, stop_after: Optional[str] = None, compiled_pipeline_path: Uri = DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH, + labels: Optional[dict[str, str]] = None, ) -> aiplatform.PipelineJob: """ Runs the GiGL Kubeflow pipeline. @@ -127,7 +128,7 @@ def run( start_at (str): Component to start the pipeline at. Defaults to 'config_populator'. stop_after (Optional[str]): Component to stop the pipeline after. Defaults to None i.e. run entire pipeline. compiled_pipeline_path (Uri): Path to the compiled pipeline YAML file. - + labels (Optional[dict[str, str]]): Labels to associate with the run. Returns: aiplatform.PipelineJob: The created pipeline job. 
""" @@ -161,6 +162,7 @@ def run( template_path=compiled_pipeline_path, run_keyword_args=run_keyword_args, job_id=str(applied_task_identifier).replace("_", "-"), + labels=labels, ) return run diff --git a/python/gigl/orchestration/kubeflow/runner.py b/python/gigl/orchestration/kubeflow/runner.py index 460790f89..d9729bd62 100644 --- a/python/gigl/orchestration/kubeflow/runner.py +++ b/python/gigl/orchestration/kubeflow/runner.py @@ -27,6 +27,9 @@ --additional_job_args=split_generator.some_other_arg='value' This passes additional_spark35_jar_file_uris="gs://path/to/jar" to subgraph_sampler at compile time and some_other_arg="value" to split_generator at compile time. + --labels: Labels to associate with the pipeline run. + The value has to be of form: "=". + Example: --labels=gigl-integration-test=true --labels=user=me You can alternatively run_no_compile if you have a precompiled pipeline somewhere. python gigl.orchestration.kubeflow.runner --action=run_no_compile ...args @@ -191,7 +194,25 @@ def _parse_additional_job_args( return dict(result) # Ensure the default dict is converted to a regular dict -if __name__ == "__main__": +def _parse_labels(labels: list[str]) -> dict[str, str]: + """ + Parse the labels for the pipeline run. + Args: + labels list[str]: Each element is of form: "=" + Example: ["gigl-integration-test=true", "user=me"]. + Returns dict[str, str]: The parsed labels. + """ + result: dict[str, str] = {} + for label in labels: + label_name, label_value = label.split("=", 1) + result[label_name] = label_value + return result + + +def _get_parser() -> argparse.ArgumentParser: + """ + Get the parser for the runner.py script. + """ parser = argparse.ArgumentParser( description="Create the KF pipeline for GNN preprocessing/training/inference" ) @@ -278,12 +299,27 @@ def _parse_additional_job_args( some_other_arg="value" to split_generator at compile time. 
""", ) + parser.add_argument( + "--labels", + action="append", + default=[], + help="""Labels to associate with the pipeline run, of the form: --labels=label_name=label_value. + Only applicable for run and run_no_compile actions. + Example: --labels=gigl-integration-test=true --labels=user=me + Which will taget the pipeline run with gigl-integration-test=true and user=me. + """, + ) + + return parser + +if __name__ == "__main__": + parser = _get_parser() args = parser.parse_args() logger.info(f"Beginning runner.py with args: {args}") parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args) - + parsed_labels = _parse_labels(args.labels) # Assert correctness of args _assert_required_flags(args) @@ -342,12 +378,19 @@ def _parse_additional_job_args( start_at=args.start_at, stop_after=args.stop_after, compiled_pipeline_path=compiled_pipeline_path, + labels=parsed_labels if parsed_labels else None, ) if args.wait: orchestrator.wait_for_completion(run=run) elif args.action == Action.COMPILE: + if parsed_labels: + raise ValueError( + "Labels are not supported for the compile action. " + "Please use the run action to run a pipeline with labels." 
+ f"Labels provided: {parsed_labels}" + ) pipeline_bundle_path = KfpOrchestrator.compile( cuda_container_image=cuda_container_image, cpu_container_image=cpu_container_image, diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index a35a957b4..199032e12 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -34,13 +34,13 @@ def get_pipeline() -> int: class VertexAIPipelineIntegrationTest(unittest.TestCase): - def test_launch_job(self): + def _test_launch_job(self): resource_config = get_resource_config() project = resource_config.project location = resource_config.region service_account = resource_config.service_account_email staging_bucket = resource_config.temp_assets_regional_bucket_path.uri - job_name = f"GiGL-Intergration-Test-{uuid.uuid4()}" + job_name = f"GiGL-Integration-Test-{uuid.uuid4()}" container_uri = "condaforge/miniforge3:25.3.0-1" command = ["python", "-c", "import logging; logging.info('Hello, World!')"] @@ -73,6 +73,7 @@ def test_run_pipeline(self): template_path=UriFactory.create_uri(pipeline_def), run_keyword_args={}, experiment="gigl-integration-tests", + labels={"gigl-integration-test": "true"}, ) # Wait for the run to complete, 30 minutes is probably too long but # we don't want this test to be flaky. @@ -83,6 +84,7 @@ def test_run_pipeline(self): # Also verify that we can fetch a pipeline. 
run = ps.get_pipeline_job_from_job_name(job.name) self.assertEqual(run.resource_name, job.resource_name) + self.assertEqual(run.labels["gigl-integration-test"], "true") if __name__ == "__main__": diff --git a/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py b/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py index 8589b17d3..9cd246160 100644 --- a/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py +++ b/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py @@ -1,7 +1,11 @@ import unittest from gigl.common.logger import Logger -from gigl.orchestration.kubeflow.runner import _parse_additional_job_args +from gigl.orchestration.kubeflow.runner import ( + _get_parser, + _parse_additional_job_args, + _parse_labels, +) from gigl.src.common.constants.components import GiGLComponents logger = Logger() @@ -29,6 +33,39 @@ def test_parse_additional_job_args( parsed_args = _parse_additional_job_args(args) self.assertEqual(parsed_args, expected_parsed_args) + def test_parse_labels(self): + args = ["gigl-integration-test=true", "user=me"] + expected_parsed_args = {"gigl-integration-test": "true", "user": "me"} + parsed_args = _parse_labels(args) + self.assertEqual(parsed_args, expected_parsed_args) + + def test_parse_args_from_cli(self): + parser = _get_parser() + args = parser.parse_args( + [ + "--action=run", # required arg - not tested here + "--additional_job_args=subgraph_sampler.additional_spark35_jar_file_uris=gs://path/to/jar", + "--additional_job_args=subgraph_sampler.arg_2=value=10.243,123", + "--additional_job_args=split_generator.some_other_arg=value", + "--labels=gigl-integration-test=true", + "--labels=user=me", + ] + ) + parsed_args = _parse_additional_job_args(args.additional_job_args) + parsed_labels = _parse_labels(args.labels) + expected_parsed_args = { + GiGLComponents.SubgraphSampler: { + "additional_spark35_jar_file_uris": "gs://path/to/jar", + "arg_2": "value=10.243,123", + }, + GiGLComponents.SplitGenerator: { + 
"some_other_arg": "value", + }, + } + expected_parsed_labels = {"gigl-integration-test": "true", "user": "me"} + self.assertEqual(parsed_args, expected_parsed_args) + self.assertEqual(parsed_labels, expected_parsed_labels) + if __name__ == "__main__": unittest.main() From 0a7b8c679701917d3d27e908bbc66880f3849246 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 17:14:03 +0000 Subject: [PATCH 2/8] fix --- python/tests/integration/common/services/vertex_ai_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index 199032e12..30477a466 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -34,7 +34,7 @@ def get_pipeline() -> int: class VertexAIPipelineIntegrationTest(unittest.TestCase): - def _test_launch_job(self): + def test_launch_job(self): resource_config = get_resource_config() project = resource_config.project location = resource_config.region From d49d86400da266ad6cd55c698aa7b1503e210fbb Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 20:45:02 +0000 Subject: [PATCH 3/8] update Makefile --- Makefile | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/Makefile b/Makefile index 81e31c87e..02ea5bdb4 100644 --- a/Makefile +++ b/Makefile @@ -314,6 +314,9 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images --job_name='$${job_name}' \ --start_at='config_populator' \ --pipeline_tag=$(GIT_HASH) \ + --labels="gigl_commit=$(GIT_HASH)" \ + --labels="gigl_branch=$(GIT_BRANCH)" \ + --labels="gigl_version=$(GIGL_VERSION)" \ --task_config_uri='$${task_config_uris[$$i]}' \ --resource_config_uri='$${resource_config_uris[$$i]}'"; \ echo "Running: $$CMD"; \ @@ -454,6 +457,9 @@ run_dev_gnn_kubeflow_pipeline: $(if $(compiled_pipeline_path), _skip_build_deps, --task_config_uri=$(task_config_uri) \ 
--resource_config_uri=$(resource_config_uri) \ --pipeline_tag=$(GIT_HASH) \ + --labels="gigl_commit=$(GIT_HASH)" \ + --labels="gigl_branch=$(GIT_BRANCH)" \ + --labels="gigl_version=$(GIGL_VERSION)" \ $(if $(compiled_pipeline_path),--compiled_pipeline_path=$(compiled_pipeline_path)) \ From 2226af386d01875471cecbd793d01064160d7d86 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 22:03:17 +0000 Subject: [PATCH 4/8] minor refactor --- python/gigl/orchestration/kubeflow/runner.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/python/gigl/orchestration/kubeflow/runner.py b/python/gigl/orchestration/kubeflow/runner.py index d9729bd62..af9e1ca4e 100644 --- a/python/gigl/orchestration/kubeflow/runner.py +++ b/python/gigl/orchestration/kubeflow/runner.py @@ -155,6 +155,12 @@ def _assert_required_flags(args: argparse.Namespace) -> None: f"Missing values for the following flags for a {args.action} command: {missing_values}. " + f"All required flags are: {list(required_flags)}." ) + if args.action == Action.COMPILE and args.labels: + raise ValueError( + "Labels are not supported for the compile action. " + "Please use the run action to run a pipeline with labels." 
+ f"Labels provided: {args.labels}" + ) logger = Logger() @@ -206,6 +212,7 @@ def _parse_labels(labels: list[str]) -> dict[str, str]: for label in labels: label_name, label_value = label.split("=", 1) result[label_name] = label_value + logger.info(f"Parsed labels: {result}") return result @@ -318,11 +325,12 @@ def _get_parser() -> argparse.ArgumentParser: args = parser.parse_args() logger.info(f"Beginning runner.py with args: {args}") - parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args) - parsed_labels = _parse_labels(args.labels) # Assert correctness of args _assert_required_flags(args) + parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args) + parsed_labels = _parse_labels(args.labels) + # Set the default value for compiled_pipeline_path as we cannot set it in argparse as # for compile action this is a required flag so we cannot provide it a default value. # See _assert_required_flags for more details. @@ -385,12 +393,6 @@ def _get_parser() -> argparse.ArgumentParser: orchestrator.wait_for_completion(run=run) elif args.action == Action.COMPILE: - if parsed_labels: - raise ValueError( - "Labels are not supported for the compile action. " - "Please use the run action to run a pipeline with labels." 
- f"Labels provided: {parsed_labels}" - ) pipeline_bundle_path = KfpOrchestrator.compile( cuda_container_image=cuda_container_image, cpu_container_image=cpu_container_image, From 0c5ae3bcc0d2e968c8be93c32eff49c1e1dc7f5b Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 22:04:58 +0000 Subject: [PATCH 5/8] stop adding branch --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 02ea5bdb4..1d611f45b 100644 --- a/Makefile +++ b/Makefile @@ -300,6 +300,8 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images $(if $(filter ${should_wait_for_job_to_finish},true),--wait,) \ --job_name='$${job_name}' \ --start_at='config_populator' \ + --labels="gigl_commit=$(GIT_HASH)" \ + --labels="gigl_version=$(GIGL_VERSION)" \ --task_config_uri='$${task_config_uris[$$i]}' \ --resource_config_uri='$${resource_config_uris[$$i]}' \ --compiled_pipeline_path='$${compiled_pipeline_path}'"; \ @@ -315,7 +317,6 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images --start_at='config_populator' \ --pipeline_tag=$(GIT_HASH) \ --labels="gigl_commit=$(GIT_HASH)" \ - --labels="gigl_branch=$(GIT_BRANCH)" \ --labels="gigl_version=$(GIGL_VERSION)" \ --task_config_uri='$${task_config_uris[$$i]}' \ --resource_config_uri='$${resource_config_uris[$$i]}'"; \ @@ -458,7 +459,6 @@ run_dev_gnn_kubeflow_pipeline: $(if $(compiled_pipeline_path), _skip_build_deps, --resource_config_uri=$(resource_config_uri) \ --pipeline_tag=$(GIT_HASH) \ --labels="gigl_commit=$(GIT_HASH)" \ - --labels="gigl_branch=$(GIT_BRANCH)" \ --labels="gigl_version=$(GIGL_VERSION)" \ $(if $(compiled_pipeline_path),--compiled_pipeline_path=$(compiled_pipeline_path)) \ From 6100f5138e019414ba219ded64744bc198371d2b Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 27 Aug 2025 22:10:10 +0000 Subject: [PATCH 6/8] update CHANGELOG --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9eaa002fc..c911e7a69 100644 --- 
a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - Add earlier check if invalid Node IDs are provided to the partitioner +- Add support for labeling VAI pipeline runs + ### Changed ### Deprecated From 70262fa918c247e0890b39e8ee19d2110cf52cf1 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Thu, 28 Aug 2025 19:57:47 +0000 Subject: [PATCH 7/8] rename to run_labels --- Makefile | 12 ++++++------ python/gigl/orchestration/kubeflow/runner.py | 14 ++++++++------ .../unit/orchestration/kubeflow/kfp_runner_test.py | 6 +++--- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/Makefile b/Makefile index 1d611f45b..35aaf4e1e 100644 --- a/Makefile +++ b/Makefile @@ -300,8 +300,8 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images $(if $(filter ${should_wait_for_job_to_finish},true),--wait,) \ --job_name='$${job_name}' \ --start_at='config_populator' \ - --labels="gigl_commit=$(GIT_HASH)" \ - --labels="gigl_version=$(GIGL_VERSION)" \ + --run_labels="gigl_commit=$(GIT_HASH)" \ + --run_labels="gigl_version=$(GIGL_VERSION)" \ --task_config_uri='$${task_config_uris[$$i]}' \ --resource_config_uri='$${resource_config_uris[$$i]}' \ --compiled_pipeline_path='$${compiled_pipeline_path}'"; \ @@ -316,8 +316,8 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images --job_name='$${job_name}' \ --start_at='config_populator' \ --pipeline_tag=$(GIT_HASH) \ - --labels="gigl_commit=$(GIT_HASH)" \ - --labels="gigl_version=$(GIGL_VERSION)" \ + --run_labels="gigl_commit=$(GIT_HASH)" \ + --run_labels="gigl_version=$(GIGL_VERSION)" \ --task_config_uri='$${task_config_uris[$$i]}' \ --resource_config_uri='$${resource_config_uris[$$i]}'"; \ echo "Running: $$CMD"; \ @@ -458,8 +458,8 @@ run_dev_gnn_kubeflow_pipeline: $(if $(compiled_pipeline_path), _skip_build_deps, --task_config_uri=$(task_config_uri) \ --resource_config_uri=$(resource_config_uri) \ --pipeline_tag=$(GIT_HASH) \ - 
--labels="gigl_commit=$(GIT_HASH)" \ - --labels="gigl_version=$(GIGL_VERSION)" \ + --run_labels="gigl_commit=$(GIT_HASH)" \ + --run_labels="gigl_version=$(GIGL_VERSION)" \ $(if $(compiled_pipeline_path),--compiled_pipeline_path=$(compiled_pipeline_path)) \ diff --git a/python/gigl/orchestration/kubeflow/runner.py b/python/gigl/orchestration/kubeflow/runner.py index af9e1ca4e..fcb08409a 100644 --- a/python/gigl/orchestration/kubeflow/runner.py +++ b/python/gigl/orchestration/kubeflow/runner.py @@ -27,9 +27,10 @@ --additional_job_args=split_generator.some_other_arg='value' This passes additional_spark35_jar_file_uris="gs://path/to/jar" to subgraph_sampler at compile time and some_other_arg="value" to split_generator at compile time. - --labels: Labels to associate with the pipeline run. + --run_labels: Labels to associate with the pipeline run. The value has to be of form: "=". - Example: --labels=gigl-integration-test=true --labels=user=me + NOTE: unlike SharedResourceConfig.resource_labels, these are *only* applied to the vertex ai pipeline run. + Example: --run_labels=gigl-integration-test=true --run_labels=user=me You can alternatively run_no_compile if you have a precompiled pipeline somewhere. python gigl.orchestration.kubeflow.runner --action=run_no_compile ...args @@ -307,12 +308,13 @@ def _get_parser() -> argparse.ArgumentParser: """, ) parser.add_argument( - "--labels", + "--run_labels", action="append", default=[], - help="""Labels to associate with the pipeline run, of the form: --labels=label_name=label_value. + help="""Labels to associate with the pipeline run, of the form: --run_labels=label_name=label_value. Only applicable for run and run_no_compile actions. - Example: --labels=gigl-integration-test=true --labels=user=me + NOTE: unlike SharedResourceConfig.resource_labels, these are *only* applied to the vertex ai pipeline run. 
+ Example: --run_labels=gigl-integration-test=true --run_labels=user=me Which will taget the pipeline run with gigl-integration-test=true and user=me. """, ) @@ -329,7 +331,7 @@ def _get_parser() -> argparse.ArgumentParser: _assert_required_flags(args) parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args) - parsed_labels = _parse_labels(args.labels) + parsed_labels = _parse_labels(args.run_labels) # Set the default value for compiled_pipeline_path as we cannot set it in argparse as # for compile action this is a required flag so we cannot provide it a default value. diff --git a/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py b/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py index 9cd246160..75513abd8 100644 --- a/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py +++ b/python/tests/unit/orchestration/kubeflow/kfp_runner_test.py @@ -47,12 +47,12 @@ def test_parse_args_from_cli(self): "--additional_job_args=subgraph_sampler.additional_spark35_jar_file_uris=gs://path/to/jar", "--additional_job_args=subgraph_sampler.arg_2=value=10.243,123", "--additional_job_args=split_generator.some_other_arg=value", - "--labels=gigl-integration-test=true", - "--labels=user=me", + "--run_labels=gigl-integration-test=true", + "--run_labels=user=me", ] ) parsed_args = _parse_additional_job_args(args.additional_job_args) - parsed_labels = _parse_labels(args.labels) + parsed_labels = _parse_labels(args.run_labels) expected_parsed_args = { GiGLComponents.SubgraphSampler: { "additional_spark35_jar_file_uris": "gs://path/to/jar", From 766fbd5ed9d3012aef504af255bd30839725965b Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Fri, 29 Aug 2025 20:57:22 +0000 Subject: [PATCH 8/8] fix --- python/gigl/orchestration/kubeflow/runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/gigl/orchestration/kubeflow/runner.py b/python/gigl/orchestration/kubeflow/runner.py index fcb08409a..910a172e7 100644 --- 
a/python/gigl/orchestration/kubeflow/runner.py +++ b/python/gigl/orchestration/kubeflow/runner.py @@ -156,11 +156,11 @@ def _assert_required_flags(args: argparse.Namespace) -> None: f"Missing values for the following flags for a {args.action} command: {missing_values}. " + f"All required flags are: {list(required_flags)}." ) - if args.action == Action.COMPILE and args.labels: + if args.action == Action.COMPILE and args.run_labels: raise ValueError( "Labels are not supported for the compile action. " "Please use the run action to run a pipeline with labels." - f"Labels provided: {args.labels}" + f"Labels provided: {args.run_labels}" )