/
cluster_utils.ipy
328 lines (262 loc) · 10.5 KB
/
cluster_utils.ipy
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
# <nbformat>3.0</nbformat>
# <codecell>
import os
import cPickle
import sys
from time import sleep
from IPython.core.display import clear_output
__author__ = "Daniel McDonald"
__copyright__ = "Copyright 2013, The American Gut Project"
__credits__ = ["Daniel McDonald"]
__license__ = "BSD"
__version__ = "unversioned"
__maintainer__ = "Daniel McDonald"
__email__ = "mcdonadt@colorado.edu"
REGISTERED_ENV_FILE = '.registered_env'
REGISTERED_ENV_VAR = '_registered_env'
def submit_qsub(cmd, job_name='ipy_ftw', queue=None, extra_args=''):
"""Submit a job and return the full job name"""
if isinstance(cmd, list):
cmd = '; '.join(cmd)
job_data = {'workdir':working_dir,
'cmd':cmd,
'queue':'-q %s' % queue if queue is not None else '',
'extra_args':extra_args,
'jobname':job_name}
job_template = 'echo "cd %(workdir)s; %(cmd)s" | qsub -k oe -N %(jobname)s %(queue)s %(extra_args)s'
job = job_template % job_data
job_id = !$job
return (job_name, job_id[0].split('.')[0])
def parse_qstat():
"""Process qstat output"""
user = os.environ['USER']
lines = !qstat -u $user
jobs = {}
for id_, name, state in lines.grep(user).fields(0,3,9).fields():
job_id = id_.split('.')[0]
jobs[job_id] = {}
jobs[job_id]['name'] = name
jobs[job_id]['state'] = state
return jobs
def still_running(monitoring, running_jobs, additional_prefix=None):
"""Check if our jobs to be monitored are running
additional_prefix can be specified to track derivative worker processes
(e.g., from a parallel QIIME workflow)
"""
new_monitoring = set([])
for name, id_ in monitoring:
# stop monitoring if not present anymore
if id_ not in running_jobs:
continue
# stop monitoring if complete
if running_jobs[id_]['state'] == 'C':
continue
new_monitoring.add((name, id_))
if additional_prefix is not None:
# see if we have any new jobs with a prefix we're interested in
for id_,md in running_jobs.items():
if md['name'].startswith(additional_prefix):
if md['state'] in ['R','Q']: # running or queued
new_monitoring.add((md['name'], id_))
if new_monitoring != monitoring:
obj = globals().get(REGISTERED_ENV_VAR, None)
if obj is not None:
obj._monitoring_jobs = new_monitoring
return new_monitoring
def job_run_details(name, id_):
"""Run tracejob and parse out the useful bits"""
# go back 2 days. This may need to be smarter.
job_details = !tracejob -a -m -l -f job -n 2 $id_
if not job_details.grep('dest='):
raise ValueError("Cannot find job %s!" % id_)
dest = job_details.grep('dest=').fields(-2)[0].strip('(),').split('=')[1]
tmp = job_details.grep('Exit_status=').fields(3,4,5,6).fields()[0]
exit_status, walltime, mem, vmem = map(lambda x: x.split('=')[-1], tmp)
stderr_file = os.path.expandvars("$HOME/%s.e%s" % (name, id_))
if not os.path.exists(stderr_file):
raise ValueError, "Could not find expected standard error output: %s" % stderr_file
stdout_file = os.path.expandvars("$HOME/%s.o%s" % (name, id_))
if not os.path.exists(stdout_file):
raise ValueError, "Could not find expected standard output: %s" % stdout_file
return {'exit_status':exit_status,
'walltime':walltime,
'mem':mem,
'vmem':vmem,
'stderr_file':stderr_file,
'stdout_file':stdout_file}
def wait_on(jobs_to_monitor, additional_prefix=None):
"""Block while jobs to monitor are running, and provide a status update"""
POLL_INTERVAL = 5
elapsed = 0
# fragile.
if isinstance(jobs_to_monitor, tuple) and len(jobs_to_monitor) == 2:
jobs_to_monitor = [jobs_to_monitor]
all_jobs = set(jobs_to_monitor)
n_jobs = len(jobs_to_monitor)
print "monitoring %d jobs..." % n_jobs
sys.stdout.flush()
running_jobs = parse_qstat()
while jobs_to_monitor:
sleep(POLL_INTERVAL)
elapsed += POLL_INTERVAL
running_jobs = parse_qstat()
jobs_to_monitor = still_running(jobs_to_monitor, running_jobs, additional_prefix)
all_jobs.update(set(jobs_to_monitor))
n_running = len(jobs_to_monitor)
n_total = len(all_jobs)
clear_output()
print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
sys.stdout.flush()
n_running = len(jobs_to_monitor)
n_total = len(all_jobs)
clear_output()
print "%d / %d jobs still running, approximately %d seconds elapsed" % (n_running, n_total, elapsed)
print "All jobs completed!"
sys.stdout.flush()
# check if any jobs errored out
for name, id_ in all_jobs:
deets = job_run_details(name, id_)
if deets['exit_status'] != '0':
stderr_fp = deets['stderr_file']
last_stderr_line = !tail -n 1 $stderr_fp
print "ERROR! Job %s did not exit cleanly." % id_
print "Here is the last line of standard error (%s)" % stderr_fp
print last_stderr_line[0]
print
return all_jobs
class EnvironmentState(object):
"""Store particulars from an environment, write to disk on change"""
def __init__(self, basedir, **kwargs):
env_fp = os.path.join(basedir, REGISTERED_ENV_FILE)
super(EnvironmentState, self).__setattr__('_state_fp', env_fp)
if os.path.exists(self._state_fp):
raise ValueError("State on disk appears to exist!")
super(EnvironmentState, self).__setattr__('pickler', cPickle.dumps)
super(EnvironmentState, self).__setattr__('_basedir', basedir)
self.__dict__.update(**kwargs)
self.save_state()
def save_state(self):
"""Pickle self out to disk"""
f = open(self._state_fp,'w')
f.write(self.pickler(self))
f.close()
def __setattr__(self, key, value):
"""Update self"""
self.__dict__[key] = value
self.save_state()
def update(self, **kwargs):
"""Update self"""
for k,v in kwargs.items():
self.__dict__[k] = v
self.save_state()
def __repr__(self):
"""Represent thy self"""
out = ["{"]
for k,v in self.__dict__.items():
if isinstance(v, int):
out.append("'%s':%d," % (str(k), v))
elif isinstance(v, float):
out.append("'%s':%f," % (str(k), v))
else:
out.append("'%s':'%s'," % (str(k), str(v)))
out.append("}")
return ''.join(out)
def register(**kwargs):
"""Store variables in the EnvironmentState instance"""
if not REGISTERED_ENV_VAR in globals():
raise ValueError("No registered environment in globals!")
obj = globals()[REGISTERED_ENV_VAR]
for k,v in kwargs.items():
obj.__setattr__(k, v)
def register_env(basedir=None, **kwargs):
"""Create a registered environment"""
import os
if basedir is None:
basedir = os.getcwd()
if REGISTERED_ENV_VAR in globals():
raise KeyError("%s already exists!" % REGISTERED_ENV_VAR)
globals()[REGISTERED_ENV_VAR] = EnvironmentState(basedir, **kwargs)
def recover(basedir=None):
"""Is there a registered environment to recover from?"""
import os
if basedir is None:
basedir=os.getcwd()
env_file = os.path.join(basedir, REGISTERED_ENV_FILE)
if os.path.exists(env_file):
return True
else:
return False
def recover_env(basedir=None):
"""Recover an environment"""
import os
if basedir is None:
basedir = os.getcwd()
registered_env = os.path.join(basedir, REGISTERED_ENV_FILE)
if not os.path.exists(registered_env):
raise ValueError("Cannot recover environment, %s does not exist!" \
% REGISTERED_ENV_FILE)
global_vars = globals()
obj = loads(open(registered_env).read())
global_vars[REGISTERED_ENV_VAR] = obj
for k,v in obj.__dict__.items():
global_vars[k] = v
def recover_jobs():
"""Attempt to recover monitoring of running jobs"""
obj = globals().get(REGISTERED_ENV_VAR, None)
if obj is None:
return
jobs = obj.__dict__.get('_monitoring_jobs', None)
if jobs is None:
return
return wait_on(jobs)
def drop_env():
"""Drop an existing environment"""
import os
if os.path.exists(REGISTERED_ENV_FILE):
os.remove(REGISTERED_ENV_FILE)
if REGISTERED_ENV_VAR in globals():
foo = globals().pop(REGISTERED_ENV_VAR)
def register_items(**kwargs):
"""Insert into the global namespace"""
if REGISTERED_ENV_VAR not in globals():
raise KeyError("%s is not in globals!" % REGISTERED_ENV_VAR)
obj = globals()[REGISTERED_ENV_VAR]
obj.update(**kwargs)
def create_env(prj_name, basedir=None):
"""Creates a simple project environment and working directory
prj_name : a name for the project. It is advised that this be < 14
characters if submitting on a cluster. A random tag is
appended on as well
basedir : set a base working directory, cwd if None
"""
# create a working directory
from random import choice
import os
if basedir is None:
basedir = os.getcwd()
alpha = 'abcdefghijklmnopqrstuvwxyz'; alpha += alpha.upper(); alpha += '0123456789'
retries = 0
not_created = True
while retries < 5 and not_created:
name = '_'.join([prj_name, ''.join([choice(alpha) for i in range(3)])])
working_dir = os.path.join(basedir, name)
if os.path.exists(working_dir):
retries += 1
else:
os.mkdir(working_dir)
not_created = False
register_env(basedir, prj_name=name, working_dir=working_dir)
def submit(prj_name, cmd, mem="8gb"):
"""Submission wrapper for easily submitting jobs"""
return submit_qsub(cmd, job_name=prj_name, queue='memroute',
extra_args='-l pvmem=%s' % mem)
def farm_commands(commands, chunk_size):
"""Chunk up commands, and submit each chunk for serial processing"""
commands = list(commands)
start = 0
jobs = []
for end in range(chunk_size, len(commands) + chunk_size, chunk_size):
chunk = commands[start:end]
start = end
jobs.append(submit(chunk))
return wait_on(jobs)