Skip to content

Commit

Permalink
add slurm_job_size argument for slurm job array (#325)
Browse files Browse the repository at this point in the history
Signed-off-by: Jinzhe Zeng <jinzhe.zeng@rutgers.edu>
  • Loading branch information
njzjz committed Apr 19, 2023
1 parent 3736b9c commit 6953c03
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 7 deletions.
4 changes: 2 additions & 2 deletions doc/batch.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ To avoid running multiple jobs at the same time, one could set {dargs:argument}`
One needs to make sure slurm has been setup in the remote server and the related environment is activated.

When `SlurmJobArray` is used, dpdispatcher submits Slurm jobs with [job arrays](https://slurm.schedmd.com/job_array.html).
In this way, a dpdispatcher {class}`task <dpdispatcher.submission.Task>` maps to a Slurm job and a dpdispatcher {class}`job <dpdispatcher.submission.Job>` maps to a Slurm job array.
In this way, several dpdispatcher {class}`task <dpdispatcher.submission.Task>`s map to a Slurm job and a dpdispatcher {class}`job <dpdispatcher.submission.Job>` maps to a Slurm job array.
Millions of Slurm jobs can be submitted quickly and Slurm can execute all Slurm jobs at the same time.
One can use {dargs:argument}`group_size <resources/group_size>` to control how many Slurm jobs are contained in a Slurm job array.
One can use {dargs:argument}`group_size <resources/group_size>` and {dargs:argument}`slurm_job_size <resources[SlurmJobArray]/kwargs/slurm_job_size>` to control how many Slurm jobs are contained in a Slurm job array.

## OpenPBS or PBSPro

Expand Down
43 changes: 38 additions & 5 deletions dpdispatcher/slurm.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
import pathlib
import shlex
from typing import List
Expand Down Expand Up @@ -199,25 +200,27 @@ class SlurmJobArray(Slurm):
"""Slurm with job array enabled for multiple tasks in a job."""

def gen_script_header(self, job):
slurm_job_size = job.resources.kwargs.get("slurm_job_size", 1)
if job.fail_count > 0:
# resubmit jobs, check if some of tasks have been finished
job_array = []
job_array = set()
for ii, task in enumerate(job.job_task_list):
task_tag_finished = (
pathlib.PurePath(task.task_work_path)
/ (task.task_hash + "_task_tag_finished")
).as_posix()
if not self.context.check_file_exists(task_tag_finished):
job_array.append(ii)
job_array.add(ii // slurm_job_size)
return super().gen_script_header(job) + "\n#SBATCH --array=%s" % (
",".join(map(str, job_array))
)
return super().gen_script_header(job) + "\n#SBATCH --array=0-%d" % (
len(job.job_task_list) - 1
math.ceil(len(job.job_task_list) / slurm_job_size) - 1
)

def gen_script_command(self, job):
resources = job.resources
slurm_job_size = resources.kwargs.get("slurm_job_size", 1)
# SLURM_ARRAY_TASK_ID: 0 ~ n_jobs-1
script_command = "case $SLURM_ARRAY_TASK_ID in\n"
for ii, task in enumerate(job.job_task_list):
Expand All @@ -243,10 +246,16 @@ def gen_script_command(self, job):
task_tag_finished=task_tag_finished,
log_err_part=log_err_part,
)
script_command += f"{ii})\n"
if ii % slurm_job_size == 0:
script_command += f"{ii // slurm_job_size})\n"
script_command += single_script_command
script_command += self.gen_script_wait(resources=resources)
script_command += "\n;;\n"
script_command += "\n"
if (
ii % slurm_job_size == slurm_job_size - 1
or ii == len(job.job_task_list) - 1
):
script_command += ";;\n"
script_command += "*)\nexit 1\n;;\nesac\n"
return script_command

Expand Down Expand Up @@ -343,3 +352,27 @@ def check_finish_tag(self, job):
).as_posix()
results.append(self.context.check_file_exists(task_tag_finished))
return all(results)

@classmethod
def resources_subfields(cls) -> List[Argument]:
"""Generate the resources subfields.
Returns
-------
list[Argument]
resources subfields
"""
doc_slurm_job_size = "Number of tasks in a Slurm job"
arg = super().resources_subfields()[0]
arg["kwargs"].extend_subfields(
[
Argument(
"slurm_job_size",
int,
optional=True,
default=1,
doc=doc_slurm_job_size,
),
]
)
return [arg]
12 changes: 12 additions & 0 deletions tests/test_run_submission.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,18 @@ def setUp(self):
self.resources_dict["queue_name"] = "normal"


@unittest.skipIf(
os.environ.get("DPDISPATCHER_TEST") != "slurm",
"outside the slurm testing environment",
)
class TestSlurmJobArrayRun2(RunSubmission, unittest.TestCase):
def setUp(self):
super().setUp()
self.machine_dict["batch_type"] = "SlurmJobArray"
self.resources_dict["queue_name"] = "normal"
self.resources_dict["kwargs"] = {"slurm_job_size": 2}


@unittest.skipIf(
os.environ.get("DPDISPATCHER_TEST") != "pbs", "outside the pbs testing environment"
)
Expand Down

0 comments on commit 6953c03

Please sign in to comment.