-
Notifications
You must be signed in to change notification settings - Fork 965
/
datasets.py
113 lines (91 loc) · 3.43 KB
/
datasets.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
"""
Utility classes allowing Job interface to reason about datasets.
"""
import os.path
from abc import (
ABCMeta,
abstractmethod
)
import six
def dataset_path_rewrites(dataset_paths):
    """Index the dataset paths that carry a rewrite by their real path.

    An entry is kept only when its ``false_path`` attribute exists and is
    truthy; objects lacking the attribute are skipped instead of raising.
    If several entries share a ``real_path`` the last one wins.

    :param dataset_paths: iterable of objects exposing ``real_path`` and,
        optionally, ``false_path``.
    :returns: dict mapping ``real_path`` to the corresponding entry.
    """
    return {
        dataset_path.real_path: dataset_path
        for dataset_path in dataset_paths
        if getattr(dataset_path, "false_path", None)
    }
class DatasetPath(object):
    """Describe where a dataset lives and, optionally, where a job sees it.

    ``real_path`` is the dataset's actual location; ``false_path`` (when not
    ``None``) is the alternative path that ``str()`` reports instead.
    """

    def __init__(
        self,
        dataset_id,
        real_path,
        false_path=None,
        false_extra_files_path=None,
        mutable=True
    ):
        """Store the supplied values unchanged as instance attributes.

        :param dataset_id: identifier of the dataset.
        :param real_path: actual path of the dataset.
        :param false_path: replacement path, or ``None`` for no rewrite.
        :param false_extra_files_path: replacement path for the extra files
            directory, or ``None``.
        :param mutable: flag kept as given; this class does not interpret it.
        """
        self.dataset_id = dataset_id
        self.real_path = real_path
        self.false_path = false_path
        self.false_extra_files_path = false_extra_files_path
        self.mutable = mutable

    def __repr__(self):
        """Return a debugging representation listing every attribute."""
        return (
            "%s(dataset_id=%r, real_path=%r, false_path=%r, "
            "false_extra_files_path=%r, mutable=%r)"
        ) % (
            type(self).__name__,
            self.dataset_id,
            self.real_path,
            self.false_path,
            self.false_extra_files_path,
            self.mutable,
        )

    def __str__(self):
        """Return ``false_path`` when one is set, otherwise ``real_path``."""
        # Only ``None`` means "no rewrite"; an empty string is still returned.
        if self.false_path is None:
            return self.real_path
        return self.false_path

    def with_path_for_job(self, false_path, false_extra_files_path=None):
        """Clone the dataset path but with a new false_path.

        :param false_path: new replacement path; when ``None`` no clone is
            made and this same instance is returned.
        :param false_extra_files_path: replacement extra files path to set on
            the clone.
        :returns: ``self`` if ``false_path`` is ``None``, otherwise a new
            ``DatasetPath`` sharing this one's ``dataset_id``, ``real_path``
            and ``mutable``.
        """
        if false_path is None:
            return self
        return DatasetPath(
            dataset_id=self.dataset_id,
            real_path=self.real_path,
            false_path=false_path,
            false_extra_files_path=false_extra_files_path,
            mutable=self.mutable,
        )
@six.add_metaclass(ABCMeta)
class DatasetPathRewriter(object):
    """Used by runner to rewrite paths.

    Abstract interface: ``rewrite_dataset_path`` must be overridden before a
    subclass can be instantiated.
    """

    @abstractmethod
    def rewrite_dataset_path(self, dataset, dataset_type):
        """Return the replacement path for ``dataset``.

        Dataset type is 'input' or 'output'.
        Return None to indicate not to rewrite this path.

        :param dataset: the dataset whose path may be rewritten.
        :param dataset_type: ``'input'`` or ``'output'``.
        """
class NullDatasetPathRewriter(object):
    """Used by default for jobwrapper, do not rewrite anything."""

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Keep path the same.

        :param dataset: ignored.
        :param dataset_type: ignored.
        :returns: always ``None``, i.e. no replacement path.
        """
        return None
class OutputsToWorkingDirectoryPathRewriter(object):
    """Rewrites all paths to place them in the specified working
    directory for normal jobs when Galaxy is configured with
    app.config.outputs_to_working_directory. Job runner base class
    is responsible for copying these out after job is complete.
    """

    def __init__(self, working_directory):
        """Remember the directory that rewritten output paths point into.

        :param working_directory: directory used as the parent of every
            rewritten output path.
        """
        self.working_directory = working_directory

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Redirect output datasets into the working directory.

        :param dataset: object exposing an integer ``id``; it is only read
            for outputs.
        :param dataset_type: ``'output'`` triggers a rewrite; any other value
            leaves the path alone.
        :returns: absolute path ``<working_directory>/galaxy_dataset_<id>.dat``
            for outputs, otherwise ``None``.
        """
        if dataset_type != 'output':
            return None
        rewritten_name = "galaxy_dataset_%d.dat" % dataset.id
        return os.path.abspath(os.path.join(self.working_directory, rewritten_name))
class TaskPathRewriter(object):
    """Rewrites all paths to place them in the specified working
    directory for TaskWrapper. TaskWrapper is responsible for putting
    them there and pulling them out.
    """

    def __init__(self, working_directory, job_dataset_path_rewriter):
        """Store the task directory and the job-level rewriter to consult.

        :param working_directory: directory that every rewritten path is
            placed in.
        :param job_dataset_path_rewriter: object with a
            ``rewrite_dataset_path(dataset, dataset_type)`` method, asked
            first for the job-level path.
        """
        self.working_directory = working_directory
        self.job_dataset_path_rewriter = job_dataset_path_rewriter

    def rewrite_dataset_path(self, dataset, dataset_type):
        """Return the path of ``dataset`` inside the task working directory.

        The job-level rewriter is consulted first; if it returns a falsy
        value the dataset's own ``file_name`` is used. Only the base name of
        that path is kept and joined onto ``working_directory``.

        :param dataset: object exposing ``file_name``.
        :param dataset_type: passed through to the job-level rewriter.
        :returns: ``os.path.join(working_directory, <base name>)``.
        """
        # Read file_name before delegating, matching the original order.
        dataset_file_name = dataset.file_name
        job_file_name = self.job_dataset_path_rewriter.rewrite_dataset_path(dataset, dataset_type)
        if not job_file_name:
            job_file_name = dataset_file_name
        return os.path.join(self.working_directory, os.path.basename(job_file_name))