Skip to content

Commit

Permalink
feat: Enable vertexai preview persistent cluster executor
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 569371823
  • Loading branch information
yinghsienwu authored and copybara-github committed Sep 29, 2023
1 parent e1cedba commit 0ae969d
Show file tree
Hide file tree
Showing 11 changed files with 444 additions and 102 deletions.
20 changes: 20 additions & 0 deletions google/cloud/aiplatform/preview/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.compat.types import (
custom_job_v1beta1 as gca_custom_job_compat,
job_state as gca_job_state,
job_state_v1beta1 as gca_job_state_v1beta1,
)
from google.cloud.aiplatform.compat.types import (
execution_v1beta1 as gcs_execution_compat,
Expand All @@ -42,6 +44,24 @@

_LOGGER = base.Logger(__name__)
_DEFAULT_RETRY = retry.Retry()
# TODO(b/242108750): remove temporary logic once model monitoring for batch prediction is GA
# States in which a job is considered finished (no further polling needed).
# Both the GA (v1) and preview (v1beta1) enum variants are listed because
# preview jobs may surface either enum depending on the client version used.
# NOTE(review): JOB_STATE_PAUSED is included as "complete" — presumably so
# waiters stop blocking on paused jobs; confirm this is intentional.
_JOB_COMPLETE_STATES = (
    gca_job_state.JobState.JOB_STATE_SUCCEEDED,
    gca_job_state.JobState.JOB_STATE_FAILED,
    gca_job_state.JobState.JOB_STATE_CANCELLED,
    gca_job_state.JobState.JOB_STATE_PAUSED,
    gca_job_state_v1beta1.JobState.JOB_STATE_SUCCEEDED,
    gca_job_state_v1beta1.JobState.JOB_STATE_FAILED,
    gca_job_state_v1beta1.JobState.JOB_STATE_CANCELLED,
    gca_job_state_v1beta1.JobState.JOB_STATE_PAUSED,
)

# Subset of complete states that indicate the job did not succeed, again
# covering both the v1 and v1beta1 enum variants.
_JOB_ERROR_STATES = (
    gca_job_state.JobState.JOB_STATE_FAILED,
    gca_job_state.JobState.JOB_STATE_CANCELLED,
    gca_job_state_v1beta1.JobState.JOB_STATE_FAILED,
    gca_job_state_v1beta1.JobState.JOB_STATE_CANCELLED,
)


class CustomJob(jobs.CustomJob):
Expand Down
94 changes: 94 additions & 0 deletions google/cloud/aiplatform/preview/resource_pool_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import NamedTuple, Optional, Dict, Union

from google.cloud.aiplatform import utils
from google.cloud.aiplatform.compat.types import (
accelerator_type_v1beta1 as gca_accelerator_type_compat,
)


class _ResourcePool(NamedTuple):
"""Specification container for Worker Pool specs used for distributed training.
Usage:
resource_pool = _ResourcePool(
replica_count=1,
machine_type='n1-standard-4',
accelerator_count=1,
accelerator_type='NVIDIA_TESLA_K80',
boot_disk_type='pd-ssd',
boot_disk_size_gb=100,
)
Note that container and python package specs are not stored with this spec.
"""

replica_count: int = 1
machine_type: str = "n1-standard-4"
accelerator_count: int = 0
accelerator_type: str = "ACCELERATOR_TYPE_UNSPECIFIED"
boot_disk_type: str = "pd-ssd"
boot_disk_size_gb: int = 100

def _get_accelerator_type(self) -> Optional[str]:
"""Validates accelerator_type and returns the name of the accelerator.
Returns:
None if no accelerator or valid accelerator name.
Raise:
ValueError if accelerator type is invalid.
"""

# Raises ValueError if invalid accelerator_type
utils.validate_accelerator_type(self.accelerator_type)

accelerator_enum = getattr(
gca_accelerator_type_compat.AcceleratorType, self.accelerator_type
)

if (
accelerator_enum
!= gca_accelerator_type_compat.AcceleratorType.ACCELERATOR_TYPE_UNSPECIFIED
):
return self.accelerator_type

@property
def spec_dict(self) -> Dict[str, Union[int, str, Dict[str, Union[int, str]]]]:
"""Return specification as a Dict."""
spec = {
"machine_spec": {"machine_type": self.machine_type},
"replica_count": self.replica_count,
"disk_spec": {
"boot_disk_type": self.boot_disk_type,
"boot_disk_size_gb": self.boot_disk_size_gb,
},
}

accelerator_type = self._get_accelerator_type()
if accelerator_type and self.accelerator_count:
spec["machine_spec"]["accelerator_type"] = accelerator_type
spec["machine_spec"]["accelerator_count"] = self.accelerator_count

return spec

@property
def is_empty(self) -> bool:
"""Returns True is replica_count > 0 False otherwise."""
return self.replica_count <= 0
21 changes: 18 additions & 3 deletions tests/unit/vertexai/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,13 @@
from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import (
PersistentResourceServiceClient,
)
import constants as test_constants
from pyfakefs import fake_filesystem_unittest
import pytest
import tensorflow.saved_model as tf_saved_model
from google.cloud.aiplatform_v1beta1.types.persistent_resource import (
PersistentResource,
ResourcePool,
)


_TEST_PROJECT = "test-project"
Expand Down Expand Up @@ -83,6 +86,18 @@
labels={"trained_by_vertex_ai": "true"},
)

# Expected CreatePersistentResource request payload for the default cluster
# configuration: one pool, n1-standard-4, 1 replica, 100 GB pd-ssd boot disk.
_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource()
resource_pool = ResourcePool()
resource_pool.machine_spec.machine_type = "n1-standard-4"
resource_pool.replica_count = 1
resource_pool.disk_spec.boot_disk_type = "pd-ssd"
resource_pool.disk_spec.boot_disk_size_gb = 100
_TEST_REQUEST_RUNNING_DEFAULT.resource_pools = [resource_pool]


# Canned response representing a cluster that has reached the RUNNING state.
_TEST_PERSISTENT_RESOURCE_RUNNING = PersistentResource()
_TEST_PERSISTENT_RESOURCE_RUNNING.state = "RUNNING"


@pytest.fixture(scope="module")
def google_auth_mock():
Expand Down Expand Up @@ -264,7 +279,7 @@ def persistent_resource_running_mock():
"get_persistent_resource",
) as persistent_resource_running_mock:
persistent_resource_running_mock.return_value = (
test_constants._TEST_PERSISTENT_RESOURCE_RUNNING
_TEST_PERSISTENT_RESOURCE_RUNNING
)
yield persistent_resource_running_mock

Expand All @@ -287,7 +302,7 @@ def create_persistent_resource_default_mock():
) as create_persistent_resource_default_mock:
create_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation)
create_persistent_resource_lro_mock.result.return_value = (
test_constants._TEST_REQUEST_RUNNING_DEFAULT
_TEST_REQUEST_RUNNING_DEFAULT
)
create_persistent_resource_default_mock.return_value = (
create_persistent_resource_lro_mock
Expand Down
45 changes: 0 additions & 45 deletions tests/unit/vertexai/constants.py

This file was deleted.

101 changes: 67 additions & 34 deletions tests/unit/vertexai/test_persistent_resource_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@
from google.api_core import operation as ga_operation
from google.cloud import aiplatform
import vertexai
from vertexai.preview.developer import remote_specs
from google.cloud.aiplatform_v1beta1.services.persistent_resource_service import (
PersistentResourceServiceClient,
)
from google.cloud.aiplatform_v1beta1.types import persistent_resource_service
from google.cloud.aiplatform_v1beta1.types.machine_resources import DiskSpec
from google.cloud.aiplatform_v1beta1.types.machine_resources import (
MachineSpec,
)
from google.cloud.aiplatform_v1beta1.types.persistent_resource import (
PersistentResource,
)
Expand All @@ -48,55 +53,64 @@
_TEST_PERSISTENT_RESOURCE_ERROR = PersistentResource()
_TEST_PERSISTENT_RESOURCE_ERROR.state = "ERROR"

_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource()
resource_pool = ResourcePool()
resource_pool.machine_spec.machine_type = "n1-standard-4"
resource_pool.replica_count = 1
resource_pool.disk_spec.boot_disk_type = "pd-ssd"
resource_pool.disk_spec.boot_disk_size_gb = 100
_TEST_REQUEST_RUNNING_DEFAULT.resource_pools = [resource_pool]

resource_pool_0 = ResourcePool(
machine_spec=MachineSpec(machine_type="n1-standard-4"),
disk_spec=DiskSpec(
boot_disk_type="pd-ssd",
boot_disk_size_gb=100,
),
replica_count=1,
)
resource_pool_1 = ResourcePool(
machine_spec=MachineSpec(
machine_type="n1-standard-8",
accelerator_type="NVIDIA_TESLA_T4",
accelerator_count=1,
),
disk_spec=DiskSpec(
boot_disk_type="pd-ssd",
boot_disk_size_gb=100,
),
replica_count=2,
)
_TEST_REQUEST_RUNNING_DEFAULT = PersistentResource(
resource_pools=[resource_pool_0],
)
_TEST_REQUEST_RUNNING_CUSTOM = PersistentResource(
resource_pools=[resource_pool_0, resource_pool_1],
)

_TEST_PERSISTENT_RESOURCE_RUNNING = PersistentResource()
_TEST_PERSISTENT_RESOURCE_RUNNING.state = "RUNNING"


@pytest.fixture
def persistent_resource_running_mock():
    """Patches get_persistent_resource to always report a RUNNING cluster."""
    with mock.patch.object(
        PersistentResourceServiceClient,
        "get_persistent_resource",
    ) as persistent_resource_running_mock:
        # Every lookup sees the canned RUNNING resource defined above.
        persistent_resource_running_mock.return_value = (
            _TEST_PERSISTENT_RESOURCE_RUNNING
        )
        yield persistent_resource_running_mock


@pytest.fixture
def persistent_resource_exception_mock():
    """Patches get_persistent_resource so every call raises an Exception."""
    with mock.patch.object(
        PersistentResourceServiceClient,
        "get_persistent_resource",
    ) as persistent_resource_exception_mock:
        # Bare Exception class as side_effect: each call raises Exception().
        persistent_resource_exception_mock.side_effect = Exception
        yield persistent_resource_exception_mock
# user-configured remote_specs.ResourcePool
remote_specs_resource_pool_0 = remote_specs.ResourcePool(replica_count=1)
remote_specs_resource_pool_1 = remote_specs.ResourcePool(
machine_type="n1-standard-8",
replica_count=2,
accelerator_type="NVIDIA_TESLA_T4",
accelerator_count=1,
)
_TEST_CUSTOM_RESOURCE_POOLS = [
remote_specs_resource_pool_0,
remote_specs_resource_pool_1,
]


@pytest.fixture
def create_persistent_resource_default_mock():
def create_persistent_resource_custom_mock():
with mock.patch.object(
PersistentResourceServiceClient,
"create_persistent_resource",
) as create_persistent_resource_default_mock:
) as create_persistent_resource_custom_mock:
create_persistent_resource_lro_mock = mock.Mock(ga_operation.Operation)
create_persistent_resource_lro_mock.result.return_value = (
_TEST_REQUEST_RUNNING_DEFAULT
_TEST_REQUEST_RUNNING_CUSTOM
)
create_persistent_resource_default_mock.return_value = (
create_persistent_resource_custom_mock.return_value = (
create_persistent_resource_lro_mock
)
yield create_persistent_resource_default_mock
yield create_persistent_resource_custom_mock


@pytest.fixture
Expand Down Expand Up @@ -180,6 +194,25 @@ def test_create_persistent_resource_default_success(
request,
)

@pytest.mark.usefixtures("persistent_resource_running_mock")
def test_create_persistent_resource_custom_success(
self, create_persistent_resource_custom_mock
):
persistent_resource_util.create_persistent_resource(
cluster_resource_name=_TEST_CLUSTER_RESOURCE_NAME,
resource_pools=_TEST_CUSTOM_RESOURCE_POOLS,
)

request = persistent_resource_service.CreatePersistentResourceRequest(
parent=_TEST_PARENT,
persistent_resource=_TEST_REQUEST_RUNNING_CUSTOM,
persistent_resource_id=_TEST_CLUSTER_NAME,
)

create_persistent_resource_custom_mock.assert_called_with(
request,
)

@pytest.mark.usefixtures("create_persistent_resource_exception_mock")
def test_create_ray_cluster_state_error(self):
with pytest.raises(ValueError) as e:
Expand Down
Loading

0 comments on commit 0ae969d

Please sign in to comment.