Skip to content

Commit

Permalink
Pass job output file unqualified names to Pulsar so that it can create
Browse files Browse the repository at this point in the history
them before running the job.
  • Loading branch information
natefoo committed Sep 21, 2017
1 parent 832da6c commit 2669ab0
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 0 deletions.
10 changes: 10 additions & 0 deletions lib/galaxy/jobs/__init__.py
Expand Up @@ -1571,6 +1571,9 @@ def get_input_paths(self, job=None):
paths.append(DatasetPath(da.id, real_path=real_path, false_path=false_path, mutable=False))
return paths

def get_output_basenames(self):
return map(os.path.basename, map(str, self.get_output_fnames()))

def get_output_fnames(self):
if self.output_paths is None:
self.compute_outputs()
Expand Down Expand Up @@ -1997,6 +2000,10 @@ class ComputeEnvironment(object):
compute server.
"""

@abstractmethod
def output_names(self):
""" Output unqualified filenames defined by job. """

@abstractmethod
def output_paths(self):
""" Output DatasetPaths defined by job. """
Expand Down Expand Up @@ -2062,6 +2069,9 @@ def __init__(self, job_wrapper, job):
self.job_wrapper = job_wrapper
self.job = job

def output_names(self):
return self.job_wrapper.get_output_basenames()

def output_paths(self):
return self.job_wrapper.get_output_fnames()

Expand Down
7 changes: 7 additions & 0 deletions lib/galaxy/jobs/runners/pulsar.py
Expand Up @@ -277,8 +277,10 @@ def queue_job(self, job_wrapper):
dependencies_description = PulsarJobRunner.__dependencies_description(client, job_wrapper)
rewrite_paths = not PulsarJobRunner.__rewrite_parameters(client)
unstructured_path_rewrites = {}
output_names = []
if compute_environment:
unstructured_path_rewrites = compute_environment.unstructured_path_rewrites
output_names = compute_environment.output_names()

client_job_description = ClientJobDescription(
command_line=command_line,
Expand All @@ -292,6 +294,7 @@ def queue_job(self, job_wrapper):
env=client.env,
rewrite_paths=rewrite_paths,
arbitrary_files=unstructured_path_rewrites,
touch_outputs=output_names,
)
job_id = pulsar_submit_job(client, client_job_description, remote_job_config)
log.info("Pulsar job submitted with job_id %s" % job_id)
Expand Down Expand Up @@ -783,6 +786,10 @@ def __init__(self, pulsar_client, job_wrapper, remote_job_config):
version_path = new_version_path
self._version_path = version_path

def output_names(self):
# Maybe this should use the path mapper, but the path mapper just uses basenames
return self.job_wrapper.get_output_basenames()

def output_paths(self):
local_output_paths = self._wrapper_output_paths

Expand Down

0 comments on commit 2669ab0

Please sign in to comment.