"""
Base classes for job runner plugins.
"""
import datetime
import logging
import os
import string
import subprocess
import threading
import time
from six.moves.queue import (
Empty,
Queue
)
import galaxy.jobs
from galaxy import model
from galaxy.jobs.command_factory import build_command
from galaxy.jobs.output_checker import DETECTED_JOB_STATE
from galaxy.jobs.runners.util.env import env_to_statement
from galaxy.jobs.runners.util.job_script import (
job_script,
write_script
)
from galaxy.util import (
DATABASE_MAX_STRING_SIZE,
ExecutionTimer,
in_directory,
ParamsWithSpecs,
shrink_stream_by_size
)
from galaxy.util.bunch import Bunch
from galaxy.util.monitors import Monitors
from .state_handler_factory import build_state_handlers
log = logging.getLogger(__name__)
STOP_SIGNAL = object()
JOB_RUNNER_PARAMETER_UNKNOWN_MESSAGE = "Invalid job runner parameter for this plugin: %s"
JOB_RUNNER_PARAMETER_MAP_PROBLEM_MESSAGE = "Job runner parameter '%s' value '%s' could not be converted to the correct type"
JOB_RUNNER_PARAMETER_VALIDATION_FAILED_MESSAGE = "Job runner parameter %s failed validation"
GALAXY_LIB_ADJUST_TEMPLATE = """GALAXY_LIB="%s"; if [ "$GALAXY_LIB" != "None" ]; then if [ -n "$PYTHONPATH" ]; then PYTHONPATH="$GALAXY_LIB:$PYTHONPATH"; else PYTHONPATH="$GALAXY_LIB"; fi; export PYTHONPATH; fi;"""
GALAXY_VENV_TEMPLATE = """GALAXY_VIRTUAL_ENV="%s"; if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -z "$VIRTUAL_ENV" -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" ]; then . "$GALAXY_VIRTUAL_ENV/bin/activate"; fi;"""
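# Example (path hypothetical): GALAXY_LIB_ADJUST_TEMPLATE % "/srv/galaxy/lib" renders a shell
# snippet that prepends /srv/galaxy/lib to PYTHONPATH (and exports it) before the job script
# runs; GALAXY_VENV_TEMPLATE similarly sources <venv>/bin/activate when no virtualenv is active.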
class RunnerParams(ParamsWithSpecs):
def _param_unknown_error(self, name):
raise Exception(JOB_RUNNER_PARAMETER_UNKNOWN_MESSAGE % name)
def _param_map_error(self, name, value):
raise Exception(JOB_RUNNER_PARAMETER_MAP_PROBLEM_MESSAGE % (name, value))
    def _param_vaildation_error(self, name, value):  # sic - spelling matches the hook name in galaxy.util.ParamsWithSpecs
raise Exception(JOB_RUNNER_PARAMETER_VALIDATION_FAILED_MESSAGE % name)
class BaseJobRunner(object):
DEFAULT_SPECS = dict(recheck_missing_job_retries=dict(map=int, valid=lambda x: x >= 0, default=0))
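    # Runner params (matched against these specs) are typically supplied as <param id="...">
    # children of the runner's <plugin> element in job_conf.xml (configuration location assumed).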
def __init__(self, app, nworkers, **kwargs):
"""Start the job runner
"""
self.app = app
self.sa_session = app.model.context
self.nworkers = nworkers
runner_param_specs = self.DEFAULT_SPECS.copy()
if 'runner_param_specs' in kwargs:
runner_param_specs.update(kwargs.pop('runner_param_specs'))
if kwargs:
log.debug('Loading %s with params: %s', self.runner_name, kwargs)
self.runner_params = RunnerParams(specs=runner_param_specs, params=kwargs)
self.runner_state_handlers = build_state_handlers()
def _init_worker_threads(self):
"""Start ``nworkers`` worker threads.
"""
self.work_queue = Queue()
self.work_threads = []
log.debug('Starting %s %s workers' % (self.nworkers, self.runner_name))
for i in range(self.nworkers):
worker = threading.Thread(name="%s.work_thread-%d" % (self.runner_name, i), target=self.run_next)
worker.setDaemon(True)
self.app.application_stack.register_postfork_function(worker.start)
self.work_threads.append(worker)
def run_next(self):
"""Run the next item in the work queue (a job waiting to run)
"""
while True:
(method, arg) = self.work_queue.get()
if method is STOP_SIGNAL:
return
            # id and name are collected first so that the call to method() is the last thing that can raise.
try:
if isinstance(arg, AsynchronousJobState):
job_id = arg.job_wrapper.get_id_tag()
else:
# arg should be a JobWrapper/TaskWrapper
job_id = arg.get_id_tag()
except Exception:
job_id = 'unknown'
try:
name = method.__name__
except Exception:
name = 'unknown'
try:
method(arg)
except Exception:
log.exception("(%s) Unhandled exception calling %s" % (job_id, name))
# Causes a runner's `queue_job` method to be called from a worker thread
def put(self, job_wrapper):
"""Add a job to the queue (by job identifier), indicate that the job is ready to run.
"""
put_timer = ExecutionTimer()
job = job_wrapper.get_job()
# Change to queued state before handing to worker thread so the runner won't pick it up again
job_wrapper.change_state(model.Job.states.QUEUED, flush=False, job=job)
# Persist the destination so that the job will be included in counts if using concurrency limits
job_wrapper.set_job_destination(job_wrapper.job_destination, None, flush=False, job=job)
self.sa_session.flush()
self.mark_as_queued(job_wrapper)
log.debug("Job [%s] queued %s" % (job_wrapper.job_id, put_timer))
def mark_as_queued(self, job_wrapper):
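        # Tuples placed on work_queue are (method, arg) pairs consumed by run_next() above.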
self.work_queue.put((self.queue_job, job_wrapper))
def shutdown(self):
"""Attempts to gracefully shut down the worker threads
"""
log.info("%s: Sending stop signal to %s worker threads" % (self.runner_name, len(self.work_threads)))
for i in range(len(self.work_threads)):
self.work_queue.put((STOP_SIGNAL, None))
join_timeout = self.app.config.monitor_thread_join_timeout
if join_timeout > 0:
exception = None
for thread in self.work_threads:
try:
thread.join(join_timeout)
except Exception as e:
exception = e
log.exception("Faild to shutdown worker thread")
if exception:
raise exception
# Most runners should override the legacy URL handler methods and destination param method
def url_to_destination(self, url):
"""
Convert a legacy URL to a JobDestination.
        Job runner URLs are deprecated; JobDestinations should be used instead.
This base class method converts from a URL to a very basic
JobDestination without destination params.
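        For example, ``url_to_destination('local:///')`` would yield a JobDestination
        with ``runner='local'`` (URL illustrative).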
"""
return galaxy.jobs.JobDestination(runner=url.split(':')[0])
def parse_destination_params(self, params):
"""Parse the JobDestination ``params`` dict and return the runner's native representation of those params.
"""
raise NotImplementedError()
def prepare_job(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True,
modify_command_for_container=True):
"""Some sanity checks that all runners' queue_job() methods are likely to want to do
"""
job_id = job_wrapper.get_id_tag()
job_state = job_wrapper.get_state()
job_wrapper.is_ready = False
job_wrapper.runner_command_line = None
# Make sure the job hasn't been deleted
if job_state == model.Job.states.DELETED:
log.debug("(%s) Job deleted by user before it entered the %s queue" % (job_id, self.runner_name))
if self.app.config.cleanup_job in ("always", "onsuccess"):
job_wrapper.cleanup()
return False
elif job_state != model.Job.states.QUEUED:
log.info("(%s) Job is in state %s, skipping execution" % (job_id, job_state))
# cleanup may not be safe in all states
return False
# Prepare the job
try:
job_wrapper.prepare()
job_wrapper.runner_command_line = self.build_command_line(
job_wrapper,
include_metadata=include_metadata,
include_work_dir_outputs=include_work_dir_outputs,
modify_command_for_container=modify_command_for_container
)
except Exception as e:
log.exception("(%s) Failure preparing job" % job_id)
job_wrapper.fail(e.message if hasattr(e, 'message') else "Job preparation failed", exception=True)
return False
if not job_wrapper.runner_command_line:
job_wrapper.finish('', '')
return False
return True
# Runners must override the job handling methods
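    # (concrete plugins such as galaxy.jobs.runners.local.LocalJobRunner provide these)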
def queue_job(self, job_wrapper):
raise NotImplementedError()
def stop_job(self, job):
raise NotImplementedError()
def recover(self, job, job_wrapper):
raise NotImplementedError()
def build_command_line(self, job_wrapper, include_metadata=False, include_work_dir_outputs=True,
modify_command_for_container=True):
container = self._find_container(job_wrapper)
if not container and job_wrapper.requires_containerization:
raise Exception("Failed to find a container when required, contact Galaxy admin.")
return build_command(
self,
job_wrapper,
include_metadata=include_metadata,
include_work_dir_outputs=include_work_dir_outputs,
modify_command_for_container=modify_command_for_container,
container=container
)
def get_work_dir_outputs(self, job_wrapper, job_working_directory=None, tool_working_directory=None):
"""
        Returns a list of (source_file, destination) pairs describing the path
        to a work_dir output file and its ultimate destination.
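        e.g. ``[('/jobs/5/working/out.tsv', '/data/dataset_42.dat')]`` (paths illustrative).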
"""
if tool_working_directory is not None and job_working_directory is not None:
raise Exception("get_work_dir_outputs called with both a job and tool working directory, only one may be specified")
if tool_working_directory is None:
if not job_working_directory:
job_working_directory = os.path.abspath(job_wrapper.working_directory)
tool_working_directory = os.path.join(job_working_directory, "working")
# Set up dict of dataset id --> output path; output path can be real or
# false depending on outputs_to_working_directory
output_paths = {}
for dataset_path in job_wrapper.get_output_fnames():
path = dataset_path.real_path
if self.app.config.outputs_to_working_directory:
path = dataset_path.false_path
output_paths[dataset_path.dataset_id] = path
output_pairs = []
# Walk job's output associations to find and use from_work_dir attributes.
job = job_wrapper.get_job()
job_tool = job_wrapper.tool
for (joda, dataset) in self._walk_dataset_outputs(job):
if joda and job_tool:
hda_tool_output = job_tool.find_output_def(joda.name)
if hda_tool_output and hda_tool_output.from_work_dir:
# Copy from working dir to HDA.
# TODO: move instead of copy to save time?
source_file = os.path.join(tool_working_directory, hda_tool_output.from_work_dir)
destination = job_wrapper.get_output_destination(output_paths[dataset.dataset_id])
if in_directory(source_file, tool_working_directory):
output_pairs.append((source_file, destination))
else:
# Security violation.
log.exception("from_work_dir specified a location not in the working directory: %s, %s", source_file, job_wrapper.working_directory)
return output_pairs
def _walk_dataset_outputs(self, job):
for dataset_assoc in job.output_datasets + job.output_library_datasets:
for dataset in dataset_assoc.dataset.dataset.history_associations + dataset_assoc.dataset.dataset.library_associations:
if isinstance(dataset, self.app.model.HistoryDatasetAssociation):
joda = self.sa_session.query(self.app.model.JobToOutputDatasetAssociation).filter_by(job=job, dataset=dataset).first()
yield (joda, dataset)
# TODO: why is this not just something easy like:
# for dataset_assoc in job.output_datasets + job.output_library_datasets:
# yield (dataset_assoc, dataset_assoc.dataset)
# I don't understand the reworking it backwards. -John
def _handle_metadata_externally(self, job_wrapper, resolve_requirements=False):
"""
Set metadata externally. Used by the Pulsar job runner where this
shouldn't be attached to command line to execute.
"""
# run the metadata setting script here
# this is terminate-able when output dataset/job is deleted
# so that long running set_meta()s can be canceled without having to reboot the server
if job_wrapper.get_state() not in [model.Job.states.ERROR, model.Job.states.DELETED] and job_wrapper.output_paths:
lib_adjust = GALAXY_LIB_ADJUST_TEMPLATE % job_wrapper.galaxy_lib_dir
venv = GALAXY_VENV_TEMPLATE % job_wrapper.galaxy_virtual_env
external_metadata_script = job_wrapper.setup_external_metadata(output_fnames=job_wrapper.get_output_fnames(),
set_extension=True,
tmp_dir=job_wrapper.working_directory,
# We don't want to overwrite metadata that was copied over in init_meta(), as per established behavior
kwds={'overwrite' : False})
external_metadata_script = "%s %s %s" % (lib_adjust, venv, external_metadata_script)
if resolve_requirements:
dependency_shell_commands = self.app.datatypes_registry.set_external_metadata_tool.build_dependency_shell_commands(job_directory=job_wrapper.working_directory)
if dependency_shell_commands:
if isinstance(dependency_shell_commands, list):
dependency_shell_commands = "&&".join(dependency_shell_commands)
external_metadata_script = "%s&&%s" % (dependency_shell_commands, external_metadata_script)
log.debug('executing external set_meta script for job %d: %s' % (job_wrapper.job_id, external_metadata_script))
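            # os.setpgrp puts the child in its own process group so the whole metadata
            # script (and any children it spawns) can be signalled as a unit if the job is cancelled.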
external_metadata_proc = subprocess.Popen(args=external_metadata_script,
shell=True,
cwd=job_wrapper.working_directory,
env=os.environ,
preexec_fn=os.setpgrp)
job_wrapper.external_output_metadata.set_job_runner_external_pid(external_metadata_proc.pid, self.sa_session)
external_metadata_proc.wait()
log.debug('execution of external set_meta for job %d finished' % job_wrapper.job_id)
def get_job_file(self, job_wrapper, **kwds):
job_metrics = job_wrapper.app.job_metrics
job_instrumenter = job_metrics.job_instrumenters[job_wrapper.job_destination.id]
env_setup_commands = kwds.get('env_setup_commands', [])
env_setup_commands.append(job_wrapper.get_env_setup_clause() or '')
destination = job_wrapper.job_destination or {}
envs = destination.get("env", [])
envs.extend(job_wrapper.environment_variables)
for env in envs:
env_setup_commands.append(env_to_statement(env))
command_line = job_wrapper.runner_command_line
options = dict(
job_instrumenter=job_instrumenter,
galaxy_lib=job_wrapper.galaxy_lib_dir,
galaxy_virtual_env=job_wrapper.galaxy_virtual_env,
env_setup_commands=env_setup_commands,
working_directory=os.path.abspath(job_wrapper.working_directory),
command=command_line,
shell=job_wrapper.shell,
preserve_python_environment=job_wrapper.tool.requires_galaxy_python_environment,
)
# Additional logging to enable if debugging from_work_dir handling, metadata
        # commands, etc... (or just peek at the job script.)
job_id = job_wrapper.job_id
log.debug('(%s) command is: %s' % (job_id, command_line))
options.update(**kwds)
return job_script(**options)
def write_executable_script(self, path, contents, mode=0o755):
write_script(path, contents, self.app.config, mode=mode)
def _find_container(
self,
job_wrapper,
compute_working_directory=None,
compute_tool_directory=None,
compute_job_directory=None,
):
job_directory_type = "galaxy" if compute_working_directory is None else "pulsar"
if not compute_working_directory:
compute_working_directory = job_wrapper.tool_working_directory
if not compute_job_directory:
compute_job_directory = job_wrapper.working_directory
if not compute_tool_directory:
compute_tool_directory = job_wrapper.tool.tool_dir
tool = job_wrapper.tool
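        # Deferred import, presumably to avoid an import cycle at module load time.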
from galaxy.tools.deps import containers
tool_info = containers.ToolInfo(tool.containers, tool.requirements, tool.requires_galaxy_python_environment)
job_info = containers.JobInfo(
compute_working_directory,
compute_tool_directory,
compute_job_directory,
job_directory_type,
)
destination_info = job_wrapper.job_destination.params
return self.app.container_finder.find_container(
tool_info,
destination_info,
job_info
)
def _handle_runner_state(self, runner_state, job_state):
try:
for handler in self.runner_state_handlers.get(runner_state, []):
handler(self.app, self, job_state)
if job_state.runner_state_handled:
break
except Exception:
log.exception('Caught exception in runner state handler')
def fail_job(self, job_state, exception=False):
if getattr(job_state, 'stop_job', True):
self.stop_job(self.sa_session.query(self.app.model.Job).get(job_state.job_wrapper.job_id))
self._handle_runner_state('failure', job_state)
        # Not convinced this is the best way to indicate this state, but
        # something is necessary
if not job_state.runner_state_handled:
job_state.job_wrapper.fail(getattr(job_state, 'fail_message', 'Job failed'), exception=exception)
if job_state.job_wrapper.cleanup_job == "always":
job_state.cleanup()
def mark_as_resubmitted(self, job_state, info=None):
job_state.job_wrapper.mark_as_resubmitted(info=info)
if not self.app.config.track_jobs_in_database:
job_state.job_wrapper.change_state(model.Job.states.QUEUED)
self.app.job_manager.job_handler.dispatcher.put(job_state.job_wrapper)
def _finish_or_resubmit_job(self, job_state, stdout, stderr, exit_code):
job = job_state.job_wrapper.get_job()
check_output_detected_state = job_state.job_wrapper.check_tool_output(stdout, stderr, exit_code, job)
# Flush with streams...
self.sa_session.add(job)
self.sa_session.flush()
if check_output_detected_state != DETECTED_JOB_STATE.OK:
job_runner_state = JobState.runner_states.TOOL_DETECT_ERROR
if check_output_detected_state == DETECTED_JOB_STATE.OUT_OF_MEMORY_ERROR:
job_runner_state = JobState.runner_states.MEMORY_LIMIT_REACHED
job_state.runner_state = job_runner_state
self._handle_runner_state('failure', job_state)
# Was resubmitted or something - I think we are done with it.
if job_state.runner_state_handled:
return
job_state.job_wrapper.finish(stdout, stderr, exit_code, check_output_detected_state=check_output_detected_state)
class JobState(object):
"""
Encapsulate state of jobs.
"""
runner_states = Bunch(
WALLTIME_REACHED='walltime_reached',
MEMORY_LIMIT_REACHED='memory_limit_reached',
JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER='Job output not returned from cluster',
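        # Unlike the other values here, this string doubles as the user-facing fail
        # message (see AsynchronousJobRunner.finish_job, where it is assigned to stderr).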
UNKNOWN_ERROR='unknown_error',
GLOBAL_WALLTIME_REACHED='global_walltime_reached',
OUTPUT_SIZE_LIMIT='output_size_limit',
TOOL_DETECT_ERROR='tool_detected', # job runner interaction worked fine but the tool indicated error
)
def __init__(self, job_wrapper, job_destination):
self.runner_state_handled = False
self.job_wrapper = job_wrapper
self.job_destination = job_destination
self.cleanup_file_attributes = ['job_file', 'output_file', 'error_file', 'exit_code_file']
def set_defaults(self, files_dir):
if self.job_wrapper is not None:
id_tag = self.job_wrapper.get_id_tag()
if files_dir is not None:
self.job_file = JobState.default_job_file(files_dir, id_tag)
self.output_file = os.path.join(files_dir, 'galaxy_%s.o' % id_tag)
self.error_file = os.path.join(files_dir, 'galaxy_%s.e' % id_tag)
self.exit_code_file = os.path.join(files_dir, 'galaxy_%s.ec' % id_tag)
job_name = 'g%s' % id_tag
if self.job_wrapper.tool.old_id:
job_name += '_%s' % self.job_wrapper.tool.old_id
if self.job_wrapper.user:
job_name += '_%s' % self.job_wrapper.user
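                # DRMs commonly restrict job names to alphanumerics and underscores, so
                # replace anything else, e.g. 'g42_cat1_user@example.org' becomes
                # 'g42_cat1_user_example_org' (values illustrative).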
self.job_name = ''.join(x if x in (string.ascii_letters + string.digits + '_') else '_' for x in job_name)
@staticmethod
def default_job_file(files_dir, id_tag):
return os.path.join(files_dir, 'galaxy_%s.sh' % id_tag)
@staticmethod
def default_exit_code_file(files_dir, id_tag):
return os.path.join(files_dir, 'galaxy_%s.ec' % id_tag)
def cleanup(self):
for file in [getattr(self, a) for a in self.cleanup_file_attributes if hasattr(self, a)]:
try:
os.unlink(file)
except Exception as e:
# TODO: Move this prefix stuff to a method so we don't have dispatch on attributes we may or may
# not have.
if not hasattr(self, "job_id"):
prefix = "(%s)" % self.job_wrapper.get_id_tag()
else:
prefix = "(%s/%s)" % (self.job_wrapper.get_id_tag(), self.job_id)
log.debug("%s Unable to cleanup %s: %s" % (prefix, file, str(e)))
class AsynchronousJobState(JobState):
"""
    Encapsulate the state of an asynchronous job. This should be subclassed as
    needed for various job runners to capture additional information needed
    to communicate with the distributed resource manager.
"""
def __init__(self, files_dir=None, job_wrapper=None, job_id=None, job_file=None, output_file=None, error_file=None, exit_code_file=None, job_name=None, job_destination=None):
super(AsynchronousJobState, self).__init__(job_wrapper, job_destination)
self.old_state = None
self._running = False
self.check_count = 0
self.start_time = None
# job_id is the DRM's job id, not the Galaxy job id
self.job_id = job_id
self.job_file = job_file
self.output_file = output_file
self.error_file = error_file
self.exit_code_file = exit_code_file
self.job_name = job_name
self.set_defaults(files_dir)
@property
def running(self):
return self._running
@running.setter
def running(self, is_running):
self._running = is_running
# This will be invalid for job recovery
if self.start_time is None:
self.start_time = datetime.datetime.now()
def check_limits(self, runtime=None):
limit_state = None
if self.job_wrapper.has_limits():
self.check_count += 1
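            # Only consult the limit check on every 20th poll while running, to keep per-poll overhead low.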
if self.running and (self.check_count % 20 == 0):
if runtime is None:
runtime = datetime.datetime.now() - (self.start_time or datetime.datetime.now())
self.check_count = 0
limit_state = self.job_wrapper.check_limits(runtime=runtime)
if limit_state is not None:
# Set up the job for failure, but the runner will do the actual work
self.runner_state, self.fail_message = limit_state
self.stop_job = True
return True
return False
def register_cleanup_file_attribute(self, attribute):
if attribute not in self.cleanup_file_attributes:
self.cleanup_file_attributes.append(attribute)
class AsynchronousJobRunner(Monitors, BaseJobRunner):
"""Parent class for any job runner that runs jobs asynchronously (e.g. via
a distributed resource manager). Provides general methods for having a
thread to monitor the state of asynchronous jobs and submitting those jobs
    to the correct methods (queue, finish, cleanup) at appropriate times.
"""
def __init__(self, app, nworkers, **kwargs):
super(AsynchronousJobRunner, self).__init__(app, nworkers, **kwargs)
# 'watched' and 'queue' are both used to keep track of jobs to watch.
# 'queue' is used to add new watched jobs, and can be called from
# any thread (usually by the 'queue_job' method). 'watched' must only
# be modified by the monitor thread, which will move items from 'queue'
# to 'watched' and then manage the watched jobs.
self.watched = []
self.monitor_queue = Queue()
def _init_monitor_thread(self):
name = "%s.monitor_thread" % self.runner_name
super(AsynchronousJobRunner, self)._init_monitor_thread(name=name, target=self.monitor, start=True, config=self.app.config)
def handle_stop(self):
# DRMAA and SGE runners should override this and disconnect.
pass
def monitor(self):
"""
Watches jobs currently in the monitor queue and deals with state
changes (queued to running) and job completion.
"""
while True:
# Take any new watched jobs and put them on the monitor list
try:
while True:
async_job_state = self.monitor_queue.get_nowait()
if async_job_state is STOP_SIGNAL:
# TODO: This is where any cleanup would occur
self.handle_stop()
return
self.watched.append(async_job_state)
except Empty:
pass
# Iterate over the list of watched jobs and check state
try:
self.check_watched_items()
except Exception:
log.exception('Unhandled exception checking active jobs')
# Sleep a bit before the next state check
time.sleep(1)
def monitor_job(self, job_state):
self.monitor_queue.put(job_state)
def shutdown(self):
"""Attempts to gracefully shut down the monitor thread"""
log.info("%s: Sending stop signal to monitor thread" % self.runner_name)
self.monitor_queue.put(STOP_SIGNAL)
        # Stop the monitor thread, then call the parent's shutdown method to stop the workers
self.shutdown_monitor()
super(AsynchronousJobRunner, self).shutdown()
def check_watched_items(self):
"""
This method is responsible for iterating over self.watched and handling
state changes and updating self.watched with a new list of watched job
states. Subclasses can opt to override this directly (as older job runners will
initially) or just override check_watched_item and allow the list processing to
reuse the logic here.
"""
new_watched = []
for async_job_state in self.watched:
new_async_job_state = self.check_watched_item(async_job_state)
if new_async_job_state:
new_watched.append(new_async_job_state)
self.watched = new_watched
# Subclasses should implement this unless they override check_watched_items all together.
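    # Contract: return the (possibly updated) job state to keep watching it, or None to drop it.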
def check_watched_item(self, job_state):
raise NotImplementedError()
def finish_job(self, job_state):
"""
Get the output/error for a finished job, pass to `job_wrapper.finish`
and cleanup all the job's temporary files.
"""
galaxy_id_tag = job_state.job_wrapper.get_id_tag()
external_job_id = job_state.job_id
# To ensure that files below are readable, ownership must be reclaimed first
job_state.job_wrapper.reclaim_ownership()
# wait for the files to appear
which_try = 0
collect_output_success = True
while which_try < self.app.config.retry_job_output_collection + 1:
try:
stdout = shrink_stream_by_size(open(job_state.output_file, "r"), DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True)
stderr = shrink_stream_by_size(open(job_state.error_file, "r"), DATABASE_MAX_STRING_SIZE, join_by="\n..\n", left_larger=True, beginning_on_size_error=True)
break
except Exception as e:
if which_try == self.app.config.retry_job_output_collection:
stdout = ''
stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
log.error('(%s/%s) %s: %s' % (galaxy_id_tag, external_job_id, stderr, str(e)))
collect_output_success = False
else:
time.sleep(1)
which_try += 1
if not collect_output_success:
job_state.fail_message = stderr
job_state.runner_state = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
self.mark_as_failed(job_state)
return
try:
# This should be an 8-bit exit code, but read ahead anyway:
exit_code_str = open(job_state.exit_code_file, "r").read(32)
except Exception:
# By default, the exit code is 0, which typically indicates success.
exit_code_str = "0"
try:
# Decode the exit code. If it's bogus, then just use 0.
exit_code = int(exit_code_str)
except ValueError:
log.warning("(%s/%s) Exit code '%s' invalid. Using 0." % (galaxy_id_tag, external_job_id, exit_code_str))
exit_code = 0
# clean up the job files
cleanup_job = job_state.job_wrapper.cleanup_job
if cleanup_job == "always" or (not stderr and cleanup_job == "onsuccess"):
job_state.cleanup()
try:
self._finish_or_resubmit_job(job_state, stdout, stderr, exit_code)
except Exception:
log.exception("(%s/%s) Job wrapper finish method failed" % (galaxy_id_tag, external_job_id))
job_state.job_wrapper.fail("Unable to finish job", exception=True)
def mark_as_finished(self, job_state):
self.work_queue.put((self.finish_job, job_state))
def mark_as_failed(self, job_state):
self.work_queue.put((self.fail_job, job_state))