Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[batch] expose Job.spot #13396

Merged
merged 2 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion batch/utils/stress.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def stress():
if flip(0.01):
c._machine_type = 'n1-standard-1'
if flip(0.5):
c._preemptible = False
c.spot(False)

b.run(open=False, wait=False)

Expand Down
10 changes: 10 additions & 0 deletions hail/python/hailtop/batch/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ class Batch:
the `dill` Python package installed and have the same version of Python installed that is
currently running. If `None`, a compatible Python image with `dill` pre-installed will
automatically be used if the current Python version is 3.9, or 3.10.
default_spot:
If unspecified or ``True``, jobs will run by default on spot instances. If ``False``, jobs
will run by default on non-spot instances. Each job can override this setting with
:meth:`.Job.spot`.
project:
DEPRECATED: please specify `google_project` on the ServiceBackend instead. If specified,
the project to use when authenticating with Google Storage. Google Storage is used to
Expand Down Expand Up @@ -150,6 +154,7 @@ def __init__(self,
default_timeout: Optional[Union[float, int]] = None,
default_shell: Optional[str] = None,
default_python_image: Optional[str] = None,
default_spot: Optional[bool] = None,
project: Optional[str] = None,
cancel_after_n_failures: Optional[int] = None):
self._jobs: List[job.Job] = []
Expand Down Expand Up @@ -186,6 +191,7 @@ def __init__(self,
self._default_timeout = default_timeout
self._default_shell = default_shell
self._default_python_image = default_python_image
self._default_spot = default_spot

if project is not None:
warnings.warn(
Expand Down Expand Up @@ -310,6 +316,8 @@ def new_bash_job(self,
j.storage(self._default_storage)
if self._default_timeout is not None:
j.timeout(self._default_timeout)
if self._default_spot is not None:
j.spot(self._default_spot)

if isinstance(self._backend, _backend.ServiceBackend):
j.regions(self._backend.regions)
Expand Down Expand Up @@ -382,6 +390,8 @@ def hello(name):
j.storage(self._default_storage)
if self._default_timeout is not None:
j.timeout(self._default_timeout)
if self._default_spot is not None:
j.spot(self._default_spot)

if isinstance(self._backend, _backend.ServiceBackend):
j.regions(self._backend.regions)
Expand Down
26 changes: 26 additions & 0 deletions hail/python/hailtop/batch/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,32 @@ def always_run(self, always_run: bool = True) -> 'Job':
self._always_run = always_run
return self

def spot(self, is_spot: bool) -> 'Job':
"""
Set whether a job is run on spot instances. By default, all jobs run on spot instances.

Examples
--------

Ensure a job only runs on non-spot instances:

>>> b = Batch(backend=backend.ServiceBackend('test'))
>>> j = b.new_job()
>>> j = j.spot(False)
>>> j = j.command(f'echo "hello"'))
danking marked this conversation as resolved.
Show resolved Hide resolved

Parameters
----------
is_spot:
If False, this job will be run on non-spot instances.

Returns
-------
Same job object.
"""
self._preemptible = is_spot
return self

def regions(self, regions: Optional[List[str]]) -> 'Job':
"""
Set the cloud regions a job can run in.
Expand Down
48 changes: 43 additions & 5 deletions hail/python/test/hailtop/batch/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,16 +525,13 @@ def sync_exists(self, url):
def sync_write(self, url, data):
return async_to_blocking(self.router_fs.write(url, data))

def batch(self, requester_pays_project=None, default_python_image=None,
cancel_after_n_failures=None):
def batch(self, **kwargs):
name_of_test_method = inspect.stack()[1][3]
return Batch(name=name_of_test_method,
backend=self.backend,
default_image=DOCKER_ROOT_IMAGE,
attributes={'foo': 'a', 'bar': 'b'},
requester_pays_project=requester_pays_project,
default_python_image=default_python_image,
cancel_after_n_failures=cancel_after_n_failures)
**kwargs)

def test_single_task_no_io(self):
b = self.batch()
Expand Down Expand Up @@ -1319,3 +1316,44 @@ def test_wait_on_empty_batch_update(self):
b = self.batch()
b.run(wait=True)
b.run(wait=True)

def test_non_spot_job(self):
b = self.batch()
j = b.new_job()
j.spot(False)
j.command('echo hello')
res = b.run()
assert res is not None
assert res.get_job(1).status()['spec']['resources']['preemptible'] == False

def test_spot_unspecified_job(self):
b = self.batch()
j = b.new_job()
j.command('echo hello')
res = b.run()
assert res is not None
assert res.get_job(1).status()['spec']['resources']['preemptible'] == True

def test_spot_true_job(self):
b = self.batch()
j = b.new_job()
j.spot(True)
j.command('echo hello')
res = b.run()
assert res is not None
assert res.get_job(1).status()['spec']['resources']['preemptible'] == True

def test_non_spot_batch(self):
b = self.batch(default_spot=False)
j1 = b.new_job()
j1.command('echo hello')
j2 = b.new_job()
j2.command('echo hello')
j3 = b.new_job()
j3.spot(True)
j3.command('echo hello')
res = b.run()
assert res is not None
assert res.get_job(1).status()['spec']['resources']['preemptible'] == False
assert res.get_job(2).status()['spec']['resources']['preemptible'] == False
assert res.get_job(3).status()['spec']['resources']['preemptible'] == True