Skip to content

Commit

Permalink
Merge e128ac5 into 29f12d2
Browse files Browse the repository at this point in the history
  • Loading branch information
shuds13 committed Mar 21, 2020
2 parents 29f12d2 + e128ac5 commit b6b5554
Showing 1 changed file with 45 additions and 40 deletions.
85 changes: 45 additions & 40 deletions libensemble/executors/mpi_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,11 @@ def __init__(self, auto_resources=True,
"""

Executor.__init__(self)
self.auto_resources = auto_resources

# MPI launch settings
self.max_launch_attempts = 5
self.fail_time = 2
self.auto_resources = auto_resources

mpi_commands = {
'mpich': ['mpirun', '--env {env}', '-machinefile {machinefile}',
Expand All @@ -108,6 +110,10 @@ def __init__(self, auto_resources=True,
self.mpi_launch_type = MPIResources.get_MPI_variant()
self.mpi_command = mpi_commands[self.mpi_launch_type]

self.subgroup_launch = True
if self.mpi_launch_type in ['aprun', 'srun']:
self.subgroup_launch = False

if self.auto_resources:
self.resources = \
MPIResources(top_level_dir=self.top_level_dir,
Expand Down Expand Up @@ -166,6 +172,42 @@ def _get_mpi_specs(self, task, num_procs, num_nodes,
'machinefile': machinefile,
'hostlist': hostlist}

def _launch_with_retries(self, task, runline, wait_on_run):
""" Launch task with retry mechanism"""
retry_count = 0
while retry_count < self.max_launch_attempts:
retry = False
try:
retry_string = " (Retry {})".format(retry_count) if retry_count > 0 else ""
logger.info("Launching task {}{}: {}".
format(task.name, retry_string, " ".join(runline)))

task.process = launcher.launch(runline, cwd='./',
stdout=open(task.stdout, 'w'),
stderr=open(task.stderr, 'w'),
start_new_session=self.subgroup_launch)
except Exception as e:
logger.warning('task {} submit command failed on "\
"try {} with error {}'.format(task.name, retry_count, e))
retry = True
retry_count += 1
else:
if (wait_on_run):
self._wait_on_run(task, self.fail_time)

if task.state == 'FAILED':
logger.warning('task {} failed within fail_time on"\
"try {} with err code {}'.format(task.name, retry_count, task.errcode))
retry = True
retry_count += 1

if retry and retry_count < self.max_launch_attempts:
logger.debug('Retry number {} for task {}')
time.sleep(retry_count*5)
task.reset() # Some cases may require user cleanup - currently not supported (could use callback)
else:
break

def submit(self, calc_type, num_procs=None, num_nodes=None,
ranks_per_node=None, machinefile=None, app_args=None,
stdout=None, stderr=None, stage_inout=None,
Expand Down Expand Up @@ -250,51 +292,14 @@ def submit(self, calc_type, num_procs=None, num_nodes=None,
if test:
logger.info('Test (No submit) Runline: {}'.format(' '.join(runline)))
else:
subgroup_launch = True
if self.mpi_launch_type in ['aprun', 'srun']:
subgroup_launch = False

retry_count = 0
while retry_count < self.max_launch_attempts:
retry = False
try:
retry_string = " (Retry {})".format(retry_count) if retry_count > 0 else ""
logger.info("Launching task {}{}: {}".
format(task.name, retry_string, " ".join(runline)))

task.process = launcher.launch(runline, cwd='./',
stdout=open(task.stdout, 'w'),
stderr=open(task.stderr, 'w'),
start_new_session=subgroup_launch)
except Exception as e:
logger.warning('task {} submit command failed on "\
"try {} with error {}'.format(task.name, retry_count, e))
retry = True
retry_count += 1
else:
if (wait_on_run):
self._wait_on_run(task, self.fail_time)

if task.state == 'FAILED':
logger.warning('task {} failed within fail_time on"\
"try {} with err code {}'.format(task.name, retry_count, task.errcode))
retry = True
retry_count += 1

if retry and retry_count < self.max_launch_attempts:
# retry_count += 1 # Do not want to reset task if not going to retry.
logger.debug('Retry number {} for task {}')
time.sleep(retry_count*5)
task.reset() # Some cases may require user cleanup - currently not supported (could use callback)
else:
break
# Launch Task
self._launch_with_retries(task, runline, wait_on_run)

if not task.timer.timing:
task.timer.start()
task.submit_time = task.timer.tstart # Time not date - may not need if using timer.

self.list_of_tasks.append(task)

return task

def set_worker_info(self, comm, workerid=None):
Expand Down

0 comments on commit b6b5554

Please sign in to comment.