forked from whilp/clusterui
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cui
executable file
·389 lines (320 loc) · 12.4 KB
/
cui
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
#!/usr/bin/env python2.6
"""%prog [options] [command]
Submit a UI job and connect to it when it begins to run. If <command> is
specified, it is executed on the remote machine. Otherwise, the current shell
($SHELL) is executed.
To connect a UI job that's already running, pass its ClusterId to the '-i'
option. Use the special ID 'any' to automatically select a suitable running job.
By default, %prog will use glexec to switch to the grid user after connecting to
the UI job; pass the '-g' option to disable glexec.
After the connection is terminated, the UI job's temporary directory including
its submit file, log file and any files transferred back from the remote
machine will be removed. To prevent cleanup, pass the '-P' option; since no
temporary directory is created when connecting to an existing job with '-i',
cleanup does not occur in that case, either.
"""
import logging
import os
import sys
from collections import namedtuple
from contextlib import contextmanager
from optparse import OptionParser, make_option as Opt
from pwd import getpwuid
from shutil import rmtree
from subprocess import CalledProcessError, Popen, PIPE
from tempfile import mkdtemp
from time import sleep
# Argument handed to the SLEEP executable by main(), i.e. how long the
# placeholder UI job stays alive (in seconds for /bin/sleep).
RUNTIME = 3600
# Executable the UI job runs; main() stores it in SUBMIT["executable"].
SLEEP = "/bin/sleep"
# Module-level logger; main() sets its level and attaches a stream handler.
log = logging.getLogger(__name__)
def main():
    """Parse the command line, then submit (or find) a UI job and connect to it.

    When '-i' names a job (or '-i any' discovers one), the job is connected to
    directly: no submit directory is created and nothing is cleaned up. In all
    other cases a new job is submitted from a fresh submit directory; after the
    connection ends the job is removed from the queue unless '-P' was given.

    Returns the exit status of `condor_ssh_to_job`, or the return code of the
    first command that failed.
    """
    optparser = OptionParser(usage=__doc__, option_list=options)
    (opts, args) = optparser.parse_args()

    # '-v' takes a value: either a number ("-v 2") or a run of v's ("-vvv" is
    # parsed as the value "vv").
    try:
        verbose = int(opts.verbose)
    except ValueError:
        verbose = opts.verbose.count("v")
    if verbose >= 0:
        log.setLevel(max(1, logging.WARNING - (10 * verbose)))
    log.addHandler(logging.StreamHandler())

    SUBMIT["executable"] = SLEEP
    SUBMIT["arguments"] = RUNTIME
    hascvmfs(SUBMIT)
    if opts.x509:
        hasx509(SUBMIT)

    user = getuser()
    jobid = opts.id
    if jobid == "any":
        jobid = discover(user)
        if not jobid:
            # Fall through and submit a new job instead.
            log.warning("failed to find a running UI job")

    if jobid:
        # Connecting to an existing job: no tempdir is needed (mkdtemp would
        # reject prefix=None on Python 2) and there is nothing to clean up.
        return _session(None, jobid, args, opts)

    prefix = "%s-%s-" % ("clusterui", user)
    # A persistent job keeps using its working directory, so '-P' implies '-p'.
    preserve = opts.preserve or opts.persist
    submit_dir = os.path.abspath(opts.submit_dir) if opts.submit_dir else None

    with submitdir(cleanup=not preserve, dir=submit_dir, prefix=prefix,
                   create=True) as tmp:
        ret = _session(tmp, None, args, opts)
        if not opts.persist:
            try:
                cleanupjob(tmp, user)
            except ProcessError:
                # Best effort: the job may already have left the queue.
                pass
    return ret


def _session(tmp, jobid, args, opts):
    """Submit a job from *tmp* unless *jobid* is given, then connect to it.

    Returns the exit status of the connection. If a command fails, whatever
    output it captured is replayed on this process's stdout/stderr and the
    failing command's return code is returned instead.
    """
    try:
        if not jobid:
            jobid = submit(tmp, SUBMIT, **vars(opts))
        return connect(jobid, args).returncode
    except ProcessError as e:
        # Command failed, bail.
        log.debug("%s returned %d", e.process.cmd, e.returncode)
        if e.process.stdout:
            sys.stdout.write(e.process.stdout.read())
        if e.process.stderr:
            sys.stderr.write(e.process.stderr.read())
        return e.returncode
# Command-line options understood by main(), in the order --help lists them.
options = [
    # Takes a value: a number, or a run of "v" characters (see main()).
    Opt("-v", "--verbose",
        help="set logging level to VERBOSE",
        default=0),
    Opt("-P", "--persist",
        help="leave UI job in the queue",
        action="store_true",
        default=False),
    Opt("-d", "--submit-dir",
        help="create submit directory under SUBMIT_DIR",
        default=None),
    Opt("-i", "--id",
        help="connect to existing job ID",
        action="store",
        default=None),
    Opt("-p", "--preserve",
        help="preserve temporary job directory",
        action="store_true",
        default=False),
    # Proxy detection is on by default; passing -x switches it off.
    Opt("-x", "--x509",
        help="disable X509 proxy detection",
        action="store_false",
        default=True),
]
def discover(user):
    """Look up a running UI job owned by *user*.

    Runs `condor_q` with a constraint selecting UI jobs whose 'Owner' ClassAd
    equals the string *user* and whose JobStatus is 2 (running), printing one
    ClusterId per line. Returns the first ClusterId as a string, or None when
    no job matches. Raises :class:`ProcessError` if `condor_q` fails.
    """
    constraint = (
        "IsUIJob == true && "
        'Owner == "{user}" && '
        "JobStatus == {running}").format(user=user, running=2)
    query = Process(["condor_q", "-con", constraint, "-f", "%s\n", "ClusterId"])
    query.check()

    found = query.stdout.read().splitlines()
    log.debug("discovered %d active UI jobs", len(found))
    if not found:
        return None
    return found[0]
def connect(jobid, args):
    """Open a session on the running UI job *jobid*.

    Runs `condor_ssh_to_job` for *jobid* followed by the list *args*, sharing
    this process's stdout and stderr with it, and waits for it to finish.
    Returns the finished :class:`Process`. Raises :class:`ProcessError` (after
    logging a warning) if `condor_ssh_to_job` exits with a non-zero status.
    """
    log.info("connecting to job")
    command = ["condor_ssh_to_job", jobid] + args
    # stdout/stderr are not captured so the session stays interactive.
    session = Process(command, stdout=None, stderr=None)
    try:
        session.check()
    except ProcessError:
        log.warn("failed to connect to job %s", jobid)
        raise
    else:
        return session
def submit(tmp, jdl, **kwargs):
    """Submit the UI job described by *jdl* from the *tmp* directory.

    *tmp* is the path of the submit directory. The items of the *jdl* mapping
    are rendered by :func:`template` into a new file named `submit` in *tmp*,
    which is then handed to `condor_submit`. The file `log` in *tmp* is
    followed until it reports that the job has started executing.

    Returns the string ID of the running job. *kwargs* are accepted and
    ignored. Raises :class:`ProcessError` if `condor_submit` exits with a
    non-zero status.
    """
    submit_path = os.path.join(tmp, "submit")
    with open(submit_path, 'w') as submit_file:
        log.debug("writing submit file to %s", submit_file.name)
        # Render the mapping the caller passed in, not the module default.
        template(submit_file, jdl.items())

    log.info("submitting UI job")
    condor_submit = Process(["condor_submit", submit_path])
    try:
        condor_submit.check()
    except ProcessError:
        log.debug("failed to submit UI job")
        raise

    log.info("waiting for job to begin to run")
    with open(os.path.join(tmp, "log")) as logfile:
        return monitor(tail(logfile))
def cleanupjob(iwd, user):
    """Remove the UI job that was submitted from *iwd* from the queue.

    Runs `condor_rm` with a constraint matching UI jobs owned by *user* whose
    'Iwd' ClassAd equals *iwd*. Raises :class:`ProcessError` if `condor_rm`
    exits with a non-zero status.
    """
    constraint = (
        'IsUIJob =?= true && '
        'Owner == "{user}" && '
        'Iwd == "{iwd}"'
    ).format(user=user, iwd=iwd)
    removal = Process(["condor_rm", "-con", constraint])
    removal.check()
def hasx509(submit):
    """Record the user's X509 proxy in the JDL mapping *submit*, if one exists.

    The proxy path is taken from `$X509_USER_PROXY`, defaulting to
    `/tmp/x509up_u<uid>`. When that path exists it is stored under the
    'X509UserProxy' key and returned; otherwise *submit* is left untouched and
    None is returned.
    """
    fallback = "/tmp/x509up_u%s" % os.getuid()
    x509 = os.environ.get("X509_USER_PROXY", fallback)
    if not os.path.exists(x509):
        return None
    # XXX: Can cause condor_submit to fail if a stale/expired proxy is
    # present.
    submit["X509UserProxy"] = x509
    return x509
def hascvmfs(submit):
    """Append a CVMFS revision clause to *submit*'s 'requirements' expression.

    The local catalog revision is read with `attr -q -g revision` on the CMS
    CVMFS mount. When that succeeds and prints a number, TARGET must advertise
    a `UWCMS_CVMFS_Revision` greater than or equal to it. In every other case
    (no `attr` binary, no mount, non-zero exit, empty or non-numeric output)
    the clause falls back to a revision greater than or equal to 0.

    If *submit* has no 'requirements' key yet, the clause is appended to
    "TRUE".
    """
    cmscvmfs = "/cvmfs/cms.hep.wisc.edu"
    try:
        with open(os.devnull, 'w') as null:
            attr = Process(["attr", "-q", "-g", "revision", cmscvmfs],
                           stdout=PIPE, stderr=null)
            # communicate() waits for the child, so returncode is meaningful.
            output = attr.communicate()[0]
        if attr.returncode != 0:
            output = ""
    except OSError:
        # `attr` is not installed or could not be started.
        output = ""

    # Drop the trailing newline; anything that is not a plain number would
    # produce a malformed requirements expression.
    cvmfsrev = output.strip()
    if not cvmfsrev.isdigit():
        cvmfsrev = "0"
    elif not isinstance(cvmfsrev, str):
        # Pipes yield bytes on Python 3; on Python 2 this is already a str.
        cvmfsrev = cvmfsrev.decode("ascii")

    submit["requirements"] = (submit.setdefault("requirements", "TRUE") +
        " && TARGET.UWCMS_CVMFS_Revision >= %s" % cvmfsrev)
def isexec(path):
    """Tell whether *path* names an existing regular file that can be executed.

    Returns False instead of raising when the filesystem check itself fails
    with OSError or IOError.
    """
    try:
        if not os.path.isfile(path):
            return False
        return os.access(path, os.X_OK)
    except (OSError, IOError):
        return False
def getuser(env=os.environ):
    """Return the name of the current user.

    If `CLUSTERUI_USER` is present in the mapping *env*, its value is returned
    as-is. Otherwise the name the password database associates with the
    current UID is returned.

    The password database is consulted only when the override is absent, so a
    UID without a passwd entry does not break an explicit `CLUSTERUI_USER`.
    Raises KeyError if the fallback lookup finds no entry for the UID.
    """
    user = env.get("CLUSTERUI_USER")
    if user is not None:
        return user
    return getpwuid(os.getuid())[0]
def getexec(path):
    """Resolve *path* to an executable file, consulting `$PATH` if necessary.

    *path* is returned unchanged when it already names an executable file. A
    *path* that contains a directory separator but is not executable yields
    None. A bare name is looked up in each `$PATH` directory in order and the
    first executable match is returned; None is returned when there is none.
    """
    if isexec(path):
        return path
    if os.path.sep in path:
        # An explicit location was given; do not go hunting elsewhere.
        return None

    name = os.path.basename(path)
    search_path = os.environ.get("PATH", "").split(os.pathsep)
    for directory in search_path:
        candidate = os.path.join(directory, name)
        if isexec(candidate):
            return candidate
    return None
def getexecp(path):
    """Return the absolute path to an executable matching *path*.

    The lookup is delegated to :func:`getexec`. Returns None when no
    executable is found, rather than failing inside `os.path.abspath`.
    """
    found = getexec(path)
    if found is None:
        return None
    return os.path.abspath(found)
def tail(file):
    """Follow *file*, yielding each line as it is appended.

    *file* needs a `readline` method returning a string. Every non-empty
    result is yielded; an empty result means nothing new has been written yet,
    so the generator pauses for 0.2 seconds and tries again. The generator
    never finishes on its own.
    """
    while True:
        line = file.readline()
        if line:
            yield line
            continue
        # Nothing new yet; back off briefly before polling again.
        sleep(.2)
def monitor(stream):
    """Watch a stream of Condor job log lines until the job starts executing.

    *stream* is an iterable yielding string lines. Lines consisting only of
    '...' and a newline are ignored. Every other line is split on whitespace
    into at most five fields; lines with fewer than five fields cannot be
    event headers and are skipped. The rest become :class:`Record` objects.

    Returns the job ID (the record's `id` field without its surrounding
    parentheses) of the first record whose message contains "Job executing",
    or None if *stream* ends before such a record is seen.
    """
    for line in stream:
        if line == "...\n":
            continue
        log.debug("log update: %s", line.strip())
        fields = line.split(None, 4)
        if len(fields) < len(Record._fields):
            # Too short to be an event header (blank or detail line); building
            # a Record from it would raise TypeError.
            continue
        record = Record(*fields)
        if "Job executing" in record.message:
            return record.id.strip("()")
    return None
@contextmanager
def submitdir(cleanup=True, create=True, **kwargs):
    """Context manager providing a (temporary) submit directory.

    If *create* is true, *kwargs* are passed to :func:`mkdtemp` to create the
    directory, the current working directory is changed to it and its path is
    yielded. If *create* is false nothing is created and None is yielded.

    On exit the directory and its contents are removed only when both
    *cleanup* and *create* are true. Exceptions raised inside the `with` body
    always propagate to the caller.
    """
    tmp = None
    if create:
        tmp = mkdtemp(**kwargs)
        log.debug("created temporary directory %s", tmp)
        os.chdir(tmp)
    try:
        yield tmp
    finally:
        # No `return` here: returning from `finally` in a generator-based
        # context manager would silently swallow the body's exception.
        if cleanup and create:
            log.debug("cleaning up temporary directory %s", tmp)
            rmtree(tmp)
def template(out, context):
    """Render the JDL pairs in *context* as a submit description on *out*.

    *out* is a file-like object with a `write` method. *context* is an
    iterable of (*key*, *value*) pairs; each becomes a `key = value` line. A
    final `queue` line is written after the pairs.
    """
    assignments = ["%s = %s" % (key, value) for key, value in context]
    out.write("\n".join(assignments))
    out.write("\n")
    out.write("queue\n")
class ProcessError(CalledProcessError):
    """Signals that a :class:`Process` exited unsuccessfully.

    The return code and the space-joined argument list of *process* are
    forwarded to :class:`subprocess.CalledProcessError`; *process* itself is
    kept in :attr:`process` so handlers can inspect its pipes and command.
    """

    def __init__(self, process=None, **kwargs):
        status = process.returncode
        command = ' '.join(process.args)
        super(ProcessError, self).__init__(status, command, **kwargs)
        self.process = process
class Process(Popen):
    """A :class:`subprocess.Popen` that remembers how it was started.

    The argument list is kept in :attr:`args` and a printable form of the
    command line in :attr:`cmd`. stdout and stderr are captured through pipes
    unless the caller says otherwise.
    """

    def __init__(self, args, stdout=PIPE, stderr=PIPE, **kwargs):
        self.args = args
        # The program name is shown verbatim, every argument as its repr().
        shown_arguments = " ".join(repr(arg) for arg in args[1:])
        self.cmd = args[0] + " " + shown_arguments
        super(Process, self).__init__(
            args, stdout=stdout, stderr=stderr, **kwargs)

    def _execute_child(self, *args, **kwargs):
        """Log the command line, then let Popen start the child."""
        log.debug("running `%s`", self.cmd)
        return super(Process, self)._execute_child(*args, **kwargs)

    def check(self):
        """Wait for the process to exit; raise :class:`ProcessError` unless it returned 0."""
        status = self.wait()
        if status == 0:
            return
        raise ProcessError(self)
# Default JDL: the submit-description entries every UI job starts from.
# main() adds 'executable' and 'arguments' and extends 'requirements'.
SUBMIT = {
    "universe": "vanilla",
    "notification": "never",
    "log": "log",
    "transfer_executable": "false",
    "should_transfer_files": "true",
    "when_to_transfer_output": "on_exit",
    "getenv": "true",
    "requirements": (
        'TARGET.Arch == "X86_64" && '
        'TARGET.HasAFS_OSG && IsSlowSlot=!=true && '
        'TARGET.IsDedicated =?= true && '
        'TARGET.UidDomain == "hep.wisc.edu"'
    ),
    # Custom attribute that discover() and cleanupjob() select UI jobs by.
    "+IsUIJob": "true",
}
# A Condor job log record: the five whitespace-separated fields that
# monitor() splits out of a log line.
Record = namedtuple("Record", ["entry", "id", "date", "time", "message"])
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the exit status.
    try:
        status = main()
    except KeyboardInterrupt:
        # Ctrl-C ends the program quietly, with the default exit status.
        status = None
    sys.exit(status)