Skip to content

Commit

Permalink
feat: Add Batch Delete and Batch Cancel Pipeline Jobs and unit tests.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 615930540
  • Loading branch information
vertex-sdk-bot authored and Copybara-Service committed Mar 14, 2024
1 parent c2ba7d7 commit cb495e7
Show file tree
Hide file tree
Showing 2 changed files with 250 additions and 0 deletions.
108 changes: 108 additions & 0 deletions google/cloud/aiplatform/pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@
import tempfile
import time
from typing import Any, Callable, Dict, List, Optional, Union
from google.api_core import operation

from google.auth import credentials as auth_credentials
from google.cloud import aiplatform
from google.cloud import aiplatform_v1
from google.cloud.aiplatform import base
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import utils
Expand All @@ -44,6 +46,12 @@
pipeline_job as gca_pipeline_job,
pipeline_state as gca_pipeline_state,
)
from google.cloud.aiplatform_v1.types import (
pipeline_service as PipelineServiceV1,
)
from google.cloud.aiplatform_v1.services.pipeline_service import (
PipelineServiceClient as PipelineServiceClientGa,
)

_LOGGER = base.Logger(__name__)

Expand Down Expand Up @@ -551,6 +559,106 @@ def wait(self):
else:
super().wait()

def batch_delete(
    self,
    project: str,
    location: str,
    names: List[str],
) -> PipelineServiceV1.BatchDeletePipelineJobsResponse:
    """Batch-deletes PipelineJobs and waits for the operation to complete.

    Example Usage:
        pipeline_job = aiplatform.PipelineJob(
            display_name='job_display_name',
            template_path='your_pipeline.yaml',
        )
        pipeline_job.batch_delete(
            project='your_project_id',
            location='your_location',
            names=['pipeline_job_name',
                   'pipeline_job_name2']
        )

    Args:
        project: Required. The project id of the PipelineJobs to batch delete.
            Falls back to the globally initialized project if falsy.
        location: Required. The location of the PipelineJobs to batch delete.
            Falls back to the globally initialized location if falsy.
        names: Required. The names of the PipelineJobs to delete. A
            maximum of 32 PipelineJobs can be deleted in a batch.

    Returns:
        BatchDeletePipelineJobsResponse containing the PipelineJobs deleted.
    """
    user_project = project or initializer.global_config.project
    user_location = location or initializer.global_config.location
    parent = initializer.global_config.common_location_path(
        project=user_project, location=user_location
    )
    # Accept either short job ids or fully-qualified resource names.
    pipeline_jobs_names = [
        utils.full_resource_name(
            resource_name=name,
            resource_noun="pipelineJobs",
            parse_resource_name_method=PipelineServiceClientGa.parse_pipeline_job_path,
            format_resource_name_method=PipelineServiceClientGa.pipeline_job_path,
            project=user_project,
            location=user_location,
        )
        for name in names
    ]
    request = aiplatform_v1.BatchDeletePipelineJobsRequest(
        parent=parent, names=pipeline_jobs_names
    )
    # `lro` (not `operation`) so we don't shadow the imported
    # google.api_core.operation module.
    lro = self.api_client.batch_delete_pipeline_jobs(request)
    # Block until the long-running operation finishes and return its result.
    return lro.result()

def batch_cancel(
    self,
    project: str,
    location: str,
    names: List[str],
) -> operation.Operation:
    """Starts a batch-cancel of PipelineJobs without waiting for completion.

    Example Usage:
        pipeline_job = aiplatform.PipelineJob(
            display_name='job_display_name',
            template_path='your_pipeline.yaml',
        )
        pipeline_job.batch_cancel(
            project='your_project_id',
            location='your_location',
            names=['pipeline_job_name',
                   'pipeline_job_name2']
        )

    Args:
        project: Required. The project id of the PipelineJobs to batch cancel.
            Falls back to the globally initialized project if falsy.
        location: Required. The location of the PipelineJobs to batch cancel.
            Falls back to the globally initialized location if falsy.
        names: Required. The names of the PipelineJobs to cancel. A
            maximum of 32 PipelineJobs can be cancelled in a batch.

    Returns:
        operation (Operation):
            An object representing a long-running operation. Unlike
            batch_delete, this does not block on the result.
    """
    user_project = project or initializer.global_config.project
    user_location = location or initializer.global_config.location
    parent = initializer.global_config.common_location_path(
        project=user_project, location=user_location
    )
    # Accept either short job ids or fully-qualified resource names.
    pipeline_jobs_names = [
        utils.full_resource_name(
            resource_name=name,
            resource_noun="pipelineJobs",
            parse_resource_name_method=PipelineServiceClientGa.parse_pipeline_job_path,
            format_resource_name_method=PipelineServiceClientGa.pipeline_job_path,
            project=user_project,
            location=user_location,
        )
        for name in names
    ]
    request = aiplatform_v1.BatchCancelPipelineJobsRequest(
        parent=parent, names=pipeline_jobs_names
    )
    return self.api_client.batch_cancel_pipeline_jobs(request)

@property
def pipeline_spec(self):
    """The pipeline spec of the underlying PipelineJob resource."""
    return self._gca_resource.pipeline_spec
Expand Down
142 changes: 142 additions & 0 deletions tests/unit/aiplatform/test_pipeline_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
from google.cloud.aiplatform_v1beta1.types import (
pipeline_service as PipelineServiceV1Beta1,
)
from google.cloud.aiplatform_v1.types import (
pipeline_service as PipelineServiceV1,
)
from google.cloud.aiplatform_v1beta1.services import (
pipeline_service as v1beta1_pipeline_service,
)
Expand Down Expand Up @@ -255,6 +258,46 @@ def mock_pipeline_service_create():
yield mock_create_pipeline_job


@pytest.fixture
def mock_pipeline_v1_service_batch_cancel():
    """Mocks the v1 batch_cancel_pipeline_jobs RPC to return a mock LRO.

    Uses mock.patch.object for consistency with the sibling batch-delete
    fixture in this module.
    """
    with mock.patch.object(
        pipeline_service_client.PipelineServiceClient, "batch_cancel_pipeline_jobs"
    ) as batch_cancel_pipeline_jobs_mock:
        batch_cancel_pipeline_jobs_mock.return_value = mock.Mock(ga_operation.Operation)
        yield batch_cancel_pipeline_jobs_mock


@pytest.fixture
def mock_pipeline_v1_service_batch_delete():
    """Mocks the v1 batch_delete_pipeline_jobs RPC to return a mock LRO.

    The mocked LRO's .result() yields a BatchDeletePipelineJobsResponse
    with two deleted jobs. (The original fixture assigned return_value
    twice; the dead first assignment has been removed.)
    """
    with mock.patch.object(
        pipeline_service_client.PipelineServiceClient, "batch_delete_pipeline_jobs"
    ) as mock_batch_pipeline_jobs:
        mock_lro = mock.Mock(ga_operation.Operation)
        mock_lro.result.return_value = make_v1_batch_delete_pipeline_jobs_response()
        mock_batch_pipeline_jobs.return_value = mock_lro
        yield mock_batch_pipeline_jobs


def make_v1_batch_delete_pipeline_jobs_response():
    """Builds a v1 BatchDeletePipelineJobsResponse holding two jobs.

    The response contains one succeeded and one failed PipelineJob, matching
    the two test job names.
    """
    deleted_jobs = [
        make_pipeline_job_with_name(
            _TEST_PIPELINE_JOB_NAME,
            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
        ),
        make_pipeline_job_with_name(
            _TEST_PIPELINE_JOB_NAME_2,
            gca_pipeline_state.PipelineState.PIPELINE_STATE_FAILED,
        ),
    ]
    response = PipelineServiceV1.BatchDeletePipelineJobsResponse()
    response.pipeline_jobs.extend(deleted_jobs)
    return response


@pytest.fixture
def mock_pipeline_v1beta1_service_batch_delete():
with mock.patch.object(
Expand Down Expand Up @@ -342,6 +385,22 @@ def make_pipeline_job(state):
)


def make_pipeline_job_with_name(name, state):
    """Returns a gca PipelineJob proto with the given resource name and state.

    Shares the standard test-constant fields (create time, service account,
    network, reserved IP ranges) and names the run context after the job.
    """
    run_context = gca_context.Context(name=name)
    job_detail = gca_pipeline_job.PipelineJobDetail(pipeline_run_context=run_context)
    return gca_pipeline_job.PipelineJob(
        name=name,
        state=state,
        create_time=_TEST_PIPELINE_CREATE_TIME,
        service_account=_TEST_SERVICE_ACCOUNT,
        network=_TEST_NETWORK,
        reserved_ip_ranges=_TEST_RESERVED_IP_RANGES,
        job_detail=job_detail,
    )


@pytest.fixture
def mock_pipeline_service_get():
with mock.patch.object(
Expand Down Expand Up @@ -2079,3 +2138,86 @@ def test_create_two_and_batch_delete_pipeline_jobs_returns_response(

assert mock_pipeline_v1beta1_service_batch_delete.call_count == 1
assert len(response.pipeline_jobs) == 2

@pytest.mark.usefixtures(
    "mock_pipeline_service_get",
    "mock_pipeline_v1_service_batch_delete",
)
@pytest.mark.parametrize(
    "job_spec",
    [
        _TEST_PIPELINE_SPEC_JSON,
        _TEST_PIPELINE_SPEC_YAML,
        _TEST_PIPELINE_JOB,
        _TEST_PIPELINE_SPEC_LEGACY_JSON,
        _TEST_PIPELINE_SPEC_LEGACY_YAML,
        _TEST_PIPELINE_JOB_LEGACY,
    ],
)
def test_create_two_and_batch_delete_v1_pipeline_jobs_returns_response(
    self,
    mock_load_yaml_and_json,
    mock_pipeline_v1_service_batch_delete,
):
    """batch_delete on the v1 client issues one RPC and returns both deleted jobs."""
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        location=_TEST_LOCATION,
        credentials=_TEST_CREDENTIALS,
    )

    # The job instance only provides the client; the jobs being deleted are
    # identified by the `names` argument below.
    job = pipeline_jobs.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
    )

    response = job.batch_delete(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        names=[_TEST_PIPELINE_JOB_ID, _TEST_PIPELINE_JOB_ID_2],
    )

    # One RPC for the whole batch; the mocked LRO result carries two jobs.
    assert mock_pipeline_v1_service_batch_delete.call_count == 1
    assert len(response.pipeline_jobs) == 2

@pytest.mark.usefixtures(
    "mock_pipeline_service_get",
    "mock_pipeline_v1_service_batch_cancel",
)
@pytest.mark.parametrize(
    "job_spec",
    [
        _TEST_PIPELINE_SPEC_JSON,
        _TEST_PIPELINE_SPEC_YAML,
        _TEST_PIPELINE_JOB,
        _TEST_PIPELINE_SPEC_LEGACY_JSON,
        _TEST_PIPELINE_SPEC_LEGACY_YAML,
        _TEST_PIPELINE_JOB_LEGACY,
    ],
)
def test_create_two_and_batch_cancel_v1_pipeline_jobs_returns_response(
    self,
    mock_load_yaml_and_json,
    mock_pipeline_v1_service_batch_cancel,
):
    """batch_cancel on the v1 client issues exactly one RPC for both jobs."""
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_GCS_BUCKET_NAME,
        location=_TEST_LOCATION,
        credentials=_TEST_CREDENTIALS,
    )

    # The job instance only provides the client; the jobs being cancelled are
    # identified by the `names` argument below.
    job = pipeline_jobs.PipelineJob(
        display_name=_TEST_PIPELINE_JOB_DISPLAY_NAME,
        template_path=_TEST_TEMPLATE_PATH,
        job_id=_TEST_PIPELINE_JOB_ID,
    )

    job.batch_cancel(
        project=_TEST_PROJECT,
        location=_TEST_LOCATION,
        names=[_TEST_PIPELINE_JOB_ID, _TEST_PIPELINE_JOB_ID_2],
    )

    # batch_cancel does not block on the LRO, so only the call count is checked.
    assert mock_pipeline_v1_service_batch_cancel.call_count == 1

0 comments on commit cb495e7

Please sign in to comment.