feat: add support for failure_policy in PipelineJob (#1452)
* implement failure_policy

* add tests

* raise ValueError if failure_policy is invalid
chongyouquan committed Jun 23, 2022
1 parent d778dee commit d0968ea
Showing 6 changed files with 192 additions and 3 deletions.
2 changes: 2 additions & 0 deletions google/cloud/aiplatform/compat/__init__.py
@@ -96,6 +96,7 @@
types.model_evaluation_slice = types.model_evaluation_slice_v1beta1
types.model_service = types.model_service_v1beta1
types.operation = types.operation_v1beta1
types.pipeline_failure_policy = types.pipeline_failure_policy_v1beta1
types.pipeline_job = types.pipeline_job_v1beta1
types.pipeline_service = types.pipeline_service_v1beta1
types.pipeline_state = types.pipeline_state_v1beta1
@@ -180,6 +181,7 @@
types.model_evaluation_slice = types.model_evaluation_slice_v1
types.model_service = types.model_service_v1
types.operation = types.operation_v1
types.pipeline_failure_policy = types.pipeline_failure_policy_v1
types.pipeline_job = types.pipeline_job_v1
types.pipeline_service = types.pipeline_service_v1
types.pipeline_state = types.pipeline_state_v1
4 changes: 4 additions & 0 deletions google/cloud/aiplatform/compat/types/__init__.py
@@ -63,6 +63,7 @@
model_evaluation_slice as model_evaluation_slice_v1beta1,
model_service as model_service_v1beta1,
operation as operation_v1beta1,
pipeline_failure_policy as pipeline_failure_policy_v1beta1,
pipeline_job as pipeline_job_v1beta1,
pipeline_service as pipeline_service_v1beta1,
pipeline_state as pipeline_state_v1beta1,
@@ -126,6 +127,7 @@
model_evaluation_slice as model_evaluation_slice_v1,
model_service as model_service_v1,
operation as operation_v1,
pipeline_failure_policy as pipeline_failure_policy_v1,
pipeline_job as pipeline_job_v1,
pipeline_service as pipeline_service_v1,
pipeline_state as pipeline_state_v1,
@@ -191,6 +193,7 @@
model_evaluation_slice_v1,
model_service_v1,
operation_v1,
pipeline_failure_policy_v1beta1,
pipeline_job_v1,
pipeline_service_v1,
pipeline_state_v1,
@@ -253,6 +256,7 @@
model_evaluation_slice_v1beta1,
model_service_v1beta1,
operation_v1beta1,
pipeline_failure_policy_v1beta1,
pipeline_job_v1beta1,
pipeline_service_v1beta1,
pipeline_state_v1beta1,
11 changes: 11 additions & 0 deletions google/cloud/aiplatform/pipeline_jobs.py
@@ -119,6 +119,7 @@ def __init__(
credentials: Optional[auth_credentials.Credentials] = None,
project: Optional[str] = None,
location: Optional[str] = None,
failure_policy: Optional[str] = None,
):
"""Retrieves a PipelineJob resource and instantiates its
representation.
@@ -173,6 +174,15 @@ def __init__(
location (str):
Optional. Location to create PipelineJob. If not set,
location set in aiplatform.init will be used.
failure_policy (str):
Optional. The failure policy - "slow" or "fast".
By default, a pipeline continues to run until no more tasks can
be executed, also known as PIPELINE_FAILURE_POLICY_FAIL_SLOW
(corresponds to "slow"). If a pipeline is set to
PIPELINE_FAILURE_POLICY_FAIL_FAST (corresponds to "fast"), it
stops scheduling any new tasks once a task has failed; tasks
that are already scheduled continue to run until completion.
Raises:
ValueError: If job_id or labels have incorrect format.
@@ -219,6 +229,7 @@ def __init__(
)
builder.update_pipeline_root(pipeline_root)
builder.update_runtime_parameters(parameter_values)
builder.update_failure_policy(failure_policy)
runtime_config_dict = builder.build()

runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
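In practice, the change surfaces as a single new constructor argument on PipelineJob. A minimal usage sketch follows; the project, bucket, and template path are placeholders, not values from this commit:

from google.cloud import aiplatform

# Placeholder project and staging bucket.
aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

job = aiplatform.PipelineJob(
    display_name="my-pipeline",
    template_path="pipeline.json",  # placeholder compiled pipeline spec
    # "fast" maps to PIPELINE_FAILURE_POLICY_FAIL_FAST: stop scheduling new
    # tasks once any task fails; already-scheduled tasks still run to
    # completion. "slow" (the default behavior) maps to
    # PIPELINE_FAILURE_POLICY_FAIL_SLOW; any other string raises ValueError.
    failure_policy="fast",
)
job.run()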
53 changes: 51 additions & 2 deletions google/cloud/aiplatform/utils/pipeline_utils.py
@@ -17,6 +17,7 @@
import copy
import json
from typing import Any, Dict, Mapping, Optional, Union
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
import packaging.version


@@ -32,6 +33,7 @@ def __init__(
schema_version: str,
parameter_types: Mapping[str, str],
parameter_values: Optional[Dict[str, Any]] = None,
failure_policy: Optional[pipeline_failure_policy.PipelineFailurePolicy] = None,
):
"""Creates a PipelineRuntimeConfigBuilder object.
@@ -44,11 +46,20 @@
Required. The mapping from pipeline parameter name to its type.
parameter_values (Dict[str, Any]):
Optional. The mapping from runtime parameter name to its value.
failure_policy (pipeline_failure_policy.PipelineFailurePolicy):
Optional. The failure policy of the pipeline. By default, a
pipeline continues to run until no more tasks can be executed,
also known as PIPELINE_FAILURE_POLICY_FAIL_SLOW. If a pipeline
is set to PIPELINE_FAILURE_POLICY_FAIL_FAST, it stops scheduling
any new tasks once a task has failed; tasks that are already
scheduled continue to run until completion.
"""
self._pipeline_root = pipeline_root
self._schema_version = schema_version
self._parameter_types = parameter_types
self._parameter_values = copy.deepcopy(parameter_values or {})
self._failure_policy = failure_policy

@classmethod
def from_job_spec_json(
@@ -80,7 +91,14 @@ def from_job_spec_json(

pipeline_root = runtime_config_spec.get("gcsOutputDirectory")
parameter_values = _parse_runtime_parameters(runtime_config_spec)
return cls(pipeline_root, schema_version, parameter_types, parameter_values)
failure_policy = runtime_config_spec.get("failurePolicy")
return cls(
pipeline_root,
schema_version,
parameter_types,
parameter_values,
failure_policy,
)

def update_pipeline_root(self, pipeline_root: Optional[str]) -> None:
"""Updates pipeline_root value.
@@ -111,6 +129,24 @@ def update_runtime_parameters(
parameters[k] = json.dumps(v)
self._parameter_values.update(parameters)

def update_failure_policy(self, failure_policy: Optional[str] = None) -> None:
"""Merges runtime failure policy.
Args:
failure_policy (str):
Optional. The failure policy - "slow" or "fast".
Raises:
ValueError: If failure_policy is not "slow" or "fast".
"""
if failure_policy:
if failure_policy in _FAILURE_POLICY_TO_ENUM_VALUE:
self._failure_policy = _FAILURE_POLICY_TO_ENUM_VALUE[failure_policy]
else:
raise ValueError(
f'failure_policy should be either "slow" or "fast", but got: "{failure_policy}".'
)

def build(self) -> Dict[str, Any]:
"""Build a RuntimeConfig proto.
@@ -128,7 +164,8 @@ def build(self) -> Dict[str, Any]:
parameter_values_key = "parameterValues"
else:
parameter_values_key = "parameters"
return {

runtime_config = {
"gcsOutputDirectory": self._pipeline_root,
parameter_values_key: {
k: self._get_vertex_value(k, v)
Expand All @@ -137,6 +174,11 @@ def build(self) -> Dict[str, Any]:
},
}

if self._failure_policy:
runtime_config["failurePolicy"] = self._failure_policy

return runtime_config

def _get_vertex_value(
self, name: str, value: Union[int, float, str, bool, list, dict]
) -> Union[int, float, str, bool, list, dict]:
@@ -205,3 +247,10 @@ def _parse_runtime_parameters(
else:
raise TypeError("Got unknown type of value: {}".format(value))
return result


_FAILURE_POLICY_TO_ENUM_VALUE = {
"slow": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW,
"fast": pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST,
None: pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_UNSPECIFIED,
}
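The builder can also be exercised in isolation. A short sketch of the mapping above, assuming schema version 2.1.0 (which makes build() emit the "parameterValues" key) and a placeholder pipeline root:

from google.cloud.aiplatform.compat.types import pipeline_failure_policy
from google.cloud.aiplatform.utils import pipeline_utils

builder = pipeline_utils.PipelineRuntimeConfigBuilder(
    pipeline_root="gs://my-bucket/pipeline_root",  # placeholder
    schema_version="2.1.0",
    parameter_types={},
)

# The string is translated through _FAILURE_POLICY_TO_ENUM_VALUE.
builder.update_failure_policy("fast")
config = builder.build()
assert (
    config["failurePolicy"]
    == pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST
)

# Anything other than "slow" or "fast" raises ValueError:
# builder.update_failure_policy("slo")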
94 changes: 94 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
@@ -30,6 +30,7 @@
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import pipeline_jobs
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
from google.cloud import storage
from google.protobuf import json_format

@@ -621,6 +622,99 @@ def test_run_call_pipeline_service_create_with_timeout_not_explicitly_set(
timeout=None,
)

@pytest.mark.parametrize(
"job_spec",
[_TEST_PIPELINE_SPEC_JSON, _TEST_PIPELINE_SPEC_YAML, _TEST_PIPELINE_JOB],
)
@pytest.mark.parametrize(
"failure_policy",
[
(
"slow",
pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW,
),
(
"fast",
pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST,
),
],
)
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_create_with_failure_policy(
self,
mock_pipeline_service_create,
mock_pipeline_service_get,
job_spec,
mock_load_yaml_and_json,
failure_policy,
sync,
):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_GCS_BUCKET_NAME,
location=_TEST_LOCATION,
credentials=_TEST_CREDENTIALS,
)

job = pipeline_jobs.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
template_path=_TEST_TEMPLATE_PATH,
job_id=_TEST_PIPELINE_JOB_ID,
parameter_values=_TEST_PIPELINE_PARAMETER_VALUES,
enable_caching=True,
failure_policy=failure_policy[0],
)

job.run(
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
sync=sync,
create_request_timeout=None,
)

if not sync:
job.wait()

expected_runtime_config_dict = {
"gcsOutputDirectory": _TEST_GCS_BUCKET_NAME,
"parameterValues": _TEST_PIPELINE_PARAMETER_VALUES,
"failurePolicy": failure_policy[1],
}
runtime_config = gca_pipeline_job.PipelineJob.RuntimeConfig()._pb
json_format.ParseDict(expected_runtime_config_dict, runtime_config)

job_spec = yaml.safe_load(job_spec)
pipeline_spec = job_spec.get("pipelineSpec") or job_spec

# Construct expected request
expected_gapic_pipeline_job = gca_pipeline_job.PipelineJob(
display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
pipeline_spec={
"components": {},
"pipelineInfo": pipeline_spec["pipelineInfo"],
"root": pipeline_spec["root"],
"schemaVersion": "2.1.0",
},
runtime_config=runtime_config,
service_account=_TEST_SERVICE_ACCOUNT,
network=_TEST_NETWORK,
)

mock_pipeline_service_create.assert_called_once_with(
parent=_TEST_PARENT,
pipeline_job=expected_gapic_pipeline_job,
pipeline_job_id=_TEST_PIPELINE_JOB_ID,
timeout=None,
)

mock_pipeline_service_get.assert_called_with(
name=_TEST_PIPELINE_JOB_NAME, retry=base._DEFAULT_RETRY
)

assert job._gca_resource == make_pipeline_job(
gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
)

@pytest.mark.parametrize(
"job_spec",
[
31 changes: 30 additions & 1 deletion tests/unit/aiplatform/test_utils.py
@@ -28,6 +28,7 @@
from google.api_core import client_options, gapic_v1
from google.cloud import aiplatform
from google.cloud.aiplatform import compat, utils
from google.cloud.aiplatform.compat.types import pipeline_failure_policy
from google.cloud.aiplatform.utils import pipeline_utils, tensorboard_utils, yaml_utils
from google.cloud.aiplatform_v1.services.model_service import (
client as model_service_client_v1,
@@ -454,7 +455,22 @@ def test_pipeline_utils_runtime_config_builder_with_no_op_updates(self):
expected_runtime_config = self.SAMPLE_JOB_SPEC["runtimeConfig"]
assert expected_runtime_config == actual_runtime_config

def test_pipeline_utils_runtime_config_builder_with_merge_updates(self):
@pytest.mark.parametrize(
"failure_policy",
[
(
"slow",
pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_SLOW,
),
(
"fast",
pipeline_failure_policy.PipelineFailurePolicy.PIPELINE_FAILURE_POLICY_FAIL_FAST,
),
],
)
def test_pipeline_utils_runtime_config_builder_with_merge_updates(
self, failure_policy
):
my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
self.SAMPLE_JOB_SPEC
)
@@ -468,6 +484,7 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self):
"bool_param": True,
}
)
my_builder.update_failure_policy(failure_policy[0])
actual_runtime_config = my_builder.build()

expected_runtime_config = {
@@ -481,9 +498,21 @@ def test_pipeline_utils_runtime_config_builder_with_merge_updates(self):
"list_param": {"stringValue": "[1, 2, 3]"},
"bool_param": {"stringValue": "true"},
},
"failurePolicy": failure_policy[1],
}
assert expected_runtime_config == actual_runtime_config

def test_pipeline_utils_runtime_config_builder_invalid_failure_policy(self):
my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
self.SAMPLE_JOB_SPEC
)
with pytest.raises(ValueError) as e:
my_builder.update_failure_policy("slo")

assert e.match(
regexp=r'failure_policy should be either "slow" or "fast", but got: "slo".'
)

def test_pipeline_utils_runtime_config_builder_parameter_not_found(self):
my_builder = pipeline_utils.PipelineRuntimeConfigBuilder.from_job_spec_json(
self.SAMPLE_JOB_SPEC
