-
Notifications
You must be signed in to change notification settings - Fork 965
/
condor.py
321 lines (282 loc) · 13.5 KB
/
condor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""Job control via the Condor DRM via CLI.
This plugin has been used in production and is stable, but it shouldn't be taken as an
example of how to write Galaxy job runners that interface with a DRM using command-line
invocations. When writing new job runners that leverage command-line calls for submitting
and checking the status of jobs, please check out the CLI runner (cli.py in this directory)
and start by writing a new job plugin for that (see examples in
/galaxy/jobs/runners/util/cli/job). That approach will result in less boilerplate and allow
greater reuse of the DRM specific hooks you'll need to write. Ideally this plugin would
have been written to target that framework, but we don't have the bandwidth to rewrite
it at this time.
"""
import logging
import os
import subprocess
from galaxy import model
from galaxy.jobs.runners import (
AsynchronousJobRunner,
AsynchronousJobState,
)
from galaxy.jobs.runners.util.condor import (
build_submit_description,
condor_stop,
condor_submit,
submission_params,
summarize_condor_log,
)
from galaxy.util import asbool
log = logging.getLogger(__name__)
__all__ = ("CondorJobRunner",)
class CondorJobState(AsynchronousJobState):
    """Condor-specific per-job state tracked by the monitor thread."""

    def __init__(self, **kwargs):
        """
        Encapsulates state related to a job that is being run via the DRM and
        that we need to monitor.
        """
        super().__init__(**kwargs)
        # Extra bookkeeping on top of the generic asynchronous job state:
        # path and last-seen size of the HTCondor user log, plus a failure flag.
        self.user_log = None
        self.user_log_size = 0
        self.failed = False
class CondorJobRunner(AsynchronousJobRunner):
    """
    Job runner backed by a finite pool of worker threads. FIFO scheduling
    """

    runner_name = "CondorRunner"
def queue_job(self, job_wrapper):
    """Create job script and submit it to the DRM.

    Builds the executable job script and the Condor submit description file,
    submits via ``condor_submit``, and registers the resulting external job id
    with the monitor queue. On any preparation/submission failure the job is
    failed through ``job_wrapper.fail()`` and temporary files are cleaned up
    according to the cleanup policy.
    """
    # prepare the job
    include_metadata = asbool(job_wrapper.job_destination.params.get("embed_metadata_in_job", True))
    if not self.prepare_job(job_wrapper, include_metadata=include_metadata):
        return

    # get configured job destination
    job_destination = job_wrapper.job_destination

    # wrapper.get_id_tag() instead of job_id for compatibility with TaskWrappers.
    galaxy_id_tag = job_wrapper.get_id_tag()

    # get destination params
    query_params = submission_params(prefix="", **job_destination.params)

    container = None
    universe = query_params.get("universe", None)
    if universe and universe.strip().lower() == "docker":
        container = self._find_container(job_wrapper)
        if container:
            # HTCondor needs the image as 'docker_image'
            query_params.update({"docker_image": container.container_id})

    if galaxy_slots := query_params.get("request_cpus", None):
        galaxy_slots_statement = f'GALAXY_SLOTS="{galaxy_slots}"; export GALAXY_SLOTS; GALAXY_SLOTS_CONFIGURED="1"; export GALAXY_SLOTS_CONFIGURED;'
    else:
        galaxy_slots_statement = 'GALAXY_SLOTS="1"; export GALAXY_SLOTS;'

    # define job attributes
    cjs = CondorJobState(files_dir=job_wrapper.working_directory, job_wrapper=job_wrapper)

    cjs.user_log = os.path.join(job_wrapper.working_directory, f"galaxy_{galaxy_id_tag}.condor.log")
    cjs.register_cleanup_file_attribute("user_log")
    submit_file = os.path.join(job_wrapper.working_directory, f"galaxy_{galaxy_id_tag}.condor.desc")
    executable = cjs.job_file

    build_submit_params = dict(
        executable=executable,
        output=cjs.output_file,
        error=cjs.error_file,
        user_log=cjs.user_log,
        query_params=query_params,
    )

    submit_file_contents = build_submit_description(**build_submit_params)
    script = self.get_job_file(
        job_wrapper,
        exit_code_path=cjs.exit_code_file,
        slots_statement=galaxy_slots_statement,
        shell=job_wrapper.shell,
    )
    try:
        self.write_executable_script(executable, script, job_io=job_wrapper.job_io)
    except Exception:
        job_wrapper.fail("failure preparing job script", exception=True)
        log.exception(f"({galaxy_id_tag}) failure preparing job script")
        return

    cleanup_job = job_wrapper.cleanup_job
    try:
        # BUG FIX: was `open(submit_file, "w").write(...)`, which leaked the
        # file handle; use a context manager so it is closed deterministically.
        with open(submit_file, "w") as handle:
            handle.write(submit_file_contents)
    except Exception:
        if cleanup_job == "always":
            cjs.cleanup()
            # job_wrapper.fail() calls job_wrapper.cleanup()
        job_wrapper.fail("failure preparing submit file", exception=True)
        log.exception(f"({galaxy_id_tag}) failure preparing submit file")
        return

    # job was deleted while we were preparing it
    if job_wrapper.get_state() in (model.Job.states.DELETED, model.Job.states.STOPPED):
        log.debug("(%s) Job deleted/stopped by user before it entered the queue", galaxy_id_tag)
        if cleanup_job in ("always", "onsuccess"):
            os.unlink(submit_file)
            cjs.cleanup()
            job_wrapper.cleanup()
        return

    log.debug(f"({galaxy_id_tag}) submitting file {executable}")

    external_job_id, message = condor_submit(submit_file)
    if external_job_id is None:
        log.debug(f"condor_submit failed for job {job_wrapper.get_id_tag()}: {message}")
        # NOTE(review): this branch consults the app-level cleanup setting
        # while the branch above uses the per-job `cleanup_job` — confirm the
        # asymmetry is intended.
        if self.app.config.cleanup_job == "always":
            os.unlink(submit_file)
            cjs.cleanup()
        job_wrapper.fail("condor_submit failed", exception=True)
        return

    os.unlink(submit_file)

    log.info(f"({galaxy_id_tag}) queued as {external_job_id}")

    # store runner information for tracking if Galaxy restarts
    job_wrapper.set_external_id(external_job_id)

    # Store DRM related state information for job
    cjs.job_id = external_job_id
    cjs.job_destination = job_destination

    # Add to our 'queue' of jobs to monitor
    self.monitor_queue.put(cjs)
def check_watched_items(self):
    """
    Called by the monitor thread to look at each watched job and deal
    with state changes.
    """
    new_watched = []
    for cjs in self.watched:
        job_id = cjs.job_id
        galaxy_id_tag = cjs.job_wrapper.get_id_tag()
        try:
            # Skip log parsing when the user log has not grown since the last
            # poll (interactive tools are always re-checked for entry points).
            if (
                cjs.job_wrapper.tool.tool_type != "interactive"
                and os.stat(cjs.user_log).st_size == cjs.user_log_size
            ):
                new_watched.append(cjs)
                continue
            s1, s4, s7, s5, s9, log_size = summarize_condor_log(cjs.user_log, job_id)
            job_running = s1 and not (s4 or s7)
            job_complete = s5
            job_failed = s9
            cjs.user_log_size = log_size
        except Exception:
            # so we don't kill the monitor thread
            log.exception(f"({galaxy_id_tag}/{job_id}) Unable to check job status")
            log.warning(f"({galaxy_id_tag}/{job_id}) job will now be errored")
            cjs.fail_message = "Cluster could not complete job"
            self.work_queue.put((self.fail_job, cjs))
            continue
        if job_running:
            # If running, check for entry points...
            cjs.job_wrapper.check_for_entry_points()
        if job_running and not cjs.running:
            log.debug(f"({galaxy_id_tag}/{job_id}) job is now running")
            cjs.job_wrapper.change_state(model.Job.states.RUNNING)
        if not job_running and cjs.running:
            log.debug(f"({galaxy_id_tag}/{job_id}) job has stopped running")
            # Will switching from RUNNING to QUEUED confuse Galaxy?
            # cjs.job_wrapper.change_state( model.Job.states.QUEUED )
        job_state = cjs.job_wrapper.get_state()
        if job_complete or job_state == model.Job.states.STOPPED:
            if job_state != model.Job.states.DELETED:
                external_metadata = not asbool(
                    cjs.job_wrapper.job_destination.params.get("embed_metadata_in_job", True)
                )
                if external_metadata:
                    self._handle_metadata_externally(cjs.job_wrapper, resolve_requirements=True)
                log.debug(f"({galaxy_id_tag}/{job_id}) job has completed")
                self.work_queue.put((self.finish_job, cjs))
            continue
        if job_failed:
            log.debug(f"({galaxy_id_tag}/{job_id}) job failed")
            cjs.failed = True
            self.work_queue.put((self.fail_job, cjs))
            continue
        # BUG FIX: was `cjs.runnning = job_running` (typo), so the real
        # `running` flag — read by the transition checks above — was never
        # updated and the RUNNING transition re-fired on every poll.
        cjs.running = job_running
        new_watched.append(cjs)
    # Replace the watch list with the updated version
    self.watched = new_watched
def stop_job(self, job_wrapper):
    """Attempts to delete a job from the DRM queue.

    For containerized jobs the container is stopped first (with a hard kill
    as fallback), then the Condor job itself is removed with condor_stop.
    """
    job = job_wrapper.get_job()
    external_id = job.job_runner_external_id
    galaxy_id_tag = job_wrapper.get_id_tag()
    if job.container:
        try:
            log.info(f"stop_job(): {job.id}: trying to stop container .... ({external_id})")
            # Pull this job's state out of the watch list so the monitor
            # thread doesn't race with us while the container is stopped.
            # BUG FIX: the previous loop `break`-ed on the first match, which
            # silently dropped every job watched after it from self.watched.
            new_watch_list = []
            cjs = None
            for tcjs in self.watched:
                if cjs is None and tcjs.job_id == external_id:
                    cjs = tcjs
                else:
                    new_watch_list.append(tcjs)
            self.watched = new_watch_list
            self._stop_container(job_wrapper)
            # BUG FIX: guard against cjs being None (job was not in the watch
            # list) instead of letting an AttributeError fall through to the
            # container-kill fallback below.
            if cjs is not None and cjs.job_wrapper.get_state() != model.Job.states.DELETED:
                external_metadata = not asbool(
                    cjs.job_wrapper.job_destination.params.get("embed_metadata_in_job", True)
                )
                if external_metadata:
                    self._handle_metadata_externally(cjs.job_wrapper, resolve_requirements=True)
                log.debug(f"({galaxy_id_tag}/{external_id}) job has completed")
                self.work_queue.put((self.finish_job, cjs))
        except Exception as e:
            log.warning(f"stop_job(): {job.id}: trying to stop container failed. ({e})")
            try:
                self._kill_container(job_wrapper)
            except Exception as e:
                log.warning(f"stop_job(): {job.id}: trying to kill container failed. ({e})")
        failure_message = condor_stop(external_id)
        if failure_message:
            log.debug(f"({external_id}). Failed to stop condor {failure_message}")
    else:
        failure_message = condor_stop(external_id)
        if failure_message:
            log.debug(f"({external_id}). Failed to stop condor {failure_message}")
def recover(self, job, job_wrapper):
    """Recovers jobs stuck in the queued/running state when Galaxy started"""
    external_id = job.get_job_runner_external_id()
    galaxy_id_tag = job_wrapper.get_id_tag()
    if external_id is None:
        # Never reached Condor — submit it from scratch.
        self.put(job_wrapper)
        return
    # Rebuild the monitor-thread state object for the pre-existing DRM job.
    cjs = CondorJobState(job_wrapper=job_wrapper, files_dir=job_wrapper.working_directory)
    cjs.job_id = str(external_id)
    cjs.command_line = job.get_command_line()
    cjs.job_wrapper = job_wrapper
    cjs.job_destination = job_wrapper.job_destination
    cjs.user_log = os.path.join(job_wrapper.working_directory, f"galaxy_{galaxy_id_tag}.condor.log")
    cjs.register_cleanup_file_attribute("user_log")
    state = job.state
    if state in (model.Job.states.RUNNING, model.Job.states.STOPPED):
        log.debug(
            f"({job.id}/{job.get_job_runner_external_id()}) is still in {job.state} state, adding to the DRM queue"
        )
        cjs.running = True
        self.monitor_queue.put(cjs)
    elif state == model.Job.states.QUEUED:
        log.debug(f"({job.id}/{job.job_runner_external_id}) is still in DRM queued state, adding to the DRM queue")
        cjs.running = False
        self.monitor_queue.put(cjs)
def _stop_container(self, job_wrapper):
    """Issue the container runtime's 'stop' command for this job's container."""
    return self._run_container_command(job_wrapper, "stop")
def _kill_container(self, job_wrapper):
    """Issue the container runtime's 'kill' command for this job's container."""
    return self._run_container_command(job_wrapper, "kill")
def _run_container_command(self, job_wrapper, command):
    """Run a docker lifecycle *command* (e.g. 'stop'/'kill') for the job's container.

    Returns the exit code from ``condor_ssh_to_job``, or None when there is
    no job / no docker container to act on.
    """
    job = job_wrapper.get_job()
    # BUG FIX: the external id was previously read BEFORE the `if job` guard,
    # so a missing job raised AttributeError instead of returning None.
    if not job:
        return None
    external_id = job.job_runner_external_id
    cont = job.container
    if cont and cont.container_type == "docker":
        return self._run_command(cont.container_info["commands"][command], external_id)[0]
    return None
def _run_command(self, command, external_job_id):
    """Run *command* inside the remote job sandbox via ``condor_ssh_to_job``.

    Returns an ``(exit_code, output)`` tuple where ``output`` is the stripped
    stdout (bytes) on success and None on failure.
    """
    # NOTE(review): the command is interpolated into a shell string
    # (shell=True). Inputs come from the container command table rather than
    # users, but an argument list would still be safer — flagged, not changed.
    command = f"condor_ssh_to_job {external_job_id} {command}"
    # setpgrp detaches the child from our process group so signals aimed at
    # Galaxy don't propagate to the remote-command helper.
    p = subprocess.Popen(
        command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True, preexec_fn=os.setpgrp
    )
    stdout, stderr = p.communicate()
    exit_code = p.returncode
    ret = None
    if exit_code == 0:
        ret = stdout.strip()
    else:
        log.debug(stderr)
        log.debug("_run_command(%s) exit code (%s) and failure: %s", command, exit_code, stderr)
    return (exit_code, ret)