Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions gigl/env/constants.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
"""Environment-variable keys used across GiGL.
Most of these keys are set on subprocess env (never on the parent
``os.environ``) by ``gigl.src.common.custom_launcher.launch_custom`` so that
receiving CLIs can ``os.environ.get(...)`` their runtime context.
Most of these keys are set on launched process envs by
``gigl.src.common.custom_launcher.launch_custom`` and
``gigl.src.common.vertex_ai_launcher`` so that receiving CLIs can
``os.environ.get(...)`` their runtime context.
``GIGL_RESOURCE_CONFIG_URI`` is also written to the parent ``os.environ`` by
``gigl.env.pipelines_config.get_resource_config`` so that downstream readers
Expand Down
123 changes: 110 additions & 13 deletions gigl/src/common/vertex_ai_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@
)
from gigl.common.logger import Logger
from gigl.common.services.vertex_ai import VertexAiJobConfig, VertexAIService
from gigl.env.constants import (
GIGL_APPLIED_TASK_IDENTIFIER_ENV_KEY,
GIGL_COMPONENT_ENV_KEY,
GIGL_CPU_DOCKER_URI_ENV_KEY,
GIGL_CUDA_DOCKER_URI_ENV_KEY,
GIGL_RESOURCE_CONFIG_URI_ENV_KEY,
GIGL_TASK_CONFIG_URI_ENV_KEY,
)
from gigl.env.distributed import COMPUTE_CLUSTER_LOCAL_WORLD_SIZE_ENV_KEY
from gigl.src.common.constants.components import GiGLComponents
from gigl.src.common.types.pb_wrappers.gigl_resource_config import (
Expand Down Expand Up @@ -57,7 +65,7 @@ def launch_single_pool_job(

Args:
vertex_ai_resource_config: The Vertex AI resource configuration
job_name: Full name for the Vertex AI job
job_name: Raw GiGL applied task identifier
task_config_uri: URI to the task configuration
resource_config_uri: URI to the resource configuration
process_command: Command to run in the container
Expand All @@ -72,6 +80,10 @@ def launch_single_pool_job(
raise ValueError(
f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
)
vertex_ai_job_name = _build_vertex_ai_job_name(
job_name=job_name,
component=component,
)
is_cpu_execution = _determine_if_cpu_execution(
vertex_ai_resource_config=vertex_ai_resource_config
)
Expand All @@ -80,15 +92,26 @@ def launch_single_pool_job(
container_uri = cpu_docker_uri if is_cpu_execution else cuda_docker_uri

job_config = _build_job_config(
job_name=job_name,
vertex_ai_job_name=vertex_ai_job_name,
applied_task_identifier=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
command_str=process_command,
args=process_runtime_args,
use_cuda=not is_cpu_execution,
container_uri=container_uri,
vertex_ai_resource_config=vertex_ai_resource_config,
env_vars=[env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3")],
env_vars=[
env_var.EnvVar(name="TF_CPP_MIN_LOG_LEVEL", value="3"),
*_build_common_gigl_env_vars(
applied_task_identifier=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
component=component,
),
],
labels=resource_config_wrapper.get_resource_labels(component=component),
)
logger.info(f"Launching {component.value} job with config: {job_config}")
Expand Down Expand Up @@ -120,7 +143,7 @@ def launch_graph_store_enabled_job(

Args:
vertex_ai_graph_store_config: The Vertex AI graph store configuration
job_name: Full name for the Vertex AI job
job_name: Raw GiGL applied task identifier
task_config_uri: URI to the task configuration
resource_config_uri: URI to the resource configuration
compute_commmand: Command to run in the compute container
Expand All @@ -136,6 +159,10 @@ def launch_graph_store_enabled_job(
raise ValueError(
f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
)
vertex_ai_job_name = _build_vertex_ai_job_name(
job_name=job_name,
component=component,
)
storage_pool_config = vertex_ai_graph_store_config.graph_store_pool
compute_pool_config = vertex_ai_graph_store_config.compute_pool

Expand Down Expand Up @@ -167,13 +194,22 @@ def launch_graph_store_enabled_job(
name=COMPUTE_CLUSTER_LOCAL_WORLD_SIZE_ENV_KEY,
value=str(num_compute_processes),
),
*_build_common_gigl_env_vars(
applied_task_identifier=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
cpu_docker_uri=cpu_docker_uri,
cuda_docker_uri=cuda_docker_uri,
component=component,
),
]

labels = resource_config_wrapper.get_resource_labels(component=component)

# Create compute pool job config
compute_job_config = _build_job_config(
job_name=job_name,
vertex_ai_job_name=vertex_ai_job_name,
applied_task_identifier=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
command_str=compute_commmand,
Expand All @@ -187,7 +223,8 @@ def launch_graph_store_enabled_job(

# Create storage pool job config
storage_job_config = _build_job_config(
job_name=job_name,
vertex_ai_job_name=vertex_ai_job_name,
applied_task_identifier=job_name,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
command_str=storage_command,
Expand Down Expand Up @@ -219,7 +256,8 @@ def launch_graph_store_enabled_job(


def _build_job_config(
job_name: str,
vertex_ai_job_name: str,
applied_task_identifier: str,
task_config_uri: Uri,
resource_config_uri: Uri,
command_str: str,
Expand All @@ -233,12 +271,12 @@ def _build_job_config(
"""Build a VertexAiJobConfig for training or inference jobs.

This function constructs a configuration object for running GiGL training or inference
jobs on Vertex AI. It assembles job arguments, sets appropriate job naming conventions,
and configures resource specifications based on the provided parameters.
jobs on Vertex AI. It assembles job arguments and configures resource specifications
based on the provided parameters.

Args:
job_name (str): The base name for the job. Will be prefixed with "gigl_train_" or "gigl_infer_".
is_inference (bool): Whether this is an inference job (True) or training job (False).
vertex_ai_job_name (str): The Vertex AI CustomJob display name.
applied_task_identifier (str): Raw GiGL applied task identifier passed to the process.
task_config_uri (Uri): URI to the task configuration file.
resource_config_uri (Uri): URI to the resource configuration file.
command_str (str): The command to run in the container (will be split on spaces).
Expand All @@ -255,7 +293,7 @@ def _build_job_config(
"""
job_args = (
[
f"--job_name={job_name}",
f"--job_name={applied_task_identifier}",
f"--task_config_uri={task_config_uri}",
f"--resource_config_uri={resource_config_uri}",
]
Expand All @@ -266,7 +304,7 @@ def _build_job_config(
command = command_str.strip().split(" ")

job_config = VertexAiJobConfig(
job_name=job_name,
job_name=vertex_ai_job_name,
container_uri=container_uri,
command=command,
args=job_args,
Expand Down Expand Up @@ -297,6 +335,65 @@ def _build_job_config(
return job_config


def _build_vertex_ai_job_name(job_name: str, component: GiGLComponents) -> str:
"""Build the Vertex AI CustomJob display name from a raw GiGL job name.

Args:
job_name: Raw GiGL applied task identifier.
component: The GiGL component being launched.

Returns:
The component-prefixed Vertex AI CustomJob display name.

Raises:
ValueError: If ``component`` is not a launchable Vertex AI component.
"""
if component == GiGLComponents.Trainer:
return f"gigl_train_{job_name}"
if component == GiGLComponents.Inferencer:
return f"gigl_infer_{job_name}"
raise ValueError(
f"Invalid component: {component}. Expected one of: {_LAUNCHABLE_COMPONENTS}"
)


def _build_common_gigl_env_vars(
applied_task_identifier: str,
task_config_uri: Uri,
resource_config_uri: Uri,
cpu_docker_uri: str,
cuda_docker_uri: str,
component: GiGLComponents,
) -> list[env_var.EnvVar]:
"""Build common GiGL runtime context env vars for Vertex AI containers.

Args:
applied_task_identifier: The raw GiGL task identifier.
task_config_uri: URI to the task configuration.
resource_config_uri: URI to the resource configuration.
cpu_docker_uri: Resolved CPU Docker image URI.
cuda_docker_uri: Resolved CUDA Docker image URI.
component: The GiGL component being launched.

Returns:
Environment variables carrying shared GiGL launcher context.
"""
return [
env_var.EnvVar(
name=GIGL_APPLIED_TASK_IDENTIFIER_ENV_KEY,
value=str(applied_task_identifier),
),
env_var.EnvVar(name=GIGL_TASK_CONFIG_URI_ENV_KEY, value=str(task_config_uri)),
env_var.EnvVar(
name=GIGL_RESOURCE_CONFIG_URI_ENV_KEY,
value=str(resource_config_uri),
),
env_var.EnvVar(name=GIGL_COMPONENT_ENV_KEY, value=component.name),
env_var.EnvVar(name=GIGL_CPU_DOCKER_URI_ENV_KEY, value=cpu_docker_uri),
env_var.EnvVar(name=GIGL_CUDA_DOCKER_URI_ENV_KEY, value=cuda_docker_uri),
]


def _build_reservation_affinity(
affinity: VertexAiReservationAffinity,
) -> Optional[ReservationAffinity]:
Expand Down
6 changes: 2 additions & 4 deletions gigl/src/inference/v2/glt_inferencer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,12 @@ def __execute_VAI_inference(
gbml_config_pb_wrapper.inferencer_config.inferencer_args
)

job_name = f"gigl_infer_{applied_task_identifier}"

if isinstance(
resource_config_wrapper.inferencer_config, VertexAiResourceConfig
):
launch_single_pool_job(
vertex_ai_resource_config=resource_config_wrapper.inferencer_config,
job_name=job_name,
job_name=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
process_command=inference_process_command,
Expand All @@ -78,7 +76,7 @@ def __execute_VAI_inference(
):
launch_graph_store_enabled_job(
vertex_ai_graph_store_config=resource_config_wrapper.inferencer_config,
job_name=job_name,
job_name=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
compute_commmand=inference_process_command,
Expand Down
6 changes: 2 additions & 4 deletions gigl/src/training/v2/glt_trainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,10 @@ def __execute_VAI_training(
gbml_config_pb_wrapper.trainer_config.trainer_args
)

job_name = f"gigl_train_{applied_task_identifier}"

if isinstance(resource_config.trainer_config, VertexAiResourceConfig):
launch_single_pool_job(
vertex_ai_resource_config=resource_config.trainer_config,
job_name=job_name,
job_name=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
process_command=training_process_command,
Expand All @@ -74,7 +72,7 @@ def __execute_VAI_training(
elif isinstance(resource_config.trainer_config, VertexAiGraphStoreConfig):
launch_graph_store_enabled_job(
vertex_ai_graph_store_config=resource_config.trainer_config,
job_name=job_name,
job_name=applied_task_identifier,
task_config_uri=task_config_uri,
resource_config_uri=resource_config_uri,
compute_commmand=training_process_command,
Expand Down
Loading