[batch] Add ability to create job groups at top level #14170

Closed
wants to merge 77 commits
80b664a
[batch] Finalize job groups in database
jigold Oct 16, 2023
0b50b03
fix
jigold Nov 9, 2023
6c9c776
fix for ambig column
jigold Nov 13, 2023
284b457
fix foreign key constraint
jigold Nov 13, 2023
d1fd11a
dont lock primary key updates
jigold Nov 13, 2023
8ac5425
fix cancel job group
jigold Nov 13, 2023
5ffa578
last fix?
jigold Nov 13, 2023
03cdaa5
get rid of extra complexity
jigold Nov 30, 2023
904a045
fixup estimated-current.sql
jigold Nov 30, 2023
3bdca11
fix cancel child job groups
jigold Nov 30, 2023
e7fe638
add new index
jigold Dec 1, 2023
e09c512
add back batch updates fields
jigold Jan 12, 2024
b40bff2
[batch] Use job group id in front end and driver queries
jigold Oct 18, 2023
1e20595
address comments
jigold Dec 1, 2023
ed95628
get rid of exposing job group id to worker
jigold Dec 1, 2023
e6ed1f0
address comments
jigold Dec 1, 2023
853d949
delint
jigold Dec 1, 2023
4c2b750
Merge commit '24525adb9c09a73a1ae820c9945acd35299878ed' into thread-j…
jigold Jan 12, 2024
d7d3b53
Merge commit 'fa2ef0f2c76654d0c037ff6db60ccb8842fb8539' into thread-j…
jigold Jan 12, 2024
1dc4ce9
partial ruff apply
jigold Jan 12, 2024
b777802
partial ruff apply
jigold Jan 12, 2024
295c339
[batch] Add job group in client and capability to list and get job gr…
jigold Oct 16, 2023
166928c
wip
jigold Nov 30, 2023
322b01d
fix
jigold Nov 30, 2023
f1697c2
delint
jigold Jan 9, 2024
0d97818
delint
jigold Jan 12, 2024
0f2cc55
[batch] Add ability to create job groups at top level only
jigold Jan 16, 2024
5fbd6e8
minor fixes
jigold Jan 18, 2024
9b17076
minor fixes
jigold Jan 18, 2024
239bd86
bad rebase fix
jigold Feb 1, 2024
9889031
fixing bad rebase
jigold Feb 1, 2024
937d501
finish fixing rebase
jigold Feb 1, 2024
d32e968
addressed most of front end comments
jigold Feb 1, 2024
328e7a6
refactored bunching
jigold Feb 1, 2024
26fe167
add update id default to 1
jigold Feb 2, 2024
0b1b66b
more front end changes
jigold Feb 2, 2024
9555687
more changes
jigold Feb 2, 2024
8a468ba
addressing more comments
jigold Feb 5, 2024
8aa3bb8
lots of comments addressed
jigold Feb 5, 2024
f3b6e4c
add ability to create jobs
jigold Feb 5, 2024
c3b825f
fix tests
jigold Feb 5, 2024
36af4f8
more fixes
jigold Feb 6, 2024
7bb3f2b
ruff check
jigold Feb 6, 2024
0802a8e
ruff format
jigold Feb 6, 2024
d631d70
delint client
jigold Feb 6, 2024
3597a89
final delint
jigold Feb 6, 2024
1030f59
fix index and various bugs
jigold Feb 6, 2024
229d8b6
fix database error in commit_batch_update
jigold Feb 6, 2024
fc781a6
attempt to fix mjc
jigold Feb 6, 2024
ef6163c
fix ambig field
jigold Feb 6, 2024
9fd31c3
wip
jigold Feb 7, 2024
4569b3d
cleanup db code
jigold Feb 7, 2024
4219370
fix mjc missing var
jigold Feb 7, 2024
9a9610f
turn off updating attempts to try and debug
jigold Feb 7, 2024
1336950
process of elimination
jigold Feb 7, 2024
f66f615
actually have new triggers in database
jigold Feb 7, 2024
56f6c77
fix build.yaml
jigold Feb 7, 2024
3ffdfae
modify commit_batch_update
jigold Feb 7, 2024
823da60
recursive job group state n_jobs and no migration transaction
jigold Feb 8, 2024
df9ebcd
fix cancel_job_group
jigold Feb 8, 2024
f76070d
more fixes
jigold Feb 8, 2024
ae1b484
fix python front end icr
jigold Feb 8, 2024
51242a0
fix bad global var collision
jigold Feb 8, 2024
b138287
fix cancel
jigold Feb 8, 2024
c63f961
fix cancel
jigold Feb 8, 2024
490cff2
turn off unschedule job in canceller
jigold Feb 9, 2024
e67594f
get rid of committed check
jigold Feb 9, 2024
e50ab12
dont unschedule jobs in canceller
jigold Feb 9, 2024
2fdfcfc
recursive populate jg_inst_coll_cancellable_resources
jigold Feb 9, 2024
aacfddb
test_job_group_cancel_after_n_failures_does_not_cancel_higher_up_jobs…
jigold Feb 9, 2024
16187ca
fix test
jigold Feb 9, 2024
964dcfa
in sync sql
jigold Feb 9, 2024
244354a
delint
jigold Feb 9, 2024
c4028c1
get state right in commit
jigold Feb 9, 2024
946ef12
get rid of unsed columns in staging table
jigold Feb 9, 2024
1870039
more fixes
jigold Feb 9, 2024
de473d5
add nested job groups
jigold Feb 9, 2024
93 changes: 72 additions & 21 deletions batch/batch/batch.py
@@ -1,18 +1,25 @@
import json
import logging
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Optional, cast

from gear import transaction
from hailtop.batch_client.types import CostBreakdownEntry, JobListEntryV1Alpha
from hailtop.batch_client.types import CostBreakdownEntry, GetJobGroupResponseV1Alpha, JobListEntryV1Alpha
from hailtop.utils import humanize_timedelta_msecs, time_msecs_str

from .batch_format_version import BatchFormatVersion
from .exceptions import NonExistentBatchError, OpenBatchError
from .constants import ROOT_JOB_GROUP_ID
from .exceptions import NonExistentJobGroupError
from .utils import coalesce

log = logging.getLogger('batch')


def _maybe_time_msecs_str(t):
if t:
return time_msecs_str(t)
return None


def cost_breakdown_to_dict(cost_breakdown: Dict[str, float]) -> List[CostBreakdownEntry]:
return [{'resource': resource, 'cost': cost} for resource, cost in cost_breakdown.items()]

@@ -30,14 +37,9 @@ def batch_record_to_dict(record: Dict[str, Any]) -> Dict[str, Any]:
else:
state = 'running'

def _time_msecs_str(t):
if t:
return time_msecs_str(t)
return None

time_created = _time_msecs_str(record['time_created'])
time_closed = _time_msecs_str(record['time_closed'])
time_completed = _time_msecs_str(record['time_completed'])
time_created = _maybe_time_msecs_str(record['time_created'])
time_closed = _maybe_time_msecs_str(record['time_closed'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
@@ -79,6 +81,52 @@ def _time_msecs_str(t):
return d


def job_group_record_to_dict(record: Dict[str, Any]) -> GetJobGroupResponseV1Alpha:
if record['n_failed'] > 0:
state = 'failure'
elif record['cancelled'] or record['n_cancelled'] > 0:
state = 'cancelled'
elif record['state'] == 'complete':
assert record['n_succeeded'] == record['n_jobs']
state = 'success'
else:
state = 'running'

time_created = _maybe_time_msecs_str(record['time_created'])
time_completed = _maybe_time_msecs_str(record['time_completed'])

if record['time_created'] and record['time_completed']:
duration_ms = record['time_completed'] - record['time_created']
else:
duration_ms = None

if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

d = {
'batch_id': record['batch_id'],
'job_group_id': record['job_group_id'],
'state': state,
'complete': record['state'] == 'complete',
'n_jobs': record['n_jobs'],
'n_completed': record['n_completed'],
'n_succeeded': record['n_succeeded'],
'n_failed': record['n_failed'],
'n_cancelled': record['n_cancelled'],
'time_created': time_created,
'time_completed': time_completed,
'duration': duration_ms,
'cost': coalesce(record['cost'], 0),
'cost_breakdown': record['cost_breakdown'],
}

attributes = json.loads(record['attributes'])
if attributes:
d['attributes'] = attributes

return cast(GetJobGroupResponseV1Alpha, d)
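The state derivation at the top of `job_group_record_to_dict` can be illustrated in isolation. This is a hypothetical standalone helper (not part of the PR) that mirrors the precedence the new function uses: failure wins over cancellation, which wins over completion, and anything else is still running.

```python
from typing import Any, Dict


def job_group_state(record: Dict[str, Any]) -> str:
    # Any failed job marks the whole group as failed.
    if record['n_failed'] > 0:
        return 'failure'
    # Next in precedence: the group was cancelled, or some jobs were.
    if record['cancelled'] or record['n_cancelled'] > 0:
        return 'cancelled'
    # A complete group with no failures or cancellations succeeded fully.
    if record['state'] == 'complete':
        assert record['n_succeeded'] == record['n_jobs']
        return 'success'
    return 'running'
```

The same record dict the SQL query produces would drive this function; the assert matches the invariant checked in the diff above.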


def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEntryV1Alpha:
format_version = BatchFormatVersion(record['format_version'])

@@ -93,7 +141,7 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEn
if record['cost_breakdown'] is not None:
record['cost_breakdown'] = cost_breakdown_to_dict(json.loads(record['cost_breakdown']))

return {
d = {
'batch_id': record['batch_id'],
'job_id': record['job_id'],
'name': name,
@@ -107,24 +155,27 @@ def job_record_to_dict(record: Dict[str, Any], name: Optional[str]) -> JobListEn
'cost_breakdown': record['cost_breakdown'],
}

return cast(JobListEntryV1Alpha, d)

async def cancel_batch_in_db(db, batch_id):

async def cancel_job_group_in_db(db, batch_id, job_group_id):
@transaction(db)
async def cancel(tx):
record = await tx.execute_and_fetchone(
"""
SELECT `state` FROM batches
WHERE id = %s AND NOT deleted
SELECT 1
FROM job_groups
LEFT JOIN batches ON batches.id = job_groups.batch_id
LEFT JOIN batch_updates ON job_groups.batch_id = batch_updates.batch_id AND
job_groups.update_id = batch_updates.update_id
WHERE job_groups.batch_id = %s AND job_groups.job_group_id = %s AND NOT deleted AND (batch_updates.committed OR job_groups.job_group_id = %s)
FOR UPDATE;
""",
(batch_id,),
(batch_id, job_group_id, ROOT_JOB_GROUP_ID),
)
if not record:
raise NonExistentBatchError(batch_id)

if record['state'] == 'open':
raise OpenBatchError(batch_id)
raise NonExistentJobGroupError(batch_id, job_group_id)

await tx.just_execute('CALL cancel_batch(%s);', (batch_id,))
await tx.just_execute('CALL cancel_job_group(%s, %s);', (batch_id, job_group_id))

await cancel()
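The `WHERE` clause of the locking query in `cancel_job_group_in_db` encodes when a job group is eligible for cancellation: its batch must not be deleted, and either its batch update is committed or it is the root job group (which exists from batch creation and has no update to wait on). A pure-Python sketch of that predicate (hypothetical names, for illustration only):

```python
ROOT_JOB_GROUP_ID = 0  # from batch/batch/constants.py


def job_group_is_cancellable(deleted: bool, update_committed: bool, job_group_id: int) -> bool:
    # Mirrors: NOT deleted AND (batch_updates.committed OR job_group_id = ROOT_JOB_GROUP_ID)
    return not deleted and (update_committed or job_group_id == ROOT_JOB_GROUP_ID)
```

If the predicate fails, the query returns no row and the function raises `NonExistentJobGroupError`, replacing the old `NonExistentBatchError`/`OpenBatchError` pair.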
2 changes: 2 additions & 0 deletions batch/batch/constants.py
@@ -1 +1,3 @@
ROOT_JOB_GROUP_ID = 0

MAX_JOB_GROUPS_DEPTH = 5
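The new `MAX_JOB_GROUPS_DEPTH` constant implies nesting of job groups is capped at five levels. A minimal sketch of how such a cap might be enforced — the function name and error message are assumptions, not taken from this PR:

```python
MAX_JOB_GROUPS_DEPTH = 5


def check_job_group_depth(depth: int) -> None:
    # depth 0 is the root job group; anything nested deeper than the cap is rejected.
    if depth > MAX_JOB_GROUPS_DEPTH:
        raise ValueError(f'job group depth {depth} exceeds maximum of {MAX_JOB_GROUPS_DEPTH}')
```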
57 changes: 28 additions & 29 deletions batch/batch/driver/canceller.py
@@ -94,39 +94,38 @@ async def cancel_cancelled_ready_jobs_loop_body(self):
}

async def user_cancelled_ready_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM batches
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM job_groups
LEFT JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
""",
(user,),
):
if batch['cancelled']:
if job_group['cancelled']:
Contributor:

Why does this do a Python-side filter rather than a SQL-side filter like the next query?

Collaborator (Author):

There are two types of cancelled jobs. One is for cancelled job groups / batches, in which all jobs that are not always-run are cancelled. But then there's cancellation of individual jobs if their parents failed. I'm guessing it was set up this way with two separate queries to optimize speed and to be able to use the `remaining` shared variable. I can see if there's a way to optimize it into one query if that's something you want to see happen.

Collaborator (Author):

And this only applies to Ready jobs -- not Creating or Running jobs.
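The two cancellation paths described in this exchange can be sketched as an in-memory filter. This is a hypothetical illustration (not code from the PR) of the branch on `job_group['cancelled']`: a cancelled group sweeps up every non-always-run Ready job, while an uncancelled group only yields jobs individually marked cancelled (e.g. because a parent failed).

```python
from typing import Any, Dict, List


def cancelled_ready_jobs(job_group: Dict[str, Any], jobs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    # Branch 1: the whole job group is cancelled, so every Ready job
    # that is not always-run is cancellable.
    if job_group['cancelled']:
        return [j for j in jobs if j['state'] == 'Ready' and not j['always_run']]
    # Branch 2: only jobs individually flagged cancelled are swept up.
    return [j for j in jobs if j['state'] == 'Ready' and not j['always_run'] and j['cancelled']]
```

In the real code both branches run as SQL with a `LIMIT %s` driven by the `remaining` shared variable, which is the optimization the author refers to.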

async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record
else:
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id
SELECT jobs.batch_id, jobs.job_id
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
WHERE batch_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
WHERE batch_id = %s AND job_group_id = %s AND state = 'Ready' AND always_run = 0 AND cancelled = 1
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
@@ -182,28 +181,28 @@ async def cancel_cancelled_creating_jobs_loop_body(self):
}

async def user_cancelled_creating_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id
FROM batches
SELECT job_groups.batch_id, job_groups.job_group_id
FROM job_groups
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
""",
(user,),
):
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Creating' AND always_run = 0 AND cancelled = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)
@@ -279,28 +278,28 @@ async def cancel_cancelled_running_jobs_loop_body(self):
}

async def user_cancelled_running_jobs(user, remaining) -> AsyncIterator[Dict[str, Any]]:
async for batch in self.db.select_and_fetchall(
async for job_group in self.db.select_and_fetchall(
"""
SELECT batches.id
FROM batches
SELECT job_groups.batch_id, job_groups.job_group_id, job_groups_cancelled.id IS NOT NULL AS cancelled
FROM job_groups
INNER JOIN job_groups_cancelled
ON batches.id = job_groups_cancelled.id
ON job_groups.batch_id = job_groups_cancelled.id AND
job_groups.job_group_id = job_groups_cancelled.job_group_id
WHERE user = %s AND `state` = 'running';
""",
(user,),
):
async for record in self.db.select_and_fetchall(
"""
SELECT jobs.job_id, attempts.attempt_id, attempts.instance_name
SELECT jobs.batch_id, jobs.job_id, attempts.attempt_id, attempts.instance_name
FROM jobs FORCE INDEX(jobs_batch_id_state_always_run_cancelled)
STRAIGHT_JOIN attempts
ON attempts.batch_id = jobs.batch_id AND attempts.job_id = jobs.job_id
WHERE jobs.batch_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
WHERE jobs.batch_id = %s AND jobs.job_group_id = %s AND state = 'Running' AND always_run = 0 AND cancelled = 0
LIMIT %s;
""",
(batch['id'], remaining.value),
(job_group['batch_id'], job_group['job_group_id'], remaining.value),
):
record['batch_id'] = batch['id']
yield record

waitable_pool = WaitableSharedPool(self.async_worker_pool)