diff --git a/.black b/.black index 07104554e..62d4af095 100644 --- a/.black +++ b/.black @@ -7,17 +7,8 @@ force-exclude = ''' | \.git | \.github )/ - | libensemble/libE.py | libensemble/gen_funcs/old_aposmm.py | libensemble/gen_funcs/persistent_aposmm.py | libensemble/gen_funcs/aposmm_localopt_support.py - | libensemble/tests/unit_tests/test_allocation_funcs_and_support.py - | libensemble/tests/unit_tests/test_loc_stack.py - | libensemble/tests/unit_tests/test_sim_dir_properties.py - | libensemble/tools/check_inputs.py - | libensemble/tools/parse_args.py - | libensemble/executors/mpi_runner.py - | libensemble/resources/mpi_resources.py - | libensemble/resources/scheduler.py ) ''' diff --git a/libensemble/executors/mpi_runner.py b/libensemble/executors/mpi_runner.py index fc5927823..aa6f68c66 100644 --- a/libensemble/executors/mpi_runner.py +++ b/libensemble/executors/mpi_runner.py @@ -9,17 +9,16 @@ class MPIRunner: - @staticmethod def get_runner(mpi_runner_type, runner_name=None): mpi_runners = { - 'mpich': MPICH_MPIRunner, - 'openmpi': OPENMPI_MPIRunner, - 'aprun': APRUN_MPIRunner, - 'srun': SRUN_MPIRunner, - 'jsrun': JSRUN_MPIRunner, - 'custom': MPIRunner + "mpich": MPICH_MPIRunner, + "openmpi": OPENMPI_MPIRunner, + "aprun": APRUN_MPIRunner, + "srun": SRUN_MPIRunner, + "jsrun": JSRUN_MPIRunner, + "custom": MPIRunner, } mpi_runner = mpi_runners[mpi_runner_type] if runner_name is not None: @@ -28,25 +27,24 @@ def get_runner(mpi_runner_type, runner_name=None): runner = mpi_runner() return runner - def __init__(self, run_command='mpiexec'): + def __init__(self, run_command="mpiexec"): self.run_command = run_command - self.mpi_command = [self.run_command, '{extra_args}'] + self.mpi_command = [self.run_command, "{extra_args}"] self.subgroup_launch = False self.mfile_support = False - self.arg_nprocs = ('--LIBE_NPROCS_ARG_EMPTY',) - self.arg_nnodes = ('--LIBE_NNODES_ARG_EMPTY',) - self.arg_ppn = ('--LIBE_PPN_ARG_EMPTY',) + self.arg_nprocs = ("--LIBE_NPROCS_ARG_EMPTY",) + self.arg_nnodes = ("--LIBE_NNODES_ARG_EMPTY",) + self.arg_ppn = ("--LIBE_PPN_ARG_EMPTY",) def _get_parser(self, p_args, nprocs, nnodes, ppn): - parser = argparse.ArgumentParser(description='Parse extra_args', allow_abbrev=False) - parser.add_argument(*nprocs, type=int, dest='num_procs', default=None) - parser.add_argument(*nnodes, type=int, dest='num_nodes', default=None) - parser.add_argument(*ppn, type=int, dest='procs_per_node', default=None) + parser = argparse.ArgumentParser(description="Parse extra_args", allow_abbrev=False) + parser.add_argument(*nprocs, type=int, dest="num_procs", default=None) + parser.add_argument(*nnodes, type=int, dest="num_nodes", default=None) + parser.add_argument(*ppn, type=int, dest="procs_per_node", default=None) args, _ = parser.parse_known_args(p_args) return args - def _parse_extra_args(self, num_procs, num_nodes, procs_per_node, - hyperthreads, extra_args): + def _parse_extra_args(self, num_procs, num_nodes, procs_per_node, hyperthreads, extra_args): splt_extra_args = extra_args.split() p_args = self._get_parser(splt_extra_args, self.arg_nprocs, self.arg_nnodes, self.arg_ppn) @@ -59,7 +57,7 @@ def _parse_extra_args(self, num_procs, num_nodes, procs_per_node, if procs_per_node is None: procs_per_node = p_args.procs_per_node - extra_args = ' '.join(splt_extra_args) + extra_args = " ".join(splt_extra_args) return num_procs, num_nodes, procs_per_node, p_args def _rm_replicated_args(self, num_procs, num_nodes, procs_per_node, p_args): @@ -71,10 +69,9 @@ def _rm_replicated_args(self, num_procs, num_nodes, procs_per_node, p_args): procs_per_node = None return num_procs, num_nodes, procs_per_node - def express_spec(self, task, num_procs, num_nodes, - procs_per_node, machinefile, - hyperthreads, extra_args, - resources, workerID): + def express_spec( + self, task, num_procs, num_nodes, procs_per_node, machinefile, hyperthreads, extra_args, resources, workerID + ): hostlist = None machinefile = None @@ -82,84 +79,90 @@ def express_spec(self, task, num_procs, num_nodes, hostlist = mpi_resources.get_hostlist(resources, num_nodes) return hostlist, machinefile - def get_mpi_specs(self, task, num_procs, num_nodes, - procs_per_node, machinefile, - hyperthreads, extra_args, - resources, workerID): + def get_mpi_specs( + self, task, num_procs, num_nodes, procs_per_node, machinefile, hyperthreads, extra_args, resources, workerID + ): "Form the mpi_specs dictionary." # Return auto_resource variables inc. extra_args additions if extra_args: - num_procs, num_nodes, procs_per_node, p_args = \ - self._parse_extra_args(num_procs, num_nodes, procs_per_node, - hyperthreads, extra_args=extra_args) + num_procs, num_nodes, procs_per_node, p_args = self._parse_extra_args( + num_procs, num_nodes, procs_per_node, hyperthreads, extra_args=extra_args + ) hostlist = None if machinefile and not self.mfile_support: - logger.warning('User machinefile ignored - not supported by {}'.format(self.run_command)) + logger.warning("User machinefile ignored - not supported by {}".format(self.run_command)) machinefile = None if machinefile is None and resources is not None: - num_procs, num_nodes, procs_per_node = \ - mpi_resources.get_resources(resources, num_procs, num_nodes, - procs_per_node, hyperthreads) - hostlist, machinefile = \ - self.express_spec(task, num_procs, num_nodes, - procs_per_node, machinefile, - hyperthreads, extra_args, - resources, workerID) + num_procs, num_nodes, procs_per_node = mpi_resources.get_resources( + resources, num_procs, num_nodes, procs_per_node, hyperthreads + ) + hostlist, machinefile = self.express_spec( + task, num_procs, num_nodes, procs_per_node, machinefile, hyperthreads, extra_args, resources, workerID + ) else: - num_procs, num_nodes, procs_per_node = \ - mpi_resources.task_partition(num_procs, num_nodes, - procs_per_node, machinefile) + num_procs, num_nodes, procs_per_node = mpi_resources.task_partition( + num_procs, num_nodes, procs_per_node, machinefile + ) # Remove portable variable if in extra_args if extra_args: - num_procs, num_nodes, procs_per_node = \ - self._rm_replicated_args(num_procs, num_nodes, - procs_per_node, p_args) - - return {'num_procs': num_procs, - 'num_nodes': num_nodes, - 'procs_per_node': procs_per_node, - 'extra_args': extra_args, - 'machinefile': machinefile, - 'hostlist': hostlist} + num_procs, num_nodes, procs_per_node = self._rm_replicated_args( + num_procs, num_nodes, procs_per_node, p_args + ) + + return { + "num_procs": num_procs, + "num_nodes": num_nodes, + "procs_per_node": procs_per_node, + "extra_args": extra_args, + "machinefile": machinefile, + "hostlist": hostlist, + } class MPICH_MPIRunner(MPIRunner): - - def __init__(self, run_command='mpirun'): + def __init__(self, run_command="mpirun"): self.run_command = run_command self.subgroup_launch = True self.mfile_support = True - self.arg_nprocs = ('-n', '-np') - self.arg_nnodes = ('--LIBE_NNODES_ARG_EMPTY',) - self.arg_ppn = ('--ppn',) - self.mpi_command = [self.run_command, '--env {env}', - '-machinefile {machinefile}', - '-hosts {hostlist}', '-np {num_procs}', - '--ppn {procs_per_node}', '{extra_args}'] + self.arg_nprocs = ("-n", "-np") + self.arg_nnodes = ("--LIBE_NNODES_ARG_EMPTY",) + self.arg_ppn = ("--ppn",) + self.mpi_command = [ + self.run_command, + "--env {env}", + "-machinefile {machinefile}", + "-hosts {hostlist}", + "-np {num_procs}", + "--ppn {procs_per_node}", + "{extra_args}", + ] class OPENMPI_MPIRunner(MPIRunner): - - def __init__(self, run_command='mpirun'): + def __init__(self, run_command="mpirun"): self.run_command = run_command self.subgroup_launch = True self.mfile_support = True - self.arg_nprocs = ('-n', '-np', '-c', '--n') - self.arg_nnodes = ('--LIBE_NNODES_ARG_EMPTY',) - self.arg_ppn = ('-npernode',) - self.mpi_command = [self.run_command, '-x {env}', - '-machinefile {machinefile}', - '-host {hostlist}', '-np {num_procs}', - '-npernode {procs_per_node}', '{extra_args}'] - - def express_spec(self, task, num_procs, num_nodes, - procs_per_node, machinefile, - hyperthreads, extra_args, - resources, workerID): + self.arg_nprocs = ("-n", "-np", "-c", "--n") + self.arg_nnodes = ("--LIBE_NNODES_ARG_EMPTY",) + self.arg_ppn = ("-npernode",) + self.mpi_command = [ + self.run_command, + "-x {env}", + "-machinefile {machinefile}", + "-host {hostlist}", + "-np {num_procs}", + "-npernode {procs_per_node}", + "{extra_args}", + ] + + def express_spec( + self, task, num_procs, num_nodes, procs_per_node, machinefile, hyperthreads, extra_args, resources, workerID + ): hostlist = None machinefile = None @@ -170,99 +173,103 @@ def express_spec(self, task, num_procs, num_nodes, if workerID is not None: machinefile += "_for_worker_{}".format(workerID) machinefile += "_task_{}".format(task.id) - mfile_created, num_procs, num_nodes, procs_per_node = \ - mpi_resources.create_machinefile(resources, machinefile, - num_procs, num_nodes, - procs_per_node, hyperthreads) + mfile_created, num_procs, num_nodes, procs_per_node = mpi_resources.create_machinefile( + resources, machinefile, num_procs, num_nodes, procs_per_node, hyperthreads + ) jassert(mfile_created, "Auto-creation of machinefile failed") return hostlist, machinefile class APRUN_MPIRunner(MPIRunner): - - def __init__(self, run_command='aprun'): + def __init__(self, run_command="aprun"): self.run_command = run_command self.subgroup_launch = False self.mfile_support = False - self.arg_nprocs = ('-n',) - self.arg_nnodes = ('--LIBE_NNODES_ARG_EMPTY',) - self.arg_ppn = ('-N',) - self.mpi_command = [self.run_command, '-e {env}', - '-L {hostlist}', '-n {num_procs}', - '-N {procs_per_node}', '{extra_args}'] + self.arg_nprocs = ("-n",) + self.arg_nnodes = ("--LIBE_NNODES_ARG_EMPTY",) + self.arg_ppn = ("-N",) + self.mpi_command = [ + self.run_command, + "-e {env}", + "-L {hostlist}", + "-n {num_procs}", + "-N {procs_per_node}", + "{extra_args}", + ] class SRUN_MPIRunner(MPIRunner): - - def __init__(self, run_command='srun'): + def __init__(self, run_command="srun"): self.run_command = run_command self.subgroup_launch = False self.mfile_support = False - self.arg_nprocs = ('-n', '--ntasks') - self.arg_nnodes = ('-N', '--nodes') - self.arg_ppn = ('--ntasks-per-node',) - self.mpi_command = [self.run_command, '-w {hostlist}', - '--ntasks {num_procs}', - '--nodes {num_nodes}', - '--ntasks-per-node {procs_per_node}', - '{extra_args}'] + self.arg_nprocs = ("-n", "--ntasks") + self.arg_nnodes = ("-N", "--nodes") + self.arg_ppn = ("--ntasks-per-node",) + self.mpi_command = [ + self.run_command, + "-w {hostlist}", + "--ntasks {num_procs}", + "--nodes {num_nodes}", + "--ntasks-per-node {procs_per_node}", + "{extra_args}", + ] class JSRUN_MPIRunner(MPIRunner): - - def __init__(self, run_command='jsrun'): + def __init__(self, run_command="jsrun"): self.run_command = run_command self.subgroup_launch = True self.mfile_support = False # TODO: Add multiplier to resources checks (for -c/-a) - self.arg_nprocs = ('--np', '-n') - self.arg_nnodes = ('--LIBE_NNODES_ARG_EMPTY',) - self.arg_ppn = ('-r',) - self.mpi_command = [self.run_command, '-n {num_procs}', - '-r {procs_per_node}', '{extra_args}'] + self.arg_nprocs = ("--np", "-n") + self.arg_nnodes = ("--LIBE_NNODES_ARG_EMPTY",) + self.arg_ppn = ("-r",) + self.mpi_command = [self.run_command, "-n {num_procs}", "-r {procs_per_node}", "{extra_args}"] - def get_mpi_specs(self, task, num_procs, num_nodes, - procs_per_node, machinefile, - hyperthreads, extra_args, - resources, workerID): + def get_mpi_specs( + self, task, num_procs, num_nodes, procs_per_node, machinefile, hyperthreads, extra_args, resources, workerID + ): # Return auto_resource variables inc. extra_args additions if extra_args: - num_procs, num_nodes, procs_per_node, p_args = \ - self._parse_extra_args(num_procs, num_nodes, procs_per_node, - hyperthreads, extra_args=extra_args) + num_procs, num_nodes, procs_per_node, p_args = self._parse_extra_args( + num_procs, num_nodes, procs_per_node, hyperthreads, extra_args=extra_args + ) rm_rpn = True if procs_per_node is None and num_nodes is None else False hostlist = None if machinefile and not self.mfile_support: - logger.warning('User machinefile ignored - not supported by {}'.format(self.run_command)) + logger.warning("User machinefile ignored - not supported by {}".format(self.run_command)) machinefile = None if machinefile is None and resources is not None: - num_procs, num_nodes, procs_per_node = \ - mpi_resources.get_resources(resources, num_procs, num_nodes, - procs_per_node, hyperthreads) + num_procs, num_nodes, procs_per_node = mpi_resources.get_resources( + resources, num_procs, num_nodes, procs_per_node, hyperthreads + ) # TODO: Create ERF file if mapping worker to resources req. else: - num_procs, num_nodes, procs_per_node = \ - mpi_resources.task_partition(num_procs, num_nodes, - procs_per_node, machinefile) + num_procs, num_nodes, procs_per_node = mpi_resources.task_partition( + num_procs, num_nodes, procs_per_node, machinefile + ) # Remove portable variable if in extra_args if extra_args: - num_procs, num_nodes, procs_per_node = \ - self._rm_replicated_args(num_procs, num_nodes, - procs_per_node, p_args) + num_procs, num_nodes, procs_per_node = self._rm_replicated_args( + num_procs, num_nodes, procs_per_node, p_args + ) if rm_rpn: procs_per_node = None - return {'num_procs': num_procs, - 'num_nodes': num_nodes, - 'procs_per_node': procs_per_node, - 'extra_args': extra_args, - 'machinefile': machinefile, - 'hostlist': hostlist} + return { + "num_procs": num_procs, + "num_nodes": num_nodes, + "procs_per_node": procs_per_node, + "extra_args": extra_args, + "machinefile": machinefile, + "hostlist": hostlist, + } diff --git a/libensemble/libE.py b/libensemble/libE.py index f3a200d9f..da0af3487 100644 --- a/libensemble/libE.py +++ b/libensemble/libE.py @@ -104,7 +104,7 @@ See below for the complete traditional ``libE()`` API. """ -__all__ = ['libE'] +__all__ = ["libE"] import os import logging @@ -134,11 +134,7 @@ # logger.setLevel(logging.DEBUG) -def libE(sim_specs, gen_specs, exit_criteria, - persis_info=None, - alloc_specs=None, - libE_specs=None, - H0=None): +def libE(sim_specs, gen_specs, exit_criteria, persis_info=None, alloc_specs=None, libE_specs=None, H0=None): """ Parameters ---------- @@ -218,45 +214,51 @@ def libE(sim_specs, gen_specs, exit_criteria, H0 = np.empty(0) # Set default comms - if 'comms' not in libE_specs: - libE_specs['comms'] = 'mpi' + if "comms" not in libE_specs: + libE_specs["comms"] = "mpi" - libE_funcs = {'mpi': libE_mpi, - 'tcp': libE_tcp, - 'local': libE_local} + libE_funcs = {"mpi": libE_mpi, "tcp": libE_tcp, "local": libE_local} - comms_type = libE_specs.get('comms') + comms_type = libE_specs.get("comms") assert comms_type in libE_funcs, "Unknown comms type: {}".format(comms_type) # Resource management not supported with TCP - if comms_type == 'tcp': - libE_specs['disable_resource_manager'] = True + if comms_type == "tcp": + libE_specs["disable_resource_manager"] = True Resources.init_resources(libE_specs) - return libE_funcs[comms_type](sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0) - - -def manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info, - alloc_specs, libE_specs, hist, - on_abort=None, on_cleanup=None): + return libE_funcs[comms_type](sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0) + + +def manager( + wcomms, + sim_specs, + gen_specs, + exit_criteria, + persis_info, + alloc_specs, + libE_specs, + hist, + on_abort=None, + on_cleanup=None, +): """Generic manager routine run.""" - logger.info('Logger initializing: [workerID] precedes each line. [0] = Manager') - logger.info('libE version v{}'.format(__version__)) + logger.info("Logger initializing: [workerID] precedes each line. [0] = Manager") + logger.info("libE version v{}".format(__version__)) - if 'out' in gen_specs and ('sim_id', int) in gen_specs['out']: + if "out" in gen_specs and ("sim_id", int) in gen_specs["out"]: logger.manager_warning(_USER_SIM_ID_WARNING) - save_H = libE_specs.get('save_H_and_persis_on_abort', True) + save_H = libE_specs.get("save_H_and_persis_on_abort", True) try: try: - persis_info, exit_flag, elapsed_time = \ - manager_main(hist, libE_specs, alloc_specs, sim_specs, gen_specs, - exit_criteria, persis_info, wcomms) + persis_info, exit_flag, elapsed_time = manager_main( + hist, libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, persis_info, wcomms + ) logger.info("Manager total time: {}".format(elapsed_time)) except LoggedException: # Exception already logged in manager @@ -270,9 +272,9 @@ def manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info, except Exception as e: exit_flag = 1 # Only exits if no abort/raise _dump_on_abort(hist, persis_info, save_H=save_H) - if libE_specs.get('abort_on_exception', True) and on_abort is not None: + if libE_specs.get("abort_on_exception", True) and on_abort is not None: on_abort() - raise LoggedException(*e.args, 'See error details above and in ensemble.log') from None + raise LoggedException(*e.args, "See error details above and in ensemble.log") from None else: logger.debug("Manager exiting") logger.debug("Exiting with {} workers.".format(len(wcomms))) @@ -287,8 +289,10 @@ def manager(wcomms, sim_specs, gen_specs, exit_criteria, persis_info, # ==================== MPI version ================================= + class DupComm: """Duplicate MPI communicator for use with a with statement""" + def __init__(self, mpi_comm): self.parent_comm = mpi_comm @@ -310,26 +314,25 @@ def libE_mpi_defaults(libE_specs): from mpi4py import MPI - if 'mpi_comm' not in libE_specs: - libE_specs['mpi_comm'] = MPI.COMM_WORLD # Will be duplicated immediately + if "mpi_comm" not in libE_specs: + libE_specs["mpi_comm"] = MPI.COMM_WORLD # Will be duplicated immediately return libE_specs, MPI.COMM_NULL -def libE_mpi(sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0): +def libE_mpi(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """MPI version of the libE main routine""" libE_specs, mpi_comm_null = libE_mpi_defaults(libE_specs) - if libE_specs['mpi_comm'] == mpi_comm_null: + if libE_specs["mpi_comm"] == mpi_comm_null: return [], persis_info, 3 # Process not in mpi_comm check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H0) - with DupComm(libE_specs['mpi_comm']) as mpi_comm: + with DupComm(libE_specs["mpi_comm"]) as mpi_comm: rank = mpi_comm.Get_rank() - is_manager = (rank == 0) + is_manager = rank == 0 resources = Resources.resources if resources is not None: @@ -348,16 +351,16 @@ def libE_mpi(sim_specs, gen_specs, exit_criteria, if is_manager: if resources is not None: resources.set_resource_manager(nworkers) - return libE_mpi_manager(mpi_comm, sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0) + return libE_mpi_manager( + mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0 + ) # Worker returns a subset of MPI output libE_mpi_worker(mpi_comm, sim_specs, gen_specs, libE_specs) return [], {}, [] -def libE_mpi_manager(mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, - alloc_specs, libE_specs, H0): +def libE_mpi_manager(mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Manager routine runs on rank 0.""" from libensemble.comms.mpi import MainMPIComm @@ -365,10 +368,9 @@ def libE_mpi_manager(mpi_comm, sim_specs, gen_specs, exit_criteria, persis_info, hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0) # Launch worker team - wcomms = [MainMPIComm(mpi_comm, w) for w in - range(1, mpi_comm.Get_size())] + wcomms = [MainMPIComm(mpi_comm, w) for w in range(1, mpi_comm.Get_size())] - if not libE_specs.get('disable_log_files', False): + if not libE_specs.get("disable_log_files", False): manager_logging_config() # Set up abort handler @@ -377,15 +379,16 @@ def on_abort(): comms_abort(mpi_comm) # Run generic manager - return manager(wcomms, sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, hist, - on_abort=on_abort) + return manager( + wcomms, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, hist, on_abort=on_abort + ) def libE_mpi_worker(libE_comm, sim_specs, gen_specs, libE_specs): """Worker routines run on ranks > 0.""" from libensemble.comms.mpi import MainMPIComm + comm = MainMPIComm(libE_comm) worker_main(comm, sim_specs, gen_specs, libE_specs, log_comm=True) logger.debug("Worker {} exiting".format(libE_comm.Get_rank())) @@ -396,8 +399,7 @@ def libE_mpi_worker(libE_comm, sim_specs, gen_specs, libE_specs): def start_proc_team(nworkers, sim_specs, gen_specs, libE_specs, log_comm=True): """Launch a process worker team.""" - wcomms = [QCommProcess(worker_main, sim_specs, gen_specs, libE_specs, w, log_comm) - for w in range(1, nworkers+1)] + wcomms = [QCommProcess(worker_main, sim_specs, gen_specs, libE_specs, w, log_comm) for w in range(1, nworkers + 1)] for wcomm in wcomms: wcomm.run() return wcomms @@ -412,11 +414,10 @@ def kill_proc_team(wcomms, timeout): wcomm.terminate() -def libE_local(sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0): +def libE_local(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Main routine for thread/process launch of libE.""" - nworkers = libE_specs['nworkers'] + nworkers = libE_specs["nworkers"] check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H0) @@ -446,7 +447,7 @@ def libE_local(sim_specs, gen_specs, exit_criteria, if resources is not None: resources.set_resource_manager(nworkers) - if not libE_specs.get('disable_log_files', False): + if not libE_specs.get("disable_log_files", False): close_logs = manager_logging_config() else: close_logs = None @@ -454,14 +455,14 @@ def libE_local(sim_specs, gen_specs, exit_criteria, # Set up cleanup routine to shut down worker team def cleanup(): """Handler to clean up comms team.""" - kill_proc_team(wcomms, timeout=libE_specs.get('worker_timeout', 1)) + kill_proc_team(wcomms, timeout=libE_specs.get("worker_timeout", 1)) if close_logs is not None: # logger remains set between multiple libE calls close_logs() # Run generic manager - return manager(wcomms, sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, hist, - on_cleanup=cleanup) + return manager( + wcomms, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, hist, on_cleanup=cleanup + ) # ==================== TCP version ================================= @@ -472,13 +473,13 @@ def get_ip(): try: return socket.gethostbyname(socket.gethostname()) except socket.gaierror: - return 'localhost' + return "localhost" def libE_tcp_authkey(): """Generate an authkey if not assigned by manager.""" nonce = random.randrange(99999) - return 'libE_auth_{}'.format(nonce) + return "libE_auth_{}".format(nonce) def libE_tcp_default_ID(): @@ -486,13 +487,12 @@ def libE_tcp_default_ID(): return "{}_pid{}".format(get_ip(), os.getpid()) -def libE_tcp(sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0): +def libE_tcp(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Main routine for TCP multiprocessing launch of libE.""" check_inputs(libE_specs, alloc_specs, sim_specs, gen_specs, exit_criteria, H0) - is_worker = True if 'workerID' in libE_specs else False + is_worker = True if "workerID" in libE_specs else False exctr = Executor.executor if exctr is not None: @@ -505,45 +505,42 @@ def libE_tcp(sim_specs, gen_specs, exit_criteria, libE_tcp_worker(sim_specs, gen_specs, libE_specs) return [], persis_info, [] - return libE_tcp_mgr(sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0) + return libE_tcp_mgr(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0) def libE_tcp_worker_launcher(libE_specs): """Get a launch function from libE_specs.""" - if 'worker_launcher' in libE_specs: - worker_launcher = libE_specs['worker_launcher'] + if "worker_launcher" in libE_specs: + worker_launcher = libE_specs["worker_launcher"] else: - worker_cmd = libE_specs['worker_cmd'] + worker_cmd = libE_specs["worker_cmd"] def worker_launcher(specs): """Basic worker launch function.""" return launcher.launch(worker_cmd, specs) + return worker_launcher -def libE_tcp_start_team(manager, nworkers, workers, - ip, port, authkey, launchf): +def libE_tcp_start_team(manager, nworkers, workers, ip, port, authkey, launchf): """Launch nworkers workers that attach back to a managers server.""" worker_procs = [] - specs = {'manager_ip': ip, 'manager_port': port, 'authkey': authkey} + specs = {"manager_ip": ip, "manager_port": port, "authkey": authkey} with Timer() as timer: - for w in range(1, nworkers+1): + for w in range(1, nworkers + 1): logger.info("Manager is launching worker {}".format(w)) if workers is not None: - specs['worker_ip'] = workers[w-1] - specs['tunnel_port'] = 0x71BE - specs['workerID'] = w + specs["worker_ip"] = workers[w - 1] + specs["tunnel_port"] = 0x71BE + specs["workerID"] = w worker_procs.append(launchf(specs)) logger.info("Manager is awaiting {} workers".format(nworkers)) wcomms = manager.await_workers(nworkers) - logger.info("Manager connected to {} workers ({} s)". - format(nworkers, timer.elapsed)) + logger.info("Manager connected to {} workers ({} s)".format(nworkers, timer.elapsed)) return worker_procs, wcomms -def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, H0): +def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, H0): """Main routine for TCP multiprocessing launch of libE at manager.""" hist = History(alloc_specs, sim_specs, gen_specs, exit_criteria, H0) @@ -552,56 +549,53 @@ def libE_tcp_mgr(sim_specs, gen_specs, exit_criteria, launchf = libE_tcp_worker_launcher(libE_specs) # Get worker launch parameters and fill in defaults for TCP/IP conn - if 'nworkers' in libE_specs: + if "nworkers" in libE_specs: workers = None - nworkers = libE_specs['nworkers'] - elif 'workers' in libE_specs: - workers = libE_specs['workers'] + nworkers = libE_specs["nworkers"] + elif "workers" in libE_specs: + workers = libE_specs["workers"] nworkers = len(workers) - ip = libE_specs.get('ip', None) or get_ip() - port = libE_specs.get('port', 0) - authkey = libE_specs.get('authkey', libE_tcp_authkey()) + ip = libE_specs.get("ip", None) or get_ip() + port = libE_specs.get("port", 0) + authkey = libE_specs.get("authkey", libE_tcp_authkey()) osx_set_mp_method() - with ServerQCommManager(port, authkey.encode('utf-8')) as tcp_manager: + with ServerQCommManager(port, authkey.encode("utf-8")) as tcp_manager: # Get port if needed because of auto-assignment if port == 0: _, port = tcp_manager.address - if not libE_specs.get('disable_log_files', False): + if not libE_specs.get("disable_log_files", False): manager_logging_config() logger.info("Launched server at ({}, {})".format(ip, port)) # Launch worker team and set up logger - worker_procs, wcomms =\ - libE_tcp_start_team(tcp_manager, nworkers, workers, - ip, port, authkey, launchf) + worker_procs, wcomms = libE_tcp_start_team(tcp_manager, nworkers, workers, ip, port, authkey, launchf) def cleanup(): """Handler to clean up launched team.""" for wp in worker_procs: - launcher.cancel(wp, timeout=libE_specs.get('worker_timeout')) + launcher.cancel(wp, timeout=libE_specs.get("worker_timeout")) # Run generic manager - return manager(wcomms, sim_specs, gen_specs, exit_criteria, - persis_info, alloc_specs, libE_specs, hist, - on_cleanup=cleanup) + return manager( + wcomms, sim_specs, gen_specs, exit_criteria, persis_info, alloc_specs, libE_specs, hist, on_cleanup=cleanup + ) def libE_tcp_worker(sim_specs, gen_specs, libE_specs): """Main routine for TCP worker launched by libE.""" - ip = libE_specs['ip'] - port = libE_specs['port'] - authkey = libE_specs['authkey'] - workerID = libE_specs['workerID'] + ip = libE_specs["ip"] + port = libE_specs["port"] + authkey = libE_specs["authkey"] + workerID = libE_specs["workerID"] with ClientQCommManager(ip, port, authkey, workerID) as comm: - worker_main(comm, sim_specs, gen_specs, libE_specs, - workerID=workerID, log_comm=True) + worker_main(comm, sim_specs, gen_specs, libE_specs, workerID=workerID, log_comm=True) logger.debug("Worker {} exiting".format(workerID)) @@ -611,10 +605,9 @@ def libE_tcp_worker(sim_specs, gen_specs, libE_specs): def _dump_on_abort(hist, persis_info, save_H=True): """Dump history and persis_info on abort""" logger.error("Manager exception raised .. aborting ensemble:") - logger.error("Dumping ensemble history with {} sims evaluated:". - format(hist.sim_ended_count)) + logger.error("Dumping ensemble history with {} sims evaluated:".format(hist.sim_ended_count)) if save_H: - np.save('libE_history_at_abort_' + str(hist.sim_ended_count) + '.npy', hist.trim_H()) - with open('libE_persis_info_at_abort_' + str(hist.sim_ended_count) + '.pickle', "wb") as f: + np.save("libE_history_at_abort_" + str(hist.sim_ended_count) + ".npy", hist.trim_H()) + with open("libE_persis_info_at_abort_" + str(hist.sim_ended_count) + ".pickle", "wb") as f: pickle.dump(persis_info, f) diff --git a/libensemble/resources/mpi_resources.py b/libensemble/resources/mpi_resources.py index fae84517f..d36c6a2d2 100644 --- a/libensemble/resources/mpi_resources.py +++ b/libensemble/resources/mpi_resources.py @@ -22,10 +22,10 @@ def rassert(test, *args): def get_MPI_runner(): - """ Return whether ``mpirun`` is openmpi or mpich """ + """Return whether ``mpirun`` is openmpi or mpich""" var = get_MPI_variant() - if var in ['mpich', 'openmpi']: - return 'mpirun' + if var in ["mpich", "openmpi"]: + return "mpirun" else: return var @@ -41,31 +41,30 @@ def get_MPI_variant(): """ try: - subprocess.check_call(['aprun', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return 'aprun' + subprocess.check_call(["aprun", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return "aprun" except OSError: pass try: - subprocess.check_call(['jsrun', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return 'jsrun' + subprocess.check_call(["jsrun", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return "jsrun" except OSError: pass try: # Explore mpi4py.MPI.get_vendor() and mpi4py.MPI.Get_library_version() for mpi4py - try_mpich = subprocess.Popen(['mpirun', '-npernode'], stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) + try_mpich = subprocess.Popen(["mpirun", "-npernode"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT) stdout, _ = try_mpich.communicate() - if 'unrecognized argument npernode' in stdout.decode(): - return 'mpich' - return 'openmpi' + if "unrecognized argument npernode" in stdout.decode(): + return "mpich" + return "openmpi" except Exception: pass try: - subprocess.check_call(['srun', '--version'], stdout=subprocess.PIPE, stderr=subprocess.PIPE) - return 'srun' + subprocess.check_call(["srun", "--version"], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + return "srun" except OSError: pass @@ -83,37 +82,33 @@ def task_partition(num_procs, num_nodes, procs_per_node, machinefile=None): # If machinefile is provided - ignore everything else if machinefile: if num_procs or num_nodes or procs_per_node: - logger.warning("Machinefile provided - overriding " - "procs/nodes/procs_per_node") + logger.warning("Machinefile provided - overriding " "procs/nodes/procs_per_node") return None, None, None if not num_procs: - rassert(num_nodes and procs_per_node, - "Need num_procs, num_nodes/procs_per_node, or machinefile") + rassert(num_nodes and procs_per_node, "Need num_procs, num_nodes/procs_per_node, or machinefile") num_procs = num_nodes * procs_per_node elif not num_nodes: procs_per_node = procs_per_node or num_procs - num_nodes = num_procs//procs_per_node + num_nodes = num_procs // procs_per_node elif not procs_per_node: - procs_per_node = num_procs//num_nodes + procs_per_node = num_procs // num_nodes - rassert(num_procs == num_nodes*procs_per_node, - "num_procs does not equal num_nodes*procs_per_node") + rassert(num_procs == num_nodes * procs_per_node, "num_procs does not equal num_nodes*procs_per_node") return num_procs, num_nodes, procs_per_node def _max_rsets_per_node(worker_resources): - """ Return the maximum rsets per node for any node on this worker""" + """Return the maximum rsets per node for any node on this worker""" rset_team = worker_resources.rset_team local_rsets_list = worker_resources.local_rsets_list rsets_on_node = [local_rsets_list[rset] for rset in rset_team] return max(rsets_on_node) -def get_resources(resources, num_procs=None, num_nodes=None, - procs_per_node=None, hyperthreads=False): +def get_resources(resources, num_procs=None, num_nodes=None, procs_per_node=None, hyperthreads=False): """Reconciles user-supplied options with available worker resources to produce run configuration. @@ -130,33 +125,37 @@ def get_resources(resources, num_procs=None, num_nodes=None, rassert(node_list, "Node list is empty - aborting") local_node_count = wresources.local_node_count - cores_avail_per_node = \ - (gresources.logical_cores_avail_per_node if hyperthreads else - gresources.physical_cores_avail_per_node) + cores_avail_per_node = ( + gresources.logical_cores_avail_per_node if hyperthreads else gresources.physical_cores_avail_per_node + ) rsets_per_node = _max_rsets_per_node(wresources) # Advantage of cores per rset first is they will always get same no. per rset (double rsets, double cores) # Advantage of multiply first is less wasted cores. - cores_avail_per_node_per_worker = cores_avail_per_node//rsets_per_node * wresources.slot_count + cores_avail_per_node_per_worker = cores_avail_per_node // rsets_per_node * wresources.slot_count # cores_avail_per_node_per_worker = int(cores_avail_per_node/rsets_per_node * wresources.slot_count) - rassert(wresources.even_slots, - "Uneven distribution of node resources not yet supported. Nodes and slots are: {}" - .format(wresources.slots)) + rassert( + wresources.even_slots, + "Uneven distribution of node resources not yet supported. Nodes and slots are: {}".format(wresources.slots), + ) if not num_procs and not procs_per_node: - rassert(cores_avail_per_node_per_worker > 0, - "There is less than one core per resource set. " - "Provide num_procs or num_nodes/procs_per_node to oversubsribe") + rassert( + cores_avail_per_node_per_worker > 0, + "There is less than one core per resource set. " + "Provide num_procs or num_nodes/procs_per_node to oversubsribe", + ) procs_per_node = cores_avail_per_node_per_worker if not num_nodes: # If no decomposition supplied - use all available cores/nodes num_nodes = local_node_count - logger.debug("No decomposition supplied - " - "using all available resource. " - "Nodes: {} procs_per_node {}". - format(num_nodes, procs_per_node)) + logger.debug( + "No decomposition supplied - " + "using all available resource. " + "Nodes: {} procs_per_node {}".format(num_nodes, procs_per_node) + ) elif not num_nodes and not procs_per_node: if num_procs <= cores_avail_per_node_per_worker: num_nodes = 1 @@ -166,46 +165,49 @@ def get_resources(resources, num_procs=None, num_nodes=None, num_nodes = local_node_count # Checks config is consistent and sufficient to express - num_procs, num_nodes, procs_per_node = \ - task_partition(num_procs, num_nodes, procs_per_node) + num_procs, num_nodes, procs_per_node = task_partition(num_procs, num_nodes, procs_per_node) - rassert(num_nodes <= local_node_count, - "Not enough nodes to honor arguments. " - "Requested {}. Only {} available". - format(num_nodes, local_node_count)) + rassert( + num_nodes <= local_node_count, + "Not enough nodes to honor arguments. " "Requested {}. Only {} available".format(num_nodes, local_node_count), + ) if gresources.enforce_worker_core_bounds: - rassert(procs_per_node <= cores_avail_per_node, - "Not enough processors on a node to honor arguments. " - "Requested {}. Only {} available". - format(procs_per_node, cores_avail_per_node)) - - rassert(procs_per_node <= cores_avail_per_node_per_worker, - "Not enough processors per worker to honor arguments. " - "Requested {}. Only {} available". - format(procs_per_node, cores_avail_per_node_per_worker)) - - rassert(num_procs <= (cores_avail_per_node * local_node_count), - "Not enough procs to honor arguments. " - "Requested {}. Only {} available". - format(num_procs, cores_avail_per_node*local_node_count)) + rassert( + procs_per_node <= cores_avail_per_node, + "Not enough processors on a node to honor arguments. " + "Requested {}. Only {} available".format(procs_per_node, cores_avail_per_node), + ) + + rassert( + procs_per_node <= cores_avail_per_node_per_worker, + "Not enough processors per worker to honor arguments. " + "Requested {}. Only {} available".format(procs_per_node, cores_avail_per_node_per_worker), + ) + + rassert( + num_procs <= (cores_avail_per_node * local_node_count), + "Not enough procs to honor arguments. " + "Requested {}. Only {} available".format(num_procs, cores_avail_per_node * local_node_count), + ) if num_nodes < local_node_count: - logger.warning("User constraints mean fewer nodes being used " - "than available. {} nodes used. {} nodes available". - format(num_nodes, local_node_count)) + logger.warning( + "User constraints mean fewer nodes being used " + "than available. {} nodes used. {} nodes available".format(num_nodes, local_node_count) + ) return num_procs, num_nodes, procs_per_node -def create_machinefile(resources, machinefile=None, num_procs=None, - num_nodes=None, procs_per_node=None, - hyperthreads=False): +def create_machinefile( + resources, machinefile=None, num_procs=None, num_nodes=None, procs_per_node=None, hyperthreads=False +): """Creates a machinefile based on user-supplied config options, completed by detected machine resources """ - machinefile = machinefile or 'machinefile' + machinefile = machinefile or "machinefile" if os.path.isfile(machinefile): try: os.remove(machinefile) @@ -213,15 +215,13 @@ def create_machinefile(resources, machinefile=None, num_procs=None, logger.warning("Could not remove existing machinefile: {}".format(e)) node_list = resources.worker_resources.local_nodelist - logger.debug("Creating machinefile with {} nodes and {} ranks per node". - format(num_nodes, procs_per_node)) + logger.debug("Creating machinefile with {} nodes and {} ranks per node".format(num_nodes, procs_per_node)) - with open(machinefile, 'w') as f: + with open(machinefile, "w") as f: for node in node_list[:num_nodes]: - f.write((node + '\n') * procs_per_node) + f.write((node + "\n") * procs_per_node) - built_mfile = (os.path.isfile(machinefile) - and os.path.getsize(machinefile) > 0) + built_mfile = os.path.isfile(machinefile) and os.path.getsize(machinefile) > 0 return built_mfile, num_procs, num_nodes, procs_per_node diff --git a/libensemble/resources/scheduler.py b/libensemble/resources/scheduler.py index 34b30737c..e27aff615 100644 --- a/libensemble/resources/scheduler.py +++ b/libensemble/resources/scheduler.py @@ -64,8 +64,8 @@ def __init__(self, user_resources=None, sched_opts={}): self.log_msg = None # Process scheduler options - self.split2fit = sched_opts.get('split2fit', True) - self.match_slots = sched_opts.get('match_slots', True) + self.split2fit = sched_opts.get("split2fit", True) + self.match_slots = sched_opts.get("match_slots", True) def assign_resources(self, rsets_req): """Schedule resource sets to a work item if possible. @@ -87,23 +87,22 @@ def assign_resources(self, rsets_req): if rsets_req > self.resources.total_num_rsets: raise InsufficientResourcesError( - "More resource sets requested {} than exist {}".format( - rsets_req, self.resources.total_num_rsets - ) + "More resource sets requested {} than exist {}".format(rsets_req, self.resources.total_num_rsets) ) if rsets_req > self.rsets_free: raise InsufficientFreeResources - self.log_msg = None # Log resource messages only when find resources + self.log_msg = None # Log resource messages only when find resources num_groups = self.resources.num_groups max_grpsize = self.resources.rsets_per_node # assumes even avail_rsets_by_group = self.get_avail_rsets_by_group() try_split = self.split2fit # Work out best target fit - if all rsets were free. - rsets_req, num_groups_req, rsets_req_per_group = \ - self.calc_req_split(rsets_req, max_grpsize, num_groups, extend=True) + rsets_req, num_groups_req, rsets_req_per_group = self.calc_req_split( + rsets_req, max_grpsize, num_groups, extend=True + ) # Check enough slots sorted_lengths = ResourceScheduler.get_sorted_lens(avail_rsets_by_group) @@ -114,9 +113,7 @@ def assign_resources(self, rsets_req): if self.match_slots: slots_avail_by_group = self.get_avail_slots_by_group(avail_rsets_by_group) - cand_groups, cand_slots = self.get_matching_slots( - slots_avail_by_group, num_groups_req, rsets_req_per_group - ) + cand_groups, cand_slots = self.get_matching_slots(slots_avail_by_group, num_groups_req, rsets_req_per_group) if cand_groups is None: if not self.split2fit: @@ -129,15 +126,13 @@ def assign_resources(self, rsets_req): found_split = False while not found_split: # Finds a split with enough slots (not nec. matching slots) if exists. - rsets_req, num_groups_req, rsets_req_per_group = \ - self.calc_even_split_uneven_groups( - max_even_grpsize, num_groups_req, rsets_req, sorted_lengths, num_groups - ) + rsets_req, num_groups_req, rsets_req_per_group = self.calc_even_split_uneven_groups( + max_even_grpsize, num_groups_req, rsets_req, sorted_lengths, num_groups + ) if self.match_slots: - cand_groups, cand_slots = \ - self.get_matching_slots( - slots_avail_by_group, num_groups_req, rsets_req_per_group - ) + cand_groups, cand_slots = self.get_matching_slots( + slots_avail_by_group, num_groups_req, rsets_req_per_group + ) if cand_groups is not None: found_split = True else: @@ -154,8 +149,7 @@ def assign_resources(self, rsets_req): ) else: rset_team = self.find_rsets_any_slots( - avail_rsets_by_group, max_grpsize, rsets_req, - num_groups_req, rsets_req_per_group + avail_rsets_by_group, max_grpsize, rsets_req, num_groups_req, rsets_req_per_group ) if self.log_msg is not None: @@ -180,10 +174,9 @@ def find_rsets_any_slots(self, rsets_by_group, max_grpsize, rsets_req, ngroups, group_list = [] for ng in range(ngroups): - cand_team, cand_group = \ - self.find_candidate( - tmp_rsets_by_group, group_list, rsets_per_group, max_upper_bound - ) + cand_team, cand_group = self.find_candidate( + tmp_rsets_by_group, group_list, rsets_per_group, max_upper_bound + ) if cand_group is not None: accum_team.extend(cand_team) @@ -230,13 +223,13 @@ def get_avail_rsets_by_group(self): """ if self.avail_rsets_by_group is None: rsets = self.resources.rsets - groups = np.unique(rsets['group']) + groups = np.unique(rsets["group"]) self.avail_rsets_by_group = {} for g in groups: self.avail_rsets_by_group[g] = [] for ind, rset in enumerate(rsets): - if not rset['assigned']: - g = rset['group'] + if not rset["assigned"]: + g = rset["group"] self.avail_rsets_by_group[g].append(ind) return self.avail_rsets_by_group @@ -249,17 +242,19 @@ def get_avail_slots_by_group(self, avail_rsets_by_group): """Return a dictionary of free slot IDS for each group (e.g. node)""" slots_avail_by_group = {} for k, v in avail_rsets_by_group.items(): - slots_avail_by_group[k] = set([self.resources.rsets[i]['slot'] for i in v]) + slots_avail_by_group[k] = set([self.resources.rsets[i]["slot"] for i in v]) return slots_avail_by_group def calc_req_split(self, rsets_req, max_grpsize, num_groups, extend): if self.resources.even_groups: # This is total group sizes even (not available sizes) - rsets_req, num_groups_req, rsets_per_group = \ - self.calc_rsets_even_grps(rsets_req, max_grpsize, num_groups, extend) + rsets_req, num_groups_req, rsets_per_group = self.calc_rsets_even_grps( + rsets_req, max_grpsize, num_groups, extend + ) else: - logger.warning('Uneven groups - but using even groups function') - rsets_req, num_groups_req, rsets_per_group = \ - self.calc_rsets_even_grps(rsets_req, max_grpsize, num_groups, extend) + logger.warning("Uneven groups - but using even groups function") + rsets_req, num_groups_req, rsets_per_group = self.calc_rsets_even_grps( + rsets_req, max_grpsize, num_groups, extend + ) return rsets_req, num_groups_req, rsets_per_group def calc_rsets_even_grps(self, rsets_req, max_grpsize, max_groups, extend): @@ -268,7 +263,7 @@ def calc_rsets_even_grps(self, rsets_req, max_grpsize, max_groups, extend): return 0, 0, 0 # Divide with roundup - num_groups_req = rsets_req//max_grpsize + (rsets_req % max_grpsize > 0) + num_groups_req = rsets_req // max_grpsize + (rsets_req % max_grpsize > 0) # Up to max groups - keep trying for an even split if num_groups_req > 1: @@ -282,16 +277,17 @@ def calc_rsets_even_grps(self, rsets_req, max_grpsize, max_groups, extend): if even_partition: num_groups_req = tmp_num_groups - rsets_per_group = rsets_req//num_groups_req # This should always divide perfectly. + rsets_per_group = rsets_req // num_groups_req # This should always divide perfectly. else: if extend: - rsets_per_group = rsets_req//num_groups_req + (rsets_req % num_groups_req > 0) + rsets_per_group = rsets_req // num_groups_req + (rsets_req % num_groups_req > 0) orig_rsets_req = rsets_req rsets_req = num_groups_req * rsets_per_group self.log_msg = ( "Increasing resource requirement to obtain an even partition of resource sets\n" - "to nodes. rsets_req orig: {} New: {} num_groups_req {} rsets_per_group {}". - format(orig_rsets_req, rsets_req, num_groups_req, rsets_per_group) + "to nodes. rsets_req orig: {} New: {} num_groups_req {} rsets_per_group {}".format( + orig_rsets_req, rsets_req, num_groups_req, rsets_per_group + ) ) else: rsets_per_group = max_grpsize @@ -318,8 +314,8 @@ def assign_team_from_slots(self, slots_avail_by_group, cand_groups, cand_slots, # Ignore extra slots if i >= rsets_per_group: break - group = (self.resources.rsets['group'] == grp) - slot = (self.resources.rsets['slot'] == slot) + group = self.resources.rsets["group"] == grp + slot = self.resources.rsets["slot"] == slot rset = int(np.where(group & slot)[0]) rset_team.append(rset) self.avail_rsets_by_group[grp].remove(rset) diff --git a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py index 1fa0ef859..f1e577266 100644 --- a/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py +++ b/libensemble/tests/unit_tests/test_allocation_funcs_and_support.py @@ -10,26 +10,32 @@ from libensemble.resources.scheduler import ResourceScheduler from libensemble.resources.resources import Resources -al = {'alloc_f': give_sim_work_first, 'out': []} -libE_specs = {'comms': 'local', 'nworkers': 4} +al = {"alloc_f": give_sim_work_first, "out": []} +libE_specs = {"comms": "local", "nworkers": 4} H0 = [] -W = np.array([(1, 0, 0, 0, False), (2, 0, 0, 0, False), - (3, 0, 0, 0, False), (4, 0, 0, 0, False)], - dtype=[('worker_id', '= np.array(field0.shape))), \ - "H too small to receive all components of H0 in field {}".format(name) + assert field0.ndim == field1.ndim, "H0 and H have different ndim for field {}".format(name) + assert np.all( + np.array(field1.shape) >= np.array(field0.shape) + ), "H too small to receive all components of H0 in field {}".format(name) def check_libE_specs(libE_specs, serial_check=False): assert isinstance(libE_specs, dict), "libE_specs must be a dictionary" - comms_type = libE_specs.get('comms', 'mpi') - if comms_type in ['mpi']: + comms_type = libE_specs.get("comms", "mpi") + if comms_type in ["mpi"]: if not serial_check: - assert libE_specs['mpi_comm'].Get_size() > 1, "Manager only - must be at least one worker (2 MPI tasks)" - elif comms_type in ['local']: - assert libE_specs['nworkers'] >= 1, "Must specify at least one worker" - elif comms_type in ['tcp']: + assert libE_specs["mpi_comm"].Get_size() > 1, "Manager only - must be at least one worker (2 MPI tasks)" + elif comms_type in ["local"]: + assert libE_specs["nworkers"] >= 1, "Must specify at least one worker" + elif comms_type in ["tcp"]: # TODO, differentiate and test SSH/Client - assert libE_specs['nworkers'] >= 1, "Must specify at least one worker" + assert libE_specs["nworkers"] >= 1, "Must specify at least one worker" for k in libE_specs.keys(): - assert k in allowed_libE_spec_keys,\ - "Key %s is not allowed in libE_specs. Supported keys are: %s " % (k, allowed_libE_spec_keys) + assert k in allowed_libE_spec_keys, "Key %s is not allowed in libE_specs. Supported keys are: %s " % ( + k, + allowed_libE_spec_keys, + ) - if k in ['ensemble_copy_back', 'use_worker_dirs', 'sim_dirs_make', 'gen_dirs_make']: + if k in ["ensemble_copy_back", "use_worker_dirs", "sim_dirs_make", "gen_dirs_make"]: assert isinstance(libE_specs[k], bool), "Value for libE_specs['{}'] must be boolean".format(k) - if k in ['sim_input_dir', 'gen_input_dir']: - assert isinstance(libE_specs[k], str), \ - "Value for libE_specs['{}'] must be a single path-like string".format(k) - assert os.path.exists(libE_specs[k]), \ - "libE_specs['{}'] does not refer to an existing path.".format(k) - - if k == 'ensemble_dir_path': - assert isinstance(libE_specs[k], str), \ - "Value for libE_specs['{}'] must be a single path-like string".format(k) - - if k in ['sim_dir_copy_files', 'sim_dir_symlink_files', 'gen_dir_copy_files', 'gen_dir_symlink_files']: - assert isinstance(libE_specs[k], list), \ - "Value for libE_specs['{}'] must be a list of path-like strings".format(k) + if k in ["sim_input_dir", "gen_input_dir"]: + assert isinstance( + libE_specs[k], str + ), "Value for libE_specs['{}'] must be a single path-like string".format(k) + assert os.path.exists(libE_specs[k]), "libE_specs['{}'] does not refer to an existing path.".format(k) + + if k == "ensemble_dir_path": + assert isinstance( + libE_specs[k], str + ), "Value for libE_specs['{}'] must be a single path-like string".format(k) + + if k in ["sim_dir_copy_files", "sim_dir_symlink_files", "gen_dir_copy_files", "gen_dir_symlink_files"]: + assert isinstance( + libE_specs[k], list + ), "Value for libE_specs['{}'] must be a list of path-like strings".format(k) for j in libE_specs[k]: - assert os.path.exists(j), \ - "'{}' in libE_specs['{}'] does not refer to an existing path.".format(j, k) + assert os.path.exists(j), "'{}' in libE_specs['{}'] does not refer to an existing path.".format(j, k) def check_alloc_specs(alloc_specs): assert isinstance(alloc_specs, dict), "alloc_specs must be a dictionary" - assert alloc_specs['alloc_f'], "Allocation function must be specified" + assert alloc_specs["alloc_f"], "Allocation function must be specified" for k in alloc_specs.keys(): - assert k in allowed_alloc_spec_keys,\ - "Key %s is not allowed in alloc_specs. Supported keys are: %s " % (k, allowed_alloc_spec_keys) + assert k in allowed_alloc_spec_keys, "Key %s is not allowed in alloc_specs. Supported keys are: %s " % ( + k, + allowed_alloc_spec_keys, + ) def check_sim_specs(sim_specs): assert isinstance(sim_specs, dict), "sim_specs must be a dictionary" - assert any([term_field in sim_specs for term_field in ['sim_f', 'in', 'persis_in', 'out']]), \ - "sim_specs must contain 'sim_f', 'in', 'out'" + assert any( + [term_field in sim_specs for term_field in ["sim_f", "in", "persis_in", "out"]] + ), "sim_specs must contain 'sim_f', 'in', 'out'" - assert all(isinstance(i, str) for i in sim_specs['in']), \ - "Entries in sim_specs['in'] must be strings. Also can't be lists or tuples of strings." + assert all( + isinstance(i, str) for i in sim_specs["in"] + ), "Entries in sim_specs['in'] must be strings. Also can't be lists or tuples of strings." - assert len(sim_specs['out']), "sim_specs must have 'out' entries" + assert len(sim_specs["out"]), "sim_specs must have 'out' entries" - assert isinstance(sim_specs['in'], list), "'in' field must exist and be a list of field names" + assert isinstance(sim_specs["in"], list), "'in' field must exist and be a list of field names" for k in sim_specs.keys(): - assert k in allowed_sim_spec_keys,\ - "Key %s is not allowed in sim_specs. Supported keys are: %s " % (k, allowed_sim_spec_keys) + assert k in allowed_sim_spec_keys, "Key %s is not allowed in sim_specs. Supported keys are: %s " % ( + k, + allowed_sim_spec_keys, + ) def check_gen_specs(gen_specs): assert isinstance(gen_specs, dict), "gen_specs must be a dictionary" - assert not bool(gen_specs) or len(gen_specs['out']), "gen_specs must have 'out' entries" + assert not bool(gen_specs) or len(gen_specs["out"]), "gen_specs must have 'out' entries" - if 'in' in gen_specs: - assert all(isinstance(i, str) for i in gen_specs['in']), \ - "Entries in gen_specs['in'] must be strings. Also can't be lists or tuples of strings." + if "in" in gen_specs: + assert all( + isinstance(i, str) for i in gen_specs["in"] + ), "Entries in gen_specs['in'] must be strings. Also can't be lists or tuples of strings." for k in gen_specs.keys(): - assert k in allowed_gen_spec_keys,\ - "Key %s is not allowed in gen_specs. Supported keys are: %s " % (k, allowed_gen_spec_keys) + assert k in allowed_gen_spec_keys, "Key %s is not allowed in gen_specs. Supported keys are: %s " % ( + k, + allowed_gen_spec_keys, + ) def check_exit_criteria(exit_criteria, sim_specs, gen_specs): @@ -100,27 +117,28 @@ def check_exit_criteria(exit_criteria, sim_specs, gen_specs): assert len(exit_criteria) > 0, "Must have some exit criterion" - if 'elapsed_wallclock_time' in exit_criteria: + if "elapsed_wallclock_time" in exit_criteria: logger.warning( "exit_criteria['elapsed_wallclock_time'] is deprecated.'\n" + "This will break in the future. Use exit_criteria['wallclock_max']" ) - exit_criteria['wallclock_max'] = exit_criteria.pop('elapsed_wallclock_time') + exit_criteria["wallclock_max"] = exit_criteria.pop("elapsed_wallclock_time") # Ensure termination criteria are valid - valid_term_fields = ['sim_max', 'gen_max', - 'wallclock_max', 'stop_val'] - assert all([term_field in valid_term_fields for term_field in exit_criteria]), \ - "Valid termination options: " + str(valid_term_fields) + valid_term_fields = ["sim_max", "gen_max", "wallclock_max", "stop_val"] + assert all([term_field in valid_term_fields for term_field in exit_criteria]), "Valid termination options: " + str( + valid_term_fields + ) # Make sure stop-values match parameters in gen_specs or sim_specs - if 'stop_val' in exit_criteria: - stop_name = exit_criteria['stop_val'][0] - sim_out_names = [e[0] for e in sim_specs['out']] - gen_out_names = [e[0] for e in gen_specs['out']] - assert stop_name in sim_out_names + gen_out_names, \ - "Can't stop on {} if it's not in a sim/gen output".format(stop_name) + if "stop_val" in exit_criteria: + stop_name = exit_criteria["stop_val"][0] + sim_out_names = [e[0] for e in sim_specs["out"]] + gen_out_names = [e[0] for e in gen_specs["out"]] + assert stop_name in sim_out_names + gen_out_names, "Can't stop on {} if it's not in a sim/gen output".format( + stop_name + ) def check_H(H0, sim_specs, alloc_specs, gen_specs): @@ -129,23 +147,24 @@ def check_H(H0, sim_specs, alloc_specs, gen_specs): # Combines all 'out' fields (if they exist) in sim_specs, gen_specs, or alloc_specs specs = [sim_specs, alloc_specs, gen_specs] - dtype_list = list(set(libE_fields + sum([k.get('out', []) for k in specs if k], []))) + dtype_list = list(set(libE_fields + sum([k.get("out", []) for k in specs if k], []))) Dummy_H = np.zeros(1 + len(H0), dtype=dtype_list) fields = H0.dtype.names # Prior history must contain the fields in new history - assert set(fields).issubset(set(Dummy_H.dtype.names)), \ - "H0 contains fields {} not in the History.".\ - format(set(fields).difference(set(Dummy_H.dtype.names))) + assert set(fields).issubset(set(Dummy_H.dtype.names)), "H0 contains fields {} not in the History.".format( + set(fields).difference(set(Dummy_H.dtype.names)) + ) # Prior history cannot contain unreturned points # assert 'sim_ended' not in fields or np.all(H0['sim_ended']), \ # "H0 contains unreturned points." # Fail if prior history contains unreturned points (or returned but not given). - assert('sim_ended' not in fields or np.all(H0['sim_started'] == H0['sim_ended'])), \ - 'H0 contains unreturned or invalid points' + assert "sim_ended" not in fields or np.all( + H0["sim_started"] == H0["sim_ended"] + ), "H0 contains unreturned or invalid points" # # Fail if points in prior history don't have a sim_id. # assert('sim_id' in fields), 'Points in H0 must have sim_ids' @@ -155,8 +174,9 @@ def check_H(H0, sim_specs, alloc_specs, gen_specs): _check_consistent_field(field, H0[field], Dummy_H[field]) -def check_inputs(libE_specs=None, alloc_specs=None, sim_specs=None, - gen_specs=None, exit_criteria=None, H0=None, serial_check=False): +def check_inputs( + libE_specs=None, alloc_specs=None, sim_specs=None, gen_specs=None, exit_criteria=None, H0=None, serial_check=False +): """ Checks whether the libEnsemble arguments are of the correct data type and contain sufficient information to perform a run. There is no return value. @@ -191,53 +211,58 @@ def check_inputs(libE_specs=None, alloc_specs=None, sim_specs=None, if H0 is not None and H0.dtype.names is not None: out_names += list(H0.dtype.names) if sim_specs is not None: - out_names += [e[0] for e in sim_specs.get('out', [])] + out_names += [e[0] for e in sim_specs.get("out", [])] if gen_specs is not None: - out_names += [e[0] for e in gen_specs.get('out', [])] + out_names += [e[0] for e in gen_specs.get("out", [])] if alloc_specs is not None: - out_names += [e[0] for e in alloc_specs.get('out', [])] + out_names += [e[0] for e in alloc_specs.get("out", [])] # Detailed checking based on Required Keys in docs for each specs if libE_specs is not None: - for name in libE_specs.get('final_fields', []): - assert name in out_names, \ - name + " in libE_specs['fields_keys'] is not in sim_specs['out'], "\ + for name in libE_specs.get("final_fields", []): + assert name in out_names, ( + name + " in libE_specs['fields_keys'] is not in sim_specs['out'], " "gen_specs['out'], alloc_specs['out'], H0, or libE_fields." + ) check_libE_specs(libE_specs, serial_check) if alloc_specs is not None: - assert 'in' not in alloc_specs, "alloc_specs['in'] is not needed as all of the history is available alloc_f." + assert "in" not in alloc_specs, "alloc_specs['in'] is not needed as all of the history is available alloc_f." check_alloc_specs(alloc_specs) if sim_specs is not None: - if 'in' in sim_specs: - assert isinstance(sim_specs['in'], list), "sim_specs['in'] must be a list" + if "in" in sim_specs: + assert isinstance(sim_specs["in"], list), "sim_specs['in'] must be a list" - for name in sim_specs.get('in', []): - assert name in out_names, \ - name + " in sim_specs['in'] is not in sim_specs['out'], "\ + for name in sim_specs.get("in", []): + assert name in out_names, ( + name + " in sim_specs['in'] is not in sim_specs['out'], " "gen_specs['out'], alloc_specs['out'], H0, or libE_fields." + ) check_sim_specs(sim_specs) if gen_specs is not None: - if 'in' in gen_specs: - assert isinstance(gen_specs['in'], list), "gen_specs['in'] must be a list" + if "in" in gen_specs: + assert isinstance(gen_specs["in"], list), "gen_specs['in'] must be a list" - for name in gen_specs.get('in', []): - assert name in out_names, \ - name + " in gen_specs['in'] is not in sim_specs['out'], "\ + for name in gen_specs.get("in", []): + assert name in out_names, ( + name + " in gen_specs['in'] is not in sim_specs['out'], " "gen_specs['out'], alloc_specs['out'], H0, or libE_fields." + ) check_gen_specs(gen_specs) if exit_criteria is not None: - assert sim_specs is not None and gen_specs is not None, \ - "Can't check exit_criteria without sim_specs and gen_specs" + assert ( + sim_specs is not None and gen_specs is not None + ), "Can't check exit_criteria without sim_specs and gen_specs" check_exit_criteria(exit_criteria, sim_specs, gen_specs) if H0 is not None: - assert sim_specs is not None and alloc_specs is not None and gen_specs is not None, \ - "Can't check H0 without sim_specs, alloc_specs, gen_specs" + assert ( + sim_specs is not None and alloc_specs is not None and gen_specs is not None + ), "Can't check H0 without sim_specs, alloc_specs, gen_specs" check_H(H0, sim_specs, alloc_specs, gen_specs) diff --git a/libensemble/tools/parse_args.py b/libensemble/tools/parse_args.py index 741b58c2a..d365b3371 100644 --- a/libensemble/tools/parse_args.py +++ b/libensemble/tools/parse_args.py @@ -6,31 +6,30 @@ # ==================== Command-line argument parsing =========================== -parser = argparse.ArgumentParser(prog='test_...') - -parser.add_argument('--comms', type=str, nargs='?', - choices=['local', 'tcp', 'ssh', 'client', 'mpi'], - default='mpi', help='Type of communicator') -parser.add_argument('--nworkers', type=int, nargs='?', - help='Number of local forked processes') -parser.add_argument('--nsim_workers', type=int, nargs='?', - help='Number of workers for sims. 1+ zero-resource gen worker will be added') -parser.add_argument('--nresource_sets', type=int, nargs='?', - help='Number of resource sets') -parser.add_argument('--workers', type=str, nargs='+', - help='List of worker nodes') -parser.add_argument('--workerID', type=int, nargs='?', help='Client worker ID') -parser.add_argument('--server', type=str, nargs=3, - help='Triple of (ip, port, authkey) used to reach manager') -parser.add_argument('--pwd', type=str, nargs='?', - help='Working directory to be used') -parser.add_argument('--worker_pwd', type=str, nargs='?', - help='Working directory on remote client') -parser.add_argument('--worker_python', type=str, nargs='?', - default=sys.executable, - help='Python version on remote client') -parser.add_argument('--tester_args', type=str, nargs='*', - help='Additional arguments for use by specific testers') +parser = argparse.ArgumentParser(prog="test_...") + +parser.add_argument( + "--comms", + type=str, + nargs="?", + choices=["local", "tcp", "ssh", "client", "mpi"], + default="mpi", + help="Type of communicator", +) +parser.add_argument("--nworkers", type=int, nargs="?", help="Number of local forked processes") +parser.add_argument( + "--nsim_workers", type=int, nargs="?", help="Number of workers for sims. 1+ zero-resource gen worker will be added" +) +parser.add_argument("--nresource_sets", type=int, nargs="?", help="Number of resource sets") +parser.add_argument("--workers", type=str, nargs="+", help="List of worker nodes") +parser.add_argument("--workerID", type=int, nargs="?", help="Client worker ID") +parser.add_argument("--server", type=str, nargs=3, help="Triple of (ip, port, authkey) used to reach manager") +parser.add_argument("--pwd", type=str, nargs="?", help="Working directory to be used") +parser.add_argument("--worker_pwd", type=str, nargs="?", help="Working directory on remote client") +parser.add_argument( + "--worker_python", type=str, nargs="?", default=sys.executable, help="Python version on remote client" +) +parser.add_argument("--tester_args", type=str, nargs="*", help="Additional arguments for use by specific testers") def _get_zrw(nworkers, nsim_workers): @@ -43,20 +42,21 @@ def _get_zrw(nworkers, nsim_workers): def _mpi_parse_args(args): """Parses arguments for MPI comms.""" from mpi4py import MPI, rc + if rc.initialize is False and not MPI.Is_initialized(): MPI.Init() - nworkers = MPI.COMM_WORLD.Get_size()-1 + nworkers = MPI.COMM_WORLD.Get_size() - 1 is_manager = MPI.COMM_WORLD.Get_rank() == 0 - libE_specs = {'mpi_comm': MPI.COMM_WORLD, 'comms': 'mpi'} + libE_specs = {"mpi_comm": MPI.COMM_WORLD, "comms": "mpi"} if args.nresource_sets is not None: - libE_specs['num_resource_sets'] = args.nresource_sets + libE_specs["num_resource_sets"] = args.nresource_sets # Convenience option which sets other libE_specs options. nsim_workers = args.nsim_workers if nsim_workers is not None: - libE_specs['zero_resource_workers'] = _get_zrw(nworkers, nsim_workers) + libE_specs["zero_resource_workers"] = _get_zrw(nworkers, nsim_workers) return nworkers, is_manager, libE_specs, args.tester_args @@ -64,20 +64,20 @@ def _mpi_parse_args(args): def _local_parse_args(args): """Parses arguments for forked processes using multiprocessing.""" - libE_specs = {'comms': 'local'} + libE_specs = {"comms": "local"} nworkers = args.nworkers if args.nresource_sets is not None: - libE_specs['num_resource_sets'] = args.nresource_sets + libE_specs["num_resource_sets"] = args.nresource_sets # Convenience option which sets other libE_specs options. nsim_workers = args.nsim_workers if nsim_workers is not None: nworkers = nworkers or nsim_workers + 1 - libE_specs['zero_resource_workers'] = _get_zrw(nworkers, nsim_workers) + libE_specs["zero_resource_workers"] = _get_zrw(nworkers, nsim_workers) nworkers = nworkers or 4 - libE_specs['nworkers'] = nworkers + libE_specs["nworkers"] = nworkers return nworkers, True, libE_specs, args.tester_args @@ -86,11 +86,20 @@ def _tcp_parse_args(args): """Parses arguments for local TCP connections""" nworkers = args.nworkers or 4 cmd = [ - sys.executable, sys.argv[0], "--comms", "client", "--server", - "{manager_ip}", "{manager_port}", "{authkey}", "--workerID", - "{workerID}", "--nworkers", - str(nworkers)] - libE_specs = {'nworkers': nworkers, 'worker_cmd': cmd, 'comms': 'tcp'} + sys.executable, + sys.argv[0], + "--comms", + "client", + "--server", + "{manager_ip}", + "{manager_port}", + "{authkey}", + "--workerID", + "{workerID}", + "--nworkers", + str(nworkers), + ] + libE_specs = {"nworkers": nworkers, "worker_cmd": cmd, "comms": "tcp"} return nworkers, True, libE_specs, args.tester_args @@ -100,20 +109,25 @@ def _ssh_parse_args(args): worker_pwd = args.worker_pwd or os.getcwd() script_dir, script_name = os.path.split(sys.argv[0]) worker_script_name = os.path.join(worker_pwd, script_name) - ssh = [ - "ssh", "-R", "{tunnel_port}:localhost:{manager_port}", "{worker_ip}"] + ssh = ["ssh", "-R", "{tunnel_port}:localhost:{manager_port}", "{worker_ip}"] cmd = [ - args.worker_python, worker_script_name, "--comms", "client", - "--server", "localhost", "{tunnel_port}", "{authkey}", "--workerID", - "{workerID}", "--nworkers", - str(nworkers)] + args.worker_python, + worker_script_name, + "--comms", + "client", + "--server", + "localhost", + "{tunnel_port}", + "{authkey}", + "--workerID", + "{workerID}", + "--nworkers", + str(nworkers), + ] cmd = " ".join(cmd) cmd = "( cd {} ; {} )".format(worker_pwd, cmd) ssh.append(cmd) - libE_specs = {'workers': args.workers, - 'worker_cmd': ssh, - 'ip': 'localhost', - 'comms': 'tcp'} + libE_specs = {"workers": args.workers, "worker_cmd": ssh, "ip": "localhost", "comms": "tcp"} return nworkers, True, libE_specs, args.tester_args @@ -121,12 +135,14 @@ def _client_parse_args(args): """Parses arguments for a TCP client.""" nworkers = args.nworkers or 4 ip, port, authkey = args.server - libE_specs = {'ip': ip, - 'port': int(port), - 'authkey': authkey.encode('utf-8'), - 'workerID': args.workerID, - 'nworkers': nworkers, - 'comms': 'tcp'} + libE_specs = { + "ip": ip, + "port": int(port), + "authkey": authkey.encode("utf-8"), + "workerID": args.workerID, + "nworkers": nworkers, + "comms": "tcp", + } return nworkers, False, libE_specs, args.tester_args @@ -202,14 +218,15 @@ def parse_args(): """ args, unknown = parser.parse_known_args(sys.argv[1:]) front_ends = { - 'mpi': _mpi_parse_args, - 'local': _local_parse_args, - 'tcp': _tcp_parse_args, - 'ssh': _ssh_parse_args, - 'client': _client_parse_args} + "mpi": _mpi_parse_args, + "local": _local_parse_args, + "tcp": _tcp_parse_args, + "ssh": _ssh_parse_args, + "client": _client_parse_args, + } if args.pwd is not None: os.chdir(args.pwd) - nworkers, is_manager, libE_specs, tester_args = front_ends[args.comms or 'mpi'](args) + nworkers, is_manager, libE_specs, tester_args = front_ends[args.comms or "mpi"](args) if is_manager and unknown: - logger.warning('parse_args ignoring unrecognized arguments: {}'.format(' '.join(unknown))) + logger.warning("parse_args ignoring unrecognized arguments: {}".format(" ".join(unknown))) return nworkers, is_manager, libE_specs, tester_args