-
Notifications
You must be signed in to change notification settings - Fork 966
/
command_factory.py
259 lines (213 loc) · 10.6 KB
/
command_factory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
from logging import getLogger
from os import getcwd
from os.path import (
abspath,
join
)
from galaxy import util
from galaxy.jobs.runners.util.job_script import (
check_script_integrity,
INTEGRITY_INJECTION,
write_script,
)
log = getLogger(__name__)
CAPTURE_RETURN_CODE = "return_code=$?"
YIELD_CAPTURED_CODE = 'sh -c "exit $return_code"'
SETUP_GALAXY_FOR_METADATA = """
_galaxy_setup_environment True
"""
def build_command(
    runner,
    job_wrapper,
    container=None,
    modify_command_for_container=True,
    include_metadata=False,
    include_work_dir_outputs=True,
    create_tool_working_directory=True,
    remote_command_params=None,
    metadata_directory=None,
):
    """
    Compose the sequence of commands necessary to execute a job. This will
    currently include:
        - environment settings corresponding to any requirement tags
        - preparing input files
        - command line taken from job wrapper
        - commands to set metadata (if include_metadata is True)
    """
    # Normalize here instead of using a mutable default argument ({}), which
    # would be shared between calls.
    if remote_command_params is None:
        remote_command_params = {}
    shell = job_wrapper.shell
    base_command_line = job_wrapper.get_command_line()
    if not base_command_line:
        raise Exception("Attempting to run a tool with empty command definition.")
    commands_builder = CommandsBuilder(base_command_line)
    # All job runners currently handle this case which should never occur
    if not commands_builder.commands:
        return None
    # Version, dependency resolution, and task splitting are prepended to the
    # command - so they need to appear in the following order to ensure that
    # the underlying application used by version command is available in the
    # environment after dependency resolution, but the task splitting command
    # is still executed in Galaxy's Python environment.
    __handle_version_command(commands_builder, job_wrapper)
    # One could imagine also allowing dependencies inside of the container but
    # that is too sophisticated for a first crack at this - build your
    # containers ready to go!
    if not container or container.resolve_dependencies:
        __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params)
    __handle_task_splitting(commands_builder, job_wrapper)
    if (container and modify_command_for_container) or job_wrapper.commands_in_new_shell:
        if container and modify_command_for_container:
            # Many Docker containers do not have /bin/bash.
            external_command_shell = container.shell
        else:
            external_command_shell = shell
        externalized_commands = __externalize_commands(job_wrapper, external_command_shell, commands_builder, remote_command_params)
        if container and modify_command_for_container:
            # Stop now and build command before handling metadata and copying
            # working directory files back. These should always happen outside
            # of docker container - no security implications when generating
            # metadata and means no need for Galaxy to be available to container
            # and not copying workdir outputs back means on can be more restrictive
            # of where container can write to in some circumstances.
            run_in_container_command = container.containerize_command(
                externalized_commands
            )
            commands_builder = CommandsBuilder(run_in_container_command)
        else:
            commands_builder = CommandsBuilder(externalized_commands)
    # Don't need to create a separate tool working directory for Pulsar
    # jobs - that is handled by Pulsar.
    if create_tool_working_directory:
        # usually working will already exist, but it will not for task
        # split jobs.
        # Remove the working directory incase this is for instance a SLURM re-submission.
        # xref https://github.com/galaxyproject/galaxy/issues/3289
        commands_builder.prepend_command("rm -rf working; mkdir -p working; cd working")
    if include_work_dir_outputs:
        __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params)
    # Capture the tool's exit status before the trailing metadata / copy
    # commands can overwrite $?.
    commands_builder.capture_return_code()
    if include_metadata and job_wrapper.requires_setting_metadata:
        metadata_directory = metadata_directory or job_wrapper.working_directory
        commands_builder.append_command("cd '%s'" % metadata_directory)
        __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params)
    return commands_builder.build()
def __externalize_commands(job_wrapper, shell, commands_builder, remote_command_params, script_name="tool_script.sh"):
    """Write the built tool commands into a script file and return the command
    used to invoke that script.

    Returns the raw tool commands unchanged when the job's shell is configured
    as ``none`` (a script with ``set -e`` cannot be used for composite commands,
    which is necessary e.g. for Windows jobs).
    """
    local_container_script = join(job_wrapper.working_directory, script_name)
    tool_commands = commands_builder.build()
    config = job_wrapper.app.config
    integrity_injection = ""
    # Setting shell to none in job_conf.xml disables creating a tool command script,
    # set -e doesn't work for composite commands but this is necessary for Windows jobs
    # for instance.
    if shell and shell.lower() == 'none':
        return tool_commands
    if check_script_integrity(config):
        integrity_injection = INTEGRITY_INJECTION
    set_e = ""
    if job_wrapper.strict_shell:
        set_e = "set -e\n"
    script_contents = u"#!%s\n%s%s%s" % (
        shell,
        integrity_injection,
        set_e,
        tool_commands
    )
    write_script(local_container_script, script_contents, config)
    commands = local_container_script
    # For remote execution (e.g. Pulsar), invoke the script at its location in
    # the remote working directory rather than the local path.
    if 'working_directory' in remote_command_params:
        commands = "%s %s" % (shell, join(remote_command_params['working_directory'], script_name))
    # Lazy %-style logging args: formatting is skipped when INFO is disabled.
    log.info("Built script [%s] for tool command [%s]", local_container_script, tool_commands)
    return commands
def __handle_version_command(commands_builder, job_wrapper):
    """Prepend the tool's version-reporting command, when the job defines one."""
    version_command = job_wrapper.write_version_cmd
    if version_command:
        commands_builder.prepend_command(version_command)
def __handle_task_splitting(commands_builder, job_wrapper):
    """Prepend commands that fetch task-split input files, when defined."""
    input_file_commands = getattr(job_wrapper, 'prepare_input_files_cmds', None)
    if input_file_commands:
        commands_builder.prepend_commands(input_file_commands)
def __handle_dependency_resolution(commands_builder, job_wrapper, remote_command_params):
    """Prepend dependency-injection shell commands when resolution is local.

    Remote runners may request dependency resolution on the remote side, in
    which case nothing is prepended here.
    """
    resolution = remote_command_params.get("dependency_resolution", "local")
    shell_commands = job_wrapper.dependency_shell_commands
    if shell_commands and resolution == "local":
        commands_builder.prepend_commands(shell_commands)
def __handle_work_dir_outputs(commands_builder, job_wrapper, runner, remote_command_params):
    """Append commands copying job outputs declared via the from_work_dir attribute."""
    kwds = {}
    if 'working_directory' in remote_command_params:
        kwds['job_working_directory'] = remote_command_params['working_directory']
    work_dir_outputs = runner.get_work_dir_outputs(job_wrapper, **kwds)
    if work_dir_outputs:
        # The copy commands would clobber the tool's exit status, so capture
        # it before appending them.
        commands_builder.capture_return_code()
        commands_builder.append_commands(
            [__copy_if_exists_command(output) for output in work_dir_outputs]
        )
def __handle_metadata(commands_builder, job_wrapper, runner, remote_command_params):
    """Append the external metadata-setting command for the job, if any.

    Each parameter to setup_external_metadata may be overridden through
    remote_command_params['metadata_kwds'] (used by remote runners such as
    Pulsar); otherwise local defaults derived from the job wrapper are used.
    """
    # Append metadata setting commands, we don't want to overwrite metadata
    # that was copied over in init_meta(), as per established behavior
    metadata_kwds = remote_command_params.get('metadata_kwds', {})
    exec_dir = metadata_kwds.get('exec_dir', abspath(getcwd()))
    tmp_dir = metadata_kwds.get('tmp_dir', job_wrapper.working_directory)
    dataset_files_path = metadata_kwds.get('dataset_files_path', runner.app.model.Dataset.file_path)
    output_fnames = metadata_kwds.get('output_fnames', job_wrapper.get_output_fnames())
    config_root = metadata_kwds.get('config_root', None)
    config_file = metadata_kwds.get('config_file', None)
    compute_tmp_dir = metadata_kwds.get('compute_tmp_dir', None)
    # When the job commands run in a fresh shell, metadata dependencies must
    # be re-resolved inside that shell.
    resolve_metadata_dependencies = job_wrapper.commands_in_new_shell
    metadata_command = job_wrapper.setup_external_metadata(
        exec_dir=exec_dir,
        tmp_dir=tmp_dir,
        dataset_files_path=dataset_files_path,
        output_fnames=output_fnames,
        set_extension=False,
        config_root=config_root,
        config_file=config_file,
        compute_tmp_dir=compute_tmp_dir,
        resolve_metadata_dependencies=resolve_metadata_dependencies,
        kwds={'overwrite': False}
    ) or ''
    metadata_command = metadata_command.strip()
    if metadata_command:
        # Place Galaxy and its dependencies in environment for metadata regardless of tool.
        metadata_command = "%s%s" % (SETUP_GALAXY_FOR_METADATA, metadata_command)
        # Metadata must not mask the tool's exit status.
        commands_builder.capture_return_code()
        commands_builder.append_command(metadata_command)
def __copy_if_exists_command(work_dir_output):
    """Return a shell command copying source to destination if the source exists.

    Paths are single-quoted (matching the style used elsewhere in this module,
    e.g. "cd '%s'") so that working directories containing spaces do not break
    the generated command.
    """
    source_file, destination = work_dir_output
    return "if [ -f '%s' ] ; then cp '%s' '%s' ; fi" % (source_file, source_file, destination)
class CommandsBuilder(object):
    """Accumulates the shell commands for a job as a single ';'-joined string."""

    def __init__(self, initial_command=u''):
        # Remove trailing semi-colon so we can start hacking up this command.
        # TODO: Refactor to compose a list and join with ';', would be more clean.
        initial_command = util.unicodify(initial_command)
        commands = initial_command.rstrip(u"; ")
        self.commands = commands
        # Coping work dir outputs or setting metadata will mask return code of
        # tool command. If these are used capture the return code and ensure
        # the last thing that happens is an exit with return code.
        self.return_code_captured = False

    def prepend_command(self, command):
        """Place ``command`` before the current commands; returns self for chaining."""
        if command:
            self.commands = u"%s; %s" % (command,
                                         self.commands)
        return self

    def prepend_commands(self, commands):
        """Prepend multiple commands at once, skipping falsy entries."""
        return self.prepend_command(u"; ".join(c for c in commands if c))

    def append_command(self, command):
        """Place ``command`` after the current commands; returns self for chaining."""
        if command:
            self.commands = u"%s; %s" % (self.commands,
                                         command)
        return self

    def append_commands(self, commands):
        """Append multiple commands at once, skipping falsy entries.

        Returns self for chaining, consistent with prepend_commands (this
        previously returned None).
        """
        return self.append_command(u"; ".join(c for c in commands if c))

    def capture_return_code(self):
        """Append the return-code capture snippet, at most once."""
        if not self.return_code_captured:
            self.return_code_captured = True
            self.append_command(CAPTURE_RETURN_CODE)

    def build(self):
        """Return the final command string, exiting with any captured return code."""
        if self.return_code_captured:
            self.append_command(YIELD_CAPTURED_CODE)
        return self.commands
__all__ = ("build_command", )