diff --git a/google/cloud/aiplatform/preview/models.py b/google/cloud/aiplatform/preview/models.py
index bde0c2669c..d001df5059 100644
--- a/google/cloud/aiplatform/preview/models.py
+++ b/google/cloud/aiplatform/preview/models.py
@@ -28,6 +28,7 @@
 from google.cloud.aiplatform import models
 from google.cloud.aiplatform import utils
 from google.cloud.aiplatform.utils import _explanation_utils
+from google.cloud.aiplatform.utils import prediction_utils
 from google.cloud.aiplatform.compat.services import (
     deployment_resource_pool_service_client_v1beta1,
     endpoint_service_client,
@@ -140,6 +141,7 @@ def create(
         create_request_timeout: Optional[float] = None,
         required_replica_count: Optional[int] = 0,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> "DeploymentResourcePool":
         """Creates a new DeploymentResourcePool.

@@ -209,6 +211,13 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a FlexStart VM powered by
+                Dynamic Workload Scheduler to schedule the deployment workload.
+                The FlexStart VM will be available for a maximum of 7 days or up
+                to the max_runtime_duration specified, whichever is shorter.
+                After this period, the model will be automatically undeployed.
+                The value is in seconds.

         Returns:
             DeploymentResourcePool
@@ -237,6 +246,7 @@
             create_request_timeout=create_request_timeout,
             required_replica_count=required_replica_count,
             multihost_gpu_node_count=multihost_gpu_node_count,
+            max_runtime_duration=max_runtime_duration,
         )

     @classmethod
@@ -260,6 +270,7 @@ def _create(
         create_request_timeout: Optional[float] = None,
         required_replica_count: Optional[int] = 0,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> "DeploymentResourcePool":
         """Creates a new DeploymentResourcePool.

@@ -332,7 +343,13 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
-
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a FlexStart VM powered by
+                Dynamic Workload Scheduler to schedule the deployment workload.
+                The DWS resource will be available for a maximum of 7 days or up
+                to the max_runtime_duration specified, whichever is shorter.
+                After this period, the model will be automatically undeployed.
+                The value is in seconds.
         Returns:
             DeploymentResourcePool
         """
@@ -347,6 +364,10 @@
             required_replica_count=required_replica_count,
         )

+        prediction_utils.add_flex_start_to_dedicated_resources(
+            dedicated_resources, max_runtime_duration
+        )
+
         machine_spec = gca_machine_resources_compat.MachineSpec(
             machine_type=machine_type,
             multihost_gpu_node_count=multihost_gpu_node_count,
@@ -706,6 +727,7 @@ def deploy(
         required_replica_count: Optional[int] = 0,
         rollout_options: Optional[RolloutOptions] = None,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -809,6 +831,12 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a DWS (Dynamic Workload Scheduler)
+                resource to schedule the deployment workload. The DWS resource will
+                be available for a maximum of 7 days or up to the max_runtime_duration
+                specified, whichever is shorter. After this period, the model will be
+                automatically undeployed. The value is in seconds.
         """
         self._sync_gca_resource_if_skipped()

@@ -853,6 +881,7 @@
             required_replica_count=required_replica_count,
             rollout_options=rollout_options,
             multihost_gpu_node_count=multihost_gpu_node_count,
+            max_runtime_duration=max_runtime_duration,
         )

     @base.optional_sync()
@@ -882,6 +911,7 @@ def _deploy(
         required_replica_count: Optional[int] = 0,
         rollout_options: Optional[RolloutOptions] = None,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> None:
         """Deploys a Model to the Endpoint.

@@ -979,7 +1009,12 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
-
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a DWS (Dynamic Workload Scheduler)
+                resource to schedule the deployment workload. The DWS resource will
+                be available for a maximum of 7 days or up to the max_runtime_duration
+                specified, whichever is shorter. After this period, the model will be
+                automatically undeployed. The value is in seconds.
         """
         _LOGGER.log_action_start_against_resource(
             f"Deploying Model {model.resource_name} to", "", self
@@ -1013,6 +1048,7 @@
             required_replica_count=required_replica_count,
             rollout_options=rollout_options,
             multihost_gpu_node_count=multihost_gpu_node_count,
+            max_runtime_duration=max_runtime_duration,
         )

         _LOGGER.log_action_completed_against_resource("model", "deployed", self)
@@ -1049,6 +1085,7 @@ def _deploy_call(
         required_replica_count: Optional[int] = 0,
         rollout_options: Optional[RolloutOptions] = None,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> None:
         """Helper method to deploy model to endpoint.

@@ -1153,6 +1190,12 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a DWS (Dynamic Workload Scheduler)
+                resource to schedule the deployment workload. The DWS resource will
+                be available for a maximum of 7 days or up to the max_runtime_duration
+                specified, whichever is shorter. After this period, the model will be
+                automatically undeployed. The value is in seconds.

         Raises:
             ValueError: If only `accelerator_type` or `accelerator_count` is
@@ -1235,6 +1278,10 @@
             required_replica_count=required_replica_count,
         )

+        prediction_utils.add_flex_start_to_dedicated_resources(
+            dedicated_resources, max_runtime_duration
+        )
+
         machine_spec = gca_machine_resources_compat.MachineSpec(
             machine_type=machine_type,
             multihost_gpu_node_count=multihost_gpu_node_count,
@@ -1599,6 +1646,7 @@ def deploy(
         required_replica_count: Optional[int] = 0,
         rollout_options: Optional[RolloutOptions] = None,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> Union[Endpoint, models.PrivateEndpoint]:
         """Deploys model to endpoint.

@@ -1723,7 +1771,12 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
-
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a DWS (Dynamic Workload Scheduler)
+                resource to schedule the deployment workload. The DWS resource will
+                be available for a maximum of 7 days or up to the max_runtime_duration
+                specified, whichever is shorter. After this period, the model will be
+                automatically undeployed. The value is in seconds.
         Returns:
             endpoint (Union[Endpoint, models.PrivateEndpoint]):
                 Endpoint with the deployed model.
@@ -1785,6 +1838,7 @@
             required_replica_count=required_replica_count,
             rollout_options=rollout_options,
             multihost_gpu_node_count=multihost_gpu_node_count,
+            max_runtime_duration=max_runtime_duration,
         )

     def _should_enable_dedicated_endpoint(self, fast_tryout_enabled: bool) -> bool:
@@ -1823,6 +1877,7 @@ def _deploy(
         required_replica_count: Optional[int] = 0,
         rollout_options: Optional[RolloutOptions] = None,
         multihost_gpu_node_count: Optional[int] = None,
+        max_runtime_duration: Optional[int] = None,
     ) -> Union[Endpoint, models.PrivateEndpoint]:
         """Deploys model to endpoint.

@@ -1938,6 +1993,12 @@
             multihost_gpu_node_count (int):
                 Optional. The number of nodes per replica for multihost GPU
                 deployments. Required for multihost GPU deployments.
+            max_runtime_duration (int):
+                Optional. Immutable. If set, use a DWS (Dynamic Workload Scheduler)
+                resource to schedule the deployment workload. The DWS resource will
+                be available for a maximum of 7 days or up to the max_runtime_duration
+                specified, whichever is shorter. After this period, the model will be
+                automatically undeployed. The value is in seconds.
         Returns:
             endpoint (Union[Endpoint, models.PrivateEndpoint]):
                 Endpoint with the deployed model.
@@ -2004,6 +2065,7 @@
             system_labels=system_labels,
             required_replica_count=required_replica_count,
             multihost_gpu_node_count=multihost_gpu_node_count,
+            max_runtime_duration=max_runtime_duration,
             **preview_kwargs,
         )

diff --git a/google/cloud/aiplatform/utils/prediction_utils.py b/google/cloud/aiplatform/utils/prediction_utils.py
index c6a15d712c..6e71e9dcb8 100644
--- a/google/cloud/aiplatform/utils/prediction_utils.py
+++ b/google/cloud/aiplatform/utils/prediction_utils.py
@@ -26,6 +26,10 @@
 from google.cloud import storage
 from google.cloud.aiplatform.constants import prediction
 from google.cloud.aiplatform.utils import path_utils
+from google.cloud.aiplatform.compat.types import (
+    machine_resources_v1beta1 as gca_machine_resources_compat,
+)
+from google.protobuf import duration_pb2

 _logger = logging.getLogger(__name__)

@@ -151,3 +155,22 @@ def download_model_artifacts(artifact_uri: str) -> None:
     else:
         # Copy files to the current working directory.
         shutil.copytree(artifact_uri, ".", dirs_exist_ok=True)
+
+
+def add_flex_start_to_dedicated_resources(
+    dedicated_resources: gca_machine_resources_compat.DedicatedResources,
+    max_runtime_duration: Optional[int] = None,
+) -> None:
+    """Adds a FlexStart configuration to DedicatedResources if max_runtime_duration is provided.
+
+    Args:
+        dedicated_resources (gca_machine_resources_compat.DedicatedResources):
+            Required. The DedicatedResources object to modify.
+        max_runtime_duration (int):
+            Optional. The maximum runtime duration in seconds. If provided and
+            greater than 0, a FlexStart configuration will be added.
+    """
+    if max_runtime_duration is not None and max_runtime_duration > 0:
+        dedicated_resources.flex_start = gca_machine_resources_compat.FlexStart(
+            max_runtime_duration=duration_pb2.Duration(seconds=max_runtime_duration)
+        )
diff --git a/tests/unit/aiplatform/test_endpoints.py b/tests/unit/aiplatform/test_endpoints.py
index 610892b141..69d9d11693 100644
--- a/tests/unit/aiplatform/test_endpoints.py
+++ b/tests/unit/aiplatform/test_endpoints.py
@@ -279,6 +279,8 @@
     inference_timeout=duration_pb2.Duration(seconds=_TEST_INFERENCE_TIMEOUT)
 )

+_TEST_MAX_RUNTIME_DURATION = 600
+
 """
 ----------------------------------------------------------------------------
 Endpoint Fixtures
@@ -4645,3 +4647,60 @@ def test_construct_sdk_resource_from_gapic_uses_resource_project(self):
         )
         assert endpoint2.project != _TEST_PROJECT
         assert endpoint2.location != _TEST_LOCATION
+
+    @pytest.mark.usefixtures(
+        "get_endpoint_mock", "get_model_mock", "create_endpoint_mock"
+    )
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_deploy_no_endpoint_with_max_runtime_duration(
+        self, preview_deploy_model_mock, sync
+    ):
+        test_model = preview_models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+        test_endpoint = test_model.deploy(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            max_runtime_duration=_TEST_MAX_RUNTIME_DURATION,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources_v1beta1.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+        )
+        expected_dedicated_resources = gca_machine_resources_v1beta1.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+            spot=False,
+            required_replica_count=0,
+            flex_start=gca_machine_resources_v1beta1.FlexStart(
+                max_runtime_duration=duration_pb2.Duration(
+                    seconds=_TEST_MAX_RUNTIME_DURATION
+                ),
+            ),
+        )
+        expected_deployed_model = gca_endpoint_v1beta1.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            service_account=_TEST_SERVICE_ACCOUNT,
+            enable_container_logging=True,
+            faster_deployment_config=gca_endpoint_v1beta1.FasterDeploymentConfig(),
+        )
+        preview_deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
diff --git a/tests/unit/aiplatform/test_models.py b/tests/unit/aiplatform/test_models.py
index ee2233d0d8..cf0047af03 100644
--- a/tests/unit/aiplatform/test_models.py
+++ b/tests/unit/aiplatform/test_models.py
@@ -2556,6 +2556,63 @@ def test_preview_deploy_no_endpoint_dedicated_resources_autoscaling_request_coun
             timeout=None,
         )

+    @pytest.mark.usefixtures(
+        "get_endpoint_mock",
+        "get_model_mock",
+        "create_endpoint_mock",
+        "preview_deploy_model_mock",
+    )
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_preview_deploy_no_endpoint_with_flex_start(
+        self, preview_deploy_model_mock, sync
+    ):
+        test_model = preview_models.Model(_TEST_ID)
+        test_model._gca_resource.supported_deployment_resources_types.append(
+            aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
+        )
+
+        test_endpoint = test_model.deploy(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            sync=sync,
+            deploy_request_timeout=None,
+            max_runtime_duration=600,
+        )
+
+        if not sync:
+            test_endpoint.wait()
+
+        expected_machine_spec = gca_machine_resources_v1beta1.MachineSpec(
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+        )
+        expected_dedicated_resources = gca_machine_resources_v1beta1.DedicatedResources(
+            machine_spec=expected_machine_spec,
+            min_replica_count=1,
+            max_replica_count=1,
+            spot=False,
+            flex_start=gca_machine_resources_v1beta1.FlexStart(
+                max_runtime_duration=duration_pb2.Duration(seconds=600),
+            ),
+        )
+        expected_deployed_model = gca_endpoint_v1beta1.DeployedModel(
+            dedicated_resources=expected_dedicated_resources,
+            model=test_model.resource_name,
+            display_name=None,
+            enable_container_logging=True,
+            faster_deployment_config=gca_endpoint_v1beta1.FasterDeploymentConfig(),
+        )
+
+        preview_deploy_model_mock.assert_called_once_with(
+            endpoint=test_endpoint.resource_name,
+            deployed_model=expected_deployed_model,
+            traffic_split={"0": 100},
+            metadata=(),
+            timeout=None,
+        )
+
     @pytest.mark.usefixtures(
         "get_endpoint_mock", "get_model_mock", "create_endpoint_mock"
    )
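
Usage note (not part of the patch): a minimal sketch of how the new
max_runtime_duration parameter might be exercised from the preview SDK,
mirroring the unit tests above. The project, location, model ID, and machine
shape below are hypothetical placeholders, not values from this change.

    # Hypothetical usage of the preview deploy surface added in this change.
    # "my-project", "us-central1", the model ID, and the machine/accelerator
    # values are placeholders; substitute real ones.
    from google.cloud import aiplatform
    from google.cloud.aiplatform.preview import models as preview_models

    aiplatform.init(project="my-project", location="us-central1")

    model = preview_models.Model("1234567890")

    # Request DWS FlexStart capacity: the workload is scheduled by Dynamic
    # Workload Scheduler and automatically undeployed after 3600 seconds
    # (or 7 days, whichever comes first).
    endpoint = model.deploy(
        machine_type="a2-highgpu-1g",
        accelerator_type="NVIDIA_TESLA_A100",
        accelerator_count=1,
        max_runtime_duration=3600,
    )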