Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 65 additions & 3 deletions google/cloud/aiplatform/preview/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from google.cloud.aiplatform import models
from google.cloud.aiplatform import utils
from google.cloud.aiplatform.utils import _explanation_utils
from google.cloud.aiplatform.utils import prediction_utils
from google.cloud.aiplatform.compat.services import (
deployment_resource_pool_service_client_v1beta1,
endpoint_service_client,
Expand Down Expand Up @@ -140,6 +141,7 @@ def create(
create_request_timeout: Optional[float] = None,
required_replica_count: Optional[int] = 0,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> "DeploymentResourcePool":
"""Creates a new DeploymentResourcePool.

Expand Down Expand Up @@ -209,6 +211,13 @@ def create(
multihost_gpu_node_count (int):
Optional. The number of nodes per replica for multihost GPU
deployments. Required for multihost GPU deployments.
max_runtime_duration (int):
Optional. Immutable. If set, use FlexStart VM powered by
Dynamic Workload Scheduler to schedule the deployment workload.
The FlexStart VM will be available for a maximum of 7 days or up
to the max_runtime_duration specified, whichever is shorter.
After this period, the model will be automatically undeployed.
The value is in seconds.

Returns:
DeploymentResourcePool
Expand Down Expand Up @@ -237,6 +246,7 @@ def create(
create_request_timeout=create_request_timeout,
required_replica_count=required_replica_count,
multihost_gpu_node_count=multihost_gpu_node_count,
max_runtime_duration=max_runtime_duration,
)

@classmethod
Expand All @@ -260,6 +270,7 @@ def _create(
create_request_timeout: Optional[float] = None,
required_replica_count: Optional[int] = 0,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> "DeploymentResourcePool":
"""Creates a new DeploymentResourcePool.

Expand Down Expand Up @@ -332,7 +343,13 @@ def _create(
multihost_gpu_node_count (int):
Optional. The number of nodes per replica for multihost GPU
deployments. Required for multihost GPU deployments.

max_runtime_duration (int):
Optional. Immutable. If set, use FlexStart VM powered by Dynamic
Workload Scheduler to schedule the deployment workload. The DWS
resource will be available for a maximum of 7 days or up to the
max_runtime_duration specified, whichever is shorter. After this
period, the model will be automatically undeployed. The value is
in seconds.
Returns:
DeploymentResourcePool
"""
Expand All @@ -347,6 +364,10 @@ def _create(
required_replica_count=required_replica_count,
)

prediction_utils.add_flex_start_to_dedicated_resources(
dedicated_resources, max_runtime_duration
)

machine_spec = gca_machine_resources_compat.MachineSpec(
machine_type=machine_type,
multihost_gpu_node_count=multihost_gpu_node_count,
Expand Down Expand Up @@ -706,6 +727,7 @@ def deploy(
required_replica_count: Optional[int] = 0,
rollout_options: Optional[RolloutOptions] = None,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> None:
"""Deploys a Model to the Endpoint.

Expand Down Expand Up @@ -809,6 +831,12 @@ def deploy(
multihost_gpu_node_count (int): Optional. The number of nodes per
replica for multihost GPU deployments. Required for multihost GPU
deployments.
max_runtime_duration (int):
Optional. Immutable. If set, use DWS resource
to schedule the deployment workload. The DWS resource will be available
for a maximum of 7 days or up to the max_runtime_duration specified,
whichever is shorter. After this period, the model will be
automatically undeployed. The value is in seconds.
"""
self._sync_gca_resource_if_skipped()

Expand Down Expand Up @@ -853,6 +881,7 @@ def deploy(
required_replica_count=required_replica_count,
rollout_options=rollout_options,
multihost_gpu_node_count=multihost_gpu_node_count,
max_runtime_duration=max_runtime_duration,
)

@base.optional_sync()
Expand Down Expand Up @@ -882,6 +911,7 @@ def _deploy(
required_replica_count: Optional[int] = 0,
rollout_options: Optional[RolloutOptions] = None,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> None:
"""Deploys a Model to the Endpoint.

Expand Down Expand Up @@ -979,7 +1009,12 @@ def _deploy(
multihost_gpu_node_count (int): Optional. The number of nodes per
replica for multihost GPU deployments. Required for multihost
GPU deployments.

max_runtime_duration (int):
Optional. Immutable. If set, use DWS resource
to schedule the deployment workload. The DWS resource will be available
for a maximum of 7 days or up to the max_runtime_duration specified,
whichever is shorter. After this period, the model will be
automatically undeployed. The value is in seconds.
"""
_LOGGER.log_action_start_against_resource(
f"Deploying Model {model.resource_name} to", "", self
Expand Down Expand Up @@ -1013,6 +1048,7 @@ def _deploy(
required_replica_count=required_replica_count,
rollout_options=rollout_options,
multihost_gpu_node_count=multihost_gpu_node_count,
max_runtime_duration=max_runtime_duration,
)

_LOGGER.log_action_completed_against_resource("model", "deployed", self)
Expand Down Expand Up @@ -1049,6 +1085,7 @@ def _deploy_call(
required_replica_count: Optional[int] = 0,
rollout_options: Optional[RolloutOptions] = None,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> None:
"""Helper method to deploy model to endpoint.

Expand Down Expand Up @@ -1153,6 +1190,12 @@ def _deploy_call(
multihost_gpu_node_count (int):
Optional. The number of nodes per replica for multihost GPU
deployments. Required for multihost GPU deployments.
max_runtime_duration (int):
Optional. Immutable. If set, use DWS resource
to schedule the deployment workload. The DWS resource will be available
for a maximum of 7 days or up to the max_runtime_duration specified,
whichever is shorter. After this period, the model will be
automatically undeployed. The value is in seconds.

Raises:
ValueError: If only `accelerator_type` or `accelerator_count` is
Expand Down Expand Up @@ -1235,6 +1278,10 @@ def _deploy_call(
required_replica_count=required_replica_count,
)

prediction_utils.add_flex_start_to_dedicated_resources(
dedicated_resources, max_runtime_duration
)

machine_spec = gca_machine_resources_compat.MachineSpec(
machine_type=machine_type,
multihost_gpu_node_count=multihost_gpu_node_count,
Expand Down Expand Up @@ -1599,6 +1646,7 @@ def deploy(
required_replica_count: Optional[int] = 0,
rollout_options: Optional[RolloutOptions] = None,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> Union[Endpoint, models.PrivateEndpoint]:
"""Deploys model to endpoint.

Expand Down Expand Up @@ -1723,7 +1771,12 @@ def deploy(
multihost_gpu_node_count (int):
Optional. The number of nodes per replica for multihost GPU
deployments. Required for multihost GPU deployments.

max_runtime_duration (int):
Optional. Immutable. If set, use DWS resource
to schedule the deployment workload. The DWS resource will be available
for a maximum of 7 days or up to the max_runtime_duration specified,
whichever is shorter. After this period, the model will be
automatically undeployed. The value is in seconds.
Returns:
endpoint (Union[Endpoint, models.PrivateEndpoint]):
Endpoint with the deployed model.
Expand Down Expand Up @@ -1785,6 +1838,7 @@ def deploy(
required_replica_count=required_replica_count,
rollout_options=rollout_options,
multihost_gpu_node_count=multihost_gpu_node_count,
max_runtime_duration=max_runtime_duration,
)

def _should_enable_dedicated_endpoint(self, fast_tryout_enabled: bool) -> bool:
Expand Down Expand Up @@ -1823,6 +1877,7 @@ def _deploy(
required_replica_count: Optional[int] = 0,
rollout_options: Optional[RolloutOptions] = None,
multihost_gpu_node_count: Optional[int] = None,
max_runtime_duration: Optional[int] = None,
) -> Union[Endpoint, models.PrivateEndpoint]:
"""Deploys model to endpoint.

Expand Down Expand Up @@ -1938,6 +1993,12 @@ def _deploy(
multihost_gpu_node_count (int):
Optional. The number of nodes per replica for multihost GPU
deployments. Required for multihost GPU deployments.
max_runtime_duration (int):
Optional. Immutable. If set, use DWS resource
to schedule the deployment workload. The DWS resource will be available
for a maximum of 7 days or up to the max_runtime_duration specified,
whichever is shorter. After this period, the model will be
automatically undeployed. The value is in seconds.
Returns:
endpoint (Union[Endpoint, models.PrivateEndpoint]):
Endpoint with the deployed model.
Expand Down Expand Up @@ -2004,6 +2065,7 @@ def _deploy(
system_labels=system_labels,
required_replica_count=required_replica_count,
multihost_gpu_node_count=multihost_gpu_node_count,
max_runtime_duration=max_runtime_duration,
**preview_kwargs,
)

Expand Down
23 changes: 23 additions & 0 deletions google/cloud/aiplatform/utils/prediction_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@
from google.cloud import storage
from google.cloud.aiplatform.constants import prediction
from google.cloud.aiplatform.utils import path_utils
from google.cloud.aiplatform.compat.types import (
machine_resources_v1beta1 as gca_machine_resources_compat,
)
from google.protobuf import duration_pb2

_logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -151,3 +155,22 @@ def download_model_artifacts(artifact_uri: str) -> None:
else:
# Copy files to the current working directory.
shutil.copytree(artifact_uri, ".", dirs_exist_ok=True)


def add_flex_start_to_dedicated_resources(
    dedicated_resources: gca_machine_resources_compat.DedicatedResources,
    max_runtime_duration: Optional[int] = None,
) -> None:
    """Attaches a FlexStart configuration to ``dedicated_resources`` in place.

    Args:
        dedicated_resources (gca_machine_resources_compat.DedicatedResources):
            Required. The DedicatedResources proto to mutate.
        max_runtime_duration (int):
            Optional. Maximum runtime in seconds. A FlexStart config is added
            only when this is a positive integer; otherwise the object is
            left untouched.
    """
    # None or a non-positive duration means "FlexStart not requested".
    if max_runtime_duration is None or max_runtime_duration <= 0:
        return
    dedicated_resources.flex_start = gca_machine_resources_compat.FlexStart(
        max_runtime_duration=duration_pb2.Duration(seconds=max_runtime_duration)
    )
59 changes: 59 additions & 0 deletions tests/unit/aiplatform/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,8 @@
inference_timeout=duration_pb2.Duration(seconds=_TEST_INFERENCE_TIMEOUT)
)

_TEST_MAX_RUNTIME_DURATION = 600

"""
----------------------------------------------------------------------------
Endpoint Fixtures
Expand Down Expand Up @@ -4645,3 +4647,60 @@ def test_construct_sdk_resource_from_gapic_uses_resource_project(self):
)
assert endpoint2.project != _TEST_PROJECT
assert endpoint2.location != _TEST_LOCATION

@pytest.mark.usefixtures(
    "get_endpoint_mock", "get_model_mock", "create_endpoint_mock"
)
@pytest.mark.parametrize("sync", [True, False])
def test_deploy_no_endpoint_with_max_runtime_duration(
    self, preview_deploy_model_mock, sync
):
    """Deploy with max_runtime_duration should emit a FlexStart config."""
    model = preview_models.Model(_TEST_ID)
    model._gca_resource.supported_deployment_resources_types.append(
        aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
    )

    endpoint = model.deploy(
        machine_type=_TEST_MACHINE_TYPE,
        accelerator_type=_TEST_ACCELERATOR_TYPE,
        accelerator_count=_TEST_ACCELERATOR_COUNT,
        service_account=_TEST_SERVICE_ACCOUNT,
        sync=sync,
        deploy_request_timeout=None,
        max_runtime_duration=_TEST_MAX_RUNTIME_DURATION,
    )
    if not sync:
        endpoint.wait()

    # The SDK must translate max_runtime_duration (seconds) into a
    # FlexStart proto nested under DedicatedResources.
    machine_spec = gca_machine_resources_v1beta1.MachineSpec(
        machine_type=_TEST_MACHINE_TYPE,
        accelerator_type=_TEST_ACCELERATOR_TYPE,
        accelerator_count=_TEST_ACCELERATOR_COUNT,
    )
    dedicated_resources = gca_machine_resources_v1beta1.DedicatedResources(
        machine_spec=machine_spec,
        min_replica_count=1,
        max_replica_count=1,
        spot=False,
        required_replica_count=0,
        flex_start=gca_machine_resources_v1beta1.FlexStart(
            max_runtime_duration=duration_pb2.Duration(
                seconds=_TEST_MAX_RUNTIME_DURATION
            ),
        ),
    )
    deployed_model = gca_endpoint_v1beta1.DeployedModel(
        dedicated_resources=dedicated_resources,
        model=model.resource_name,
        display_name=None,
        service_account=_TEST_SERVICE_ACCOUNT,
        enable_container_logging=True,
        faster_deployment_config=gca_endpoint_v1beta1.FasterDeploymentConfig(),
    )
    preview_deploy_model_mock.assert_called_once_with(
        endpoint=endpoint.resource_name,
        deployed_model=deployed_model,
        traffic_split={"0": 100},
        metadata=(),
        timeout=None,
    )
57 changes: 57 additions & 0 deletions tests/unit/aiplatform/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2556,6 +2556,63 @@ def test_preview_deploy_no_endpoint_dedicated_resources_autoscaling_request_coun
timeout=None,
)

@pytest.mark.usefixtures(
    "get_endpoint_mock",
    "get_model_mock",
    "create_endpoint_mock",
)
@pytest.mark.parametrize("sync", [True, False])
def test_preview_deploy_no_endpoint_with_flex_start(
    self, preview_deploy_model_mock, sync
):
    """Preview deploy with max_runtime_duration must set FlexStart.

    Note: ``preview_deploy_model_mock`` is requested as a parameter, so it
    must not also be listed in ``usefixtures`` (the duplicate entry was
    redundant — pytest already activates fixtures named in the signature).
    """
    test_model = preview_models.Model(_TEST_ID)
    test_model._gca_resource.supported_deployment_resources_types.append(
        aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
    )

    max_runtime_duration = 600  # seconds

    test_endpoint = test_model.deploy(
        machine_type=_TEST_MACHINE_TYPE,
        accelerator_type=_TEST_ACCELERATOR_TYPE,
        accelerator_count=_TEST_ACCELERATOR_COUNT,
        sync=sync,
        deploy_request_timeout=None,
        max_runtime_duration=max_runtime_duration,
    )

    if not sync:
        test_endpoint.wait()

    expected_machine_spec = gca_machine_resources_v1beta1.MachineSpec(
        machine_type=_TEST_MACHINE_TYPE,
        accelerator_type=_TEST_ACCELERATOR_TYPE,
        accelerator_count=_TEST_ACCELERATOR_COUNT,
    )
    # required_replica_count=0 is the proto default; spelled out for
    # consistency with the other FlexStart deployment tests.
    expected_dedicated_resources = gca_machine_resources_v1beta1.DedicatedResources(
        machine_spec=expected_machine_spec,
        min_replica_count=1,
        max_replica_count=1,
        spot=False,
        required_replica_count=0,
        flex_start=gca_machine_resources_v1beta1.FlexStart(
            max_runtime_duration=duration_pb2.Duration(
                seconds=max_runtime_duration
            ),
        ),
    )
    expected_deployed_model = gca_endpoint_v1beta1.DeployedModel(
        dedicated_resources=expected_dedicated_resources,
        model=test_model.resource_name,
        display_name=None,
        enable_container_logging=True,
        faster_deployment_config=gca_endpoint_v1beta1.FasterDeploymentConfig(),
    )

    preview_deploy_model_mock.assert_called_once_with(
        endpoint=test_endpoint.resource_name,
        deployed_model=expected_deployed_model,
        traffic_split={"0": 100},
        metadata=(),
        timeout=None,
    )

@pytest.mark.usefixtures(
"get_endpoint_mock", "get_model_mock", "create_endpoint_mock"
)
Expand Down