Skip to content

Commit

Permalink
Add the deferrable mode to RunPipelineJobOperator (#37969)
Browse files Browse the repository at this point in the history
  • Loading branch information
e-galan committed Mar 26, 2024
1 parent 4d60681 commit 067fedb
Show file tree
Hide file tree
Showing 7 changed files with 687 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@

if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.api_core.retry import AsyncRetry, Retry
from google.cloud.aiplatform_v1.services.job_service.pagers import ListHyperparameterTuningJobsPager


Expand Down
242 changes: 230 additions & 12 deletions airflow/providers/google/cloud/hooks/vertex_ai/pipeline_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,27 @@

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any, Sequence

from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.cloud.aiplatform import PipelineJob
from google.cloud.aiplatform_v1 import PipelineServiceClient
from google.cloud.aiplatform_v1 import (
PipelineServiceAsyncClient,
PipelineServiceClient,
PipelineState,
types,
)

from airflow.exceptions import AirflowException
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.providers.google.common.hooks.base_google import GoogleBaseAsyncHook, GoogleBaseHook

if TYPE_CHECKING:
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry import AsyncRetry, Retry
from google.auth.credentials import Credentials
from google.cloud.aiplatform.metadata import experiment_resources
from google.cloud.aiplatform_v1.services.pipeline_service.pagers import ListPipelineJobsPager

Expand Down Expand Up @@ -102,11 +109,6 @@ def get_pipeline_job_object(
failure_policy=failure_policy,
)

@staticmethod
def extract_pipeline_job_id(obj: dict) -> str:
"""Return unique id of the pipeline_job."""
return obj["name"].rpartition("/")[-1]

def wait_for_operation(self, operation: Operation, timeout: float | None = None):
"""Wait for long-lasting operation to complete."""
try:
Expand All @@ -130,7 +132,7 @@ def create_pipeline_job(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> PipelineJob:
) -> types.PipelineJob:
"""
Create a PipelineJob. A PipelineJob will run immediately when created.
Expand Down Expand Up @@ -183,7 +185,7 @@ def run_pipeline_job(
# END: run param
) -> PipelineJob:
"""
Run PipelineJob and monitor the job until completion.
Create and run a PipelineJob until its completion.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
Expand Down Expand Up @@ -244,15 +246,110 @@ def run_pipeline_job(
location=region,
failure_policy=failure_policy,
)
self._pipeline_job.submit(
service_account=service_account,
network=network,
create_request_timeout=create_request_timeout,
experiment=experiment,
)
self._pipeline_job.wait()

return self._pipeline_job

@GoogleBaseHook.fallback_to_default_project_id
def submit_pipeline_job(
self,
project_id: str,
region: str,
display_name: str,
template_path: str,
job_id: str | None = None,
pipeline_root: str | None = None,
parameter_values: dict[str, Any] | None = None,
input_artifacts: dict[str, str] | None = None,
enable_caching: bool | None = None,
encryption_spec_key_name: str | None = None,
labels: dict[str, str] | None = None,
failure_policy: str | None = None,
# START: run param
service_account: str | None = None,
network: str | None = None,
create_request_timeout: float | None = None,
experiment: str | experiment_resources.Experiment | None = None,
# END: run param
) -> PipelineJob:
"""
Create and start a PipelineJob run.
For more info about the client method please see:
https://cloud.google.com/python/docs/reference/aiplatform/latest/google.cloud.aiplatform.PipelineJob#google_cloud_aiplatform_PipelineJob_submit
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param display_name: Required. The user-defined name of this Pipeline.
:param template_path: Required. The path of PipelineJob or PipelineSpec JSON or YAML file. It can be
a local path, a Google Cloud Storage URI (e.g. "gs://project.name"), an Artifact Registry URI
(e.g. "https://us-central1-kfp.pkg.dev/proj/repo/pack/latest"), or an HTTPS URI.
:param job_id: Optional. The unique ID of the job run. If not specified, pipeline name + timestamp
will be used.
:param pipeline_root: Optional. The root of the pipeline outputs. If not set, the staging bucket set
in aiplatform.init will be used. If that's not set a pipeline-specific artifacts bucket will be
used.
:param parameter_values: Optional. The mapping from runtime parameter names to its values that
control the pipeline run.
:param input_artifacts: Optional. The mapping from the runtime parameter name for this artifact to
its resource id. For example: "vertex_model":"456". Note: full resource name
("projects/123/locations/us-central1/metadataStores/default/artifacts/456") cannot be used.
:param enable_caching: Optional. Whether to turn on caching for the run.
If this is not set, defaults to the compile time settings, which are True for all tasks by
default, while users may specify different caching options for individual tasks.
If this is set, the setting applies to all tasks in the pipeline. Overrides the compile time
settings.
:param encryption_spec_key_name: Optional. The Cloud KMS resource identifier of the customer managed
encryption key used to protect the job. Has the form:
``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
The key needs to be in the same region as where the compute resource is created. If this is set,
then all resources created by the PipelineJob will be encrypted with the provided encryption key.
Overrides encryption_spec_key_name set in aiplatform.init.
:param labels: Optional. The user defined metadata to organize PipelineJob.
:param failure_policy: Optional. The failure policy - "slow" or "fast". Currently, the default of a
pipeline is that the pipeline will continue to run until no more tasks can be executed, also
known as PIPELINE_FAILURE_POLICY_FAIL_SLOW (corresponds to "slow"). However, if a pipeline is set
to PIPELINE_FAILURE_POLICY_FAIL_FAST (corresponds to "fast"), it will stop scheduling any new
tasks when a task has failed. Any scheduled tasks will continue to completion.
:param service_account: Optional. Specifies the service account for workload run-as account. Users
submitting jobs must have act-as permission on this run-as account.
:param network: Optional. The full name of the Compute Engine network to which the job should be
peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network. If left unspecified, the
network set in aiplatform.init will be used. Otherwise, the job is not peered with any network.
:param create_request_timeout: Optional. The timeout for the create request in seconds.
:param experiment: Optional. The Vertex AI experiment name or instance to associate to this PipelineJob.
Metrics produced by the PipelineJob as system.Metric Artifacts will be associated as metrics
to the current Experiment Run. Pipeline parameters will be associated as parameters to
the current Experiment Run.
"""
self._pipeline_job = self.get_pipeline_job_object(
display_name=display_name,
template_path=template_path,
job_id=job_id,
pipeline_root=pipeline_root,
parameter_values=parameter_values,
input_artifacts=input_artifacts,
enable_caching=enable_caching,
encryption_spec_key_name=encryption_spec_key_name,
labels=labels,
project=project_id,
location=region,
failure_policy=failure_policy,
)
self._pipeline_job.submit(
service_account=service_account,
network=network,
create_request_timeout=create_request_timeout,
experiment=experiment,
)

self._pipeline_job.wait()
return self._pipeline_job

@GoogleBaseHook.fallback_to_default_project_id
Expand All @@ -264,7 +361,7 @@ def get_pipeline_job(
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> PipelineJob:
) -> types.PipelineJob:
"""
Get a PipelineJob.
Expand Down Expand Up @@ -408,3 +505,124 @@ def delete_pipeline_job(
metadata=metadata,
)
return result

@staticmethod
def extract_pipeline_job_id(obj: dict) -> str:
"""Return unique id of a pipeline job from its name."""
return obj["name"].rpartition("/")[-1]


class PipelineJobAsyncHook(GoogleBaseAsyncHook):
"""Asynchronous hook for Google Cloud Vertex AI Pipeline Job APIs."""

sync_hook_class = PipelineJobHook
PIPELINE_COMPLETE_STATES = (
PipelineState.PIPELINE_STATE_CANCELLED,
PipelineState.PIPELINE_STATE_FAILED,
PipelineState.PIPELINE_STATE_PAUSED,
PipelineState.PIPELINE_STATE_SUCCEEDED,
)

def __init__(
self,
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(
gcp_conn_id=gcp_conn_id,
impersonation_chain=impersonation_chain,
**kwargs,
)

async def get_credentials(self) -> Credentials:
sync_hook = await self.get_sync_hook()
return sync_hook.get_credentials()

async def get_project_id(self) -> str:
sync_hook = await self.get_sync_hook()
return sync_hook.project_id

async def get_location(self) -> str:
sync_hook = await self.get_sync_hook()
return sync_hook.location

async def get_pipeline_service_client(
self,
region: str | None = None,
) -> PipelineServiceAsyncClient:
if region and region != "global":
client_options = ClientOptions(api_endpoint=f"{region}-aiplatform.googleapis.com:443")
else:
client_options = ClientOptions()
return PipelineServiceAsyncClient(
credentials=await self.get_credentials(),
client_info=CLIENT_INFO,
client_options=client_options,
)

async def get_pipeline_job(
self,
project_id: str,
location: str,
job_id: str,
retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | _MethodDefault | None = DEFAULT,
metadata: Sequence[tuple[str, str]] = (),
) -> types.PipelineJob:
"""
Get a PipelineJob proto message from PipelineServiceAsyncClient.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param location: Required. The ID of the Google Cloud region that the service belongs to.
:param job_id: Required. The ID of the PipelineJob resource.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
"""
client = await self.get_pipeline_service_client(region=location)
pipeline_job_name = client.pipeline_job_path(
project=project_id,
location=location,
pipeline_job=job_id,
)
pipeline_job: types.PipelineJob = await client.get_pipeline_job(
request={"name": pipeline_job_name},
retry=retry,
timeout=timeout,
metadata=metadata,
)
return pipeline_job

async def wait_for_pipeline_job(
self,
project_id: str,
location: str,
job_id: str,
retry: AsyncRetry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
poll_interval: int = 10,
) -> types.PipelineJob:
"""Wait until the pipeline job is in a complete state and return it."""
while True:
try:
self.log.info("Requesting a pipeline job with id %s", job_id)
job: types.PipelineJob = await self.get_pipeline_job(
project_id=project_id,
location=location,
job_id=job_id,
retry=retry,
timeout=timeout,
metadata=metadata,
)
except Exception as ex:
self.log.exception("Exception occurred while requesting pipeline job %s", job_id)
raise AirflowException(ex)

self.log.info("Status of the pipeline job %s is %s", job.name, job.state.name)
if job.state in self.PIPELINE_COMPLETE_STATES:
return job

self.log.info("Sleeping for %s seconds.", poll_interval)
await asyncio.sleep(poll_interval)
Loading

0 comments on commit 067fedb

Please sign in to comment.