Skip to content

Commit 6f4328e

Browse files
committed
Introduce the concept of a metadata directory...
... for metadata about ... data and jobs. So formal Galaxy metadata stuff, job instrument files, etc.... Handle transfer of metadata and working directory outputs correctly in Galaxy 16.04.
1 parent 72c929d commit 6f4328e

File tree

12 files changed

+128
-42
lines changed

12 files changed

+128
-42
lines changed

pulsar/client/action_mapper.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,17 @@
3939
CONFIG="config",
4040
# Files from tool's tool_dir (for now just wrapper if available).
4141
TOOL="tool",
42-
# Input work dir files - e.g. metadata files, task-split input files, etc..
42+
# Input work dir files - e.g.task-split input file
4343
WORKDIR="workdir",
44+
# Input work dir files - e.g. metadata files, etc..
45+
METADATA="metadata",
4446
# Galaxy output datasets in their final home.
4547
OUTPUT="output",
4648
# Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
4749
OUTPUT_WORKDIR="output_workdir",
50+
# Meta job and data files (e.g. Galaxy metadata generation files and
51+
# metric instrumentation files)
52+
OUTPUT_METADATA="output_metadata",
4853
# Other fixed tool parameter paths (likely coming from tool data, but not
4954
# nessecarily). Not sure this is the best name...
5055
UNSTRUCTURED="unstructured",
@@ -56,8 +61,10 @@
5661
path_type.CONFIG,
5762
path_type.TOOL,
5863
path_type.WORKDIR,
64+
path_type.METADATA,
5965
path_type.OUTPUT,
6066
path_type.OUTPUT_WORKDIR,
67+
path_type.OUTPUT_METADATA,
6168
]
6269
ALL_PATH_TYPES = ACTION_DEFAULT_PATH_TYPES + [path_type.UNSTRUCTURED]
6370

@@ -206,7 +213,7 @@ def __action_class(self, path, type, mapper):
206213
action_type = self.default_action if type in ACTION_DEFAULT_PATH_TYPES else "none"
207214
if mapper:
208215
action_type = mapper.action_type
209-
if type in ["workdir", "output_workdir"] and action_type == "none":
216+
if type in ["workdir", "output_workdir", "output_metadata"] and action_type == "none":
210217
# We are changing the working_directory relative to what
211218
# Galaxy would use, these need to be copied over.
212219
action_type = "copy"

pulsar/client/client.py

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,8 @@ def fetch_output(self, path, name, working_directory, action_type, output_type):
204204
used if targetting an older Pulsar server that didn't return statuses
205205
allowing this to be inferred.
206206
"""
207-
if output_type == 'output_workdir':
208-
self._fetch_work_dir_output(name, working_directory, path, action_type=action_type)
207+
if output_type in ['output_workdir', 'output_metadata']:
208+
self._populate_output_path(name, path, action_type, output_type)
209209
elif output_type == 'output':
210210
self._fetch_output(path=path, name=name, action_type=action_type)
211211
else:
@@ -219,22 +219,14 @@ def _fetch_output(self, path, name=None, check_exists_remotely=False, action_typ
219219
# Extra files will send in the path.
220220
name = os.path.basename(path)
221221

222-
self.__populate_output_path(name, path, action_type)
222+
self._populate_output_path(name, path, action_type, path_type.OUTPUT)
223223

224-
def _fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'):
224+
def _populate_output_path(self, name, output_path, action_type, path_type):
225225
ensure_directory(output_path)
226226
if action_type == 'transfer':
227-
self.__raw_download_output(name, self.job_id, path_type.OUTPUT_WORKDIR, output_path)
228-
else: # Even if action is none - Pulsar has a different work_dir so this needs to be copied.
229-
pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT_WORKDIR)['path']
230-
copy(pulsar_path, output_path)
231-
232-
def __populate_output_path(self, name, output_path, action_type):
233-
ensure_directory(output_path)
234-
if action_type == 'transfer':
235-
self.__raw_download_output(name, self.job_id, path_type.OUTPUT, output_path)
227+
self.__raw_download_output(name, self.job_id, path_type, output_path)
236228
elif action_type == 'copy':
237-
pulsar_path = self._output_path(name, self.job_id, path_type.OUTPUT)['path']
229+
pulsar_path = self._output_path(name, self.job_id, path_type)['path']
238230
copy(pulsar_path, output_path)
239231

240232
@parseJson()

pulsar/client/job_directory.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
config="configs_directory",
1818
tool="tool_files_directory",
1919
workdir="working_directory",
20+
metadata="metadata_directory",
2021
output="outputs_directory",
2122
output_workdir="working_directory",
23+
output_metadata="metadata_directory",
2224
)
2325

2426

@@ -33,6 +35,9 @@ def __init__(self, remote_staging_directory, remote_id, remote_sep):
3335
remote_id
3436
)
3537

38+
def metadata_directory(self):
39+
return self._sub_dir('metadata')
40+
3641
def working_directory(self):
3742
return self._sub_dir('working')
3843

@@ -72,7 +77,7 @@ def _directory_for_file_type(self, file_type):
7277
# Obviously this client won't be legacy because this is in the
7378
# client module, but this code is reused on server which may
7479
# serve legacy clients.
75-
allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir']
80+
allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir', 'metadata', 'output_metadata']
7681
directory_function = getattr(self, TYPES_TO_METHOD.get(file_type, None), None)
7782
if not directory_function:
7883
raise Exception("Unknown file_type specified %s" % file_type)

pulsar/client/path_mapper.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ def __init__(
3232
self.input_directory = remote_job_config["inputs_directory"]
3333
self.output_directory = remote_job_config["outputs_directory"]
3434
self.working_directory = remote_job_config["working_directory"]
35+
self.metadata_directory = remote_job_config.get("working_directory", None)
3536
self.unstructured_files_directory = remote_job_config["unstructured_files_directory"]
3637
self.config_directory = remote_job_config["configs_directory"]
3738
separator = remote_job_config["system_properties"]["separator"]
@@ -87,6 +88,8 @@ def __action(self, dataset_path, dataset_path_type):
8788
def __remote_directory(self, dataset_path_type):
8889
if dataset_path_type in [path_type.OUTPUT]:
8990
return self.output_directory
91+
elif dataset_path_type in [path_type.METADATA, path_type.OUTPUT_METADATA]:
92+
return self.metadata_directory
9093
elif dataset_path_type in [path_type.WORKDIR, path_type.OUTPUT_WORKDIR]:
9194
return self.working_directory
9295
elif dataset_path_type in [path_type.INPUT]:

pulsar/client/staging/__init__.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@ class ClientJobDescription(object):
3131
Directory containing tool to execute (if a wrapper is used, it will
3232
be transferred to remote server).
3333
working_directory : str
34-
Local path created by Galaxy for running this job.
34+
Local path created by Galaxy for running this job (job_wrapper.tool_working_directory).
35+
metadata_directory : str
36+
Local path created by Galaxy for running this job (job_wrapper.working_directory).
3537
dependencies_description : list
3638
galaxy.tools.deps.dependencies.DependencyDescription object describing
3739
tool dependency context for remote depenency resolution.
@@ -57,7 +59,8 @@ def __init__(
5759
config_files=[],
5860
input_files=[],
5961
client_outputs=None,
60-
working_directory=None, # More sensible default?
62+
working_directory=None,
63+
metadata_directory=None,
6164
dependencies_description=None,
6265
env=[],
6366
arbitrary_files=None,
@@ -69,6 +72,7 @@ def __init__(
6972
self.input_files = input_files
7073
self.client_outputs = client_outputs or ClientOutputs()
7174
self.working_directory = working_directory
75+
self.metadata_directory = metadata_directory
7276
self.dependencies_description = dependencies_description
7377
self.env = env
7478
self.rewrite_paths = rewrite_paths
@@ -103,9 +107,11 @@ def __init__(
103107
output_files=[],
104108
work_dir_outputs=None,
105109
version_file=None,
106-
dynamic_outputs=None
110+
dynamic_outputs=None,
111+
metadata_directory=None,
107112
):
108113
self.working_directory = working_directory
114+
self.metadata_directory = metadata_directory
109115
self.work_dir_outputs = work_dir_outputs or []
110116
self.output_files = output_files or []
111117
self.version_file = version_file
@@ -115,6 +121,7 @@ def __init__(
115121
def to_dict(self):
116122
return dict(
117123
working_directory=self.working_directory,
124+
metadata_directory=self.metadata_directory,
118125
work_dir_outputs=self.work_dir_outputs,
119126
output_files=self.output_files,
120127
version_file=self.version_file,
@@ -125,6 +132,7 @@ def to_dict(self):
125132
def from_dict(config_dict):
126133
return ClientOutputs(
127134
working_directory=config_dict.get('working_directory'),
135+
metadata_directory=config_dict.get('metadata_directory'),
128136
work_dir_outputs=config_dict.get('work_dir_outputs'),
129137
output_files=config_dict.get('output_files'),
130138
version_file=config_dict.get('version_file'),
@@ -139,9 +147,12 @@ class PulsarOutputs(object):
139147
""" Abstraction describing the output files PRODUCED by the remote Pulsar
140148
server. """
141149

142-
def __init__(self, working_directory_contents, output_directory_contents, remote_separator=sep):
150+
def __init__(
151+
self, working_directory_contents, output_directory_contents, metadata_directory_contents, remote_separator=sep
152+
):
143153
self.working_directory_contents = working_directory_contents
144154
self.output_directory_contents = output_directory_contents
155+
self.metadata_directory_contents = metadata_directory_contents
145156
self.path_helper = PathHelper(remote_separator)
146157

147158
@staticmethod
@@ -150,13 +161,15 @@ def from_status_response(complete_response):
150161
# by the Pulsar - older Pulsar instances will not set these in complete response.
151162
working_directory_contents = complete_response.get("working_directory_contents")
152163
output_directory_contents = complete_response.get("outputs_directory_contents")
164+
metadata_directory_contents = complete_response.get("metadata_directory_contents")
153165
# Older (pre-2014) Pulsar servers will not include separator in response,
154166
# so this should only be used when reasoning about outputs in
155167
# subdirectories (which was not previously supported prior to that).
156168
remote_separator = complete_response.get("system_properties", {}).get("separator", sep)
157169
return PulsarOutputs(
158170
working_directory_contents,
159171
output_directory_contents,
172+
metadata_directory_contents,
160173
remote_separator
161174
)
162175

pulsar/client/staging/down.py

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from os.path import join
22
from os.path import relpath
3-
from re import compile
43
from contextlib import contextmanager
54

65
from ..staging import COMMAND_VERSION_FILENAME
@@ -10,12 +9,6 @@
109
from logging import getLogger
1110
log = getLogger(__name__)
1211

13-
# All output files marked with from_work_dir attributes will copied or downloaded
14-
# this pattern picks up attiditional files to copy back - such as those
15-
# associated with multiple outputs and metadata configuration. Set to .* to just
16-
# copy everything
17-
COPY_FROM_WORKING_DIRECTORY_PATTERN = compile(r"primary_.*|galaxy.json|metadata_.*|dataset_\d+\.dat|__instrument_.*|dataset_\d+_files.+")
18-
1912

2013
def finish_job(client, cleanup_job, job_completed_normally, client_outputs, pulsar_outputs):
2114
""" Responsible for downloading results from remote server and cleaning up
@@ -63,12 +56,14 @@ def __init__(self, output_collector, action_mapper, client_outputs, pulsar_outpu
6356
self.exception_tracker = DownloadExceptionTracker()
6457
self.output_files = client_outputs.output_files
6558
self.working_directory_contents = pulsar_outputs.working_directory_contents or []
59+
self.metadata_directory_contents = pulsar_outputs.metadata_directory_contents or []
6660

6761
def collect(self):
6862
self.__collect_working_directory_outputs()
6963
self.__collect_outputs()
7064
self.__collect_version_file()
7165
self.__collect_other_working_directory_files()
66+
self.__collect_metadata_directory_files()
7267
return self.exception_tracker.collection_failure_exceptions
7368

7469
def __collect_working_directory_outputs(self):
@@ -105,15 +100,31 @@ def __collect_version_file(self):
105100
self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME)
106101

107102
def __collect_other_working_directory_files(self):
108-
working_directory = self.client_outputs.working_directory
103+
self.__collect_directory_files(
104+
self.client_outputs.working_directory,
105+
self.working_directory_contents,
106+
'output_workdir',
107+
)
108+
109+
def __collect_metadata_directory_files(self):
110+
self.__collect_directory_files(
111+
self.client_outputs.metadata_directory,
112+
self.metadata_directory_contents,
113+
'output_metadata',
114+
)
115+
116+
def __collect_directory_files(self, directory, contents, output_type):
117+
if directory is None: # e.g. output_metadata_directory
118+
return
119+
109120
# Fetch remaining working directory outputs of interest.
110-
for name in self.working_directory_contents:
121+
for name in contents:
111122
if name in self.downloaded_working_directory_files:
112123
continue
113124
if self.client_outputs.dynamic_match(name):
114-
log.debug("collecting dynamic output %s" % name)
115-
output_file = join(working_directory, self.pulsar_outputs.path_helper.local_name(name))
116-
if self._attempt_collect_output(output_type='output_workdir', path=output_file, name=name):
125+
log.debug("collecting dynamic %s file %s" % (output_type, name))
126+
output_file = join(directory, self.pulsar_outputs.path_helper.local_name(name))
127+
if self._attempt_collect_output(output_type=output_type, path=output_file, name=name):
117128
self.downloaded_working_directory_files.append(name)
118129

119130
def _attempt_collect_output(self, output_type, path, name=None):

pulsar/client/staging/up.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(self, client, client_job_description, job_config):
7474
self.tool_version = None
7575
self.tool_dir = None
7676
self.working_directory = client_job_description.working_directory
77+
self.metadata_directory = client_job_description.metadata_directory
7778
self.version_file = client_job_description.version_file
7879
self.arbitrary_files = client_job_description.arbitrary_files
7980
self.rewrite_paths = client_job_description.rewrite_paths
@@ -95,6 +96,7 @@ def __init__(self, client, client_job_description, job_config):
9596
self.__upload_tool_files()
9697
self.__upload_input_files()
9798
self.__upload_working_directory_files()
99+
self.__upload_metadata_directory_files()
98100
self.__upload_arbitrary_files()
99101

100102
if self.rewrite_paths:
@@ -190,12 +192,24 @@ def __upload_working_directory_files(self):
190192
path = join(self.working_directory, working_directory_file)
191193
self.transfer_tracker.handle_transfer(path, path_type.WORKDIR)
192194

195+
def __upload_metadata_directory_files(self):
196+
metadata_directory_files = self.__metadata_directory_files()
197+
for metadata_directory_file in metadata_directory_files:
198+
path = join(self.metadata_directory, metadata_directory_file)
199+
self.transfer_tracker.handle_transfer(path, path_type.METADATA)
200+
193201
def __working_directory_files(self):
194202
if self.working_directory and exists(self.working_directory):
195203
return listdir(self.working_directory)
196204
else:
197205
return []
198206

207+
def __metadata_directory_files(self):
208+
if self.metadata_directory and exists(self.metadata_directory):
209+
return listdir(self.metadata_directory)
210+
else:
211+
return []
212+
199213
def __initialize_version_file_rename(self):
200214
version_file = self.version_file
201215
if version_file:

0 commit comments

Comments
 (0)