/
__init__.py
2112 lines (1828 loc) · 95.9 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
Support for running a tool in Galaxy via an internal job management system
"""
import copy
import datetime
import logging
import os
import pwd
import shlex
import shutil
import string
import subprocess
import sys
import time
import traceback
from abc import ABCMeta, abstractmethod
from json import loads
from tempfile import NamedTemporaryFile
from xml.etree import ElementTree
import six
import galaxy
from galaxy import model, util
from galaxy.datatypes import metadata, sniff
from galaxy.exceptions import ObjectInvalid, ObjectNotFound
from galaxy.jobs.actions.post import ActionBox
from galaxy.jobs.mapper import JobRunnerMapper
from galaxy.jobs.runners import BaseJobRunner, JobState
from galaxy.util import safe_makedirs, unicodify
from galaxy.util.bunch import Bunch
from galaxy.util.expressions import ExpressionContext
from galaxy.util.handlers import ConfiguresHandlers
from galaxy.util.xml_macros import load
from .datasets import (DatasetPath, NullDatasetPathRewriter,
OutputsToWorkingDirectoryPathRewriter, TaskPathRewriter)
from .output_checker import check_output
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Legacy definition - this is read by certain misbehaving tool wrappers
# that import Galaxy internals - but it shouldn't be used in Galaxy's code
# itself.
TOOL_PROVIDED_JOB_METADATA_FILE = 'galaxy.json'

# Shell used for jobs when neither the job destination nor the application
# config names one. Override with config.default_job_shell.
DEFAULT_JOB_SHELL = '/bin/bash'

# Value used for the "cleanup_job" destination setting when it is not set.
DEFAULT_CLEANUP_JOB = "always"
class JobDestination(Bunch):
    """
    Describes where a job runs.

    Every instance starts with the same set of keys (``id``, ``url``,
    ``tags``, ``runner``, ``legacy``, ``converted``, ``shell``, ``env``,
    ``resubmit`` and ``params``); keyword arguments then override them.
    """

    def __init__(self, **kwds):
        """Build a destination from keyword arguments.

        If a ``from_job`` keyword is given and that job has a non-``None``
        ``destination_id``, the job's persisted destination id and params
        are used as the starting ``id`` and ``params``. A comma-separated
        ``tags`` string is split into a list of stripped tag names.
        """
        # The tuple is rebuilt on every call, so the mutable defaults are
        # never shared between instances.
        initial_values = (
            ('id', None),
            ('url', None),
            ('tags', None),
            ('runner', None),
            ('legacy', False),
            ('converted', False),
            ('shell', None),
            ('env', []),
            ('resubmit', []),
            # dict is appropriate (rather than a bunch) since keys may not be valid as attributes
            ('params', dict()),
        )
        for key, value in initial_values:
            self[key] = value

        # Use the values persisted in an existing job
        if 'from_job' in kwds:
            persisted_job = kwds['from_job']
            if persisted_job.destination_id is not None:
                self['id'] = persisted_job.destination_id
                self['params'] = persisted_job.destination_params

        super(JobDestination, self).__init__(**kwds)

        # Store tags as a list
        raw_tags = self.tags
        if raw_tags is not None:
            self['tags'] = [tag.strip() for tag in raw_tags.split(',')]
class JobToolConfiguration(Bunch):
    """
    Describes which handler and destination a tool should use.

    A JobToolConfiguration will have the required attribute 'id' and optional
    attributes 'handler', 'destination', and 'params'
    """

    def __init__(self, **kwds):
        """Set the optional attributes to their defaults, then apply ``kwds``."""
        for key, value in (('handler', None), ('destination', None), ('params', dict())):
            self[key] = value
        super(JobToolConfiguration, self).__init__(**kwds)

    def get_resource_group(self):
        """Return the value stored under ``resources``, or ``None`` if unset."""
        resource_group = self.get("resources", None)
        return resource_group
def config_exception(e, file):
    """Log an XML parsing problem and build an exception describing it.

    :param e: The exception raised while parsing.
    :param file: Path of the file being parsed; reported as an absolute path.
    :returns: A new ``Exception`` carrying the composed message. The caller
              is expected to raise it.
    """
    message = ''.join((
        'Problem parsing the XML in file %s, ' % os.path.abspath(file),
        'please correct the indicated portion of the file and restart Galaxy. ',
        unicodify(e),
    ))
    log.exception(message)
    return Exception(message)
# NOTE: ``object`` must come after the mixin in the base list. Listing it
# first ("object, ConfiguresHandlers") cannot be linearised on Python 3 and
# raises TypeError at class-creation time.
class JobConfiguration(ConfiguresHandlers, object):
    """A parser and interface to advanced job management features.

    These features are configured in the job configuration, by default, ``job_conf.xml``
    """
    # Number of workers given to a runner plugin when neither the <plugin>
    # element nor the enclosing <plugins> element sets a ``workers`` attribute.
    DEFAULT_NWORKERS = 4
    # Template for the conditional wrapped around job resource parameters;
    # the parameters themselves are appended to the second <when> element.
    JOB_RESOURCE_CONDITIONAL_XML = """<conditional name="__job_resource">
<param name="__job_resource__select" type="select" label="Job Resource Parameters">
<option value="no">Use default job resource parameters</option>
<option value="yes">Specify job resource parameters</option>
</param>
<when value="no"/>
<when value="yes"/>
</conditional>"""
def __init__(self, app):
    """Parse the job configuration XML.

    Starts from an empty configuration, loads the job resource parameter
    definitions and then reads ``app.config.job_config_file``. If that file
    cannot be read (``IOError``), the legacy settings from the Galaxy
    config file are used instead. Any other failure is re-raised through
    ``config_exception``.

    :param app: The Galaxy application object.
    """
    self.app = app

    # Runner plugins and handlers
    self.runner_plugins = []
    self.dynamic_params = None
    self.handlers = {}
    self.handler_runner_plugins = {}
    self.default_handler_id = None

    # Destinations
    self.destinations = {}
    self.destination_tags = {}
    self.default_destination_id = None

    # Tool mappings and resource parameters
    self.tools = {}
    self.resource_groups = {}
    self.default_resource_group = None
    self.resource_parameters = {}
    self.limits = Bunch()

    # A configured default resubmission condition becomes a single default
    # resubmit definition; otherwise there are no default resubmits.
    resubmit_condition = self.app.config.default_job_resubmission_condition
    if resubmit_condition:
        self.default_resubmits = [{
            'destination': None,
            'condition': resubmit_condition,
            'handler': None,
            'delay': None,
        }]
    else:
        self.default_resubmits = []

    self.__parse_resource_parameters()

    # Initialize the config
    job_config_file = self.app.config.job_config_file
    try:
        tree, _ = load(job_config_file)
        self.__parse_job_conf_xml(tree)
    except IOError:
        log.warning('Job configuration "%s" does not exist, using legacy'
                    ' job configuration from Galaxy config file "%s" instead'
                    % (self.app.config.job_config_file, self.app.config.config_file))
        self.__parse_job_conf_legacy()
    except Exception as e:
        raise config_exception(e, job_config_file)
def __parse_job_conf_xml(self, tree):
    """Loads the new-style job configuration from options in the job config file (by default, job_conf.xml).

    Reads, in order, the ``<plugins>``, ``<handlers>``, ``<destinations>``,
    ``<resources>``, ``<tools>`` and ``<limits>`` sections and stores the
    results on this object.

    :param tree: Parsed job config document; ``tree.getroot()`` is called to
                 obtain the root element.
    :raises ValueError: if no handler ends up configured.
    """
    root = tree.getroot()
    log.debug('Loading job configuration from %s' % self.app.config.job_config_file)

    # Parse job plugins
    plugins = root.find('plugins')
    if plugins is not None:
        for plugin in self._findall_with_required(plugins, 'plugin', ('id', 'type', 'load')):
            if plugin.get('type') == 'runner':
                # Worker count: plugin attribute, then <plugins> attribute, then class default.
                workers = plugin.get('workers', plugins.get('workers', JobConfiguration.DEFAULT_NWORKERS))
                runner_kwds = self.__get_params(plugin)
                if not self.__is_enabled(runner_kwds):
                    continue
                runner_info = dict(id=plugin.get('id'),
                                   load=plugin.get('load'),
                                   workers=int(workers),
                                   kwds=runner_kwds)
                self.runner_plugins.append(runner_info)
            else:
                log.error('Unknown plugin type: %s' % plugin.get('type'))
        for plugin in self._findall_with_required(plugins, 'plugin', ('id', 'type')):
            if plugin.get('id') == 'dynamic' and plugin.get('type') == 'runner':
                self.dynamic_params = self.__get_params(plugin)

    # Load tasks if configured
    if self.app.config.use_tasked_jobs:
        self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))

    # Parse handlers
    handlers_conf = root.find('handlers')
    self._init_handlers(handlers_conf)

    # Must define at least one handler to have a default.
    if not self.handlers:
        raise ValueError("Job configuration file defines no valid handler elements.")
    # Determine the default handler(s)
    self.default_handler_id = self._get_default(self.app.config, handlers_conf, list(self.handlers.keys()))

    # Parse destinations
    destinations = root.find('destinations')
    job_metrics = self.app.job_metrics
    for destination in self._findall_with_required(destinations, 'destination', ('id', 'runner')):
        destination_id = destination.get('id')
        destination_metrics = destination.get("metrics", None)
        if destination_metrics:
            if not util.asbool(destination_metrics):
                # disable
                job_metrics.set_destination_instrumenter(destination_id, None)
            else:
                metrics_conf_path = self.app.config.resolve_path(destination_metrics)
                job_metrics.set_destination_conf_file(destination_id, metrics_conf_path)
        else:
            metrics_elements = self._findall_with_required(destination, 'job_metrics', ())
            if metrics_elements:
                job_metrics.set_destination_conf_element(destination_id, metrics_elements[0])
        job_destination = JobDestination(**dict(destination.items()))
        params = self.__get_params(destination)
        if not self.__is_enabled(params):
            continue
        job_destination['params'] = params
        job_destination['env'] = self.__get_envs(destination)
        # Destination-level resubmits take precedence over the defaults.
        destination_resubmits = self.__get_resubmits(destination)
        if destination_resubmits:
            resubmits = destination_resubmits
        else:
            resubmits = self.default_resubmits
        job_destination["resubmit"] = resubmits

        # Real ids map to a 1-tuple, tags map to a list (see is_id / is_tag).
        self.destinations[destination_id] = (job_destination,)
        if job_destination.tags is not None:
            for tag in job_destination.tags:
                if tag not in self.destinations:
                    self.destinations[tag] = []
                self.destinations[tag].append(job_destination)

    # Determine the default destination
    self.default_destination_id = self._get_default(self.app.config, destinations, list(self.destinations.keys()))

    # Parse resources...
    resources = root.find('resources')
    if resources is not None:
        self.default_resource_group = resources.get("default", None)
        for group in self._findall_with_required(resources, 'group'):
            group_id = group.get('id')
            fields_str = group.get('fields', None) or group.text or ''
            fields = [f for f in fields_str.split(",") if f]
            self.resource_groups[group_id] = fields

    # Parse tool mappings
    tools = root.find('tools')
    if tools is not None:
        for tool in self._findall_with_required(tools, 'tool'):
            # There can be multiple definitions with identical ids, but different params
            tool_id = tool.get('id').lower().rstrip('/')
            if tool_id not in self.tools:
                self.tools[tool_id] = list()
            self.tools[tool_id].append(JobToolConfiguration(**dict(tool.items())))
            self.tools[tool_id][-1]['params'] = self.__get_params(tool)

    # Converters applied to the text of simple <limit> elements, by limit type.
    types = dict(registered_user_concurrent_jobs=int,
                 anonymous_user_concurrent_jobs=int,
                 walltime=str,
                 total_walltime=str,
                 output_size=util.size_to_bytes)

    self.limits = Bunch(registered_user_concurrent_jobs=None,
                        anonymous_user_concurrent_jobs=None,
                        walltime=None,
                        walltime_delta=None,
                        total_walltime={},
                        output_size=None,
                        destination_user_concurrent_jobs={},
                        destination_total_concurrent_jobs={})

    # Parse job limits
    limits = root.find('limits')
    if limits is not None:
        for limit in self._findall_with_required(limits, 'limit', ('type',)):
            limit_type = limit.get('type')
            # concurrent_jobs renamed to destination_user_concurrent_jobs in job_conf.xml
            if limit_type in ('destination_user_concurrent_jobs', 'concurrent_jobs', 'destination_total_concurrent_jobs'):
                limit_key = limit.get('tag', None) or limit.get('id')
                if limit_type == 'destination_total_concurrent_jobs':
                    self.limits.destination_total_concurrent_jobs[limit_key] = int(limit.text)
                else:
                    self.limits.destination_user_concurrent_jobs[limit_key] = int(limit.text)
            elif limit_type == 'total_walltime':
                # Apply the fallback before int(): a missing ``window``
                # attribute is None and int(None) raises TypeError. A
                # window of 0 still falls back to 30, as before.
                self.limits.total_walltime["window"] = (
                    int(limit.get('window') or 0) or 30
                )
                self.limits.total_walltime["raw"] = (
                    types.get(limit_type, str)(limit.text)
                )
            elif limit.text:
                self.limits.__dict__[limit_type] = types.get(limit_type, str)(limit.text)

    # Walltime values are "H:M:S" strings; also store them as timedeltas.
    if self.limits.walltime is not None:
        h, m, s = [int(v) for v in self.limits.walltime.split(':')]
        self.limits.walltime_delta = datetime.timedelta(0, s, 0, 0, m, h)

    if "raw" in self.limits.total_walltime:
        h, m, s = [int(v) for v in
                   self.limits.total_walltime["raw"].split(':')]
        self.limits.total_walltime["delta"] = datetime.timedelta(
            0, s, 0, 0, m, h
        )

    log.debug('Done loading job configuration')
def _parse_handler(self, handler_id, handler_element):
    """Record the runner plugin ids listed under a handler element.

    Each child ``<plugin>`` element's ``id`` attribute is appended to
    ``self.handler_runner_plugins[handler_id]``. A handler without
    ``<plugin>`` children gets no entry at all.
    """
    for plugin_elem in handler_element.findall('plugin'):
        plugin_ids = self.handler_runner_plugins.setdefault(handler_id, [])
        plugin_ids.append(plugin_elem.get('id'))
def __parse_job_conf_legacy(self):
    """Loads the old-style job configuration from options in the galaxy config file (by default, config/galaxy.ini).

    Fills in ``runner_plugins``, ``handlers``, ``tools``, ``destinations``,
    the default handler and destination ids, and ``limits`` from attributes
    of ``self.app.config``.

    Note that the handler and runner config dicts read from
    ``self.app.config`` are modified in place (keys are renamed).
    """
    log.debug('Loading job configuration from %s' % self.app.config.config_file)

    # Always load local
    self.runner_plugins = [dict(id='local', load='local', workers=self.app.config.local_job_queue_workers)]
    # Load tasks if configured
    if self.app.config.use_tasked_jobs:
        self.runner_plugins.append(dict(id='tasks', load='tasks', workers=self.app.config.local_task_queue_workers))
    for runner in self.app.config.start_job_runners:
        self.runner_plugins.append(dict(id=runner, load=runner, workers=self.app.config.cluster_job_queue_workers))

    # Set the handlers
    for id in self.app.config.job_handlers:
        # A 1-tuple marks a real handler id (see is_id).
        self.handlers[id] = (id,)

    self.handlers['default_job_handlers'] = self.app.config.default_job_handlers
    self.default_handler_id = 'default_job_handlers'

    # Set tool handler configs
    for id, tool_handlers in self.app.config.tool_handlers.items():
        self.tools[id] = list()
        for handler_config in tool_handlers:
            # rename the 'name' key to 'handler'
            handler_config['handler'] = handler_config.pop('name')
            self.tools[id].append(JobToolConfiguration(**handler_config))

    # Set tool runner configs
    for id, tool_runners in self.app.config.tool_runners.items():
        # Might have been created in the handler parsing above
        if id not in self.tools:
            self.tools[id] = list()
        for runner_config in tool_runners:
            url = runner_config['url']
            if url not in self.destinations:
                # Create a new "legacy" JobDestination - it will have its URL converted to a destination params once the appropriate plugin has loaded
                self.destinations[url] = (JobDestination(id=url, runner=url.split(':', 1)[0], url=url, legacy=True, converted=False),)
            # for/else: the else branch runs only when no existing tool
            # config had matching params (the loop finished without break).
            for tool_conf in self.tools[id]:
                if tool_conf.params == runner_config.get('params', {}):
                    tool_conf['destination'] = url
                    break
            else:
                # There was not an existing config (from the handlers section) with the same params
                # rename the 'url' key to 'destination'
                runner_config['destination'] = runner_config.pop('url')
                self.tools[id].append(JobToolConfiguration(**runner_config))

    # The default cluster job runner URL is both a legacy destination and the default destination id.
    self.destinations[self.app.config.default_cluster_job_runner] = (JobDestination(id=self.app.config.default_cluster_job_runner,
                                                                                    runner=self.app.config.default_cluster_job_runner.split(':', 1)[0],
                                                                                    url=self.app.config.default_cluster_job_runner,
                                                                                    legacy=True,
                                                                                    converted=False),)
    self.default_destination_id = self.app.config.default_cluster_job_runner

    # Set the job limits
    self.limits = Bunch(registered_user_concurrent_jobs=self.app.config.registered_user_job_limit,
                        anonymous_user_concurrent_jobs=self.app.config.anonymous_user_job_limit,
                        walltime=self.app.config.job_walltime,
                        walltime_delta=self.app.config.job_walltime_delta,
                        total_walltime={},
                        output_size=self.app.config.output_size_limit,
                        destination_user_concurrent_jobs={},
                        destination_total_concurrent_jobs={})

    log.debug('Done loading job configuration')
def get_tool_resource_xml(self, tool_id, tool_type):
    """Given a tool id, return the XML describing parameters to insert
    into job resources.

    :param tool_id: A tool ID (a string)
    :param tool_type: A tool type (a string); only ``'default'`` tools get
                      resource parameters.
    :returns: The ``<conditional>`` element wrapping the resource parameter
              elements, or ``None`` when the tool has no id, is not of
              type ``'default'``, or no resource group with fields applies.
    """
    # Compare by value: ``is`` tests identity and only matches when the
    # caller's string happens to be the very same object as the literal.
    if not tool_id or tool_type != 'default':
        return None

    # TODO: Only works with exact matches, should handle different kinds of ids
    # the way destination lookup does.
    resource_group = None
    if tool_id in self.tools:
        resource_group = self.tools[tool_id][0].get_resource_group()
    resource_group = resource_group or self.default_resource_group
    if not resource_group or resource_group not in self.resource_groups:
        return None

    fields_names = self.resource_groups[resource_group]
    fields = [self.resource_parameters[n] for n in fields_names]
    if not fields:
        return None

    conditional_element = ElementTree.fromstring(self.JOB_RESOURCE_CONDITIONAL_XML)
    # The second <when> is the value="yes" branch of the template.
    when_yes_elem = conditional_element.findall('when')[1]
    for parameter in fields:
        when_yes_elem.append(parameter)
    return conditional_element
def __parse_resource_parameters(self):
    """Load job resource parameter definitions, if the file exists.

    Every top-level ``<param>`` element of
    ``app.config.job_resource_params_file`` is stored in
    ``self.resource_parameters`` under its ``name`` attribute. A parse
    failure is re-raised through ``config_exception``.
    """
    params_file = self.app.config.job_resource_params_file
    if not os.path.exists(params_file):
        return
    try:
        definitions = util.parse_xml(params_file)
    except Exception as e:
        raise config_exception(e, params_file)
    # TODO: Also handling conditionals would be awesome!
    for param_elem in definitions.getroot().findall("param"):
        self.resource_parameters[param_elem.get("name")] = param_elem
def __get_params(self, parent):
    """Parses any child <param> tags in to a dictionary suitable for persistence.

    The ``container`` and ``container_override`` params are turned into a
    list of container description dicts built from their child elements.
    Every other param uses its text. A ``from_environ`` attribute replaces
    the value with that environment variable when it is set; otherwise a
    ``from_config`` attribute replaces it with that Galaxy config value when
    present.

    :param parent: Parent element in which to find child <param> tags.
    :type parent: ``xml.etree.ElementTree.Element``

    :returns: dict
    """
    rval = {}
    for param in parent.findall('param'):
        key = param.get('id')
        if key in ["container", "container_override"]:
            from galaxy.tools.deps import requirements
            # Build real lists: on Python 3 ``map`` returns a one-shot
            # iterator, which cannot be persisted or read twice.
            containers = [requirements.container_from_element(child) for child in param]
            param_value = [container.to_dict() for container in containers]
        else:
            param_value = param.text

        if 'from_environ' in param.attrib:
            environ_var = param.attrib['from_environ']
            param_value = os.environ.get(environ_var, param_value)
        elif 'from_config' in param.attrib:
            config_val = param.attrib['from_config']
            param_value = self.app.config.config_dict.get(config_val, param_value)

        rval[key] = param_value
    return rval
def __get_envs(self, parent):
    """Parses any child <env> tags in to a list of dictionaries suitable for persistence.

    :param parent: Parent element in which to find child <env> tags.
    :type parent: ``xml.etree.ElementTree.Element``

    :returns: list of dicts with the keys ``name``, ``file``, ``execute``,
              ``value`` and ``raw``
    """
    return [
        dict(
            name=env_elem.get('id'),
            file=env_elem.get('file'),
            execute=env_elem.get('exec'),
            value=env_elem.text,
            raw=util.asbool(env_elem.get('raw', 'false'))
        )
        for env_elem in parent.findall('env')
    ]
def __get_resubmits(self, parent):
    """Parses any child <resubmit> tags in to a list of dictionaries suitable for persistence.

    :param parent: Parent element in which to find child <resubmit> tags.
    :type parent: ``xml.etree.ElementTree.Element``

    :returns: list of dicts with the keys ``condition``, ``destination``,
              ``handler`` and ``delay``
    """
    return [
        dict(
            condition=resubmit_elem.get('condition'),
            destination=resubmit_elem.get('destination'),
            handler=resubmit_elem.get('handler'),
            delay=resubmit_elem.get('delay'),
        )
        for resubmit_elem in parent.findall('resubmit')
    ]
def __is_enabled(self, params):
    """Check for an enabled parameter - pop it out - and return as boolean.

    ``params`` is modified in place: the ``enabled`` key, if present, is
    removed. Without that key the result is ``True``.
    """
    if "enabled" not in params:
        return True
    return util.asbool(params.pop("enabled"))
@property
def default_job_tool_configuration(self):
    """
    The default JobToolConfiguration, used if a tool does not have an
    explicit definition in the configuration. It consists of a reference to
    the default handler and default destination.

    :returns: JobToolConfiguration -- a representation of a <tool> element that uses the default handler and destination
    """
    default_config = JobToolConfiguration(
        id='default',
        handler=self.default_handler_id,
        destination=self.default_destination_id,
    )
    return default_config
# Called upon instantiation of a Tool object
def get_job_tool_configurations(self, ids):
    """
    Get all configured JobToolConfigurations for a tool ID, or, if given
    a list of IDs, the JobToolConfigurations for the first id in ``ids``
    matching a tool definition.

    .. note:: You should not mix tool shed tool IDs, versionless tool shed
              IDs, and tool config tool IDs that refer to the same tool.

    :param ids: Tool ID or IDs to fetch the JobToolConfiguration of.
    :type ids: list or str.
    :returns: list -- JobToolConfiguration Bunches representing <tool> elements matching the specified ID(s).

    Example tool ID strings include:

    * Full tool shed id: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool/1.0.0``
    * Tool shed id less version: ``toolshed.example.org/repos/nate/filter_tool_repo/filter_tool``
    * Tool config tool id: ``filter_tool``
    """
    rval = []
    # listify if ids is a single (string) id
    for tool_id in util.listify(ids):
        if tool_id not in self.tools:
            continue
        configured = self.tools[tool_id]
        # If a tool has definitions that include job params but not a
        # definition for jobs without params, include the default
        # config
        if all(tool_conf.params for tool_conf in configured):
            rval.append(self.default_job_tool_configuration)
        rval.extend(configured)
        break
    else:
        # None of the ids had an explicit definition.
        rval.append(self.default_job_tool_configuration)
    return rval
def get_destination(self, id_or_tag):
    """Given a destination ID or tag, return the JobDestination matching the provided ID or tag

    :param id_or_tag: A destination ID or tag; ``None`` selects the default destination.
    :type id_or_tag: str

    :returns: JobDestination -- A valid destination

    Destinations are deepcopied as they are expected to be passed in to job
    runners, which will modify them for persisting params set at runtime.
    """
    lookup_key = self.default_destination_id if id_or_tag is None else id_or_tag
    candidates = self.destinations[lookup_key]
    selected = self._get_single_item(candidates)
    return copy.deepcopy(selected)
def get_destinations(self, id_or_tag):
    """Given a destination ID or tag, return all JobDestinations matching the provided ID or tag

    :param id_or_tag: A destination ID or tag.
    :type id_or_tag: str

    :returns: list or tuple of JobDestinations, or ``None`` if the ID or tag is unknown

    Destinations are not deepcopied, so they should not be passed to
    anything which might modify them.
    """
    matching = self.destinations.get(id_or_tag)
    return matching
def get_job_runner_plugins(self, handler_id):
    """Load and instantiate the configured job runner plugins for a handler.

    If the handler has an explicit list of plugins, only those are loaded;
    otherwise every configured runner plugin is loaded.

    :param handler_id: Id of the handler whose plugins should be loaded.
    :returns: dict mapping runner plugin id to the instantiated runner
    """
    rval = {}
    if handler_id in self.handler_runner_plugins:
        plugins_to_load = [rp for rp in self.runner_plugins if rp['id'] in self.handler_runner_plugins[handler_id]]
        log.info("Handler '%s' will load specified runner plugins: %s", handler_id, ', '.join([rp['id'] for rp in plugins_to_load]))
    else:
        plugins_to_load = self.runner_plugins
        log.info("Handler '%s' will load all configured runner plugins", handler_id)
    for runner in plugins_to_load:
        class_names = []
        module = None
        runner_id = runner['id']
        load_spec = runner['load']
        if ':' in load_spec:
            # Name to load was specified as '<module>:<class>'
            module_name, class_name = load_spec.rsplit(':', 1)
            class_names = [class_name]
            module = __import__(module_name)
        else:
            # Name to load was specified as '<module>'
            if '.' not in load_spec:
                # For legacy reasons, try from galaxy.jobs.runners first if there's no '.' in the name
                module_name = 'galaxy.jobs.runners.' + load_spec
                try:
                    module = __import__(module_name)
                except ImportError:
                    # No such module, we'll retry without prepending galaxy.jobs.runners.
                    # All other exceptions (e.g. something wrong with the module code) will raise
                    pass
            if module is None:
                # If the name included a '.' or loading from the static runners path failed, try the original name
                module = __import__(load_spec)
                module_name = load_spec
        if module is None:
            # Module couldn't be loaded, error should have already been displayed
            continue
        # __import__ returns the top-level package; walk down to the named submodule.
        for comp in module_name.split(".")[1:]:
            module = getattr(module, comp)
        if not class_names:
            # If there's not a ':', we check <module>.__all__ for class names.
            # Use an explicit check rather than ``assert``, which is removed
            # under ``python -O`` and does not cover a missing ``__all__``.
            exported = getattr(module, '__all__', None)
            if not exported:
                log.error('Runner "%s" does not contain a list of exported classes in __all__' % load_spec)
                continue
            class_names = exported
        for class_name in class_names:
            runner_class = getattr(module, class_name)
            try:
                is_runner = issubclass(runner_class, BaseJobRunner)
            except TypeError:
                # issubclass() raises TypeError when its first argument is not a class.
                log.warning("A non-class name was found in __all__, ignoring: %s" % runner_id)
                continue
            if not is_runner:
                log.warning("Job runner classes must be subclassed from BaseJobRunner, %s has bases: %s" % (runner_id, runner_class.__bases__))
                continue
            try:
                rval[runner_id] = runner_class(self.app, runner['workers'], **runner.get('kwds', {}))
            except TypeError:
                log.exception("Job runner '%s:%s' has not been converted to a new-style runner or encountered TypeError on load",
                              module_name, class_name)
                # Retry with the app as the only argument.
                rval[runner_id] = runner_class(self.app)
            log.debug("Loaded job runner '%s:%s' as '%s'" % (module_name, class_name, runner_id))
    return rval
def is_id(self, collection):
    """Given a collection of handlers or destinations, indicate whether the collection represents a real ID

    Real IDs are stored as tuples; the check is for the exact ``tuple``
    type, so tuple subclasses do not count.

    :param collection: A representation of a destination or handler
    :type collection: tuple or list

    :returns: bool
    """
    return type(collection) is tuple
def is_tag(self, collection):
    """Given a collection of handlers or destinations, indicate whether the collection represents a tag

    Tags are stored as lists; the check is for the exact ``list`` type, so
    list subclasses do not count.

    :param collection: A representation of a destination or handler
    :type collection: tuple or list

    :returns: bool
    """
    return type(collection) is list
def convert_legacy_destinations(self, job_runners):
    """Converts legacy (from a URL) destinations to contain the appropriate runner params defined in the URL.

    :param job_runners: All loaded job runner plugins, looked up by runner name.
    """
    for dest_id, members in list(self.destinations.items()):
        # Only need to deal with real destinations, not members of tags
        if not self.is_id(members):
            continue
        destination = members[0]
        if not destination.legacy or destination.converted:
            continue
        if destination.runner not in job_runners:
            log.warning("Legacy destination with id '%s' could not be converted: Unknown runner plugin: %s" % (dest_id, destination.runner))
            continue
        runner_plugin = job_runners[destination.runner]
        destination.params = runner_plugin.url_to_destination(destination.url).params
        destination.converted = True
        # The same message is logged whether or not any params came back.
        log.debug("Legacy destination with id '%s', url '%s' converted, got params:" % (dest_id, destination.url))
        if destination.params:
            for k, v in destination.params.items():
                log.debug(" %s: %s" % (k, v))
class HasResourceParameters:
    """Mixin that extracts job resource parameters from a job's parameter values.

    The using class must provide ``self.app`` and, when no job is passed,
    ``self.get_job()``.
    """

    def get_resource_parameters(self, job=None):
        """Return the dynamically inserted resource parameters of a job.

        :param job: The job to inspect; defaults to ``self.get_job()``.
        :returns: dict with the contents of the ``__job_resource`` parameter
                  group when its ``__job_resource__select`` value is "1",
                  "yes" or "true" (case-insensitive); otherwise an empty dict.
        """
        # Find the dynamically inserted resource parameters and give them
        # to rule.
        if job is None:
            job = self.get_job()

        all_values = job.get_param_values(self.app, ignore_errors=True)
        collected = {}
        try:
            resource_group = all_values["__job_resource"]
            selection = resource_group["__job_resource__select"].lower()
            if selection in ("1", "yes", "true"):
                collected.update(resource_group.items())
        except KeyError:
            # No resource parameters were submitted with this job.
            pass
        return collected
# NOTE: ``object`` must come after the mixin in the base list. Listing it
# first ("object, HasResourceParameters") cannot be linearised on Python 3
# and raises TypeError at class-creation time.
class JobWrapper(HasResourceParameters, object):
    """
    Wraps a 'model.Job' with convenience methods for running processes and
    state management.
    """
def __init__(self, job, queue, use_persisted_destination=False):
    """Set up the wrapper for ``job``.

    :param job: The job to wrap; its id, session id, user id, tool id and
                version, and params are read here.
    :param queue: Object providing ``app`` and ``dispatcher``.
    :param use_persisted_destination: When true, the destination cached on
        the job runner mapper is built from the destination id and params
        stored on ``job`` instead of being computed later.
    """
    self.job_id = job.id
    self.session_id = job.session_id
    self.user_id = job.user_id
    # Look up the exact tool version the job was created with.
    self.tool = queue.app.toolbox.get_tool(job.tool_id, job.tool_version, exact=True)
    self.queue = queue
    self.app = queue.app
    self.sa_session = self.app.model.context
    self.extra_filenames = []
    self.command_line = None
    self.dependencies = []
    # Tool versioning variables
    self.write_version_cmd = None
    self.version_string = ""
    # Filled in on first access of the galaxy_lib_dir property.
    self.__galaxy_lib_dir = None
    # With job outputs in the working directory, we need the working
    # directory to be set before prepare is run, or else premature deletion
    # and job recovery fail.
    # Create the working dir if necessary
    self._create_working_directory()
    self.dataset_path_rewriter = self._job_dataset_path_rewriter(self.working_directory)
    self.output_paths = None
    self.output_hdas_and_paths = None
    self.tool_provided_job_metadata = None
    # Wrapper holding the info required to restore and clean up from files used for setting metadata externally
    self.external_output_metadata = metadata.JobExternalOutputMetadataWrapper(job)
    self.job_runner_mapper = JobRunnerMapper(self, queue.dispatcher.url_to_destination, self.app.job_config)
    self.params = None
    if job.params:
        # job.params is stored as a JSON string.
        self.params = loads(job.params)
    if use_persisted_destination:
        self.job_runner_mapper.cached_job_destination = JobDestination(from_job=job)

    # Can be switched off with disable_commands_in_new_shell().
    self.__commands_in_new_shell = True
    self.__user_system_pwent = None
    self.__galaxy_system_pwent = None
def _job_dataset_path_rewriter(self, working_directory):
    """Pick the dataset path rewriter for this job.

    :param working_directory: The job's working directory.
    :returns: An ``OutputsToWorkingDirectoryPathRewriter`` for
              ``working_directory`` when the destination setting
              ``outputs_to_working_directory`` is truthy, otherwise a
              ``NullDatasetPathRewriter``.
    """
    raw_setting = self.get_destination_configuration("outputs_to_working_directory", False)
    if util.asbool(raw_setting):
        return OutputsToWorkingDirectoryPathRewriter(working_directory)
    return NullDatasetPathRewriter()
@property
def cleanup_job(self):
    """ Remove the job after it is complete, should return "always", "onsuccess", or "never".

    Read from the destination setting ``cleanup_job``, falling back to
    ``DEFAULT_CLEANUP_JOB``.
    """
    cleanup_setting = self.get_destination_configuration("cleanup_job", DEFAULT_CLEANUP_JOB)
    return cleanup_setting
@property
def requires_containerization(self):
    """Whether the destination setting ``require_container`` is truthy (default false)."""
    raw_setting = self.get_destination_configuration("require_container", "False")
    return util.asbool(raw_setting)
def can_split(self):
    """Should the job handler split this job up?

    :returns: ``app.config.use_tasked_jobs`` if that is falsy, otherwise
              the tool's ``parallelism`` value.
    """
    tasked_jobs_enabled = self.app.config.use_tasked_jobs
    if not tasked_jobs_enabled:
        return tasked_jobs_enabled
    return self.tool.parallelism
def get_job_runner_url(self):
    """Return the job destination's URL, logging a deprecation warning first."""
    log.warning('(%s) Job runner URLs are deprecated, use destinations instead.' % self.job_id)
    destination = self.job_destination
    return destination.url
def get_parallelism(self):
    """Return the wrapped tool's ``parallelism`` attribute."""
    tool = self.tool
    return tool.parallelism
@property
def shell(self):
    """Shell for this job.

    The destination's ``shell`` is used when it is truthy; otherwise the
    ``default_job_shell`` config attribute, and ``DEFAULT_JOB_SHELL`` if the
    config has no such attribute.
    """
    destination_shell = self.job_destination.shell
    if destination_shell:
        return destination_shell
    return getattr(self.app.config, 'default_job_shell', DEFAULT_JOB_SHELL)
def disable_commands_in_new_shell(self):
    """Provide an extension point to disable this isolation,
    Pulsar builds its own job script so this is not needed for
    remote jobs.

    After this call the ``commands_in_new_shell`` property is ``False``.
    """
    setattr(self, '_JobWrapper__commands_in_new_shell', False) if False else None
    self.__commands_in_new_shell = False
@property
def strict_shell(self):
    """The wrapped tool's ``strict_shell`` attribute."""
    tool = self.tool
    return tool.strict_shell
@property
def commands_in_new_shell(self):
    """Current value of the commands-in-new-shell flag (see ``disable_commands_in_new_shell``)."""
    flag = self.__commands_in_new_shell
    return flag
@property
def galaxy_lib_dir(self):
    """Absolute path of the ``lib`` directory, computed once and then cached.

    The path is resolved against the current working directory on first
    access.
    """
    cached_path = self.__galaxy_lib_dir
    if cached_path is None:
        cached_path = os.path.abspath("lib")  # cwd = galaxy root
        self.__galaxy_lib_dir = cached_path
    return cached_path
@property
def galaxy_virtual_env(self):
    """Value of the ``VIRTUAL_ENV`` environment variable, or ``None`` if unset."""
    virtual_env = os.environ.get('VIRTUAL_ENV')
    return virtual_env
# Legacy naming: keep the old method name available as an alias of
# get_job_runner_url.
get_job_runner = get_job_runner_url
@property
def job_destination(self):
    """The ``JobDestination`` on which this job will run.

    The job runner mapper resolves it from ``self.params``. The result is
    either a configured destination, a randomly selected destination if
    the configured destination was a tag, or a destination generated
    dynamically by the dynamic runner.

    The first access triggers the dynamic runner's calculation, if any.

    :returns: ``JobDestination``
    """
    mapper = self.job_runner_mapper
    return mapper.get_job_destination(self.params)
def get_job(self):
    """Look up the ``model.Job`` whose primary key is ``self.job_id``."""
    job_query = self.sa_session.query(model.Job)
    return job_query.get(self.job_id)
def get_id_tag(self):
    """Return the id tag reported by the underlying job."""
    # Kept for compatibility with drmaa, which uses job_id right now, and
    # with TaskWrapper.
    job = self.get_job()
    return job.get_id_tag()
def get_param_dict(self):
    """
    Restore the dictionary of parameters from the database.

    The job's stored parameters are collected into a ``name -> value``
    mapping (a later parameter with the same name overrides an earlier
    one) and passed to the tool's ``params_from_strings``.

    :returns: whatever ``self.tool.params_from_strings`` returns for the
        stored values
    """
    job = self.get_job()
    # Build the mapping directly instead of materialising a list of tuples.
    stored_values = {p.name: p.value for p in job.parameters}
    return self.tool.params_from_strings(stored_values, self.app)
def get_version_string_path(self):
    """Absolute path of this job's version-string file.

    The file is named ``GALAXY_VERSION_STRING_<job_id>`` and lives in the
    application's ``new_file_path`` directory.
    """
    file_name = "GALAXY_VERSION_STRING_%s" % self.job_id
    relative_path = os.path.join(self.app.config.new_file_path, file_name)
    return os.path.abspath(relative_path)
def prepare(self, compute_environment=None):
    """
    Prepare the job to run by creating the working directory and the
    config files.

    Besides the return value, this sets ``command_line``,
    ``extra_filenames``, ``environment_variables``,
    ``dependency_shell_commands``, ``param_dict`` and ``write_version_cmd``
    on the wrapper, and stores the command line and the tool's
    dependencies on the job record.

    :param compute_environment: environment handed to the tool evaluator;
        when falsy, ``self.default_compute_environment(job)`` is used.
    :returns: ``self.extra_filenames`` as produced by the tool
        evaluator's ``build()`` call.
    :raises Exception: propagated from ``self._load_job()`` when the job
        has neither a user nor a session.
    """
    self.sa_session.expunge_all()  # this prevents the metadata reverting that has been seen in conjunction with the PBS job runner

    # Make sure the job working directory exists before anything is built.
    if not os.path.exists(self.working_directory):
        os.mkdir(self.working_directory)

    job = self._load_job()

    def get_special():
        # Find a "special" record tied to this job: a
        # JobExportHistoryArchive if there is one, otherwise a
        # GenomeIndexToolData (which may also be missing).
        special = self.sa_session.query(model.JobExportHistoryArchive).filter_by(job=job).first()
        if not special:
            special = self.sa_session.query(model.GenomeIndexToolData).filter_by(job=job).first()
        return special

    tool_evaluator = self._get_tool_evaluator(job)
    # Fall back to the default compute environment when none was given.
    compute_environment = compute_environment or self.default_compute_environment(job)
    tool_evaluator.set_compute_environment(compute_environment, get_special=get_special)

    self.sa_session.flush()

    self.command_line, self.extra_filenames, self.environment_variables = tool_evaluator.build()
    # Ensure galaxy_lib_dir is set in case there are any later chdirs
    # (the property resolves "lib" against the cwd on first access and
    # caches the absolute path).
    self.galaxy_lib_dir
    # Shell fragment to inject dependencies
    self.dependency_shell_commands = self.tool.build_dependency_shell_commands(job_directory=self.working_directory)
    # We need command_line persisted to the db in order for Galaxy to re-queue the job
    # if the server was stopped and restarted before the job finished
    job.command_line = unicodify(self.command_line)
    job.dependencies = self.tool.dependencies
    self.sa_session.add(job)
    self.sa_session.flush()
    # Keep the evaluator's parameter dictionary available on the wrapper
    self.param_dict = tool_evaluator.param_dict
    # Build the command that writes the tool's version string (stdout and
    # stderr) to the compute environment's version path, if the tool
    # defines a version command. Only ``__tool_directory__`` is
    # substituted; safe_substitute leaves other placeholders untouched.
    version_string_cmd_raw = self.tool.version_string_cmd
    if version_string_cmd_raw:
        version_command_template = string.Template(version_string_cmd_raw)
        version_string_cmd = version_command_template.safe_substitute({"__tool_directory__": compute_environment.tool_directory()})
        self.write_version_cmd = "%s > %s 2>&1" % (version_string_cmd, compute_environment.version_path())
    else:
        self.write_version_cmd = None
    # Return list of all extra files
    return self.extra_filenames
def _create_working_directory(self):
    """Create the job's working directory in the object store.

    Sets ``self.working_directory`` to the object store's ``job_work``
    directory for the job and ``self.tool_working_directory`` to its
    ``working`` subdirectory, creating the latter on disk.

    :raises Exception: when the object store raises ``ObjectInvalid``;
        the message names the job id.
    """
    job = self.get_job()
    try:
        self.app.object_store.create(
            job, base_dir='job_work', dir_only=True, obj_dir=True)
        self.working_directory = self.app.object_store.get_filename(
            job, base_dir='job_work', dir_only=True, obj_dir=True)

        # The tool execution is given a working directory beneath the
        # "job" working directory.
        self.tool_working_directory = os.path.join(self.working_directory, "working")
        safe_makedirs(self.tool_working_directory)
        log.debug('(%s) Working directory for job is: %s',
                  self.job_id, self.working_directory)
    except ObjectInvalid:
        # Exception() does not interpolate its arguments the way logger
        # calls do, so the job id has to be formatted into the message.
        raise Exception('(%s) Unable to create job working directory' % job.id)
def clear_working_directory(self):
    """Archive the current working directory and create a fresh one.

    If the working directory does not exist a warning is logged and
    nothing else happens. Otherwise the directory is moved into a
    timestamped folder under the job's ``_cleared_contents`` area of the
    object store and ``_create_working_directory`` is called again.
    """
    job = self.get_job()
    if not os.path.exists(self.working_directory):
        log.warning('(%s): Working directory clear requested but %s does '
                    'not exist',
                    self.job_id,
                    self.working_directory)
        return

    # Same object-store location for both the create and the lookup call.
    cleared_location = dict(
        base_dir='job_work', dir_only=True, obj_dir=True,
        extra_dir='_cleared_contents', extra_dir_at_root=True,
    )
    self.app.object_store.create(job, **cleared_location)
    cleared_root = self.app.object_store.get_filename(job, **cleared_location)

    timestamp = datetime.datetime.now().strftime('%Y%m%d-%H%M%S')
    archive_path = os.path.join(cleared_root, timestamp)
    shutil.move(self.working_directory, archive_path)
    self._create_working_directory()
    log.debug('(%s) Previous working directory moved to %s',
              self.job_id, archive_path)
def default_compute_environment(self, job=None):
    """Build a ``SharedComputeEnvironment`` for this wrapper.

    :param job: job to build the environment for; when falsy the job is
        fetched with ``self.get_job()``.
    """
    target_job = job if job else self.get_job()
    return SharedComputeEnvironment(self, target_job)
def _load_job(self):
    """Fetch the job and verify that it belongs to a user or a session.

    :returns: the job returned by ``self.get_job()``
    :raises Exception: when the job has neither a user nor a session
    """
    job = self.get_job()
    has_owner = job.user is not None or job.galaxy_session is not None
    if has_owner:
        return job
    raise Exception('Job %s has no user and no session.' % job.id)
def _get_tool_evaluator(self, job):
    """Create a ``ToolEvaluator`` for ``job`` using this wrapper's app,
    tool and working directory.
    """
    # Hacky way to avoid a circular import for now: placing ToolEvaluator
    # in either jobs or tools results in a circular dependency, so it is
    # imported at call time.
    from galaxy.tools.evaluation import ToolEvaluator

    evaluator_kwds = dict(
        app=self.app,
        job=job,
        tool=self.tool,
        local_working_directory=self.working_directory,
    )
    return ToolEvaluator(**evaluator_kwds)
def fail(self, message, exception=False, stdout="", stderr="", exit_code=None):
"""
Indicate job failure by setting state and message on all output
datasets.
"""
job = self.get_job()
self.sa_session.refresh(job)
# if the job was deleted, don't fail it
if not job.state == job.states.DELETED:
# Check if the failure is due to an exception
if exception:
# Save the traceback immediately in case we generate another
# below
job.traceback = traceback.format_exc()
# Get the exception and let the tool attempt to generate
# a better message
etype, evalue, tb = sys.exc_info()
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory:
for dataset_path in self.get_output_fnames():
try:
shutil.move(dataset_path.false_path, dataset_path.real_path)
log.debug("fail(): Moved %s to %s" % (dataset_path.false_path, dataset_path.real_path))
except (IOError, OSError) as e:
log.error("fail(): Missing output file in working directory: %s" % e)
for dataset_assoc in job.output_datasets + job.output_library_datasets:
dataset = dataset_assoc.dataset
self.sa_session.refresh(dataset)
dataset.state = dataset.states.ERROR