Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented `--max_array_size` option (default: 100) #584

Merged
merged 1 commit into from
Apr 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions src/duqtools/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,10 +293,17 @@ def cli_recreate(**kwargs):
'--schedule',
is_flag=True,
help='Schedule and submit jobs automatically. `max_jobs` must be defined.')
@click.option('-j',
'--max_jobs',
type=int,
help='Maximum number of jobs to submit.')
@click.option(
'-j',
'--max_jobs',
type=int,
help=
'Maximum number of jobs to submit (simultaneously for array submission).')
@click.option(
'--max_array_size',
type=int,
default=100,
help='Maximum array size for slurm (usually 1001, default = 100).')
@click.option('-a', '--array', is_flag=True, help='Submit jobs as array.')
@click.option('-r',
'--resubmit',
Expand Down
17 changes: 13 additions & 4 deletions src/duqtools/jetto/_batchfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ def write_batchfile(run_dir: Path,
llcmd_path.chmod(llcmd_path.stat().st_mode | stat.S_IXUSR)


def write_array_batchfile(jobs: Sequence[Job], max_jobs: int):
def write_array_batchfile(jobs: Sequence[Job], max_jobs: int,
max_array_size: int):
"""Write array batchfile to start jetto runs.

Parameters
Expand All @@ -90,18 +91,26 @@ def write_array_batchfile(jobs: Sequence[Job], max_jobs: int):

scripts = '\n'.join(f' {job.submit_script}' for job in jobs)

# Calculate array size
array_size = min(len(jobs), max_array_size)

string = f"""#!/bin/sh
{options}
#SBATCH -o {out_file}
#SBATCH -e {err_file}
#SBATCH --array=0-{len(jobs)-1}%{max_jobs}
#SBATCH --array=0-{array_size-1}%{max_jobs}
#SBATCH -J duqtools-array

scripts=(
{scripts}
)
echo executing ${{scripts[$SLURM_ARRAY_TASK_ID]}}
${{scripts[$SLURM_ARRAY_TASK_ID]}} || true
i=$SLURM_ARRAY_TASK_ID
while [ $i -le {len(jobs)} ]; do
echo executing ${{scripts[$i]}}
${{scripts[$i]}} || true
i=$((i+{array_size}))
done

"""

with open('duqtools_slurm_array.sh', 'w') as f:
Expand Down
13 changes: 8 additions & 5 deletions src/duqtools/jetto/_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,25 @@ def submit_prominence(job: Job):
_ = jetto_manager.submit_job_to_prominence(jetto_config, job.dir)

@staticmethod
def submit_array(jobs: Sequence[Job], max_jobs: int):
def submit_array(jobs: Sequence[Job],
max_jobs: int,
max_array_size: int = 100):
if jobs[0].cfg.submit.submit_system == 'slurm':
JettoSystem.submit_array_slurm(jobs, max_jobs)
JettoSystem.submit_array_slurm(jobs, max_jobs, max_array_size)
else:
raise NotImplementedError(
'array submission type {jobs[0].cfg.submit.submit_system}'
f'array submission type {jobs[0].cfg.submit.submit_system}'
' not implemented')

@staticmethod
@add_to_op_queue('Submit single array job', 'duqtools_slurm_array.sh')
def submit_array_slurm(jobs: Sequence[Job], max_jobs: int):
def submit_array_slurm(jobs: Sequence[Job], max_jobs: int,
max_array_size: int):
for job in jobs:
job.lockfile.touch()

logger.info('writing duqtools_slurm_array.sh file')
_write_array_batchfile(jobs, max_jobs)
_write_array_batchfile(jobs, max_jobs, max_array_size)

submit_cmd = jobs[0].cfg.submit.submit_command.split()
cmd: list[Any] = [*submit_cmd, 'duqtools_slurm_array.sh']
Expand Down
14 changes: 9 additions & 5 deletions src/duqtools/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def _submit_job(job: Job, *, delay: float = 0):
job.submit()


def job_submitter(jobs: Sequence[Job], *, max_jobs):
def job_submitter(jobs: Sequence[Job], *, max_jobs, **kwargs):
for n, job in enumerate(jobs):
if max_jobs and (n >= max_jobs):
info(f'Max jobs ({max_jobs}) reached.')
Expand All @@ -48,7 +48,7 @@ def job_submitter(jobs: Sequence[Job], *, max_jobs):


@add_to_op_queue('Start job scheduler')
def job_scheduler(queue: Deque[Job], max_jobs=10):
def job_scheduler(queue: Deque[Job], max_jobs=10, **kwargs):
interval = 1.0

s = Spinner()
Expand Down Expand Up @@ -79,7 +79,8 @@ def job_scheduler(queue: Deque[Job], max_jobs=10):
)


def job_array_submitter(jobs: Sequence[Job], *, max_jobs):
def job_array_submitter(jobs: Sequence[Job], *, max_jobs, max_array_size,
**kwargs):
if len(jobs) == 0:
duqlog_screen.error('No jobs to submit, not creating array ...')
return
Expand All @@ -93,7 +94,7 @@ def job_array_submitter(jobs: Sequence[Job], *, max_jobs):
logger.info('Max jobs not specified, defaulting to 10')
max_jobs = 10

get_system().submit_array(jobs, max_jobs)
get_system().submit_array(jobs, max_jobs, max_array_size)


def submission_script_ok(job):
Expand Down Expand Up @@ -159,6 +160,7 @@ def get_resubmit_jobs(resubmit_names: Sequence[Path]) -> list[Job]:
def submit(*,
force: bool,
max_jobs: int,
max_array_size: int,
schedule: bool,
array: bool,
resubmit: Sequence[Path] = (),
Expand All @@ -173,6 +175,8 @@ def submit(*,
Force the submission even in the presence of lockfiles
max_jobs : int
Maximum number of jobs to submit at once
max_array_size : int
Maximum array size for slurm (usually 1001, default = 100)
schedule : bool
Schedule `max_jobs` to run at once, keeps the process alive until
finished.
Expand Down Expand Up @@ -221,4 +225,4 @@ def submit(*,

submitter = job_scheduler if schedule else job_submitter
submitter = job_array_submitter if array else submitter
submitter(job_queue, max_jobs=max_jobs)
submitter(job_queue, max_jobs=max_jobs, max_array_size=max_array_size)