Skip to content

Commit

Permalink
Call submit_job() recursively if the max number of submitted jobs was reached (
Browse files Browse the repository at this point in the history
…#605)

If the maximum number of jobs that can be submitted to a server (either locally
or via SSH) has been reached, sleep for 5 minutes and then retry the submission
recursively. Currently implemented only for Slurm.
  • Loading branch information
kfir4444 committed Mar 9, 2023
2 parents 5f64900 + 1b4f0b8 commit a9bc6ad
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
22 changes: 18 additions & 4 deletions arc/job/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ def submit_job(path: str,
cluster_soft: Optional[str] = None,
submit_cmd: Optional[str] = None,
submit_filename: Optional[str] = None,
recursion: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Submit a job.
Expand All @@ -231,19 +232,31 @@ def submit_job(path: str,
cluster_soft (str, optional): The server cluster software.
submit_cmd (str, optional): The submit command.
submit_filename (str, optional): The submit script file name.
recursion (bool, optional): Whether this call is within a recursion.
Returns:
Tuple[Optional[str], Optional[str]]: job_status, job_id
"""
cluster_soft = cluster_soft or servers['local']['cluster_soft']
job_status, job_id = '', ''
submit_cmd = submit_cmd or submit_command[servers['local']['cluster_soft']]
submit_filename = submit_filename or submit_filenames[servers['local']['cluster_soft']]
submit_cmd = submit_cmd or submit_command[cluster_soft]
submit_filename = submit_filename or submit_filenames[cluster_soft]
cmd = f"cd {path}; {submit_cmd} {submit_filename}"
stdout, stderr = execute_command(cmd)
if not len(stdout):
time.sleep(10)
stdout, stderr = execute_command(cmd)
if not len(stdout):
if stderr:
if cluster_soft.lower() == 'slurm' and any('AssocMaxSubmitJobLimit' in err_line for err_line in stderr):
logger.warning(f'Max number of submitted jobs was reached, sleeping...')
time.sleep(5 * 60)
submit_job(path=path,
cluster_soft=cluster_soft,
submit_cmd=submit_cmd,
submit_filename=submit_filename,
recursion=True,
)
if not len(stdout) or recursion:
return None, None
if len(stderr) > 0 or len(stdout) == 0:
logger.warning(f'Got the following error when trying to submit job:\n{stderr}.')
Expand All @@ -268,7 +281,8 @@ def _determine_job_id(stdout: List[str],
str: The determined job ID.
"""
job_id = ''
cluster_soft = cluster_soft or servers['local']['cluster_soft'].lower()
cluster_soft = cluster_soft or servers['local']['cluster_soft']
cluster_soft = cluster_soft.lower() if cluster_soft is not None else None
if cluster_soft in ['oge', 'sge'] and 'submitted' in stdout[0].lower():
job_id = stdout[0].split()[2]
elif cluster_soft == 'slurm' and 'submitted' in stdout[0].lower():
Expand Down
22 changes: 15 additions & 7 deletions arc/job/ssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,13 +284,15 @@ def check_running_jobs_ids(self) -> list:
running_job_ids.append(job_id)
return running_job_ids

def submit_job(self, remote_path: str) -> Tuple[str, int]:
def submit_job(self, remote_path: str,
recursion: bool = False,
) -> Tuple[Optional[str], Optional[str]]:
"""
Submit a job to the server.
Args:
remote_path (str): The remote path contains the input file
and the submission script.
remote_path (str): The remote path contains the input file and the submission script.
recursion (bool, optional): Whether this call is within a recursion.
Returns: Tuple[str, int]
- A string indicate the status of job submission.
Expand All @@ -309,13 +311,19 @@ def submit_job(self, remote_path: str) -> Tuple[str, int]:
if 'Requested node configuration is not available' in line:
logger.warning('User may be requesting more resources than are available. Please check server '
'settings, such as cpus and memory, in ARC/arc/settings/settings.py')
elif servers[self.server]['cluster_soft'].lower() in ['oge', 'sge'] and 'submitted' in stdout[0].lower():
if cluster_soft.lower() == 'slurm' and 'AssocMaxSubmitJobLimit' in line:
logger.warning(f'Max number of submitted jobs was reached, sleeping...')
time.sleep(5 * 60)
self.submit_job(remote_path=remote_path, recursion=True)
if recursion:
return None, None
elif cluster_soft.lower() in ['oge', 'sge'] and 'submitted' in stdout[0].lower():
job_id = stdout[0].split()[2]
elif servers[self.server]['cluster_soft'].lower() == 'slurm' and 'submitted' in stdout[0].lower():
elif cluster_soft.lower() == 'slurm' and 'submitted' in stdout[0].lower():
job_id = stdout[0].split()[3]
elif servers[self.server]['cluster_soft'].lower() == 'pbs':
elif cluster_soft.lower() == 'pbs':
job_id = stdout[0].split('.')[0]
elif servers[self.server]['cluster_soft'].lower() == 'htcondor' and 'submitting' in stdout[0].lower():
elif cluster_soft.lower() == 'htcondor' and 'submitting' in stdout[0].lower():
# Submitting job(s).
# 1 job(s) submitted to cluster 443069.
if len(stdout) and len(stdout[1].split()) and len(stdout[1].split()[-1].split('.')):
Expand Down

0 comments on commit a9bc6ad

Please sign in to comment.