-
Notifications
You must be signed in to change notification settings - Fork 24
/
script_test_balsam_hworld.py
83 lines (63 loc) · 3.13 KB
/
script_test_balsam_hworld.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# This script is submitted as an app and job to Balsam. The job submission is
# via 'balsam launch' executed in the test_balsam_hworld.py script.
import numpy as np
import mpi4py
from mpi4py import MPI
from libensemble.executors.balsam_executor import BalsamMPIExecutor
from libensemble.message_numbers import WORKER_DONE, WORKER_KILL_ON_ERR, WORKER_KILL_ON_TIMEOUT, TASK_FAILED
from libensemble.libE import libE
from libensemble.sim_funcs.executor_hworld import executor_hworld
from libensemble.gen_funcs.sampling import uniform_random_sample
from libensemble.tools import add_unique_random_streams
import libensemble.sim_funcs.six_hump_camel as six_hump_camel
mpi4py.rc.recv_mprobe = False # Disable matching probes
libE_specs = {'mpi_comm': MPI.COMM_WORLD,
'comms': 'mpi',
'save_every_k_sims': 400,
'save_every_k_gens': 20,
}
nworkers = MPI.COMM_WORLD.Get_size() - 1
is_manager = MPI.COMM_WORLD.Get_rank() == 0
cores_per_task = 1
sim_app = './my_simtask.x'
sim_app2 = six_hump_camel.__file__
exctr = BalsamMPIExecutor(auto_resources=False, central_mode=False, custom_info={'not': 'used'})
exctr.register_calc(full_path=sim_app, calc_type='sim') # Default 'sim' app - backward compatible
exctr.register_calc(full_path=sim_app2, app_name='six_hump_camel') # Named app
exctr.register_calc(full_path=sim_app2, app_name='sim_hump_camel_dry_run')
sim_specs = {'sim_f': executor_hworld,
'in': ['x'],
'out': [('f', float), ('cstat', int)],
'user': {'cores': cores_per_task,
'balsam_test': True}
}
gen_specs = {'gen_f': uniform_random_sample,
'in': ['sim_id'],
'out': [('x', float, (2,))],
'user': {'lb': np.array([-3, -2]),
'ub': np.array([3, 2]),
'gen_batch_size': nworkers}
}
persis_info = add_unique_random_streams({}, nworkers + 1)
exit_criteria = {'elapsed_wallclock_time': 60}
# Perform the run
H, persis_info, flag = libE(sim_specs, gen_specs, exit_criteria,
persis_info, libE_specs=libE_specs)
if is_manager:
print('\nChecking expected task status against Workers ...\n')
calc_status_list_in = np.asarray([WORKER_DONE, WORKER_KILL_ON_ERR,
WORKER_DONE, WORKER_KILL_ON_TIMEOUT,
TASK_FAILED, 0])
calc_status_list = np.repeat(calc_status_list_in, nworkers)
print("Expecting: {}".format(calc_status_list))
print("Received: {}\n".format(H['cstat']))
assert np.array_equal(H['cstat'], calc_status_list), "Error - unexpected calc status. Received: " + str(H['cstat'])
# Check dry_run submissions inside ensemble.log
with open('ensemble.log', 'r') as f:
lines = f.readlines()
assert len([i for i in lines if 'Test (No submit) Runline:' in i]) == (len(calc_status_list_in) - 1) * nworkers, \
"Dry run runlines not listed in ensemble.log for each dry_run submission instance."
# Cleanup (maybe cover del_apps() and del_tasks())
exctr.del_apps()
exctr.del_tasks()
print("\n\n\nRun completed.")