feat: Add support for TPU v5 lite pod (v5e) for custom training jobs.
Custom training jobs now accept the v5e machine types as listed in https://cloud.google.com/tpu/docs/tpus-in-gke#v5e.

PiperOrigin-RevId: 627165889
vertex-sdk-bot authored and Copybara-Service committed Apr 22, 2024
1 parent 0599ca1 commit 415912e
Showing 6 changed files with 859 additions and 2 deletions.
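
For illustration, here is a minimal sketch (not part of this commit) of how the new tpu_topology argument might be used to launch a v5e custom job; the project, region, bucket, script, and image below are placeholders:

from google.cloud import aiplatform

aiplatform.init(
    project="my-project",             # placeholder
    location="us-west4",              # placeholder; pick a region with v5e capacity
    staging_bucket="gs://my-bucket",  # placeholder
)

# Build a CustomJob from a local script on a single v5e slice.
# tpu_topology is the argument added by this commit; its value must be a
# topology supported by the chosen v5e machine type.
job = aiplatform.CustomJob.from_local_script(
    display_name="tpu-v5e-training",
    script_path="train.py",                                      # placeholder script
    container_uri="us-docker.pkg.dev/my-project/trainer:latest",  # placeholder image
    machine_type="ct5lp-hightpu-4t",
    tpu_topology="2x2",
)
job.run()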
8 changes: 8 additions & 0 deletions google/cloud/aiplatform/jobs.py
@@ -1924,6 +1924,7 @@ def from_local_script(
encryption_spec_key_name: Optional[str] = None,
staging_bucket: Optional[str] = None,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> "CustomJob":
"""Configures a custom job from a local script.
@@ -2034,6 +2035,12 @@ def from_local_script(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology
must be a supported value for the TPU machine type.
Raises:
RuntimeError: If staging bucket was not set using aiplatform.init
@@ -2063,6 +2070,7 @@ def from_local_script(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
).pool_specs
)

46 changes: 46 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -1373,6 +1373,7 @@ def _prepare_and_validate_run(
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Tuple[worker_spec_utils._DistributedTrainingSpec, Optional[gca_model.Model]]:
"""Create worker pool specs and managed model as well validating the
run.
@@ -1417,6 +1418,10 @@ def _prepare_and_validate_run(
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
Optional. The type of machine to use for reduction server.
tpu_topology (str):
Optional. The TPU topology to use. Required only if
the machine type is a TPU v5 version.
Returns:
Worker pool specs and managed model for the run.
@@ -1454,6 +1459,7 @@ def _prepare_and_validate_run(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
).pool_specs
)

@@ -2974,6 +2980,7 @@ def run(
create_request_timeout: Optional[float] = None,
disable_retries: bool = False,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -3268,6 +3275,12 @@ def run(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology must
be a supported value for the TPU machine type.
Returns:
model: The trained Vertex AI Model resource or None if training did not
@@ -3287,6 +3300,7 @@ def run(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
)

# make and copy package
@@ -3383,6 +3397,7 @@ def submit(
create_request_timeout: Optional[float] = None,
disable_retries: bool = False,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Optional[models.Model]:
"""Submits the custom training job without blocking until completion.
@@ -3677,6 +3692,12 @@ def submit(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology must
be a supported value for the TPU machine type.
Returns:
model: The trained Vertex AI Model resource or None if training did not
@@ -3695,6 +3716,7 @@ def submit(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
)

# make and copy package
@@ -4360,6 +4382,7 @@ def run(
create_request_timeout: Optional[float] = None,
disable_retries: bool = False,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -4647,6 +4670,12 @@ def run(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology
must be a supported value for the TPU machine type.
Returns:
model: The trained Vertex AI Model resource or None if training did not
@@ -4671,6 +4700,7 @@ def run(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
)

return self._run(
@@ -4761,6 +4791,7 @@ def submit(
create_request_timeout: Optional[float] = None,
disable_retries: bool = False,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Optional[models.Model]:
"""Submits the custom training job without blocking until completion.
@@ -5048,6 +5079,12 @@ def submit(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology
must be a supported value for the TPU machine type.
Returns:
model: The trained Vertex AI Model resource or None if training did not
@@ -5071,6 +5108,7 @@ def submit(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
)

return self._run(
@@ -7315,6 +7353,7 @@ def run(
create_request_timeout: Optional[float] = None,
disable_retries: bool = False,
persistent_resource_id: Optional[str] = None,
tpu_topology: Optional[str] = None,
) -> Optional[models.Model]:
"""Runs the custom training job.
@@ -7603,6 +7642,12 @@ def run(
on-demand short-lived machines. The network, CMEK, and node pool
configs on the job should be consistent with those on the
PersistentResource; otherwise, the job will be rejected.
tpu_topology (str):
Optional. Specifies the TPU topology to be used for
the TPU training job. This field is required for TPU v5 versions. For
details on the TPU topology, refer to
https://cloud.google.com/tpu/docs/v5e#tpu-v5e-config. The topology
must be a supported value for the TPU machine type.
Returns:
model: The trained Vertex AI Model resource or None if training did not
@@ -7622,6 +7667,7 @@ def run(
boot_disk_size_gb=boot_disk_size_gb,
reduction_server_replica_count=reduction_server_replica_count,
reduction_server_machine_type=reduction_server_machine_type,
tpu_topology=tpu_topology,
)

return self._run(
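
The training-job classes in training_jobs.py thread the same argument through run() and submit(). A container-based job on v5e might be launched roughly as follows (illustrative only; the image URI and names are placeholders):

from google.cloud import aiplatform

job = aiplatform.CustomContainerTrainingJob(
    display_name="tpu-v5e-container-training",
    container_uri="us-docker.pkg.dev/my-project/trainer:latest",  # placeholder image
)

# tpu_topology is forwarded to _prepare_and_validate_run and ends up in the
# worker pool spec for the chief replica.
job.run(
    replica_count=1,
    machine_type="ct5lp-hightpu-4t",
    tpu_topology="2x2",
)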
10 changes: 10 additions & 0 deletions google/cloud/aiplatform/utils/worker_spec_utils.py
@@ -56,6 +56,7 @@ class _WorkerPoolSpec(NamedTuple):
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
boot_disk_type: str = "pd-ssd"
boot_disk_size_gb: int = 100
tpu_topology: Optional[str] = None

def _get_accelerator_type(self) -> Optional[str]:
"""Validates accelerator_type and returns the name of the accelerator.
@@ -97,6 +98,9 @@ def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
spec["machine_spec"]["accelerator_type"] = accelerator_type
spec["machine_spec"]["accelerator_count"] = self.accelerator_count

if self.tpu_topology:
spec["machine_spec"]["tpu_topology"] = self.tpu_topology

return spec

@property
@@ -185,6 +189,7 @@ def chief_worker_pool(
boot_disk_size_gb: int = 100,
reduction_server_replica_count: int = 0,
reduction_server_machine_type: str = None,
tpu_topology: str = None,
) -> "_DistributedTrainingSpec":
"""Parametrizes Config to support only chief with worker replicas.
@@ -214,6 +219,10 @@
The number of reduction server replicas, default is 0.
reduction_server_machine_type (str):
The type of machine to use for reduction server, default is `n1-highcpu-16`.
tpu_topology (str):
The TPU topology for the TPU machine type. This field is
required for TPU v5 versions and is passed only to the chief
replica, since TPU jobs allow only one replica.
Returns:
_DistributedTrainingSpec representing one chief and n workers all of
@@ -230,6 +239,7 @@
accelerator_type=accelerator_type,
boot_disk_type=boot_disk_type,
boot_disk_size_gb=boot_disk_size_gb,
tpu_topology=tpu_topology,
)

worker_spec = _WorkerPoolSpec(
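
Based on the spec_dict change above, a worker pool spec built with a topology should serialize to a machine_spec that carries tpu_topology instead of accelerator fields; for example (an illustrative sketch, not a test from this commit):

from google.cloud.aiplatform.utils import worker_spec_utils

spec = worker_spec_utils._WorkerPoolSpec(
    replica_count=1,
    machine_type="ct5lp-hightpu-4t",
    tpu_topology="2x2",
).spec_dict
# Expected shape (no accelerator keys, since accelerator_type was left unspecified):
# {
#     "machine_spec": {"machine_type": "ct5lp-hightpu-4t", "tpu_topology": "2x2"},
#     "replica_count": 1,
#     "disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
# }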
55 changes: 54 additions & 1 deletion tests/unit/aiplatform/constants.py
@@ -76,6 +76,9 @@ class TrainingJobConstants:
}
_TEST_REPLICA_COUNT = 1
_TEST_MACHINE_TYPE = "n1-standard-4"
_TEST_MACHINE_TYPE_TPU = "cloud-tpu"
_TEST_MACHINE_TYPE_TPU_V5E = "ct5lp-hightpu-4t"
_TEST_ACCELERATOR_TPU_TYPE = "TPU_V3"
_TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_K80"
_TEST_ACCELERATOR_COUNT = 1
_TEST_BOOT_DISK_TYPE = "pd-standard"
@@ -120,12 +123,40 @@ class TrainingJobConstants:
},
}
]
_TEST_TPU_V5E_WORKER_POOL_SPEC = [
{
"machine_spec": {
"machine_type": _TEST_MACHINE_TYPE_TPU_V5E,
"tpu_topology": "2x2",
},
"replica_count": 1,
"disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
"container_spec": {
"image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
},
}
]
_TEST_TPU_V3_WORKER_POOL_SPEC = [
{
"machine_spec": {
"machine_type": _TEST_MACHINE_TYPE_TPU,
"accelerator_type": _TEST_ACCELERATOR_TPU_TYPE,
"accelerator_count": 32,
},
"replica_count": 1,
"disk_spec": {"boot_disk_type": "pd-ssd", "boot_disk_size_gb": 100},
"container_spec": {
"image_uri": _TEST_TRAINING_CONTAINER_IMAGE,
},
}
]
_TEST_ID = "1028944691210842416"
_TEST_NETWORK = (
f"projects/{ProjectConstants._TEST_PROJECT}/global/networks/{_TEST_ID}"
)
_TEST_RESERVED_IP_RANGES = ["example_ip_range"]
_TEST_TIMEOUT = 8000
_TEST_TIMEOUT_SECONDS = duration_pb2.Duration(seconds=_TEST_TIMEOUT)
_TEST_RESTART_JOB_ON_WORKER_RESTART = True
_TEST_DISABLE_RETRIES = True

@@ -137,7 +168,7 @@ class TrainingJobConstants:
output_uri_prefix=_TEST_BASE_OUTPUT_DIR
),
scheduling=custom_job.Scheduling(
timeout=duration_pb2.Duration(seconds=_TEST_TIMEOUT),
timeout=_TEST_TIMEOUT_SECONDS,
restart_job_on_worker_restart=_TEST_RESTART_JOB_ON_WORKER_RESTART,
disable_retries=_TEST_DISABLE_RETRIES,
),
@@ -167,6 +198,28 @@ class TrainingJobConstants:
)
_TEST_DEFAULT_ENCRYPTION_KEY_NAME = "key_default"

def create_tpu_job_proto(tpu_version):
worker_pool_spec = (
TrainingJobConstants._TEST_TPU_V5E_WORKER_POOL_SPEC
if tpu_version == "v5e"
else TrainingJobConstants._TEST_TPU_V3_WORKER_POOL_SPEC
)
return custom_job.CustomJob(
display_name=TrainingJobConstants._TEST_DISPLAY_NAME,
job_spec=custom_job.CustomJobSpec(
worker_pool_specs=worker_pool_spec,
base_output_directory=io.GcsDestination(
output_uri_prefix=TrainingJobConstants._TEST_BASE_OUTPUT_DIR
),
scheduling=custom_job.Scheduling(
timeout=TrainingJobConstants._TEST_TIMEOUT_SECONDS,
restart_job_on_worker_restart=TrainingJobConstants._TEST_RESTART_JOB_ON_WORKER_RESTART,
),
service_account=ProjectConstants._TEST_SERVICE_ACCOUNT,
network=TrainingJobConstants._TEST_NETWORK,
),
)


@dataclasses.dataclass(frozen=True)
class ModelConstants:
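
The new create_tpu_job_proto helper builds the expected CustomJob proto for either TPU generation, so a unit test that already imports this module as constants could, for example, assert against it like this (illustrative):

expected_job = constants.TrainingJobConstants.create_tpu_job_proto(tpu_version="v5e")
# The v5e variant carries the topology in machine_spec rather than accelerator fields.
assert (
    expected_job.job_spec.worker_pool_specs[0].machine_spec.tpu_topology == "2x2"
)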