-
Notifications
You must be signed in to change notification settings - Fork 24
/
keras_cvae_ml_genf.py
209 lines (160 loc) · 7.71 KB
/
keras_cvae_ml_genf.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
__all__ = ['run_keras_cvae_ml_genf']
import os
import glob
import yaml
import json
import numpy as np
from libensemble.executors.executor import Executor
from libensemble.message_numbers import (STOP_TAG, PERSIS_STOP, FINISHED_PERSISTENT_GEN_TAG)
from libensemble.tools.gen_support import get_mgr_worker_msg, send_mgr_worker_msg
def get_stage(persis_info):
    """Return ``persis_info['stage_count']`` as a zero-padded string of at least 4 characters (e.g. 3 -> '0003')."""
    stage_number = persis_info['stage_count']
    return '{}'.format(stage_number).zfill(4)
def update_config_file(user, app_type, pinfo):
    """
    Write a per-stage YAML configuration for one DeepDriveMD application and
    create the output directory it is expected to write into.

    The template named by ``user[app_type + '_config']`` is loaded, overlaid
    with stage-specific settings, and written to
    ``<cwd>/<app_type>_runs/stage<NNNN>/task0000/stage<NNNN>_task0000.yaml``.

    Parameters
    ----------
    user : dict
        ``gen_specs['user']``. Must contain ``<app_type>_config`` plus the
        app-specific keys read below: ``initial_sample_size`` (aggregation),
        ``last_n_h5_files`` (machine_learning), ``outliers``,
        ``n_most_recent_h5_files`` and ``n_traj_frames`` (agent).
    app_type : str
        'aggregation', 'machine_learning', 'model_selection' or 'agent';
        any other value receives only the common settings.
    pinfo : dict
        Persistent info; ``stage_count`` selects the stage directory.

    Returns
    -------
    tuple of (str, str)
        ``(output_path, task_config)``: the created output directory and the
        path of the YAML file written into it.
    """
    stage = get_stage(pinfo)
    experiment_dir = os.getcwd()

    with open(user[app_type + '_config'], 'r') as f:
        # safe_load returns None for an empty document; treat that as "no settings".
        config = yaml.safe_load(f) or {}

    output_path = os.path.join(experiment_dir, '{}_runs'.format(app_type),
                               'stage' + stage, 'task0000')

    updates = {
        'experiment_directory': experiment_dir,
        'output_path': output_path,
        'stage_idx': pinfo['stage_count'],
    }
    if app_type == 'aggregation':
        updates.update({'last_n_h5_files': user['initial_sample_size']})
    elif app_type == 'machine_learning':
        updates.update({
            'model_tag': 'keras_cvae_model' + stage,
            'last_n_h5_files': user['last_n_h5_files'],
        })
    elif app_type == 'model_selection':
        # Point at the machine-learning task directory of the same stage. The
        # path is built explicitly instead of by str.replace on output_path,
        # which would also rewrite any 'model_selection' substring that
        # happens to occur in the experiment directory itself.
        ml_output_path = os.path.join(experiment_dir, 'machine_learning_runs',
                                      'stage' + stage, 'task0000')
        updates.update({'checkpoint_dir': os.path.join(ml_output_path, 'checkpoint')})
    elif app_type == 'agent':
        updates.update({
            'num_intrinsic_outliers': user['outliers'],
            'num_extrinsic_outliers': user['outliers'],
            'n_most_recent_h5_files': user['n_most_recent_h5_files'],
            'n_traj_frames': user['n_traj_frames'],
        })

    os.makedirs(output_path, exist_ok=True)
    task_config = os.path.join(output_path, 'stage' + stage + '_task0000.yaml')
    config.update(updates)
    with open(task_config, 'w') as f:
        yaml.dump(config, f)
    return output_path, task_config
def submit_application(exctr, user, app_type, output_path, task_config):
    """
    Switch into an application's output directory, launch the application
    via libEnsemble's executor, then poll its status until it finishes.

    The original working directory is restored even if submission, the
    timeout lookup, or polling raises.

    Parameters
    ----------
    exctr : Executor
        libEnsemble executor; ``submit`` and ``polling_loop`` are called on it.
    user : dict
        ``gen_specs['user']``; ``<app_type>_kill_minutes`` sets the polling
        timeout in minutes.
    app_type : str
        Registered application name passed to ``exctr.submit``.
    output_path : str
        Directory to run the application from.
    task_config : str
        Config file passed as ``-c <path>``. A relative path is resolved
        against ``output_path``.

    Returns
    -------
    The status value returned by ``exctr.polling_loop``.
    """
    start = os.getcwd()
    os.chdir(output_path)
    try:
        args = '-c ' + os.path.join(os.getcwd(), task_config)
        task = exctr.submit(app_name=app_type, app_args=args, wait_on_run=True,
                            num_procs=1, num_nodes=1, ranks_per_node=1)
        calc_status = exctr.polling_loop(
            task, timeout=user[app_type + '_kill_minutes'] * 60, delay=1)
    finally:
        # Never leak the chdir into the rest of the (persistent) generator.
        os.chdir(start)
    return calc_status
def postprocess_md_sim_dirs(calc_in, pinfo):
    """
    Mirror this stage's MD simulation directories into DeepDriveMD's layout.

    Creates ``./molecular_dynamics_runs/stage<NNNN>`` and, for every row of
    ``calc_in``, adds an absolute symlink named ``task`` + the row's
    ``task_id`` zero-padded to 4 digits. The link points at
    ``'../' + sim_dir_loc`` resolved against the current directory.

    ``os.makedirs`` and ``os.symlink`` are called without overwrite options,
    so an already-existing stage directory or link raises FileExistsError.
    """
    stage_dir = './molecular_dynamics_runs/stage' + get_stage(pinfo)
    os.makedirs(stage_dir)

    for row in calc_in:
        link_name = 'task{}'.format(str(row['task_id']).zfill(4))
        link_path = os.path.abspath(os.path.join(stage_dir, link_name))
        target_path = os.path.abspath('../' + row['sim_dir_loc'])
        os.symlink(target_path, link_path)
def generate_initial_md_runs(gen_specs, persis_info):
    """
    Build the generator's first local History array: one row per initial MD run.

    Increments ``persis_info['stage_count']`` and allocates
    ``gen_specs['user']['initial_sample_size']`` zeroed rows of dtype
    ``gen_specs['out']``. Rows i = 0..N-1 get ``task_id = sim_id = i``,
    ``initial = True``, ``stage_id = 0`` and ``gen_dir_loc`` set to the last
    component of the current directory. ``persis_info['last_sim_id']`` is set
    to the final row's sim_id.

    Returns ``(local_H, persis_info)``.
    """
    persis_info['stage_count'] += 1

    n_runs = gen_specs['user']['initial_sample_size']
    run_ids = np.arange(n_runs)
    local_H = np.zeros(n_runs, dtype=gen_specs['out'])

    field_values = (
        ('task_id', run_ids),
        ('initial', True),
        ('gen_dir_loc', os.getcwd().split('/')[-1]),
        ('sim_id', run_ids),
        ('stage_id', 0),
    )
    for field, value in field_values:
        local_H[field] = value

    persis_info['last_sim_id'] = local_H['sim_id'][-1]
    return local_H, persis_info
def generate_subsequent_md_runs(gen_specs, persis_info, local_H, output_path):
    """
    Append the next batch of MD simulation run parameters to the local History
    array. The batch size is the number of outliers reported by the Agent
    application.

    Parameters
    ----------
    gen_specs : dict
        Generator specification (not read here; kept for signature symmetry
        with ``generate_initial_md_runs``).
    persis_info : dict
        Persistent state. ``stage_count`` is incremented; ``last_sim_id`` is
        read to number the new rows and then updated.
    local_H : numpy structured array
        The generator's local History. It is resized in place
        (``refcheck=False``) with one new row per requested MD run.
    output_path : str
        Directory searched for ``stage*_task*.json``. The length of that
        file's top-level JSON value is the number of new runs.

    Returns
    -------
    tuple
        ``(local_H, persis_info)``.

    Raises
    ------
    FileNotFoundError
        If no ``stage*_task*.json`` file exists in ``output_path``.
    """
    persis_info['stage_count'] += 1

    # Sort for a deterministic choice if several files match (glob order is arbitrary).
    candidates = sorted(glob.glob(os.path.join(output_path, 'stage*_task*.json')))
    if not candidates:
        raise FileNotFoundError(
            'No stage*_task*.json agent output found in {}'.format(output_path))
    # glob results already include output_path; re-joining would double a relative prefix.
    with open(candidates[0], 'r') as f:
        sample_size = len(json.load(f))

    start = len(local_H)
    local_H.resize(start + sample_size, refcheck=False)
    # Use an explicit start index: a [-sample_size:] slice with sample_size == 0
    # would be [0:] and select every existing row.
    new_rows = slice(start, None)

    first_sim_id = persis_info['last_sim_id'] + 1
    local_H['task_id'][new_rows] = np.arange(sample_size)
    local_H['initial'][new_rows] = False
    local_H['gen_dir_loc'][new_rows] = os.getcwd().split('/')[-1]
    local_H['sim_id'][new_rows] = np.arange(first_sim_id, first_sim_id + sample_size)
    local_H['stage_id'][new_rows] = persis_info['stage_count']

    # Equals the last new sim_id; unchanged when no rows were added.
    persis_info['last_sim_id'] = first_sim_id + sample_size - 1
    return local_H, persis_info
def skip_app(gen_specs, app):
    """
    Report whether ``app`` should be skipped.

    Returns True only when ``gen_specs['user']`` contains a truthy
    ``skip_<app>`` entry; a missing or falsy entry means the app runs.
    """
    skip_flag = gen_specs['user'].get('skip_' + app, False)
    return bool(skip_flag)
def run_keras_cvae_ml_genf(H, persis_info, gen_specs, libE_info):
    """
    Persistent libEnsemble generator that drives the DeepDriveMD pipeline.
    It does not return until the Manager tells it to stop.

    It first builds the initial batch of MD simulation parameters and sends
    that local History array directly to the Manager. It then repeats:

    1. Wait for the Manager's next message. Stop on STOP_TAG or PERSIS_STOP.
    2. Symlink the returned MD run directories into DeepDriveMD's layout.
    3. Configure and launch, in order, the aggregation, machine_learning,
       model_selection and agent applications via the Executor. Any app can
       be disabled with a truthy ``skip_<app>`` entry in
       ``gen_specs['user']``. Each app's calc status is stored in the
       ``<app>_cstat`` field at the indices ``Work['libE_info']['H_rows']``.
    4. Append new MD run parameters, sized from the last app's JSON output,
       and send the whole local History array to the Manager.

    Returns ``(local_H, persis_info, FINISHED_PERSISTENT_GEN_TAG)``.
    """
    user = gen_specs['user']
    exctr = Executor.executor
    persis_info['stage_count'] = -1

    # Environment inherited by the applications this worker launches.
    os.environ["OMP_NUM_THREADS"] = '4'
    os.environ["CUDA_VISIBLE_DEVICES"] = str(libE_info['workerID'])

    pipeline = ('aggregation', 'machine_learning', 'model_selection', 'agent')
    stop_tags = (STOP_TAG, PERSIS_STOP)

    # Seed the Manager with the initial MD run parameters.
    local_H, persis_info = generate_initial_md_runs(gen_specs, persis_info)
    send_mgr_worker_msg(libE_info['comm'], local_H)

    while True:
        tag, Work, calc_in = get_mgr_worker_msg(libE_info['comm'])
        if tag in stop_tags:
            break

        postprocess_md_sim_dirs(calc_in, persis_info)

        for app in pipeline:
            if skip_app(gen_specs, app):
                continue
            output_path, task_config = update_config_file(user, app, persis_info)
            status = submit_application(exctr, user, app, output_path, task_config)
            local_H[app + '_cstat'][Work['libE_info']['H_rows']] = status

        # NOTE(review): output_path belongs to whichever non-skipped app ran last.
        # If 'agent' is skipped, the JSON lookup uses another app's directory. If
        # every app is skipped on the first pass, output_path is unbound here.
        local_H, persis_info = generate_subsequent_md_runs(gen_specs, persis_info, local_H, output_path)
        send_mgr_worker_msg(libE_info['comm'], local_H)

    return local_H, persis_info, FINISHED_PERSISTENT_GEN_TAG