From f428006507e9b053a2121089e89fc54aedd3550a Mon Sep 17 00:00:00 2001
From: A Vertex SDK engineer
Date: Fri, 5 Apr 2024 16:38:20 -0700
Subject: [PATCH] feat: Add Persistent Resource Id parameter to Custom
 Training Job run and submit methods.

PiperOrigin-RevId: 622311949
---
 google/cloud/aiplatform/training_jobs.py    |  83 +++++++
 tests/unit/aiplatform/test_training_jobs.py | 239 ++++++++++++++++++++
 2 files changed, 322 insertions(+)

diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 35a10529b8..c5ee3896d0 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -1489,6 +1489,7 @@ def _prepare_training_task_inputs_and_output_dir(
         enable_dashboard_access: bool = False,
         tensorboard: Optional[str] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Tuple[Dict, str]:
         """Prepares training task inputs and output directory for custom job.
 
@@ -1539,6 +1540,14 @@ def _prepare_training_task_inputs_and_output_dir(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
+
         Returns:
             Training task inputs and Output directory for custom job.
         """
@@ -1565,6 +1574,8 @@ def _prepare_training_task_inputs_and_output_dir(
             training_task_inputs["enable_web_access"] = enable_web_access
         if enable_dashboard_access:
             training_task_inputs["enable_dashboard_access"] = enable_dashboard_access
+        if persistent_resource_id:
+            training_task_inputs["persistent_resource_id"] = persistent_resource_id
 
         if timeout or restart_job_on_worker_restart or disable_retries:
             timeout = f"{timeout}s" if timeout else None
@@ -2962,6 +2973,7 @@ def run(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Runs the custom training job.
 
@@ -3249,6 +3261,13 @@ def run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -3311,6 +3330,7 @@ def run(
             sync=sync,
             create_request_timeout=create_request_timeout,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
     def submit(
@@ -3362,6 +3382,7 @@ def submit(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Submits the custom training job without blocking until completion.
@@ -3649,6 +3670,13 @@ def submit(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -3711,6 +3739,7 @@ def submit(
             create_request_timeout=create_request_timeout,
             block=False,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
     @base.optional_sync(construct_object_on_arg="managed_model")
@@ -3757,6 +3786,7 @@ def _run(
         create_request_timeout: Optional[float] = None,
         block: Optional[bool] = True,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Packages local script and launches training_job.
 
@@ -3946,6 +3976,13 @@ def _run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -3999,6 +4036,7 @@ def _run(
             enable_dashboard_access=enable_dashboard_access,
             tensorboard=tensorboard,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
         model = self._run_job(
@@ -4321,6 +4359,7 @@ def run(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Runs the custom training job.
 
@@ -4601,6 +4640,13 @@ def run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -4662,6 +4708,7 @@ def run(
             sync=sync,
             create_request_timeout=create_request_timeout,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
     def submit(
@@ -4713,6 +4760,7 @@ def submit(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Submits the custom training job without blocking until completion.
 
@@ -4993,6 +5041,13 @@ def submit(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -5054,6 +5109,7 @@ def submit(
             create_request_timeout=create_request_timeout,
             block=False,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
     @base.optional_sync(construct_object_on_arg="managed_model")
@@ -5099,6 +5155,7 @@ def _run(
         create_request_timeout: Optional[float] = None,
         block: Optional[bool] = True,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Packages local script and launches training_job.
         Args:
@@ -5284,6 +5341,13 @@ def _run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -5331,6 +5395,7 @@ def _run(
             enable_dashboard_access=enable_dashboard_access,
             tensorboard=tensorboard,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
         model = self._run_job(
@@ -7249,6 +7314,7 @@ def run(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Runs the custom training job.
 
@@ -7530,6 +7596,13 @@ def run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -7586,6 +7659,7 @@ def run(
             sync=sync,
             create_request_timeout=create_request_timeout,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
     @base.optional_sync(construct_object_on_arg="managed_model")
@@ -7630,6 +7704,7 @@ def _run(
         sync=True,
         create_request_timeout: Optional[float] = None,
         disable_retries: bool = False,
+        persistent_resource_id: Optional[str] = None,
     ) -> Optional[models.Model]:
         """Packages local script and launches training_job.
 
@@ -7800,6 +7875,13 @@ def _run(
                 Indicates if the job should retry for internal errors after the
                 job starts running.
                 If True, overrides `restart_job_on_worker_restart` to False.
+            persistent_resource_id (str):
+                Optional. The ID of the PersistentResource in the same Project
+                and Location. If this is specified, the job will be run on
+                existing machines held by the PersistentResource instead of
+                on-demand short-lived machines. The network, CMEK, and node pool
+                configs on the job should be consistent with those on the
+                PersistentResource; otherwise, the job will be rejected.
 
         Returns:
             model: The trained Vertex AI Model resource or None if training did not
@@ -7847,6 +7929,7 @@ def _run(
             enable_dashboard_access=enable_dashboard_access,
             tensorboard=tensorboard,
             disable_retries=disable_retries,
+            persistent_resource_id=persistent_resource_id,
         )
 
         model = self._run_job(
diff --git a/tests/unit/aiplatform/test_training_jobs.py b/tests/unit/aiplatform/test_training_jobs.py
index eb807116f9..a22e8c07f7 100644
--- a/tests/unit/aiplatform/test_training_jobs.py
+++ b/tests/unit/aiplatform/test_training_jobs.py
@@ -238,6 +238,9 @@
 _TEST_ENABLE_DASHBOARD_ACCESS = True
 _TEST_WEB_ACCESS_URIS = test_constants.TrainingJobConstants._TEST_WEB_ACCESS_URIS
 _TEST_DASHBOARD_ACCESS_URIS = {"workerpool0-0:8888": "uri"}
+_TEST_PERSISTENT_RESOURCE_ID = (
+    test_constants.PersistentResourceConstants._TEST_PERSISTENT_RESOURCE_ID
+)
 
 _TEST_BASE_CUSTOM_JOB_PROTO = gca_custom_job.CustomJob(
     job_spec=gca_custom_job.CustomJobSpec(),
@@ -268,6 +271,17 @@ def _get_custom_job_proto_with_enable_dashboard_access(
     return custom_job_proto
 
 
+def _get_custom_job_proto_with_persistent_resource_id(
+    state=None, name=None, version="v1"
+):
+    custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO)
+    custom_job_proto.name = name
+    custom_job_proto.state = state
+    custom_job_proto.job_spec.persistent_resource_id = _TEST_PERSISTENT_RESOURCE_ID
+
+    return custom_job_proto
+
+
 def _get_custom_job_proto_with_scheduling(state=None, name=None, version="v1"):
     custom_job_proto = copy.deepcopy(_TEST_BASE_CUSTOM_JOB_PROTO)
     custom_job_proto.name = name
@@ -428,6 +442,40 @@ def mock_get_backing_custom_job_with_enable_dashboard_access():
         yield get_custom_job_mock
 
 
+@pytest.fixture
+def mock_get_backing_custom_job_with_persistent_resource_id():
+    with patch.object(
+        job_service_client.JobServiceClient, "get_custom_job"
+    ) as get_custom_job_mock:
+        get_custom_job_mock.side_effect = [
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_PENDING,
+            ),
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_RUNNING,
+            ),
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_RUNNING,
+            ),
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_RUNNING,
+            ),
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_SUCCEEDED,
+            ),
+            _get_custom_job_proto_with_persistent_resource_id(
+                name=_TEST_CUSTOM_JOB_RESOURCE_NAME,
+                state=gca_job_state.JobState.JOB_STATE_SUCCEEDED,
+            ),
+        ]
+        yield get_custom_job_mock
+
+
 @pytest.mark.skipif(
     sys.executable is None, reason="requires python path to invoke subprocess"
 )
@@ -725,6 +773,19 @@ def make_training_pipeline_with_enable_dashboard_access(state):
     return training_pipeline
 
 
+def make_training_pipeline_with_persistent_resource_id(state):
+    training_pipeline = gca_training_pipeline.TrainingPipeline(
+        name=_TEST_PIPELINE_RESOURCE_NAME,
+        state=state,
+        training_task_inputs={"persistent_resource_id": _TEST_PERSISTENT_RESOURCE_ID},
+    )
+    if state == gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING:
+        training_pipeline.training_task_metadata = {
+            "backingCustomJob": _TEST_CUSTOM_JOB_RESOURCE_NAME
+        }
+    return training_pipeline
+
+
 def make_training_pipeline_with_scheduling(state):
     training_pipeline = gca_training_pipeline.TrainingPipeline(
         name=_TEST_PIPELINE_RESOURCE_NAME,
@@ -826,6 +887,35 @@ def mock_pipeline_service_get_with_enable_dashboard_access():
         yield mock_get_training_pipeline
 
 
+@pytest.fixture
+def mock_pipeline_service_get_with_persistent_resource_id():
+    with mock.patch.object(
+        pipeline_service_client.PipelineServiceClient, "get_training_pipeline"
+    ) as mock_get_training_pipeline:
+        mock_get_training_pipeline.side_effect = [
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_PENDING,
+            ),
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
+            ),
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
+            ),
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
+            ),
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
+            ),
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
+            ),
+        ]
+
+        yield mock_get_training_pipeline
+
+
 @pytest.fixture
 def mock_pipeline_service_get_with_scheduling():
     with mock.patch.object(
@@ -903,6 +993,19 @@ def mock_pipeline_service_create_with_enable_dashboard_access():
         yield mock_create_training_pipeline
 
 
+@pytest.fixture
+def mock_pipeline_service_create_with_persistent_resource_id():
+    with mock.patch.object(
+        pipeline_service_client.PipelineServiceClient, "create_training_pipeline"
+    ) as mock_create_training_pipeline:
+        mock_create_training_pipeline.return_value = (
+            make_training_pipeline_with_persistent_resource_id(
+                state=gca_pipeline_state.PipelineState.PIPELINE_STATE_PENDING,
+            )
+        )
+        yield mock_create_training_pipeline
+
+
 @pytest.fixture
 def mock_pipeline_service_create_with_scheduling():
     with mock.patch.object(
@@ -3101,6 +3204,51 @@ def test_cancel_training_job_without_running(self, mock_pipeline_service_cancel)
             assert e.match(regexp=r"TrainingJob has not been launched")
 
 
+    @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1)
+    @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1)
+    @pytest.mark.usefixtures(
+        "mock_pipeline_service_create_with_persistent_resource_id",
+        "mock_pipeline_service_get_with_persistent_resource_id",
+        "mock_get_backing_custom_job_with_persistent_resource_id",
+        "mock_python_package_to_gcs",
+    )
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_persistent_resource_id(
+        self, sync, caplog
+    ):
+
+        caplog.set_level(logging.INFO)
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = training_jobs.CustomTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            script_path=_TEST_LOCAL_SCRIPT_FILE_NAME,
+            container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+        )
+
+        job.run(
+            base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            args=_TEST_RUN_ARGS,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            sync=sync,
+            create_request_timeout=None,
+            persistent_resource_id=_TEST_PERSISTENT_RESOURCE_ID,
+        )
+
+        if not sync:
+            job.wait()
+
+        assert job._gca_resource == make_training_pipeline_with_persistent_resource_id(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
 
 @pytest.mark.usefixtures("google_auth_mock")
 class TestCustomContainerTrainingJob:
@@ -4898,6 +5046,51 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno
             create_request_timeout=None,
         )
 
+    @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1)
+    @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1)
+    @pytest.mark.usefixtures(
+        "mock_pipeline_service_create_with_persistent_resource_id",
+        "mock_pipeline_service_get_with_persistent_resource_id",
+        "mock_get_backing_custom_job_with_persistent_resource_id",
+    )
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_persistent_resource_id(
+        self, sync, caplog
+    ):
+
+        caplog.set_level(logging.INFO)
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = training_jobs.CustomContainerTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+            command=_TEST_TRAINING_CONTAINER_CMD,
+        )
+
+        job.run(
+            base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            args=_TEST_RUN_ARGS,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            sync=sync,
+            create_request_timeout=None,
+            persistent_resource_id=_TEST_PERSISTENT_RESOURCE_ID,
+        )
+
+        if not sync:
+            job.wait()
+
+        print(caplog.text)
+        assert job._gca_resource == make_training_pipeline_with_persistent_resource_id(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
 
 class Test_WorkerPoolSpec:
     def test_machine_spec_return_spec_dict(self):
@@ -7192,6 +7385,52 @@ def test_run_call_pipeline_service_create_with_nontabular_dataset_raises_if_anno
             model_display_name=_TEST_MODEL_DISPLAY_NAME,
         )
 
+    @mock.patch.object(training_jobs, "_JOB_WAIT_TIME", 1)
+    @mock.patch.object(training_jobs, "_LOG_WAIT_TIME", 1)
+    @pytest.mark.usefixtures(
+        "mock_pipeline_service_create_with_persistent_resource_id",
+        "mock_pipeline_service_get_with_persistent_resource_id",
+        "mock_get_backing_custom_job_with_persistent_resource_id",
+    )
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_persistent_resource_id(
+        self, sync, caplog
+    ):
+
+        caplog.set_level(logging.INFO)
+
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            credentials=_TEST_CREDENTIALS,
+        )
+
+        job = training_jobs.CustomPythonPackageTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            python_package_gcs_uri=_TEST_OUTPUT_PYTHON_PACKAGE_PATH,
+            python_module_name=_TEST_PYTHON_MODULE_NAME,
+            container_uri=_TEST_TRAINING_CONTAINER_IMAGE,
+        )
+
+        job.run(
+            base_output_dir=_TEST_BASE_OUTPUT_DIR,
+            args=_TEST_RUN_ARGS,
+            machine_type=_TEST_MACHINE_TYPE,
+            accelerator_type=_TEST_ACCELERATOR_TYPE,
+            accelerator_count=_TEST_ACCELERATOR_COUNT,
+            sync=sync,
+            create_request_timeout=None,
+            persistent_resource_id=_TEST_PERSISTENT_RESOURCE_ID,
+        )
+
+        if not sync:
+            job.wait()
+
+        print(caplog.text)
+        assert job._gca_resource == make_training_pipeline_with_persistent_resource_id(
+            gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+        )
+
 
 class TestVersionedTrainingJobs:
     @pytest.mark.usefixtures("mock_pipeline_service_get")
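
Usage note (not part of the patch): a minimal sketch of how the new parameter would be called from client code. It assumes a PersistentResource named "my-persistent-resource" already exists in the same project and region, with node pools compatible with the requested machine configuration; the project, bucket, script, and image names below are placeholders.

    from google.cloud import aiplatform

    aiplatform.init(
        project="my-project",             # placeholder project ID
        location="us-central1",           # must match the PersistentResource's region
        staging_bucket="gs://my-bucket",  # placeholder staging bucket
    )

    job = aiplatform.CustomTrainingJob(
        display_name="train-on-persistent-resource",
        script_path="task.py",                                # placeholder training script
        container_uri="gcr.io/my-project/my-training-image",  # placeholder training container
    )

    # New in this patch: run on machines held by the PersistentResource instead of
    # provisioning on-demand machines. The ID is passed through training_task_inputs
    # and ends up on the backing CustomJob's job_spec.persistent_resource_id.
    job.run(
        args=["--epochs", "10"],
        replica_count=1,
        machine_type="n1-standard-8",
        sync=True,
        persistent_resource_id="my-persistent-resource",
    )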