diff --git a/google/cloud/aiplatform/preview/vertex_ray/cluster_init.py b/google/cloud/aiplatform/preview/vertex_ray/cluster_init.py index 6438812b82..1dd94f69e3 100644 --- a/google/cloud/aiplatform/preview/vertex_ray/cluster_init.py +++ b/google/cloud/aiplatform/preview/vertex_ray/cluster_init.py @@ -124,6 +124,14 @@ def create_ray_cluster( "[Ray on Vertex AI]: For head_node_type, " + "Resources.node_count must be 1." ) + if ( + head_node_type.accelerator_type is None + and head_node_type.accelerator_count > 0 + ): + raise ValueError( + "[Ray on Vertex]: accelerator_type must be specified when" + + " accelerator_count is set to a value other than 0." + ) resource_pool_images = {} @@ -147,6 +155,14 @@ def create_ray_cluster( i = 0 if worker_node_types: for worker_node_type in worker_node_types: + if ( + worker_node_type.accelerator_type is None + and worker_node_type.accelerator_count > 0 + ): + raise ValueError( + "[Ray on Vertex]: accelerator_type must be specified when" + + " accelerator_count is set to a value other than 0." + ) # Worker and head share the same MachineSpec, merge them into the # same ResourcePool additional_replica_count = resources._check_machine_spec_identical( @@ -327,31 +343,64 @@ def update_ray_cluster( Returns: The cluster_resource_name of the Ray cluster on Vertex. """ + # worker_node_types should not be duplicated. + for i in range(len(worker_node_types)): + for j in range(len(worker_node_types)): + additional_replica_count = resources._check_machine_spec_identical( + worker_node_types[i], worker_node_types[j] + ) + if additional_replica_count > 0 and i != j: + raise ValueError( + "[Ray on Vertex AI]: Worker_node_types have duplicate machine specs: ", + worker_node_types[i], + "and ", + worker_node_types[j], + ) + persistent_resource = _gapic_utils.get_persistent_resource( persistent_resource_name=cluster_resource_name ) current_persistent_resource = copy.deepcopy(persistent_resource) - head_node_type = get_ray_cluster(cluster_resource_name).head_node_type current_persistent_resource.resource_pools[0].replica_count = 1 - # TODO(b/300146407): Raise ValueError for duplicate resource pools + + previous_ray_cluster = get_ray_cluster(cluster_resource_name) + head_node_type = previous_ray_cluster.head_node_type + previous_worker_node_types = previous_ray_cluster.worker_node_types + + # new worker_node_types and previous_worker_node_types should be the same length. + if len(worker_node_types) != len(previous_worker_node_types): + raise ValueError( + f"[Ray on Vertex AI]: Desired number of worker_node_types ({len(worker_node_types)}) does not match the number of the existing worker_node_type({len(previous_worker_node_types)}).", + ) + + # Merge worker_node_type and head_node_type if the share + # the same machine spec. not_merged = 1 for i in range(len(worker_node_types)): additional_replica_count = resources._check_machine_spec_identical( head_node_type, worker_node_types[i] ) - if additional_replica_count != 0: - # merge the 1st duplicated worker with head + if additional_replica_count != 0 or ( + additional_replica_count == 0 and worker_node_types[i].node_count == 0 + ): + # Merge the 1st duplicated worker with head, allow scale down to 0 worker current_persistent_resource.resource_pools[0].replica_count = ( 1 + additional_replica_count ) - # reset not_merged + # Reset not_merged not_merged = 0 else: # No duplication w/ head node, write the 2nd worker node to the 2nd resource pool. current_persistent_resource.resource_pools[ i + not_merged ].replica_count = worker_node_types[i].node_count + # New worker_node_type.node_count should be >=1 unless the worker_node_type + # and head_node_type are merged due to the same machine specs. + if worker_node_types[i].node_count == 0: + raise ValueError( + f"[Ray on Vertex AI]: Worker_node_type ({worker_node_types[i]}) must update to >= 1 nodes", + ) request = persistent_resource_service.UpdatePersistentResourceRequest( persistent_resource=current_persistent_resource, diff --git a/google/cloud/aiplatform/preview/vertex_ray/util/resources.py b/google/cloud/aiplatform/preview/vertex_ray/util/resources.py index 7dbffe23d7..7779f5b6c7 100644 --- a/google/cloud/aiplatform/preview/vertex_ray/util/resources.py +++ b/google/cloud/aiplatform/preview/vertex_ray/util/resources.py @@ -19,6 +19,7 @@ from google.cloud.aiplatform_v1beta1.types import PersistentResource +@dataclasses.dataclass class Resources: """Resources for a ray cluster node. @@ -38,28 +39,12 @@ class Resources: be either unspecified or within the range of [100, 64000]. """ - def __init__( - self, - machine_type: Optional[str] = "n1-standard-4", - node_count: Optional[int] = 1, - accelerator_type: Optional[str] = None, - accelerator_count: Optional[int] = 0, - boot_disk_type: Optional[str] = "pd-ssd", - boot_disk_size_gb: Optional[int] = 100, - ): - - self.machine_type = machine_type - self.node_count = node_count - self.accelerator_type = accelerator_type - self.accelerator_count = accelerator_count - self.boot_disk_type = boot_disk_type - self.boot_disk_size_gb = boot_disk_size_gb - - if accelerator_type is None and accelerator_count > 0: - raise ValueError( - "[Ray on Vertex]: accelerator_type must be specified when" - + " accelerator_count is set to a value other than 0." - ) + machine_type: Optional[str] = "n1-standard-4" + node_count: Optional[int] = 1 + accelerator_type: Optional[str] = None + accelerator_count: Optional[int] = 0 + boot_disk_type: Optional[str] = "pd-ssd" + boot_disk_size_gb: Optional[int] = 100 @dataclasses.dataclass diff --git a/tests/unit/vertex_ray/test_cluster_init.py b/tests/unit/vertex_ray/test_cluster_init.py index f4cee9380f..8ea01a1668 100644 --- a/tests/unit/vertex_ray/test_cluster_init.py +++ b/tests/unit/vertex_ray/test_cluster_init.py @@ -46,6 +46,11 @@ ) _TEST_RESPONSE_RUNNING_2_POOLS_RESIZE.resource_pools[1].replica_count = 1 +_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER = copy.deepcopy( + tc.ClusterConstants._TEST_RESPONSE_RUNNING_1_POOL +) +_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER.resource_pools[0].replica_count = 1 + @pytest.fixture def create_persistent_resource_1_pool_mock(): @@ -163,6 +168,22 @@ def update_persistent_resource_1_pool_mock(): yield update_persistent_resource_1_pool_mock +@pytest.fixture +def update_persistent_resource_1_pool_0_worker_mock(): + with mock.patch.object( + PersistentResourceServiceClient, + "update_persistent_resource", + ) as update_persistent_resource_1_pool_0_worker_mock: + update_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation) + update_persistent_resource_lro_mock.result.return_value = ( + _TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER + ) + update_persistent_resource_1_pool_0_worker_mock.return_value = ( + update_persistent_resource_lro_mock + ) + yield update_persistent_resource_1_pool_0_worker_mock + + @pytest.fixture def update_persistent_resource_2_pools_mock(): with mock.patch.object( @@ -472,6 +493,30 @@ def test_update_ray_cluster_1_pool(self, update_persistent_resource_1_pool_mock) assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS + @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock") + def test_update_ray_cluster_1_pool_to_0_worker( + self, update_persistent_resource_1_pool_mock + ): + + new_worker_node_types = [] + for worker_node_type in tc.ClusterConstants._TEST_CLUSTER.worker_node_types: + # resize worker node to node_count = 0 + worker_node_type.node_count = 0 + new_worker_node_types.append(worker_node_type) + + returned_name = vertex_ray.update_ray_cluster( + cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS, + worker_node_types=new_worker_node_types, + ) + + request = persistent_resource_service.UpdatePersistentResourceRequest( + persistent_resource=_TEST_RESPONSE_RUNNING_1_POOL_RESIZE_0_WORKER, + update_mask=_EXPECTED_MASK, + ) + update_persistent_resource_1_pool_mock.assert_called_once_with(request) + + assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS + @pytest.mark.usefixtures("get_persistent_resource_2_pools_mock") def test_update_ray_cluster_2_pools(self, update_persistent_resource_2_pools_mock): @@ -493,3 +538,49 @@ def test_update_ray_cluster_2_pools(self, update_persistent_resource_2_pools_moc update_persistent_resource_2_pools_mock.assert_called_once_with(request) assert returned_name == tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS + + @pytest.mark.usefixtures("get_persistent_resource_2_pools_mock") + def test_update_ray_cluster_2_pools_0_worker_fail(self): + + new_worker_node_types = [] + for worker_node_type in tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types: + # resize worker node to node_count = 0 + worker_node_type.node_count = 0 + new_worker_node_types.append(worker_node_type) + + with pytest.raises(ValueError) as e: + vertex_ray.update_ray_cluster( + cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS, + worker_node_types=new_worker_node_types, + ) + + e.match(regexp=r"must update to >= 1 nodes.") + + @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock") + def test_update_ray_cluster_duplicate_worker_node_types_error(self): + new_worker_node_types = ( + tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types + + tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types + ) + with pytest.raises(ValueError) as e: + vertex_ray.update_ray_cluster( + cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS, + worker_node_types=new_worker_node_types, + ) + + e.match(regexp=r"Worker_node_types have duplicate machine specs") + + @pytest.mark.usefixtures("get_persistent_resource_1_pool_mock") + def test_update_ray_cluster_mismatch_worker_node_types_count_error(self): + with pytest.raises(ValueError) as e: + new_worker_node_types = ( + tc.ClusterConstants._TEST_CLUSTER_2.worker_node_types + ) + vertex_ray.update_ray_cluster( + cluster_resource_name=tc.ClusterConstants._TEST_VERTEX_RAY_PR_ADDRESS, + worker_node_types=new_worker_node_types, + ) + + e.match( + regexp=r"does not match the number of the existing worker_node_type" + )