feat: support autoscaling metrics when deploying models (#1197)
* feat: support autoscaling metrics when deploying models

* feat: support model deploy to endpoint with autoscaling metrics

* fix autoscaling_target_accelerator_duty_cycle check

* fix docstring: specify that autoscaling_params are optional

* bug fix: add autoscaling_target_cpu_utilization to custom_resource_spec

* add tests

* add _TEST_METRIC_NAME_CPU_UTILIZATION and _TEST_METRIC_NAME_GPU_UTILIZATION

* remove not required arguments in tests

* fix tests: wait for LRO to complete even if not sync

* fix lint: run black

Co-authored-by: Sara Robinson <sararob@users.noreply.github.com>
munagekar and sararob committed May 23, 2022
1 parent 15bc80b commit 095717c
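
Before the file diff, a minimal usage sketch of the new parameters (project, location, model ID, and machine shapes are illustrative placeholders, not values taken from this commit):

from google.cloud import aiplatform

aiplatform.init(project="my-project", location="us-central1")

model = aiplatform.Model("1234567890")  # hypothetical model resource ID

# Scale replicas on CPU utilization (dedicated resources, no accelerator).
endpoint = model.deploy(
    machine_type="n1-standard-4",
    min_replica_count=1,
    max_replica_count=3,
    autoscaling_target_cpu_utilization=70,
)

# Scale replicas on accelerator duty cycle; per the validation added below,
# accelerator_type and accelerator_count must be set as well.
endpoint = model.deploy(
    machine_type="n1-standard-4",
    accelerator_type="NVIDIA_TESLA_P100",
    accelerator_count=2,
    min_replica_count=1,
    max_replica_count=3,
    autoscaling_target_accelerator_duty_cycle=70,
)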
Showing 2 changed files with 234 additions and 9 deletions.
104 changes: 95 additions & 9 deletions google/cloud/aiplatform/models.py
@@ -643,6 +643,8 @@ def deploy(
metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync=True,
deploy_request_timeout: Optional[float] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
) -> None:
"""Deploys a Model to the Endpoint.
@@ -716,6 +718,13 @@ def deploy(
be immediately returned and synced when the Future has completed.
deploy_request_timeout (float):
Optional. The timeout for the deploy request in seconds.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU Utilization to use for Autoscaling Replicas.
A default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target Accelerator Duty Cycle.
If specified, accelerator_type and accelerator_count must also be set.
A default value of 60 will be used if not specified.
"""
self._sync_gca_resource_if_skipped()

@@ -746,6 +755,8 @@ def deploy(
metadata=metadata,
sync=sync,
deploy_request_timeout=deploy_request_timeout,
autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
)

@base.optional_sync()
@@ -766,6 +777,8 @@ def _deploy(
metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync=True,
deploy_request_timeout: Optional[float] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
) -> None:
"""Deploys a Model to the Endpoint.
@@ -839,6 +852,13 @@ def _deploy(
be immediately returned and synced when the Future has completed.
deploy_request_timeout (float):
Optional. The timeout for the deploy request in seconds.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU Utilization to use for Autoscaling Replicas.
A default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target Accelerator Duty Cycle.
If specified, accelerator_type and accelerator_count must also be set.
A default value of 60 will be used if not specified.
Raises:
ValueError: If there is no current traffic split and traffic percentage
is not 0 or 100.
@@ -865,6 +885,8 @@ def _deploy(
explanation_parameters=explanation_parameters,
metadata=metadata,
deploy_request_timeout=deploy_request_timeout,
autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
)

_LOGGER.log_action_completed_against_resource("model", "deployed", self)
@@ -891,6 +913,8 @@ def _deploy_call(
explanation_parameters: Optional[explain.ExplanationParameters] = None,
metadata: Optional[Sequence[Tuple[str, str]]] = (),
deploy_request_timeout: Optional[float] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
):
"""Helper method to deploy model to endpoint.
@@ -964,6 +988,13 @@ def _deploy_call(
be immediately returned and synced when the Future has completed.
deploy_request_timeout (float):
Optional. The timeout for the deploy request in seconds.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU Utilization to use for Autoscaling Replicas.
A default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target Accelerator Duty Cycle.
If specified, accelerator_type and accelerator_count must also be set.
A default value of 60 will be used if not specified.
Raises:
ValueError: If there is no current traffic split and traffic percentage
is not 0 or 100.
@@ -979,6 +1010,14 @@ def _deploy_call(
"Both `accelerator_type` and `accelerator_count` should be specified or None."
)

if autoscaling_target_accelerator_duty_cycle is not None and (
not accelerator_type or not accelerator_count
):
raise ValueError(
"Both `accelerator_type` and `accelerator_count` should be set "
"when specifying autoscaling_target_accelerator_duty_cycle`"
)

deployed_model = gca_endpoint_compat.DeployedModel(
model=model.resource_name,
display_name=deployed_model_display_name,
@@ -994,7 +1033,11 @@ def _deploy_call(
in model.supported_deployment_resources_types
)
provided_custom_machine_spec = (
machine_type or accelerator_type or accelerator_count
machine_type
or accelerator_type
or accelerator_count
or autoscaling_target_accelerator_duty_cycle
or autoscaling_target_cpu_utilization
)

# If the model supports both automatic and dedicated deployment resources,
@@ -1006,30 +1049,51 @@ def _deploy_call(
if provided_custom_machine_spec and not use_dedicated_resources:
_LOGGER.info(
"Model does not support dedicated deployment resources. "
"The machine_type, accelerator_type and accelerator_count parameters are ignored."
"The machine_type, accelerator_type and accelerator_count,"
"autoscaling_target_accelerator_duty_cycle,"
"autoscaling_target_cpu_utilization parameters are ignored."
)

if use_dedicated_resources and not machine_type:
machine_type = _DEFAULT_MACHINE_TYPE
_LOGGER.info(f"Using default machine_type: {machine_type}")

if use_dedicated_resources:

dedicated_resources = gca_machine_resources_compat.DedicatedResources(
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)

machine_spec = gca_machine_resources_compat.MachineSpec(
machine_type=machine_type
)

if autoscaling_target_cpu_utilization:
autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
target=autoscaling_target_cpu_utilization,
)
dedicated_resources.autoscaling_metric_specs.extend(
[autoscaling_metric_spec]
)

if accelerator_type and accelerator_count:
utils.validate_accelerator_type(accelerator_type)
machine_spec.accelerator_type = accelerator_type
machine_spec.accelerator_count = accelerator_count

deployed_model.dedicated_resources = (
gca_machine_resources_compat.DedicatedResources(
machine_spec=machine_spec,
min_replica_count=min_replica_count,
max_replica_count=max_replica_count,
)
)
if autoscaling_target_accelerator_duty_cycle:
autoscaling_metric_spec = gca_machine_resources_compat.AutoscalingMetricSpec(
metric_name="aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle",
target=autoscaling_target_accelerator_duty_cycle,
)
dedicated_resources.autoscaling_metric_specs.extend(
[autoscaling_metric_spec]
)

dedicated_resources.machine_spec = machine_spec
deployed_model.dedicated_resources = dedicated_resources

elif supports_automatic_resources:
deployed_model.automatic_resources = (
@@ -1994,6 +2058,8 @@ def deploy(
encryption_spec_key_name: Optional[str] = None,
sync=True,
deploy_request_timeout: Optional[float] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
) -> Endpoint:
"""Deploys model to endpoint. Endpoint will be created if unspecified.
@@ -2078,6 +2144,13 @@ def deploy(
be immediately returned and synced when the Future has completed.
deploy_request_timeout (float):
Optional. The timeout for the deploy request in seconds.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU Utilization to use for Autoscaling Replicas.
A default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target Accelerator Duty Cycle.
If specified, accelerator_type and accelerator_count must also be set.
A default value of 60 will be used if not specified.
Returns:
endpoint ("Endpoint"):
Endpoint with the deployed model.
@@ -2112,6 +2185,8 @@ def deploy(
or initializer.global_config.encryption_spec_key_name,
sync=sync,
deploy_request_timeout=deploy_request_timeout,
autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
)

@base.optional_sync(return_input_arg="endpoint", bind_future_to_self=False)
@@ -2133,6 +2208,8 @@ def _deploy(
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
deploy_request_timeout: Optional[float] = None,
autoscaling_target_cpu_utilization: Optional[int] = None,
autoscaling_target_accelerator_duty_cycle: Optional[int] = None,
) -> Endpoint:
"""Deploys model to endpoint. Endpoint will be created if unspecified.
@@ -2217,6 +2294,13 @@ def _deploy(
be immediately returned and synced when the Future has completed.
deploy_request_timeout (float):
Optional. The timeout for the deploy request in seconds.
autoscaling_target_cpu_utilization (int):
Optional. Target CPU Utilization to use for Autoscaling Replicas.
A default value of 60 will be used if not specified.
autoscaling_target_accelerator_duty_cycle (int):
Optional. Target Accelerator Duty Cycle.
If specified, accelerator_type and accelerator_count must also be set.
A default value of 60 will be used if not specified.
Returns:
endpoint ("Endpoint"):
Endpoint with the deployed model.
@@ -2252,6 +2336,8 @@ def _deploy(
explanation_parameters=explanation_parameters,
metadata=metadata,
deploy_request_timeout=deploy_request_timeout,
autoscaling_target_cpu_utilization=autoscaling_target_cpu_utilization,
autoscaling_target_accelerator_duty_cycle=autoscaling_target_accelerator_duty_cycle,
)

_LOGGER.log_action_completed_against_resource("model", "deployed", endpoint)
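For reference, the DedicatedResources message that _deploy_call assembles above can be reproduced directly with the GAPIC types. This is a sketch: it assumes the gca_machine_resources_compat aliases resolve to the google.cloud.aiplatform_v1 types, and the replica counts and target are illustrative.

from google.cloud.aiplatform_v1.types import machine_resources

dedicated_resources = machine_resources.DedicatedResources(
    min_replica_count=1,
    max_replica_count=3,
)

# CPU-based autoscaling metric spec, mirroring the helper above.
dedicated_resources.autoscaling_metric_specs.extend(
    [
        machine_resources.AutoscalingMetricSpec(
            metric_name="aiplatform.googleapis.com/prediction/online/cpu/utilization",
            target=70,
        )
    ]
)

# The machine spec is attached after the metric specs, as in the diff.
dedicated_resources.machine_spec = machine_resources.MachineSpec(
    machine_type="n1-standard-4"
)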
139 changes: 139 additions & 0 deletions tests/unit/aiplatform/test_endpoints.py
@@ -103,6 +103,13 @@
_TEST_ACCELERATOR_TYPE = "NVIDIA_TESLA_P100"
_TEST_ACCELERATOR_COUNT = 2

_TEST_METRIC_NAME_CPU_UTILIZATION = (
"aiplatform.googleapis.com/prediction/online/cpu/utilization"
)
_TEST_METRIC_NAME_GPU_UTILIZATION = (
"aiplatform.googleapis.com/prediction/online/accelerator/duty_cycle"
)

_TEST_EXPLANATIONS = [gca_prediction_service.explanation.Explanation(attributions=[])]

_TEST_ATTRIBUTIONS = [
@@ -1054,6 +1061,138 @@ def test_deploy_with_dedicated_resources(self, deploy_model_mock, sync):
timeout=None,
)

@pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
@pytest.mark.parametrize("sync", [True, False])
def test_deploy_with_autoscaling_target_cpu_utilization(
self, deploy_model_mock, sync
):
test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
test_model = models.Model(_TEST_ID)
test_model._gca_resource.supported_deployment_resources_types.append(
aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
)
test_endpoint.deploy(
model=test_model,
machine_type=_TEST_MACHINE_TYPE,
service_account=_TEST_SERVICE_ACCOUNT,
sync=sync,
deploy_request_timeout=None,
autoscaling_target_cpu_utilization=70,
)

if not sync:
test_endpoint.wait()

expected_machine_spec = gca_machine_resources.MachineSpec(
machine_type=_TEST_MACHINE_TYPE,
)

expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
metric_name=_TEST_METRIC_NAME_CPU_UTILIZATION,
target=70,
)

expected_dedicated_resources = gca_machine_resources.DedicatedResources(
machine_spec=expected_machine_spec,
min_replica_count=1,
max_replica_count=1,
)
expected_dedicated_resources.autoscaling_metric_specs.extend(
[expected_autoscaling_metric_spec]
)

expected_deployed_model = gca_endpoint.DeployedModel(
dedicated_resources=expected_dedicated_resources,
model=test_model.resource_name,
display_name=None,
service_account=_TEST_SERVICE_ACCOUNT,
)
deploy_model_mock.assert_called_once_with(
endpoint=test_endpoint.resource_name,
deployed_model=expected_deployed_model,
traffic_split={"0": 100},
metadata=(),
timeout=None,
)

@pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
@pytest.mark.parametrize("sync", [True, False])
def test_deploy_with_autoscaling_target_accelerator_duty_cycle(
self, deploy_model_mock, sync
):
test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
test_model = models.Model(_TEST_ID)
test_model._gca_resource.supported_deployment_resources_types.append(
aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
)
test_endpoint.deploy(
model=test_model,
machine_type=_TEST_MACHINE_TYPE,
accelerator_type=_TEST_ACCELERATOR_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
service_account=_TEST_SERVICE_ACCOUNT,
sync=sync,
deploy_request_timeout=None,
autoscaling_target_accelerator_duty_cycle=70,
)

if not sync:
test_endpoint.wait()

expected_machine_spec = gca_machine_resources.MachineSpec(
machine_type=_TEST_MACHINE_TYPE,
accelerator_type=_TEST_ACCELERATOR_TYPE,
accelerator_count=_TEST_ACCELERATOR_COUNT,
)

expected_autoscaling_metric_spec = gca_machine_resources.AutoscalingMetricSpec(
metric_name=_TEST_METRIC_NAME_GPU_UTILIZATION,
target=70,
)

expected_dedicated_resources = gca_machine_resources.DedicatedResources(
machine_spec=expected_machine_spec,
min_replica_count=1,
max_replica_count=1,
)
expected_dedicated_resources.autoscaling_metric_specs.extend(
[expected_autoscaling_metric_spec]
)

expected_deployed_model = gca_endpoint.DeployedModel(
dedicated_resources=expected_dedicated_resources,
model=test_model.resource_name,
display_name=None,
service_account=_TEST_SERVICE_ACCOUNT,
)
deploy_model_mock.assert_called_once_with(
endpoint=test_endpoint.resource_name,
deployed_model=expected_deployed_model,
traffic_split={"0": 100},
metadata=(),
timeout=None,
)

@pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
@pytest.mark.parametrize("sync", [True, False])
def test_deploy_with_autoscaling_target_accelerator_duty_cycle_and_no_accelerator_type_or_count_raises(
self, sync
):
with pytest.raises(ValueError):
test_endpoint = models.Endpoint(_TEST_ENDPOINT_NAME)
test_model = models.Model(_TEST_ID)
test_model._gca_resource.supported_deployment_resources_types.append(
aiplatform.gapic.Model.DeploymentResourcesType.DEDICATED_RESOURCES
)
test_endpoint.deploy(
model=test_model,
sync=sync,
autoscaling_target_accelerator_duty_cycle=70,
)

if not sync:
test_endpoint.wait()

@pytest.mark.usefixtures("get_endpoint_mock", "get_model_mock")
@pytest.mark.parametrize("sync", [True, False])
def test_deploy_with_explanations(self, deploy_model_with_explanations_mock, sync):
