Skip to content

Commit

Permalink
feat: Added scheduling to CustomTrainingJob, CustomPythonPackageTrain…
Browse files Browse the repository at this point in the history
…ingJob, CustomContainerTrainingJob (#970)

* Added scheduling to CustomTrainingJob

* Added unit tests

Fixed tests

Fixed test

fix: Broken test

* Added integration test

* Removed comment

* Updated e2e tabular test

* Fixed lint issue

* Simplified tests

* Added more assertions
  • Loading branch information
ivanmkc committed Feb 15, 2022
1 parent c10923b commit 89078e0
Show file tree
Hide file tree
Showing 3 changed files with 317 additions and 1 deletion.
83 changes: 83 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -1379,6 +1379,8 @@ def _prepare_training_task_inputs_and_output_dir(
base_output_dir: Optional[str] = None,
service_account: Optional[str] = None,
network: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
) -> Tuple[Dict, str]:
Expand All @@ -1398,6 +1400,13 @@ def _prepare_training_task_inputs_and_output_dir(
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -1442,6 +1451,14 @@ def _prepare_training_task_inputs_and_output_dir(
if enable_web_access:
training_task_inputs["enable_web_access"] = enable_web_access

if timeout or restart_job_on_worker_restart:
timeout = f"{timeout}s" if timeout else None
scheduling = {
"timeout": timeout,
"restart_job_on_worker_restart": restart_job_on_worker_restart,
}
training_task_inputs["scheduling"] = scheduling

return training_task_inputs, base_output_dir

@property
Expand Down Expand Up @@ -1794,6 +1811,8 @@ def run(
test_filter_split: Optional[str] = None,
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
sync=True,
Expand Down Expand Up @@ -2014,6 +2033,13 @@ def run(
that piece is ignored by the pipeline.
Supported only for tabular and time series Datasets.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -2080,6 +2106,8 @@ def run(
test_filter_split=test_filter_split,
predefined_split_column_name=predefined_split_column_name,
timestamp_split_column_name=timestamp_split_column_name,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
Expand Down Expand Up @@ -2117,6 +2145,8 @@ def _run(
test_filter_split: Optional[str] = None,
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
reduction_server_container_uri: Optional[str] = None,
Expand Down Expand Up @@ -2237,6 +2267,13 @@ def _run(
that piece is ignored by the pipeline.
Supported only for tabular and time series Datasets.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -2309,6 +2346,8 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
)
Expand Down Expand Up @@ -2598,6 +2637,8 @@ def run(
test_filter_split: Optional[str] = None,
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
sync=True,
Expand Down Expand Up @@ -2811,6 +2852,13 @@ def run(
that piece is ignored by the pipeline.
Supported only for tabular and time series Datasets.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -2876,6 +2924,8 @@ def run(
test_filter_split=test_filter_split,
predefined_split_column_name=predefined_split_column_name,
timestamp_split_column_name=timestamp_split_column_name,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
Expand Down Expand Up @@ -2912,6 +2962,8 @@ def _run(
test_filter_split: Optional[str] = None,
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
reduction_server_container_uri: Optional[str] = None,
Expand Down Expand Up @@ -2965,6 +3017,13 @@ def _run(
should be peered. For example, projects/12345/global/networks/myVPC.
Private services access must already be configured for the network.
If left unspecified, the job is not peered with any network.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
bigquery_destination (str):
The BigQuery project location where the training data is to
be written to. In the given project a new dataset is created
Expand Down Expand Up @@ -3094,6 +3153,8 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
)
Expand Down Expand Up @@ -5373,6 +5434,8 @@ def run(
test_filter_split: Optional[str] = None,
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
sync=True,
Expand Down Expand Up @@ -5586,6 +5649,13 @@ def run(
that piece is ignored by the pipeline.
Supported only for tabular and time series Datasets.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -5646,6 +5716,8 @@ def run(
predefined_split_column_name=predefined_split_column_name,
timestamp_split_column_name=timestamp_split_column_name,
bigquery_destination=bigquery_destination,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
reduction_server_container_uri=reduction_server_container_uri
Expand Down Expand Up @@ -5682,6 +5754,8 @@ def _run(
predefined_split_column_name: Optional[str] = None,
timestamp_split_column_name: Optional[str] = None,
bigquery_destination: Optional[str] = None,
timeout: Optional[int] = None,
restart_job_on_worker_restart: bool = False,
enable_web_access: bool = False,
tensorboard: Optional[str] = None,
reduction_server_container_uri: Optional[str] = None,
Expand Down Expand Up @@ -5785,6 +5859,13 @@ def _run(
that piece is ignored by the pipeline.
Supported only for tabular and time series Datasets.
timeout (int):
The maximum job running time in seconds. The default is 7 days.
restart_job_on_worker_restart (bool):
Restarts the entire CustomJob if a worker
gets restarted. This feature can be used by
distributed training jobs that are not resilient
to workers leaving and joining a job.
enable_web_access (bool):
Whether you want Vertex AI to enable interactive shell access
to training containers.
Expand Down Expand Up @@ -5851,6 +5932,8 @@ def _run(
base_output_dir=base_output_dir,
service_account=service_account,
network=network,
timeout=timeout,
restart_job_on_worker_restart=restart_job_on_worker_restart,
enable_web_access=enable_web_access,
tensorboard=tensorboard,
)
Expand Down
15 changes: 15 additions & 0 deletions tests/system/aiplatform/test_e2e_tabular.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ def test_end_to_end_tabular(self, shared_state):
ds,
replica_count=1,
model_display_name=self._make_display_name("custom-housing-model"),
timeout=1234,
restart_job_on_worker_restart=True,
enable_web_access=True,
sync=False,
)
Expand Down Expand Up @@ -147,6 +149,19 @@ def test_end_to_end_tabular(self, shared_state):
# Send online prediction with same instance to both deployed models
# This sample is taken from an observation where median_house_value = 94600
custom_endpoint.wait()

# Check scheduling is correctly set
assert (
custom_job._gca_resource.training_task_inputs["scheduling"]["timeout"]
== "1234s"
)
assert (
custom_job._gca_resource.training_task_inputs["scheduling"][
"restartJobOnWorkerRestart"
]
is True
)

custom_prediction = custom_endpoint.predict([_INSTANCE])

custom_batch_prediction_job.wait()
Expand Down
Loading

0 comments on commit 89078e0

Please sign in to comment.