feat: Add explicit constraints for update_ray_cluster
PiperOrigin-RevId: 589973886
yinghsienwu authored and Copybara-Service committed Dec 11, 2023
1 parent cd233ef commit 979a4f3
Showing 3 changed files with 152 additions and 27 deletions.
59 changes: 54 additions & 5 deletions google/cloud/aiplatform/preview/vertex_ray/cluster_init.py
@@ -124,6 +124,14 @@ def create_ray_cluster(
                 "[Ray on Vertex AI]: For head_node_type, "
                 + "Resources.node_count must be 1."
             )
+        if (
+            head_node_type.accelerator_type is None
+            and head_node_type.accelerator_count > 0
+        ):
+            raise ValueError(
+                "[Ray on Vertex]: accelerator_type must be specified when"
+                + " accelerator_count is set to a value other than 0."
+            )

     resource_pool_images = {}

@@ -147,6 +155,14 @@ def create_ray_cluster(
     i = 0
     if worker_node_types:
         for worker_node_type in worker_node_types:
+            if (
+                worker_node_type.accelerator_type is None
+                and worker_node_type.accelerator_count > 0
+            ):
+                raise ValueError(
+                    "[Ray on Vertex]: accelerator_type must be specified when"
+                    + " accelerator_count is set to a value other than 0."
+                )
             # Worker and head share the same MachineSpec, merge them into the
             # same ResourcePool
             additional_replica_count = resources._check_machine_spec_identical(
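For orientation, a minimal sketch of how the new create-time check surfaces to callers — assuming the preview import paths used in this diff, placeholder project and network values, and that no earlier validation trips first:

from google.cloud import aiplatform
from google.cloud.aiplatform.preview import vertex_ray
from google.cloud.aiplatform.preview.vertex_ray.util.resources import Resources

aiplatform.init(project="my-project", location="us-central1")  # placeholders

# An accelerator_count > 0 with no accelerator_type now fails fast on the
# client instead of surfacing later from the service.
head = Resources(machine_type="n1-standard-16", node_count=1, accelerator_count=4)
try:
    vertex_ray.create_ray_cluster(
        head_node_type=head,
        network="projects/123/global/networks/my-vpc",  # placeholder
    )
except ValueError as e:
    print(e)  # [Ray on Vertex]: accelerator_type must be specified when ...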
@@ -327,31 +343,64 @@ def update_ray_cluster(
     Returns:
         The cluster_resource_name of the Ray cluster on Vertex.
     """
+    # worker_node_types should not be duplicated.
+    for i in range(len(worker_node_types)):
+        for j in range(len(worker_node_types)):
+            additional_replica_count = resources._check_machine_spec_identical(
+                worker_node_types[i], worker_node_types[j]
+            )
+            if additional_replica_count > 0 and i != j:
+                raise ValueError(
+                    "[Ray on Vertex AI]: Worker_node_types have duplicate machine specs: ",
+                    worker_node_types[i],
+                    "and ",
+                    worker_node_types[j],
+                )
+
     persistent_resource = _gapic_utils.get_persistent_resource(
         persistent_resource_name=cluster_resource_name
     )
+
     current_persistent_resource = copy.deepcopy(persistent_resource)
-    head_node_type = get_ray_cluster(cluster_resource_name).head_node_type
     current_persistent_resource.resource_pools[0].replica_count = 1
-    # TODO(b/300146407): Raise ValueError for duplicate resource pools
+
+    previous_ray_cluster = get_ray_cluster(cluster_resource_name)
+    head_node_type = previous_ray_cluster.head_node_type
+    previous_worker_node_types = previous_ray_cluster.worker_node_types
+
+    # new worker_node_types and previous_worker_node_types should be the same length.
+    if len(worker_node_types) != len(previous_worker_node_types):
+        raise ValueError(
+            f"[Ray on Vertex AI]: Desired number of worker_node_types ({len(worker_node_types)}) does not match the number of the existing worker_node_type({len(previous_worker_node_types)}).",
+        )
+
     # Merge worker_node_type and head_node_type if they share
     # the same machine spec.
     not_merged = 1
     for i in range(len(worker_node_types)):
         additional_replica_count = resources._check_machine_spec_identical(
             head_node_type, worker_node_types[i]
         )
-        if additional_replica_count != 0:
-            # merge the 1st duplicated worker with head
+        if additional_replica_count != 0 or (
+            additional_replica_count == 0 and worker_node_types[i].node_count == 0
+        ):
+            # Merge the 1st duplicated worker with head, allow scale down to 0 worker
             current_persistent_resource.resource_pools[0].replica_count = (
                 1 + additional_replica_count
             )
-            # reset not_merged
+            # Reset not_merged
             not_merged = 0
         else:
             # No duplication w/ head node, write the 2nd worker node to the 2nd resource pool.
             current_persistent_resource.resource_pools[
                 i + not_merged
             ].replica_count = worker_node_types[i].node_count
+            # New worker_node_type.node_count should be >=1 unless the worker_node_type
+            # and head_node_type are merged due to the same machine specs.
+            if worker_node_types[i].node_count == 0:
+                raise ValueError(
+                    f"[Ray on Vertex AI]: Worker_node_type ({worker_node_types[i]}) must update to >= 1 nodes",
+                )
+
     request = persistent_resource_service.UpdatePersistentResourceRequest(
         persistent_resource=current_persistent_resource,
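A sketch of the update paths these constraints govern, assuming an existing cluster whose only worker pool shares the head node's machine spec; the resource name is a placeholder:

from google.cloud.aiplatform.preview import vertex_ray

cluster_name = "projects/123/locations/us-central1/persistentResources/my-cluster"  # placeholder
cluster = vertex_ray.get_ray_cluster(cluster_name)

# Allowed after this change: scale a worker pool that shares the head's
# machine spec down to node_count = 0 (it stays merged into the head pool).
cluster.worker_node_types[0].node_count = 0
vertex_ray.update_ray_cluster(
    cluster_resource_name=cluster_name,
    worker_node_types=cluster.worker_node_types,
)

# Rejected: duplicate machine specs in worker_node_types raise ValueError,
# as does a list whose length differs from the existing worker pools.
try:
    vertex_ray.update_ray_cluster(
        cluster_resource_name=cluster_name,
        worker_node_types=cluster.worker_node_types * 2,
    )
except ValueError as e:
    print(e)  # Worker_node_types have duplicate machine specs ...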
29 changes: 7 additions & 22 deletions google/cloud/aiplatform/preview/vertex_ray/util/resources.py
@@ -19,6 +19,7 @@
 from google.cloud.aiplatform_v1beta1.types import PersistentResource


+@dataclasses.dataclass
 class Resources:
     """Resources for a ray cluster node.

@@ -38,28 +39,12 @@ class Resources:
         be either unspecified or within the range of [100, 64000].
     """

-    def __init__(
-        self,
-        machine_type: Optional[str] = "n1-standard-4",
-        node_count: Optional[int] = 1,
-        accelerator_type: Optional[str] = None,
-        accelerator_count: Optional[int] = 0,
-        boot_disk_type: Optional[str] = "pd-ssd",
-        boot_disk_size_gb: Optional[int] = 100,
-    ):
-
-        self.machine_type = machine_type
-        self.node_count = node_count
-        self.accelerator_type = accelerator_type
-        self.accelerator_count = accelerator_count
-        self.boot_disk_type = boot_disk_type
-        self.boot_disk_size_gb = boot_disk_size_gb
-
-        if accelerator_type is None and accelerator_count > 0:
-            raise ValueError(
-                "[Ray on Vertex]: accelerator_type must be specified when"
-                + " accelerator_count is set to a value other than 0."
-            )
+    machine_type: Optional[str] = "n1-standard-4"
+    node_count: Optional[int] = 1
+    accelerator_type: Optional[str] = None
+    accelerator_count: Optional[int] = 0
+    boot_disk_type: Optional[str] = "pd-ssd"
+    boot_disk_size_gb: Optional[int] = 100


 @dataclasses.dataclass
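One consequence of the dataclass conversion, sketched under the assumption that no other validation runs at construction time: the accelerator check that previously ran in __init__ now fires at cluster creation instead.

from google.cloud.aiplatform.preview.vertex_ray.util.resources import Resources

# Resources is now a plain dataclass, so an inconsistent spec constructs
# without error; the equivalent check now lives in create_ray_cluster
# (see the cluster_init.py diff above).
spec = Resources(accelerator_count=4)
print(spec.accelerator_type)   # None
print(spec.accelerator_count)  # 4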
91 changes: 91 additions & 0 deletions tests/unit/vertex_ray/test_cluster_init.py
@@ -46,6 +46,11 @@
 )
 _TEST_RESPONSE_RUNNING_2_POOLS_RESIZE.resource_pools[1].replica_count = 1

+_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER = copy.deepcopy(
+    tc.ClusterConstants._TEST_RESPONSE_RUNNING_1_POOL
+)
+_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER.resource_pools[0].replica_count = 1
+

 @pytest.fixture
 def create_persistent_resource_1_pool_mock():
@@ -163,6 +168,22 @@ def update_persistent_resource_1_pool_mock():
         yield update_persistent_resource_1_pool_mock


+@pytest.fixture
+def update_persistent_resource_1_pool_0_worker_mock():
+    with mock.patch.object(
+        PersistentResourceServiceClient,
+        "update_persistent_resource",
+    ) as update_persistent_resource_1_pool_0_worker_mock:
+        update_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation)
+        update_persistent_resource_lro_mock.result.return_value = (
+            _TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER
+        )
+        update_persistent_resource_1_pool_0_worker_mock.return_value = (
+            update_persistent_resource_lro_mock
+        )
+        yield update_persistent_resource_1_pool_0_worker_mock
+
+
 @pytest.fixture
 def update_persistent_resource_2_pools_mock():
     with mock.patch.object(
@@ -472,6 +493,30 @@ def test_update_ray_cluster_1_pool(self, update_persistent_resource_1_pool_mock)

         assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS

+    @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock")
+    def test_update_ray_cluster_1_pool_to_0_worker(
+        self, update_persistent_resource_1_pool_mock
+    ):
+
+        new_worker_node_types = []
+        for worker_node_type in tc.ClusterConstants._TEST_CLUSTER.worker_node_types:
+            # resize worker node to node_count = 0
+            worker_node_type.node_count = 0
+            new_worker_node_types.append(worker_node_type)
+
+        returned_name = vertex_ray.update_ray_cluster(
+            cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS,
+            worker_node_types=new_worker_node_types,
+        )
+
+        request = persistent_resource_service.UpdatePersistentResourceRequest(
+            persistent_resource=_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER,
+            update_mask=_EXPECTED_MASK,
+        )
+        update_persistent_resource_1_pool_mock.assert_called_once_with(request)
+
+        assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS
+
     @pytest.mark.usefixtures("get_persistent_resource_2_pools_mock")
     def test_update_ray_cluster_2_pools(self, update_persistent_resource_2_pools_mock):

@@ -493,3 +538,49 @@ def test_update_ray_cluster_2_pools(self, update_persistent_resource_2_pools_moc
         update_persistent_resource_2_pools_mock.assert_called_once_with(request)

         assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS
+
+    @pytest.mark.usefixtures("get_persistent_resource_2_pools_mock")
+    def test_update_ray_cluster_2_pools_0_worker_fail(self):
+
+        new_worker_node_types = []
+        for worker_node_type in tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types:
+            # resize worker node to node_count = 0
+            worker_node_type.node_count = 0
+            new_worker_node_types.append(worker_node_type)
+
+        with pytest.raises(ValueError) as e:
+            vertex_ray.update_ray_cluster(
+                cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS,
+                worker_node_types=new_worker_node_types,
+            )
+
+        e.match(regexp=r"must update to >= 1 nodes.")
+
+    @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock")
+    def test_update_ray_cluster_duplicate_worker_node_types_error(self):
+        new_worker_node_types = (
+            tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types
+            + tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types
+        )
+        with pytest.raises(ValueError) as e:
+            vertex_ray.update_ray_cluster(
+                cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS,
+                worker_node_types=new_worker_node_types,
+            )
+
+        e.match(regexp=r"Worker_node_types have duplicate machine specs")
+
+    @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock")
+    def test_update_ray_cluster_mismatch_worker_node_types_count_error(self):
+        with pytest.raises(ValueError) as e:
+            new_worker_node_types = (
+                tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types
+            )
+            vertex_ray.update_ray_cluster(
+                cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS,
+                worker_node_types=new_worker_node_types,
+            )
+
+        e.match(
+            regexp=r"does not match the number of the existing worker_node_type"
+        )
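To exercise the new paths locally, the updated tests can be targeted by keyword; a sketch assuming pytest is installed and run from the repository root:

import pytest

# Runs only the update_ray_cluster tests touched by this commit.
pytest.main(["tests/unit/vertex_ray/test_cluster_init.py", "-k", "update_ray_cluster"])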
