Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature / Request History options #1141

Merged
merged 14 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
43 changes: 29 additions & 14 deletions libensemble/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,34 +269,51 @@ def _kill_workers(self) -> None:

# --- Checkpointing logic

def _save_every_k(self, fname: str, count: int, k: int) -> None:
"""Saves history every kth step"""
count = k * (count // k)
date_start = self.date_start
def _get_date_start_str(self) -> str:
"""Get timestamp for workflow start, for saving History"""
date_start = self.date_start + "_"
if platform.system() == "Windows":
date_start = date_start.replace(":", "-") # ":" is invalid in windows filenames
filename = fname.format(date_start, count)
if not self.libE_specs["save_H_with_date"]:
date_start = ""
return date_start

def _save_every_k(self, fname: str, count: int, k: int, complete: bool) -> None:
"""Saves history every kth step"""
if not complete:
count = k * (count // k)
date_start = self._get_date_start_str()

filename = fname.format(self.libE_specs["H_file_prefix"], date_start, count)
if not os.path.isfile(filename) and count > 0:
for old_file in glob.glob(fname.format(date_start, "*")):
for old_file in glob.glob(fname.format(self.libE_specs["H_file_prefix"], date_start, "*")):
os.remove(old_file)
np.save(filename, self.hist.H)

def _save_every_k_sims(self) -> None:
def _save_every_k_sims(self, complete: bool) -> None:
"""Saves history every kth sim step"""
self._save_every_k(
os.path.join(self.libE_specs["workflow_dir_path"], "libE_history_for_run_starting_{}_after_sim_{}.npy"),
os.path.join(self.libE_specs["workflow_dir_path"], "{}_{}after_sim_{}.npy"),
self.hist.sim_ended_count,
self.libE_specs["save_every_k_sims"],
complete,
)

def _save_every_k_gens(self) -> None:
def _save_every_k_gens(self, complete: bool) -> None:
"""Saves history every kth gen step"""
self._save_every_k(
os.path.join(self.libE_specs["workflow_dir_path"], "libE_history_for_run_starting_{}_after_gen_{}.npy"),
os.path.join(self.libE_specs["workflow_dir_path"], "{}_{}after_gen_{}.npy"),
self.hist.index,
self.libE_specs["save_every_k_gens"],
complete,
)

def _init_every_k_save(self, complete=False) -> None:
if self.libE_specs.get("save_every_k_sims"):
self._save_every_k_sims(complete)
if self.libE_specs.get("save_every_k_gens"):
self._save_every_k_gens(complete)

# --- Handle outgoing messages to workers (work orders from alloc)

def _check_work_order(self, Work: dict, w: int, force: bool = False) -> None:
Expand Down Expand Up @@ -417,10 +434,7 @@ def _receive_from_workers(self, persis_info: dict) -> dict:
new_stuff = True
self._handle_msg_from_worker(persis_info, w)

if self.libE_specs.get("save_every_k_sims"):
self._save_every_k_sims()
if self.libE_specs.get("save_every_k_gens"):
self._save_every_k_gens()
self._init_every_k_save()
return persis_info

def _update_state_on_worker_msg(self, persis_info: dict, D_recv: dict, w: int) -> None:
Expand Down Expand Up @@ -559,6 +573,7 @@ def _final_receive_and_kill(self, persis_info: dict) -> (dict, int, int):
if self.WorkerExc:
exit_flag = 1

self._init_every_k_save(complete=True)
self._kill_workers()
return persis_info, exit_flag, self.elapsed()

Expand Down
14 changes: 12 additions & 2 deletions libensemble/tests/functionality_tests/test_uniform_sampling.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
# TESTSUITE_COMMS: mpi local tcp
# TESTSUITE_NPROCS: 2 4

import datetime
import os

import numpy as np

from libensemble.gen_funcs.sampling import uniform_random_sample
Expand All @@ -22,13 +25,16 @@
from libensemble.libE import libE
from libensemble.sim_funcs.six_hump_camel import six_hump_camel
from libensemble.tests.regression_tests.support import six_hump_camel_minima as minima
from libensemble.tools import add_unique_random_streams, parse_args, save_libE_output
from libensemble.tools import add_unique_random_streams, parse_args

# Main block is necessary only when using local comms with spawn start method (default on macOS and Windows).
if __name__ == "__main__":
nworkers, is_manager, libE_specs, _ = parse_args()
libE_specs["save_every_k_sims"] = 400
libE_specs["save_every_k_gens"] = 300
libE_specs["save_H_with_date"] = True
libE_specs["save_H_on_completion"] = False
libE_specs["H_file_prefix"] = "TESTING"

sim_specs = {
"sim_f": six_hump_camel, # Function whose output is being minimized
Expand Down Expand Up @@ -62,5 +68,9 @@
for m in minima:
assert np.min(np.sum((H["x"] - m) ** 2, 1)) < tol

files = os.listdir(os.path.dirname(__file__))
date = str(datetime.datetime.today()).split(" ")[0]
assert any([i.startswith("TESTING_") for i in files])
assert any([date in i for i in files])

print("\nlibEnsemble found the 6 minima within a tolerance " + str(tol))
save_libE_output(H, persis_info, __file__, nworkers)