Skip to content

Commit

Permalink
Merge pull request #1041 from Libensemble/feature/allow_dir_reuse
Browse files Browse the repository at this point in the history
Feature/allow ensemble dir overwrite
  • Loading branch information
jlnav committed Aug 3, 2023
2 parents 682fa29 + 43a0ca5 commit dfdcd4a
Show file tree
Hide file tree
Showing 13 changed files with 209 additions and 72 deletions.
4 changes: 4 additions & 0 deletions docs/data_structures/libE_specs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ the ``LibeSpecs`` class. When provided as a Python class, options are validated
Whether to copy back directories within ``ensemble_dir_path`` back to launch
location. Useful if ``ensemble_dir_path`` located on node-local storage.

"reuse_output_dir" [bool] = ``False``:
Whether to allow overwrites and access to previous ensemble and workflow directories in subsequent runs.
``False`` by default to protect results.

"use_worker_dirs" [bool] = ``False``:
Whether to organize calculation directories under worker-specific directories:

Expand Down
82 changes: 41 additions & 41 deletions libensemble/comms/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,11 @@ def set_directory(self, dirname: str) -> None:
dirname = Path(dirname)
if not dirname.exists():
dirname.mkdir(parents=True)
if self.logger_set:
logger = logging.getLogger(self.name)
logger.warning("Cannot set directory after loggers initialized")
else:
baselog = Path(self.filename).name
basestat = Path(self.stat_filename).name
self.filename = str(dirname / baselog)
self.stat_filename = str(dirname / basestat)

baselog = Path(self.filename).name
basestat = Path(self.stat_filename).name
self.filename = str(dirname / baselog)
self.stat_filename = str(dirname / basestat)


class CommLogHandler(logging.Handler):
Expand Down Expand Up @@ -163,40 +160,43 @@ def manager_logging_config(specs={}):
# Regular logging
logconfig = LogConfig.config

if not logconfig.logger_set:
if specs.get("use_workflow_dir"): # placing logfiles in separate directory
logconfig.set_directory(specs.get("workflow_dir_path"))

formatter = logging.Formatter(logconfig.fmt)
wfilter = WorkerIDFilter(0)
fh = logging.FileHandler(logconfig.filename, mode="w")
fh.addFilter(wfilter)
fh.setFormatter(formatter)
logger = logging.getLogger(logconfig.name)
logger.propagate = False
logger.setLevel(logconfig.log_level) # Formatter filters on top of this
logger.addHandler(fh)
logconfig.logger_set = True
if specs.get("use_workflow_dir"): # placing logfiles in separate directory
logconfig.set_directory(specs.get("workflow_dir_path"))

# Stats logging
# NB: Could add a specialized handler for immediate flushing
fhs = logging.FileHandler(logconfig.stat_filename, mode="w")
fhs.addFilter(wfilter)
fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s"))
stat_logger = logging.getLogger(logconfig.stats_name)
stat_logger.propagate = False
stat_logger.setLevel(logging.DEBUG)
stat_logger.addHandler(fhs)

# Mirror error-logging to stderr of user-specified level
fhe = logging.StreamHandler(stream=sys.stderr)
fhe.addFilter(wfilter)
efilter = ErrorFilter(logconfig.stderr_level)
fhe.addFilter(efilter)
fhe.setFormatter(formatter)
logger.addHandler(fhe)
else:
stat_logger = logging.getLogger(logconfig.stats_name)
formatter = logging.Formatter(logconfig.fmt)
wfilter = WorkerIDFilter(0)
fh = logging.FileHandler(logconfig.filename, mode="a")
fh.addFilter(wfilter)
fh.setFormatter(formatter)
logger = logging.getLogger(logconfig.name)
if logconfig.logger_set:
remove_handlers(logger)
logger.propagate = False
logger.setLevel(logconfig.log_level) # Formatter filters on top of this
logger.addHandler(fh)
logconfig.logger_set = True

# Stats logging
# NB: Could add a specialized handler for immediate flushing
fhs = logging.FileHandler(logconfig.stat_filename, mode="a")
fhs.addFilter(wfilter)
fhs.setFormatter(logging.Formatter("%(prefix)s: %(message)s"))
stat_logger = logging.getLogger(logconfig.stats_name)
if logconfig.logger_set:
remove_handlers(stat_logger)
stat_logger.propagate = False
stat_logger.setLevel(logging.DEBUG)
stat_logger.addHandler(fhs)

# Mirror error-logging to stderr of user-specified level
fhe = logging.StreamHandler(stream=sys.stderr)
fhe.addFilter(wfilter)
efilter = ErrorFilter(logconfig.stderr_level)
fhe.addFilter(efilter)
fhe.setFormatter(formatter)
logger.addHandler(fhe)
# else:
# stat_logger = logging.getLogger(logconfig.stats_name)

stat_logger.info(f"Starting ensemble at: {stat_timer.date_start}")

Expand Down
2 changes: 1 addition & 1 deletion libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ def __init__(

try:
temp_EnsembleDirectory.make_copyback()
except OSError as e: # Ensemble dir exists and isn't empty.
except AssertionError as e: # Ensemble dir exists and isn't empty.
logger.manager_warning(_USER_CALC_DIR_WARNING.format(temp_EnsembleDirectory.ensemble_dir))
self._kill_workers()
raise ManagerException(
Expand Down
6 changes: 6 additions & 0 deletions libensemble/specs.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,12 @@ class LibeSpecs(BaseModel):
in the workflow directory.
"""

reuse_output_dir: Optional[bool] = False
"""
Whether to allow overwrites and access to previous ensemble and workflow directories in subsequent runs.
``False`` by default to protect results.
"""

workflow_dir_path: Optional[Union[str, Path]] = "."
"""
Optional path to the workflow directory. Autogenerated in the current directory if `use_workflow_dir`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
# libE_specs["zero_resource_workers"] = [1] # If first worker must be gen, use this instead

libE_specs["sim_dirs_make"] = True
libE_specs["workflow_dir_path"] = "./CUDA_intermediate/workflow" + str(nworkers)
libE_specs["workflow_dir_path"] = "./ensemble_CUDA/workflow_" + libE_specs["comms"] + "_w" + str(nworkers) + "_N"
libE_specs["sim_dir_copy_files"] = [".gitignore"]
libE_specs["reuse_output_dir"] = True

if libE_specs["comms"] == "tcp":
sys.exit("This test only runs with MPI or local -- aborting...")
Expand Down Expand Up @@ -82,9 +83,13 @@
exit_criteria = {"sim_max": 40, "wallclock_max": 300}

# Perform the run
H, persis_info, flag = libE(
sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs
)

for i in range(2):
persis_info = add_unique_random_streams({}, nworkers + 1)
libE_specs["workflow_dir_path"] = libE_specs["workflow_dir_path"][:-1] + str(i)
H, persis_info, flag = libE(
sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs, alloc_specs=alloc_specs
)

if is_manager:
assert flag == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
libE_specs["sim_dir_copy_files"] = [dir_to_copy]
libE_specs["sim_dir_symlink_files"] = [dir_to_symlink]
libE_specs["ensemble_copy_back"] = True
libE_specs["reuse_output_dir"] = True

sim_specs = {
"sim_f": sim_f,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
e_ensemble = "./ensemble_ex_w" + str(nworkers) + "_" + libE_specs.get("comms")

if not os.path.isdir(e_ensemble):
os.makedirs(os.path.join(e_ensemble, "sim0_worker0"), exist_ok=True)
os.makedirs(os.path.join(e_ensemble, "sim0"), exist_ok=True)

libE_specs["sim_dirs_make"] = True
libE_specs["ensemble_dir_path"] = e_ensemble
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@

persis_info = add_unique_random_streams({}, nworkers + 1)

exit_criteria = {"sim_max": 21}
exit_criteria = {"sim_max": 20}

H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

Expand Down
93 changes: 93 additions & 0 deletions libensemble/tests/functionality_tests/test_workflow_dir.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""
Runs libEnsemble with uniform random sampling and writes results into sim dirs.
Tests workflow directory capabilities (``use_workflow_dir`` / ``workflow_dir_path``) across two consecutive runs.
Execute via one of the following commands (e.g. 3 workers):
mpiexec -np 4 python test_workflow_dir.py
python test_workflow_dir.py --nworkers 3 --comms local
python test_workflow_dir.py --nworkers 3 --comms tcp
The number of concurrent evaluations of the objective function will be 4-1=3.
"""

# Do not change these lines - they are parsed by run-tests.sh
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 2 4

import os

import numpy as np

from libensemble.gen_funcs.sampling import uniform_random_sample as gen_f
from libensemble.libE import libE
from libensemble.tests.regression_tests.support import write_sim_func as sim_f
from libensemble.tools import add_unique_random_streams, parse_args

nworkers, is_manager, libE_specs, _ = parse_args()

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
    # Input directory handed to libEnsemble via "sim_input_dir", plus a nested directory inside it.
    sim_input_dir = os.path.abspath("./sim_input_dir")
    dir_to_copy = sim_input_dir + "/copy_this"

    # exist_ok=True already tolerates pre-existing directories, so no separate isdir() check is needed.
    for dire in [sim_input_dir, dir_to_copy]:
        os.makedirs(dire, exist_ok=True)

    libE_specs["sim_input_dir"] = sim_input_dir
    libE_specs["sim_dirs_make"] = False
    libE_specs["sim_dir_symlink_files"] = [
        os.path.abspath("./test_sim_input_dir_option.py")
    ]  # to cover FileExistsError catch
    libE_specs["ensemble_copy_back"] = True
    libE_specs["use_workflow_dir"] = True

    sim_specs = {
        "sim_f": sim_f,
        "in": ["x"],
        "out": [("f", float)],
    }

    gen_specs = {
        "gen_f": gen_f,
        "out": [("x", float, (1,))],
        "user": {
            "gen_batch_size": 20,
            "lb": np.array([-3]),
            "ub": np.array([3]),
        },
    }

    persis_info = add_unique_random_streams({}, nworkers + 1)

    exit_criteria = {"sim_max": 21}

    # Entries that must be present in each workflow directory after a run.
    expected_entries = ["ensemble.log", "libE_stats.txt", "ensemble", "ensemble_back"]

    # Line counts of the log/stats files, one entry per run, compared after the loop.
    ensemble_lens = []
    stats_lens = []

    # Two back-to-back runs in the same process, each with its own workflow directory.
    for i in range(2):
        libE_specs["workflow_dir_path"] = (
            "./test_workflow" + str(i) + "_nworkers" + str(nworkers) + "_comms-" + libE_specs["comms"]
        )

        H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria, persis_info, libE_specs=libE_specs)

        # Read the path back from libE_specs after the run, exactly as the checks below depend on it.
        workflow_dir = libE_specs["workflow_dir_path"]
        assert os.path.isdir(workflow_dir), "workflow_dir not created"

        # List the directory once and report exactly which expected entries are absent.
        present = set(os.listdir(workflow_dir))
        missing = [entry for entry in expected_entries if entry not in present]
        assert not missing, f"workflow_dir is missing expected entries: {missing}"

        with open(os.path.join(workflow_dir, "ensemble.log"), "r") as f:
            ensemble_lens.append(len(f.readlines()))

        with open(os.path.join(workflow_dir, "libE_stats.txt"), "r") as f:
            stats_lens.append(len(f.readlines()))

    # Each run must produce its own complete log/stats files of matching length.
    assert ensemble_lens[0] == ensemble_lens[1], "ensemble.log's didn't have same length"
    assert stats_lens[0] == stats_lens[1], "libE_stats.txt's didn't have same length"
9 changes: 0 additions & 9 deletions libensemble/tests/unit_tests_logger/test_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ def test_set_filename():


def test_set_directory(tmp_path):
from libensemble.comms.logs import manager_logging_config

logs = LogConfig.config
logs.logger_set = False
Expand All @@ -77,14 +76,6 @@ def test_set_directory(tmp_path):
assert logs.filename == os.path.join(tmp_path, "ensemble.log")
assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt")

manager_logging_config()
logger.set_directory("toolate")
assert logs.filename == os.path.join(tmp_path, "ensemble.log")
assert logs.stat_filename == os.path.join(tmp_path, "libE_stats.txt")

assert os.path.isfile(os.path.join(tmp_path, "ensemble.log"))
assert os.path.isfile(os.path.join(tmp_path, "libE_stats.txt"))


def test_set_stderr_level():
stderr_level = logger.get_stderr_level()
Expand Down
3 changes: 2 additions & 1 deletion libensemble/tools/tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@
+ "\n"
+ "libEnsemble attempted to reuse {} as a parent directory for calc dirs.\n"
+ "If allowed to continue, previous results may have been overwritten!\n"
+ "Resolve this by ensuring libE_specs['ensemble_dir_path'] is unique for each run."
+ "Resolve this either by ensuring libE_specs['ensemble_dir_path'] is unique for each run\n"
+ "or by setting libE_specs['reuse_output_dir'] = True.\n"
+ "\n"
+ 79 * "*"
+ "\n\n"
Expand Down
33 changes: 28 additions & 5 deletions libensemble/utils/loc_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,27 @@ def __init__(self) -> None:
self.dirs = {}
self.stack = []

def copy_or_symlink(
self, destdir: str, copy_files: List[Path] = [], symlink_files: List[Path] = [], ignore_FileExists: bool = False
def copy_file(
self,
destdir: Path,
copy_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> None:
"""Inspired by https://stackoverflow.com/a/9793699.
Determine paths, basenames, and conditions for copying/symlinking
"""
for file_path in copy_files:
file_path = Path(file_path).absolute()
dest_path = destdir / Path(file_path.name)
if allow_overwrite and dest_path.exists():
if dest_path.is_dir():
shutil.rmtree(dest_path)
else:
dest_path.unlink()
try:
if file_path.is_dir():
shutil.copytree(file_path, dest_path)
shutil.copytree(file_path, dest_path, dirs_exist_ok=allow_overwrite)
else:
shutil.copy(file_path, dest_path)
except FileExistsError:
Expand All @@ -36,9 +45,19 @@ def copy_or_symlink(
else: # Indicates problem with unique sim_dirs
raise

def symlink_file(
self,
destdir: Path,
symlink_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> None:

for file_path in symlink_files:
src_path = Path(file_path).absolute()
dest_path = destdir / Path(file_path.name)
if allow_overwrite and dest_path.exists():
dest_path.unlink(missing_ok=True)
try:
os.symlink(src_path, dest_path)
except FileExistsError:
Expand All @@ -55,6 +74,7 @@ def register_loc(
copy_files: List[Path] = [],
symlink_files: List[Path] = [],
ignore_FileExists: bool = False,
allow_overwrite: bool = False,
) -> str:
"""Register a new location in the dictionary.
Expand Down Expand Up @@ -84,8 +104,11 @@ def register_loc(
dirname.mkdir(parents=True, exist_ok=True)

self.dirs[key] = dirname
if len(copy_files) or len(symlink_files):
self.copy_or_symlink(dirname, copy_files, symlink_files, ignore_FileExists)
if len(copy_files):
self.copy_file(dirname, copy_files, ignore_FileExists, allow_overwrite)

if len(symlink_files):
self.symlink_file(dirname, symlink_files, ignore_FileExists, allow_overwrite)

return dirname

Expand Down

0 comments on commit dfdcd4a

Please sign in to comment.