-
Notifications
You must be signed in to change notification settings - Fork 246
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[batch] Add Job Groups to Batch #14282
[batch] Add Job Groups to Batch #14282
Conversation
Tests are all passing again. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished at last! I think these are pretty minor changes. Let's try to merge this afternoon?
batch/test/test_batch.py
Outdated
@@ -23,6 +25,8 @@ | |||
|
|||
deploy_config = get_deploy_config() | |||
|
|||
MAX_JOB_GROUP_NESTING_DEPTH = 2 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we import this from constants.py? I strongly prefer one source of truth.
batch/test/test_batch.py
Outdated
|
||
assert len(debug_info['jobs']) == 1, str(debug_info) | ||
assert len(list(jg.jobs())) == 1, str(debug_info) | ||
assert jg.attributes()['name'] == 'foo', str(debug_info) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Testing debug_info
distracts from the important assertions of this test. Debug info is not a building block of any public API. It's meant only as a standard (possibly expensive to obtain) collection of information useful when a test fails.
batch/test/test_batch.py
Outdated
assert len(job_groups) == 1, str(job_groups) | ||
assert job_groups[0].attributes()['name'] == 'foo', str(job_groups) | ||
assert len(jobs) == 1, str(jobs) | ||
b.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the function of this? We don't have any assertions after it (so we're not testing it as an operation) but we also don't try-finally it (so it's not meant as a cleanup step).
batch/test/test_batch.py
Outdated
b.submit() | ||
job_groups = list(b.job_groups()) | ||
# need to include the initial job group created | ||
assert len(job_groups) == max_bunch_size + 2, str(job_groups) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The presence of a comment suggests to me that we haven't made the test code clear enough. Maybe a blank line after 1894 is enough?
There's also the extremely explicit option:
n_groups = 0
# ...
b.create_job_group(...)
n_groups += 1
for i in range(..):
# ...
n_groups += 1
# ...
assert n_groups == max_bunch_size + 2
assert len(job_groups) == n_groups
job_groups = list(b.job_groups()) | ||
assert len(job_groups) == 1, str(job_groups) | ||
jobs = list(b.jobs()) | ||
assert len(jobs) == 4, str(jobs) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems worthwhile to assert that the JobGroup.jobs
is empty. A separate test for that also seems fine! We only seem to test JobGroup.jobs in the first test here and only in the case that all the jobs in the batch are in the group.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm losing track, but I think this test addresses it:
def test_job_group_creation_with_no_jobs_but_batch_is_not_empty(client: BatchClient):
b = create_batch(client)
jg = b.create_job_group(attributes={'name': 'foo'})
for _ in range(4):
b.create_job(DOCKER_ROOT_IMAGE, ['true'])
b.submit()
job_groups = list(b.job_groups())
assert len(job_groups) == 1, str(job_groups)
jobs = list(b.jobs())
assert len(jobs) == 4, str(jobs)
assert len(list(jg.jobs())) == 0, str(jg.debug_info())
assert len(list(jg.job_groups())) == 0, str(jg.debug_info())
batch/test/test_batch.py
Outdated
assert len(jobs) == 1, str(jg.debug_info()) | ||
assert len(job_groups) == 0, str(jg.debug_info()) | ||
|
||
await jg.cancel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs to be in a finally or needs an assertion after it
cancel_after_n_failures=cancel_after_n_failures, | ||
) | ||
|
||
# FIXME Error if this is called while in a job within the same job group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolve the FIXME or create an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if i < 64: | ||
i = i + 1 | ||
|
||
# FIXME Error if this is called while in a job within the same job group |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resolve the FIXME or create an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@@ -464,7 +699,7 @@ async def _wait( | |||
if i < 64: | |||
i = i + 1 | |||
|
|||
# FIXME Error if this is called while within a job of the same Batch | |||
# FIXME Error if this is called while in a job within the same Batch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These FIXMEs should never have been in here in the first place. Let's either create an issue or address them in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want the link to the issue in the comment or no comment at all?
batch/batch/constants.py
Outdated
@@ -1 +1,3 @@ | |||
ROOT_JOB_GROUP_ID = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The ROOT_JOB_GROUP_ID should be defined in exactly one place, since we need it both client-side and server-side, that place has to be hailtop.
On Azure, one of the tests timed out with 500 responses from the server. I'll need to debug in GCP, but the PR queue is long right now. |
5f4d439
to
d5574c1
Compare
@jigold everything passing, shall we merge now? |
I took out the transaction in the migration as we'll need those triggers to be committed even in the event of the migration failing part of the way through changing the table primary keys. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
This PR adds the job groups functionality as described in this RFC to the Batch backend and
hailtop.batch_client
. This includes supporting nested job groups up to a maximum depth of 5. Note, that none of these changes are user-facing yet (hence no change log here).The PRs that came before this one:
Subsequent PRs will need to implement the following:
cancel_after_n_failures=1
for all new stages of worker jobshailtop.batch
interface for users to define and work with Job GroupsA couple of nuances in the implementation came up that I also tried to articulate in the RFC:
(batch_updates.committed OR job_groups.job_group_id = %s)
where "%s" is the ROOT_JOB_GROUP_ID.job_groups_cancelled
. This table does NOT contain all transitive job groups that were also cancelled indirectly. The reason for this is we cannot guarantee that a user wouldn't have millions of job groups and we can't insert millions of records inside a single SQL stored procedure. Now, any query on the driver / front_end must look up the tree and see if any parent has been cancelled. This code looks similar to the code below [1].DELETE FROM
statements incommit_batch_update
andcommit_batch
that cleaned up old records that were no longer used injob_group_inst_coll_cancellable_resources
andjob_groups_inst_coll_staging
. This cleanup now occurs in a periodic loop on the driver.job_group_inst_coll_cancellable_resources
andjob_groups_inst_coll_staging
tables have values which represent the sum of all child job groups. For example, if a job group has 1 job and it's child job group has 2 jobs, then the staging table would have n_jobs = 3 for the parent job group and n_jobs = 2 for the child job group. Likewise, all of the billing triggers and MJC have to use thejob_group_self_and_ancestors
table to modify the job group the job belongs to as well its parent job groups.[1] Code to check whether a job group has been cancelled.