-
Notifications
You must be signed in to change notification settings - Fork 1
/
ezqsub
executable file
·378 lines (296 loc) · 13.6 KB
/
ezqsub
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#!/oasis/projects/nsf/sua137/peanut/usr/bin/python3
# -*- coding: utf-8 -*-
# vim:fenc=utf-8 tabstop=4 expandtab shiftwidth=4 softtabstop=4
#
# Copyright © Mike Dacre <mike.dacre@gmail.com>
#
# Distributed under terms of the MIT license
"""
====================================================================================
FILE: ezqsub (python 3)
AUTHOR: Michael D Dacre, mike.dacre@gmail.com
ORGANIZATION: Stanford University
LICENSE: MIT License
VERSION: 1.0
CREATED: 2013-12-26 17:37
Last modified: 2014-07-17 17:23
DESCRIPTION: Take a file of scripts and submit it to the gordon cluster
The file should be one line per job, the lines can be arbitrarily
long and complex bash scripts, just use semi-colons instead of new-
lines.
USAGE: ezqsub -w 1:00:00 -i script_file.txt or ezqsub -w 1:00:00 < script_file.txt
====================================================================================
"""
from __future__ import absolute_import

import getpass
import os
import sys
# Global debug switch; main() sets it to 1 when -x/--debug is given
debug = 0

# Get user name. USER is not exported in every environment, so fall back
# to getpass, which also consults LOGNAME and the password database.
user = os.environ.get('USER') or getpass.getuser()

# Set the defaults
default_threads = 16
default_commands = default_threads
default_address = 'sua137'
default_queue = 'normal'
default_params = ''
default_modules = 'python'
default_tmpdir = ''.join(['/oasis/scratch/', user, '/temp_project/ezqsub/'])

# This is required on systems with an incompatible PYTHONPATH.
# It is pasted verbatim in front of the node command in the PBS script.
pypath = 'PYTHONPATH=' + '/oasis/projects/nsf/sua137/peanut/usr/lib/python3.4' + '; '

# How many jobs should be submitted at any one time
max_running_default = 500
def run_parallel(infile, threads=default_threads):
    """Run every command listed in a file in parallel on the local machine.

    This is the node-side half of the script: main() calls it when the
    hidden '-f' option is given, so it is not normally called directly.

    Args:
        infile:  Path to a file holding one shell command per line. Blank
                 lines and lines starting with '#' are skipped. The file is
                 deleted as soon as all of its commands have been queued.
        threads: How many commands to run at once (anything int() accepts).

    Returns:
        A list of (command, (exit_code, output)) tuples in file order.
        'output' is whatever subprocess.getstatusoutput returns for the
        command.
    """
    # We use the multiprocessing pool and subprocess for this
    from subprocess import getstatusoutput
    from multiprocessing import Pool

    # The context manager guarantees the worker processes are cleaned up,
    # even if reading the file or one of the commands raises.
    with Pool(processes=int(threads)) as pool:
        pending = []
        with open(infile) as handle:
            for line in handle:
                command = line.rstrip()
                # An empty command would only waste a worker slot
                if not command or command.startswith('#'):
                    continue
                pending.append(
                    (command, pool.apply_async(getstatusoutput, (command,))))

        # Delete infile; everything we need from it is already queued
        os.remove(infile)

        # Block until every command has finished and collect the results
        results = [(command, job.get()) for command, job in pending]

    return results
def split_scripts(infile, prefix='job', commands=default_commands, tmpdir=default_tmpdir):
    """Split a stream of job lines into temp script files, one per node.

    The input should be one line per job. The lines can be arbitrarily long
    and complex bash scripts; just use semi-colons instead of newlines.
    Blank lines and lines starting with '#' are ignored.

    Args:
        infile:   An open file handle (or any iterable of lines).
        prefix:   Filename prefix for the generated temp files.
        commands: How many job lines to write per file, or in other words,
                  how many jobs to send to each node. If this is larger than
                  the number of threads used on the node, the extras simply
                  wait for a free slot. A value below 1 puts every job in a
                  single file (kept for backward compatibility).
        tmpdir:   Existing directory in which to create the files.

    Returns:
        A list of absolute paths of the files created, in input order.
    """
    from tempfile import mkstemp
    import os

    per_file = int(commands)

    # Keep only real job lines; an empty line is not a job
    jobs = [line.rstrip() for line in infile
            if line.strip() and not line.startswith('#')]

    if per_file < 1:
        per_file = max(len(jobs), 1)

    files = []
    for start in range(0, len(jobs), per_file):
        fd, outfile_name = mkstemp(dir=tmpdir, text=True, prefix=prefix)
        os.close(fd)
        # Write the whole chunk in one go instead of reopening per line
        with open(outfile_name, 'a') as outfile:
            print('\n'.join(jobs[start:start + per_file]), file=outfile)
        files.append(os.path.realpath(outfile_name))

    print("There are", str(len(jobs)), "jobs split between", str(len(files)), "node files")
    return files
def submit_files(file_list, walltime, modules=default_modules, name='job', mail='n', queue=default_queue, params=default_params, address=default_address, threads=default_threads, max_running=480):
    """Submit each file in a list to the cluster with qsub.

    For every file a PBS script is generated on the fly and piped to qsub.
    That script loads the requested modules and then re-runs this program
    with '-f <file> -t <threads>' so the commands in the file are executed
    in parallel on the node. Submission pauses (polling qstat every two
    seconds) while the user has more than max_running jobs queued or
    running.

    Args:
        file_list:   Paths of the node files to submit. Not modified.
        walltime:    Walltime string for '-l walltime='.
        modules:     Modules to 'module load', as a list or as a single
                     comma/space separated string.
        name:        Job name prefix; jobs are named <name>_1, <name>_2, ...
        mail:        Value for the '#PBS -m' directive.
        queue:       Queue to submit to.
        params:      Extra flags, written verbatim after '#PBS ' in the
                     generated script.
        address:     Account to bill ('-A').
        threads:     Cores requested per node and threads used on the node.
        max_running: Stop submitting while more jobs than this are active.

    Returns:
        A list with the output of each qsub call (the job ids), in
        submission order.
    """
    import subprocess
    from os import path
    from time import sleep

    # A bare string (such as the default) would otherwise be iterated
    # character by character, producing 'module load p', 'module load y', ...
    if isinstance(modules, str):
        modules = modules.replace(',', ' ').split()

    pbs_job_nos = []
    if not file_list:
        return pbs_job_nos

    my_loc = path.realpath(__file__)
    node_string = ''.join(['nodes=1:ppn=', str(threads), ':native'])

    # Find out how many jobs are already running
    job_length = check_qstat(user)

    print("\nSubmitting jobs now")
    for job_number, job_file in enumerate(file_list, start=1):
        # Wait for room in the queue; only announce the wait once per job
        warned = False
        while job_length > max_running:
            if not warned:
                print("There are", job_length, "job queued or running in the queue",
                      "which is greater than the max of", max_running,
                      "\nWaiting for other jobs to run...", file=sys.stderr)
                warned = True
            sleep(2)
            job_length = check_qstat(user)

        # Create the unique job name
        job_name = '_'.join([name, str(job_number)])

        # Create the virtual file for pbs
        header = ['#!/bin/bash',
                  '#PBS -S /bin/bash',
                  '#PBS -q ' + queue,
                  '#PBS -N ' + job_name,
                  '#PBS -A ' + address,
                  '#PBS -l ' + node_string,
                  '#PBS -l walltime=' + walltime,
                  '#PBS -m ' + mail]
        if params:
            header.append('#PBS ' + params)
        template = '\n'.join(header) + '\n\ncd $PBS_O_WORKDIR\n\n'
        template += ''.join('module load ' + module + '\n' for module in modules)
        template += ''.join(['\n', pypath, my_loc, ' -f ', job_file,
                             ' -t ', str(threads), '\n'])
        if debug:
            print(template)

        # Craft the pbs qsub command
        pbs_command = ('qsub', '-q', queue, '-N', job_name, '-A', address,
                       '-l', node_string, '-l', '='.join(['walltime', walltime]))
        if debug:
            print(pbs_command)

        # Submit; communicate() feeds the script, reads the reply and reaps
        # the child so no zombie or open pipe is left behind
        pbs_submit = subprocess.Popen(pbs_command, stdin=subprocess.PIPE,
                                      stdout=subprocess.PIPE)
        reply, _ = pbs_submit.communicate(template.encode())
        if pbs_submit.returncode != 0:
            print("Warning: qsub exited with status", pbs_submit.returncode,
                  "for", job_name, file=sys.stderr)

        # Get job number
        job_no = reply.decode().rstrip()
        print('\t'.join([job_no, job_name]))
        pbs_job_nos.append(job_no)

        # Count our own submission so we need not call qstat after every job
        job_length = job_length + 1

    return pbs_job_nos
def check_qstat(user):
    """Count the jobs *user* currently has running or queued.

    Runs 'qstat -u <user>' and looks at every output line that starts with
    a digit. Such a line is split on runs of spaces and counted when its
    tenth field is 'R' or 'Q'. Lines with fewer than ten fields are ignored.

    Args:
        user: User name passed to 'qstat -u'.

    Returns:
        The number of matching jobs as an int.

    Raises:
        subprocess.CalledProcessError: if qstat exits with a non-zero status.
        OSError: if qstat cannot be started at all.
    """
    from subprocess import check_output
    import re

    listing = check_output(['qstat', '-u', user]).decode('utf8').rstrip().split('\n')

    active = 0
    for row in listing:
        # Header and separator lines do not start with a digit
        if not re.match(r'[0-9]', row):
            continue
        fields = re.split(r' +', row)
        # Guard the index so a short or truncated row cannot raise IndexError
        if len(fields) > 9 and fields[9] in ('R', 'Q'):
            active += 1
    return active
def _get_args():
    """Build the command-line argument parser.

    Returns:
        The configured argparse.ArgumentParser. Parsing is left to the
        caller, which also needs the parser itself to print the help text.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter)

    # Optional Files
    parser.add_argument('-i', '--infile', nargs='?', default='',
                        help="Input file, Default STDIN")
    # Hidden option: written into the generated PBS script so that the
    # node-side invocation runs the commands in the given temp file
    parser.add_argument('-f', '--tmpfile', default='', help=argparse.SUPPRESS)

    # Debugging flag
    parser.add_argument('-x', '--debug', action='store_true', help=argparse.SUPPRESS)

    # Walltime: optional for argparse (the node-side run does not need it),
    # but main() insists on it when submitting
    parser.add_argument('-w', '--walltime', default='',
                        help=''.join(["Set walltime, use least possible, max=336:00:00"]))

    # Optional Arguments
    parser.add_argument('-n', '--name', default='job',
                        help="Job Name, will be prefix in qstat. Default: job")
    # type=int rejects a non-numeric value up front with a usage error
    parser.add_argument('-t', '--threads', default=default_threads, type=int,
                        help=''.join(["Over-ride number of threads per node, you should use this ",
                                      "if you want less than ", str(default_threads), " to run at ",
                                      "once on a single node. Note that you will still be billed for ",
                                      "all ", str(default_threads), " cores. This is a good idea if ",
                                      "you want a few jobs only to run. e.g. for a job requiring 30G ",
                                      "of memory, you will want one job per node, so you can set -t ",
                                      "to 1. ",
                                      "Default: ", str(default_threads)]))
    parser.add_argument('--commands', type=int,
                        help=''.join(["Over-ride number of commands sent to each node. This defaults ",
                                      "to the same as '-t'. If you want less than ", str(default_threads),
                                      " commands to run on a node, you can just set '-t'. If however, ",
                                      "you want jobs to run in serial on a node, this can be a good option. \n",
                                      "This option should be completely unnecessary most of the time\n",
                                      "Default: ", str(default_commands)]))
    parser.add_argument('-d', '--tmpdir', default=default_tmpdir,
                        help=''.join(["Where to store job files. These will be deleted when they run ",
                                      "Default: ", default_tmpdir]))
    parser.add_argument('-q', '--queue', default=default_queue,
                        help=''.join(["Queue Choice, Default: ", default_queue]))
    parser.add_argument('-m', '--modules', default=default_modules, nargs='+',
                        help=''.join(["Choose modules to load, Default: ", default_modules]))
    parser.add_argument('--mail', default='n',
                        help=''.join(["qsub mail option, choose one of 'a', 'b', 'e', or 'n' \n",
                                      "a: mail is sent when the job is aborted by the batch system. \n",
                                      "b: mail is sent when the job begins execution. \n",
                                      "e: mail is sent when the job terminates. \n",
                                      "n: no mail sent. \n",
                                      "Default: 'n' (no mail)"]))
    parser.add_argument('-p', '--params', default=default_params,
                        help=''.join(["qsub parameters. These are any additional qsub flags you ",
                                      "wish to pass. Note that they should be enclosed in quotes ",
                                      "e.g. \"-l mem=32GB\", not just plain 'mem=32GB'. ",
                                      "If you don't include the flags, it won't work. Default: ",
                                      default_params]))
    parser.add_argument('-a', '--billing', default=default_address,
                        help=''.join(["Choose the address to bill to, find this with ",
                                      "show_account or on portal.xsede.org",
                                      ", Default: ", default_address]))

    return parser
# Main function for direct running
def main():
    """Entry point: submit jobs from the login node, or run them on a node.

    With the hidden '-f' option the commands in that temp file are run in
    parallel, their results are printed and the process exits with status 0.
    Otherwise the input (the '-i' file or STDIN) is split into node files
    that are submitted with qsub. A walltime ('-w') is required in that
    case; without one the help text is printed to STDERR and the process
    exits with status 1.
    """
    global debug

    # Get commandline arguments
    parser = _get_args()
    args = parser.parse_args()

    debug = 1 if args.debug else 0

    # If '-f' is set, we are running on the nodes, so let's just do that
    if args.tmpfile:
        results = run_parallel(infile=args.tmpfile, threads=args.threads)

        # Loop through outputs and print
        for name, output in results:
            print(name, ':\n', 'Result code: ', output[0], '\nOutput:\n\n',
                  output[1], '\n\n\n', sep='')
        sys.exit(0)

    # Require walltime for command line run
    if not args.walltime:
        # print_help() writes the text itself and returns None, so it must
        # be pointed at STDERR rather than wrapped in print()
        parser.print_help(file=sys.stderr)
        print("Walltime required\nPlease provide a walltime. e.g.:\n",
              __file__, "-w 1:00:00 -i <script_file>", file=sys.stderr)
        sys.exit(1)

    # Turn modules into array - the shell splits on spaces (nargs='+'),
    # we split each item on commas
    if isinstance(args.modules, str):
        requested = [args.modules]
    else:
        requested = args.modules
    modules = []
    for item in requested:
        # Drop empty names caused by stray commas such as 'a,,b'
        modules.extend(part for part in item.split(',') if part)

    # The node-side run is a python script, so python is always needed
    if 'python' not in modules:
        modules.insert(0, 'python')

    # Set number of commands per file
    commands = args.commands if args.commands else args.threads

    # Make sure tmpdir exists
    if not os.path.exists(args.tmpdir):
        os.makedirs(args.tmpdir)
        os.chmod(args.tmpdir, 0o755)

    # Split input file into temp files stored in tmpdir
    if args.infile:
        with open(args.infile, 'r') as infile:
            tmpfiles = split_scripts(infile=infile, prefix=args.name,
                                     commands=commands, tmpdir=args.tmpdir)
    else:
        tmpfiles = split_scripts(infile=sys.stdin, prefix=args.name,
                                 commands=commands, tmpdir=args.tmpdir)

    # Nothing to do for an empty (or comment-only) input
    if not tmpfiles:
        print("No jobs found in input, nothing to submit", file=sys.stderr)
        sys.exit(1)

    # Loop through input files and submit via qsub
    results = submit_files(file_list=tmpfiles, modules=modules, mail=args.mail,
                           name=args.name, queue=args.queue,
                           walltime=args.walltime, params=args.params,
                           address=args.billing, threads=args.threads,
                           max_running=max_running_default)

    # Success
    print('\n')
    print(str(len(results)), "jobs submitted")
    print("Done")
# The end
# Only run main() when the file is executed as a script, not when imported
if __name__ == '__main__':
    main()