diff --git a/lib/galaxy/jobs/__init__.py b/lib/galaxy/jobs/__init__.py index 1ffac8a73848..74d72c7bbc75 100644 --- a/lib/galaxy/jobs/__init__.py +++ b/lib/galaxy/jobs/__init__.py @@ -986,12 +986,11 @@ def pause( self, job=None, message=None ): job.set_state( job.states.PAUSED ) self.sa_session.add( job ) - def mark_as_resubmitted( self ): + def mark_as_resubmitted( self, info=None ): job = self.get_job() self.sa_session.refresh( job ) - for dataset in [ dataset_assoc.dataset for dataset_assoc in job.output_datasets + job.output_library_datasets ]: - dataset._state = model.Dataset.states.RESUBMITTED - self.sa_session.add( dataset ) + if info is not None: + job.info = info job.set_state( model.Job.states.RESUBMITTED ) self.sa_session.add( job ) self.sa_session.flush() diff --git a/lib/galaxy/jobs/runners/__init__.py b/lib/galaxy/jobs/runners/__init__.py index 383dd886aad7..512c93f3682a 100644 --- a/lib/galaxy/jobs/runners/__init__.py +++ b/lib/galaxy/jobs/runners/__init__.py @@ -344,8 +344,8 @@ def _handle_runner_state( self, runner_state, job_state ): except: log.exception('Caught exception in runner state handler:') - def mark_as_resubmitted( self, job_state ): - job_state.job_wrapper.mark_as_resubmitted() + def mark_as_resubmitted( self, job_state, info=None ): + job_state.job_wrapper.mark_as_resubmitted( info=info ) if not self.app.config.track_jobs_in_database: job_state.job_wrapper.change_state( model.Job.states.QUEUED ) self.app.job_manager.job_handler.dispatcher.put( job_state.job_wrapper ) diff --git a/lib/galaxy/jobs/runners/state_handlers/resubmit.py b/lib/galaxy/jobs/runners/state_handlers/resubmit.py index 05bc913253a9..011a5338b11b 100644 --- a/lib/galaxy/jobs/runners/state_handlers/resubmit.py +++ b/lib/galaxy/jobs/runners/state_handlers/resubmit.py @@ -1,4 +1,5 @@ import logging + from galaxy import model @@ -13,31 +14,45 @@ def failure(app, job_runner, job_state): - if getattr( job_state, 'runner_state', None ) and job_state.runner_state in ( job_state.runner_states.WALLTIME_REACHED, job_state.runner_states.MEMORY_LIMIT_REACHED ): - # Intercept jobs that hit the walltime and have a walltime or nonspecific resubmit destination configured + if (getattr(job_state, 'runner_state', None) + and job_state.runner_state in + (job_state.runner_states.WALLTIME_REACHED, + job_state.runner_states.MEMORY_LIMIT_REACHED)): + # Intercept jobs that hit the walltime and have a walltime or + # nonspecific resubmit destination configured for resubmit in job_state.job_destination.get('resubmit'): - if resubmit.get('condition', None) and resubmit['condition'] != job_state.runner_state: - continue # There is a resubmit defined for the destination but its condition is not for walltime_reached - log.info("(%s/%s) Job will be resubmitted to '%s' because %s at the '%s' destination", - job_state.job_wrapper.job_id, - job_state.job_id, - resubmit['destination'], - MESSAGES[job_state.runner_state], - job_state.job_wrapper.job_destination.id ) + if (resubmit.get('condition', None) and resubmit['condition'] != + job_state.runner_state): + # There is a resubmit defined for the destination but + # its condition is not for walltime_reached + continue + log.info("(%s/%s) Job will be resubmitted to '%s' because %s at " + "the '%s' destination", + job_state.job_wrapper.job_id, + job_state.job_id, + resubmit['destination'], + MESSAGES[job_state.runner_state], + job_state.job_wrapper.job_destination.id ) # fetch JobDestination for the id or tag - new_destination = app.job_config.get_destination(resubmit['destination']) + new_destination = app.job_config.get_destination( + resubmit['destination']) # Resolve dynamic if necessary - new_destination = job_state.job_wrapper.job_runner_mapper.cache_job_destination( new_destination ) + new_destination = (job_state.job_wrapper.job_runner_mapper + .cache_job_destination(new_destination)) # Reset job state job = job_state.job_wrapper.get_job() if resubmit.get('handler', None): - log.debug('(%s/%s) Job reassigned to handler %s', job_state.job_wrapper.job_id, job_state.job_id, resubmit['handler']) + log.debug('(%s/%s) Job reassigned to handler %s', + job_state.job_wrapper.job_id, job_state.job_id, + resubmit['handler']) job.set_handler(resubmit['handler']) job_runner.sa_session.add( job ) # Is this safe to do here? job_runner.sa_session.flush() - # Cache the destination to prevent rerunning dynamic after resubmit - job_state.job_wrapper.job_runner_mapper.cached_job_destination = new_destination + # Cache the destination to prevent rerunning dynamic after + # resubmit + job_state.job_wrapper.job_runner_mapper \ + .cached_job_destination = new_destination job_state.job_wrapper.set_job_destination(new_destination) # Clear external ID (state change below flushes the change) job.job_runner_external_id = None @@ -45,5 +60,7 @@ def failure(app, job_runner, job_state): if job.params is None: job.params = {} job_state.runner_state_handled = True - job_runner.mark_as_resubmitted( job_state ) + info = "This job was resubmitted to the queue because %s on its " \ + "compute resource." % MESSAGES[job_state.runner_state]) + job_runner.mark_as_resubmitted(job_state, info=info) return diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index bb40305171c1..4bdc69f49d36 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -1335,9 +1335,8 @@ class Dataset( object ): DISCARDED = 'discarded', PAUSED = 'paused', SETTING_METADATA = 'setting_metadata', - FAILED_METADATA = 'failed_metadata', - RESUBMITTED = 'resubmitted' ) - # failed_metadata and resubmitted are only valid as DatasetInstance states currently + FAILED_METADATA = 'failed_metadata') + # failed_metadata is only valid as DatasetInstance state currently non_ready_states = ( states.UPLOAD,