From ac763406b614a41eb0428eb26f3906d43107f922 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Tue, 4 Nov 2025 19:30:04 +0000 Subject: [PATCH 1/5] Properly set VAI env vars --- python/gigl/common/services/vertex_ai.py | 27 ++++++++++++++++--- .../gigl/src/inference/v2/glt_inferencer.py | 7 ++--- python/gigl/src/training/v1/trainer.py | 8 +++--- python/gigl/src/training/v2/glt_trainer.py | 7 ++--- 4 files changed, 35 insertions(+), 14 deletions(-) diff --git a/python/gigl/common/services/vertex_ai.py b/python/gigl/common/services/vertex_ai.py index 20a6d5540..62c81f881 100644 --- a/python/gigl/common/services/vertex_ai.py +++ b/python/gigl/common/services/vertex_ai.py @@ -93,7 +93,7 @@ class VertexAiJobConfig: container_uri: str command: list[str] args: Optional[list[str]] = None - environment_variables: Optional[list[dict[str, str]]] = None + environment_variables: Optional[list[tuple[str, str]]] = None machine_type: str = "n1-standard-4" accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED" accelerator_count: int = 0 @@ -167,7 +167,7 @@ def launch_job(self, job_config: VertexAiJobConfig) -> aiplatform.CustomJob: datetime.datetime.now().strftime("%Y%m%d-%H%M%S"), "leader_worker_internal_ip.txt", ) - env_vars = [ + env_vars = _get_vai_env_vars(job_config.environment_variables) + [ env_var.EnvVar( name=LEADER_WORKER_INTERNAL_IP_FILE_PATH_ENV_KEY, value=leader_worker_internal_ip_file_path.uri, @@ -265,8 +265,16 @@ def launch_graph_store_job( storage_disk_spec = _create_disk_spec(storage_pool_job_config) compute_disk_spec = _create_disk_spec(compute_pool_job_config) - storage_container_spec = _create_container_spec(storage_pool_job_config) - compute_container_spec = _create_container_spec(compute_pool_job_config) + env_vars: list[env_var.EnvVar] = _get_vai_env_vars( + compute_pool_job_config.environment_variables + ) + + storage_container_spec = _create_container_spec( + storage_pool_job_config, env_vars + ) + compute_container_spec = _create_container_spec( + compute_pool_job_config, env_vars + ) worker_pool_specs: list[Union[WorkerPoolSpec, dict]] = [] @@ -461,3 +469,14 @@ def _create_disk_spec(job_config: VertexAiJobConfig) -> DiskSpec: boot_disk_size_gb=job_config.boot_disk_size_gb, ) return disk_spec + + +def _get_vai_env_vars( + environment_variables: Optional[list[tuple[str, str]]] +) -> list[env_var.EnvVar]: + """Get the environment variables for a job config.""" + env_vars: list[env_var.EnvVar] = [] + if environment_variables: + for env_var, value in environment_variables.items(): + env_vars.append(env_var.EnvVar(name=env_var, value=value)) + return env_vars diff --git a/python/gigl/src/inference/v2/glt_inferencer.py b/python/gigl/src/inference/v2/glt_inferencer.py index fdda96d49..e033b7c87 100644 --- a/python/gigl/src/inference/v2/glt_inferencer.py +++ b/python/gigl/src/inference/v2/glt_inferencer.py @@ -101,14 +101,15 @@ def __execute_VAI_inference( command = inference_process_command.strip().split(" ") logger.info(f"Running inference with command: {command}") vai_job_name = f"gigl_infer_{applied_task_identifier}" + environment_variables: list[tuple[str, str]] = [ + ("TF_CPP_MIN_LOG_LEVEL", "3"), + ] job_config = VertexAiJobConfig( job_name=vai_job_name, container_uri=container_uri, command=command, args=job_args, - environment_variables=[ - {"name": "TF_CPP_MIN_LOG_LEVEL", "value": "3"}, - ], + environment_variables=environment_variables, machine_type=inferencer_resource_config.machine_type, accelerator_type=inferencer_resource_config.gpu_type.upper().replace( "-", "_" diff --git a/python/gigl/src/training/v1/trainer.py b/python/gigl/src/training/v1/trainer.py index af3a5bacb..db828fe2b 100644 --- a/python/gigl/src/training/v1/trainer.py +++ b/python/gigl/src/training/v1/trainer.py @@ -46,7 +46,9 @@ def run( cpu_docker_uri = cpu_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU cuda_docker_uri = cuda_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA container_uri = cpu_docker_uri if is_cpu_training else cuda_docker_uri - + environment_variables: list[tuple[str, str]] = [ + ("TF_CPP_MIN_LOG_LEVEL", "3"), + ] job_args = [ f"--job_name={applied_task_identifier}", f"--task_config_uri={task_config_uri}", @@ -58,9 +60,7 @@ def run( container_uri=container_uri, command=["python", "-m", "gigl.src.training.v1.lib.training_process"], args=job_args, - environment_variables=[ - {"name": "TF_CPP_MIN_LOG_LEVEL", "value": "3"}, - ], + environment_variables=environment_variables, machine_type=trainer_config.machine_type, accelerator_type=trainer_config.gpu_type.upper().replace("-", "_"), accelerator_count=trainer_config.gpu_limit, diff --git a/python/gigl/src/training/v2/glt_trainer.py b/python/gigl/src/training/v2/glt_trainer.py index 9d2aea4c9..86abe6954 100644 --- a/python/gigl/src/training/v2/glt_trainer.py +++ b/python/gigl/src/training/v2/glt_trainer.py @@ -101,14 +101,15 @@ def __execute_VAI_training( command = training_process_command.strip().split(" ") logger.info(f"Running trainer with command: {command}") vai_job_name = f"gigl_train_{applied_task_identifier}" + environment_variables: list[tuple[str, str]] = [ + ("TF_CPP_MIN_LOG_LEVEL", "3"), + ] job_config = VertexAiJobConfig( job_name=vai_job_name, container_uri=container_uri, command=command, args=job_args, - environment_variables=[ - {"name": "TF_CPP_MIN_LOG_LEVEL", "value": "3"}, - ], + environment_variables=environment_variables, machine_type=trainer_resource_config.machine_type, accelerator_type=trainer_resource_config.gpu_type.upper().replace("-", "_"), accelerator_count=trainer_resource_config.gpu_limit, From 40c755c94ce8be5c1087d294f6504347e5c42a18 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Tue, 4 Nov 2025 19:51:39 +0000 Subject: [PATCH 2/5] add test --- python/gigl/common/services/vertex_ai.py | 4 ++-- .../common/services/vertex_ai_test.py | 18 +++++++++++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/python/gigl/common/services/vertex_ai.py b/python/gigl/common/services/vertex_ai.py index 62c81f881..7ad474a1f 100644 --- a/python/gigl/common/services/vertex_ai.py +++ b/python/gigl/common/services/vertex_ai.py @@ -477,6 +477,6 @@ def _get_vai_env_vars( """Get the environment variables for a job config.""" env_vars: list[env_var.EnvVar] = [] if environment_variables: - for env_var, value in environment_variables.items(): - env_vars.append(env_var.EnvVar(name=env_var, value=value)) + for var, value in environment_variables: + env_vars.append(env_var.EnvVar(name=var, value=value)) return env_vars diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index 1628e1913..5b411014c 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -4,6 +4,7 @@ import uuid import kfp +from google.cloud.aiplatform_v1.types import env_var from parameterized import param, parameterized from gigl.common import UriFactory @@ -73,10 +74,17 @@ def test_launch_job(self): command = ["python", "-c", "import logging; logging.info('Hello, World!')"] job_config = VertexAiJobConfig( - job_name=job_name, container_uri=container_uri, command=command + job_name=job_name, + container_uri=container_uri, + command=command, + environment_variables=[("FOO", "BAR")], ) - self._vertex_ai_service.launch_job(job_config) + job = self._vertex_ai_service.launch_job(job_config) + self.assertEqual( + job.job_spec.worker_pool_specs[0].container_spec.env[0], + env_var.EnvVar(name="FOO", value="BAR"), + ) @parameterized.expand( [ @@ -122,7 +130,7 @@ def test_launch_job(self): ), ] ) - def test_launch_graph_store_job( + def _test_launch_graph_store_job( self, _, num_compute, @@ -175,7 +183,7 @@ def test_launch_graph_store_job( expected_worker_pool_spec["image_uri"], ) - def test_run_pipeline(self): + def _test_run_pipeline(self): with tempfile.TemporaryDirectory() as tmpdir: pipeline_def = os.path.join(tmpdir, "pipeline.yaml") kfp.compiler.Compiler().compile(get_pipeline, pipeline_def) @@ -197,7 +205,7 @@ def test_run_pipeline(self): self.assertEqual(run.resource_name, job.resource_name) self.assertEqual(run.labels["gigl-integration-test"], "true") - def test_run_pipeline_fails(self): + def _test_run_pipeline_fails(self): with tempfile.TemporaryDirectory() as tmpdir: pipeline_def = os.path.join(tmpdir, "pipeline_that_fails.yaml") kfp.compiler.Compiler().compile(get_pipeline_that_fails, pipeline_def) From 8e4a43b2c3b57cfd42c19429fcc5c1450c2cab99 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 5 Nov 2025 01:35:28 +0000 Subject: [PATCH 3/5] fix --- python/gigl/common/services/vertex_ai.py | 21 ++++++------------- .../gigl/src/inference/v2/glt_inferencer.py | 6 +++--- python/gigl/src/training/v1/trainer.py | 6 +++--- python/gigl/src/training/v2/glt_trainer.py | 6 +++--- .../common/services/vertex_ai_test.py | 2 +- 5 files changed, 16 insertions(+), 25 deletions(-) diff --git a/python/gigl/common/services/vertex_ai.py b/python/gigl/common/services/vertex_ai.py index 7ad474a1f..65d1eaeaa 100644 --- a/python/gigl/common/services/vertex_ai.py +++ b/python/gigl/common/services/vertex_ai.py @@ -93,7 +93,7 @@ class VertexAiJobConfig: container_uri: str command: list[str] args: Optional[list[str]] = None - environment_variables: Optional[list[tuple[str, str]]] = None + environment_variables: Optional[list[env_var.EnvVar]] = None machine_type: str = "n1-standard-4" accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED" accelerator_count: int = 0 @@ -167,12 +167,14 @@ def launch_job(self, job_config: VertexAiJobConfig) -> aiplatform.CustomJob: datetime.datetime.now().strftime("%Y%m%d-%H%M%S"), "leader_worker_internal_ip.txt", ) - env_vars = _get_vai_env_vars(job_config.environment_variables) + [ + env_vars: list[env_var.EnvVar] = [ env_var.EnvVar( name=LEADER_WORKER_INTERNAL_IP_FILE_PATH_ENV_KEY, value=leader_worker_internal_ip_file_path.uri, ) ] + if job_config.environment_variables: + env_vars.extend(job_config.environment_variables) container_spec = _create_container_spec(job_config, env_vars) @@ -265,8 +267,8 @@ def launch_graph_store_job( storage_disk_spec = _create_disk_spec(storage_pool_job_config) compute_disk_spec = _create_disk_spec(compute_pool_job_config) - env_vars: list[env_var.EnvVar] = _get_vai_env_vars( - compute_pool_job_config.environment_variables + env_vars: list[env_var.EnvVar] = ( + compute_pool_job_config.environment_variables or [] ) storage_container_spec = _create_container_spec( @@ -469,14 +471,3 @@ def _create_disk_spec(job_config: VertexAiJobConfig) -> DiskSpec: boot_disk_size_gb=job_config.boot_disk_size_gb, ) return disk_spec - - -def _get_vai_env_vars( - environment_variables: Optional[list[tuple[str, str]]] -) -> list[env_var.EnvVar]: - """Get the environment variables for a job config.""" - env_vars: list[env_var.EnvVar] = [] - if environment_variables: - for var, value in environment_variables: - env_vars.append(env_var.EnvVar(name=var, value=value)) - return env_vars diff --git a/python/gigl/src/inference/v2/glt_inferencer.py b/python/gigl/src/inference/v2/glt_inferencer.py index e033b7c87..b823968bf 100644 --- a/python/gigl/src/inference/v2/glt_inferencer.py +++ b/python/gigl/src/inference/v2/glt_inferencer.py @@ -1,7 +1,7 @@ import argparse from typing import Optional -from google.cloud.aiplatform_v1.types import accelerator_type +from google.cloud.aiplatform_v1.types import accelerator_type, env_var from gigl.common import Uri, UriFactory from gigl.common.constants import ( @@ -101,8 +101,8 @@ def __execute_VAI_inference( command = inference_process_command.strip().split(" ") logger.info(f"Running inference with command: {command}") vai_job_name = f"gigl_infer_{applied_task_identifier}" - environment_variables: list[tuple[str, str]] = [ - ("TF_CPP_MIN_LOG_LEVEL", "3"), + environment_variables: list[env_var.EnvVar] = [ + env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3"), ] job_config = VertexAiJobConfig( job_name=vai_job_name, diff --git a/python/gigl/src/training/v1/trainer.py b/python/gigl/src/training/v1/trainer.py index db828fe2b..c1509ea54 100644 --- a/python/gigl/src/training/v1/trainer.py +++ b/python/gigl/src/training/v1/trainer.py @@ -2,7 +2,7 @@ from typing import Optional import torch -from google.cloud.aiplatform_v1.types import accelerator_type +from google.cloud.aiplatform_v1.types import accelerator_type, env_var from gigl.common import Uri, UriFactory from gigl.common.constants import ( @@ -46,8 +46,8 @@ def run( cpu_docker_uri = cpu_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CPU cuda_docker_uri = cuda_docker_uri or DEFAULT_GIGL_RELEASE_SRC_IMAGE_CUDA container_uri = cpu_docker_uri if is_cpu_training else cuda_docker_uri - environment_variables: list[tuple[str, str]] = [ - ("TF_CPP_MIN_LOG_LEVEL", "3"), + environment_variables: list[env_var.EnvVar] = [ + env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3"), ] job_args = [ f"--job_name={applied_task_identifier}", diff --git a/python/gigl/src/training/v2/glt_trainer.py b/python/gigl/src/training/v2/glt_trainer.py index 86abe6954..0d5663fd5 100644 --- a/python/gigl/src/training/v2/glt_trainer.py +++ b/python/gigl/src/training/v2/glt_trainer.py @@ -1,7 +1,7 @@ import argparse from typing import Optional -from google.cloud.aiplatform_v1.types import accelerator_type +from google.cloud.aiplatform_v1.types import accelerator_type, env_var from gigl.common import Uri, UriFactory from gigl.common.constants import ( @@ -101,8 +101,8 @@ def __execute_VAI_training( command = training_process_command.strip().split(" ") logger.info(f"Running trainer with command: {command}") vai_job_name = f"gigl_train_{applied_task_identifier}" - environment_variables: list[tuple[str, str]] = [ - ("TF_CPP_MIN_LOG_LEVEL", "3"), + environment_variables: list[env_var.EnvVar] = [ + env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3"), ] job_config = VertexAiJobConfig( job_name=vai_job_name, diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index 5b411014c..d2a9112b0 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -77,7 +77,7 @@ def test_launch_job(self): job_name=job_name, container_uri=container_uri, command=command, - environment_variables=[("FOO", "BAR")], + environment_variables=[env_var.EnvVar(name="FOO", value="BAR")], ) job = self._vertex_ai_service.launch_job(job_config) From 7ec3f05ea39a5154081b2d3836f8c5e8ac974191 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 5 Nov 2025 01:35:44 +0000 Subject: [PATCH 4/5] fix --- python/tests/integration/common/services/vertex_ai_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index d2a9112b0..3fceadcdf 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -130,7 +130,7 @@ def test_launch_job(self): ), ] ) - def _test_launch_graph_store_job( + def test_launch_graph_store_job( self, _, num_compute, @@ -183,7 +183,7 @@ def _test_launch_graph_store_job( expected_worker_pool_spec["image_uri"], ) - def _test_run_pipeline(self): + def test_run_pipeline(self): with tempfile.TemporaryDirectory() as tmpdir: pipeline_def = os.path.join(tmpdir, "pipeline.yaml") kfp.compiler.Compiler().compile(get_pipeline, pipeline_def) @@ -205,7 +205,7 @@ def _test_run_pipeline(self): self.assertEqual(run.resource_name, job.resource_name) self.assertEqual(run.labels["gigl-integration-test"], "true") - def _test_run_pipeline_fails(self): + def test_run_pipeline_fails(self): with tempfile.TemporaryDirectory() as tmpdir: pipeline_def = os.path.join(tmpdir, "pipeline_that_fails.yaml") kfp.compiler.Compiler().compile(get_pipeline_that_fails, pipeline_def) From 6800cdee5fcd4cc9099e8d0a75c3b369944b1672 Mon Sep 17 00:00:00 2001 From: kmontemayor Date: Wed, 5 Nov 2025 22:12:42 +0000 Subject: [PATCH 5/5] fix test --- python/tests/integration/common/services/vertex_ai_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/tests/integration/common/services/vertex_ai_test.py b/python/tests/integration/common/services/vertex_ai_test.py index 3fceadcdf..43a4e2f94 100644 --- a/python/tests/integration/common/services/vertex_ai_test.py +++ b/python/tests/integration/common/services/vertex_ai_test.py @@ -81,9 +81,9 @@ def test_launch_job(self): ) job = self._vertex_ai_service.launch_job(job_config) - self.assertEqual( - job.job_spec.worker_pool_specs[0].container_spec.env[0], + self.assertIn( env_var.EnvVar(name="FOO", value="BAR"), + job.job_spec.worker_pool_specs[0].container_spec.env, ) @parameterized.expand(