2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -17,6 +17,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

- Add earlier check if invalid Node IDs are provided to the partitioner

- Add support for labeling VAI pipeline runs

### Changed

### Deprecated
6 changes: 6 additions & 0 deletions Makefile
@@ -300,6 +300,8 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images
$(if $(filter ${should_wait_for_job_to_finish},true),--wait,) \
--job_name='$${job_name}' \
--start_at='config_populator' \
--run_labels="gigl_commit=$(GIT_HASH)" \
--run_labels="gigl_version=$(GIGL_VERSION)" \
--task_config_uri='$${task_config_uris[$$i]}' \
--resource_config_uri='$${resource_config_uris[$$i]}' \
--compiled_pipeline_path='$${compiled_pipeline_path}'"; \
@@ -314,6 +316,8 @@ _run_e2e_kfp_test: compile_jars push_new_docker_images
--job_name='$${job_name}' \
--start_at='config_populator' \
--pipeline_tag=$(GIT_HASH) \
--run_labels="gigl_commit=$(GIT_HASH)" \
--run_labels="gigl_version=$(GIGL_VERSION)" \
--task_config_uri='$${task_config_uris[$$i]}' \
--resource_config_uri='$${resource_config_uris[$$i]}'"; \
echo "Running: $$CMD"; \
@@ -454,6 +458,8 @@ run_dev_gnn_kubeflow_pipeline: $(if $(compiled_pipeline_path), _skip_build_deps,
--task_config_uri=$(task_config_uri) \
--resource_config_uri=$(resource_config_uri) \
--pipeline_tag=$(GIT_HASH) \
--run_labels="gigl_commit=$(GIT_HASH)" \
--run_labels="gigl_version=$(GIGL_VERSION)" \
$(if $(compiled_pipeline_path),--compiled_pipeline_path=$(compiled_pipeline_path)) \


9 changes: 9 additions & 0 deletions python/gigl/common/services/vertex_ai.py
@@ -252,6 +252,7 @@ def run_pipeline(
template_path: Uri,
run_keyword_args: dict[str, str],
job_id: Optional[str] = None,
labels: Optional[dict[str, str]] = None,
experiment: Optional[str] = None,
) -> aiplatform.PipelineJob:
"""
@@ -267,6 +268,7 @@
Note: The pipeline_name and display_name are *not* the same.
Note: pipeline_name is defined in the `template_path` and ultimately comes from the Python pipeline definition.
If provided, must be unique.
labels (Optional[dict[str, str]]): Labels to associate with the run.
experiment (Optional[str]): The name of the experiment to associate the run with.
Returns:
The PipelineJob created.
@@ -278,9 +280,16 @@
job_id=job_id,
project=self._project,
location=self._location,
labels=labels,
)
job.submit(service_account=self._service_account, experiment=experiment)
logger.info(f"Created run: {job.resource_name}")
if experiment:
logger.info(
f"Associated run {job.resource_name} with experiment: {experiment}"
)
if labels:
logger.info(f"Associated run {job.resource_name} with labels: {labels}")

return job

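For orientation, a minimal usage sketch of the new labels parameter follows. It assumes the service class exposed by vertex_ai.py is named VertexAIService, that it is constructed with project/location/service_account/staging_bucket keyword arguments, and that UriFactory is importable from gigl.common; none of those details appear in this diff, so treat them as illustrative rather than authoritative.

# Hedged sketch: import paths and constructor arguments are assumptions, not taken from this PR.
from gigl.common import UriFactory
from gigl.common.services.vertex_ai import VertexAIService

service = VertexAIService(
    project="my-gcp-project",  # assumed
    location="us-central1",  # assumed
    service_account="runner@my-gcp-project.iam.gserviceaccount.com",  # assumed
    staging_bucket="gs://my-staging-bucket",  # assumed
)
job = service.run_pipeline(
    template_path=UriFactory.create_uri("gs://my-bucket/compiled_pipeline.yaml"),
    run_keyword_args={},
    labels={"gigl_commit": "abc1234", "gigl_version": "0.0.1"},  # new in this PR
    experiment="my-experiment",  # optional, unchanged behavior
)
# The labels dict is forwarded to aiplatform.PipelineJob, so it appears on the Vertex AI pipeline run.
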
4 changes: 3 additions & 1 deletion python/gigl/orchestration/kubeflow/kfp_orchestrator.py
@@ -116,6 +116,7 @@ def run(
start_at: str = DEFAULT_START_AT_COMPONENT,
stop_after: Optional[str] = None,
compiled_pipeline_path: Uri = DEFAULT_KFP_COMPILED_PIPELINE_DEST_PATH,
labels: Optional[dict[str, str]] = None,
) -> aiplatform.PipelineJob:
"""
Runs the GiGL Kubeflow pipeline.
@@ -127,7 +128,7 @@
start_at (str): Component to start the pipeline at. Defaults to 'config_populator'.
stop_after (Optional[str]): Component to stop the pipeline after. Defaults to None i.e. run entire pipeline.
compiled_pipeline_path (Uri): Path to the compiled pipeline YAML file.

labels (Optional[dict[str, str]]): Labels to associate with the run.
Returns:
aiplatform.PipelineJob: The created pipeline job.
"""
@@ -161,6 +162,7 @@
template_path=compiled_pipeline_path,
run_keyword_args=run_keyword_args,
job_id=str(applied_task_identifier).replace("_", "-"),
labels=labels,
)
return run

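Similarly, a hedged sketch of passing labels through the orchestrator layer. Only start_at, stop_after, compiled_pipeline_path, and labels are visible in this hunk; the constructor, the applied task identifier type, and the config URI arguments are assumptions for illustration.

# Hedged sketch: everything other than the labels keyword is assumed, not taken from this PR.
from gigl.common import UriFactory
from gigl.orchestration.kubeflow.kfp_orchestrator import KfpOrchestrator
from gigl.src.common.types import AppliedTaskIdentifier  # import path assumed

orchestrator = KfpOrchestrator()  # construction details assumed
run = orchestrator.run(
    applied_task_identifier=AppliedTaskIdentifier("my_gigl_job"),
    task_config_uri=UriFactory.create_uri("gs://my-bucket/task_config.yaml"),
    resource_config_uri=UriFactory.create_uri("gs://my-bucket/resource_config.yaml"),
    start_at="config_populator",
    labels={"gigl-integration-test": "true", "user": "me"},  # forwarded to run_pipeline
)
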
53 changes: 50 additions & 3 deletions python/gigl/orchestration/kubeflow/runner.py
@@ -27,6 +27,10 @@
--additional_job_args=split_generator.some_other_arg='value'
This passes additional_spark35_jar_file_uris="gs://path/to/jar" to subgraph_sampler at compile time and
some_other_arg="value" to split_generator at compile time.
--run_labels: Labels to associate with the pipeline run.
Each value must be of the form: "<label_name>=<label_value>".
NOTE: unlike SharedResourceConfig.resource_labels, these are *only* applied to the Vertex AI pipeline run.
Example: --run_labels=gigl-integration-test=true --run_labels=user=me

You can alternatively run_no_compile if you have a precompiled pipeline somewhere.
python gigl.orchestration.kubeflow.runner --action=run_no_compile ...args
@@ -152,6 +156,12 @@ def _assert_required_flags(args: argparse.Namespace) -> None:
f"Missing values for the following flags for a {args.action} command: {missing_values}. "
+ f"All required flags are: {list(required_flags)}."
)
if args.action == Action.COMPILE and args.run_labels:
raise ValueError(
"Labels are not supported for the compile action. "
"Please use the run action to run a pipeline with labels."
f"Labels provided: {args.run_labels}"
)


logger = Logger()
@@ -191,7 +201,26 @@ def _parse_additional_job_args(
return dict(result) # Ensure the default dict is converted to a regular dict


if __name__ == "__main__":
def _parse_labels(labels: list[str]) -> dict[str, str]:
"""
Parse the labels for the pipeline run.
Args:
labels (list[str]): Each element is of the form: "<label_name>=<label_value>"
Example: ["gigl-integration-test=true", "user=me"].
Returns dict[str, str]: The parsed labels.
"""
result: dict[str, str] = {}
for label in labels:
label_name, label_value = label.split("=", 1)
result[label_name] = label_value
logger.info(f"Parsed labels: {result}")
return result


def _get_parser() -> argparse.ArgumentParser:
"""
Get the parser for the runner.py script.
"""
parser = argparse.ArgumentParser(
description="Create the KF pipeline for GNN preprocessing/training/inference"
)
@@ -278,15 +307,32 @@ def _parse_additional_job_args(
some_other_arg="value" to split_generator at compile time.
""",
)
parser.add_argument(
"--run_labels",
action="append",
default=[],
help="""Labels to associate with the pipeline run, of the form: --run_labels=label_name=label_value.
Only applicable for run and run_no_compile actions.
NOTE: unlike SharedResourceConfig.resource_labels, these are *only* applied to the Vertex AI pipeline run.
Example: --run_labels=gigl-integration-test=true --run_labels=user=me
which will label the pipeline run with gigl-integration-test=true and user=me.
""",
)

return parser


if __name__ == "__main__":
parser = _get_parser()
args = parser.parse_args()
logger.info(f"Beginning runner.py with args: {args}")

parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args)

# Assert correctness of args
_assert_required_flags(args)

parsed_additional_job_args = _parse_additional_job_args(args.additional_job_args)
parsed_labels = _parse_labels(args.run_labels)

# Set the default value for compiled_pipeline_path as we cannot set it in argparse as
# for compile action this is a required flag so we cannot provide it a default value.
# See _assert_required_flags for more details.
@@ -342,6 +388,7 @@ def _parse_additional_job_args(
start_at=args.start_at,
stop_after=args.stop_after,
compiled_pipeline_path=compiled_pipeline_path,
labels=parsed_labels if parsed_labels else None,
)

if args.wait:
@@ -40,7 +40,7 @@ def test_launch_job(self):
location = resource_config.region
service_account = resource_config.service_account_email
staging_bucket = resource_config.temp_assets_regional_bucket_path.uri
job_name = f"GiGL-Intergration-Test-{uuid.uuid4()}"
job_name = f"GiGL-Integration-Test-{uuid.uuid4()}"
container_uri = "condaforge/miniforge3:25.3.0-1"
command = ["python", "-c", "import logging; logging.info('Hello, World!')"]

@@ -73,6 +73,7 @@ def test_run_pipeline(self):
template_path=UriFactory.create_uri(pipeline_def),
run_keyword_args={},
experiment="gigl-integration-tests",
labels={"gigl-integration-test": "true"},
)
# Wait for the run to complete; 30 minutes is probably too long but
# we don't want this test to be flaky.
@@ -83,6 +84,7 @@
# Also verify that we can fetch a pipeline.
run = ps.get_pipeline_job_from_job_name(job.name)
self.assertEqual(run.resource_name, job.resource_name)
self.assertEqual(run.labels["gigl-integration-test"], "true")


if __name__ == "__main__":
39 changes: 38 additions & 1 deletion python/tests/unit/orchestration/kubeflow/kfp_runner_test.py
@@ -1,7 +1,11 @@
import unittest

from gigl.common.logger import Logger
from gigl.orchestration.kubeflow.runner import _parse_additional_job_args
from gigl.orchestration.kubeflow.runner import (
_get_parser,
_parse_additional_job_args,
_parse_labels,
)
from gigl.src.common.constants.components import GiGLComponents

logger = Logger()
@@ -29,6 +33,39 @@ def test_parse_additional_job_args(
parsed_args = _parse_additional_job_args(args)
self.assertEqual(parsed_args, expected_parsed_args)

def test_parse_labels(self):
args = ["gigl-integration-test=true", "user=me"]
expected_parsed_args = {"gigl-integration-test": "true", "user": "me"}
parsed_args = _parse_labels(args)
self.assertEqual(parsed_args, expected_parsed_args)

def test_parse_args_from_cli(self):
parser = _get_parser()
args = parser.parse_args(
[
"--action=run", # required arg - not tested here
"--additional_job_args=subgraph_sampler.additional_spark35_jar_file_uris=gs://path/to/jar",
"--additional_job_args=subgraph_sampler.arg_2=value=10.243,123",
"--additional_job_args=split_generator.some_other_arg=value",
"--run_labels=gigl-integration-test=true",
"--run_labels=user=me",
]
)
parsed_args = _parse_additional_job_args(args.additional_job_args)
parsed_labels = _parse_labels(args.run_labels)
expected_parsed_args = {
GiGLComponents.SubgraphSampler: {
"additional_spark35_jar_file_uris": "gs://path/to/jar",
"arg_2": "value=10.243,123",
},
GiGLComponents.SplitGenerator: {
"some_other_arg": "value",
},
}
expected_parsed_labels = {"gigl-integration-test": "true", "user": "me"}
self.assertEqual(parsed_args, expected_parsed_args)
self.assertEqual(parsed_labels, expected_parsed_labels)


if __name__ == "__main__":
unittest.main()