Deferrable mode for Custom Training Job operators (#38584)
e-galan committed Apr 11, 2024
1 parent 1757704 commit 5ff2658
Showing 12 changed files with 3,084 additions and 133 deletions.
1,505 changes: 1,431 additions & 74 deletions airflow/providers/google/cloud/hooks/vertex_ai/custom_job.py

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion airflow/providers/google/cloud/links/vertex_ai.py
@@ -25,7 +25,8 @@

 VERTEX_AI_BASE_LINK = "/vertex-ai"
 VERTEX_AI_MODEL_LINK = (
-    VERTEX_AI_BASE_LINK + "/locations/{region}/models/{model_id}/deploy?project={project_id}"
+    VERTEX_AI_BASE_LINK
+    + "/models/locations/{region}/models/{model_id}/versions/default/properties?project={project_id}"
 )
 VERTEX_AI_MODEL_LIST_LINK = VERTEX_AI_BASE_LINK + "/models?project={project_id}"
 VERTEX_AI_MODEL_EXPORT_LINK = "/storage/browser/{bucket_name}/model-{model_id}?project={project_id}"
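
As a quick check of the new link template, the sketch below formats `VERTEX_AI_MODEL_LINK` with sample values. The two constants are copied from the hunk above. The `CONSOLE_HOST` prefix, the `build_model_url` helper, and the sample region, model ID, and project ID are assumptions made for illustration and are not part of this commit.

```python
# Constants copied from the hunk above.
VERTEX_AI_BASE_LINK = "/vertex-ai"
VERTEX_AI_MODEL_LINK = (
    VERTEX_AI_BASE_LINK
    + "/models/locations/{region}/models/{model_id}/versions/default/properties?project={project_id}"
)

# Assumption: the console host is prepended elsewhere in the provider;
# it is hard-coded here only so the example prints a complete URL.
CONSOLE_HOST = "https://console.cloud.google.com"


def build_model_url(region: str, model_id: str, project_id: str) -> str:
    """Hypothetical helper: fill the link template with concrete values."""
    path = VERTEX_AI_MODEL_LINK.format(region=region, model_id=model_id, project_id=project_id)
    return CONSOLE_HOST + path


# Sample values, not taken from the commit.
url = build_model_url("us-central1", "1234567890", "example-project")
print(url)
```

The updated template points at the model's default version properties page rather than the old `deploy` path, so all three placeholders are still required.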
359 changes: 333 additions & 26 deletions airflow/providers/google/cloud/operators/vertex_ai/custom_job.py

Large diffs are not rendered by default.

@@ -102,7 +102,6 @@ class RunPipelineJobOperator(GoogleCloudBaseOperator):
         Service Account Token Creator IAM role to the directly preceding identity, with first
         account from the list granting this role to the originating account (templated).
     :param deferrable: If True, run the task in the deferrable mode.
-        Note that it requires calling the operator with `sync=False` parameter.
     :param poll_interval: Time (seconds) to wait between two consecutive calls to check the job.
         The default is 300 seconds.
     """
94 changes: 94 additions & 0 deletions airflow/providers/google/cloud/triggers/vertex_ai.py
@@ -29,6 +29,7 @@

 from airflow.exceptions import AirflowException
 from airflow.providers.google.cloud.hooks.vertex_ai.batch_prediction_job import BatchPredictionJobAsyncHook
+from airflow.providers.google.cloud.hooks.vertex_ai.custom_job import CustomJobAsyncHook
 from airflow.providers.google.cloud.hooks.vertex_ai.hyperparameter_tuning_job import (
     HyperparameterTuningJobAsyncHook,
 )
@@ -189,3 +190,96 @@ async def _wait_job(self) -> types.PipelineJob:
             poll_interval=self.poll_interval,
         )
         return job


+class CustomTrainingJobTrigger(BaseVertexAIJobTrigger):
+    """
+    Make async calls to Vertex AI to check the state of a running custom training job.
+    Return the job when it enters a completed state.
+    """

+    job_type_verbose_name = "Custom Training Job"
+    job_serializer_class = types.TrainingPipeline
+    statuses_success = {
+        PipelineState.PIPELINE_STATE_PAUSED,
+        PipelineState.PIPELINE_STATE_SUCCEEDED,
+    }

+    @cached_property
+    def async_hook(self) -> CustomJobAsyncHook:
+        return CustomJobAsyncHook(
+            gcp_conn_id=self.conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )

+    async def _wait_job(self) -> types.TrainingPipeline:
+        pipeline: types.TrainingPipeline = await self.async_hook.wait_for_training_pipeline(
+            project_id=self.project_id,
+            location=self.location,
+            pipeline_id=self.job_id,
+            poll_interval=self.poll_interval,
+        )
+        return pipeline


+class CustomContainerTrainingJobTrigger(BaseVertexAIJobTrigger):
+    """
+    Make async calls to Vertex AI to check the state of a running custom container training job.
+    Return the job when it enters a completed state.
+    """

+    job_type_verbose_name = "Custom Container Training Job"
+    job_serializer_class = types.TrainingPipeline
+    statuses_success = {
+        PipelineState.PIPELINE_STATE_PAUSED,
+        PipelineState.PIPELINE_STATE_SUCCEEDED,
+    }

+    @cached_property
+    def async_hook(self) -> CustomJobAsyncHook:
+        return CustomJobAsyncHook(
+            gcp_conn_id=self.conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )

+    async def _wait_job(self) -> types.TrainingPipeline:
+        pipeline: types.TrainingPipeline = await self.async_hook.wait_for_training_pipeline(
+            project_id=self.project_id,
+            location=self.location,
+            pipeline_id=self.job_id,
+            poll_interval=self.poll_interval,
+        )
+        return pipeline


+class CustomPythonPackageTrainingJobTrigger(BaseVertexAIJobTrigger):
+    """
+    Make async calls to Vertex AI to check the state of a running custom python package training job.
+    Return the job when it enters a completed state.
+    """

+    job_type_verbose_name = "Custom Python Package Training Job"
+    job_serializer_class = types.TrainingPipeline
+    statuses_success = {
+        PipelineState.PIPELINE_STATE_PAUSED,
+        PipelineState.PIPELINE_STATE_SUCCEEDED,
+    }

+    @cached_property
+    def async_hook(self) -> CustomJobAsyncHook:
+        return CustomJobAsyncHook(
+            gcp_conn_id=self.conn_id,
+            impersonation_chain=self.impersonation_chain,
+        )

+    async def _wait_job(self) -> types.TrainingPipeline:
+        pipeline: types.TrainingPipeline = await self.async_hook.wait_for_training_pipeline(
+            project_id=self.project_id,
+            location=self.location,
+            pipeline_id=self.job_id,
+            poll_interval=self.poll_interval,
+        )
+        return pipeline
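
The three triggers above share one pattern: await a hook coroutine that polls until the training pipeline reaches a final state, then compare that state against `statuses_success`. The sketch below mimics that flow with the standard library only. `FakePipelineHook`, the local `PipelineState` enum, `STATUSES_COMPLETE`, and `build_event` are hypothetical stand-ins. The real `CustomJobAsyncHook.wait_for_training_pipeline` and `BaseVertexAIJobTrigger` are not shown in this diff, so their internals may differ.

```python
import asyncio
from enum import Enum


class PipelineState(Enum):
    """Local stand-in for the Vertex AI PipelineState enum (subset only)."""

    PIPELINE_STATE_RUNNING = "running"
    PIPELINE_STATE_SUCCEEDED = "succeeded"
    PIPELINE_STATE_FAILED = "failed"
    PIPELINE_STATE_PAUSED = "paused"


# Mirrors statuses_success in the triggers above.
STATUSES_SUCCESS = {
    PipelineState.PIPELINE_STATE_PAUSED,
    PipelineState.PIPELINE_STATE_SUCCEEDED,
}
# Assumption: polling stops on these states; the real hook may use a different set.
STATUSES_COMPLETE = STATUSES_SUCCESS | {PipelineState.PIPELINE_STATE_FAILED}


class FakePipelineHook:
    """Hypothetical hook: reports RUNNING a fixed number of times, then a final state."""

    def __init__(self, running_polls: int, final_state: PipelineState) -> None:
        self._remaining = running_polls
        self._final_state = final_state
        self.calls = 0

    async def get_state(self, pipeline_id: str) -> PipelineState:
        self.calls += 1
        if self._remaining > 0:
            self._remaining -= 1
            return PipelineState.PIPELINE_STATE_RUNNING
        return self._final_state

    async def wait_for_training_pipeline(self, pipeline_id: str, poll_interval: float) -> PipelineState:
        # Poll until a final state is seen, sleeping between checks.
        while True:
            state = await self.get_state(pipeline_id)
            if state in STATUSES_COMPLETE:
                return state
            await asyncio.sleep(poll_interval)


def build_event(job_type_verbose_name: str, state: PipelineState) -> dict:
    """Hypothetical mapping from a final state to a trigger event payload."""
    status = "success" if state in STATUSES_SUCCESS else "error"
    message = f"{job_type_verbose_name} completed with status {state.name}"
    return {"status": status, "message": message}


hook = FakePipelineHook(running_polls=2, final_state=PipelineState.PIPELINE_STATE_SUCCEEDED)
final_state = asyncio.run(hook.wait_for_training_pipeline("pipeline-123", poll_interval=0.01))
event = build_event("Custom Training Job", final_state)
print(event)
```

Because the three trigger classes differ only in `job_type_verbose_name` and their docstrings, a shared base or mixin holding `async_hook` and `_wait_job` would likely remove the duplication; that is a design observation, not something this commit does.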
48 changes: 41 additions & 7 deletions docs/apache-airflow-providers-google/operators/cloud/vertex_ai.rst
@@ -107,7 +107,7 @@ Preparation step

 For each operator you must prepare and create dataset. Then put dataset id to ``dataset_id`` parameter in operator.

-How to run Container Training Job
+How to run a Custom Container Training Job
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomContainerTrainingJobOperator`

 Before start running this Job you should create a docker image with training script inside. Documentation how to
@@ -121,7 +121,16 @@ for container which will be created from this image in ``command`` parameter.
     :start-after: [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator]
     :end-before: [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator]

-How to run Python Package Training Job
+The :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomContainerTrainingJobOperator`
+also provides the deferrable mode:

+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_container.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_create_custom_container_training_job_operator_deferrable]
+    :end-before: [END how_to_cloud_vertex_ai_create_custom_container_training_job_operator_deferrable]

+How to run a Python Package Training Job
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomPythonPackageTrainingJobOperator`

 Before start running this Job you should create a python package with training script inside. Documentation how to
@@ -135,27 +144,52 @@ parameter should has the name of script which will run your training task.
     :start-after: [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]
     :end-before: [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator]

-How to run Training Job
+The :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomPythonPackageTrainingJobOperator`
+also provides the deferrable mode:

+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job_python_package.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator_deferrable]
+    :end-before: [END how_to_cloud_vertex_ai_create_custom_python_package_training_job_operator_deferrable]

+How to run a Custom Training Job
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.CreateCustomTrainingJobOperator`.

-For this Job you should put path to your local training script inside ``script_path`` parameter.
+To create and run a Custom Training Job you should put the path to your local training script inside the ``script_path`` parameter.

 .. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_operator]
     :end-before: [END how_to_cloud_vertex_ai_create_custom_training_job_operator]

-Additionally, you can create new version of existing Training Job instead. In this case, the result will be new
-version of existing Model instead of new Model created in Model Registry. This can be done by specifying
-``parent_model`` parameter when running Training Job.
+The same operation can be performed in the deferrable mode:

+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_operator_deferrable]
+    :end-before: [END how_to_cloud_vertex_ai_create_custom_training_job_operator_deferrable]

+Additionally, you can create a new version of an existing Custom Training Job. It will replace the existing
+Model with another version, instead of creating a new Model in the Model Registry.
+This can be done by specifying the ``parent_model`` parameter when running a Custom Training Job.

 .. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
     :language: python
     :dedent: 4
     :start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_v2_operator]
     :end-before: [END how_to_cloud_vertex_ai_create_custom_training_job_v2_operator]

+The same operation can be performed in the deferrable mode:

+.. exampleinclude:: /../../tests/system/providers/google/cloud/vertex_ai/example_vertex_ai_custom_job.py
+    :language: python
+    :dedent: 4
+    :start-after: [START how_to_cloud_vertex_ai_create_custom_training_job_v2_operator_deferrable]
+    :end-before: [END how_to_cloud_vertex_ai_create_custom_training_job_v2_operator_deferrable]


 You can get a list of Training Jobs using
 :class:`~airflow.providers.google.cloud.operators.vertex_ai.custom_job.ListCustomTrainingJobOperator`.
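
In deferrable mode the task hands the wait to a trigger and resumes when the trigger emits an event. The sketch below shows one way the resuming side could handle such an event. The `execute_complete` function, the event keys, and the `TrainingJobError` exception are hypothetical; the operator changes in this commit are not rendered on this page, so this illustrates the general pattern rather than the operators' actual code.

```python
class TrainingJobError(Exception):
    """Hypothetical stand-in for the exception an operator would raise."""


def execute_complete(event: dict) -> dict:
    """Return the job payload on success, raise on error (illustrative only)."""
    if event.get("status") == "error":
        raise TrainingJobError(event.get("message", "unknown error"))
    return event.get("job", {})


# Success path: the payload carried by the event is returned to the caller.
ok_event = {"status": "success", "message": "done", "job": {"display_name": "example-job"}}
job = execute_complete(ok_event)
print(job["display_name"])

# Error path: the event message becomes the exception text.
try:
    execute_complete({"status": "error", "message": "pipeline failed"})
except TrainingJobError as err:
    failure_message = str(err)
    print(failure_message)
```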