Skip to content

Commit

Permalink
Merge b3d0c51 into 1e960f8
Browse files Browse the repository at this point in the history
  • Loading branch information
jmchilton committed May 13, 2019
2 parents 1e960f8 + b3d0c51 commit b7cc504
Show file tree
Hide file tree
Showing 13 changed files with 283 additions and 114 deletions.
8 changes: 4 additions & 4 deletions docker/coexecutor/Dockerfile
Expand Up @@ -15,10 +15,10 @@ RUN apt-get update \
&& apt-get autoclean \
&& rm -rf /var/lib/apt/lists/* /var/log/dpkg.log

RUN pip install -U pip && pip install wheel
RUN pip install -U pip && pip install wheel kombu pykube poster

ADD pulsar_app-0.10.0.dev0-py2.py3-none-any.whl /pulsar_app-0.10.0.dev0-py2.py3-none-any.whl
ENV PULSAR_VERSION 0.11.0.dev0

RUN pip install /pulsar_app-0.10.0.dev0-py2.py3-none-any.whl
ADD pulsar_app-$PULSAR_VERSION-py2.py3-none-any.whl /pulsar_app-$PULSAR_VERSION-py2.py3-none-any.whl

RUN pip install kombu pykube poster
RUN pip install /pulsar_app-$PULSAR_VERSION-py2.py3-none-any.whl
15 changes: 14 additions & 1 deletion docs/files/file_actions_sample_1.yaml
Expand Up @@ -19,7 +19,7 @@ paths:
match_type: glob
path_types: unstructured # Set to *any* to apply to defaults & unstructured paths.
action: transfer
depth: 1 # Stage whole directory with job and just file.
depth: 1 # Stage whole directory with job and not just file.

# Following block demonstrates rewriting paths without staging. Useful for
# instance if Galaxy's data indices are mounted on both servers but with
Expand All @@ -38,3 +38,16 @@ paths:
ssh_user: galaxy
ssh_host: f.q.d.n
ssh_port: 22

# See action_mapper.py for explanation of mapper types:
# - input: Galaxy input datasets and extra files.
# - config: Galaxy config and param files.
# - tool: Files from tool's tool_dir (for now just wrapper if available).
# - workdir: Input work dir files - e.g. task-split input file.
# - metadata: Input metadata files.
# - output: Galaxy output datasets in their final home.
# - output_workdir: Galaxy from_work_dir output paths and other files (e.g. galaxy.json)
# - output_metadata: Meta job and data files (e.g. Galaxy metadata generation files and
# metric instrumentation files)
# - unstructured: Other fixed tool parameter paths (likely coming from tool data, but not
# necessarily). Not sure this is the best name...
15 changes: 12 additions & 3 deletions pulsar/client/__init__.py
Expand Up @@ -44,9 +44,14 @@
from .exceptions import PulsarClientTransportError
from .manager import build_client_manager
from .path_mapper import PathMapper
from .staging import ClientJobDescription
from .staging import PulsarOutputs
from .staging import ClientOutputs
from .staging import (
ClientJobDescription,
ClientInputs,
ClientInput,
ClientOutputs,
CLIENT_INPUT_PATH_TYPES,
PulsarOutputs,
)
from .staging.down import finish_job
from .staging.up import submit_job

Expand All @@ -58,6 +63,10 @@
'submit_job',
'ClientJobDescription',
'PulsarOutputs',
'ClientInput',
'ClientInputs',
'ClientOutputs',
'CLIENT_INPUT_PATH_TYPES',
'ClientOutputs',
'PathMapper',
'PulsarClientTransportError',
Expand Down
73 changes: 53 additions & 20 deletions pulsar/client/action_mapper.py
Expand Up @@ -48,7 +48,7 @@
TOOL="tool",
# Input work dir files - e.g. task-split input file
WORKDIR="workdir",
# Input work dir files - e.g. metadata files, etc..
# Input metadata dir files - e.g. metadata files, etc..
METADATA="metadata",
# Galaxy output datasets in their final home.
OUTPUT="output",
Expand All @@ -58,7 +58,7 @@
# metric instrumentation files)
OUTPUT_METADATA="output_metadata",
# Other fixed tool parameter paths (likely coming from tool data, but not
# nessecarily). Not sure this is the best name...
# necessarily).
UNSTRUCTURED="unstructured",
)

Expand Down Expand Up @@ -98,53 +98,62 @@ class FileActionMapper(object):
... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None)
... mapper = FileActionMapper(mock_client)
... as_dict = config=mapper.to_dict()
... # print(as_dict["paths"])
... mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works
... unlink(f.name)
... return mapper
>>> mapper = mapper_for(default_action='none', config_contents=json_string)
>>> # Test first config line above, implicit path prefix mapper
>>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input')
>>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input')
>>> action.action_type == u'none'
True
>>> action.staging_needed
False
>>> # Test another (2nd) mapper, this one with a different action
>>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input')
>>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input')
>>> action.action_type == u'transfer'
True
>>> action.staging_needed
True
>>> # Always at least copy work_dir outputs.
>>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir')
>>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir')
>>> action.action_type == u'copy'
True
>>> action.staging_needed
True
>>> # Test glob mapper (matching test)
>>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy'
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy'
True
>>> # Test glob mapper (non-matching test)
>>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none'
>>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none'
True
>>> # Regex mapper test.
>>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy'
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy'
True
>>> # Doesn't map unstructured paths by default
>>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none'
>>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none'
True
>>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
{"path": "/", "action": "transfer", "path_types": "input"} \
] }''')
>>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer'
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer'
True
>>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none'
>>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none'
True
>>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
{"path": "/", "action": "transfer", "path_types": "*any*"} \
] }''')
>>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer'
>>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer'
True
>>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \
{"action": "transfer", "path_types": "input"}, \
{"action": "remote_copy", "path_types": "output"} \
] }''')
>>> input_action = match_type_only_mapper.action({}, 'input')
>>> input_action.action_type
'transfer'
>>> output_action = match_type_only_mapper.action({}, 'output')
>>> output_action.action_type
'remote_copy'
"""

def __init__(self, client=None, config=None):
Expand All @@ -161,7 +170,8 @@ def __init__(self, client=None, config=None):
self.mappers = mappers_from_dicts(config.get("paths", []))
self.files_endpoint = config.get("files_endpoint", None)

def action(self, path, type, mapper=None):
def action(self, source, type, mapper=None):
path = source.get("path", None)
mapper = self.__find_mapper(path, type, mapper)
action_class = self.__action_class(path, type, mapper)
file_lister = DEFAULT_FILE_LISTER
Expand Down Expand Up @@ -205,7 +215,10 @@ def __client_to_config(self, client):

def __find_mapper(self, path, type, mapper=None):
if not mapper:
normalized_path = abspath(path)
if path is not None:
normalized_path = abspath(path)
else:
normalized_path = None
for query_mapper in self.mappers:
if query_mapper.matches(normalized_path, type):
mapper = query_mapper
Expand Down Expand Up @@ -262,6 +275,7 @@ def __inject_ssh_properties(self, action):


class BaseAction(object):
whole_directory_transfer_supported = False
action_spec = {}

def __init__(self, path, file_lister=None):
Expand Down Expand Up @@ -608,6 +622,22 @@ def _extend_base_dict(self, **kwds):
base_dict.update(**kwds)
return base_dict

def to_pattern(self):
    """Return a pattern object for this mapper.

    This base implementation is a placeholder and always raises; the
    prefix, glob, and regex mappers supply their own versions.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError()


class PathTypeOnlyMapper(BasePathMapper):
    """Mapper whose path check accepts every path.

    ``_path_matches`` is unconditionally true here, so any filtering is left
    to whatever else the base class checks (presumably the configured
    ``path_types`` -- confirm against ``BasePathMapper.matches``).
    """

    match_type = 'path_type_only'

    def __init__(self, config):
        # No path-specific state to record; defer entirely to the base class.
        super(PathTypeOnlyMapper, self).__init__(config)

    def _path_matches(self, path):
        """Accept any ``path`` value (the argument is not inspected)."""
        return True

    def to_dict(self):
        """Serialize this mapper using only the base mapper's fields."""
        serialized = self._extend_base_dict()
        return serialized


class PrefixPathMapper(BasePathMapper):
match_type = 'prefix'
Expand All @@ -617,7 +647,7 @@ def __init__(self, config):
self.prefix_path = abspath(config['path'])

def _path_matches(self, path):
return path.startswith(self.prefix_path)
return path is not None and path.startswith(self.prefix_path)

def to_pattern(self):
pattern_str = r"(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep))
Expand All @@ -635,7 +665,7 @@ def __init__(self, config):
self.glob_path = config['path']

def _path_matches(self, path):
return fnmatch.fnmatch(path, self.glob_path)
return path is not None and fnmatch.fnmatch(path, self.glob_path)

def to_pattern(self):
return compile(fnmatch.translate(self.glob_path))
Expand All @@ -653,7 +683,7 @@ def __init__(self, config):
self.pattern = compile(self.pattern_raw)

def _path_matches(self, path):
return self.pattern.match(path) is not None
return path is not None and self.pattern.match(path) is not None

def to_pattern(self):
return self.pattern
Expand All @@ -662,7 +692,7 @@ def to_dict(self):
return self._extend_base_dict(path=self.pattern_raw)


MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper]
MAPPER_CLASSES = [PathTypeOnlyMapper, PrefixPathMapper, GlobPathMapper, RegexPathMapper]
MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES))


Expand All @@ -671,7 +701,10 @@ def mappers_from_dicts(mapper_def_list):


def _mappper_from_dict(mapper_dict):
map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE)
if "path" in mapper_dict:
map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE)
else:
map_type = 'path_type_only'
return MAPPER_CLASS_DICT[map_type](mapper_dict)


Expand Down
2 changes: 1 addition & 1 deletion pulsar/client/client.py
Expand Up @@ -365,7 +365,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient):

def __init__(self, destination_params, job_id, client_manager):
super(MessageCoexecutionPodJobClient, self).__init__(destination_params, job_id, client_manager)
self.pulsar_container_image = destination_params.get("pulsar_container_image", "galaxy/pulsar-pod-staging:0.1")
self.pulsar_container_image = destination_params.get("pulsar_container_image", "galaxy/pulsar-pod-staging:0.10.0")
self._default_pull_policy = pull_policy(destination_params)

def launch(self, command_line, dependencies_description=None, env=[], remote_staging=[], job_config=None, container=None, pulsar_app_config=None):
Expand Down
17 changes: 8 additions & 9 deletions pulsar/client/path_mapper.py
Expand Up @@ -4,6 +4,7 @@

from .action_mapper import FileActionMapper
from .action_mapper import path_type
from .staging import CLIENT_INPUT_PATH_TYPES
from .util import PathHelper


Expand Down Expand Up @@ -46,8 +47,11 @@ def remote_output_path_rewrite(self, local_path):
remote_path = self.__remote_path_rewrite(local_path, output_type)
return remote_path

def remote_input_path_rewrite(self, local_path):
remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT)
def remote_input_path_rewrite(self, local_path, client_input_path_type=None):
name = None
if client_input_path_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
name = "metadata_%s" % os.path.basename(local_path)
remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT, name=name)
return remote_path

def remote_version_path_rewrite(self, local_path):
Expand All @@ -56,7 +60,7 @@ def remote_version_path_rewrite(self, local_path):

def check_for_arbitrary_rewrite(self, local_path):
path = str(local_path) # Use false_path if needed.
action = self.action_mapper.action(path, path_type.UNSTRUCTURED)
action = self.action_mapper.action({"path": path}, path_type.UNSTRUCTURED)
if not action.staging_needed:
return action.path_rewrite(self.path_helper), []
unique_names = action.unstructured_map()
Expand All @@ -68,7 +72,7 @@ def __remote_path_rewrite(self, dataset_path, dataset_path_type, name=None):
""" Return remote path of this file (if staging is required) else None.
"""
path = str(dataset_path) # Use false_path if needed.
action = self.action_mapper.action(path, dataset_path_type)
action = self.action_mapper.action({"path": path}, dataset_path_type)
if action.staging_needed:
if name is None:
name = os.path.basename(path)
Expand All @@ -81,11 +85,6 @@ def __remote_path_rewrite(self, dataset_path, dataset_path_type, name=None):

return remote_path_rewrite

def __action(self, dataset_path, dataset_path_type):
path = str(dataset_path) # Use false_path if needed.
action = self.action_mapper.action(path, dataset_path_type)
return action

def __remote_directory(self, dataset_path_type):
if dataset_path_type in [path_type.OUTPUT]:
return self.output_directory
Expand Down

0 comments on commit b7cc504

Please sign in to comment.