Skip to content

Commit

Permalink
Merge pull request #6087 from natefoo/job-dest-config-fix
Browse files Browse the repository at this point in the history
Fix JobWrapper.get_destination_configuration() so that it actually checks the JobDestination params
  • Loading branch information
nsoranzo committed May 10, 2018
2 parents cd723e4 + 2dd301d commit cb8f832
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 14 deletions.
42 changes: 29 additions & 13 deletions lib/galaxy/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,9 @@ def __init__(self, job, queue, use_persisted_destination=False):
# and job recovery fail.
# Create the working dir if necessary
self._create_working_directory()
self.dataset_path_rewriter = self._job_dataset_path_rewriter(self.working_directory)
# the path rewriter needs destination params, so it cannot be set up until after the destination has been
# resolved
self._dataset_path_rewriter = None
self.output_paths = None
self.output_hdas_and_paths = None
self.tool_provided_job_metadata = None
Expand All @@ -729,13 +731,19 @@ def __init__(self, job, queue, use_persisted_destination=False):
self.__user_system_pwent = None
self.__galaxy_system_pwent = None

def _job_dataset_path_rewriter(self, working_directory):
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory:
dataset_path_rewriter = OutputsToWorkingDirectoryPathRewriter(working_directory)
else:
dataset_path_rewriter = NullDatasetPathRewriter()
return dataset_path_rewriter
@property
def _job_dataset_path_rewriter(self):
if self._dataset_path_rewriter is None:
outputs_to_working_directory = util.asbool(self.get_destination_configuration("outputs_to_working_directory", False))
if outputs_to_working_directory:
self._dataset_path_rewriter = OutputsToWorkingDirectoryPathRewriter(self.working_directory)
else:
self._dataset_path_rewriter = NullDatasetPathRewriter()
return self._dataset_path_rewriter

@property
def dataset_path_rewriter(self):
return self._job_dataset_path_rewriter

@property
def cleanup_job(self):
Expand Down Expand Up @@ -1144,8 +1152,13 @@ def get_destination_configuration(self, key, default=None):
""" Get a destination parameter that can be defaulted back
in app.config if it needs to be applied globally.
"""
# this is called by self._job_dataset_path_rewriter, which is called by self.job_destination(), so to access
# self.job_destination directly would cause infinite recursion
dest_params = {}
if hasattr(self, 'job_runner_mapper') and hasattr(self.job_runner_mapper, 'cached_job_destination'):
dest_params = self.job_runner_mapper.cached_job_destination.params
return self.get_job().get_destination_configuration(
self.app.config, key, default
dest_params, self.app.config, key, default
)

def finish(
Expand Down Expand Up @@ -1862,16 +1875,19 @@ class TaskWrapper(JobWrapper):
def __init__(self, task, queue):
super(TaskWrapper, self).__init__(task.job, queue)
self.task_id = task.id
working_directory = task.working_directory
self.working_directory = working_directory
job_dataset_path_rewriter = self._job_dataset_path_rewriter(self.working_directory)
self.dataset_path_rewriter = TaskPathRewriter(working_directory, job_dataset_path_rewriter)
self.working_directory = task.working_directory
if task.prepare_input_files_cmd is not None:
self.prepare_input_files_cmds = [task.prepare_input_files_cmd]
else:
self.prepare_input_files_cmds = None
self.status = task.states.NEW

@property
def dataset_path_rewriter(self):
if self._dataset_path_rewriter is None:
self._dataset_path_rewriter = TaskPathRewriter(self.working_directory, self._job_dataset_path_rewriter)
return self._dataset_path_rewriter

def can_split(self):
# Should the job handler split this job up? TaskWrapper should
# always return False as the job has already been split.
Expand Down
1 change: 1 addition & 0 deletions lib/galaxy/jobs/mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ def __cache_job_destination(self, params, raw_job_destination=None):
job_destination = self.__handle_dynamic_job_destination(raw_job_destination)
else:
job_destination = raw_job_destination
log.debug("(%s) Mapped job to destination id: %s", self.job_wrapper.job_id, job_destination.id)
self.cached_job_destination = job_destination

def get_job_destination(self, params):
Expand Down
4 changes: 3 additions & 1 deletion lib/galaxy/model/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -874,12 +874,14 @@ def set_final_state(self, final_state):
if self.workflow_invocation_step:
self.workflow_invocation_step.update()

def get_destination_configuration(self, config, key, default=None):
def get_destination_configuration(self, dest_params, config, key, default=None):
""" Get a destination parameter that can be defaulted back
in specified config if it needs to be applied globally.
"""
param_unspecified = object()
config_value = (self.destination_params or {}).get(key, param_unspecified)
if config_value is param_unspecified:
config_value = dest_params.get(key, param_unspecified)
if config_value is param_unspecified:
config_value = getattr(config, key, param_unspecified)
if config_value is param_unspecified:
Expand Down

0 comments on commit cb8f832

Please sign in to comment.