output_collect.py
""" Code allowing tools to define extra files associated with an output datset.
"""
import glob
import json
import logging
import operator
import os
import re

from galaxy import jobs
from galaxy import util
from galaxy.util import odict
from galaxy.tools.parser.output_collection_def import (
    DEFAULT_DATASET_COLLECTOR_DESCRIPTION,
    INPUT_DBKEY_TOKEN,
)

DATASET_ID_TOKEN = "DATASET_ID"

log = logging.getLogger( __name__ )
def collect_dynamic_collections(
    tool,
    output_collections,
    job_working_directory,
    inp_data=None,
    job=None,
    input_dbkey="?",
):
    # Avoid a shared mutable default argument.
    inp_data = inp_data or {}
    collections_service = tool.app.dataset_collections_service
job_context = JobContext(
tool,
job,
job_working_directory,
inp_data,
input_dbkey,
)
for name, has_collection in output_collections.items():
if name not in tool.output_collections:
continue
output_collection_def = tool.output_collections[ name ]
if not output_collection_def.dynamic_structure:
continue
        # The output may be an HDCA (for normal jobs) or a bare dataset
        # collection (for mapped-over jobs).
if hasattr(has_collection, "collection"):
collection = has_collection.collection
else:
collection = has_collection
try:
collection_builder = collections_service.collection_builder_for(
collection
)
job_context.populate_collection_elements(
collection,
collection_builder,
output_collection_def,
)
collection_builder.populate()
except Exception:
log.exception("Problem gathering output collection.")
collection.handle_population_failed("Problem building datasets for collection.")
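# Illustrative sketch (not part of this module): a tool producing a
# dynamically structured output collection typically declares discovery
# rules in its XML, roughly along these lines (the element, attribute and
# pattern names here are assumptions shown only for orientation):
#
#     <collection name="split_output" type="list">
#         <discover_datasets pattern="__designation_and_ext__" directory="outputs" />
#     </collection>
#
# collect_dynamic_collections() above gathers the matching files and
# populates the collection through the dataset collections service.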
class JobContext( object ):
def __init__( self, tool, job, job_working_directory, inp_data, input_dbkey ):
self.inp_data = inp_data
self.input_dbkey = input_dbkey
self.app = tool.app
self.sa_session = tool.sa_session
self.job = job
self.job_working_directory = job_working_directory
@property
def permissions( self ):
inp_data = self.inp_data
existing_datasets = [ inp for inp in inp_data.values() if inp ]
if existing_datasets:
permissions = self.app.security_agent.guess_derived_permissions_for_datasets( existing_datasets )
else:
# No valid inputs, we will use history defaults
permissions = self.app.security_agent.history_get_default_permissions( self.job.history )
return permissions
def find_files( self, collection, dataset_collectors ):
filenames = odict.odict()
for path, extra_file_collector in walk_over_extra_files( dataset_collectors, self.job_working_directory, collection ):
filenames[ path ] = extra_file_collector
return filenames
def populate_collection_elements( self, collection, root_collection_builder, output_collection_def ):
# TODO: allow configurable sorting.
# <sort by="lexical" /> <!-- default -->
# <sort by="reverse_lexical" />
# <sort regex="example.(\d+).fastq" by="1:numerical" />
# <sort regex="part_(\d+)_sample_([^_]+).fastq" by="2:lexical,1:numerical" />
dataset_collectors = map(dataset_collector, output_collection_def.dataset_collector_descriptions)
filenames = self.find_files( collection, dataset_collectors )
for filename, extra_file_collector in filenames.iteritems():
fields_match = extra_file_collector.match( collection, os.path.basename( filename ) )
if not fields_match:
raise Exception( "Problem parsing metadata fields for file %s" % filename )
element_identifiers = fields_match.element_identifiers
current_builder = root_collection_builder
for element_identifier in element_identifiers[:-1]:
current_builder = current_builder.get_level(element_identifier)
designation = fields_match.designation
visible = fields_match.visible
ext = fields_match.ext
dbkey = fields_match.dbkey
if dbkey == INPUT_DBKEY_TOKEN:
dbkey = self.input_dbkey
# Create new primary dataset
name = fields_match.name or designation
dataset = self.create_dataset(
ext=ext,
designation=designation,
visible=visible,
dbkey=dbkey,
name=name,
filename=filename,
metadata_source_name=output_collection_def.metadata_source,
)
log.debug(
"(%s) Created dynamic collection dataset for path [%s] with element identifier [%s] for output [%s].",
self.job.id,
filename,
designation,
output_collection_def.name,
)
current_builder.add_dataset( element_identifiers[-1], dataset )
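    # Worked example (pattern and filename are hypothetical): with a discovery
    # pattern such as
    #     (?P<identifier_0>[^_]+)_(?P<identifier_1>[^_]+)\.fastq
    # a file named "sampleA_forward.fastq" yields element identifiers
    # ["sampleA", "forward"].  Every identifier but the last selects a nested
    # sub-collection level via get_level(), and the final one names the
    # dataset added with add_dataset() above.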
def create_dataset(
self,
ext,
designation,
visible,
dbkey,
name,
filename,
metadata_source_name,
):
app = self.app
sa_session = self.sa_session
# Copy metadata from one of the inputs if requested.
metadata_source = None
if metadata_source_name:
metadata_source = self.inp_data[ metadata_source_name ]
# Create new primary dataset
primary_data = app.model.HistoryDatasetAssociation( extension=ext,
designation=designation,
visible=visible,
dbkey=dbkey,
create_dataset=True,
sa_session=sa_session )
app.security_agent.set_all_dataset_permissions( primary_data.dataset, self.permissions )
sa_session.add( primary_data )
sa_session.flush()
# Move data from temp location to dataset location
app.object_store.update_from_file(primary_data.dataset, file_name=filename, create=True)
primary_data.set_size()
        # Use the name resolved by the caller (the match's name, or one
        # generated from the designation).
primary_data.name = name
if metadata_source:
primary_data.init_meta( copy_from=metadata_source )
else:
primary_data.init_meta()
# Associate new dataset with job
if self.job:
self.job.history.add_dataset( primary_data )
assoc = app.model.JobToOutputDatasetAssociation( '__new_primary_file_%s|%s__' % ( name, designation ), primary_data )
assoc.job = self.job
sa_session.add( assoc )
sa_session.flush()
primary_data.state = 'ok'
return primary_data
def collect_primary_datasets( tool, output, job_working_directory, input_ext, input_dbkey="?" ):
app = tool.app
sa_session = tool.sa_session
new_primary_datasets = {}
try:
galaxy_json_path = os.path.join( job_working_directory, "working", jobs.TOOL_PROVIDED_JOB_METADATA_FILE )
# LEGACY: Remove in 17.XX
if not os.path.exists( galaxy_json_path ):
# Maybe this is a legacy job, use the job working directory instead
galaxy_json_path = os.path.join( job_working_directory, jobs.TOOL_PROVIDED_JOB_METADATA_FILE )
        with open( galaxy_json_path, 'r' ) as json_file:
            for line in json_file:
                line = json.loads( line )
                if line.get( 'type' ) == 'new_primary_dataset':
                    new_primary_datasets[ os.path.split( line.get( 'filename' ) )[-1] ] = line
except Exception:
# This should not be considered an error or warning condition, this file is optional
pass
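    # For reference, a 'new_primary_dataset' entry in that file is a single
    # JSON line; keys other than 'type' and 'filename' are optional and mirror
    # the attributes applied further below (the concrete values here are made
    # up for illustration):
    #     {"type": "new_primary_dataset", "filename": "primary_10_out1_visible_bed",
    #      "name": "Split output 1", "ext": "bed", "dbkey": "hg19",
    #      "info": "...", "metadata": {}, "extra_files": "out1_files"}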
# Loop through output file names, looking for generated primary
# datasets in form of:
# 'primary_associatedWithDatasetID_designation_visibility_extension(_DBKEY)'
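    # For example (hypothetical values), a file named
    # "primary_10_reverse_visible_fastqsanger_hg19" would be collected as a
    # visible fastqsanger dataset with designation "reverse" and dbkey "hg19",
    # attached to the output whose dataset id is 10.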
primary_output_assigned = False
new_outdata_name = None
primary_datasets = {}
for output_index, ( name, outdata ) in enumerate( output.items() ):
dataset_collectors = map(dataset_collector, tool.outputs[ name ].dataset_collector_descriptions) if name in tool.outputs else [ DEFAULT_DATASET_COLLECTOR ]
filenames = odict.odict()
if 'new_file_path' in app.config.collect_outputs_from:
if DEFAULT_DATASET_COLLECTOR in dataset_collectors:
# 'new_file_path' collection should be considered deprecated,
# only use old-style matching (glob instead of regex and only
# using default collector - if enabled).
for filename in glob.glob(os.path.join(app.config.new_file_path, "primary_%i_*" % outdata.id) ):
filenames[ filename ] = DEFAULT_DATASET_COLLECTOR
if 'job_working_directory' in app.config.collect_outputs_from:
for path, extra_file_collector in walk_over_extra_files( dataset_collectors, job_working_directory, outdata ):
filenames[ path ] = extra_file_collector
for filename_index, ( filename, extra_file_collector ) in enumerate( filenames.iteritems() ):
fields_match = extra_file_collector.match( outdata, os.path.basename( filename ) )
if not fields_match:
                # Historically a failed match surfaced as an IndexError from
                # pop(); raise an explicit error instead.
raise Exception( "Problem parsing metadata fields for file %s" % filename )
designation = fields_match.designation
if filename_index == 0 and extra_file_collector.assign_primary_output and output_index == 0:
new_outdata_name = fields_match.name or "%s (%s)" % ( outdata.name, designation )
# Move data from temp location to dataset location
app.object_store.update_from_file( outdata.dataset, file_name=filename, create=True )
primary_output_assigned = True
continue
if name not in primary_datasets:
primary_datasets[ name ] = odict.odict()
visible = fields_match.visible
ext = fields_match.ext
if ext == "input":
ext = input_ext
dbkey = fields_match.dbkey
if dbkey == INPUT_DBKEY_TOKEN:
dbkey = input_dbkey
# Create new primary dataset
primary_data = app.model.HistoryDatasetAssociation( extension=ext,
designation=designation,
visible=visible,
dbkey=dbkey,
create_dataset=True,
sa_session=sa_session )
app.security_agent.copy_dataset_permissions( outdata.dataset, primary_data.dataset )
sa_session.add( primary_data )
sa_session.flush()
# Move data from temp location to dataset location
app.object_store.update_from_file(primary_data.dataset, file_name=filename, create=True)
primary_data.set_size()
            # Use the name from the match if one was specified, otherwise
            # generate one from the output's name and the designation.
primary_data.name = fields_match.name or "%s (%s)" % ( outdata.name, designation )
primary_data.info = outdata.info
primary_data.init_meta( copy_from=outdata )
primary_data.dbkey = dbkey
# Associate new dataset with job
job = None
for assoc in outdata.creating_job_associations:
job = assoc.job
break
if job:
assoc = app.model.JobToOutputDatasetAssociation( '__new_primary_file_%s|%s__' % ( name, designation ), primary_data )
assoc.job = job
sa_session.add( assoc )
sa_session.flush()
primary_data.state = outdata.state
# add tool/metadata provided information
new_primary_datasets_attributes = new_primary_datasets.get( os.path.split( filename )[-1], {} )
if new_primary_datasets_attributes:
dataset_att_by_name = dict( ext='extension' )
for att_set in [ 'name', 'info', 'ext', 'dbkey' ]:
dataset_att_name = dataset_att_by_name.get( att_set, att_set )
setattr( primary_data, dataset_att_name, new_primary_datasets_attributes.get( att_set, getattr( primary_data, dataset_att_name ) ) )
extra_files_path = new_primary_datasets_attributes.get( 'extra_files', None )
if extra_files_path:
extra_files_path_joined = os.path.join( job_working_directory, extra_files_path )
for root, dirs, files in os.walk( extra_files_path_joined ):
extra_dir = os.path.join( primary_data.extra_files_path, root.replace( extra_files_path_joined, '', 1 ).lstrip( os.path.sep ) )
for f in files:
app.object_store.update_from_file(
primary_data.dataset,
extra_dir=extra_dir,
alt_name=f,
file_name=os.path.join( root, f ),
create=True,
dir_only=True,
preserve_symlinks=True
)
metadata_dict = new_primary_datasets_attributes.get( 'metadata', None )
if metadata_dict:
primary_data.metadata.from_JSON_dict( json_dict=metadata_dict )
else:
primary_data.set_meta()
primary_data.set_peek()
sa_session.add( primary_data )
sa_session.flush()
outdata.history.add_dataset( primary_data )
# Add dataset to return dict
primary_datasets[name][designation] = primary_data
            # Update all other HDAs associated with this dataset, e.g. if the
            # history was shared while the job was running.
for dataset in outdata.dataset.history_associations:
if outdata == dataset:
continue
new_data = primary_data.copy()
dataset.history.add_dataset( new_data )
sa_session.add( new_data )
sa_session.flush()
if primary_output_assigned:
outdata.name = new_outdata_name
outdata.init_meta()
outdata.set_meta()
outdata.set_peek()
sa_session.add( outdata )
sa_session.flush()
return primary_datasets
def walk_over_extra_files( extra_file_collectors, job_working_directory, matchable ):
for extra_file_collector in extra_file_collectors:
matches = []
directory = job_working_directory
if extra_file_collector.directory:
directory = os.path.join( directory, extra_file_collector.directory )
if not util.in_directory( directory, job_working_directory ):
raise Exception( "Problem with tool configuration, attempting to pull in datasets from outside working directory." )
if not os.path.isdir( directory ):
continue
for filename in os.listdir( directory ):
path = os.path.join( directory, filename )
if not os.path.isfile( path ):
continue
match = extra_file_collector.match( matchable, filename, path=path )
if match:
matches.append(match)
for match in extra_file_collector.sort(matches):
yield match.path, extra_file_collector
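# Note that walk_over_extra_files() only considers files directly inside the
# collector's directory (os.listdir, not a recursive walk) and refuses to
# leave the job working directory.  For instance, with directory="split" it
# would pick up <job_working_directory>/split/part1.txt but not
# <job_working_directory>/split/nested/part2.txt (the paths are illustrative).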
def dataset_collector( dataset_collection_description ):
if dataset_collection_description is DEFAULT_DATASET_COLLECTOR_DESCRIPTION:
        # Callers compare descriptions with 'is' and 'in', so make sure the
        # default description always maps to this singleton collector.
return DEFAULT_DATASET_COLLECTOR
else:
return DatasetCollector( dataset_collection_description )
class DatasetCollector( object ):
def __init__( self, dataset_collection_description ):
# dataset_collection_description is an abstract description
        # built from the tool parsing module - see galaxy.tools.parser.output_collection_def
self.sort_key = dataset_collection_description.sort_key
self.sort_reverse = dataset_collection_description.sort_reverse
self.sort_comp = dataset_collection_description.sort_comp
self.pattern = dataset_collection_description.pattern
self.default_dbkey = dataset_collection_description.default_dbkey
self.default_ext = dataset_collection_description.default_ext
self.default_visible = dataset_collection_description.default_visible
self.directory = dataset_collection_description.directory
self.assign_primary_output = dataset_collection_description.assign_primary_output
def pattern_for_dataset( self, dataset_instance=None ):
token_replacement = r'\d+'
if dataset_instance:
token_replacement = str( dataset_instance.id )
return self.pattern.replace( DATASET_ID_TOKEN, token_replacement )
def match( self, dataset_instance, filename, path=None ):
pattern = self.pattern_for_dataset( dataset_instance )
re_match = re.match( pattern, filename )
match_object = None
if re_match:
match_object = CollectedDatasetMatch( re_match, self, filename, path=path )
return match_object
def sort( self, matches ):
reverse = self.sort_reverse
sort_key = self.sort_key
sort_comp = self.sort_comp
assert sort_key in ["filename", "dbkey", "name", "designation"]
assert sort_comp in ["lexical", "numeric"]
key = operator.attrgetter(sort_key)
if sort_comp == "numeric":
key = _compose(int, key)
return sorted(matches, key=key, reverse=reverse)
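# Minimal usage sketch (the pattern, filename and sort settings are
# illustrative assumptions): a collector whose description has pattern
#     (?P<designation>.+)\.(?P<ext>[^.]+)
# with sort_key="filename" and sort_comp="lexical" would match
# "part_02.tabular" as designation "part_02" with ext "tabular", and sort()
# would order its matches lexically by filename.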
def _compose(f, g):
return lambda x: f(g(x))
class CollectedDatasetMatch( object ):
def __init__( self, re_match, collector, filename, path=None ):
self.re_match = re_match
self.collector = collector
self.filename = filename
self.path = path
@property
def designation( self ):
re_match = self.re_match
        # If collecting a nested collection, grab identifier_0, identifier_1,
        # etc., and join them on ":" to build the designation.
element_identifiers = self.raw_element_identifiers
if element_identifiers:
return ":".join(element_identifiers)
elif "designation" in re_match.groupdict():
return re_match.group( "designation" )
elif "name" in re_match.groupdict():
return re_match.group( "name" )
else:
return None
@property
def element_identifiers( self ):
return self.raw_element_identifiers or [self.designation]
@property
def raw_element_identifiers( self ):
re_match = self.re_match
identifiers = []
i = 0
while True:
key = "identifier_%d" % i
if key in re_match.groupdict():
identifiers.append(re_match.group(key))
else:
break
i += 1
return identifiers
@property
def name( self ):
""" Return name or None if not defined by the discovery pattern.
"""
re_match = self.re_match
name = None
if "name" in re_match.groupdict():
name = re_match.group( "name" )
return name
@property
def dbkey( self ):
try:
return self.re_match.group( "dbkey" )
except IndexError:
return self.collector.default_dbkey
@property
def ext( self ):
try:
return self.re_match.group( "ext" )
except IndexError:
return self.collector.default_ext
@property
def visible( self ):
try:
return self.re_match.group( "visible" ).lower() == "visible"
except IndexError:
return self.collector.default_visible
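# Worked example for the match properties above (pattern and filename are
# hypothetical): a collector pattern such as
#     (?P<designation>[^_]+)_(?P<visible>visible|hidden)_(?P<ext>[^_]+)_(?P<dbkey>[^_]+)
# matched against "sample1_visible_bed_hg19" yields designation "sample1",
# visible True, ext "bed" and dbkey "hg19"; any group omitted from a pattern
# falls back to the collector's defaults via the IndexError handlers above.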
DEFAULT_DATASET_COLLECTOR = DatasetCollector(DEFAULT_DATASET_COLLECTOR_DESCRIPTION)