/
arlaunch_run.py
220 lines (196 loc) · 8.31 KB
/
arlaunch_run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
"""
A runnable script to launch a single Rocket (a command-line interface to rocket_launcher.py)
Modified from the original rlaunch.py script in the FireWorks package.
"""
import os
import signal
import sys
from argparse import ArgumentParser
from fireworks.fw_config import LAUNCHPAD_LOC, CONFIG_FILE_DIR
from fireworks.core.launchpad import LaunchPad
from fireworks.core.rocket_launcher import rapidfire, launch_rocket
from fireworks.utilities.fw_utilities import get_my_host, get_my_ip, get_fw_logger
from fireworks.features.multi_launcher import launch_multiprocess
from aiida_fireworks_scheduler.fworker import AiiDAFWorker
#pylint: disable=too-many-statements,line-too-long,import-outside-toplevel
def handle_interrupt(signum, frame):
    """
    Signal handler that reports the interruption and terminates the process.

    :param signum: number of the signal that was received; echoed in the message.
    :param frame: current stack frame supplied by the ``signal`` machinery (unused).
    :raises SystemExit: always, with exit status 1.
    """
    # The frame is part of the handler signature required by ``signal.signal``
    # but is of no use here.
    del frame
    sys.stderr.write("Interrupted by signal {:d}\n".format(signum))
    sys.exit(1)
def _build_parser():
    """Build the argument parser with the ``singleshot``, ``rapidfire`` and ``multi`` sub-commands."""
    m_description = 'This program launches one or more Rockets. A Rocket retrieves a job from the ' \
                    'central database and runs it. The "single-shot" option launches a single Rocket, ' \
                    'whereas the "rapidfire" option loops until all FireWorks are completed.'

    parser = ArgumentParser(description=m_description)
    subparsers = parser.add_subparsers(help='command', dest='command')
    # Python 3 argparse treats sub-commands as optional. Without one, the
    # sub-command specific attributes (fw_id, pdb, ...) are absent from the
    # parsed namespace and the launch code fails with AttributeError, so the
    # sub-command is made mandatory and argparse reports a usage error instead.
    subparsers.required = True

    single_parser = subparsers.add_parser('singleshot',
                                          help='launch a single Rocket')
    rapid_parser = subparsers.add_parser(
        'rapidfire',
        help='launch multiple Rockets (loop until all FireWorks complete)')
    multi_parser = subparsers.add_parser(
        'multi', help='launches multiple Rockets simultaneously')

    # --- singleshot options ---
    single_parser.add_argument('-f',
                               '--fw_id',
                               help='specific fw_id to run',
                               default=None,
                               type=int)
    single_parser.add_argument('--offline',
                               help='run in offline mode (FW.json required)',
                               action='store_true')
    single_parser.add_argument('--pdb',
                               help='shortcut to invoke debugger on error',
                               action='store_true')

    # --- rapidfire options ---
    # No ``type`` on --nlaunches: the value may be an integer or the literal
    # string "infinite", so it is handed over unconverted.
    rapid_parser.add_argument('--nlaunches',
                              help='num_launches (int or "infinite"; '
                              'default 0 is all jobs in DB)',
                              default=0)
    rapid_parser.add_argument(
        '--timeout',
        help='timeout (secs) after which to quit (default None)',
        default=None,
        type=int)
    rapid_parser.add_argument(
        '--max_loops',
        help='after this many sleep loops, quit even in '
        'infinite nlaunches mode (default -1 is infinite loops)',
        default=-1,
        type=int)
    rapid_parser.add_argument('--sleep',
                              help='sleep time between loops (secs)',
                              default=None,
                              type=int)
    rapid_parser.add_argument(
        '--local_redirect',
        help="Redirect stdout and stderr to the launch directory",
        action="store_true")

    # --- multi options ---
    multi_parser.add_argument('num_jobs',
                              help='the number of jobs to run in parallel',
                              type=int)
    multi_parser.add_argument('--nlaunches',
                              help='number of FireWorks to run in series per '
                              'parallel job (int or "infinite"; default 0 is '
                              'all jobs in DB)',
                              default=0)
    multi_parser.add_argument(
        '--sleep',
        help='sleep time between loops in infinite launch mode '
        '(secs)',
        default=None,
        type=int)
    multi_parser.add_argument(
        '--timeout',
        help='timeout (secs) after which to quit (default None)',
        default=None,
        type=int)
    multi_parser.add_argument(
        '--nodefile',
        help='nodefile name or environment variable name '
        'containing the node file name (for populating'
        ' FWData only)',
        default=None,
        type=str)
    multi_parser.add_argument(
        '--ppn',
        help='processors per node (for populating FWData only)',
        default=1,
        type=int)
    multi_parser.add_argument('--exclude_current_node',
                              help="Don't use the script launching node "
                              "as compute node",
                              action="store_true")
    multi_parser.add_argument(
        '--local_redirect',
        help="Redirect stdout and stderr to the launch directory",
        action="store_true")

    # --- options shared by every sub-command (given before the sub-command) ---
    parser.add_argument('-l',
                        '--launchpad_file',
                        help='path to launchpad file')
    parser.add_argument('-w',
                        '--fworker_file',
                        required=True,
                        help='path to fworker file')
    parser.add_argument('-c',
                        '--config_dir',
                        help='path to a directory containing the config file '
                        '(used if -l, -w unspecified)',
                        default=CONFIG_FILE_DIR)
    parser.add_argument('--loglvl',
                        help='level to print log messages',
                        default='INFO')
    parser.add_argument('-s',
                        '--silencer',
                        help='shortcut to mute log messages',
                        action='store_true')
    return parser


def _read_node_list(nodefile):
    """Return the node names listed in *nodefile*, one per non-blank line.

    *nodefile* is either a path or the name of an environment variable that
    holds the path.
    """
    if nodefile in os.environ:
        nodefile = os.environ[nodefile]
    with open(nodefile, 'r') as fhandle:
        stripped = (line.strip() for line in fhandle)
        # Blank lines carry no node name; skip them rather than pass '' along.
        return [name for name in stripped if name]


def arlaunch(argv=None):
    """
    Command-line entry point that launches one or more Rockets.

    Parses the command line, loads the LaunchPad and the ``AiiDAFWorker``, and
    dispatches on the sub-command: ``singleshot`` calls ``launch_rocket``,
    ``rapidfire`` calls ``rapidfire`` and ``multi`` calls
    ``launch_multiprocess``.  ``handle_interrupt`` is installed as the SIGINT
    handler before any work starts.

    :param argv: list of command-line arguments to parse. ``None`` (the
        default) makes argparse read ``sys.argv[1:]``.
    :raises SystemExit: raised by argparse on invalid arguments, including a
        missing sub-command.
    """
    parser = _build_parser()

    try:
        import argcomplete
        argcomplete.autocomplete(parser)
        # This supports bash autocompletion. To enable this, pip install
        # argcomplete, activate global completion, or add
        #      eval "$(register-python-argcomplete rlaunch)"
        # into your .bash_profile or .bashrc
    except ImportError:
        # argcomplete is optional; without it only tab completion is lost.
        pass

    args = parser.parse_args(argv)

    signal.signal(signal.SIGINT, handle_interrupt)  # graceful exit on ^C

    # Launchpad file resolution: explicit -l, else my_launchpad.yaml in the
    # config directory if it exists, else the FireWorks-configured location.
    if not args.launchpad_file:
        config_launchpad = os.path.join(args.config_dir, 'my_launchpad.yaml')
        if os.path.exists(config_launchpad):
            args.launchpad_file = config_launchpad
        else:
            args.launchpad_file = LAUNCHPAD_LOC

    if args.silencer:
        args.loglvl = 'CRITICAL'

    if args.command == 'singleshot' and args.offline:
        # Offline single-shot runs do not get a LaunchPad.
        launchpad = None
    elif args.launchpad_file:
        launchpad = LaunchPad.from_file(args.launchpad_file)
    else:
        launchpad = LaunchPad(strm_lvl=args.loglvl)

    fworker = AiiDAFWorker.from_file(args.fworker_file)

    # prime addr lookups
    # NOTE(review): this logger is fixed at INFO, so the message is not
    # affected by --loglvl / -s -- confirm that this is intended.
    _log = get_fw_logger("rlaunch", stream_level="INFO")
    _log.info("Hostname/IP lookup (this will take a few seconds)")
    get_my_host()
    get_my_ip()

    if args.command == 'rapidfire':
        rapidfire(launchpad,
                  fworker=fworker,
                  m_dir=None,
                  nlaunches=args.nlaunches,
                  max_loops=args.max_loops,
                  sleep_time=args.sleep,
                  strm_lvl=args.loglvl,
                  timeout=args.timeout,
                  local_redirect=args.local_redirect)
    elif args.command == 'multi':
        total_node_list = None
        if args.nodefile:
            total_node_list = _read_node_list(args.nodefile)
        launch_multiprocess(launchpad,
                            fworker,
                            args.loglvl,
                            args.nlaunches,
                            args.num_jobs,
                            args.sleep,
                            total_node_list,
                            args.ppn,
                            timeout=args.timeout,
                            exclude_current_node=args.exclude_current_node,
                            local_redirect=args.local_redirect)
    else:
        # Only 'singleshot' can reach this branch now that a sub-command is required.
        launch_rocket(launchpad,
                      fworker,
                      args.fw_id,
                      args.loglvl,
                      pdb_on_exception=args.pdb)
# Run the launcher only when this file is executed as a script, not on import.
if __name__ == '__main__':
    arlaunch()